Skip to content

Visualization

gradysim.simulator.handler.visualization

This handler adds visualization to the simulation. It shows regularly updated node positions in a graphical representation of the simulation. This graphical representation is web-based and can be accessed via a browser through this link. When the simulation starts running a WebSocket server is started which the browser connects to. The browser then displays the simulation in a 3D environment. The visualization is updated regularly with the current node positions and other information.

Danger

The visualization handler uses a separate process to run the WebSocket server, this means that on Windows your script will be rerun when the new process starts. This means that any code that should not be run multiple times should be put in the if __name__ == "__main__" block.

VisualizationConfiguration dataclass

Configuration for the VisualizationHandler

Source code in gradysim/simulator/handler/visualization.py
@dataclass
class VisualizationConfiguration:
    """
    Configuration for the VisualizationHandler
    """

    information_collection_interval: float = 0.01
    """
    Interval in simulation seconds between visualization information update. Beware that this is not the frequency
    at which the visualization is updated in the browser, but the frequency at which data is collected from the 
    running simulation. You can change the broser update frequency with the `update_rate` parameter.
    """

    update_rate: float = 0.05
    """
    Rate in seconds at which the visualization is updated in the browser. This is the frequency at which the browser
    receives the information from the simulation.
    """

    x_range: Tuple[float, float] = (-50, 50)
    """Range of the X axis of the visualization in meters"""

    y_range: Tuple[float, float] = (-50, 50)
    """Range of the Y axis of the visualization in meters"""

    z_range: Tuple[float, float] = (0, 50)
    """Range of the Z axis of the visualization in meters"""

    host: str = "localhost"
    """Host address of the visualization server"""

    port: int = 5678
    """Port that the visualization server will run in"""

    open_browser: bool = False
    """Whether to open the browser automatically when the simulation starts"""

host: str = 'localhost' class-attribute instance-attribute

Host address of the visualization server

information_collection_interval: float = 0.01 class-attribute instance-attribute

Interval in simulation seconds between visualization information update. Beware that this is not the frequency at which the visualization is updated in the browser, but the frequency at which data is collected from the running simulation. You can change the broser update frequency with the update_rate parameter.

open_browser: bool = False class-attribute instance-attribute

Whether to open the browser automatically when the simulation starts

port: int = 5678 class-attribute instance-attribute

Port that the visualization server will run in

update_rate: float = 0.05 class-attribute instance-attribute

Rate in seconds at which the visualization is updated in the browser. This is the frequency at which the browser receives the information from the simulation.

x_range: Tuple[float, float] = (-50, 50) class-attribute instance-attribute

Range of the X axis of the visualization in meters

y_range: Tuple[float, float] = (-50, 50) class-attribute instance-attribute

Range of the Y axis of the visualization in meters

z_range: Tuple[float, float] = (0, 50) class-attribute instance-attribute

Range of the Z axis of the visualization in meters

VisualizationController

Controller for the visualization handler. Can be used to send commands to the visualization handler from a protocol. Commands can be used to affect the visualization, such as painting nodes or changing the environment's color.

Info

Every method in this class is a no-op if a visualization handler is not active. This includes when the protocol is not runnign on a python simulation environment.

Warning

The VisualizationController is attached to the last active visualization handler. This means that if you have multiple visualization handlers active for some reason, the controller will only affect the last one. This also means that this class should always be instantiated in the protocol's initialize method, to avoid initialization before the visualization handler is active.

