Skip to content

Mobility

The mobility subsystem currently exposes two handler families:

  • MasslessMobilityHandler: target-based motion with instantaneous speed changes and no inertia.
  • DynamicVelocityMobilityHandler: dynamic velocity mobility with velocity commands, acceleration limits, and velocity telemetry.

For a conceptual explanation of how protocols, mobility commands, handlers, and telemetry fit together, see How Mobility Works.

For the dynamic velocity mobility API, see Dynamic Velocity Mobility Handler.

gradysim.simulator.handler.mobility

Mobility handlers introduce mobility to nodes in the simulation. Different mobility handlers can implement different mobility models, from simple static nodes to complex models that simulate real-world movement patterns. They are responsible for updating the position of nodes based on mobility commands received from protocols and for sending updated position telemetry back to the protocols.

DynamicVelocityMobilityConfiguration dataclass

Configuration parameters for the Dynamic Velocity Mobility Handler.

Attributes:

Name Type Description
update_rate float

Time interval (in seconds) between mobility updates. Typical for trajectory-level quadrotor sims: 0.01–0.05 s.

max_speed_xy float

Maximum horizontal speed (m/s). Applied to the norm of (vx, vy). Typical: 3–15 m/s (depends heavily on vehicle and mission).

max_speed_z float

Maximum vertical speed (m/s). Applied to |vz|. Typical: 1–8 m/s.

max_acc_xy float

Maximum horizontal acceleration (m/s²). Applied to the norm of (ax, ay). Typical: 2–8 m/s².

max_acc_z float

Maximum vertical acceleration (m/s²). Applied to |az|. Typical: 2–10 m/s².

tau_xy Optional[float]

Optional first-order time constant (seconds) for horizontal velocity tracking. When set, the model behaves like a 1st-order system + acceleration saturation: a* = (v_des - v) / tau_xy Typical: 0.3–0.8 s.

tau_z Optional[float]

Optional first-order time constant (seconds) for vertical velocity tracking. Typical: 0.5–1.2 s.

send_telemetry bool

If True, emit Telemetry messages after position updates.

telemetry_decimation int

Emit telemetry every N mobility updates (default: 1).

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/config.py
@dataclass
class DynamicVelocityMobilityConfiguration:
    """
    Configuration parameters for the Dynamic Velocity Mobility Handler.

    Attributes:
        update_rate: Time interval (in seconds) between mobility updates.
            Typical for trajectory-level quadrotor sims: 0.01–0.05 s.
        max_speed_xy: Maximum horizontal speed (m/s). Applied to the norm of (vx, vy).
            Typical: 3–15 m/s (depends heavily on vehicle and mission).
        max_speed_z: Maximum vertical speed (m/s). Applied to |vz|.
            Typical: 1–8 m/s.
        max_acc_xy: Maximum horizontal acceleration (m/s²). Applied to the norm of (ax, ay).
            Typical: 2–8 m/s².
        max_acc_z: Maximum vertical acceleration (m/s²). Applied to |az|.
            Typical: 2–10 m/s².
        tau_xy: Optional first-order time constant (seconds) for horizontal velocity tracking.
            When set, the model behaves like a 1st-order system + acceleration saturation:
                a* = (v_des - v) / tau_xy
            Typical: 0.3–0.8 s.
        tau_z: Optional first-order time constant (seconds) for vertical velocity tracking.
            Typical: 0.5–1.2 s.
        send_telemetry: If True, emit Telemetry messages after position updates.
        telemetry_decimation: Emit telemetry every N mobility updates (default: 1).
    """
    update_rate: float
    max_speed_xy: float
    max_speed_z: float
    max_acc_xy: float
    max_acc_z: float
    tau_xy: Optional[float] = None
    tau_z: Optional[float] = None
    send_telemetry: bool = True
    telemetry_decimation: int = 1

DynamicVelocityMobilityHandler

Bases: INodeHandler

Dynamic Velocity Mobility Handler for GrADyS-SIM NG.

This handler updates node positions based on velocity commands with realistic physical constraints (velocity and acceleration limits).

