Skip to content

Dynamic Velocity Mobility Handler

The Dynamic Velocity Mobility Handler is the mobility model for nodes that are controlled through velocity commands instead of waypoint targets.

For the conceptual overview of mobility command interpretation, telemetry flow, and handler switching, see How Mobility Works.

Use it when you need:

  • bounded horizontal and vertical speed
  • bounded horizontal and vertical acceleration
  • optional first-order velocity tracking with tau_xy and tau_z
  • telemetry that includes both position and current velocity

Related example:

  • showcases/dynamic-velocity-mobility/main.py

Package

gradysim.simulator.handler.mobility.dynamic_velocity

GrADyS-SIM NG Dynamic Velocity Mobility Handler

A dynamic velocity mobility handler for the GrADyS-SIM NG simulator, designed for distributed controllers that output velocity vectors.

Key features:

  • Direct velocity control (no waypoints)
  • Independent horizontal and vertical constraints
  • Acceleration-limited velocity tracking
  • Optional telemetry emission

DynamicVelocityMobilityConfiguration dataclass

Configuration parameters for the Dynamic Velocity Mobility Handler.

Attributes:

Name Type Description
update_rate float

Time interval (in seconds) between mobility updates. Typical for trajectory-level quadrotor sims: 0.01–0.05 s.

max_speed_xy float

Maximum horizontal speed (m/s). Applied to the norm of (vx, vy). Typical: 3–15 m/s (depends heavily on vehicle and mission).

max_speed_z float

Maximum vertical speed (m/s). Applied to |vz|. Typical: 1–8 m/s.

max_acc_xy float

Maximum horizontal acceleration (m/s²). Applied to the norm of (ax, ay). Typical: 2–8 m/s².

max_acc_z float

Maximum vertical acceleration (m/s²). Applied to |az|. Typical: 2–10 m/s².

tau_xy Optional[float]

Optional first-order time constant (seconds) for horizontal velocity tracking. When set, the model behaves like a 1st-order system + acceleration saturation: a* = (v_des - v) / tau_xy Typical: 0.3–0.8 s.

tau_z Optional[float]

Optional first-order time constant (seconds) for vertical velocity tracking. Typical: 0.5–1.2 s.

send_telemetry bool

If True, emit Telemetry messages after position updates.

telemetry_decimation int

Emit telemetry every N mobility updates (default: 1).

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/config.py
@dataclass
class DynamicVelocityMobilityConfiguration:
    """
    Configuration parameters for the Dynamic Velocity Mobility Handler.

    Attributes:
        update_rate: Time interval (in seconds) between mobility updates.
            Typical for trajectory-level quadrotor sims: 0.01–0.05 s.
        max_speed_xy: Maximum horizontal speed (m/s). Applied to the norm of (vx, vy).
            Typical: 3–15 m/s (depends heavily on vehicle and mission).
        max_speed_z: Maximum vertical speed (m/s). Applied to |vz|.
            Typical: 1–8 m/s.
        max_acc_xy: Maximum horizontal acceleration (m/s²). Applied to the norm of (ax, ay).
            Typical: 2–8 m/s².
        max_acc_z: Maximum vertical acceleration (m/s²). Applied to |az|.
            Typical: 2–10 m/s².
        tau_xy: Optional first-order time constant (seconds) for horizontal velocity tracking.
            When set, the model behaves like a 1st-order system + acceleration saturation:
                a* = (v_des - v) / tau_xy
            Typical: 0.3–0.8 s.
        tau_z: Optional first-order time constant (seconds) for vertical velocity tracking.
            Typical: 0.5–1.2 s.
        send_telemetry: If True, emit Telemetry messages after position updates.
        telemetry_decimation: Emit telemetry every N mobility updates (default: 1).
    """
    update_rate: float
    max_speed_xy: float
    max_speed_z: float
    max_acc_xy: float
    max_acc_z: float
    tau_xy: Optional[float] = None
    tau_z: Optional[float] = None
    send_telemetry: bool = True
    telemetry_decimation: int = 1

DynamicVelocityMobilityHandler

Bases: INodeHandler

Dynamic Velocity Mobility Handler for GrADyS-SIM NG.

This handler updates node positions based on velocity commands with realistic physical constraints (velocity and acceleration limits).

Key features:

  • Direct velocity control (no waypoints)
  • Independent horizontal and vertical constraints
  • Acceleration-limited velocity tracking (optionally with 1st-order lag via tau)
  • Optional telemetry emission
Usage

config = DynamicVelocityMobilityConfiguration( update_rate=0.1, max_speed_xy=10.0, max_speed_z=5.0, max_acc_xy=2.0, max_acc_z=1.0 ) handler = DynamicVelocityMobilityHandler(config)

In your protocol handler:

handler.set_velocity(node.identifier, (vx, vy, vz))

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
class DynamicVelocityMobilityHandler(INodeHandler):
    """
    Dynamic Velocity Mobility Handler for GrADyS-SIM NG.

    This handler updates node positions based on velocity commands with
    realistic physical constraints (velocity and acceleration limits).

    Key features:

    - Direct velocity control (no waypoints)
    - Independent horizontal and vertical constraints
    - Acceleration-limited velocity tracking (optionally with 1st-order lag via tau)
    - Optional telemetry emission

    Usage:
        config = DynamicVelocityMobilityConfiguration(
            update_rate=0.1,
            max_speed_xy=10.0,
            max_speed_z=5.0,
            max_acc_xy=2.0,
            max_acc_z=1.0
        )
        handler = DynamicVelocityMobilityHandler(config)

        # In your protocol handler:
        handler.set_velocity(node.identifier, (vx, vy, vz))
    """

    def __init__(self, config: DynamicVelocityMobilityConfiguration):
        """
        Initialize the Dynamic Velocity Mobility Handler.

        Args:
            config: Configuration parameters for mobility constraints and update rate.
        """
        self._config = config
        self._loop: EventLoop = None
        self._nodes: Dict[int, Node] = {}  # Track all registered nodes

        # Per-node state: current velocity and desired velocity
        self._current_velocity: Dict[int, Tuple[float, float, float]] = {}
        self._desired_velocity: Dict[int, Tuple[float, float, float]] = {}

        # Telemetry tracking: count updates per node
        self._update_counter: Dict[int, int] = {}

    @staticmethod
    def get_label() -> str:
        return "mobility"

    def register_node(self, node: Node):
        """
        Register a node with this handler (called when node is created).

        Args:
            node: The node instance to register.
        """
        # Initialize state for this node
        node_id = node.id
        self._nodes[node_id] = node
        self._current_velocity[node_id] = (0.0, 0.0, 0.0)
        self._desired_velocity[node_id] = (0.0, 0.0, 0.0)
        self._update_counter[node_id] = 0

    def inject(self, event_loop: EventLoop):
        """
        Inject the event loop into the handler.

        Args:
            event_loop: The event loop for scheduling periodic updates.
        """
        self._loop = event_loop

    def initialize(self):
        """
        Start the periodic mobility update loop.

        Schedules the first mobility update event for all nodes.
        """
        if self._nodes:  # Only schedule if we have nodes
            self._loop.schedule_event(
                self._loop.current_time + self._config.update_rate,
                self._mobility_update
            )

    def finalize(self):
        """Finalize handler after simulation ends (not used by this handler)."""
        pass

    def after_simulation_step(self, iteration: int, time: float):
        """
        Called after each simulation step.

        Args:
            iteration: Current simulation iteration number
            time: Current simulation time
        """
        pass

    def handle_command(self, command: MobilityCommand, node: Node):
        """
        Performs a mobility command. This method is called by the node's
        provider to transmit it's mobility command to the mobility handler.

        Args:
            command: Command being issued
            node: Node that issued the command
        """
        if node.id not in self._nodes:
            raise Exception("Error handling commands: Cannot handle command from unregistered node")

        if command.command_type == MobilityCommandType.SET_SPEED:
            if command.param_1 is None or command.param_2 is None or command.param_3 is None:
                raise Exception("Velocity SET_SPEED command requires three parameters (vx, vy, vz). "
                                "Make sure that the command is properly constructed using SetVelocityMobilityCommand.")
            self._set_velocity(node.id, (command.param_1, command.param_2, command.param_3))
        else:
            raise Exception("The only command accepted in DynamicVelocityMobilityHandler is SET_SPEED")

    def _set_velocity(self, node_id: int, v_des: Tuple[float, float, float]) -> None:
        """
        Set the desired velocity for a node.

        The velocity persists until updated by a new command. To stop a node,
        explicitly command zero velocity: (0.0, 0.0, 0.0).

        Args:
            node_id: Identifier of the node to control.
            v_des: Desired velocity as (vx, vy, vz) in m/s.

        Example:
            # Move northeast at 5 m/s horizontally, ascending at 2 m/s
            handler.set_velocity(node.id, (3.54, 3.54, 2.0))

            # Stop the node
            handler.set_velocity(node.id, (0.0, 0.0, 0.0))
        """
        if node_id not in self._desired_velocity:
            # Initialize state if node not yet known
            self._current_velocity[node_id] = (0.0, 0.0, 0.0)
            self._desired_velocity[node_id] = v_des
            self._update_counter[node_id] = 0
        else:
            self._desired_velocity[node_id] = v_des

    def _get_node_velocity(self, node_id: int) -> Optional[Tuple[float, float, float]]:
        """
        Get the current velocity of a node.

        Args:
            node_id: ID of the node

        Returns:
            Velocity vector (vx, vy, vz) in m/s, or None if node not registered
        """
        return self._current_velocity.get(node_id)

    def _get_node_position(self, node_id: int) -> Optional[Tuple[float, float, float]]:
        """
        Get the current position of a node.

        Args:
            node_id: ID of the node

        Returns:
            Position vector (x, y, z) in meters, or None if node not registered
        """
        node = self._nodes.get(node_id)
        return node.position if node is not None else None

    def _mobility_update(self):
        """
        Perform a single mobility update step for all nodes.

        This method:
        1. Iterates through all registered nodes
        2. Retrieves the current and desired velocities for each
        3. Applies acceleration limits to compute the new velocity
        4. Applies velocity saturation
        5. Updates the node position using numerical integration
        6. Optionally emits telemetry
        7. Schedules the next update
        """
        dt = self._config.update_rate

        # Update all nodes
        for node_id, node in self._nodes.items():
            # Get current and desired velocities
            v_current = self._current_velocity[node_id]
            v_desired = self._desired_velocity[node_id]

            # UAV trajectory model:
            # - The protocol commands a desired velocity v_desired.
            # - The vehicle tracks it with bounded acceleration.
            # - Optionally, a 1st-order lag (time constant tau) shapes the transient:
            #       a* = (v_des - v) / tau
            #   then a is saturated and integrated.
            if self._config.tau_xy is None and self._config.tau_z is None:
                v_new = apply_acceleration_limits(
                    v_current,
                    v_desired,
                    dt,
                    self._config.max_acc_xy,
                    self._config.max_acc_z,
                )
            else:
                v_new = apply_velocity_tracking_first_order(
                    v_current,
                    v_desired,
                    dt,
                    self._config.max_acc_xy,
                    self._config.max_acc_z,
                    tau_xy=self._config.tau_xy,
                    tau_z=self._config.tau_z,
                )

            # Apply velocity saturation
            v_new = apply_velocity_limits(
                v_new,
                self._config.max_speed_xy,
                self._config.max_speed_z
            )

            # Update position: x_{k+1} = x_k + v_k * dt
            new_pos = integrate_position(node.position, v_new, dt)
            node.position = new_pos

            # Update stored velocity
            self._current_velocity[node_id] = v_new

            # Update counter and emit telemetry if needed
            self._update_counter[node_id] += 1
            if self._should_emit_telemetry(node_id):
                self._emit_telemetry(node)

        # Schedule next update
        self._loop.schedule_event(
            self._loop.current_time + self._config.update_rate,
            self._mobility_update
        )

    def _should_emit_telemetry(self, node_id: int) -> bool:
        """
        Determine if telemetry should be emitted on this update.

        Args:
            node_id: Identifier of the node.

        Returns:
            True if telemetry should be emitted, False otherwise.
        """
        if not self._config.send_telemetry:
            return False

        count = self._update_counter[node_id]
        return (count % self._config.telemetry_decimation) == 0

    def _emit_telemetry(self, node: Node):
        """
        Emit a Telemetry message with the current node position.

        Args:
            node: The node to emit telemetry for.

        The telemetry is sent directly to the node's protocol encapsulator.
        """
        telemetry = DynamicVelocityTelemetry(
            current_position=node.position,
            current_velocity=self._get_node_velocity(node.id),
        )

        # Schedule telemetry delivery to protocol (same pattern as MobilityHandler)
        def send_telemetry():
            node.protocol_encapsulator.handle_telemetry(telemetry)

        self._loop.schedule_event(
            self._loop.current_time,
            send_telemetry,
            f"Node {node.id} handle_telemetry"
        )

