Skip to content

Ardupilot mobility

ArdupilotMobilityHandler serves as an alternate mobility implementation for traditional mobility commands.

gradysim.simulator.handler.ardupilot_mobility.ArdupilotMobilityHandler

Bases: INodeHandler

Introduces mobility into the simulatuon by communicating with a SITL-based simulation of the Node. Works by sending requests to UAV API library which connects to the Ardupilot software. It implements telemetry by constantly making requests to 'telemetry/ned' at a fixed rate and translating mobility commands into HTTP requests to UAV API

Source code in gradysim/simulator/handler/ardupilot_mobility.py
class ArdupilotMobilityHandler(INodeHandler):
    """
    Introduces mobility into the simulatuon by communicating with a SITL-based simulation of the Node. Works by
    sending requests to UAV API library which connects to the Ardupilot software. It implements telemetry by
    constantly making requests to 'telemetry/ned' at a fixed rate and translating mobility commands into HTTP 
    requests to UAV API
    """
    @staticmethod
    def get_label() -> str:
        return "mobility"

    _event_loop: EventLoop
    _configuration: ArdupilotMobilityConfiguration
    _logger: logging.Logger
    _report: Dict[str, int]
    _injected: bool

    nodes: Dict[int, Node]
    drones: Dict[int, Drone]
    def __init__(self, configuration: ArdupilotMobilityConfiguration = ArdupilotMobilityConfiguration()):
        """
        Constructor for the Ardupilot mobility handler

        Args:
            configuration: Configuration for the Ardupilot mobility handle. This includes parameters used in 
            UAV API initialization If not set all default values will be used.
        """
        self._configuration = configuration
        self.nodes = {}
        self.drones = {}
        self._injected = False
        self._logger = logging.getLogger()
        self._report = {}
        self._loop = asyncio.get_event_loop()

    def inject(self, event_loop: EventLoop):
        self._injected = True
        self._event_loop = event_loop

    def register_node(self, node: Node):
        """
        Instantiates Drone object equivalent to the Node provided. Starts the UAV API
        process associated with this instance.

        Args:
            node: the Node instance that will be registered in the handler
        """

        if not self._injected:
            self._ardupilot_error("Error registering node: cannot register nodes while Ardupilot mobility handler "
                                    "is uninitialized.")

        self.drones[node.id] = Drone(node.id, node.position, self._logger, self._configuration.starting_api_port)

        if self._configuration.simulate_drones:
            self.drones[node.id].start_simulated_drone(
                ground_station_ip=self._configuration.ground_station_ip,
                speedup=self._configuration.simulation_startup_speedup,
                ardupilot_path=self._configuration.ardupilot_path,
                uav_api_log_path=self._configuration.uav_api_log_path
            )
        self.nodes[node.id] = node

    async def _initialize_report(self):
        """
        Initializes properties for tracking variables used in report
        """
        for node_id in self.nodes.keys():
            drone = self.drones[node_id]
            battery_level = await drone.get_battery_level()
            if battery_level is None:
                self._logger.debug(f"Error fetching battery level for node {node_id}. Cancelling report generation...")
                self._configuration.generate_report = False
                return
            self._report[node_id] = {
                "initial_battery": battery_level,
                "telemetry_requests": 0,
                "telemetry_drops": 0
            }

    async def _initialize_drones(self):
        """
        Sends each drone to the starting position through the goto_initial_position routine.
        Each drone has it's own routine and they run concurrently.
        """
        for node_id in self.nodes.keys():
            http_session = aiohttp.ClientSession()
            drone = self.drones[node_id]
            drone.set_session(http_session)

        if self._configuration.simulate_drones:
            time.sleep(SITL_SLEEP_TIME) # Wait for API process to start

        drone_init_tasks = []
        for node_id in self.nodes.keys():
            drone_init_tasks.append(asyncio.create_task(self.drones[node_id].goto_initial_position()))

        try:
            await asyncio.gather(*drone_init_tasks)  
        except Exception as e:
            self._logger.error(e)
            self._ardupilot_error("Error initializing drones.")

        if not self._configuration.simulate_drones:
            return

    def initialize(self):
        self._loop.run_until_complete(self._initialize_drones())
        if self._configuration.generate_report:
            self._loop.run_until_complete(self._initialize_report())
        self._setup_telemetry()     

    def _ardupilot_error(self, message):
        """
        Prints an error message and shutdowns Drone instances. This function
        should be called when an error has ocurred in the connection with 
        UAV API. Raises an ArdupilotMobilityHandlerException

        Args:
            message: the message to be printed
        """
        try:
            asyncio.get_running_loop()
            tasks = []
            for node_id in self.drones.keys():
                drone = self.drones[node_id]
                try:
                    tasks.append(asyncio.create_task(drone.shutdown()))
                except Exception as e:
                    self._logger.error(f"Error scheduling drone shutdown. {e}")
                    continue

            asyncio.gather(*tasks)
        except RuntimeError:
            event_loop = asyncio.get_event_loop()
            for node_id in self.drones.keys():
                drone = self.drones[node_id]
                try:
                    shutdown_result = event_loop.run_until_complete(drone.shutdown())
                    self._logger.debug(f"Shutdown_result: {shutdown_result}")
                except Exception as e:
                    self._logger.error(f"Error shutting down drone. {e}")
                    continue
        raise ArdupilotMobilityException(message)

    def _setup_telemetry(self):
        """
        Initiates a recorrent telemetry event at a fixed rate for each node. Every time the event
        fires, it checks if the last position information was updated. If not, it simply skips.
        If it was updated, it calls handle_telemetry method of the corresponding node and requests
        a new telemetry update.
        """
        def send_telemetry(node_id):
            node = self.nodes[node_id]
            drone = self.drones[node_id]

            if self._configuration.generate_report:
                self._report[node_id]["telemetry_requests"] += 1
            if not drone.telemetry_requested:
                node.position = drone.position
                telemetry = Telemetry(current_position=node.position)
                node.protocol_encapsulator.handle_telemetry(telemetry)
                self._logger.debug(f"Telemetry sent for node {node_id}. Value")
                drone.request_telemetry()
                self._logger.debug(f"Telemetry requested for node {node_id}.")
            else:
                if self._configuration.generate_report:
                    self._report[node_id]["telemetry_drops"] += 1
                self._logger.debug(f"Telemetry already requested for node {node_id}, skipping.")


            self._event_loop.schedule_event(self._event_loop.current_time + self._configuration.update_rate,
                                    make_send_telemetry(node_id), "ArdupilotMobility")

        def make_send_telemetry(node_id):
            return lambda: send_telemetry(node_id)

        for node_id in self.nodes.keys():
            self._event_loop.schedule_event(self._event_loop.current_time,
                    make_send_telemetry(node_id), "ArdupilotMobility")

    def handle_command(self, command: MobilityCommand, node: Node):
        """
        Performs a mobility command in the SITL-based simulation. This method is called
        by the node's provider to transmit it's mobility command to the ardupilot mobility
        handler and then to the node's UAV API.

        Args:
            command: Command being issued
            node: Node that issued the command
        """
        drone = self.drones[node.id]
        self._logger.debug(f"Handling command: {command.command_type}, {command.param_1}, {command.param_2}, {command.param_3}")
        if node.id not in self.nodes:
            self._ardupilot_error("Error handling commands: Cannot handle command from unregistered node")

        if command.command_type == MobilityCommandType.GOTO_COORDS:
            drone.move_to_xyz((command.param_1, command.param_2, command.param_3))
        elif command.command_type == MobilityCommandType.GOTO_GEO_COORDS:
            drone.move_to_gps(command.param_1, command.param_2, command.param_3)
        elif command.command_type == MobilityCommandType.SET_SPEED:
            drone.set_speed(command.param_1)
        elif command.command_type == MobilityCommandType.STOP:
            drone.stop()

    async def _finalize_report(self):
        """
        Ends report tracking and outputs csv file with report information.
        """
        report_str = ""
        report_str += "GENERATING ARDUPILOT MOBILITY HANDLER REPORT:\n"
        for node_id in self.nodes.keys():
            drone = self.drones[node_id]
            battery_level = await drone.get_battery_level()
            if battery_level is None:
                self._logger.debug(f"Error fetching battery level for node {node_id}. Cancelling report generation...")
                self._configuration.generate_report = False
                return
            self._report[node_id]["final_battery"] = battery_level
            report_str += f"Report for drone {node_id}:\n"
            report_str += str(self._report[node_id]) + "\n\n"

        try:
            csv_path = "ardupilot_mobility_report.csv"
            with open(csv_path, "w", newline="") as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(["node_id", "telemetry_requests", "telemetry_drops", "battery_wasted"])
                for node_id in self.nodes.keys():
                    entry = self._report.get(node_id, {})
                    req = entry.get("telemetry_requests", 0)
                    drops = entry.get("telemetry_drops", 0)
                    initial_b = entry.get("initial_battery")
                    final_b = entry.get("final_battery")
                    wasted = ""
                    try:
                        if initial_b is not None and final_b is not None:
                            wasted = float(initial_b) - float(final_b)
                    except Exception:
                        wasted = ""
                    writer.writerow([node_id, req, drops, wasted])
            self._logger.info(f"Ardupilot mobility CSV report written to {csv_path}")
        except Exception as e:
            self._logger.debug(f"Failed writing CSV report: {e}")

        self._logger.info(report_str)
    def finalize(self):
        """Ends simulation by finalizing report and shutting down drones."""
        if self._configuration.generate_report:
            self._loop.run_until_complete(self._finalize_report())
        for node_id in self.drones.keys():
            drone = self.drones[node_id]
            self._loop.run_until_complete(drone.shutdown())

        self._loop.close()