Key features:

  • Direct velocity control (no waypoints)
  • Independent horizontal and vertical constraints
  • Acceleration-limited velocity tracking (optionally with 1st-order lag via tau)
  • Optional telemetry emission
Usage

config = DynamicVelocityMobilityConfiguration( update_rate=0.1, max_speed_xy=10.0, max_speed_z=5.0, max_acc_xy=2.0, max_acc_z=1.0 ) handler = DynamicVelocityMobilityHandler(config)

In your protocol handler:

handler.set_velocity(node.identifier, (vx, vy, vz))

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
class DynamicVelocityMobilityHandler(INodeHandler):
    """
    Dynamic Velocity Mobility Handler for GrADyS-SIM NG.

    This handler updates node positions based on velocity commands with
    realistic physical constraints (velocity and acceleration limits).

    Key features:

    - Direct velocity control (no waypoints)
    - Independent horizontal and vertical constraints
    - Acceleration-limited velocity tracking (optionally with 1st-order lag via tau)
    - Optional telemetry emission

    Usage:
        config = DynamicVelocityMobilityConfiguration(
            update_rate=0.1,
            max_speed_xy=10.0,
            max_speed_z=5.0,
            max_acc_xy=2.0,
            max_acc_z=1.0
        )
        handler = DynamicVelocityMobilityHandler(config)

        # In your protocol handler:
        handler.set_velocity(node.identifier, (vx, vy, vz))
    """

    def __init__(self, config: DynamicVelocityMobilityConfiguration):
        """
        Initialize the Dynamic Velocity Mobility Handler.

        Args:
            config: Configuration parameters for mobility constraints and update rate.
        """
        self._config = config
        self._loop: EventLoop = None
        self._nodes: Dict[int, Node] = {}  # Track all registered nodes

        # Per-node state: current velocity and desired velocity
        self._current_velocity: Dict[int, Tuple[float, float, float]] = {}
        self._desired_velocity: Dict[int, Tuple[float, float, float]] = {}

        # Telemetry tracking: count updates per node
        self._update_counter: Dict[int, int] = {}

    @staticmethod
    def get_label() -> str:
        return "mobility"

    def register_node(self, node: Node):
        """
        Register a node with this handler (called when node is created).

        Args:
            node: The node instance to register.
        """
        # Initialize state for this node
        node_id = node.id
        self._nodes[node_id] = node
        self._current_velocity[node_id] = (0.0, 0.0, 0.0)
        self._desired_velocity[node_id] = (0.0, 0.0, 0.0)
        self._update_counter[node_id] = 0

    def inject(self, event_loop: EventLoop):
        """
        Inject the event loop into the handler.

        Args:
            event_loop: The event loop for scheduling periodic updates.
        """
        self._loop = event_loop

    def initialize(self):
        """
        Start the periodic mobility update loop.

        Schedules the first mobility update event for all nodes.
        """
        if self._nodes:  # Only schedule if we have nodes
            self._loop.schedule_event(
                self._loop.current_time + self._config.update_rate,
                self._mobility_update
            )

    def finalize(self):
        """Finalize handler after simulation ends (not used by this handler)."""
        pass

    def after_simulation_step(self, iteration: int, time: float):
        """
        Called after each simulation step.

        Args:
            iteration: Current simulation iteration number
            time: Current simulation time
        """
        pass

    def handle_command(self, command: MobilityCommand, node: Node):
        """
        Performs a mobility command. This method is called by the node's
        provider to transmit it's mobility command to the mobility handler.

        Args:
            command: Command being issued
            node: Node that issued the command
        """
        if node.id not in self._nodes:
            raise Exception("Error handling commands: Cannot handle command from unregistered node")

        if command.command_type == MobilityCommandType.SET_SPEED:
            if command.param_1 is None or command.param_2 is None or command.param_3 is None:
                raise Exception("Velocity SET_SPEED command requires three parameters (vx, vy, vz). "
                                "Make sure that the command is properly constructed using SetVelocityMobilityCommand.")
            self._set_velocity(node.id, (command.param_1, command.param_2, command.param_3))
        else:
            raise Exception("The only command accepted in DynamicVelocityMobilityHandler is SET_SPEED")

    def _set_velocity(self, node_id: int, v_des: Tuple[float, float, float]) -> None:
        """
        Set the desired velocity for a node.

        The velocity persists until updated by a new command. To stop a node,
        explicitly command zero velocity: (0.0, 0.0, 0.0).

        Args:
            node_id: Identifier of the node to control.
            v_des: Desired velocity as (vx, vy, vz) in m/s.

        Example:
            # Move northeast at 5 m/s horizontally, ascending at 2 m/s
            handler.set_velocity(node.id, (3.54, 3.54, 2.0))

            # Stop the node
            handler.set_velocity(node.id, (0.0, 0.0, 0.0))
        """
        if node_id not in self._desired_velocity:
            # Initialize state if node not yet known
            self._current_velocity[node_id] = (0.0, 0.0, 0.0)
            self._desired_velocity[node_id] = v_des
            self._update_counter[node_id] = 0
        else:
            self._desired_velocity[node_id] = v_des

    def _get_node_velocity(self, node_id: int) -> Optional[Tuple[float, float, float]]:
        """
        Get the current velocity of a node.

        Args:
            node_id: ID of the node

        Returns:
            Velocity vector (vx, vy, vz) in m/s, or None if node not registered
        """
        return self._current_velocity.get(node_id)

    def _get_node_position(self, node_id: int) -> Optional[Tuple[float, float, float]]:
        """
        Get the current position of a node.

        Args:
            node_id: ID of the node

        Returns:
            Position vector (x, y, z) in meters, or None if node not registered
        """
        node = self._nodes.get(node_id)
        return node.position if node is not None else None

    def _mobility_update(self):
        """
        Perform a single mobility update step for all nodes.

        This method:
        1. Iterates through all registered nodes
        2. Retrieves the current and desired velocities for each
        3. Applies acceleration limits to compute the new velocity
        4. Applies velocity saturation
        5. Updates the node position using numerical integration
        6. Optionally emits telemetry
        7. Schedules the next update
        """
        dt = self._config.update_rate

        # Update all nodes
        for node_id, node in self._nodes.items():
            # Get current and desired velocities
            v_current = self._current_velocity[node_id]
            v_desired = self._desired_velocity[node_id]

            # UAV trajectory model:
            # - The protocol commands a desired velocity v_desired.
            # - The vehicle tracks it with bounded acceleration.
            # - Optionally, a 1st-order lag (time constant tau) shapes the transient:
            #       a* = (v_des - v) / tau
            #   then a is saturated and integrated.
            if self._config.tau_xy is None and self._config.tau_z is None:
                v_new = apply_acceleration_limits(
                    v_current,
                    v_desired,
                    dt,
                    self._config.max_acc_xy,
                    self._config.max_acc_z,
                )
            else:
                v_new = apply_velocity_tracking_first_order(
                    v_current,
                    v_desired,
                    dt,
                    self._config.max_acc_xy,
                    self._config.max_acc_z,
                    tau_xy=self._config.tau_xy,
                    tau_z=self._config.tau_z,
                )

            # Apply velocity saturation
            v_new = apply_velocity_limits(
                v_new,
                self._config.max_speed_xy,
                self._config.max_speed_z
            )

            # Update position: x_{k+1} = x_k + v_k * dt
            new_pos = integrate_position(node.position, v_new, dt)
            node.position = new_pos

            # Update stored velocity
            self._current_velocity[node_id] = v_new

            # Update counter and emit telemetry if needed
            self._update_counter[node_id] += 1
            if self._should_emit_telemetry(node_id):
                self._emit_telemetry(node)

        # Schedule next update
        self._loop.schedule_event(
            self._loop.current_time + self._config.update_rate,
            self._mobility_update
        )

    def _should_emit_telemetry(self, node_id: int) -> bool:
        """
        Determine if telemetry should be emitted on this update.

        Args:
            node_id: Identifier of the node.

        Returns:
            True if telemetry should be emitted, False otherwise.
        """
        if not self._config.send_telemetry:
            return False

        count = self._update_counter[node_id]
        return (count % self._config.telemetry_decimation) == 0

    def _emit_telemetry(self, node: Node):
        """
        Emit a Telemetry message with the current node position.

        Args:
            node: The node to emit telemetry for.

        The telemetry is sent directly to the node's protocol encapsulator.
        """
        telemetry = DynamicVelocityTelemetry(
            current_position=node.position,
            current_velocity=self._get_node_velocity(node.id),
        )

        # Schedule telemetry delivery to protocol (same pattern as MobilityHandler)
        def send_telemetry():
            node.protocol_encapsulator.handle_telemetry(telemetry)

        self._loop.schedule_event(
            self._loop.current_time,
            send_telemetry,
            f"Node {node.id} handle_telemetry"
        )