__init__(config)

Initialize the Dynamic Velocity Mobility Handler.

Parameters:

Name Type Description Default
config DynamicVelocityMobilityConfiguration

Configuration parameters for mobility constraints and update rate.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def __init__(self, config: DynamicVelocityMobilityConfiguration):
    """
    Initialize the Dynamic Velocity Mobility Handler.

    Args:
        config: Configuration parameters for mobility constraints and update rate.
    """
    self._config = config
    self._loop: EventLoop = None
    self._nodes: Dict[int, Node] = {}  # Track all registered nodes

    # Per-node state: current velocity and desired velocity
    self._current_velocity: Dict[int, Tuple[float, float, float]] = {}
    self._desired_velocity: Dict[int, Tuple[float, float, float]] = {}

    # Telemetry tracking: count updates per node
    self._update_counter: Dict[int, int] = {}

after_simulation_step(iteration, time)

Called after each simulation step.

Parameters:

Name Type Description Default
iteration int

Current simulation iteration number

required
time float

Current simulation time

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def after_simulation_step(self, iteration: int, time: float):
    """
    Called after each simulation step.

    Args:
        iteration: Current simulation iteration number
        time: Current simulation time
    """
    pass

finalize()

Finalize handler after simulation ends (not used by this handler).

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def finalize(self):
    """Finalize handler after simulation ends (not used by this handler)."""
    pass

handle_command(command, node)

Performs a mobility command. This method is called by the node's provider to transmit it's mobility command to the mobility handler.

Parameters:

Name Type Description Default
command MobilityCommand

Command being issued

required
node Node

Node that issued the command

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def handle_command(self, command: MobilityCommand, node: Node):
    """
    Performs a mobility command. This method is called by the node's
    provider to transmit it's mobility command to the mobility handler.

    Args:
        command: Command being issued
        node: Node that issued the command
    """
    if node.id not in self._nodes:
        raise Exception("Error handling commands: Cannot handle command from unregistered node")

    if command.command_type == MobilityCommandType.SET_SPEED:
        if command.param_1 is None or command.param_2 is None or command.param_3 is None:
            raise Exception("Velocity SET_SPEED command requires three parameters (vx, vy, vz). "
                            "Make sure that the command is properly constructed using SetVelocityMobilityCommand.")
        self._set_velocity(node.id, (command.param_1, command.param_2, command.param_3))
    else:
        raise Exception("The only command accepted in DynamicVelocityMobilityHandler is SET_SPEED")

initialize()

Start the periodic mobility update loop.

Schedules the first mobility update event for all nodes.

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def initialize(self):
    """
    Start the periodic mobility update loop.

    Schedules the first mobility update event for all nodes.
    """
    if self._nodes:  # Only schedule if we have nodes
        self._loop.schedule_event(
            self._loop.current_time + self._config.update_rate,
            self._mobility_update
        )

inject(event_loop)

Inject the event loop into the handler.

Parameters:

Name Type Description Default
event_loop EventLoop

The event loop for scheduling periodic updates.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def inject(self, event_loop: EventLoop):
    """
    Inject the event loop into the handler.

    Args:
        event_loop: The event loop for scheduling periodic updates.
    """
    self._loop = event_loop

register_node(node)

Register a node with this handler (called when node is created).

Parameters:

Name Type Description Default
node Node

The node instance to register.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def register_node(self, node: Node):
    """
    Register a node with this handler (called when node is created).

    Args:
        node: The node instance to register.
    """
    # Initialize state for this node
    node_id = node.id
    self._nodes[node_id] = node
    self._current_velocity[node_id] = (0.0, 0.0, 0.0)
    self._desired_velocity[node_id] = (0.0, 0.0, 0.0)
    self._update_counter[node_id] = 0

apply_acceleration_limits(v_current, v_desired, dt, max_acc_xy, max_acc_z)

Limit velocity change based on acceleration constraints.

Applies independent horizontal and vertical acceleration limits:

  • Horizontal: ||a_xy|| <= max_acc_xy
  • Vertical: |a_z| <= max_acc_z

Parameters:

Name Type Description Default
v_current Tuple[float, float, float]

Current velocity (vx, vy, vz) in m/s.

required
v_desired Tuple[float, float, float]

Desired velocity (vx, vy, vz) in m/s.

required
dt float

Time step in seconds.

required
max_acc_xy float

Maximum horizontal acceleration in m/s².

required
max_acc_z float

Maximum vertical acceleration in m/s².

required

Returns:

Type Description
Tuple[float, float, float]

New velocity after applying acceleration limits.

Example

v_cur = (0.0, 0.0, 0.0) v_des = (10.0, 10.0, 5.0) v_new = apply_acceleration_limits(v_cur, v_des, 0.1, 2.0, 1.0)

Horizontal component limited to 2.0 m/s² * 0.1 s = 0.2 m/s change

Vertical component limited to 1.0 m/s² * 0.1 s = 0.1 m/s change

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def apply_acceleration_limits(
    v_current: Tuple[float, float, float],
    v_desired: Tuple[float, float, float],
    dt: float,
    max_acc_xy: float,
    max_acc_z: float
) -> Tuple[float, float, float]:
    """
    Limit velocity change based on acceleration constraints.

    Applies independent horizontal and vertical acceleration limits:

    - Horizontal: ||a_xy|| <= max_acc_xy
    - Vertical: |a_z| <= max_acc_z

    Args:
        v_current: Current velocity (vx, vy, vz) in m/s.
        v_desired: Desired velocity (vx, vy, vz) in m/s.
        dt: Time step in seconds.
        max_acc_xy: Maximum horizontal acceleration in m/s².
        max_acc_z: Maximum vertical acceleration in m/s².

    Returns:
        New velocity after applying acceleration limits.

    Example:
        >>> v_cur = (0.0, 0.0, 0.0)
        >>> v_des = (10.0, 10.0, 5.0)
        >>> v_new = apply_acceleration_limits(v_cur, v_des, 0.1, 2.0, 1.0)
        >>> # Horizontal component limited to 2.0 m/s² * 0.1 s = 0.2 m/s change
        >>> # Vertical component limited to 1.0 m/s² * 0.1 s = 0.1 m/s change
    """
    # Decompose into horizontal (xy) and vertical (z) components
    vx_cur, vy_cur, vz_cur = v_current
    vx_des, vy_des, vz_des = v_desired

    # Horizontal acceleration limiting
    dvx = vx_des - vx_cur
    dvy = vy_des - vy_cur
    dv_xy_norm = math.sqrt(dvx**2 + dvy**2)

    max_dv_xy = max_acc_xy * dt

    if dv_xy_norm > max_dv_xy:
        # Scale down to respect acceleration limit
        scale = max_dv_xy / dv_xy_norm
        dvx *= scale
        dvy *= scale

    vx_new = vx_cur + dvx
    vy_new = vy_cur + dvy

    # Vertical acceleration limiting
    dvz = vz_des - vz_cur
    max_dvz = max_acc_z * dt

    if abs(dvz) > max_dvz:
        dvz = math.copysign(max_dvz, dvz)

    vz_new = vz_cur + dvz

    return (vx_new, vy_new, vz_new)