__init__(configuration=ArdupilotMobilityConfiguration())

Constructor for the Ardupilot mobility handler

Parameters:

Name Type Description Default
configuration ArdupilotMobilityConfiguration

Configuration for the Ardupilot mobility handle. This includes parameters used in

ArdupilotMobilityConfiguration()
Source code in gradysim/simulator/handler/ardupilot_mobility.py
def __init__(self, configuration: ArdupilotMobilityConfiguration = ArdupilotMobilityConfiguration()):
    """
    Constructor for the Ardupilot mobility handler

    Args:
        configuration: Configuration for the Ardupilot mobility handle. This includes parameters used in 
        UAV API initialization If not set all default values will be used.
    """
    self._configuration = configuration
    self.nodes = {}
    self.drones = {}
    self._injected = False
    self._logger = logging.getLogger()
    self._report = {}
    self._loop = asyncio.get_event_loop()

finalize()

Ends simulation by finalizing report and shutting down drones.

Source code in gradysim/simulator/handler/ardupilot_mobility.py
def finalize(self):
    """Ends simulation by finalizing report and shutting down drones."""
    if self._configuration.generate_report:
        self._loop.run_until_complete(self._finalize_report())
    for node_id in self.drones.keys():
        drone = self.drones[node_id]
        self._loop.run_until_complete(drone.shutdown())

    self._loop.close()