__init__(config)

Initialize the Dynamic Velocity Mobility Handler.

Parameters:

Name Type Description Default
config DynamicVelocityMobilityConfiguration

Configuration parameters for mobility constraints and update rate.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def __init__(self, config: DynamicVelocityMobilityConfiguration):
    """
    Initialize the Dynamic Velocity Mobility Handler.

    Args:
        config: Configuration parameters for mobility constraints and update rate.
    """
    self._config = config
    self._loop: EventLoop = None
    self._nodes: Dict[int, Node] = {}  # Track all registered nodes

    # Per-node state: current velocity and desired velocity
    self._current_velocity: Dict[int, Tuple[float, float, float]] = {}
    self._desired_velocity: Dict[int, Tuple[float, float, float]] = {}

    # Telemetry tracking: count updates per node
    self._update_counter: Dict[int, int] = {}

after_simulation_step(iteration, time)

Called after each simulation step.

Parameters:

Name Type Description Default
iteration int

Current simulation iteration number

required
time float

Current simulation time

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def after_simulation_step(self, iteration: int, time: float):
    """
    Called after each simulation step.

    Args:
        iteration: Current simulation iteration number
        time: Current simulation time
    """
    pass

finalize()

Finalize handler after simulation ends (not used by this handler).

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def finalize(self):
    """Finalize handler after simulation ends (not used by this handler)."""
    pass