apply_velocity_limits(v, max_speed_xy, max_speed_z)

Apply velocity saturation constraints.

Applies independent horizontal and vertical velocity limits:

  • Horizontal: ||v_xy|| <= max_speed_xy
  • Vertical: |v_z| <= max_speed_z

Parameters:

Name Type Description Default
v Tuple[float, float, float]

Velocity vector (vx, vy, vz) in m/s.

required
max_speed_xy float

Maximum horizontal speed in m/s.

required
max_speed_z float

Maximum vertical speed in m/s.

required

Returns:

Type Description
Tuple[float, float, float]

Saturated velocity vector.

Example

v = (15.0, 15.0, 10.0) v_sat = apply_velocity_limits(v, 10.0, 5.0)

Horizontal norm is 21.21 m/s, scaled down to 10.0 m/s

Vertical component clamped to 5.0 m/s

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def apply_velocity_limits(
    v: Tuple[float, float, float],
    max_speed_xy: float,
    max_speed_z: float
) -> Tuple[float, float, float]:
    """
    Apply velocity saturation constraints.

    Applies independent horizontal and vertical velocity limits:

    - Horizontal: ||v_xy|| <= max_speed_xy
    - Vertical: |v_z| <= max_speed_z

    Args:
        v: Velocity vector (vx, vy, vz) in m/s.
        max_speed_xy: Maximum horizontal speed in m/s.
        max_speed_z: Maximum vertical speed in m/s.

    Returns:
        Saturated velocity vector.

    Example:
        >>> v = (15.0, 15.0, 10.0)
        >>> v_sat = apply_velocity_limits(v, 10.0, 5.0)
        >>> # Horizontal norm is 21.21 m/s, scaled down to 10.0 m/s
        >>> # Vertical component clamped to 5.0 m/s
    """
    vx, vy, vz = v

    # Horizontal velocity saturation
    v_xy_norm = math.sqrt(vx**2 + vy**2)

    if v_xy_norm > max_speed_xy:
        scale = max_speed_xy / v_xy_norm
        vx *= scale
        vy *= scale

    # Vertical velocity saturation
    if abs(vz) > max_speed_z:
        vz = math.copysign(max_speed_z, vz)

    return (vx, vy, vz)

integrate_position(position, velocity, dt)

Update position using simple Euler integration.

Implements: x_{k+1} = x_k + v_k * dt

Parameters:

Name Type Description Default
position Tuple[float, float, float]

Current position (x, y, z) in meters.

required
velocity Tuple[float, float, float]

Current velocity (vx, vy, vz) in m/s.

required
dt float

Time step in seconds.

required

Returns:

Type Description
Tuple[float, float, float]

New position after integration step.

Example

pos = (0.0, 0.0, 0.0) vel = (1.0, 2.0, 0.5) new_pos = integrate_position(pos, vel, 0.1) new_pos (0.1, 0.2, 0.05)

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def integrate_position(
    position: Tuple[float, float, float],
    velocity: Tuple[float, float, float],
    dt: float
) -> Tuple[float, float, float]:
    """
    Update position using simple Euler integration.

    Implements: x_{k+1} = x_k + v_k * dt

    Args:
        position: Current position (x, y, z) in meters.
        velocity: Current velocity (vx, vy, vz) in m/s.
        dt: Time step in seconds.

    Returns:
        New position after integration step.

    Example:
        >>> pos = (0.0, 0.0, 0.0)
        >>> vel = (1.0, 2.0, 0.5)
        >>> new_pos = integrate_position(pos, vel, 0.1)
        >>> new_pos
        (0.1, 0.2, 0.05)
    """
    x, y, z = position
    vx, vy, vz = velocity

    return (
        x + vx * dt,
        y + vy * dt,
        z + vz * dt
    )

Handler

gradysim.simulator.handler.mobility.dynamic_velocity.handler.DynamicVelocityMobilityHandler

Bases: INodeHandler

Dynamic Velocity Mobility Handler for GrADyS-SIM NG.

This handler updates node positions based on velocity commands with realistic physical constraints (velocity and acceleration limits).

Key features:

  • Direct velocity control (no waypoints)
  • Independent horizontal and vertical constraints
  • Acceleration-limited velocity tracking (optionally with 1st-order lag via tau)
  • Optional telemetry emission
Usage

config = DynamicVelocityMobilityConfiguration( update_rate=0.1, max_speed_xy=10.0, max_speed_z=5.0, max_acc_xy=2.0, max_acc_z=1.0 ) handler = DynamicVelocityMobilityHandler(config)

In your protocol handler:

handler.set_velocity(node.identifier, (vx, vy, vz))

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
class DynamicVelocityMobilityHandler(INodeHandler):
    """
    Dynamic Velocity Mobility Handler for GrADyS-SIM NG.

    This handler updates node positions based on velocity commands with
    realistic physical constraints (velocity and acceleration limits).

    Key features:

    - Direct velocity control (no waypoints)
    - Independent horizontal and vertical constraints
    - Acceleration-limited velocity tracking (optionally with 1st-order lag via tau)
    - Optional telemetry emission

    Usage:
        config = DynamicVelocityMobilityConfiguration(
            update_rate=0.1,
            max_speed_xy=10.0,
            max_speed_z=5.0,
            max_acc_xy=2.0,
            max_acc_z=1.0
        )
        handler = DynamicVelocityMobilityHandler(config)

        # In your protocol handler:
        handler.set_velocity(node.identifier, (vx, vy, vz))
    """

    def __init__(self, config: DynamicVelocityMobilityConfiguration):
        """
        Initialize the Dynamic Velocity Mobility Handler.

        Args:
            config: Configuration parameters for mobility constraints and update rate.
        """
        self._config = config
        self._loop: EventLoop = None
        self._nodes: Dict[int, Node] = {}  # Track all registered nodes

        # Per-node state: current velocity and desired velocity
        self._current_velocity: Dict[int, Tuple[float, float, float]] = {}
        self._desired_velocity: Dict[int, Tuple[float, float, float]] = {}

        # Telemetry tracking: count updates per node
        self._update_counter: Dict[int, int] = {}

    @staticmethod
    def get_label() -> str:
        return "mobility"

    def register_node(self, node: Node):
        """
        Register a node with this handler (called when node is created).

        Args:
            node: The node instance to register.
        """
        # Initialize state for this node
        node_id = node.id
        self._nodes[node_id] = node
        self._current_velocity[node_id] = (0.0, 0.0, 0.0)
        self._desired_velocity[node_id] = (0.0, 0.0, 0.0)
        self._update_counter[node_id] = 0

    def inject(self, event_loop: EventLoop):
        """
        Inject the event loop into the handler.

        Args:
            event_loop: The event loop for scheduling periodic updates.
        """
        self._loop = event_loop

    def initialize(self):
        """
        Start the periodic mobility update loop.

        Schedules the first mobility update event for all nodes.
        """
        if self._nodes:  # Only schedule if we have nodes
            self._loop.schedule_event(
                self._loop.current_time + self._config.update_rate,
                self._mobility_update
            )

    def finalize(self):
        """Finalize handler after simulation ends (not used by this handler)."""
        pass

    def after_simulation_step(self, iteration: int, time: float):
        """
        Called after each simulation step.

        Args:
            iteration: Current simulation iteration number
            time: Current simulation time
        """
        pass

    def handle_command(self, command: MobilityCommand, node: Node):
        """
        Performs a mobility command. This method is called by the node's
        provider to transmit it's mobility command to the mobility handler.

        Args:
            command: Command being issued
            node: Node that issued the command
        """
        if node.id not in self._nodes:
            raise Exception("Error handling commands: Cannot handle command from unregistered node")

        if command.command_type == MobilityCommandType.SET_SPEED:
            if command.param_1 is None or command.param_2 is None or command.param_3 is None:
                raise Exception("Velocity SET_SPEED command requires three parameters (vx, vy, vz). "
                                "Make sure that the command is properly constructed using SetVelocityMobilityCommand.")
            self._set_velocity(node.id, (command.param_1, command.param_2, command.param_3))
        else:
            raise Exception("The only command accepted in DynamicVelocityMobilityHandler is SET_SPEED")

    def _set_velocity(self, node_id: int, v_des: Tuple[float, float, float]) -> None:
        """
        Set the desired velocity for a node.

        The velocity persists until updated by a new command. To stop a node,
        explicitly command zero velocity: (0.0, 0.0, 0.0).

        Args:
            node_id: Identifier of the node to control.
            v_des: Desired velocity as (vx, vy, vz) in m/s.

        Example:
            # Move northeast at 5 m/s horizontally, ascending at 2 m/s
            handler.set_velocity(node.id, (3.54, 3.54, 2.0))

            # Stop the node
            handler.set_velocity(node.id, (0.0, 0.0, 0.0))
        """
        if node_id not in self._desired_velocity:
            # Initialize state if node not yet known
            self._current_velocity[node_id] = (0.0, 0.0, 0.0)
            self._desired_velocity[node_id] = v_des
            self._update_counter[node_id] = 0
        else:
            self._desired_velocity[node_id] = v_des

    def _get_node_velocity(self, node_id: int) -> Optional[Tuple[float, float, float]]:
        """
        Get the current velocity of a node.

        Args:
            node_id: ID of the node

        Returns:
            Velocity vector (vx, vy, vz) in m/s, or None if node not registered
        """
        return self._current_velocity.get(node_id)

    def _get_node_position(self, node_id: int) -> Optional[Tuple[float, float, float]]:
        """
        Get the current position of a node.

        Args:
            node_id: ID of the node

        Returns:
            Position vector (x, y, z) in meters, or None if node not registered
        """
        node = self._nodes.get(node_id)
        return node.position if node is not None else None

    def _mobility_update(self):
        """
        Perform a single mobility update step for all nodes.

        This method:
        1. Iterates through all registered nodes
        2. Retrieves the current and desired velocities for each
        3. Applies acceleration limits to compute the new velocity
        4. Applies velocity saturation
        5. Updates the node position using numerical integration
        6. Optionally emits telemetry
        7. Schedules the next update
        """
        dt = self._config.update_rate

        # Update all nodes
        for node_id, node in self._nodes.items():
            # Get current and desired velocities
            v_current = self._current_velocity[node_id]
            v_desired = self._desired_velocity[node_id]

            # UAV trajectory model:
            # - The protocol commands a desired velocity v_desired.
            # - The vehicle tracks it with bounded acceleration.
            # - Optionally, a 1st-order lag (time constant tau) shapes the transient:
            #       a* = (v_des - v) / tau
            #   then a is saturated and integrated.
            if self._config.tau_xy is None and self._config.tau_z is None:
                v_new = apply_acceleration_limits(
                    v_current,
                    v_desired,
                    dt,
                    self._config.max_acc_xy,
                    self._config.max_acc_z,
                )
            else:
                v_new = apply_velocity_tracking_first_order(
                    v_current,
                    v_desired,
                    dt,
                    self._config.max_acc_xy,
                    self._config.max_acc_z,
                    tau_xy=self._config.tau_xy,
                    tau_z=self._config.tau_z,
                )

            # Apply velocity saturation
            v_new = apply_velocity_limits(
                v_new,
                self._config.max_speed_xy,
                self._config.max_speed_z
            )

            # Update position: x_{k+1} = x_k + v_k * dt
            new_pos = integrate_position(node.position, v_new, dt)
            node.position = new_pos

            # Update stored velocity
            self._current_velocity[node_id] = v_new

            # Update counter and emit telemetry if needed
            self._update_counter[node_id] += 1
            if self._should_emit_telemetry(node_id):
                self._emit_telemetry(node)

        # Schedule next update
        self._loop.schedule_event(
            self._loop.current_time + self._config.update_rate,
            self._mobility_update
        )

    def _should_emit_telemetry(self, node_id: int) -> bool:
        """
        Determine if telemetry should be emitted on this update.

        Args:
            node_id: Identifier of the node.

        Returns:
            True if telemetry should be emitted, False otherwise.
        """
        if not self._config.send_telemetry:
            return False

        count = self._update_counter[node_id]
        return (count % self._config.telemetry_decimation) == 0

    def _emit_telemetry(self, node: Node):
        """
        Emit a Telemetry message with the current node position.

        Args:
            node: The node to emit telemetry for.

        The telemetry is sent directly to the node's protocol encapsulator.
        """
        telemetry = DynamicVelocityTelemetry(
            current_position=node.position,
            current_velocity=self._get_node_velocity(node.id),
        )

        # Schedule telemetry delivery to protocol (same pattern as MobilityHandler)
        def send_telemetry():
            node.protocol_encapsulator.handle_telemetry(telemetry)

        self._loop.schedule_event(
            self._loop.current_time,
            send_telemetry,
            f"Node {node.id} handle_telemetry"
        )