handle_command(command, node)

Performs a mobility command in the SITL-based simulation. This method is called by the node's provider to transmit it's mobility command to the ardupilot mobility handler and then to the node's UAV API.

Parameters:

Name Type Description Default
command MobilityCommand

Command being issued

required
node Node

Node that issued the command

required
Source code in gradysim/simulator/handler/ardupilot_mobility.py
def handle_command(self, command: MobilityCommand, node: Node):
    """
    Performs a mobility command in the SITL-based simulation. This method is called
    by the node's provider to transmit it's mobility command to the ardupilot mobility
    handler and then to the node's UAV API.

    Args:
        command: Command being issued
        node: Node that issued the command
    """
    drone = self.drones[node.id]
    self._logger.debug(f"Handling command: {command.command_type}, {command.param_1}, {command.param_2}, {command.param_3}")
    if node.id not in self.nodes:
        self._ardupilot_error("Error handling commands: Cannot handle command from unregistered node")

    if command.command_type == MobilityCommandType.GOTO_COORDS:
        drone.move_to_xyz((command.param_1, command.param_2, command.param_3))
    elif command.command_type == MobilityCommandType.GOTO_GEO_COORDS:
        drone.move_to_gps(command.param_1, command.param_2, command.param_3)
    elif command.command_type == MobilityCommandType.SET_SPEED:
        drone.set_speed(command.param_1)
    elif command.command_type == MobilityCommandType.STOP:
        drone.stop()

register_node(node)

Instantiates Drone object equivalent to the Node provided. Starts the UAV API process associated with this instance.

Parameters:

Name Type Description Default
node Node