handle_command(command, node)

Performs a mobility command. This method is called by the node's provider to transmit it's mobility command to the mobility handler.

Parameters:

Name Type Description Default
command MobilityCommand

Command being issued

required
node Node

Node that issued the command

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def handle_command(self, command: MobilityCommand, node: Node):
    """
    Performs a mobility command. This method is called by the node's
    provider to transmit it's mobility command to the mobility handler.

    Args:
        command: Command being issued
        node: Node that issued the command
    """
    if node.id not in self._nodes:
        raise Exception("Error handling commands: Cannot handle command from unregistered node")

    if command.command_type == MobilityCommandType.SET_SPEED:
        if command.param_1 is None or command.param_2 is None or command.param_3 is None:
            raise Exception("Velocity SET_SPEED command requires three parameters (vx, vy, vz). "
                            "Make sure that the command is properly constructed using SetVelocityMobilityCommand.")
        self._set_velocity(node.id, (command.param_1, command.param_2, command.param_3))
    else:
        raise Exception("The only command accepted in DynamicVelocityMobilityHandler is SET_SPEED")

initialize()

Start the periodic mobility update loop.

Schedules the first mobility update event for all nodes.

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def initialize(self):
    """
    Start the periodic mobility update loop.

    Schedules the first mobility update event for all nodes.
    """
    if self._nodes:  # Only schedule if we have nodes
        self._loop.schedule_event(
            self._loop.current_time + self._config.update_rate,
            self._mobility_update
        )

inject(event_loop)

Inject the event loop into the handler.

Parameters:

Name Type Description Default
event_loop EventLoop

The event loop for scheduling periodic updates.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def inject(self, event_loop: EventLoop):
    """
    Inject the event loop into the handler.

    Args:
        event_loop: The event loop for scheduling periodic updates.
    """
    self._loop = event_loop

register_node(node)

Register a node with this handler (called when node is created).