__init__(config)

Initialize the Dynamic Velocity Mobility Handler.

Parameters:

Name Type Description Default
config DynamicVelocityMobilityConfiguration

Configuration parameters for mobility constraints and update rate.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def __init__(self, config: DynamicVelocityMobilityConfiguration):
    """
    Initialize the Dynamic Velocity Mobility Handler.

    Args:
        config: Configuration parameters for mobility constraints and update rate.
    """
    self._config = config
    self._loop: EventLoop = None
    self._nodes: Dict[int, Node] = {}  # Track all registered nodes

    # Per-node state: current velocity and desired velocity
    self._current_velocity: Dict[int, Tuple[float, float, float]] = {}
    self._desired_velocity: Dict[int, Tuple[float, float, float]] = {}

    # Telemetry tracking: count updates per node
    self._update_counter: Dict[int, int] = {}

after_simulation_step(iteration, time)

Called after each simulation step.

Parameters:

Name Type Description Default
iteration int

Current simulation iteration number

required
time float

Current simulation time

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def after_simulation_step(self, iteration: int, time: float):
    """
    Called after each simulation step.

    Args:
        iteration: Current simulation iteration number
        time: Current simulation time
    """
    pass

finalize()

Finalize handler after simulation ends (not used by this handler).

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def finalize(self):
    """Finalize handler after simulation ends (not used by this handler)."""
    pass

handle_command(command, node)

Performs a mobility command. This method is called by the node's provider to transmit it's mobility command to the mobility handler.

Parameters:

Name Type Description Default
command MobilityCommand

Command being issued

required
node Node

Node that issued the command

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def handle_command(self, command: MobilityCommand, node: Node):
    """
    Performs a mobility command. This method is called by the node's
    provider to transmit it's mobility command to the mobility handler.

    Args:
        command: Command being issued
        node: Node that issued the command
    """
    if node.id not in self._nodes:
        raise Exception("Error handling commands: Cannot handle command from unregistered node")

    if command.command_type == MobilityCommandType.SET_SPEED:
        if command.param_1 is None or command.param_2 is None or command.param_3 is None:
            raise Exception("Velocity SET_SPEED command requires three parameters (vx, vy, vz). "
                            "Make sure that the command is properly constructed using SetVelocityMobilityCommand.")
        self._set_velocity(node.id, (command.param_1, command.param_2, command.param_3))
    else:
        raise Exception("The only command accepted in DynamicVelocityMobilityHandler is SET_SPEED")

initialize()

Start the periodic mobility update loop.

Schedules the first mobility update event for all nodes.

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def initialize(self):
    """
    Start the periodic mobility update loop.

    Schedules the first mobility update event for all nodes.
    """
    if self._nodes:  # Only schedule if we have nodes
        self._loop.schedule_event(
            self._loop.current_time + self._config.update_rate,
            self._mobility_update
        )

inject(event_loop)

Inject the event loop into the handler.

Parameters:

Name Type Description Default
event_loop EventLoop

The event loop for scheduling periodic updates.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def inject(self, event_loop: EventLoop):
    """
    Inject the event loop into the handler.

    Args:
        event_loop: The event loop for scheduling periodic updates.
    """
    self._loop = event_loop

register_node(node)

Register a node with this handler (called when node is created).

Parameters:

Name Type Description Default
node Node

The node instance to register.

required
Source code in gradysim/simulator/handler/mobility/dynamic_velocity/handler.py
def register_node(self, node: Node):
    """
    Register a node with this handler (called when node is created).

    Args:
        node: The node instance to register.
    """
    # Initialize state for this node
    node_id = node.id
    self._nodes[node_id] = node
    self._current_velocity[node_id] = (0.0, 0.0, 0.0)
    self._desired_velocity[node_id] = (0.0, 0.0, 0.0)
    self._update_counter[node_id] = 0

Configuration

gradysim.simulator.handler.mobility.dynamic_velocity.config.DynamicVelocityMobilityConfiguration dataclass

Configuration parameters for the Dynamic Velocity Mobility Handler.

Attributes:

Name Type Description
update_rate float

Time interval (in seconds) between mobility updates. Typical for trajectory-level quadrotor sims: 0.01–0.05 s.

max_speed_xy float

Maximum horizontal speed (m/s). Applied to the norm of (vx, vy). Typical: 3–15 m/s (depends heavily on vehicle and mission).

max_speed_z float

Maximum vertical speed (m/s). Applied to |vz|. Typical: 1–8 m/s.

max_acc_xy float

Maximum horizontal acceleration (m/s²). Applied to the norm of (ax, ay). Typical: 2–8 m/s².

max_acc_z float

Maximum vertical acceleration (m/s²). Applied to |az|. Typical: 2–10 m/s².

tau_xy Optional[float]

Optional first-order time constant (seconds) for horizontal velocity tracking. When set, the model behaves like a 1st-order system + acceleration saturation: a* = (v_des - v) / tau_xy Typical: 0.3–0.8 s.

tau_z Optional[float]

Optional first-order time constant (seconds) for vertical velocity tracking. Typical: 0.5–1.2 s.

send_telemetry bool

If True, emit Telemetry messages after position updates.

telemetry_decimation int