Source code in gradysim/simulator/handler/visualization.py
class VisualizationController:
    """
    Controller for the visualization handler. Can be used to send commands to the visualization handler from a
    protocol. Commands can be used to affect the visualization, such as painting nodes or changing the
    environment's color.

    !!!info
        Every method in this class is a no-op if a visualization handler is not active. This includes when the protocol is
        not runnign on a python simulation environment.

    !!!warning
        The VisualizationController is attached to the last active visualization handler. This means that if you have
        multiple visualization handlers active for some reason, the controller will only affect the last one. This also
        means that this class should always be instantiated in the protocol's `initialize` method, to avoid
        initialization before the visualization handler is active.
    """
    def __init__(self):
        self._visualization_handler = _active_handler

        if _active_handler is None:
            logging.warning("No visualization handler active, visualization commands will be ignored")

    def paint_node(self, node_id: int, color: Tuple[float, float, float]) -> None:
        """
        Paints a node in the visualization with a specific color.

        Args:
            node_id: ID of the node to paint
            color: RGB color of the node
        """
        if self._visualization_handler is None:
            return

        self._visualization_handler.command_queue.put({
            "command": "paint_node",
            "payload": {
                "node_id": node_id,
                "color": color
            }
        })

    def paint_environment(self, color: Tuple[float, float, float]) -> None:
        """
        Paints the environment in the visualization with a specific color.

        Args:
            color: RGB color of the environment
        """
        if self._visualization_handler is None:
            return

        self._visualization_handler.command_queue.put({
            "command": "paint_environment",
            "payload": {
                "color": color
            }
        })

    def resize_nodes(self, size: float) -> None:
        """
        Resizes the nodes in the visualization
        Args:
            size: New size of the nodes
        """
        if self._visualization_handler is None:
            return

        self._visualization_handler.command_queue.put({
            "command": "resize_nodes",
            "payload": {
                "size": size
            }
        })

    def show_node_id(self, node_id: int, show: bool):
        """
        Paints or unpaints the node_id text over the node.
        Args:
            node_id: ID of the node
            show: wheter to show the node_id or not
        """
        if self._visualization_handler is None:
            return

        self._visualization_handler.command_queue.put({
            "command": "show_node_id",
            "payload": {
                "node_id": node_id,
                "show": show,
            }
        })

paint_environment(color)

Paints the environment in the visualization with a specific color.

Parameters:

Name Type Description Default
color Tuple[float, float, float]

RGB color of the environment

required
Source code in gradysim/simulator/handler/visualization.py
def paint_environment(self, color: Tuple[float, float, float]) -> None:
    """
    Paints the environment in the visualization with a specific color.

    Args:
        color: RGB color of the environment
    """
    if self._visualization_handler is None:
        return

    self._visualization_handler.command_queue.put({
        "command": "paint_environment",
        "payload": {
            "color": color
        }
    })

paint_node(node_id, color)

Paints a node in the visualization with a specific color.

Parameters:

Name Type Description Default
node_id int

ID of the node to paint

required
color Tuple[float, float, float]

RGB color of the node

required
Source code in gradysim/simulator/handler/visualization.py
def paint_node(self, node_id: int, color: Tuple[float, float, float]) -> None:
    """
    Paints a node in the visualization with a specific color.

    Args:
        node_id: ID of the node to paint
        color: RGB color of the node
    """
    if self._visualization_handler is None:
        return

    self._visualization_handler.command_queue.put({
        "command": "paint_node",
        "payload": {
            "node_id": node_id,
            "color": color
        }
    })

resize_nodes(size)

Resizes the nodes in the visualization Args: size: New size of the nodes

Source code in gradysim/simulator/handler/visualization.py
def resize_nodes(self, size: float) -> None:
    """
    Resizes the nodes in the visualization
    Args:
        size: New size of the nodes
    """
    if self._visualization_handler is None:
        return

    self._visualization_handler.command_queue.put({
        "command": "resize_nodes",
        "payload": {
            "size": size
        }
    })

show_node_id(node_id, show)

Paints or unpaints the node_id text over the node. Args: node_id: ID of the node show: wheter to show the node_id or not

Source code in gradysim/simulator/handler/visualization.py
def show_node_id(self, node_id: int, show: bool):
    """
    Paints or unpaints the node_id text over the node.
    Args:
        node_id: ID of the node
        show: wheter to show the node_id or not
    """
    if self._visualization_handler is None:
        return

    self._visualization_handler.command_queue.put({
        "command": "show_node_id",
        "payload": {
            "node_id": node_id,
            "show": show,
        }
    })

VisualizationHandler

Bases: INodeHandler

Adds visualization to the simulation. Shows regularly updated node position and other simulation information in a graphical representation of the simulation. The graphical representation is web-based and can be accessed via a browser through this link.

The visualization handler uses a separate process to run the WebSocket server, on Windows your script will be rerun when the new process starts. This means that any code that should not be run multiple times should be put in the if __name__ == "__main__" block.

