Extending the Simulation
The simulation may not have every feature you need. We urge you to use the extensibility tools provided by the simulator itself instead of modifying the simulator code directly. This will make it easier to update the simulator to newer versions and to share your creations with others.
There are a few ways to extend the simulator. We are going to showcase them in this guide.
Extension Classes
Extension classes, documented here, allow a protocol to interact directly with the simulation. These classes have access to simulation internals and can modify them as needed.
More specifically, extension classes have access to handlers, the classes that implement parts of the simulation logic. Some examples of handlers are the ones that implement mobility or communication.
To showcase how to create an extension class, we are going to analyze the code of one of our extension classes, the camera hardware. This extension class implements a camera attached to a node that can "visually" detect other nodes' presence within its field of view.
Before we start, let's take a look at the base extension class code:
import warnings
from typing import Optional

from gradysim.encapsulator.python import PythonProvider
from gradysim.protocol.interface import IProtocol


class Extension:
    """
    Base class for all extensions. Extensions are classes that can be used to extend the functionality of the simulation
    environment. They are designed to be used by protocols to interact with the simulation environment in a more
    sophisticated way.
    """
    _provider: Optional[PythonProvider]

    def __init__(self, protocol: IProtocol):
        provider = protocol.provider
        if protocol.provider is None:
            raise ReferenceError("Protocol provider is not initialized. Make sure you are not creating this extension "
                                 "before the protocol's initialize method is called.")

        if not isinstance(provider, PythonProvider):
            warnings.warn("Extensions can only be ran in a python simulation environment. "
                          "Every functionality in this extension will be a no-op.")
            self._provider = None
        else:
            self._provider = provider
The base extension class is a simple class designed to be subclassed. It has only a single method, the initializer itself. The initializer receives a protocol instance and, through its provider, accesses simulation internals. Beware that the protocol itself should not do this if you are interested in keeping your protocol code's abstractions in order. The extension intentionally breaks the abstraction layer to allow for more flexibility.
The extension class can only be used when the protocol is running on the python simulator. Every operation of an extension class should become a no-op when the runtime environment is not the python simulator. The provider is a PythonProvider when the runtime environment is the python simulator; when any other provider is present, the base extension class sets its _provider attribute to None.
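This no-op pattern can be sketched in isolation. The example below is a minimal, self-contained illustration of the guard; the `FakeProvider` and `ClockExtension` names are invented for this sketch and are not part of the simulator API:

```python
from typing import Optional


class FakeProvider:
    """Stand-in for PythonProvider, used only in this illustration."""
    def current_time(self) -> float:
        return 42.0


class ClockExtension:
    """Hypothetical extension that reads the simulation clock."""
    _provider: Optional[FakeProvider]

    def __init__(self, provider: Optional[FakeProvider]):
        # Mirrors the base Extension class: keep the provider only when
        # running in the python simulator, otherwise store None.
        self._provider = provider

    def read_clock(self) -> Optional[float]:
        # Every operation becomes a no-op (here, returning None) when no
        # python provider is available.
        if self._provider is None:
            return None
        return self._provider.current_time()


print(ClockExtension(FakeProvider()).read_clock())  # 42.0
print(ClockExtension(None).read_clock())            # None
```

Real extensions get the provider from the protocol instance, as the base class initializer above shows; the stand-in here only demonstrates the guard itself.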
Now, let's take a look at the camera extension class code. The camera extension class introduces simulated camera hardware to the simulation. The camera can take pictures of the nodes within its field of view and detect their position and class. The camera integrates with the mobility module like this:
class CameraHardware(Extension):
    def __init__(self, protocol: IProtocol, configuration: CameraConfiguration):
        super().__init__(protocol)

        if self._provider is not None:
            self._mobility: Optional[MobilityHandler] = self._provider.handlers.get('mobility')
Full code of the camera extension class initializer
import math
from dataclasses import dataclass
from typing import Optional, TypedDict, Tuple, List

from gradysim.protocol.interface import IProtocol
from gradysim.protocol.position import Position
from gradysim.simulator.extension.extension import Extension
from gradysim.simulator.handler.mobility import MobilityHandler


class DetectedNode(TypedDict):
    position: Position
    type: str


@dataclass
class CameraConfiguration:
    """
    Configuration class for the camera hardware
    """
    camera_reach: float
    """The length of the cone that represents the camera's area of detection"""
    camera_theta: float
    """The angle that determines how wide the cone, that represents the camera's area of detection, is"""
    facing_elevation: float
    """The inclination of where the camera is pointing to in degrees, with 0 being directly up"""
    facing_rotation: float
    """The rotation of the camera in degrees, with zero being along the x-axis in the positive direction"""


class CameraHardware(Extension):
    """
    This extension simulates a camera hardware that can detect other nodes within its area of detection. The camera
    has a reach, field of view, and direction of facing. The camera is capable of taking pictures, returning the list
    of detected nodes within its area of detection.

    The area of detection is a cone whose point is at the node's position and whose base faces the direction the
    camera is pointing to, determined by the facing_elevation and facing_rotation attributes of the configuration.
    The cone's angle at the point is determined by the camera_theta attribute of the configuration. The cone's
    length, or the distance between its point and base, is determined by the camera_reach attribute of the
    configuration.
    """
    def __init__(self, protocol: IProtocol, configuration: CameraConfiguration):
        super().__init__(protocol)

        if self._provider is not None:
            self._mobility: Optional[MobilityHandler] = self._provider.handlers.get('mobility')

        self._configuration = configuration
        self._camera_vector = self._camera_direction_unit_vector()
        self._camera_theta = math.radians(self._configuration.camera_theta)

    def _camera_direction_unit_vector(self) -> Tuple[float, float, float]:
        """
        Returns the unit vector that represents the direction the camera is facing

        Returns:
            A tuple representing the unit vector
        """
        facing_inclination = math.radians(self._configuration.facing_elevation)
        facing_rotation = math.radians(self._configuration.facing_rotation)
        x = math.sin(facing_inclination) * math.cos(facing_rotation)
        y = math.sin(facing_inclination) * math.sin(facing_rotation)
        z = math.cos(facing_inclination)
        return x, y, z
The picture-taking process is implemented by looking at every node's position and calculating whether it is within the camera's field of view. The calculation is out of the scope of this tutorial, but you can check the code and documentation to understand how it works. The camera extension class uses the information available in the mobility handler to determine the full list of nodes and their positions here:
def take_picture(self) -> List[DetectedNode]:
    ...
    other_nodes = [node for node in self._mobility.nodes.values() if node.id != self._provider.node.id]

    detected_nodes = []
    for node in other_nodes:
        other_node_position = node.position
Full code of the camera extension class
import math
from dataclasses import dataclass
from typing import Optional, TypedDict, Tuple, List

from gradysim.protocol.interface import IProtocol
from gradysim.protocol.position import Position
from gradysim.simulator.extension.extension import Extension
from gradysim.simulator.handler.mobility import MobilityHandler


class DetectedNode(TypedDict):
    position: Position
    type: str


@dataclass
class CameraConfiguration:
    """
    Configuration class for the camera hardware
    """
    camera_reach: float
    """The length of the cone that represents the camera's area of detection"""
    camera_theta: float
    """The angle that determines how wide the cone, that represents the camera's area of detection, is"""
    facing_elevation: float
    """The inclination of where the camera is pointing to in degrees, with 0 being directly up"""
    facing_rotation: float
    """The rotation of the camera in degrees, with zero being along the x-axis in the positive direction"""


class CameraHardware(Extension):
    """
    This extension simulates a camera hardware that can detect other nodes within its area of detection. The camera
    has a reach, field of view, and direction of facing. The camera is capable of taking pictures, returning the list
    of detected nodes within its area of detection.

    The area of detection is a cone whose point is at the node's position and whose base faces the direction the
    camera is pointing to, determined by the facing_elevation and facing_rotation attributes of the configuration.
    The cone's angle at the point is determined by the camera_theta attribute of the configuration. The cone's
    length, or the distance between its point and base, is determined by the camera_reach attribute of the
    configuration.
    """
    def __init__(self, protocol: IProtocol, configuration: CameraConfiguration):
        super().__init__(protocol)

        if self._provider is not None:
            self._mobility: Optional[MobilityHandler] = self._provider.handlers.get('mobility')

        self._configuration = configuration
        self._camera_vector = self._camera_direction_unit_vector()
        self._camera_theta = math.radians(self._configuration.camera_theta)

    def _camera_direction_unit_vector(self) -> Tuple[float, float, float]:
        """
        Returns the unit vector that represents the direction the camera is facing

        Returns:
            A tuple representing the unit vector
        """
        facing_inclination = math.radians(self._configuration.facing_elevation)
        facing_rotation = math.radians(self._configuration.facing_rotation)
        x = math.sin(facing_inclination) * math.cos(facing_rotation)
        y = math.sin(facing_inclination) * math.sin(facing_rotation)
        z = math.cos(facing_inclination)
        return x, y, z

    def take_picture(self) -> List[DetectedNode]:
        """
        This simulated camera hardware is able to detect other nodes within its area of detection. This method
        returns the list of nodes currently inside the area of detection of the camera.

        Returns:
            A list of detected nodes
        """
        if self._mobility is None:
            return []

        node_position = self._provider.node.position
        other_nodes = [node for node in self._mobility.nodes.values() if node.id != self._provider.node.id]

        detected_nodes = []
        for node in other_nodes:
            other_node_position = node.position
            relative_vector = (
                other_node_position[0] - node_position[0],
                other_node_position[1] - node_position[1],
                other_node_position[2] - node_position[2]
            )

            # Check if the node is within the camera's reach
            distance = math.sqrt(relative_vector[0] ** 2 + relative_vector[1] ** 2 + relative_vector[2] ** 2)
            if distance > self._configuration.camera_reach:
                continue

            if distance > 0:
                # Check if the angle between vectors is less than theta
                normalized_relative_vector = (
                    relative_vector[0] / distance,
                    relative_vector[1] / distance,
                    relative_vector[2] / distance
                )
                dot_product = (
                    self._camera_vector[0] * normalized_relative_vector[0] +
                    self._camera_vector[1] * normalized_relative_vector[1] +
                    self._camera_vector[2] * normalized_relative_vector[2]
                )
                angle = math.acos(dot_product) - 1e-6  # Tolerance
                if angle > self._camera_theta:
                    continue

            detected_nodes.append({
                'position': other_node_position,
                'type': 'node'
            })
        return detected_nodes

    def change_facing(self, facing_elevation: float, facing_rotation: float):
        """
        Changes the direction the camera is facing

        Args:
            facing_elevation: The inclination of where the camera is pointing to in degrees, with 0 being directly up
            facing_rotation: The rotation of the camera in degrees, with zero being along the x-axis in the positive direction
        """
        self._configuration.facing_elevation = facing_elevation
        self._configuration.facing_rotation = facing_rotation
        self._camera_vector = self._camera_direction_unit_vector()
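The cone test at the heart of take_picture can be reproduced with plain math. The sketch below re-implements essentially the same geometry outside the simulator (the function names are ours, not part of the simulator API; unlike take_picture, it treats a node at zero distance as not detected for simplicity). A camera at altitude 10 with facing_elevation of 180 degrees (pointing straight down) detects a node directly below it, but not one out of reach or outside the cone:

```python
import math
from typing import Tuple


def direction_unit_vector(facing_elevation: float, facing_rotation: float) -> Tuple[float, float, float]:
    # Spherical-to-cartesian conversion, matching _camera_direction_unit_vector:
    # elevation 0 points straight up (+z), 180 points straight down (-z)
    inclination = math.radians(facing_elevation)
    rotation = math.radians(facing_rotation)
    return (math.sin(inclination) * math.cos(rotation),
            math.sin(inclination) * math.sin(rotation),
            math.cos(inclination))


def in_cone(camera_pos, node_pos, reach, theta_deg, elevation, rotation) -> bool:
    # Range test: the node must be within the cone's length
    rel = tuple(n - c for n, c in zip(node_pos, camera_pos))
    distance = math.sqrt(sum(c * c for c in rel))
    if distance > reach or distance == 0:
        return False

    # Angle test: the angle between the camera's facing vector and the
    # vector towards the node must be at most theta
    facing = direction_unit_vector(elevation, rotation)
    dot = sum(f * r / distance for f, r in zip(facing, rel))
    angle = math.acos(max(-1.0, min(1.0, dot))) - 1e-6  # tolerance
    return angle <= math.radians(theta_deg)


# Camera at (0, 0, 10) pointing straight down, 20m reach, 30 degree cone angle
print(in_cone((0, 0, 10), (0, 0, 0), 20, 30, 180, 0))    # True: directly below
print(in_cone((0, 0, 10), (30, 0, 10), 20, 30, 180, 0))  # False: out of reach
print(in_cone((0, 0, 10), (10, 0, 8), 20, 30, 180, 0))   # False: outside the cone
```

Note how the Drone protocol in the next section relies on exactly this convention, configuring a facing elevation of 180 degrees so the cone points at the ground.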
To use the camera extension class, simply instantiate it during initialization. To illustrate a potential use case, we'll implement a simple simulation where a couple of drones fly over an area and take pictures of the nodes below them, logging how many they see. The code for this simulation is shown below:
import logging

from gradysim.protocol.interface import IProtocol
from gradysim.protocol.messages.telemetry import Telemetry
from gradysim.protocol.plugin.random_mobility import RandomMobilityPlugin, RandomMobilityConfig
from gradysim.simulator.extension.camera import CameraHardware, CameraConfiguration


class PointOfInterest(IProtocol):
    def initialize(self) -> None:
        pass

    def handle_timer(self, timer: str) -> None:
        pass

    def handle_packet(self, message: str) -> None:
        pass

    def handle_telemetry(self, telemetry: Telemetry) -> None:
        pass

    def finish(self) -> None:
        pass


class Drone(IProtocol):
    camera: CameraHardware

    def initialize(self) -> None:
        plugin = RandomMobilityPlugin(self, RandomMobilityConfig(x_range=(-50, 50), y_range=(-50, 50), z_range=(10, 10)))
        plugin.initiate_random_trip()
        self.provider.schedule_timer("", self.provider.current_time())

        configuration = CameraConfiguration(
            20,   # Reach of the camera
            30,   # Angle of the cone
            180,  # Facing elevation - facing downwards
            0     # Facing rotation
        )
        self.camera = CameraHardware(self, configuration)

    def handle_timer(self, timer: str) -> None:
        detected = self.camera.take_picture()
        logging.info(f"Detected {len(detected)} points of interest")
        self.provider.schedule_timer("", self.provider.current_time() + 5)

    def handle_packet(self, message: str) -> None:
        pass

    def handle_telemetry(self, telemetry: Telemetry) -> None:
        pass

    def finish(self) -> None:
        pass
The PointOfInterest protocol implements no behaviour; it serves simply as a point of interest for the camera hardware to detect. The Drone protocol flies randomly above the area and takes pictures of the nodes below it at 5-second intervals. The simulation code is shown below:
import random

from gradysim.simulator.handler.mobility import MobilityHandler
from gradysim.simulator.handler.timer import TimerHandler
from gradysim.simulator.handler.visualization import VisualizationHandler
from gradysim.simulator.simulation import SimulationConfiguration, SimulationBuilder

from protocol import PointOfInterest, Drone


def main():
    # Configuring simulation
    config = SimulationConfiguration(
        duration=200
    )
    builder = SimulationBuilder(config)
    builder.add_handler(TimerHandler())
    builder.add_handler(MobilityHandler())
    builder.add_handler(VisualizationHandler())

    # Instantiating a bunch of ground sensors
    for _ in range(100):
        builder.add_node(PointOfInterest,
                         (random.randint(-50, 50), random.randint(-50, 50), 0))

    # Instantiating 4 UAVs at (0,0,0)
    for _ in range(4):
        builder.add_node(Drone, (0, 0, 0))

    # Building & starting
    simulation = builder.build()
    simulation.start_simulation()


if __name__ == "__main__":
    main()
The expected output of the simulation is:
INFO [--------- Simulation started ---------]
INFO [it=0 time=0:00:00 | Drone 100 Initialization] RandomMobilityPlugin: Initiating a random trip
INFO [it=0 time=0:00:00 | Drone 100 Initialization] RandomMobilityPlugin: traveling to waypoint (14.370034502815273, 12.48244048437983, 10.0)
INFO [it=0 time=0:00:00 | Drone 101 Initialization] RandomMobilityPlugin: Initiating a random trip
INFO [it=0 time=0:00:00 | Drone 101 Initialization] RandomMobilityPlugin: traveling to waypoint (-24.720737786286406, 32.04056460547929, 10.0)
INFO [it=0 time=0:00:00 | Drone 102 Initialization] RandomMobilityPlugin: Initiating a random trip
INFO [it=0 time=0:00:00 | Drone 102 Initialization] RandomMobilityPlugin: traveling to waypoint (-1.4705086519382746, 5.365903305621153, 10.0)
INFO [it=0 time=0:00:00 | Drone 103 Initialization] RandomMobilityPlugin: Initiating a random trip
INFO [it=0 time=0:00:00 | Drone 103 Initialization] RandomMobilityPlugin: traveling to waypoint (-22.70707551545876, -36.412604020078106, 10.0)
INFO [it=0 time=0:00:00 | Drone 100 handle_timer] Detected 3 points of interest
INFO [it=1 time=0:00:00 | Drone 103 handle_timer] Detected 3 points of interest
INFO [it=2 time=0:00:00 | Drone 101 handle_timer] Detected 3 points of interest
INFO [it=3 time=0:00:00 | Drone 102 handle_timer] Detected 3 points of interest
INFO [it=11037 time=0:00:01.050000 | Drone 102 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (30.939559651294985, 4.471458779580473, 10.0)
INFO [it=21745 time=0:00:02.060000 | Drone 100 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (10.641625971935532, -43.2282648851834, 10.0)
INFO [it=43050 time=0:00:04.070000 | Drone 101 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (37.52680363429771, 37.89473119809303, 10.0)
INFO [it=44321 time=0:00:04.190000 | Drone 102 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (17.293162626366794, 27.386533869882385, 10.0)
INFO [it=45592 time=0:00:04.310000 | Drone 103 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (-26.725198762915912, 8.771431035907064, 10.0)
INFO [it=53004 time=0:00:05 | Drone 102 handle_timer] Detected 0 points of interest
INFO [it=53005 time=0:00:05 | Drone 101 handle_timer] Detected 0 points of interest
INFO [it=53006 time=0:00:05 | Drone 103 handle_timer] Detected 1 points of interest
INFO [it=53007 time=0:00:05 | Drone 100 handle_timer] Detected 1 points of interest
INFO [it=71143 time=0:00:06.720000 | Drone 102 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (-26.91921588222399, 38.910668436612625, 10.0)
INFO [it=79307 time=0:00:07.490000 | Drone 100 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (-9.859851685045506, -22.136296678139523, 10.0)
INFO [it=91918 time=0:00:08.680000 | Drone 103 handle_telemetry] RandomMobilityPlugin: traveling to waypoint (8.848617474256883, 18.21708387682807, 10.0)
INFO [it=106008 time=0:00:10 | Drone 100 handle_timer] Detected 1 points of interest
INFO [it=106009 time=0:00:10 | Drone 102 handle_timer] Detected 1 points of interest
INFO [it=106010 time=0:00:10 | Drone 103 handle_timer] Detected 1 points of interest
INFO [it=106011 time=0:00:10 | Drone 101 handle_timer] Detected 3 points of interest
...
INFO [it=2120160 time=0:03:20 | Drone 102 handle_timer] Detected 3 points of interest
INFO [it=2120161 time=0:03:20 | Drone 101 handle_timer] Detected 0 points of interest
INFO [it=2120162 time=0:03:20 | Drone 103 handle_timer] Detected 1 points of interest
INFO [it=2120163 time=0:03:20 | Drone 100 handle_timer] Detected 0 points of interest
INFO [--------- Simulation finished ---------]
INFO Real time elapsed: 0:00:22.017243 Total iterations: 2120165 Simulation time: 0:03:20.010000