Emit telemetry every N mobility updates (default: 1).

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/config.py
@dataclass
class DynamicVelocityMobilityConfiguration:
    """
    Configuration parameters for the Dynamic Velocity Mobility Handler.

    Attributes:
        update_rate: Time interval (in seconds) between mobility updates.
            Typical for trajectory-level quadrotor sims: 0.01–0.05 s.
        max_speed_xy: Maximum horizontal speed (m/s). Applied to the norm of (vx, vy).
            Typical: 3–15 m/s (depends heavily on vehicle and mission).
        max_speed_z: Maximum vertical speed (m/s). Applied to |vz|.
            Typical: 1–8 m/s.
        max_acc_xy: Maximum horizontal acceleration (m/s²). Applied to the norm of (ax, ay).
            Typical: 2–8 m/s².
        max_acc_z: Maximum vertical acceleration (m/s²). Applied to |az|.
            Typical: 2–10 m/s².
        tau_xy: Optional first-order time constant (seconds) for horizontal velocity tracking.
            When set, the model behaves like a 1st-order system + acceleration saturation:
                a* = (v_des - v) / tau_xy
            Typical: 0.3–0.8 s.
        tau_z: Optional first-order time constant (seconds) for vertical velocity tracking.
            Typical: 0.5–1.2 s.
        send_telemetry: If True, emit Telemetry messages after position updates.
        telemetry_decimation: Emit telemetry every N mobility updates (default: 1).
    """
    update_rate: float
    max_speed_xy: float
    max_speed_z: float
    max_acc_xy: float
    max_acc_z: float
    tau_xy: Optional[float] = None
    tau_z: Optional[float] = None
    send_telemetry: bool = True
    telemetry_decimation: int = 1

Telemetry

gradysim.simulator.handler.mobility.dynamic_velocity.telemetry.DynamicVelocityTelemetry dataclass

Bases: Telemetry

Telemetry message emitted by DynamicVelocityMobilityHandler after position updates.

This class extends the base Telemetry message with additional fields specifying the node's velocity, which is relevant information for protocols using dynamic velocity mobility.

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/telemetry.py
@dataclass
class DynamicVelocityTelemetry(Telemetry):
    """
    Telemetry message emitted by DynamicVelocityMobilityHandler after position updates.

    This class extends the base Telemetry message with additional fields
    specifying the node's velocity, which is relevant information for
    protocols using dynamic velocity mobility.
    """
    current_velocity: Tuple[float, float, float]

Core Utilities

gradysim.simulator.handler.mobility.dynamic_velocity.core

Pure mathematical functions for velocity-based mobility.

This module contains stateless mathematical operations for: - Acceleration limiting - Velocity saturation - Position integration

All functions operate on simple tuples and floats, making them easy to test and reuse independently of the simulation framework.

apply_acceleration_limits(v_current, v_desired, dt, max_acc_xy, max_acc_z)

Limit velocity change based on acceleration constraints.

Applies independent horizontal and vertical acceleration limits:

  • Horizontal: ||a_xy|| <= max_acc_xy
  • Vertical: |a_z| <= max_acc_z

Parameters:

Name Type Description Default
v_current Tuple[float, float, float]

Current velocity (vx, vy, vz) in m/s.

required
v_desired Tuple[float, float, float]

Desired velocity (vx, vy, vz) in m/s.

required
dt float

Time step in seconds.

required
max_acc_xy float

Maximum horizontal acceleration in m/s².

required
max_acc_z float

Maximum vertical acceleration in m/s².

required

Returns:

Type Description
Tuple[float, float, float]

New velocity after applying acceleration limits.

Example

v_cur = (0.0, 0.0, 0.0) v_des = (10.0, 10.0, 5.0) v_new = apply_acceleration_limits(v_cur, v_des, 0.1, 2.0, 1.0)

Horizontal component limited to 2.0 m/s² * 0.1 s = 0.2 m/s change

Vertical component limited to 1.0 m/s² * 0.1 s = 0.1 m/s change

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def apply_acceleration_limits(
    v_current: Tuple[float, float, float],
    v_desired: Tuple[float, float, float],
    dt: float,
    max_acc_xy: float,
    max_acc_z: float
) -> Tuple[float, float, float]:
    """
    Limit velocity change based on acceleration constraints.

    Applies independent horizontal and vertical acceleration limits:

    - Horizontal: ||a_xy|| <= max_acc_xy
    - Vertical: |a_z| <= max_acc_z

    Args:
        v_current: Current velocity (vx, vy, vz) in m/s.
        v_desired: Desired velocity (vx, vy, vz) in m/s.
        dt: Time step in seconds.
        max_acc_xy: Maximum horizontal acceleration in m/s².
        max_acc_z: Maximum vertical acceleration in m/s².

    Returns:
        New velocity after applying acceleration limits.

    Example:
        >>> v_cur = (0.0, 0.0, 0.0)
        >>> v_des = (10.0, 10.0, 5.0)
        >>> v_new = apply_acceleration_limits(v_cur, v_des, 0.1, 2.0, 1.0)
        >>> # Horizontal component limited to 2.0 m/s² * 0.1 s = 0.2 m/s change
        >>> # Vertical component limited to 1.0 m/s² * 0.1 s = 0.1 m/s change
    """
    # Decompose into horizontal (xy) and vertical (z) components
    vx_cur, vy_cur, vz_cur = v_current
    vx_des, vy_des, vz_des = v_desired

    # Horizontal acceleration limiting
    dvx = vx_des - vx_cur
    dvy = vy_des - vy_cur
    dv_xy_norm = math.sqrt(dvx**2 + dvy**2)

    max_dv_xy = max_acc_xy * dt

    if dv_xy_norm > max_dv_xy:
        # Scale down to respect acceleration limit
        scale = max_dv_xy / dv_xy_norm
        dvx *= scale
        dvy *= scale

    vx_new = vx_cur + dvx
    vy_new = vy_cur + dvy

    # Vertical acceleration limiting
    dvz = vz_des - vz_cur
    max_dvz = max_acc_z * dt

    if abs(dvz) > max_dvz:
        dvz = math.copysign(max_dvz, dvz)

    vz_new = vz_cur + dvz

    return (vx_new, vy_new, vz_new)

apply_velocity_limits(v, max_speed_xy, max_speed_z)

Apply velocity saturation constraints.

Applies independent horizontal and vertical velocity limits:

  • Horizontal: ||v_xy|| <= max_speed_xy
  • Vertical: |v_z| <= max_speed_z

Parameters:

Name Type Description Default
v Tuple[float, float, float]

Velocity vector (vx, vy, vz) in m/s.

required
max_speed_xy float

Maximum horizontal speed in m/s.

required
max_speed_z float

Maximum vertical speed in m/s.

required

Returns:

Type Description
Tuple[float, float, float]

Saturated velocity vector.

Example

v = (15.0, 15.0, 10.0) v_sat = apply_velocity_limits(v, 10.0, 5.0)