the Node instance that will be registered in the handler

required
Source code in gradysim/simulator/handler/ardupilot_mobility.py
def register_node(self, node: Node):
    """
    Instantiates Drone object equivalent to the Node provided. Starts the UAV API
    process associated with this instance.

    Args:
        node: the Node instance that will be registered in the handler
    """

    if not self._injected:
        self._ardupilot_error("Error registering node: cannot register nodes while Ardupilot mobility handler "
                                "is uninitialized.")

    self.drones[node.id] = Drone(node.id, node.position, self._logger, self._configuration.starting_api_port)

    if self._configuration.simulate_drones:
        self.drones[node.id].start_simulated_drone(
            ground_station_ip=self._configuration.ground_station_ip,
            speedup=self._configuration.simulation_startup_speedup,
            ardupilot_path=self._configuration.ardupilot_path,
            uav_api_log_path=self._configuration.uav_api_log_path
        )
    self.nodes[node.id] = node

gradysim.simulator.handler.ardupilot_mobility.ArdupilotMobilityConfiguration dataclass

Configuration class for the Ardupilot mobility handler

Source code in gradysim/simulator/handler/ardupilot_mobility.py
@dataclass
class ArdupilotMobilityConfiguration:
    """
    Configuration class for the Ardupilot mobility handler
    """

    update_rate: float = 0.5
    """Interval in simulation seconds between Ardupilot telemetry updates"""

    default_speed: float = 10
    """Default starting airspeed of a node in m/s"""

    reference_coordinates: Tuple[float, float, float] = (0, 0, 0)
    """
    These coordinates are used as a reference frame to convert geographical coordinates to cartesian coordinates. They
    will be used as the center of the scene and all geographical coordinates will be converted relative to it.
    """

    generate_report: bool = True
    """Whether to output a report in the end of the simulation or not"""

    simulate_drones: bool = True
    """Wheter to use SITL or connect to real vehicle"""

    ground_station_ip: str = None
    """If provided and simulate_drones is True, this ip is used in UAV API to connect a GroundStation software to the simulated vehicle"""

    starting_api_port: int = 8000
    """
    The port in which UAV API of Node 0 will run. Following nodes use the ports in sequence by
    the formula port_of_node_{node_id} = starting_api_port + {node_id}
    """

    ardupilot_path: str = None
    """Path for cloned ardupilot repository. Used for SITL initialization when in simulated mode"""

    uav_api_log_path: str = None
    """Path in which UAV API will save log files. Used in simulated mode."""

    simulation_startup_speedup: int = 1
    """
    Multiplier for SITL simulation time. This value will only affect the setup of the simulation,
    after all drones are positioned in the right place and are ready to start, the simulation time
    goes back to matching real time.    
    """

ardupilot_path: str = None class-attribute instance-attribute

Path for cloned ardupilot repository. Used for SITL initialization when in simulated mode

default_speed: float = 10 class-attribute instance-attribute

Default starting airspeed of a node in m/s

generate_report: bool = True class-attribute instance-attribute

Whether to output a report in the end of the simulation or not

ground_station_ip: str = None class-attribute instance-attribute

If provided and simulate_drones is True, this ip is used in UAV API to connect a GroundStation software to the simulated vehicle

reference_coordinates: Tuple[float, float, float] = (0, 0, 0) class-attribute instance-attribute

These coordinates are used as a reference frame to convert geographical coordinates to cartesian coordinates. They will be used as the center of the scene and all geographical coordinates will be converted relative to it.

simulate_drones: bool = True class-attribute instance-attribute

Wheter to use SITL or connect to real vehicle

simulation_startup_speedup: int = 1 class-attribute instance-attribute

Multiplier for SITL simulation time. This value will only affect the setup of the simulation, after all drones are positioned in the right place and are ready to start, the simulation time goes back to matching real time.

starting_api_port: int = 8000 class-attribute instance-attribute

The port in which UAV API of Node 0 will run. Following nodes use the ports in sequence by the formula port_of_node_{node_id} = starting_api_port + {node_id}

uav_api_log_path: str = None class-attribute instance-attribute

Path in which UAV API will save log files. Used in simulated mode.

update_rate: float = 0.5 class-attribute instance-attribute

Interval in simulation seconds between Ardupilot telemetry updates