Parameters:

Name Type Description Default
node Node

The node instance to register.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def register_node(self, node: Node):
    """
    Register a node with this handler (called when node is created).

    Args:
        node: The node instance to register.
    """
    # Initialize state for this node
    node_id = node.id
    self._nodes[node_id] = node
    self._current_velocity[node_id] = (0.0, 0.0, 0.0)
    self._desired_velocity[node_id] = (0.0, 0.0, 0.0)
    self._update_counter[node_id] = 0

MasslessMobilityConfiguration dataclass

Configuration class for the mobility handler

Source code in gradysim/simulator/handler/mobility/massless.py
@dataclass
class MasslessMobilityConfiguration:
    """
    Configuration class for the mobility handler
    """

    update_rate: float = 0.01
    """Interval in simulation seconds between mobility updates"""

    default_speed: float = 10
    """This is the default speed of a node in m/s"""

    reference_coordinates: Tuple[float, float, float] = (0, 0, 0)
    """
    These coordinates are used as a reference frame to convert geographical coordinates to cartesian coordinates. They
    will be used as the center of the scene and all geographical coordinates will be converted relative to it.
    """

default_speed: float = 10 class-attribute instance-attribute

This is the default speed of a node in m/s

reference_coordinates: Tuple[float, float, float] = (0, 0, 0) class-attribute instance-attribute

These coordinates are used as a reference frame to convert geographical coordinates to cartesian coordinates. They will be used as the center of the scene and all geographical coordinates will be converted relative to it.

update_rate: float = 0.01 class-attribute instance-attribute

Interval in simulation seconds between mobility updates

MasslessMobilityHandler

Bases: INodeHandler

Introduces mobility into the simulation. Simulates nodes as massless and dimensionless points that can move freely in 3D space. There is no concept of acceleration or inertia, all changes in velocity are instantaneous.

Works by registering a regular event that updates every node's position based on its target and speed. A protocol is capable of altering its own speed and heading by sending mobility commands through its provider. These commands will reach the mobility handler which will update the node's target and speed accordingly. Nodes also receive telemetry updates containing information pertaining a node's current mobility status.