Horizontal norm is 21.21 m/s, scaled down to 10.0 m/s

Vertical component clamped to 5.0 m/s

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def apply_velocity_limits(
    v: Tuple[float, float, float],
    max_speed_xy: float,
    max_speed_z: float
) -> Tuple[float, float, float]:
    """
    Apply velocity saturation constraints.

    Applies independent horizontal and vertical velocity limits:

    - Horizontal: ||v_xy|| <= max_speed_xy
    - Vertical: |v_z| <= max_speed_z

    Args:
        v: Velocity vector (vx, vy, vz) in m/s.
        max_speed_xy: Maximum horizontal speed in m/s.
        max_speed_z: Maximum vertical speed in m/s.

    Returns:
        Saturated velocity vector.

    Example:
        >>> v = (15.0, 15.0, 10.0)
        >>> v_sat = apply_velocity_limits(v, 10.0, 5.0)
        >>> # Horizontal norm is 21.21 m/s, scaled down to 10.0 m/s
        >>> # Vertical component clamped to 5.0 m/s
    """
    vx, vy, vz = v

    # Horizontal velocity saturation
    v_xy_norm = math.sqrt(vx**2 + vy**2)

    if v_xy_norm > max_speed_xy:
        scale = max_speed_xy / v_xy_norm
        vx *= scale
        vy *= scale

    # Vertical velocity saturation
    if abs(vz) > max_speed_z:
        vz = math.copysign(max_speed_z, vz)

    return (vx, vy, vz)

apply_velocity_tracking_first_order(v_current, v_desired, dt, max_acc_xy, max_acc_z, tau_xy, tau_z)

Track desired velocity using a 1st-order lag + acceleration saturation.

This is a trajectory-level quadrotor-friendly model:

a* = (v_des - v) / tau
a  = sat(a*, max_acc)
v+ = v + a * dt

Horizontal saturation is applied on the norm of (ax, ay), while vertical saturation is applied on |az|.

When tau_xy/tau_z are None, this function falls back to the existing "acceleration-limited step" behavior for that axis.

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def apply_velocity_tracking_first_order(
    v_current: Tuple[float, float, float],
    v_desired: Tuple[float, float, float],
    dt: float,
    max_acc_xy: float,
    max_acc_z: float,
    tau_xy: Optional[float],
    tau_z: Optional[float],
) -> Tuple[float, float, float]:
    """Track desired velocity using a 1st-order lag + acceleration saturation.

    This is a trajectory-level quadrotor-friendly model:

        a* = (v_des - v) / tau
        a  = sat(a*, max_acc)
        v+ = v + a * dt

    Horizontal saturation is applied on the norm of (ax, ay), while vertical
    saturation is applied on |az|.

    When tau_xy/tau_z are None, this function falls back to the existing
    "acceleration-limited step" behavior for that axis.
    """
    if dt <= 0:
        raise ValueError("dt must be > 0")
    if max_acc_xy < 0 or max_acc_z < 0:
        raise ValueError("max_acc must be >= 0")
    if tau_xy is not None and tau_xy <= 0:
        raise ValueError("tau_xy must be > 0 when provided")
    if tau_z is not None and tau_z <= 0:
        raise ValueError("tau_z must be > 0 when provided")

    vx_cur, vy_cur, vz_cur = v_current
    vx_des, vy_des, vz_des = v_desired

    # Horizontal (xy)
    if tau_xy is None:
        # Preserve legacy behavior for horizontal plane
        dvx = vx_des - vx_cur
        dvy = vy_des - vy_cur
        dv_xy_norm = math.sqrt(dvx**2 + dvy**2)
        max_dv_xy = max_acc_xy * dt
        if dv_xy_norm > max_dv_xy and dv_xy_norm > 0:
            scale = max_dv_xy / dv_xy_norm
            dvx *= scale
            dvy *= scale
        vx_new = vx_cur + dvx
        vy_new = vy_cur + dvy
    else:
        ax_des = (vx_des - vx_cur) / tau_xy
        ay_des = (vy_des - vy_cur) / tau_xy
        a_xy_norm = math.sqrt(ax_des**2 + ay_des**2)
        if a_xy_norm > max_acc_xy and a_xy_norm > 0:
            scale = max_acc_xy / a_xy_norm
            ax_des *= scale
            ay_des *= scale
        vx_new = vx_cur + ax_des * dt
        vy_new = vy_cur + ay_des * dt

    # Vertical (z)
    if tau_z is None:
        dvz = vz_des - vz_cur
        max_dvz = max_acc_z * dt
        if abs(dvz) > max_dvz:
            dvz = math.copysign(max_dvz, dvz)
        vz_new = vz_cur + dvz
    else:
        az_des = (vz_des - vz_cur) / tau_z
        if abs(az_des) > max_acc_z:
            az_des = math.copysign(max_acc_z, az_des)
        vz_new = vz_cur + az_des * dt

    return (vx_new, vy_new, vz_new)

integrate_position(position, velocity, dt)

Update position using simple Euler integration.

Implements: x_{k+1} = x_k + v_k * dt

Parameters:

Name Type Description Default
position Tuple[float, float, float]

Current position (x, y, z) in meters.

required
velocity Tuple[float, float, float]

Current velocity (vx, vy, vz) in m/s.

required
dt float

Time step in seconds.

required

Returns:

Type Description
Tuple[float, float, float]

New position after integration step.

Example

pos = (0.0, 0.0, 0.0) vel = (1.0, 2.0, 0.5) new_pos = integrate_position(pos, vel, 0.1) new_pos (0.1, 0.2, 0.05)

Source code in gradysim/simulator/handler/mobility/dynamic_velocity/core.py
def integrate_position(
    position: Tuple[float, float, float],
    velocity: Tuple[float, float, float],
    dt: float
) -> Tuple[float, float, float]:
    """
    Update position using simple Euler integration.

    Implements: x_{k+1} = x_k + v_k * dt

    Args:
        position: Current position (x, y, z) in meters.
        velocity: Current velocity (vx, vy, vz) in m/s.
        dt: Time step in seconds.

    Returns:
        New position after integration step.

    Example:
        >>> pos = (0.0, 0.0, 0.0)
        >>> vel = (1.0, 2.0, 0.5)
        >>> new_pos = integrate_position(pos, vel, 0.1)
        >>> new_pos
        (0.1, 0.2, 0.05)
    """
    x, y, z = position
    vx, vy, vz = velocity

    return (
        x + vx * dt,
        y + vy * dt,
        z + vz * dt
    )