Source code in gradysim/simulator/handler/visualization.py
class VisualizationHandler(INodeHandler):
    """
    Adds visualization to the simulation. Shows regularly updated node position and other simulation information
    in a graphical representation of the simulation. The graphical representation is web-based and can be accessed
    via a browser through [this link](https://project-gradys.github.io/gradys-sim-nextgen-visualization/).

    The visualization handler uses a separate process to run the WebSocket server, on Windows your
    script will be rerun when the new process starts. This means that any code that should not be run multiple times
    should be put in the `if __name__ == "__main__"` block.
    """
    _event_loop: EventLoop
    _nodes: List[Node]
    _information: _VisualizationInformation
    _visualization_state: _VisualizationState
    _information_thread: multiprocessing.Process
    _start_time: float
    command_queue: multiprocessing.Queue

    def __init__(self, configuration: VisualizationConfiguration = VisualizationConfiguration()):
        """
        Constructs the visualization handler.

        Args:
            configuration: Configuration for the visualization handler. If not set all default values will be used.
        """
        self._nodes = []

        # Current simulation information shared with the visualization server
        manager = multiprocessing.Manager()
        self._information = manager.dict()
        self._visualization_state = manager.dict()
        self.command_queue = manager.Queue()
        self._visualization_state["paused"] = False

        self._configuration = configuration

        self._logger = logging.getLogger()

        global _active_handler
        _active_handler = self

    @staticmethod
    def get_label() -> str:
        return "visualization"

    def inject(self, event_loop: EventLoop) -> None:
        self._event_loop = event_loop

        self._event_loop.schedule_event(self._event_loop.current_time + self._configuration.information_collection_interval,
                                        self._report_information,
                                        "Visualization")

    def initialize(self) -> None:
        initialization_information = {
            "x_range": self._configuration.x_range,
            "y_range": self._configuration.y_range,
            "z_range": self._configuration.z_range,
            "nodes": [label_node(node) for node in self._nodes],
        }

        # Initialize with precise CPU timestamp of simulation's start
        # does not use event loop
        self._start_time = time.time()

        self._information_thread = multiprocessing.Process(target=_visualization_thread,
                                                           args=(self._configuration,
                                                                 initialization_information,
                                                                 self._information,
                                                                 self._visualization_state,
                                                                 self.command_queue))
        self._information_thread.start()

    def finalize(self) -> None:
        self._information_thread.terminate()

    def register_node(self, node: Node) -> None:
        self._nodes.append(node)

    def _report_information(self) -> None:
        self._information["nodes"] = [node.id for node in self._nodes]
        self._information["positions"] = [node.position for node in self._nodes]
        self._information["simulation_time"] = self._event_loop.current_time
        self._information["real_time"] = time.time() - self._start_time
        self._information["tracked_variables"] = \
            [node.protocol_encapsulator.protocol.provider.tracked_variables.copy() for node in self._nodes]

        if self._visualization_state["paused"]:
            logging.info("Visualization paused by user")

            while self._visualization_state["paused"]:
                time.sleep(0.1)

            logging.info("Visualization resumed by user")

        self._event_loop.schedule_event(
            self._event_loop.current_time + self._configuration.information_collection_interval,
            self._report_information,
            "Visualization"
        )

__init__(configuration=VisualizationConfiguration())

Constructs the visualization handler.

Parameters:

Name Type Description Default
configuration VisualizationConfiguration

Configuration for the visualization handler. If not set all default values will be used.

VisualizationConfiguration()
Source code in gradysim/simulator/handler/visualization.py
def __init__(self, configuration: VisualizationConfiguration = VisualizationConfiguration()):
    """
    Constructs the visualization handler.

    Args:
        configuration: Configuration for the visualization handler. If not set all default values will be used.
    """
    self._nodes = []

    # Current simulation information shared with the visualization server
    manager = multiprocessing.Manager()
    self._information = manager.dict()
    self._visualization_state = manager.dict()
    self.command_queue = manager.Queue()
    self._visualization_state["paused"] = False

    self._configuration = configuration

    self._logger = logging.getLogger()

    global _active_handler
    _active_handler = self