Source code in gradysim/simulator/handler/mobility/massless.py
class MasslessMobilityHandler(INodeHandler):
    """
    Introduces mobility into the simulation. Simulates nodes as massless and dimensionless points that can move
    freely in 3D space. There is no concept of acceleration or inertia, all changes in velocity are instantaneous.

    Works by registering a regular event that updates every node's position based on its target and speed. A protocol
    is capable of altering its own speed and heading by sending mobility commands through its provider. These commands
    will reach the mobility handler which will update the node's target and speed accordingly. Nodes also receive
    telemetry updates containing information pertaining a node's current mobility status.
    """

    @staticmethod
    def get_label() -> str:
        return "mobility"

    _event_loop: EventLoop

    nodes: Dict[int, Node]
    targets: Dict[int, Position]
    speeds: Dict[int, float]

    def __init__(self, configuration: MasslessMobilityConfiguration = MasslessMobilityConfiguration()):
        """
        Constructor for the mobility handler

        Args:
            configuration: Configuration for the mobility handler. If not set all default values will be used.
        """
        self._configuration = configuration
        self.nodes = {}
        self.targets = {}
        self.speeds = {}
        self._injected = False

    def inject(self, event_loop: EventLoop):
        self._injected = True
        self._event_loop = event_loop

        event_loop.schedule_event(event_loop.current_time + self._configuration.update_rate,
                                  self._update_movement,
                                  "Mobility")

    def register_node(self, node: Node):
        if not self._injected:
            raise MasslessMobilityException("Error registering node: cannot register nodes while mobility handler "
                                    "is uninitialized.")

        self.nodes[node.id] = node
        self.speeds[node.id] = self._configuration.default_speed

    def _update_movement(self):
        for node_id in self.nodes.keys():
            node = self.nodes[node_id]

            # If the node has a target update its position
            if node_id in self.targets:
                target = self.targets[node_id]
                current_position = node.position
                speed = self.speeds[node_id]
                target_vector: Position = (target[0] - current_position[0],
                                           target[1] - current_position[1],
                                           target[2] - current_position[2])
                movement_multiplier = speed * self._configuration.update_rate
                distance_delta = math.sqrt(target_vector[0] ** 2 + target_vector[1] ** 2 + target_vector[2] ** 2)

                if movement_multiplier >= distance_delta:
                    node.position = (
                        target[0],
                        target[1],
                        target[2]
                    )
                else:
                    target_vector_multiplier = movement_multiplier / distance_delta

                    node.position = (
                        current_position[0] + target_vector[0] * target_vector_multiplier,
                        current_position[1] + target_vector[1] * target_vector_multiplier,
                        current_position[2] + target_vector[2] * target_vector_multiplier
                    )
            telemetry = Telemetry(current_position=node.position)

            def make_send_telemetry(node_ref, telemetry_ref):
                return lambda: node_ref.protocol_encapsulator.handle_telemetry(telemetry_ref)

            self._event_loop.schedule_event(
                self._event_loop.current_time,
                make_send_telemetry(node, telemetry),
                label_node(node) + " handle_telemetry"
            )

        self._event_loop.schedule_event(self._event_loop.current_time + self._configuration.update_rate,
                                        self._update_movement,
                                        "Mobility")

    def handle_command(self, command: MobilityCommand, node: Node):
        """
        Performs a mobility command. This method is called by the node's 
        provider to transmit it's mobility command to the mobility handler.

        Args:
            command: Command being issued
            node: Node that issued the command
        """
        if node.id not in self.nodes:
            raise MasslessMobilityException("Error handling commands: Cannot handle command from unregistered node")

        if command.command_type == MobilityCommandType.GOTO_COORDS:
            self._goto((command.param_1, command.param_2, command.param_3), node)
        elif command.command_type == MobilityCommandType.GOTO_GEO_COORDS:
            relative_coords = geo_to_cartesian(self._configuration.reference_coordinates,
                                               (command.param_1, command.param_2, command.param_3))
            self._goto(relative_coords, node)
        elif command.command_type == MobilityCommandType.SET_SPEED:
            self.speeds[node.id] = command.param_1

    def _goto(self, position: Position, node: Node):
        self.targets[node.id] = position

    def _stop(self, node: Node):
        del self.targets[node.id]

__init__(configuration=MasslessMobilityConfiguration())

Constructor for the mobility handler

Parameters:

Name Type Description Default
configuration MasslessMobilityConfiguration

Configuration for the mobility handler. If not set all default values will be used.

MasslessMobilityConfiguration()
Source code in gradysim/simulator/handler/mobility/massless.py
def __init__(self, configuration: MasslessMobilityConfiguration = MasslessMobilityConfiguration()):
    """
    Constructor for the mobility handler

    Args:
        configuration: Configuration for the mobility handler. If not set all default values will be used.
    """
    self._configuration = configuration
    self.nodes = {}
    self.targets = {}
    self.speeds = {}
    self._injected = False

handle_command(command, node)

Performs a mobility command. This method is called by the node's provider to transmit it's mobility command to the mobility handler.

Parameters:

Name Type Description Default
command MobilityCommand

Command being issued

required
node Node

Node that issued the command

required
Source code in gradysim/simulator/handler/mobility/massless.py
def handle_command(self, command: MobilityCommand, node: Node):
    """
    Performs a mobility command. This method is called by the node's 
    provider to transmit it's mobility command to the mobility handler.

    Args:
        command: Command being issued
        node: Node that issued the command
    """
    if node.id not in self.nodes:
        raise MasslessMobilityException("Error handling commands: Cannot handle command from unregistered node")

    if command.command_type == MobilityCommandType.GOTO_COORDS:
        self._goto((command.param_1, command.param_2, command.param_3), node)
    elif command.command_type == MobilityCommandType.GOTO_GEO_COORDS:
        relative_coords = geo_to_cartesian(self._configuration.reference_coordinates,
                                           (command.param_1, command.param_2, command.param_3))
        self._goto(relative_coords, node)
    elif command.command_type == MobilityCommandType.SET_SPEED:
        self.speeds[node.id] = command.param_1

