Simulating a data collection scenario
Info
This guide will step through the complete implementation of a protocol and execution in prototype-mode.
Full code of the protocols implemented in this guide
import enum
import json
import logging
from typing import TypedDict
from gradysim.protocol.interface import IProtocol
from gradysim.protocol.messages.communication import SendMessageCommand, BroadcastMessageCommand
from gradysim.protocol.messages.telemetry import Telemetry
from gradysim.protocol.plugin.mission_mobility import MissionMobilityPlugin, MissionMobilityConfiguration, LoopMission
class SimpleSender(enum.Enum):
SENSOR = 0
UAV = 1
GROUND_STATION = 2
class SimpleMessage(TypedDict):
packet_count: int
sender_type: int
sender: int
def report_message(message: SimpleMessage) -> str:
return (f"Received message with {message['packet_count']} packets from "
f"{SimpleSender(message['sender_type']).name} {message['sender']}")
class SimpleSensorProtocol(IProtocol):
_log: logging.Logger
packet_count: int
def initialize(self) -> None:
self._log = logging.getLogger()
self.packet_count = 0
self._generate_packet()
def _generate_packet(self) -> None:
self.packet_count += 1
self._log.info(f"Generated packet, current count {self.packet_count}")
self.provider.schedule_timer("", self.provider.current_time() + 1)
def handle_timer(self, timer: str) -> None:
self._generate_packet()
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
self._log.info(report_message(simple_message))
if simple_message['sender_type'] == SimpleSender.UAV.value:
response: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.SENSOR.value,
'sender': self.provider.get_id()
}
command = SendMessageCommand(json.dumps(response), simple_message['sender'])
self.provider.send_communication_command(command)
self._log.info(f"Sent {response['packet_count']} packets to UAV {simple_message['sender']}")
self.packet_count = 0
def handle_telemetry(self, telemetry: Telemetry) -> None:
pass
def finish(self) -> None:
self._log.info(f"Final packet count: {self.packet_count}")
mission_list = [
[
(0, 0, 20),
(150, 0, 20)
],
[
(0, 0, 20),
(0, 150, 20)
],
[
(0, 0, 20),
(-150, 0, 20)
],
[
(0, 0, 20),
(0, -150, 20)
]
]
class SimpleUAVProtocol(IProtocol):
_log: logging.Logger
packet_count: int
_mission: MissionMobilityPlugin
def initialize(self) -> None:
self._log = logging.getLogger()
self.packet_count = 0
self._mission = MissionMobilityPlugin(self, MissionMobilityConfiguration(
loop_mission=LoopMission.REVERSE,
))
self._mission.start_mission(mission_list.pop())
self._send_heartbeat()
def _send_heartbeat(self) -> None:
self._log.info(f"Sending heartbeat, current count {self.packet_count}")
message: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.UAV.value,
'sender': self.provider.get_id()
}
command = BroadcastMessageCommand(json.dumps(message))
self.provider.send_communication_command(command)
self.provider.schedule_timer("", self.provider.current_time() + 1)
def handle_timer(self, timer: str) -> None:
self._send_heartbeat()
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
self._log.info(report_message(simple_message))
if simple_message['sender_type'] == SimpleSender.SENSOR.value:
self.packet_count += simple_message['packet_count']
self._log.info(f"Received {simple_message['packet_count']} packets from "
f"sensor {simple_message['sender']}. Current count {self.packet_count}")
elif simple_message['sender_type'] == SimpleSender.GROUND_STATION.value:
self._log.info("Received acknowledgment from ground station")
self.packet_count = 0
def handle_telemetry(self, telemetry: Telemetry) -> None:
pass
def finish(self) -> None:
self._log.info(f"Final packet count: {self.packet_count}")
class SimpleGroundStationProtocol(IProtocol):
_log: logging.Logger
packet_count: int
def initialize(self) -> None:
self._log = logging.getLogger()
self.packet_count = 0
def handle_timer(self, timer: str) -> None:
pass
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
self._log.info(report_message(simple_message))
if simple_message['sender_type'] == SimpleSender.UAV.value:
response: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.GROUND_STATION.value,
'sender': self.provider.get_id()
}
command = SendMessageCommand(json.dumps(response), simple_message['sender'])
self.provider.send_communication_command(command)
self.packet_count += simple_message['packet_count']
self._log.info(f"Sent acknowledgment to UAV {simple_message['sender']}. Current count {self.packet_count}")
def handle_telemetry(self, telemetry: Telemetry) -> None:
pass
def finish(self) -> None:
self._log.info(f"Final packet count: {self.packet_count}")
Full code needed to execute this example
from gradysim.simulator.handler.communication import CommunicationHandler, CommunicationMedium
from gradysim.simulator.handler.mobility import MobilityHandler
from gradysim.simulator.handler.timer import TimerHandler
from gradysim.simulator.handler.visualization import VisualizationHandler, VisualizationConfiguration
from gradysim.simulator.simulation import SimulationBuilder, SimulationConfiguration
from simple_protocol import SimpleSensorProtocol, SimpleGroundStationProtocol, SimpleUAVProtocol
def main():
# Configuring simulation
config = SimulationConfiguration(
duration=200
)
builder = SimulationBuilder(config)
# Instantiating 4 sensors in fixed positions
builder.add_node(SimpleSensorProtocol, (150, 0, 0))
builder.add_node(SimpleSensorProtocol, (0, 150, 0))
builder.add_node(SimpleSensorProtocol, (-150, 0, 0))
builder.add_node(SimpleSensorProtocol, (0, -150, 0))
# Instantiating 4 UAVs at (0,0,0)
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
# Instantiating ground station at (0,0,0)
builder.add_node(SimpleGroundStationProtocol, (0, 0, 0))
# Adding required handlers
builder.add_handler(TimerHandler())
builder.add_handler(CommunicationHandler(CommunicationMedium(
transmission_range=30
)))
builder.add_handler(MobilityHandler())
builder.add_handler(VisualizationHandler(VisualizationConfiguration(
x_range=(-150, 150),
y_range=(-150, 150),
z_range=(0, 150)
)))
# Building & starting
simulation = builder.build()
simulation.start_simulation()
if __name__ == "__main__":
main()
Scenario description
We have sensors spread around some location. These sensors collect information about their local environment which is of interest to us. The deployment location has no communication infrastructure and is hard to access, so to get the information out of these sensors we will employ a UAV swarm. The UAVs will fly autonomously to the deployment location, retrieve data from the sensors through local communication and fly back to a central ground station where the data will be offloaded and analysed.
The number of UAVs that are available for this mission is equal to the number of sensors deployed in the field, thus each UAV will only monitor one sensor.
Here is some additional information about the scenario's requirements:
- UAVs should continuously fly between the deployment location and the ground station
- Sensors are continuously collecting data and generate new packets periodically at a steady rate
- Every packet a sensor has will be transferred once a UAV makes contact with it
- There is no limit to how many packets any agent can carry
- The ground station is effectively a packet sink
- Communication is local and it's range is limited
Designing the simulation
We have three different types of agents in this scenario: sensors, UAVs and the ground station. Since each agent type behaves differently, three protocol classes will be created. The protocols that will implement the agent type behaviours are very simple and can each be described in a couple of steps:
1. Periodically generates packets and stores them
2. Listens for any UAV messages and responds by sending to the UAV every packet
stored in the sensor
1. Repeatedly flies between the ground station and a sensor
2. Periodically tries to communicate with nearby agents, advertising the number
of packets it contains
3. Stores packets received from sensors
4. Drops packets after sending them to the ground station
1. Listens for any UAV communication and absorbs their packets
2. Responds to the sender UAV confirming that their packets were received
Although independent the three protocol types will need to communicate to complete their task. In GrADyS-SIM NextGen communication is done through messages containing a string payload. In order to facilitate implementation we need to define a common message format that is serializable and has every field necessary for communication.
Interactions in the protocols above are defined between agent types, UAV <-> sensor
and UAV <-> ground station
message exchanges are handled in different ways. So, if we want to use a common message format, we need to include
the agent type in the format to allow these interactions to be defined.
Protocols talk to each other in a request-response manner, generally with the UAV initiating the exchange. In order to know who to respond to we need to have the agent id in the message definition.
Finally, in our interactions the sensor transmits packets to the UAVs, and they transmit packets to the ground station. We need to know the agent's packet count in order to implement the transmission.
Implementing the protocols
With our requirements ready all that is left is translating all this into protocol code. Our use-case is very simple
so it will be easy to present the code step-by-step. Let's first show the SimpleMessage implementation. Following
our requirements we implemented the simple message as a dictionary. In order to have typing hints we create a TypedDict
class containing the desired fields. GrADyS-SIM NextGen doesn't impose any
To identify the types of agents in our simulation we will use the SimpleSender
enum shown below.
To help us with logging a report_message
function is defined. It receives
a SimpleMessage and appropriately logs it.
Sensor
The first protocol we will implement in the sensor is the periodic data generation. Below is a snippet from the protocol's
initialize method, we are storing the generated packets in the self.packet_count
variable, at simulation start 0
packets are in storage. The _generate_packet
method increments this counter by one and schedules a timer for one
second later. The handle_timer
is called when this timer fires and in turn calls the _generate_packet
method again.
This creates an infinite loop which with a periodicity of 1 second increments the packet counter.
def initialize(self) -> None:
self.packet_count = 0
self._generate_packet()
def _generate_packet(self) -> None:
self.packet_count += 1
self.provider.schedule_timer("", self.provider.current_time() + 1)
def handle_timer(self, timer: str) -> None:
self._generate_packet()
Next we need to implement the interaction with other modules. The handle_packet
method will be called when a message
is recieved from another module. This missage will be a SimpleMessage and needs to be deserialized. The only relevant
interaction in the sensor is with an UAV, so we will only implement that. The code below shows a snipped from the
handle_packet
method. After receiving a message from a UAV the sensor will send every one of its packets to the UAV
and zero out the self.packet_count
counter.
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
if simple_message['sender_type'] == SimpleSender.UAV.value:
response: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.SENSOR.value,
'sender': self.provider.get_id()
}
command = SendMessageCommand(json.dumps(response), simple_message['sender'])
self.provider.send_communication_command(command)
self._log.info(f"Sent {response['packet_count']} packets to UAV {simple_message['sender']}")
self.packet_count = 0
Full sensor code
class SimpleSensorProtocol(IProtocol):
_log: logging.Logger
packet_count: int
def initialize(self) -> None:
self._log = logging.getLogger()
self.packet_count = 0
self._generate_packet()
def _generate_packet(self) -> None:
self.packet_count += 1
self._log.info(f"Generated packet, current count {self.packet_count}")
self.provider.schedule_timer("", self.provider.current_time() + 1)
def handle_timer(self, timer: str) -> None:
self._generate_packet()
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
self._log.info(report_message(simple_message))
if simple_message['sender_type'] == SimpleSender.UAV.value:
response: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.SENSOR.value,
'sender': self.provider.get_id()
}
command = SendMessageCommand(json.dumps(response), simple_message['sender'])
self.provider.send_communication_command(command)
self._log.info(f"Sent {response['packet_count']} packets to UAV {simple_message['sender']}")
self.packet_count = 0
def handle_telemetry(self, telemetry: Telemetry) -> None:
pass
def finish(self) -> None:
self._log.info(f"Final packet count: {self.packet_count}")
Ground Station
The ground station is the second protocol we will implement. It is even more simple them the sensor since it doesn't
require any periodic timer to function. The snipped below covers the initialization of a self.packet_count
property
storing the number of packets that have been delivered to the ground station and the definition of the handle_packet
method. The handling behaviour only cares about messages that came from UAVs, when received the ground station adds the
packets the UAV contains to its packet count and sends a message back to the UAV, so it can know his packets have been
delivered.
def initialize(self) -> None:
self.packet_count = 0
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
if simple_message['sender_type'] == SimpleSender.UAV.value:
response: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.GROUND_STATION.value,
'sender': self.provider.get_id()
}
command = SendMessageCommand(json.dumps(response), simple_message['sender'])
self.provider.send_communication_command(command)
self.packet_count += simple_message['packet_count']
Full ground station code
class SimpleGroundStationProtocol(IProtocol):
_log: logging.Logger
packet_count: int
def initialize(self) -> None:
self._log = logging.getLogger()
self.packet_count = 0
def handle_timer(self, timer: str) -> None:
pass
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
self._log.info(report_message(simple_message))
if simple_message['sender_type'] == SimpleSender.UAV.value:
response: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.GROUND_STATION.value,
'sender': self.provider.get_id()
}
command = SendMessageCommand(json.dumps(response), simple_message['sender'])
self.provider.send_communication_command(command)
self.packet_count += simple_message['packet_count']
self._log.info(f"Sent acknowledgment to UAV {simple_message['sender']}. Current count {self.packet_count}")
def handle_telemetry(self, telemetry: Telemetry) -> None:
pass
def finish(self) -> None:
self._log.info(f"Final packet count: {self.packet_count}")
UAV
The UAV protocol is the most complicated one as it has to interact to both sensors and the ground station. It also needs
to periodically broadcast the number of packets it has to initiate interactions with other agents. The snipped bellow
illustrates the protocol's initialization and this timer, it is implemented in a similar way to the sensor packet
generation timer. The _send_heartbeat
function implements a recurrent timer with a periodicity of 1 second that
broadcasts the message.
def initialize(self) -> None:
self.packet_count = 0
self._send_heartbeat()
def _send_heartbeat(self) -> None:
message: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.UAV.value,
'sender': self.provider.get_id()
}
command = BroadcastMessageCommand(json.dumps(message))
self.provider.send_communication_command(command)
self.provider.schedule_timer("", self.provider.current_time() + 1)
Another thing the UAV needs to do is actually travel from the sensors to the ground station. We will use the MissionMobilityPlugin to implement this. This plugin allows us to specify a list of waypoints and configure a looping mission that will have our node follow this list of waypoints infinitely. Since we want each of our UAVs to go to a different sensor, we will prepare a list of missions that the UAVs will pick from during initialization. Each mission starts at (0,0), where the ground station will be and flies to one of the sensor's deployment locations which we will define later. The UAVs always fly 20 meters above the ground.
mission_list = [
[
(0, 0, 20),
(150, 0, 20)
],
[
(0, 0, 20),
(0, 150, 20)
],
[
(0, 0, 20),
(-150, 0, 20)
],
[
(0, 0, 20),
(0, -150, 20)
]
]
self._mission = MissionMobilityPlugin(self, MissionMobilityConfiguration(
loop_mission=LoopMission.REVERSE,
))
self._mission.start_mission(mission_list.pop())
Next let's define our interactions with other protocols in the handle_packet
method. A message from a sensor means
that one of our messages has been received, and it is sending us its packets, in that case we accept them and add them
to our self.packet_count
property. If a message is received from the ground station it means that we have succesfully
delivered our packets to it, and we should discard them. The snippet below shows this behaviour.
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
if simple_message['sender_type'] == SimpleSender.SENSOR.value:
self.packet_count += simple_message['packet_count']
elif simple_message['sender_type'] == SimpleSender.GROUND_STATION.value:
self.packet_count = 0
Full UAV code
mission_list = [
[
(0, 0, 20),
(150, 0, 20)
],
[
(0, 0, 20),
(0, 150, 20)
],
[
(0, 0, 20),
(-150, 0, 20)
],
[
(0, 0, 20),
(0, -150, 20)
]
]
class SimpleUAVProtocol(IProtocol):
_log: logging.Logger
packet_count: int
_mission: MissionMobilityPlugin
def initialize(self) -> None:
self._log = logging.getLogger()
self.packet_count = 0
self._mission = MissionMobilityPlugin(self, MissionMobilityConfiguration(
loop_mission=LoopMission.REVERSE,
))
self._mission.start_mission(mission_list.pop())
self._send_heartbeat()
def _send_heartbeat(self) -> None:
self._log.info(f"Sending heartbeat, current count {self.packet_count}")
message: SimpleMessage = {
'packet_count': self.packet_count,
'sender_type': SimpleSender.UAV.value,
'sender': self.provider.get_id()
}
command = BroadcastMessageCommand(json.dumps(message))
self.provider.send_communication_command(command)
self.provider.schedule_timer("", self.provider.current_time() + 1)
def handle_timer(self, timer: str) -> None:
self._send_heartbeat()
def handle_packet(self, message: str) -> None:
simple_message: SimpleMessage = json.loads(message)
self._log.info(report_message(simple_message))
if simple_message['sender_type'] == SimpleSender.SENSOR.value:
self.packet_count += simple_message['packet_count']
self._log.info(f"Received {simple_message['packet_count']} packets from "
f"sensor {simple_message['sender']}. Current count {self.packet_count}")
elif simple_message['sender_type'] == SimpleSender.GROUND_STATION.value:
self._log.info("Received acknowledgment from ground station")
self.packet_count = 0
def handle_telemetry(self, telemetry: Telemetry) -> None:
pass
def finish(self) -> None:
self._log.info(f"Final packet count: {self.packet_count}")
Executing what we've built
Everything up to this point has been related only to protocols. The protocol we created above should be able to run in any execution-mode. We will create a prototype-mode simulation to test it.
Warning
Although the protocol will run on any environment it doesn't mean that all interactions will work exactly the same. Different environments bring with them new complexities that you might not have anticipated when you built your protocol. These protocols, for example, would have issues in integrated-mode as OMNeT++ simulates message colisions and all our UAVs send messages exactly every 1 second. A simple way to fix this would be introducing a small random delay to UAV messages. This is an example of how having varied environments can help you build more robust protocols.
from gradysim.simulator.handler.communication import CommunicationHandler, CommunicationMedium
from gradysim.simulator.handler.mobility import MobilityHandler
from gradysim.simulator.handler.timer import TimerHandler
from gradysim.simulator.handler.visualization import VisualizationHandler, VisualizationConfiguration
from gradysim.simulator.simulation import SimulationBuilder, SimulationConfiguration
from simple_protocol import SimpleSensorProtocol, SimpleGroundStationProtocol, SimpleUAVProtocol
def main():
# Configuring simulation
config = SimulationConfiguration(
duration=200
)
builder = SimulationBuilder(config)
# Instantiating 4 sensors in fixed positions
builder.add_node(SimpleSensorProtocol, (150, 0, 0))
builder.add_node(SimpleSensorProtocol, (0, 150, 0))
builder.add_node(SimpleSensorProtocol, (-150, 0, 0))
builder.add_node(SimpleSensorProtocol, (0, -150, 0))
# Instantiating 4 UAVs at (0,0,0)
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
builder.add_node(SimpleUAVProtocol, (0, 0, 0))
# Instantiating ground station at (0,0,0)
builder.add_node(SimpleGroundStationProtocol, (0, 0, 0))
# Adding required handlers
builder.add_handler(TimerHandler())
builder.add_handler(CommunicationHandler(CommunicationMedium(
transmission_range=30
)))
builder.add_handler(MobilityHandler())
builder.add_handler(VisualizationHandler(VisualizationConfiguration(
x_range=(-150, 150),
y_range=(-150, 150),
z_range=(0, 150)
)))
# Building & starting
simulation = builder.build()
simulation.start_simulation()
if __name__ == "__main__":
main()
Danger
This pattern of defining a main function and running it only if the file is being executed directly is required if you are using the VisualizationHandler as it will spawn a new process to run the visualization thread. Read more about this in the handler's documentation.
The code above configures a python simulation that will run for 200 seconds. Four sensors are added to the simulation at specific deployment locations around a 300x300 meter area centered around (0,0). Four UAVs are instantiated, each of them will serve one of these sensors. Last, the ground station is created at (0,0)) on the ground.
After all the node's have been added we will add the handlers required for this simulation to run. The timer handler is required to enable timers, communication handler is also added and configured for a communication range of 30 meters. Mobility is also necessary to enable movement and finally visualization is added so we can watch the simulation progress.
Finally, we build the simulation and run it.
Results
After running the simulation you will see that some useful stats have been logged. They indicate that the UAVs were successful in transporting messages from the sensors to the ground station. Over the entire simulation over 600 packets were delivered to the ground station by them.
INFO [it=0 time=0:00:00 | SimpleSensorProtocol 0 Finalization] Final packet count: 43
INFO [it=0 time=0:00:00 | SimpleSensorProtocol 1 Finalization] Final packet count: 43
INFO [it=0 time=0:00:00 | SimpleSensorProtocol 2 Finalization] Final packet count: 43
INFO [it=0 time=0:00:00 | SimpleSensorProtocol 3 Finalization] Final packet count: 43
INFO [it=0 time=0:00:00 | SimpleUAVProtocol 4 Finalization] Final packet count: 0
INFO [it=0 time=0:00:00 | SimpleUAVProtocol 5 Finalization] Final packet count: 0
INFO [it=0 time=0:00:00 | SimpleUAVProtocol 6 Finalization] Final packet count: 0
INFO [it=0 time=0:00:00 | SimpleUAVProtocol 7 Finalization] Final packet count: 0
INFO [it=0 time=0:00:00 | SimpleGroundStationProtocol 8 Finalization] Final packet count: 632