Massless Mobility Handler

gradysim.simulator.handler.mobility.massless.MasslessMobilityHandler

Bases: INodeHandler

Introduces mobility into the simulation. Simulates nodes as massless and dimensionless points that can move freely in 3D space. There is no concept of acceleration or inertia, all changes in velocity are instantaneous.

Works by registering a regular event that updates every node's position based on its target and speed. A protocol is capable of altering its own speed and heading by sending mobility commands through its provider. These commands will reach the mobility handler which will update the node's target and speed accordingly. Nodes also receive telemetry updates containing information pertaining a node's current mobility status.

Source code in gradysim/simulator/handler/mobility/massless.py
class MasslessMobilityHandler(INodeHandler):
    """
    Introduces mobility into the simulation. Simulates nodes as massless and dimensionless points that can move
    freely in 3D space. There is no concept of acceleration or inertia, all changes in velocity are instantaneous.

    Works by registering a regular event that updates every node's position based on its target and speed. A protocol
    is capable of altering its own speed and heading by sending mobility commands through its provider. These commands
    will reach the mobility handler which will update the node's target and speed accordingly. Nodes also receive
    telemetry updates containing information pertaining a node's current mobility status.
    """

    @staticmethod
    def get_label() -> str:
        return "mobility"

    _event_loop: EventLoop

    nodes: Dict[int, Node]
    targets: Dict[int, Position]
    speeds: Dict[int, float]

    def __init__(self, configuration: MasslessMobilityConfiguration = MasslessMobilityConfiguration()):
        """
        Constructor for the mobility handler

        Args:
            configuration: Configuration for the mobility handler. If not set all default values will be used.
        """
        self._configuration = configuration
        self.nodes = {}
        self.targets = {}
        self.speeds = {}
        self._injected = False

    def inject(self, event_loop: EventLoop):
        self._injected = True
        self._event_loop = event_loop

        event_loop.schedule_event(event_loop.current_time + self._configuration.update_rate,
                                  self._update_movement,
                                  "Mobility")

    def register_node(self, node: Node):
        if not self._injected:
            raise MasslessMobilityException("Error registering node: cannot register nodes while mobility handler "
                                    "is uninitialized.")

        self.nodes[node.id] = node
        self.speeds[node.id] = self._configuration.default_speed

    def _update_movement(self):
        for node_id in self.nodes.keys():
            node = self.nodes[node_id]

            # If the node has a target update its position
            if node_id in self.targets:
                target = self.targets[node_id]
                current_position = node.position
                speed = self.speeds[node_id]
                target_vector: Position = (target[0] - current_position[0],
                                           target[1] - current_position[1],
                                           target[2] - current_position[2])
                movement_multiplier = speed * self._configuration.update_rate
                distance_delta = math.sqrt(target_vector[0] ** 2 + target_vector[1] ** 2 + target_vector[2] ** 2)

                if movement_multiplier >= distance_delta:
                    node.position = (
                        target[0],
                        target[1],
                        target[2]
                    )
                else:
                    target_vector_multiplier = movement_multiplier / distance_delta

                    node.position = (
                        current_position[0] + target_vector[0] * target_vector_multiplier,
                        current_position[1] + target_vector[1] * target_vector_multiplier,
                        current_position[2] + target_vector[2] * target_vector_multiplier
                    )
            telemetry = Telemetry(current_position=node.position)

            def make_send_telemetry(node_ref, telemetry_ref):
                return lambda: node_ref.protocol_encapsulator.handle_telemetry(telemetry_ref)

            self._event_loop.schedule_event(
                self._event_loop.current_time,
                make_send_telemetry(node, telemetry),
                label_node(node) + " handle_telemetry"
            )

        self._event_loop.schedule_event(self._event_loop.current_time + self._configuration.update_rate,
                                        self._update_movement,
                                        "Mobility")

    def handle_command(self, command: MobilityCommand, node: Node):
        """
        Performs a mobility command. This method is called by the node's 
        provider to transmit it's mobility command to the mobility handler.

        Args:
            command: Command being issued
            node: Node that issued the command
        """
        if node.id not in self.nodes:
            raise MasslessMobilityException("Error handling commands: Cannot handle command from unregistered node")

        if command.command_type == MobilityCommandType.GOTO_COORDS:
            self._goto((command.param_1, command.param_2, command.param_3), node)
        elif command.command_type == MobilityCommandType.GOTO_GEO_COORDS:
            relative_coords = geo_to_cartesian(self._configuration.reference_coordinates,
                                               (command.param_1, command.param_2, command.param_3))
            self._goto(relative_coords, node)
        elif command.command_type == MobilityCommandType.SET_SPEED:
            self.speeds[node.id] = command.param_1

    def _goto(self, position: Position, node: Node):
        self.targets[node.id] = position

    def _stop(self, node: Node):
        del self.targets[node.id]

__init__(configuration=MasslessMobilityConfiguration())

Constructor for the mobility handler

Parameters:

Name Type Description Default
configuration MasslessMobilityConfiguration

Configuration for the mobility handler. If not set all default values will be used.

MasslessMobilityConfiguration()
Source code in gradysim/simulator/handler/mobility/massless.py
def __init__(self, configuration: MasslessMobilityConfiguration = MasslessMobilityConfiguration()):
    """
    Constructor for the mobility handler

    Args:
        configuration: Configuration for the mobility handler. If not set all default values will be used.
    """
    self._configuration = configuration
    self.nodes = {}
    self.targets = {}
    self.speeds = {}
    self._injected = False

handle_command(command, node)

Performs a mobility command. This method is called by the node's provider to transmit it's mobility command to the mobility handler.

Parameters:

Name Type Description Default
command MobilityCommand

Command being issued

required
node Node

Node that issued the command

required
Source code in gradysim/simulator/handler/mobility/massless.py
def handle_command(self, command: MobilityCommand, node: Node):
    """
    Performs a mobility command. This method is called by the node's 
    provider to transmit it's mobility command to the mobility handler.

    Args:
        command: Command being issued
        node: Node that issued the command
    """
    if node.id not in self.nodes:
        raise MasslessMobilityException("Error handling commands: Cannot handle command from unregistered node")

    if command.command_type == MobilityCommandType.GOTO_COORDS:
        self._goto((command.param_1, command.param_2, command.param_3), node)
    elif command.command_type == MobilityCommandType.GOTO_GEO_COORDS:
        relative_coords = geo_to_cartesian(self._configuration.reference_coordinates,
                                           (command.param_1, command.param_2, command.param_3))
        self._goto(relative_coords, node)
    elif command.command_type == MobilityCommandType.SET_SPEED:
        self.speeds[node.id] = command.param_1

gradysim.simulator.handler.mobility.massless.MasslessMobilityConfiguration dataclass

Configuration class for the mobility handler

Source code in gradysim/simulator/handler/mobility/massless.py
@dataclass
class MasslessMobilityConfiguration:
    """
    Configuration class for the mobility handler
    """

    update_rate: float = 0.01
    """Interval in simulation seconds between mobility updates"""

    default_speed: float = 10
    """This is the default speed of a node in m/s"""

    reference_coordinates: Tuple[float, float, float] = (0, 0, 0)
    """
    These coordinates are used as a reference frame to convert geographical coordinates to cartesian coordinates. They
    will be used as the center of the scene and all geographical coordinates will be converted relative to it.
    """

default_speed: float = 10 class-attribute instance-attribute

This is the default speed of a node in m/s

reference_coordinates: Tuple[float, float, float] = (0, 0, 0) class-attribute instance-attribute

These coordinates are used as a reference frame to convert geographical coordinates to cartesian coordinates. They will be used as the center of the scene and all geographical coordinates will be converted relative to it.

update_rate: float = 0.01 class-attribute instance-attribute

Interval in simulation seconds between mobility updates

Dynamic Velocity Mobility Handler

See Dynamic Velocity Mobility Handler for the full handler, configuration, telemetry, and core API reference.