Skip to content

RAFT Consensus

gradysim.protocol.plugin.raft

RAFT Consensus Plugin for GrADyS-SIM

This module provides distributed consensus capabilities using the RAFT algorithm. It enables protocols to reach agreement on shared values in a fault-tolerant manner.

Key Features:

  • Fault-tolerant consensus with node failure handling

  • Active node discovery for dynamic majority calculations

  • Heartbeat-based failure detection

  • Dual operation modes (Classic and Fault-Tolerant)

  • Seamless integration with GrADyS-SIM protocols

  • Consensus variables instead of traditional log replication

Example:

from gradysim.protocol.plugin.raft import RaftConfig, RaftMode, RaftConsensusPlugin

# Configure consensus
# (RaftConfig setters validate their arguments and raise ValueError on bad values)
config = RaftConfig()
config.set_election_timeout(150, 300)  # min / max election timeout, milliseconds
config.set_heartbeat_interval(50)  # heartbeat interval, milliseconds
config.add_consensus_variable("sequence", int)  # name -> expected value type
config.set_raft_mode(RaftMode.FAULT_TOLERANT)

# Initialize and start
# NOTE: `self` is assumed to be the protocol instance, i.e. this snippet is
# meant to run inside one of the protocol's own methods.
consensus = RaftConsensusPlugin(config=config, protocol=self)
consensus.set_known_nodes([0, 1, 2, 3, 4])
consensus.start()

# Propose values (leader only)
if consensus.is_leader():
    consensus.propose_value("sequence", 42)

FailureConfig

Configuration for heartbeat-based failure detection.

This class manages all parameters related to failure detection, including thresholds, intervals, and timeouts.

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
class FailureConfig:
    """
    Configuration for heartbeat-based failure detection.

    This class manages all parameters related to failure detection,
    including thresholds, intervals, and timeouts.

    The heartbeat timeout is either relative (``multiplier × heartbeat_interval``,
    the default) or absolute (a fixed value set via ``set_absolute_timeout``).
    Every setter validates its arguments before changing any state, so a call
    that raises ``ValueError`` leaves the configuration unchanged.
    """

    def __init__(self):
        # Default configuration
        self._failure_threshold = 3          # Failed heartbeats to mark as failed
        self._recovery_threshold = 2         # Successful heartbeats to recover
        self._detection_interval = 2         # Check failures every N heartbeats
        self._heartbeat_interval_ms = None   # Reference to heartbeat_interval for calculation
        self._timeout_multiplier = 4         # Default multiplier (4× heartbeat_interval)
        # NOTE(review): never updated by this class -- the effective timeout is always
        # derived on demand by get_timeout_ms(). Kept in case external code reads it.
        self._heartbeat_timeout_ms = None

        # Absolute timeout configuration
        self._absolute_timeout_ms = None     # Absolute timeout in milliseconds
        self._use_absolute_timeout = False   # Whether to use absolute timeout instead of relative

    @staticmethod
    def _check_relative_timeout(multiplier: int, interval_ms: int) -> None:
        """Raise ValueError if the relative timeout ``multiplier × interval_ms`` is below 100ms."""
        timeout_ms = multiplier * interval_ms
        if timeout_ms < 100:
            raise ValueError(f"Resulting timeout {timeout_ms}ms must be at least 100ms")

    def set_failure_threshold(self, threshold: int) -> None:
        """
        Set the number of consecutive failed heartbeats to mark a node as failed.

        Args:
            threshold: Number of failed heartbeats (default: 3)

        Raises:
            ValueError: If threshold is less than 1
        """
        if threshold < 1:
            raise ValueError("Failure threshold must be at least 1")
        self._failure_threshold = threshold

    def set_recovery_threshold(self, threshold: int) -> None:
        """
        Set the number of consecutive successful heartbeats to mark a node as recovered.

        Args:
            threshold: Number of successful heartbeats (default: 2)

        Raises:
            ValueError: If threshold is less than 1
        """
        if threshold < 1:
            raise ValueError("Recovery threshold must be at least 1")
        self._recovery_threshold = threshold

    def set_detection_interval(self, interval: int) -> None:
        """
        Set how often to check for failures (in heartbeat intervals).

        Args:
            interval: Check every N heartbeats (default: 2)

        Raises:
            ValueError: If interval is less than 1
        """
        if interval < 1:
            raise ValueError("Detection interval must be at least 1")
        self._detection_interval = interval

    def set_heartbeat_timeout(self, multiplier: int) -> None:
        """
        Set the timeout for heartbeat responses as a multiple of heartbeat_interval.

        Calling this switches the configuration back to relative timeouts.

        Args:
            multiplier: Number of heartbeat intervals to wait (e.g., 4 = 4× heartbeat_interval)

        Raises:
            ValueError: If multiplier is less than 1, or if the heartbeat interval is
                already known and ``multiplier × interval`` is below 100ms.
        """
        if multiplier < 1:
            raise ValueError("Timeout multiplier must be at least 1")

        # Validate before mutating so a rejected multiplier leaves the config untouched.
        if self._heartbeat_interval_ms is not None:
            self._check_relative_timeout(multiplier, self._heartbeat_interval_ms)

        self._timeout_multiplier = multiplier
        self._use_absolute_timeout = False  # Switch back to relative timeout

    def set_absolute_timeout(self, timeout_ms: int) -> None:
        """
        Set an absolute timeout for heartbeat responses independent of heartbeat_interval.

        Calling this switches the configuration to absolute timeouts.

        Args:
            timeout_ms: Timeout in milliseconds (must be positive)

        Raises:
            ValueError: If timeout_ms is not positive
        """
        if timeout_ms <= 0:
            raise ValueError("Absolute timeout must be positive")

        # Warning for very low values that may cause false positives
        if timeout_ms < 50:
            import warnings
            # stacklevel=2 attributes the warning to the caller, not to this module.
            warnings.warn(
                f"Very low timeout ({timeout_ms}ms) may cause false positives in some network conditions",
                stacklevel=2,
            )

        self._absolute_timeout_ms = timeout_ms
        self._use_absolute_timeout = True  # Switch to absolute timeout

    def set_heartbeat_interval_reference(self, interval_ms: int) -> None:
        """
        Set the heartbeat interval reference.
        This is called internally by RaftConfig when heartbeat_interval is set.

        Args:
            interval_ms: Heartbeat interval in milliseconds

        Raises:
            ValueError: If interval_ms is not positive, or if the resulting relative
                timeout (``multiplier × interval_ms``) would be below 100ms.
        """
        if interval_ms <= 0:
            raise ValueError("Heartbeat interval must be positive")

        # Validate before mutating so a rejected interval leaves the config untouched.
        if self._timeout_multiplier is not None:
            self._check_relative_timeout(self._timeout_multiplier, interval_ms)

        self._heartbeat_interval_ms = interval_ms

    def get_heartbeat_interval_reference(self) -> Optional[int]:
        """
        Get the heartbeat interval reference.

        Returns:
            Heartbeat interval in milliseconds, or None if not set
        """
        return self._heartbeat_interval_ms

    def get_timeout_multiplier(self) -> Optional[int]:
        """
        Get the current timeout as a multiplier of heartbeat interval.

        When an absolute timeout is active, this is the absolute timeout divided
        (with floor division) by the heartbeat interval.

        Returns:
            Multiplier value, or None if heartbeat_interval is not set
        """
        if self._heartbeat_interval_ms is None or self._heartbeat_interval_ms == 0:
            return None

        # Calculate timeout dynamically
        timeout_ms = self.get_timeout_ms()
        return timeout_ms // self._heartbeat_interval_ms

    def get_absolute_timeout_ms(self) -> Optional[int]:
        """
        Get the absolute timeout value.

        Returns:
            Absolute timeout in milliseconds, or None if not set
        """
        return self._absolute_timeout_ms

    def is_using_absolute_timeout(self) -> bool:
        """
        Check if absolute timeout is being used.

        Returns:
            True if absolute timeout is enabled, False otherwise
        """
        return self._use_absolute_timeout

    def get_timeout_ms(self) -> int:
        """
        Get the timeout in milliseconds.

        Returns:
            Timeout in milliseconds (absolute or calculated as multiplier × heartbeat_interval)

        Raises:
            ValueError: If neither absolute timeout nor heartbeat_interval/multiplier is set
        """
        # If absolute timeout is configured, use it
        if self._use_absolute_timeout and self._absolute_timeout_ms is not None:
            return self._absolute_timeout_ms

        # Otherwise, use relative timeout calculation
        if self._heartbeat_interval_ms is None:
            raise ValueError("heartbeat_interval must be set before calculating timeout")
        if self._timeout_multiplier is None:
            raise ValueError("Timeout multiplier must be set before calculating timeout")
        return self._timeout_multiplier * self._heartbeat_interval_ms

    @property
    def failure_threshold(self) -> int:
        """Get the failure threshold."""
        return self._failure_threshold

    @property
    def recovery_threshold(self) -> int:
        """Get the recovery threshold."""
        return self._recovery_threshold

    @property
    def detection_interval(self) -> int:
        """Get the detection interval."""
        return self._detection_interval

    @property
    def heartbeat_timeout_ms(self) -> int:
        """
        Get the heartbeat timeout in milliseconds (calculated dynamically).

        Raises:
            ValueError: Same conditions as ``get_timeout_ms``.
        """
        return self.get_timeout_ms()

    def __str__(self) -> str:
        """String representation of the configuration (never raises)."""
        if self._use_absolute_timeout and self._absolute_timeout_ms is not None:
            timeout_info = f"absolute_timeout={self._absolute_timeout_ms}ms"
        elif self._heartbeat_interval_ms is None or self._timeout_multiplier is None:
            # The relative timeout cannot be computed yet; get_timeout_ms() would raise.
            timeout_info = "timeout=unset"
        else:
            timeout_info = (f"timeout={self._timeout_multiplier}×{self._heartbeat_interval_ms}ms="
                            f"{self.get_timeout_ms()}ms")

        return (f"FailureConfig(failure_threshold={self._failure_threshold}, "
                f"recovery_threshold={self._recovery_threshold}, "
                f"detection_interval={self._detection_interval}, "
                f"{timeout_info})")

detection_interval: int property

Get the detection interval.

failure_threshold: int property

Get the failure threshold.

heartbeat_timeout_ms: int property

Get the heartbeat timeout in milliseconds (calculated dynamically).

recovery_threshold: int property

Get the recovery threshold.

__str__()

String representation of the configuration.

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def __str__(self) -> str:
    """String representation of the configuration (never raises)."""
    if self._use_absolute_timeout and self._absolute_timeout_ms is not None:
        timeout_info = f"absolute_timeout={self._absolute_timeout_ms}ms"
    elif self._heartbeat_interval_ms is None or self._timeout_multiplier is None:
        # The relative timeout cannot be computed yet; get_timeout_ms() would raise.
        timeout_info = "timeout=unset"
    else:
        timeout_info = (f"timeout={self._timeout_multiplier}×{self._heartbeat_interval_ms}ms="
                        f"{self.get_timeout_ms()}ms")

    return (f"FailureConfig(failure_threshold={self._failure_threshold}, "
            f"recovery_threshold={self._recovery_threshold}, "
            f"detection_interval={self._detection_interval}, "
            f"{timeout_info})")

get_absolute_timeout_ms()

Get the absolute timeout value.

Returns:

Type Description
Optional[int]

Absolute timeout in milliseconds, or None if not set

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def get_absolute_timeout_ms(self) -> Optional[int]:
    """
    Return the configured absolute heartbeat timeout.

    Returns:
        The absolute timeout in milliseconds, or None when none was configured.
    """
    absolute_ms = self._absolute_timeout_ms
    return absolute_ms

get_heartbeat_interval_reference()

Get the heartbeat interval reference.

Returns:

Type Description
Optional[int]

Heartbeat interval in milliseconds, or None if not set

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def get_heartbeat_interval_reference(self) -> Optional[int]:
    """
    Return the heartbeat interval that relative timeouts are based on.

    Returns:
        The heartbeat interval in milliseconds, or None when it has not been provided.
    """
    reference_ms = self._heartbeat_interval_ms
    return reference_ms

get_timeout_ms()

Get the timeout in milliseconds.

Returns:

Type Description
int

Timeout in milliseconds (absolute or calculated as multiplier × heartbeat_interval)

Raises: ValueError: If neither absolute timeout nor heartbeat_interval/multiplier is set

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def get_timeout_ms(self) -> int:
    """
    Compute the effective heartbeat timeout.

    An active absolute timeout takes precedence; otherwise the timeout is the
    multiplier times the heartbeat interval.

    Returns:
        The timeout in milliseconds.

    Raises:
        ValueError: If no absolute timeout is active and the heartbeat interval
            or the multiplier is missing.
    """
    absolute_active = self._use_absolute_timeout and self._absolute_timeout_ms is not None
    if absolute_active:
        return self._absolute_timeout_ms

    interval_ms = self._heartbeat_interval_ms
    if interval_ms is None:
        raise ValueError("heartbeat_interval must be set before calculating timeout")

    multiplier = self._timeout_multiplier
    if multiplier is None:
        raise ValueError("Timeout multiplier must be set before calculating timeout")

    return multiplier * interval_ms

get_timeout_multiplier()

Get the current timeout as a multiplier of heartbeat interval.

Returns:

Type Description
Optional[int]

Multiplier value, or None if heartbeat_interval is not set

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def get_timeout_multiplier(self) -> Optional[int]:
    """
    Express the effective timeout in units of the heartbeat interval.

    Returns:
        The effective timeout floor-divided by the heartbeat interval, or None
        when the heartbeat interval is unset (None) or zero.
    """
    interval_ms = self._heartbeat_interval_ms
    if interval_ms is not None and interval_ms != 0:
        # The effective timeout may be absolute, so derive the ratio each time.
        return self.get_timeout_ms() // interval_ms
    return None

is_using_absolute_timeout()

Check if absolute timeout is being used.

Returns:

Type Description
bool

True if absolute timeout is enabled, False otherwise

Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def is_using_absolute_timeout(self) -> bool:
    """
    Report whether the absolute (fixed) timeout mode is active.

    Returns:
        True when an absolute timeout is in effect, False for relative timeouts.
    """
    absolute_mode = self._use_absolute_timeout
    return absolute_mode

set_absolute_timeout(timeout_ms)

Set an absolute timeout for heartbeat responses independent of heartbeat_interval.

Parameters:

Name Type Description Default
timeout_ms int

Timeout in milliseconds (must be positive)

required
Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def set_absolute_timeout(self, timeout_ms: int) -> None:
    """
    Set an absolute timeout for heartbeat responses independent of heartbeat_interval.

    Calling this switches the configuration to absolute timeouts.

    Args:
        timeout_ms: Timeout in milliseconds (must be positive)

    Raises:
        ValueError: If timeout_ms is not positive
    """
    if timeout_ms <= 0:
        raise ValueError("Absolute timeout must be positive")

    # Warning for very low values that may cause false positives
    if timeout_ms < 50:
        import warnings
        # stacklevel=2 attributes the warning to the caller, not to this module.
        warnings.warn(
            f"Very low timeout ({timeout_ms}ms) may cause false positives in some network conditions",
            stacklevel=2,
        )

    self._absolute_timeout_ms = timeout_ms
    self._use_absolute_timeout = True  # Switch to absolute timeout

set_detection_interval(interval)

Set how often to check for failures (in heartbeat intervals).

Parameters:

Name Type Description Default
interval int

Check every N heartbeats (default: 2)

required
Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def set_detection_interval(self, interval: int) -> None:
    """
    Configure the failure-check cadence, measured in heartbeat intervals.

    Args:
        interval: Run a failure check every ``interval`` heartbeats (default: 2).

    Raises:
        ValueError: If interval is below 1.
    """
    minimum = 1
    if interval < minimum:
        raise ValueError("Detection interval must be at least 1")
    self._detection_interval = interval

set_failure_threshold(threshold)

Set the number of consecutive failed heartbeats to mark a node as failed.

Parameters:

Name Type Description Default
threshold int

Number of failed heartbeats (default: 3)

required
Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def set_failure_threshold(self, threshold: int) -> None:
    """
    Configure how many consecutive missed heartbeats mark a node as failed.

    Args:
        threshold: Consecutive failed heartbeats required (default: 3).

    Raises:
        ValueError: If threshold is below 1.
    """
    minimum = 1
    if threshold < minimum:
        raise ValueError("Failure threshold must be at least 1")
    self._failure_threshold = threshold

set_heartbeat_interval_reference(interval_ms)

Set the heartbeat interval reference. This is called internally by RaftConfig when heartbeat_interval is set.

Parameters:

Name Type Description Default
interval_ms int

Heartbeat interval in milliseconds

required
Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def set_heartbeat_interval_reference(self, interval_ms: int) -> None:
    """
    Set the heartbeat interval reference.
    This is called internally by RaftConfig when heartbeat_interval is set.

    Args:
        interval_ms: Heartbeat interval in milliseconds

    Raises:
        ValueError: If interval_ms is not positive, or if the resulting relative
            timeout (``multiplier × interval_ms``) would be below 100ms.
    """
    if interval_ms <= 0:
        raise ValueError("Heartbeat interval must be positive")

    # Validate the derived timeout *before* storing the interval so that a
    # rejected value leaves the configuration exactly as it was.
    if self._timeout_multiplier is not None:
        timeout_ms = self._timeout_multiplier * interval_ms
        if timeout_ms < 100:
            raise ValueError(f"Resulting timeout {timeout_ms}ms must be at least 100ms")

    self._heartbeat_interval_ms = interval_ms

set_heartbeat_timeout(multiplier)

Set the timeout for heartbeat responses as a multiple of heartbeat_interval.

Parameters:

Name Type Description Default
multiplier int

Number of heartbeat intervals to wait (e.g., 4 = 4× heartbeat_interval)

required
Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def set_heartbeat_timeout(self, multiplier: int) -> None:
    """
    Set the timeout for heartbeat responses as a multiple of heartbeat_interval.

    Calling this switches the configuration back to relative timeouts.

    Args:
        multiplier: Number of heartbeat intervals to wait (e.g., 4 = 4× heartbeat_interval)

    Raises:
        ValueError: If multiplier is less than 1, or if the heartbeat interval is
            already known and ``multiplier × interval`` is below 100ms.
    """
    if multiplier < 1:
        raise ValueError("Timeout multiplier must be at least 1")

    # Validate the derived timeout *before* touching any state so that a
    # rejected multiplier leaves the configuration exactly as it was.
    if self._heartbeat_interval_ms is not None:
        timeout_ms = multiplier * self._heartbeat_interval_ms
        if timeout_ms < 100:
            raise ValueError(f"Resulting timeout {timeout_ms}ms must be at least 100ms")

    self._timeout_multiplier = multiplier
    self._use_absolute_timeout = False  # Switch back to relative timeout

set_recovery_threshold(threshold)

Set the number of consecutive successful heartbeats to mark a node as recovered.

Parameters:

Name Type Description Default
threshold int

Number of successful heartbeats (default: 2)

required
Source code in gradysim/protocol/plugin/raft/failure_detection/failure_config.py
def set_recovery_threshold(self, threshold: int) -> None:
    """
    Configure how many consecutive good heartbeats bring a failed node back.

    Args:
        threshold: Consecutive successful heartbeats required (default: 2).

    Raises:
        ValueError: If threshold is below 1.
    """
    minimum = 1
    if threshold < minimum:
        raise ValueError("Recovery threshold must be at least 1")
    self._recovery_threshold = threshold

RaftConfig

Configuration class for Raft consensus using the Builder pattern.

Provides a fluent interface for configuring all aspects of Raft consensus:

  • Election timeouts

  • Heartbeat intervals

  • Consensus variables

  • Logging options

Example:

config = RaftConfig()

# Each setter below returns the config itself, so these calls could also be
# chained, e.g. RaftConfig().set_election_timeout(150, 300).set_heartbeat_interval(50)
config.set_election_timeout(150, 300)  # min / max election timeout, milliseconds

config.set_heartbeat_interval(50)  # heartbeat interval, milliseconds

config.add_consensus_variable("sequence", int)  # name -> expected value type

config.set_logging(True, "INFO")  # level is upper-cased before being stored
Source code in gradysim/protocol/plugin/raft/raft_config.py
class RaftConfig:
    """
    Configuration class for Raft consensus using the Builder pattern.

    Provides a fluent interface for configuring all aspects of Raft consensus
    (every ``set_*`` / ``add_*`` / ``remove_*`` method returns ``self``):

    - Election timeouts

    - Heartbeat intervals

    - Consensus variables

    - Logging options

    Example:

        config = RaftConfig()

        config.set_election_timeout(150, 300)

        config.set_heartbeat_interval(50)

        config.add_consensus_variable("sequence", int)

        config.set_logging(True, "INFO")
    """

    def __init__(self):
        """Initialize RaftConfig with default values."""
        # Election parameters
        self._election_timeout_min = 150  # milliseconds
        self._election_timeout_max = 300  # milliseconds

        # Heartbeat parameters
        self._heartbeat_interval = 50  # milliseconds

        # Consensus variables
        self._consensus_variables: Dict[str, Type] = {}

        # Logging and debugging
        self._enable_logging = True
        self._log_level = "INFO"

        # Failure detection (always enabled for system reliability)
        self._failure_config = FailureConfig()
        # Seed the failure detector with the default heartbeat interval. Without
        # this its relative timeout is undefined until set_heartbeat_interval() is
        # called, and str(self) / to_dict() raise ValueError via str(FailureConfig).
        self._failure_config.set_heartbeat_interval_reference(self._heartbeat_interval)

        # Raft operation mode
        self._raft_mode = RaftMode.FAULT_TOLERANT  # Default to fault-tolerant (current behavior)

    def set_election_timeout(self, min_timeout: int, max_timeout: int) -> 'RaftConfig':
        """
        Set election timeout range.

        Args:
            min_timeout: Minimum election timeout in milliseconds
            max_timeout: Maximum election timeout in milliseconds

        Returns:
            Self for method chaining

        Raises:
            ValueError: If min_timeout >= max_timeout or timeouts are negative
        """
        if min_timeout >= max_timeout:
            raise ValueError("min_timeout must be less than max_timeout")
        if min_timeout < 0 or max_timeout < 0:
            raise ValueError("Timeouts must be non-negative")

        self._election_timeout_min = min_timeout
        self._election_timeout_max = max_timeout
        return self

    def set_heartbeat_interval(self, interval: int) -> 'RaftConfig':
        """
        Set heartbeat interval.

        Args:
            interval: Heartbeat interval in milliseconds

        Returns:
            Self for method chaining

        Raises:
            ValueError: If interval is not positive, or if the failure detection
                configuration rejects it (derived timeout below its minimum)
        """
        if interval <= 0:
            raise ValueError("Heartbeat interval must be positive")

        # Let the failure detector validate first so that a rejected interval
        # leaves this config's heartbeat interval unchanged.
        self._failure_config.set_heartbeat_interval_reference(interval)
        self._heartbeat_interval = interval
        return self

    def add_consensus_variable(self, name: str, var_type: Type) -> 'RaftConfig':
        """
        Add a consensus variable.

        Args:
            name: Name of the consensus variable
            var_type: Type of the variable (e.g., int, str, float)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If variable name is empty or already exists
        """
        if not name or not name.strip():
            raise ValueError("Variable name cannot be empty")
        if name in self._consensus_variables:
            raise ValueError(f"Consensus variable '{name}' already exists")

        self._consensus_variables[name] = var_type
        return self

    def remove_consensus_variable(self, name: str) -> 'RaftConfig':
        """
        Remove a consensus variable (no-op if it does not exist).

        Args:
            name: Name of the consensus variable to remove

        Returns:
            Self for method chaining
        """
        self._consensus_variables.pop(name, None)
        return self

    def set_logging(self, enable: bool, level: str = "INFO") -> 'RaftConfig':
        """
        Configure logging.

        Args:
            enable: Whether to enable logging
            level: Log level (DEBUG, INFO, WARNING, ERROR); stored upper-cased

        Returns:
            Self for method chaining
        """
        self._enable_logging = enable
        self._log_level = level.upper()
        return self

    def set_raft_mode(self, mode: RaftMode) -> 'RaftConfig':
        """
        Set the Raft operation mode.

        Args:
            mode: Raft operation mode
                  CLASSIC - Classic Raft behavior (fixed cluster size)
                  FAULT_TOLERANT - Fault-tolerant Raft behavior (dynamic active node count)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If mode is not a RaftMode member
        """
        if not isinstance(mode, RaftMode):
            raise ValueError("mode must be a RaftMode enum value")

        self._raft_mode = mode
        return self

    def get_failure_config(self) -> FailureConfig:
        """
        Get the failure detection configuration.

        Returns:
            FailureConfig instance for configuring failure detection
        """
        return self._failure_config

    def get_raft_mode(self) -> RaftMode:
        """
        Get the current Raft operation mode.

        Returns:
            Current Raft operation mode
        """
        return self._raft_mode

    def is_classic_mode(self) -> bool:
        """
        Check if running in classic Raft mode.

        Returns:
            True if classic mode, False if fault-tolerant mode
        """
        return self._raft_mode == RaftMode.CLASSIC

    def is_fault_tolerant_mode(self) -> bool:
        """
        Check if running in fault-tolerant Raft mode.

        Returns:
            True if fault-tolerant mode, False if classic mode
        """
        return self._raft_mode == RaftMode.FAULT_TOLERANT

    def get_random_election_timeout(self) -> int:
        """
        Get a random election timeout within the configured range.

        Returns:
            Random election timeout in milliseconds (both bounds inclusive)
        """
        return random.randint(self._election_timeout_min, self._election_timeout_max)

    def get_consensus_variables(self) -> Dict[str, Type]:
        """
        Get all configured consensus variables.

        Returns:
            A shallow copy of the mapping from variable names to their types
        """
        return self._consensus_variables.copy()

    def has_consensus_variable(self, name: str) -> bool:
        """
        Check if a consensus variable exists.

        Args:
            name: Name of the consensus variable

        Returns:
            True if the variable exists, False otherwise
        """
        return name in self._consensus_variables

    def get_consensus_variable_type(self, name: str) -> Optional[Type]:
        """
        Get the type of a consensus variable.

        Args:
            name: Name of the consensus variable

        Returns:
            Type of the variable, or None if not found
        """
        return self._consensus_variables.get(name)

    def validate(self) -> List[str]:
        """
        Validate the configuration.

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []

        # Check election timeout
        if self._election_timeout_min >= self._election_timeout_max:
            errors.append("min_election_timeout must be less than max_election_timeout")

        # Heartbeats must arrive faster than the shortest election timeout,
        # otherwise followers would start elections against a healthy leader.
        if self._heartbeat_interval >= self._election_timeout_min:
            errors.append("heartbeat_interval should be less than min_election_timeout")

        # Check consensus variables
        if not self._consensus_variables:
            errors.append("At least one consensus variable must be configured")

        return errors

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert configuration to dictionary.

        Returns:
            Dictionary representation of the configuration
        """
        config_dict = {
            "election_timeout_min": self._election_timeout_min,
            "election_timeout_max": self._election_timeout_max,
            "heartbeat_interval": self._heartbeat_interval,
            # Not every annotation object has __name__ (e.g. some typing
            # constructs); fall back to repr() instead of crashing.
            "consensus_variables": {
                name: getattr(var_type, "__name__", repr(var_type))
                for name, var_type in self._consensus_variables.items()
            },
            "enable_logging": self._enable_logging,
            "log_level": self._log_level,
            "raft_mode": self._raft_mode.value,
            "failure_config": str(self._failure_config)
        }

        # Add heartbeat timeout multiplier information if available
        multiplier = self._failure_config.get_timeout_multiplier()
        if multiplier is not None:
            config_dict["heartbeat_timeout_multiplier"] = multiplier

        return config_dict

    def __str__(self) -> str:
        """Return string representation of the configuration."""
        return f"RaftConfig({self.to_dict()})"

    def __repr__(self) -> str:
        """Return detailed string representation of the configuration."""
        return f"RaftConfig(election_timeout=({self._election_timeout_min}, {self._election_timeout_max}), " \
               f"heartbeat_interval={self._heartbeat_interval}, " \
               f"consensus_variables={list(self._consensus_variables.keys())})"

__init__()

Initialize RaftConfig with default values.

Source code in gradysim/protocol/plugin/raft/raft_config.py
def __init__(self):
    """Initialize RaftConfig with default values."""
    # Election parameters
    self._election_timeout_min = 150  # milliseconds
    self._election_timeout_max = 300  # milliseconds

    # Heartbeat parameters
    self._heartbeat_interval = 50  # milliseconds

    # Consensus variables
    self._consensus_variables: Dict[str, Type] = {}

    # Logging and debugging
    self._enable_logging = True
    self._log_level = "INFO"

    # Failure detection (always enabled for system reliability)
    self._failure_config = FailureConfig()
    # Seed the failure detector with the default heartbeat interval. Without
    # this its relative timeout is undefined until set_heartbeat_interval() is
    # called, so str(FailureConfig) -- and anything that formats it -- raises.
    self._failure_config.set_heartbeat_interval_reference(self._heartbeat_interval)

    # Raft operation mode
    self._raft_mode = RaftMode.FAULT_TOLERANT  # Default to fault-tolerant (current behavior)

__repr__()

Return detailed string representation of the configuration.

Source code in gradysim/protocol/plugin/raft/raft_config.py
def __repr__(self) -> str:
    """Build a compact, debugging-oriented description of the configuration."""
    timeout_range = f"({self._election_timeout_min}, {self._election_timeout_max})"
    variable_names = list(self._consensus_variables.keys())
    return (
        f"RaftConfig(election_timeout={timeout_range}, "
        f"heartbeat_interval={self._heartbeat_interval}, "
        f"consensus_variables={variable_names})"
    )

__str__()

Return string representation of the configuration.

Source code in gradysim/protocol/plugin/raft/raft_config.py
def __str__(self) -> str:
    """Render the configuration as ``RaftConfig(<dictionary form>)``."""
    return "RaftConfig({})".format(self.to_dict())

add_consensus_variable(name, var_type)

Add a consensus variable.

Parameters:

Name Type Description Default
name str

Name of the consensus variable

required
var_type Type

Type of the variable (e.g., int, str, float)

required

Returns:

Type Description
RaftConfig

Self for method chaining

Raises:

Type Description
ValueError

If variable name is empty or already exists

Source code in gradysim/protocol/plugin/raft/raft_config.py
def add_consensus_variable(self, name: str, var_type: Type) -> 'RaftConfig':
    """
    Register a new variable that the cluster will agree on.

    Args:
        name: Unique, non-blank identifier of the variable
        var_type: Expected Python type of its values (e.g., int, str, float)

    Returns:
        This configuration, to allow chaining

    Raises:
        ValueError: If the name is empty/blank or is already registered
    """
    if not (name and name.strip()):
        raise ValueError("Variable name cannot be empty")

    registry = self._consensus_variables
    if name in registry:
        raise ValueError(f"Consensus variable '{name}' already exists")

    registry[name] = var_type
    return self

get_consensus_variable_type(name)

Get the type of a consensus variable.

Parameters:

Name Type Description Default
name str

Name of the consensus variable

required

Returns:

Type Description
Optional[Type]

Type of the variable, or None if not found

Source code in gradysim/protocol/plugin/raft/raft_config.py
def get_consensus_variable_type(self, name: str) -> Optional[Type]:
    """
    Look up the declared type of a consensus variable.

    Args:
        name: Identifier of the consensus variable

    Returns:
        The registered type, or None when no variable has that name
    """
    try:
        return self._consensus_variables[name]
    except KeyError:
        return None

get_consensus_variables()

Get all configured consensus variables.

Returns:

Type Description
Dict[str, Type]

Dictionary mapping variable names to their types

Source code in gradysim/protocol/plugin/raft/raft_config.py
def get_consensus_variables(self) -> Dict[str, Type]:
    """
    Return a snapshot of every configured consensus variable.

    The result is a shallow copy, so callers may modify it without
    affecting this configuration.

    Returns:
        Dictionary mapping variable names to their types
    """
    snapshot = self._consensus_variables.copy()
    return snapshot

get_failure_config()

Get the failure detection configuration.

Returns:

Type Description
FailureConfig

FailureConfig instance for configuring failure detection

Source code in gradysim/protocol/plugin/raft/raft_config.py
def get_failure_config(self) -> FailureConfig:
    """
    Return the failure-detection settings held by this configuration.

    The stored instance itself is returned (not a copy), so changes made
    through it are reflected in this configuration.

    Returns:
        FailureConfig instance for configuring failure detection
    """
    failure_config = self._failure_config
    return failure_config

get_raft_mode()

Get the current Raft operation mode.

Returns:

Type Description
RaftMode

Current Raft operation mode

Source code in gradysim/protocol/plugin/raft/raft_config.py
def get_raft_mode(self) -> RaftMode:
    """
    Report which Raft operation mode is configured.

    Returns:
        The RaftMode currently stored on this configuration
    """
    configured_mode = self._raft_mode
    return configured_mode

get_random_election_timeout()

Get a random election timeout within the configured range.

Returns:

Type Description
int

Random election timeout in milliseconds

Source code in gradysim/protocol/plugin/raft/raft_config.py
def get_random_election_timeout(self) -> int:
    """
    Draw an election timeout uniformly from the configured range.

    Both bounds are inclusive. The draw uses the global ``random`` module
    state, so seeding ``random`` makes the sequence reproducible.

    Returns:
        Random election timeout in milliseconds
    """
    low, high = self._election_timeout_min, self._election_timeout_max
    # randrange's stop is exclusive, hence +1 to keep ``high`` reachable
    # (this is exactly how random.randint is implemented).
    return random.randrange(low, high + 1)

has_consensus_variable(name)

Check if a consensus variable exists.

Parameters:

Name Type Description Default
name str

Name of the consensus variable

required

Returns:

Type Description
bool

True if the variable exists, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_config.py
def has_consensus_variable(self, name: str) -> bool:
    """
    Tell whether a consensus variable with the given name is registered.

    Args:
        name: Name of the consensus variable

    Returns:
        True if the variable exists, False otherwise
    """
    registered = self._consensus_variables
    return name in registered

is_classic_mode()

Check if running in classic Raft mode.

Returns:

Type Description
bool

True if classic mode, False if fault-tolerant mode

Source code in gradysim/protocol/plugin/raft/raft_config.py
def is_classic_mode(self) -> bool:
    """
    Report whether the configured mode equals ``RaftMode.CLASSIC``.

    Returns:
        True if classic mode, False otherwise
    """
    current_mode = self._raft_mode
    return current_mode == RaftMode.CLASSIC

is_fault_tolerant_mode()

Check if running in fault-tolerant Raft mode.

Returns:

Type Description
bool

True if fault-tolerant mode, False if classic mode

Source code in gradysim/protocol/plugin/raft/raft_config.py
def is_fault_tolerant_mode(self) -> bool:
    """
    Report whether the configured mode equals ``RaftMode.FAULT_TOLERANT``.

    Returns:
        True if fault-tolerant mode, False otherwise
    """
    current_mode = self._raft_mode
    return current_mode == RaftMode.FAULT_TOLERANT

remove_consensus_variable(name)

Remove a consensus variable.

Parameters:

Name Type Description Default
name str

Name of the consensus variable to remove

required

Returns:

Type Description
RaftConfig

Self for method chaining

Source code in gradysim/protocol/plugin/raft/raft_config.py
def remove_consensus_variable(self, name: str) -> 'RaftConfig':
    """
    Unregister a consensus variable if it is present.

    Removing an unknown name is a silent no-op.

    Args:
        name: Name of the consensus variable to remove

    Returns:
        Self for method chaining
    """
    self._consensus_variables.pop(name, None)
    return self

set_election_timeout(min_timeout, max_timeout)

Set election timeout range.

Parameters:

Name Type Description Default
min_timeout int

Minimum election timeout in milliseconds

required
max_timeout int

Maximum election timeout in milliseconds

required

Returns:

Type Description
RaftConfig

Self for method chaining

Raises:

Type Description
ValueError

If min_timeout >= max_timeout or timeouts are negative

Source code in gradysim/protocol/plugin/raft/raft_config.py
def set_election_timeout(self, min_timeout: int, max_timeout: int) -> 'RaftConfig':
    """
    Configure the range from which election timeouts are drawn.

    Args:
        min_timeout: Minimum election timeout in milliseconds
        max_timeout: Maximum election timeout in milliseconds

    Returns:
        Self for method chaining

    Raises:
        ValueError: If min_timeout >= max_timeout or timeouts are negative.
            The ordering check runs first, so an inverted negative pair
            reports the ordering problem.
    """
    if min_timeout >= max_timeout:
        raise ValueError("min_timeout must be less than max_timeout")
    if any(bound < 0 for bound in (min_timeout, max_timeout)):
        raise ValueError("Timeouts must be non-negative")

    # Nothing is stored unless both checks pass.
    self._election_timeout_min, self._election_timeout_max = min_timeout, max_timeout
    return self

set_heartbeat_interval(interval)

Set heartbeat interval.

Parameters:

Name Type Description Default
interval int

Heartbeat interval in milliseconds

required

Returns:

Type Description
RaftConfig

Self for method chaining

Raises:

Type Description
ValueError

If interval is negative

Source code in gradysim/protocol/plugin/raft/raft_config.py
def set_heartbeat_interval(self, interval: int) -> 'RaftConfig':
    """
    Set the heartbeat interval in milliseconds.

    Besides storing the value, this forwards it to the failure-detection
    settings via ``set_heartbeat_interval_reference`` so both stay in sync.

    Args:
        interval: Heartbeat interval in milliseconds

    Returns:
        Self for method chaining

    Raises:
        ValueError: If interval is negative
    """
    if interval < 0:
        raise ValueError("Heartbeat interval must be non-negative")

    self._heartbeat_interval = interval
    failure_settings = self._failure_config
    failure_settings.set_heartbeat_interval_reference(interval)
    return self

set_logging(enable, level='INFO')

Configure logging.

Parameters:

Name Type Description Default
enable bool

Whether to enable logging

required
level str

Log level (DEBUG, INFO, WARNING, ERROR)

'INFO'

Returns:

Type Description
RaftConfig

Self for method chaining

Source code in gradysim/protocol/plugin/raft/raft_config.py
def set_logging(self, enable: bool, level: str = "INFO") -> 'RaftConfig':
    """
    Turn plugin logging on or off and choose its level.

    The level name is stored upper-cased; it is not validated here.

    Args:
        enable: Whether to enable logging
        level: Log level name (e.g. DEBUG, INFO, WARNING, ERROR)

    Returns:
        Self for method chaining
    """
    self._enable_logging = enable
    normalized_level = level.upper()
    self._log_level = normalized_level
    return self

set_raft_mode(mode)

Set the Raft operation mode.

Parameters:

Name Type Description Default
mode RaftMode

Raft operation mode: CLASSIC for classic Raft behavior (fixed cluster size), or FAULT_TOLERANT for fault-tolerant Raft behavior (dynamic active node count).

required

Returns:

Type Description
RaftConfig

Self for method chaining

Source code in gradysim/protocol/plugin/raft/raft_config.py
def set_raft_mode(self, mode: RaftMode) -> 'RaftConfig':
    """
    Choose the Raft operation mode.

    Args:
        mode: Raft operation mode. CLASSIC gives classic Raft behavior
              (fixed cluster size). FAULT_TOLERANT gives fault-tolerant
              Raft behavior (dynamic active node count).

    Returns:
        Self for method chaining

    Raises:
        ValueError: If ``mode`` is not a RaftMode member
    """
    if isinstance(mode, RaftMode):
        self._raft_mode = mode
        return self
    raise ValueError("mode must be a RaftMode enum value")

to_dict()

Convert configuration to dictionary.

Returns:

Type Description
Dict[str, Any]

Dictionary representation of the configuration

Source code in gradysim/protocol/plugin/raft/raft_config.py
def to_dict(self) -> Dict[str, Any]:
    """
    Convert configuration to dictionary.

    Consensus variable types are rendered by their ``__name__`` when they
    have one, otherwise by ``str(var_type)``. The failure configuration is
    rendered with ``str()``. ``heartbeat_timeout_multiplier`` is included
    only when the failure configuration reports a non-None multiplier.

    Returns:
        Dictionary representation of the configuration
    """
    consensus_variables = {
        # Not every registered type has ``__name__`` (e.g. some typing
        # constructs); fall back to its string form instead of raising.
        name: getattr(var_type, "__name__", str(var_type))
        for name, var_type in self._consensus_variables.items()
    }

    config_dict = {
        "election_timeout_min": self._election_timeout_min,
        "election_timeout_max": self._election_timeout_max,
        "heartbeat_interval": self._heartbeat_interval,
        "consensus_variables": consensus_variables,
        "enable_logging": self._enable_logging,
        "log_level": self._log_level,
        "raft_mode": self._raft_mode.value,
        "failure_config": str(self._failure_config)
    }

    # Add heartbeat timeout multiplier information if available
    multiplier = self._failure_config.get_timeout_multiplier()
    if multiplier is not None:
        config_dict["heartbeat_timeout_multiplier"] = multiplier

    return config_dict

validate()

Validate the configuration.

Returns:

Type Description
List[str]

List of validation errors (empty if valid)

Source code in gradysim/protocol/plugin/raft/raft_config.py
def validate(self) -> List[str]:
    """
    Check the configuration for inconsistencies.

    Three checks are made, in order: the election timeout range is ordered,
    the heartbeat interval is below the minimum election timeout, and at
    least one consensus variable is registered.

    Returns:
        List of validation error messages (empty if valid)
    """
    checks = [
        (self._election_timeout_min >= self._election_timeout_max,
         "min_election_timeout must be less than max_election_timeout"),
        (self._heartbeat_interval >= self._election_timeout_min,
         "heartbeat_interval should be less than min_election_timeout"),
        (not self._consensus_variables,
         "At least one consensus variable must be configured"),
    ]
    return [message for failed, message in checks if failed]

RaftConsensusPlugin

Main interface for Raft consensus implementation.

Provides a simple, clean API for integrating Raft consensus into Gradysim protocols. This class implements the Facade pattern to hide the complexity of the underlying Raft implementation.

Main Methods
  • start() / stop(): Control consensus lifecycle
  • propose_value(): Propose new values (leader only)
  • get_committed_value() / get_all_committed_values(): Retrieve consensus values
  • is_leader() / get_leader_id(): Check leadership status
  • get_current_term() / get_current_state(): Get consensus state
  • handle_message() / handle_timer(): Process Raft protocol events
  • set_known_nodes(): Configure cluster membership
  • get_statistics() / get_state_info(): Get debugging information
Example

1. Configure consensus

config = RaftConfig()
config.set_election_timeout(150, 300)  # 150-300ms election timeout
config.set_heartbeat_interval(50)      # 50ms heartbeat interval
config.add_consensus_variable("sequence", int)
config.add_consensus_variable("leader_position", str)
config.set_logging(enable=True, level="INFO")

2. Create consensus instance (simplified)

consensus = RaftConsensusPlugin(config=config, protocol=protocol)

3. Set known nodes and start consensus

consensus.set_known_nodes([1, 2, 3, 4, 5])
consensus.start()

4. Propose values (only works if this node is leader)

if consensus.is_leader():
    consensus.propose_value("sequence", 42)
    consensus.propose_value("leader_position", "north")

5. Get committed values

sequence_value = consensus.get_committed_value("sequence")
position_value = consensus.get_committed_value("leader_position")

6. Get all committed values

all_values = consensus.get_all_committed_values()

7. Check consensus state

is_leader = consensus.is_leader()
leader_id = consensus.get_leader_id()
current_term = consensus.get_current_term()
current_state = consensus.get_current_state()

8. Handle messages and timers (call these from your protocol)

consensus.handle_message(message_str)
consensus.handle_timer("heartbeat")
consensus.handle_timer("election")

9. Get statistics and information

stats = consensus.get_statistics()
state_info = consensus.get_state_info()
config_info = consensus.get_configuration()

10. Check if system is ready

if consensus.is_ready():
    print("Consensus system is ready")

11. Check failure detection (if enabled)

failed_nodes = consensus.get_failed_nodes()
active_nodes = consensus.get_active_nodes()
if consensus.is_node_failed(3):
    print("Node 3 is currently failed")

12. Stop consensus when done

consensus.stop()

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
class RaftConsensusPlugin:
    """
    Main interface for Raft consensus implementation.

    Provides a simple, clean API for integrating Raft consensus into
    Gradysim protocols. This class implements the Facade pattern to
    hide the complexity of the underlying Raft implementation.

    Main Methods:
        - start() / stop(): Control consensus lifecycle
        - propose_value(): Propose new values (leader only)
        - get_committed_value() / get_all_committed_values(): Retrieve consensus values
        - is_leader() / get_leader_id(): Check leadership status
        - get_current_term() / get_current_state(): Get consensus state
        - handle_message() / handle_timer(): Process Raft protocol events
        - set_known_nodes(): Configure cluster membership
        - get_statistics() / get_state_info(): Get debugging information

    Example:
        # 1. Configure consensus
        config = RaftConfig()
        config.set_election_timeout(150, 300)  # 150-300ms election timeout
        config.set_heartbeat_interval(50)      # 50ms heartbeat interval
        config.add_consensus_variable("sequence", int)
        config.add_consensus_variable("leader_position", str)
        config.set_logging(enable=True, level="INFO")

        # 2. Create consensus instance (simplified)
        consensus = RaftConsensus(config=config, protocol=protocol)

        # 3. Set known nodes and start consensus
        consensus.set_known_nodes([1, 2, 3, 4, 5])
        consensus.start()

        # 4. Propose values (only works if this node is leader)
        if consensus.is_leader():
            consensus.propose_value("sequence", 42)
            consensus.propose_value("leader_position", "north")

        # 5. Get committed values
        sequence_value = consensus.get_committed_value("sequence")
        position_value = consensus.get_committed_value("leader_position")

        # 6. Get all committed values
        all_values = consensus.get_all_committed_values()

        # 7. Check consensus state
        is_leader = consensus.is_leader()
        leader_id = consensus.get_leader_id()
        current_term = consensus.get_current_term()
        current_state = consensus.get_current_state()

        # 8. Handle messages and timers (call these from your protocol)
        consensus.handle_message(message_str)
        consensus.handle_timer("heartbeat")
        consensus.handle_timer("election")

        # 9. Get statistics and information
        stats = consensus.get_statistics()
        state_info = consensus.get_state_info()
        config_info = consensus.get_configuration()

        # 10. Check if system is ready
        if consensus.is_ready():
            print("Consensus system is ready")

        # 11. Check failure detection (if enabled)
        failed_nodes = consensus.get_failed_nodes()
        active_nodes = consensus.get_active_nodes()
        if consensus.is_node_failed(3):
            print("Node 3 is currently failed")

        # 12. Stop consensus when done
        consensus.stop()
    """

    def __init__(self, config: RaftConfig, protocol: IProtocol):
        """
        Wire a Raft node into a Gradysim protocol.

        Args:
            config: Raft configuration; validated before anything else is set up
            protocol: Protocol whose provider supplies messaging, timers,
                the clock and the node ID

        Raises:
            ValueError: If the configuration is invalid, the provider lacks a
                required method or ``get_id()``, or ``get_id()`` raises
        """
        validation_errors = config.validate()
        if validation_errors:
            joined = '; '.join(validation_errors)
            raise ValueError(f"Invalid configuration: {joined}")

        self.config = config
        self._protocol = protocol
        self._provider = protocol.provider
        self._dispatch_header = f"{RAFT_DISPATCH_PREFIX}:"

        self._validate_provider()

        id_getter: Optional[Callable[[], int]] = getattr(self._provider, "get_id", None)
        self._get_node_id_callback = id_getter
        if id_getter is None:
            raise ValueError("Protocol provider must implement get_id() for RaftConsensusPlugin")

        try:
            self.node_id = id_getter()
        except Exception as exc:
            raise ValueError(f"Failed to get node ID from provider: {exc}") from exc

        self.logger = logging.getLogger(f"RaftConsensus-{self.node_id}")
        if config._enable_logging:
            # The level name must match a ``logging`` module attribute.
            self.logger.setLevel(getattr(logging, config._log_level))

        node_callbacks = self._build_callbacks()
        self._raft_node = RaftNode(
            node_id=self.node_id,
            config=config,
            callbacks=node_callbacks,
        )

        self._dispatcher = create_dispatcher(protocol)

        # Route prefixed packets and timers through the dispatcher to the node.
        self.configure_handle_message()
        self.configure_handle_timer()

        self.logger.info(f"RaftConsensus initialized for node {self.node_id}")

    def _validate_provider(self) -> None:
        """
        Verify the provider exposes every callable the plugin relies on.

        Raises:
            ValueError: Listing the names that are missing or not callable
        """
        needed = ("send_communication_command", "schedule_timer", "cancel_timer", "current_time")
        absent = []
        for method_name in needed:
            if not callable(getattr(self._provider, method_name, None)):
                absent.append(method_name)
        if absent:
            raise ValueError(f"Protocol provider missing required methods: {absent}")

    def _build_callbacks(self) -> Dict[str, Callable[..., Any]]:
        """
        Bundle the plugin's transport and timing hooks for the Raft node.

        Returns:
            Mapping of callback names expected by RaftNode to bound methods
        """
        return dict(
            send_message_callback=self._send_message,
            send_broadcast_callback=self._send_broadcast,
            schedule_timer_callback=self._schedule_timer,
            cancel_timer_callback=self._cancel_timer,
            get_current_time_callback=self._get_current_time,
        )

    def _add_dispatch_prefix(self, value: str) -> str:
        """
        Ensure an identifier carries the RAFT dispatch header.

        The value is converted with ``str()``. Values that already start with
        the header are returned unchanged, so prefixing twice is harmless.
        """
        text = str(value)
        header = self._dispatch_header
        return text if text.startswith(header) else header + text

    def _strip_dispatch_prefix(self, value: str) -> Optional[str]:
        """
        Remove the RAFT dispatch header from an identifier.

        Returns:
            The remainder after the header, or None when ``value`` is not a
            string or does not start with the header (i.e. not a RAFT item)
        """
        header = self._dispatch_header
        if isinstance(value, str) and value.startswith(header):
            return value[len(header):]
        return None

    def _send_message(self, message: str, target_id: int) -> None:
        """
        Deliver a RAFT message to a single node via the provider.

        The message is prefixed for dispatching and wrapped in a SEND
        ``CommunicationCommand``. Any exception is logged and swallowed.

        Args:
            message: Serialized RAFT message
            target_id: ID of the destination node
        """
        try:
            outgoing = CommunicationCommand(
                CommunicationCommandType.SEND,
                self._add_dispatch_prefix(message),
                target_id,
            )
            self._provider.send_communication_command(outgoing)
        except Exception as exc:
            self.logger.error("Error sending RAFT message to node %s: %s", target_id, exc)

    def _send_broadcast(self, message: str) -> None:
        """
        Broadcast a RAFT message through the provider.

        The message is prefixed for dispatching and wrapped in a BROADCAST
        ``CommunicationCommand``. Any exception is logged and swallowed.

        Args:
            message: Serialized RAFT message
        """
        try:
            outgoing = CommunicationCommand(
                CommunicationCommandType.BROADCAST,
                self._add_dispatch_prefix(message),
            )
            self._provider.send_communication_command(outgoing)
        except Exception as exc:
            self.logger.error("Error broadcasting RAFT message: %s", exc)

    def _schedule_timer(self, timer_name: str, delay_ms: int) -> None:
        """
        Schedule a RAFT timer ``delay_ms`` milliseconds from now.

        The provider works with absolute times in seconds, so the delay is
        converted and added to ``current_time()``. Errors are logged and
        swallowed.

        Args:
            timer_name: Unprefixed timer identifier
            delay_ms: Delay in milliseconds
        """
        provider_timer_name = self._add_dispatch_prefix(timer_name)
        try:
            offset_seconds = delay_ms / 1000.0
            fire_at = self._provider.current_time() + offset_seconds
            self._provider.schedule_timer(provider_timer_name, fire_at)
        except Exception as exc:
            self.logger.error("Error scheduling RAFT timer '%s': %s", timer_name, exc)

    def _cancel_timer(self, timer_name: str) -> None:
        """
        Cancel a RAFT timer previously scheduled through the provider.

        Errors are logged and swallowed.

        Args:
            timer_name: Unprefixed timer identifier
        """
        provider_timer_name = self._add_dispatch_prefix(timer_name)
        try:
            self._provider.cancel_timer(provider_timer_name)
        except Exception as exc:
            self.logger.error("Error canceling RAFT timer '%s': %s", timer_name, exc)

    def _get_current_time(self) -> float:
        """
        Return the provider's current simulation time as a float.

        Raises:
            RuntimeError: If the provider call or float conversion fails
        """
        try:
            now = self._provider.current_time()
            return float(now)
        except Exception as exc:
            raise RuntimeError(f"Failed to get simulation time: {exc}") from exc

    def get_node_id(self) -> int:
        """
        Get the current node ID.

        The ID is re-read through the provider's ``get_id()`` callable
        captured during initialization, so a provider whose ID changes over
        time is reflected here. If that call raises, the error is logged and
        the ID read at initialization (``self.node_id``) is returned instead.

        Returns:
            Current node ID
        """
        callback = self._get_node_id_callback
        if callback is None:
            return self.node_id
        try:
            return callback()
        except Exception as exc:
            # Best-effort: fall back to the ID captured in __init__ rather than
            # propagating a provider failure to the caller.
            self.logger.error("Error getting node ID from callback: %s", exc)
            return self.node_id

    def start(self) -> None:
        """
        Begin participating in consensus.

        Logs the start and delegates to the Raft node's ``start()``, which
        (per the original design notes) begins as a follower and arms the
        election timeout. That logic lives in RaftNode, outside this class.
        """
        node = self._raft_node
        self.logger.info(f"Starting Raft consensus for node {self.node_id}")
        node.start()

    def stop(self) -> None:
        """
        Stop participating in consensus by delegating to the Raft node.

        Nothing is logged here; the Raft node is expected to log its own stop.
        """
        node = self._raft_node
        node.stop()

    def propose_value(self, variable_name: str, value: Any) -> bool:
        """
        Ask the Raft node to replicate a new value for a consensus variable.

        Only the leader can successfully propose; the decision is made inside
        RaftNode.propose_value.

        Args:
            variable_name: Name of the consensus variable
            value: Value to propose

        Returns:
            True if the proposal was accepted, False otherwise

        Raises:
            ValueError: Presumably for unconfigured variables or invalid value
                types, as raised by RaftNode (not visible here; confirm)
        """
        node = self._raft_node
        accepted = node.propose_value(variable_name, value)
        return accepted

    def get_committed_value(self, variable_name: str) -> Optional[Any]:
        """
        Read the committed value of a configured consensus variable.

        Args:
            variable_name: Name of the consensus variable

        Returns:
            The value reported by the Raft node (None when nothing is committed)

        Raises:
            ValueError: If the variable is not configured
        """
        if self.config.has_consensus_variable(variable_name):
            return self._raft_node.get_committed_value(variable_name)
        raise ValueError(f"Consensus variable '{variable_name}' not configured")

    def get_all_committed_values(self) -> Dict[str, Any]:
        """
        Return every committed consensus value as reported by the Raft node.

        Returns:
            Dictionary of all committed values
        """
        node = self._raft_node
        return node.get_all_committed_values()

    def is_leader(self) -> bool:
        """
        Tell whether this node currently holds leadership.

        Returns:
            True if this node is the leader, False otherwise
        """
        node = self._raft_node
        return node.is_leader()

    def get_leader_id(self) -> Optional[int]:
        """
        Return the ID of the leader this node currently knows about.

        Returns:
            Leader node ID, or None if no leader is known
        """
        node = self._raft_node
        return node.get_leader_id()

    def get_current_term(self) -> int:
        """
        Return the Raft term this node is currently in.

        Returns:
            Current term number
        """
        node = self._raft_node
        return node.get_current_term()

    def get_current_state(self) -> RaftState:
        """
        Return this node's Raft role.

        Returns:
            The Raft node's ``state`` attribute (FOLLOWER, CANDIDATE, or LEADER)
        """
        node = self._raft_node
        return node.state

    def configure_handle_message(self) -> None:
        """
        Register a packet handler that routes RAFT messages to the Raft node.

        Packets without the RAFT dispatch header are passed on untouched
        (``DispatchReturn.CONTINUE``). For prefixed packets the header is
        stripped and the sender ID is read from the JSON ``sender_id`` field
        when the payload is a JSON object. The payload is then handed to the
        Raft node and dispatching stops (``DispatchReturn.INTERRUPT``).
        """
        # Imported once when the handler is registered, not on every packet.
        import json

        def handle_message(_instance: IProtocol, message_str: str) -> DispatchReturn:
            payload = self._strip_dispatch_prefix(message_str)
            if payload is None:
                return DispatchReturn.CONTINUE

            # NOTE(review): 0 is the fallback sender when no ``sender_id`` can be
            # read; confirm 0 cannot be mistaken for a real node ID.
            sender_id = 0
            try:
                data = json.loads(payload)
            except (json.JSONDecodeError, TypeError):
                data = None
            # Only JSON objects carry a sender_id; lists/scalars keep the fallback.
            if isinstance(data, dict):
                sender_id = data.get("sender_id", 0)

            self._raft_node.handle_message(payload, sender_id)
            return DispatchReturn.INTERRUPT

        self._dispatcher.register_handle_packet(handle_message)

    def send_broadcast(self, message: str) -> None:
        """
        Send a message to every node in the cluster.

        Uses the Raft node's ``_send_broadcast`` hook when it exists and is
        not None. Otherwise it logs a warning and sends the message
        point-to-point via the Raft node's ``_send_message`` to each entry in
        its ``_known_nodes`` other than this node.

        Args:
            message: Message content to broadcast
        """
        broadcast = getattr(self._raft_node, '_send_broadcast', None)
        if broadcast is not None:
            broadcast(message)
            return

        self.logger.warning("Broadcast not available, falling back to individual messages")
        # ``_known_nodes`` may be missing or still None before set_known_nodes()
        # runs (is_ready() checks for exactly that). Treat both as "no peers"
        # instead of raising TypeError while iterating None.
        known_nodes = getattr(self._raft_node, '_known_nodes', None) or []
        for node_id in known_nodes:
            if node_id != self.node_id:
                self._raft_node._send_message(message, node_id)

    def configure_handle_timer(self) -> None:
        """
        Register a timer handler that routes RAFT timers to the Raft node.

        Timers carrying the RAFT dispatch header are stripped, forwarded, and
        stop dispatching. All other timers are left for the protocol.
        """
        def handle_timer(_instance: IProtocol, timer_name: str) -> DispatchReturn:
            raft_timer = self._strip_dispatch_prefix(timer_name)
            if raft_timer is not None:
                self._raft_node.handle_timer(raft_timer)
                return DispatchReturn.INTERRUPT
            return DispatchReturn.CONTINUE

        self._dispatcher.register_handle_timer(handle_timer)

    def set_known_nodes(self, node_ids: List[int]) -> None:
        """
        Tell the Raft node which node IDs make up the cluster.

        After forwarding the list, if the Raft node has a truthy
        ``_heartbeat_detector`` and the provider offers
        ``set_failure_detector``, the detector is handed to the provider.
        A failure to attach it is logged as a warning, not raised.

        Args:
            node_ids: List of all node IDs in the cluster
        """
        self._raft_node.set_known_nodes(node_ids)

        heartbeat_detector = getattr(self._raft_node, '_heartbeat_detector', None)
        if heartbeat_detector:
            if hasattr(self._provider, 'set_failure_detector'):
                try:
                    self._provider.set_failure_detector(heartbeat_detector)
                    self.logger.info('Connected failure detector to provider for connectivity-based failure detection')
                except Exception as exc:
                    self.logger.warning('Failed to connect failure detector via provider: %s', exc)

        self.logger.info(f"Set known nodes: {node_ids}")

    def get_state_info(self) -> Dict[str, Any]:
        """
        Return the Raft node's debugging snapshot.

        Returns:
            Dictionary with current state information, as built by RaftNode
        """
        node = self._raft_node
        return node.get_state_info()

    def get_consensus_variables(self) -> Dict[str, Type]:
        """
        List the consensus variables declared in the configuration.

        Returns:
            Dictionary mapping variable names to their types
        """
        cfg = self.config
        return cfg.get_consensus_variables()

    def has_consensus_variable(self, variable_name: str) -> bool:
        """
        Tell whether the configuration declares the given consensus variable.

        Args:
            variable_name: Name of the consensus variable

        Returns:
            True if the variable is configured, False otherwise
        """
        cfg = self.config
        return cfg.has_consensus_variable(variable_name)

    def get_consensus_variable_type(self, variable_name: str) -> Optional[Type]:
        """
        Look up the declared type of a consensus variable.

        Args:
            variable_name: Name of the consensus variable

        Returns:
            Type of the variable, or None if it is not configured
        """
        cfg = self.config
        return cfg.get_consensus_variable_type(variable_name)

    def get_configuration(self) -> Dict[str, Any]:
        """
        Serialize the active configuration.

        Returns:
            Dictionary produced by the configuration's ``to_dict()``
        """
        cfg = self.config
        return cfg.to_dict()

    def is_ready(self) -> bool:
        """
        Tell whether the Raft node has a known-nodes list.

        Returns:
            True when the Raft node's ``_known_nodes`` exists and is not None
        """
        known_nodes = getattr(self._raft_node, '_known_nodes', None)
        return known_nodes is not None

    def get_statistics(self) -> Dict[str, Any]:
        """
        Summarize this node's consensus state.

        Counts are derived from the ``committed_values`` and ``active_timers``
        entries of ``get_state_info()``.

        Returns:
            Dictionary with node ID, term, state, leadership, configured
            variable names, and committed-value / active-timer counts
        """
        info = self.get_state_info()
        stats: Dict[str, Any] = {}
        stats["node_id"] = self.node_id
        stats["current_term"] = info["current_term"]
        stats["current_state"] = info["state"]
        stats["is_leader"] = self.is_leader()
        stats["leader_id"] = self.get_leader_id()
        stats["consensus_variables"] = list(self.get_consensus_variables())
        stats["committed_values_count"] = len(info["committed_values"])
        stats["active_timers_count"] = len(info["active_timers"])
        return stats

    def get_simulation_active_nodes(self) -> Set[int]:
        """
        Return the nodes currently switched on in the simulation.

        This reflects the manual active/inactive control, not heartbeats.

        Returns:
            Set of simulation-active node IDs
        """
        node = self._raft_node
        return node.get_simulation_active_nodes()

    def get_communication_failed_nodes(self) -> Set[int]:
        """
        Return the nodes that heartbeat detection considers unreachable.

        Returns:
            Set of communication-failed node IDs
        """
        node = self._raft_node
        return node.get_communication_failed_nodes()

    def get_communication_active_nodes(self) -> Set[int]:
        """
        Return the nodes that heartbeat detection considers reachable.

        Returns:
            Set of communication-active node IDs
        """
        node = self._raft_node
        return node.get_communication_active_nodes()

    def get_failed_nodes(self) -> Set[int]:
        """
        Get the set of currently failed nodes.
        DEPRECATED: Use get_communication_failed_nodes() instead.
        Calling this method emits a DeprecationWarning.

        Returns:
            Set of failed node IDs, empty if failure detection is disabled
        """
        import warnings

        # stacklevel=2 attributes the warning to the caller, not this shim.
        warnings.warn(
            "get_failed_nodes() is deprecated; use get_communication_failed_nodes() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._raft_node.get_failed_nodes()

    def get_active_nodes(self) -> Set[int]:
        """
        Get the set of currently active nodes.
        DEPRECATED: Use get_simulation_active_nodes() or get_communication_active_nodes() instead.
        Calling this method emits a DeprecationWarning.

        Returns:
            Set of active node IDs, empty if failure detection is disabled
        """
        import warnings

        # stacklevel=2 attributes the warning to the caller, not this shim.
        warnings.warn(
            "get_active_nodes() is deprecated; use get_simulation_active_nodes() "
            "or get_communication_active_nodes() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._raft_node.get_active_nodes()

    def get_active_nodes_info(self) -> Dict[str, Any]:
        """
        Get detailed information about active nodes from this node's perspective.
        This method provides comprehensive information about cluster state and active nodes.

        Works in both CLASSIC and FAULT_TOLERANT modes with appropriate behavior for each:

        **CLASSIC mode:**

        - All known nodes are considered active (no failure detection)

        - Returns information based on the complete known node list

        **FAULT_TOLERANT mode:**

        - Uses actual failure detection to determine active nodes

        - Information accuracy differs by node role:

          * Leader: Complete and accurate active nodes information from heartbeat detection

          * Follower/Candidate: Active count from leader + limited local node knowledge

        Can be called on any node (leader, candidate, or follower) in any mode.
        Use this method to get detailed monitoring information about which nodes are active,
        failed, and the current majority status from the perspective of the calling node.

        Returns:
            Dictionary containing:
            - 'active_nodes': List of active node IDs (sorted)
            - 'active_count': Number of active nodes
            - 'total_known': Total number of known nodes
            - 'majority_threshold': Current majority threshold
            - 'has_majority': Whether cluster has majority
            - 'detection_method': How active nodes were determined
                * 'classic_mode_all_active': All nodes active (Classic mode)
                * 'leader_heartbeat_detection': Leader using heartbeat detector
                * 'leader_shared_complete_list': Follower using complete list from leader
                * 'leader_shared_count_only': Follower using count from leader (limited IDs)
                * 'follower_local_detection': Follower using local detection (fallback)
            - 'last_update': Timestamp of last update (if available)
            - 'node_role': Role of this node ('leader', 'candidate', 'follower')
            - 'is_leader': Whether this node is the leader
            - 'leader_id': ID of the current leader (if known)
            - 'current_node_id': ID of this node
            - 'current_term': Current Raft term
            - 'raft_mode': Current Raft operation mode ('classic' or 'fault_tolerant')
            - 'failed_nodes': List of failed node IDs (empty in Classic mode)
            - 'failed_count': Number of failed nodes (0 in Classic mode)
            - 'detection_summary': Detailed detection info (if available)

        Example:
            ```python
            # Works in both modes
            active_info = consensus.get_active_nodes_info()
            print(f"Node {active_info['current_node_id']} role: {active_info['node_role']}")
            print(f"Active nodes: {active_info['active_nodes']}")
            print(f"Failed nodes: {active_info['failed_nodes']}")
            print(f"Has majority: {active_info['has_majority']}")
            print(f"Detection method: {active_info['detection_method']}")
            print(f"Raft mode: {active_info['raft_mode']}")

            if active_info['is_leader']:
                print("This node is the leader")
            elif active_info['leader_id'] is not None:  # node 0 is a valid leader
                print(f"Leader is node {active_info['leader_id']}")
            else:
                print("No current leader")
            ```
        """
        return self._raft_node.get_active_nodes_info()

    def has_quorum(self) -> bool:
        """
        Tell whether enough nodes are active for the cluster to operate.

        Returns:
            True when a quorum of active nodes exists, False otherwise
        """
        node = self._raft_node
        return node.has_quorum()

    def has_majority_votes(self) -> bool:
        """
        Tell whether this node won a majority in the current election.

        Returns:
            True if a majority of active nodes voted for this node, False otherwise
        """
        node = self._raft_node
        return node.has_majority_votes()

    def has_majority_confirmation(self) -> bool:
        """
        Tell whether a majority of active nodes confirmed the current values.

        Returns:
            True if a majority of active nodes confirmed, False otherwise
        """
        node = self._raft_node
        return node.has_majority_confirmation()

    def get_majority_info(self) -> Dict[str, Any]:
        """
        Describe the cluster's current majority status.

        Returns:
            Dictionary covering the active nodes, the majority threshold and
            the current status, as produced by the internal Raft node
        """
        node = self._raft_node
        return node.get_majority_info()

    def is_node_failed(self, node_id: int) -> bool:
        """
        Tell whether ``node_id`` is currently marked as failed.

        Args:
            node_id: ID of the node to inspect

        Returns:
            True if the node is failed, False otherwise
        """
        check = self._raft_node.is_node_failed
        return check(node_id)

    def is_simulation_active(self, node_id: int) -> bool:
        """
        Tell whether ``node_id`` is switched on in the simulation.

        This is the manual active/inactive control state.

        Args:
            node_id: ID of the node to inspect

        Returns:
            True if the node is active in simulation, False otherwise
        """
        check = self._raft_node.is_simulation_active
        return check(node_id)

    def is_communication_failed(self, node_id: int) -> bool:
        """
        Tell whether heartbeat detection has marked ``node_id`` as unreachable.

        Args:
            node_id: ID of the node to inspect

        Returns:
            True if the node has a communication failure, False otherwise
        """
        check = self._raft_node.is_communication_failed
        return check(node_id)

    def get_is_active(self, node_id: int) -> bool:
        """
        Check if a specific node is currently active.
        DEPRECATED: Use is_simulation_active() or is_communication_failed() instead.
        Calling this method emits a DeprecationWarning.

        Args:
            node_id: ID of the node to check

        Returns:
            True if the node is active, False otherwise
        """
        import warnings

        # stacklevel=2 attributes the warning to the caller, not this shim.
        warnings.warn(
            "get_is_active() is deprecated; use is_simulation_active() "
            "or is_communication_failed() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._raft_node.get_is_active(node_id)

    def set_simulation_active(self, node_id: int, active: bool) -> None:
        """
        Switch this node on or off in the simulation.

        Only has an effect when ``node_id`` is this node's own ID.

        Args:
            node_id: ID of the node whose state is being set
            active: True to activate the node in simulation, False to deactivate it
        """
        node = self._raft_node
        node.set_simulation_active(node_id, active)

    def set_is_active(self, node_id: int, active: bool) -> None:
        """
        Set this node's active/inactive state.
        DEPRECATED: Use set_simulation_active() instead.
        Calling this method emits a DeprecationWarning.

        Args:
            node_id: ID of the node to set state
            active: True to make node active, False to make it inactive
        """
        import warnings

        # stacklevel=2 attributes the warning to the caller, not this shim.
        warnings.warn(
            "set_is_active() is deprecated; use set_simulation_active() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self._raft_node.set_is_active(node_id, active)

    def get_failure_detection_metrics(self) -> Dict[str, Any]:
        """
        Report metrics collected by the failure detector.

        Returns:
            Dictionary of failure-detection metrics, or an empty dict when
            none are available
        """
        node = self._raft_node
        return node.get_failure_detection_metrics()

    def set_cluster_id(self, cluster_id: Optional[int]) -> None:
        """
        Assign (or clear) the cluster this node belongs to.

        Args:
            cluster_id: Cluster ID to assign, or None to clear the assignment
        """
        node = self._raft_node
        node.set_cluster_id(cluster_id)

    def get_cluster_id(self) -> Optional[int]:
        """
        Return the cluster this node belongs to.

        Returns:
            Cluster ID, or None when no cluster has been assigned
        """
        node = self._raft_node
        return node.get_cluster_id()

    def is_in_same_cluster(self, other_node_id: int) -> bool:
        """
        Tell whether another node shares this node's cluster.

        Args:
            other_node_id: ID of the node to compare against

        Returns:
            True if both nodes are in the same cluster, False otherwise
        """
        same = self._raft_node.is_in_same_cluster
        return same(other_node_id)

__init__(config, protocol)

Initialize Raft consensus plugin with Gradysim protocol provider.

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def __init__(self, config: RaftConfig, protocol: IProtocol):
    """
    Initialize Raft consensus plugin with Gradysim protocol provider.

    Validates ``config``, resolves this node's ID through the protocol
    provider's ``get_id()``, builds the internal ``RaftNode`` and registers
    packet/timer interceptors on a dispatcher created for ``protocol``.

    Args:
        config: Raft configuration; ``config.validate()`` must report no errors
        protocol: Protocol instance whose ``provider`` supplies the node ID
            and to which the dispatcher is attached

    Raises:
        ValueError: If the configuration is invalid, the provider has no
            ``get_id()``, or calling ``get_id()`` raises. ``_validate_provider()``
            may raise as well (see that method).
    """
    # Fail fast: validate() yields error messages; an empty result means valid.
    errors = config.validate()
    if errors:
        raise ValueError(f"Invalid configuration: {'; '.join(errors)}")

    self.config = config
    self._protocol = protocol
    self._provider = protocol.provider
    # Prefix marking RAFT packets/timers; presumably what _strip_dispatch_prefix()
    # checks for and removes — TODO confirm.
    self._dispatch_header = f"{RAFT_DISPATCH_PREFIX}:"

    self._validate_provider()

    # Keep the provider's get_id so get_node_id() can re-query it later.
    self._get_node_id_callback: Optional[Callable[[], int]] = getattr(self._provider, "get_id", None)
    if self._get_node_id_callback is None:
        raise ValueError("Protocol provider must implement get_id() for RaftConsensusPlugin")

    try:
        self.node_id = self._get_node_id_callback()
    except Exception as exc:
        # Surface any provider failure as ValueError, chaining the original cause.
        raise ValueError(f"Failed to get node ID from provider: {exc}") from exc

    self.logger = logging.getLogger(f"RaftConsensus-{self.node_id}")
    if config._enable_logging:
        # NOTE(review): assumes _log_level is a logging level name such as "INFO";
        # an unknown name raises AttributeError here — confirm.
        self.logger.setLevel(getattr(logging, config._log_level))

    callbacks = self._build_callbacks()

    self._raft_node = RaftNode(
        node_id=self.node_id,
        config=config,
        callbacks=callbacks
    )

    # The handlers registered below forward intercepted traffic to
    # self._raft_node, which is created above.
    self._dispatcher = create_dispatcher(protocol)

    self.configure_handle_message()
    self.configure_handle_timer()

    self.logger.info(f"RaftConsensus initialized for node {self.node_id}")

configure_handle_message()

Intercept RAFT packets and route them to the internal node.

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def configure_handle_message(self) -> None:
    """
    Register a packet handler that routes RAFT traffic to the internal node.

    When ``_strip_dispatch_prefix`` returns None the packet is not RAFT
    traffic and is passed on (``DispatchReturn.CONTINUE``). Otherwise the
    stripped payload is parsed as JSON on a best-effort basis to read
    ``sender_id`` (0 when parsing fails or the payload is not a JSON object).
    It is then forwarded to ``RaftNode.handle_message`` and consumed
    (``DispatchReturn.INTERRUPT``).
    """
    # Imported once at registration time rather than on every packet.
    import json

    def handle_message(_instance: IProtocol, message_str: str) -> DispatchReturn:
        payload = self._strip_dispatch_prefix(message_str)
        if payload is None:
            return DispatchReturn.CONTINUE

        # NOTE(review): 0 is also a valid node ID, so an unparseable sender
        # is indistinguishable from node 0 here.
        sender_id = 0
        try:
            decoded = json.loads(payload)
        except (json.JSONDecodeError, TypeError):
            decoded = None
        if isinstance(decoded, dict):
            sender_id = decoded.get("sender_id", 0)

        self._raft_node.handle_message(payload, sender_id)
        return DispatchReturn.INTERRUPT

    self._dispatcher.register_handle_packet(handle_message)

configure_handle_timer()

Intercept RAFT timers created by the plugin.

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def configure_handle_timer(self) -> None:
    """
    Register a timer handler that routes RAFT timers to the internal node.

    When ``_strip_dispatch_prefix`` returns None for a timer name, the timer
    is left to the protocol (``DispatchReturn.CONTINUE``). Otherwise the
    stripped name is handed to ``RaftNode.handle_timer`` and the timer is
    consumed (``DispatchReturn.INTERRUPT``).
    """
    def handle_timer(_instance: IProtocol, timer_name: str) -> DispatchReturn:
        inner_name = self._strip_dispatch_prefix(timer_name)
        if inner_name is not None:
            self._raft_node.handle_timer(inner_name)
            return DispatchReturn.INTERRUPT
        return DispatchReturn.CONTINUE

    self._dispatcher.register_handle_timer(handle_timer)

get_active_nodes()

Get the set of currently active nodes. DEPRECATED: Use get_simulation_active_nodes() or get_communication_active_nodes() instead.

Returns:

Type Description
Set[int]

Set of active node IDs, empty if failure detection is disabled

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_active_nodes(self) -> Set[int]:
    """
    Get the set of currently active nodes.
    DEPRECATED: Use get_simulation_active_nodes() or get_communication_active_nodes() instead.
    Calling this method emits a DeprecationWarning.

    Returns:
        Set of active node IDs, empty if failure detection is disabled
    """
    import warnings

    # stacklevel=2 attributes the warning to the caller, not this shim.
    warnings.warn(
        "get_active_nodes() is deprecated; use get_simulation_active_nodes() "
        "or get_communication_active_nodes() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self._raft_node.get_active_nodes()

get_active_nodes_info()

Get detailed information about active nodes from this node's perspective. This method provides comprehensive information about cluster state and active nodes.

Works in both CLASSIC and FAULT_TOLERANT modes with appropriate behavior for each:

CLASSIC mode:

  • All known nodes are considered active (no failure detection)

  • Returns information based on the complete known node list

FAULT_TOLERANT mode:

  • Uses actual failure detection to determine active nodes

  • Information accuracy differs by node role:

      ◦ Leader: Complete and accurate active nodes information from heartbeat detection

      ◦ Follower/Candidate: Active count from leader + limited local node knowledge

Can be called on any node (leader, candidate, or follower) in any mode. Use this method to get detailed monitoring information about which nodes are active, failed, and the current majority status from the perspective of the calling node.

Returns:

Type Description
Dict[str, Any]

Dictionary containing:

  • 'active_nodes': List of active node IDs (sorted)
  • 'active_count': Number of active nodes
  • 'total_known': Total number of known nodes
  • 'majority_threshold': Current majority threshold
  • 'has_majority': Whether cluster has majority
  • 'detection_method': How active nodes were determined
      ◦ 'classic_mode_all_active': All nodes active (Classic mode)
      ◦ 'leader_heartbeat_detection': Leader using heartbeat detector
      ◦ 'leader_shared_complete_list': Follower using complete list from leader
      ◦ 'leader_shared_count_only': Follower using count from leader (limited IDs)
      ◦ 'follower_local_detection': Follower using local detection (fallback)
  • 'last_update': Timestamp of last update (if available)
  • 'node_role': Role of this node ('leader', 'candidate', 'follower')
  • 'is_leader': Whether this node is the leader
  • 'leader_id': ID of the current leader (if known)
  • 'current_node_id': ID of this node
  • 'current_term': Current Raft term
  • 'raft_mode': Current Raft operation mode ('classic' or 'fault_tolerant')
  • 'failed_nodes': List of failed node IDs (empty in Classic mode)
  • 'failed_count': Number of failed nodes (0 in Classic mode)
  • 'detection_summary': Detailed detection info (if available)
Example
# Works in both modes
active_info = consensus.get_active_nodes_info()
print(f"Node {active_info['current_node_id']} role: {active_info['node_role']}")
print(f"Active nodes: {active_info['active_nodes']}")
print(f"Failed nodes: {active_info['failed_nodes']}")
print(f"Has majority: {active_info['has_majority']}")
print(f"Detection method: {active_info['detection_method']}")
print(f"Raft mode: {active_info['raft_mode']}")

if active_info['is_leader']:
    print("This node is the leader")
elif active_info['leader_id'] is not None:  # node 0 is a valid leader
    print(f"Leader is node {active_info['leader_id']}")
else:
    print("No current leader")
Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_active_nodes_info(self) -> Dict[str, Any]:
    """
    Get detailed information about active nodes from this node's perspective.
    This method provides comprehensive information about cluster state and active nodes.

    Works in both CLASSIC and FAULT_TOLERANT modes with appropriate behavior for each:

    **CLASSIC mode:**

    - All known nodes are considered active (no failure detection)

    - Returns information based on the complete known node list

    **FAULT_TOLERANT mode:**

    - Uses actual failure detection to determine active nodes

    - Information accuracy differs by node role:

      * Leader: Complete and accurate active nodes information from heartbeat detection

      * Follower/Candidate: Active count from leader + limited local node knowledge

    Can be called on any node (leader, candidate, or follower) in any mode.
    Use this method to get detailed monitoring information about which nodes are active,
    failed, and the current majority status from the perspective of the calling node.

    Returns:
        Dictionary containing:
        - 'active_nodes': List of active node IDs (sorted)
        - 'active_count': Number of active nodes
        - 'total_known': Total number of known nodes
        - 'majority_threshold': Current majority threshold
        - 'has_majority': Whether cluster has majority
        - 'detection_method': How active nodes were determined
            * 'classic_mode_all_active': All nodes active (Classic mode)
            * 'leader_heartbeat_detection': Leader using heartbeat detector
            * 'leader_shared_complete_list': Follower using complete list from leader
            * 'leader_shared_count_only': Follower using count from leader (limited IDs)
            * 'follower_local_detection': Follower using local detection (fallback)
        - 'last_update': Timestamp of last update (if available)
        - 'node_role': Role of this node ('leader', 'candidate', 'follower')
        - 'is_leader': Whether this node is the leader
        - 'leader_id': ID of the current leader (if known)
        - 'current_node_id': ID of this node
        - 'current_term': Current Raft term
        - 'raft_mode': Current Raft operation mode ('classic' or 'fault_tolerant')
        - 'failed_nodes': List of failed node IDs (empty in Classic mode)
        - 'failed_count': Number of failed nodes (0 in Classic mode)
        - 'detection_summary': Detailed detection info (if available)

    Example:
        ```python
        # Works in both modes
        active_info = consensus.get_active_nodes_info()
        print(f"Node {active_info['current_node_id']} role: {active_info['node_role']}")
        print(f"Active nodes: {active_info['active_nodes']}")
        print(f"Failed nodes: {active_info['failed_nodes']}")
        print(f"Has majority: {active_info['has_majority']}")
        print(f"Detection method: {active_info['detection_method']}")
        print(f"Raft mode: {active_info['raft_mode']}")

        if active_info['is_leader']:
            print("This node is the leader")
        elif active_info['leader_id'] is not None:  # node 0 is a valid leader
            print(f"Leader is node {active_info['leader_id']}")
        else:
            print("No current leader")
        ```
    """
    return self._raft_node.get_active_nodes_info()

get_all_committed_values()

Get all committed consensus values.

Returns:

Type Description
Dict[str, Any]

Dictionary of all committed values

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_all_committed_values(self) -> Dict[str, Any]:
    """
    Return every consensus value that has been committed so far.

    Returns:
        Mapping of variable name to committed value
    """
    node = self._raft_node
    return node.get_all_committed_values()

get_cluster_id()

Get the cluster ID for this node.

Returns:

Type Description
Optional[int]

Cluster ID, or None if not set

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_cluster_id(self) -> Optional[int]:
    """
    Return the cluster this node belongs to.

    Returns:
        Cluster ID, or None when no cluster has been assigned
    """
    node = self._raft_node
    return node.get_cluster_id()

get_committed_value(variable_name)

Get the committed value for a consensus variable.

Parameters:

Name Type Description Default
variable_name str

Name of the consensus variable

required

Returns:

Type Description
Optional[Any]

Committed value, or None if not available

Raises:

Type Description
ValueError

If variable is not configured

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_committed_value(self, variable_name: str) -> Optional[Any]:
    """
    Fetch the committed value of one consensus variable.

    Args:
        variable_name: Name of the consensus variable

    Returns:
        The committed value, or None if nothing has been committed yet

    Raises:
        ValueError: If ``variable_name`` is not a configured consensus variable
    """
    known = self.config.has_consensus_variable(variable_name)
    if known:
        return self._raft_node.get_committed_value(variable_name)
    raise ValueError(f"Consensus variable '{variable_name}' not configured")

get_communication_active_nodes()

Get the set of nodes that have active communication. This is based on heartbeat detection.

Returns:

Type Description
Set[int]

Set of communication active node IDs

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_communication_active_nodes(self) -> Set[int]:
    """
    Return the nodes that heartbeat detection considers reachable.

    Returns:
        Set of communication-active node IDs
    """
    node = self._raft_node
    return node.get_communication_active_nodes()

get_communication_failed_nodes()

Get the set of nodes that have communication failures. This is based on heartbeat detection.

Returns:

Type Description
Set[int]

Set of communication failed node IDs

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_communication_failed_nodes(self) -> Set[int]:
    """
    Return the nodes that heartbeat detection considers unreachable.

    Returns:
        Set of communication-failed node IDs
    """
    node = self._raft_node
    return node.get_communication_failed_nodes()

get_configuration()

Get the current configuration.

Returns:

Type Description
Dict[str, Any]

Dictionary representation of the configuration

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_configuration(self) -> Dict[str, Any]:
    """
    Export the active configuration as a plain dictionary.

    Returns:
        Result of ``self.config.to_dict()``
    """
    cfg = self.config
    return cfg.to_dict()

get_consensus_variable_type(variable_name)

Get the type of a consensus variable.

Parameters:

Name Type Description Default
variable_name str

Name of the consensus variable

required

Returns:

Type Description
Optional[Type]

Type of the variable, or None if not found

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_consensus_variable_type(self, variable_name: str) -> Optional[Type]:
    """
    Look up the declared type of a consensus variable.

    Args:
        variable_name: Name of the consensus variable

    Returns:
        The variable's declared type, or None when it is not configured
    """
    lookup = self.config.get_consensus_variable_type
    return lookup(variable_name)

get_consensus_variables()

Get all configured consensus variables.

Returns:

Type Description
Dict[str, Type]

Dictionary mapping variable names to their types

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_consensus_variables(self) -> Dict[str, Type]:
    """
    Return every consensus variable declared in the configuration.

    Returns:
        Mapping of variable name to its declared type
    """
    cfg = self.config
    return cfg.get_consensus_variables()

get_current_state()

Get the current state of this node.

Returns:

Type Description
RaftState

Current Raft state (FOLLOWER, CANDIDATE, or LEADER)

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_current_state(self) -> RaftState:
    """
    Return this node's Raft role.

    Returns:
        The internal node's ``state`` (FOLLOWER, CANDIDATE, or LEADER)
    """
    node = self._raft_node
    current = node.state
    return current

get_current_term()

Get the current term.

Returns:

Type Description
int

Current term number

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_current_term(self) -> int:
    """
    Return the Raft term this node is currently in.

    Returns:
        Current term number
    """
    node = self._raft_node
    return node.get_current_term()

get_failed_nodes()

Get the set of currently failed nodes. DEPRECATED: Use get_communication_failed_nodes() instead.

Returns:

Type Description
Set[int]

Set of failed node IDs, empty if failure detection is disabled

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_failed_nodes(self) -> Set[int]:
    """
    Get the set of currently failed nodes.
    DEPRECATED: Use get_communication_failed_nodes() instead.
    Calling this method emits a DeprecationWarning.

    Returns:
        Set of failed node IDs, empty if failure detection is disabled
    """
    import warnings

    # stacklevel=2 attributes the warning to the caller, not this shim.
    warnings.warn(
        "get_failed_nodes() is deprecated; use get_communication_failed_nodes() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self._raft_node.get_failed_nodes()

get_failure_detection_metrics()

Get detailed metrics about failure detection performance.

Returns:

Type Description
Dict[str, Any]

Dictionary with detailed failure detection metrics, or empty dict if not available

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_failure_detection_metrics(self) -> Dict[str, Any]:
    """
    Report metrics collected by the failure detector.

    Returns:
        Dictionary of failure-detection metrics, or an empty dict when
        none are available
    """
    node = self._raft_node
    return node.get_failure_detection_metrics()

get_is_active(node_id)

Check if a specific node is currently active. DEPRECATED: Use is_simulation_active() or is_communication_failed() instead.

Parameters:

Name Type Description Default
node_id int

ID of the node to check

required

Returns:

Type Description
bool

True if the node is active, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_is_active(self, node_id: int) -> bool:
    """
    Check if a specific node is currently active.
    DEPRECATED: Use is_simulation_active() or is_communication_failed() instead.
    Calling this method emits a DeprecationWarning.

    Args:
        node_id: ID of the node to check

    Returns:
        True if the node is active, False otherwise
    """
    import warnings

    # stacklevel=2 attributes the warning to the caller, not this shim.
    warnings.warn(
        "get_is_active() is deprecated; use is_simulation_active() "
        "or is_communication_failed() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return self._raft_node.get_is_active(node_id)

get_leader_id()

Get the current leader ID.

Returns:

Type Description
Optional[int]

ID of the current leader, or None if no leader is known

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_leader_id(self) -> Optional[int]:
    """
    Return the node this node currently believes is leader.

    Returns:
        Leader node ID, or None when no leader is known
    """
    node = self._raft_node
    return node.get_leader_id()

get_majority_info()

Get detailed information about majority status.

Returns:

Type Description
Dict[str, Any]

Dictionary with majority information including active nodes, majority threshold, and current status

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_majority_info(self) -> Dict[str, Any]:
    """
    Describe the cluster's current majority status.

    Returns:
        Dictionary covering the active nodes, the majority threshold and
        the current status, as produced by the internal Raft node
    """
    node = self._raft_node
    return node.get_majority_info()

get_node_id()

Get the current node ID.

Calls the provider's get_id() callback captured during initialization to obtain the current node ID dynamically. If no callback is stored, or the callback raises (the error is logged), the node ID stored at initialization is returned instead.

Returns:

Type Description
int

Current node ID

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_node_id(self) -> int:
    """
    Return this node's ID, preferring a live lookup.

    The provider's ``get_id`` callable captured during initialization is
    queried on each call. If no callable is stored, the ID cached in
    ``self.node_id`` is returned. If the callable raises, the error is
    logged and the cached ID is returned instead.

    Returns:
        Current node ID
    """
    callback = self._get_node_id_callback
    if callback is None:
        return self.node_id
    try:
        return callback()
    except Exception as e:
        self.logger.error(f"Error getting node ID from callback: {e}")
        return self.node_id  # Fallback to stored node_id

get_simulation_active_nodes()

Get the set of nodes that are active in simulation. This is based on manual control (active/inactive state).

Returns:

Type Description
Set[int]

Set of simulation active node IDs

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_simulation_active_nodes(self) -> Set[int]:
    """
    Return the nodes currently switched on in the simulation.

    This reflects the manual active/inactive control, not heartbeats.

    Returns:
        Set of simulation-active node IDs
    """
    node = self._raft_node
    return node.get_simulation_active_nodes()

get_state_info()

Get current state information for debugging.

Returns:

Type Description
Dict[str, Any]

Dictionary with current state information

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_state_info(self) -> Dict[str, Any]:
    """
    Return a debugging snapshot of the internal Raft node's state.

    Returns:
        Dictionary produced by the internal node's ``get_state_info()``
    """
    node = self._raft_node
    return node.get_state_info()

get_statistics()

Get consensus statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with consensus statistics

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def get_statistics(self) -> Dict[str, Any]:
    """
    Summarise this node's consensus status.

    Returns:
        Dictionary with the node ID, current term and state, leadership
        information, configured variable names, and the number of
        committed values and active timers
    """
    info = self.get_state_info()
    stats: Dict[str, Any] = {}
    stats["node_id"] = self.node_id
    stats["current_term"] = info["current_term"]
    stats["current_state"] = info["state"]
    stats["is_leader"] = self.is_leader()
    stats["leader_id"] = self.get_leader_id()
    stats["consensus_variables"] = list(self.get_consensus_variables())
    stats["committed_values_count"] = len(info["committed_values"])
    stats["active_timers_count"] = len(info["active_timers"])
    return stats

has_consensus_variable(variable_name)

Check if a consensus variable is configured.

Parameters:

Name Type Description Default
variable_name str

Name of the consensus variable

required

Returns:

Type Description
bool

True if the variable is configured, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def has_consensus_variable(self, variable_name: str) -> bool:
    """
    Tell whether ``variable_name`` is declared in the configuration.

    Args:
        variable_name: Name of the consensus variable to look up

    Returns:
        True if the configuration declares the variable, False otherwise
    """
    is_declared = self.config.has_consensus_variable
    return is_declared(variable_name)

has_majority_confirmation()

Check if majority of active nodes have confirmed current values.

Returns:

Type Description
bool

True if majority of active nodes have confirmed, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def has_majority_confirmation(self) -> bool:
    """
    Report whether a majority of active nodes confirmed the current values.

    Delegates to the wrapped Raft node.

    Returns:
        True if a majority of active nodes have confirmed, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.has_majority_confirmation()

has_majority_votes()

Check whether this node has received a majority of votes in the current election.

Returns:

Type Description
bool

True if majority of active nodes have voted for this node, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def has_majority_votes(self) -> bool:
    """
    Report whether this node holds a majority of votes in the current election.

    Delegates to the wrapped Raft node.

    Returns:
        True if a majority of active nodes have voted for this node, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.has_majority_votes()

has_quorum()

Check if the system has enough active nodes to form a quorum.

Returns:

Type Description
bool

True if there are enough active nodes to operate, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def has_quorum(self) -> bool:
    """
    Report whether enough nodes are active to form a quorum.

    Delegates to the wrapped Raft node.

    Returns:
        True if there are enough active nodes to operate, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.has_quorum()

is_communication_failed(node_id)

Check whether a specific node has a communication failure, as determined by heartbeat detection.

Parameters:

Name Type Description Default
node_id int

ID of the node to check

required

Returns:

Type Description
bool

True if the node has a communication failure, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def is_communication_failed(self, node_id: int) -> bool:
    """
    Report whether a specific node has a communication failure.

    The answer comes from heartbeat-based detection in the wrapped Raft node.

    Args:
        node_id: ID of the node to check

    Returns:
        True if the node has a communication failure, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.is_communication_failed(node_id)

is_in_same_cluster(other_node_id)

Check if another node is in the same cluster.

Parameters:

Name Type Description Default
other_node_id int

ID of the other node to check

required

Returns:

Type Description
bool

True if both nodes are in the same cluster, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def is_in_same_cluster(self, other_node_id: int) -> bool:
    """
    Report whether another node shares this node's cluster.

    Delegates to the wrapped Raft node.

    Args:
        other_node_id: ID of the other node to check

    Returns:
        True if both nodes are in the same cluster, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.is_in_same_cluster(other_node_id)

is_leader()

Check if this node is the current leader.

Returns:

Type Description
bool

True if this node is the leader, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def is_leader(self) -> bool:
    """
    Report whether this node currently holds leadership.

    Delegates to the wrapped Raft node.

    Returns:
        True if this node is the leader, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.is_leader()

is_node_failed(node_id)

Check if a specific node is currently failed.

Parameters:

Name Type Description Default
node_id int

ID of the node to check

required

Returns:

Type Description
bool

True if the node is failed, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def is_node_failed(self, node_id: int) -> bool:
    """
    Report whether a specific node is currently considered failed.

    Delegates to the wrapped Raft node.

    Args:
        node_id: ID of the node to check

    Returns:
        True if the node is failed, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.is_node_failed(node_id)

is_ready()

Check if the consensus system is ready.

Returns:

Type Description
bool

True if the system is ready, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def is_ready(self) -> bool:
    """
    Report whether the consensus system is ready.

    The system counts as ready once the wrapped Raft node has a
    ``_known_nodes`` attribute that is not ``None``. An attribute that is
    missing or explicitly ``None`` both mean "not ready".

    Returns:
        True if the system is ready, False otherwise
    """
    known_nodes = getattr(self._raft_node, '_known_nodes', None)
    return known_nodes is not None

is_simulation_active(node_id)

Check if a specific node is currently active in simulation. This is the manual control state (active/inactive).

Parameters:

Name Type Description Default
node_id int

ID of the node to check

required

Returns:

Type Description
bool

True if the node is active in simulation, False otherwise

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def is_simulation_active(self, node_id: int) -> bool:
    """
    Report whether a specific node is active in the simulation.

    This reflects the manual active/inactive control state rather than
    heartbeat-based failure detection.

    Args:
        node_id: ID of the node to check

    Returns:
        True if the node is active in simulation, False otherwise
    """
    raft_node = self._raft_node
    return raft_node.is_simulation_active(node_id)

propose_value(variable_name, value)

Propose a new value for consensus.

This method can only be called by the current leader. If this node is not the leader, the proposal will be rejected.

Parameters:

Name Type Description Default
variable_name str

Name of the consensus variable

required
value Any

Value to propose

required

Returns:

Type Description
bool

True if proposal was accepted, False otherwise

Raises:

Type Description
ValueError

If variable is not configured or value type is invalid

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def propose_value(self, variable_name: str, value: Any) -> bool:
    """
    Submit a new value for a consensus variable.

    Only the current leader may propose. When this node is not the leader,
    the proposal is rejected. The call is forwarded unchanged to the wrapped
    Raft node.

    Args:
        variable_name: Name of the consensus variable
        value: Value to propose

    Returns:
        True if proposal was accepted, False otherwise

    Raises:
        ValueError: If variable is not configured or value type is invalid
    """
    raft_node = self._raft_node
    return raft_node.propose_value(variable_name, value)

send_broadcast(message)

Send broadcast message to all nodes.

This method sends a message to all nodes in the cluster using broadcast communication if available.

Parameters:

Name Type Description Default
message str

Message content to broadcast

required
Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def send_broadcast(self, message: str) -> None:
    """
    Send broadcast message to all nodes.

    If the wrapped Raft node exposes a non-None ``_send_broadcast`` callable,
    the message is handed to it once. Otherwise a warning is logged and the
    message is sent individually via ``_send_message`` to every known node
    except this one.

    Args:
        message: Message content to broadcast
    """
    broadcast = getattr(self._raft_node, '_send_broadcast', None)
    if broadcast is not None:
        broadcast(message)
        return

    self.logger.warning("Broadcast not available, falling back to individual messages")
    # `_known_nodes` may be missing or explicitly None before the node list
    # has been configured. Treat both as "no peers" instead of iterating
    # None, which would raise TypeError.
    known_nodes = getattr(self._raft_node, '_known_nodes', None) or []
    for node_id in known_nodes:
        if node_id != self.node_id:
            self._raft_node._send_message(message, node_id)

set_cluster_id(cluster_id)

Set the cluster ID for this node.

Parameters:

Name Type Description Default
cluster_id Optional[int]

Cluster ID to assign to this node, or None to clear

required
Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def set_cluster_id(self, cluster_id: Optional[int]) -> None:
    """
    Assign (or clear) the cluster ID of this node.

    Delegates to the wrapped Raft node.

    Args:
        cluster_id: Cluster ID to assign to this node, or None to clear
    """
    raft_node = self._raft_node
    raft_node.set_cluster_id(cluster_id)

set_is_active(node_id, active)

Set this node's active/inactive state. DEPRECATED: Use set_simulation_active() instead.

Parameters:

Name Type Description Default
node_id int

ID of the node to set state

required
active bool

True to make node active, False to make it inactive

required
Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def set_is_active(self, node_id: int, active: bool) -> None:
    """
    Set a node's active/inactive state.

    DEPRECATED: Use set_simulation_active() instead. Each call emits a
    ``DeprecationWarning`` and then delegates unchanged to the wrapped Raft
    node's ``set_is_active``.

    Args:
        node_id: ID of the node to set state
        active: True to make node active, False to make it inactive
    """
    import warnings

    # stacklevel=2 attributes the warning to the caller, not to this wrapper.
    warnings.warn(
        "set_is_active() is deprecated; use set_simulation_active() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    self._raft_node.set_is_active(node_id, active)

set_known_nodes(node_ids)

Set the list of known node IDs.

This method should be called to inform the consensus system about all nodes in the cluster. This information is used for sending messages during elections and heartbeats.

Parameters:

Name Type Description Default
node_ids List[int]

List of all node IDs in the cluster

required
Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def set_known_nodes(self, node_ids: List[int]) -> None:
    """
    Set the list of known node IDs.

    This method should be called to inform the consensus system about
    all nodes in the cluster. This information is used for sending
    messages during elections and heartbeats.

    If the wrapped Raft node has a ``_heartbeat_detector`` and the provider
    offers ``set_failure_detector``, the detector is also handed to the
    provider. Failure to do so is logged as a warning and does not abort
    the call.

    Args:
        node_ids: List of all node IDs in the cluster
    """
    self._raft_node.set_known_nodes(node_ids)

    detector = getattr(self._raft_node, '_heartbeat_detector', None)
    # Compare against None explicitly: a detector object that defines
    # __len__/__bool__ (e.g. one currently tracking zero nodes) is falsy
    # but must still be connected to the provider.
    if detector is not None and hasattr(self._provider, 'set_failure_detector'):
        try:
            self._provider.set_failure_detector(detector)
        except Exception as exc:
            # Best-effort wiring: a provider that rejects the detector must
            # not prevent the node list from being applied.
            self.logger.warning('Failed to connect failure detector via provider: %s', exc)
        else:
            self.logger.info('Connected failure detector to provider for connectivity-based failure detection')

    self.logger.info(f"Set known nodes: {node_ids}")

set_simulation_active(node_id, active)

Set this node's simulation active/inactive state. Only affects this node if node_id matches this node's ID.

Parameters:

Name Type Description Default
node_id int

ID of the node to set state

required
active bool

True to make node active in simulation, False to make it inactive

required
Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def set_simulation_active(self, node_id: int, active: bool) -> None:
    """
    Set this node's simulation active/inactive state.

    Per the plugin contract, this only affects this node when ``node_id``
    matches this node's ID. The call is forwarded unchanged to the wrapped
    Raft node.

    Args:
        node_id: ID of the node to set state
        active: True to make node active in simulation, False to make it inactive
    """
    raft_node = self._raft_node
    raft_node.set_simulation_active(node_id, active)

start()

Start the consensus process.

This method initializes the Raft node and begins the election timeout. The node will start as a follower and may become a candidate if no leader is discovered.

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def start(self) -> None:
    """
    Begin the consensus process for this node.

    Logs the start and then starts the wrapped Raft node. The node starts as
    a follower with an election timeout running, and may become a candidate
    if no leader is discovered.
    """
    self.logger.info(f"Starting Raft consensus for node {self.node_id}")
    raft_node = self._raft_node
    raft_node.start()

stop()

Stop the consensus process.

This method stops the Raft node and cancels all active timers.

Source code in gradysim/protocol/plugin/raft/raft_consensus.py
def stop(self) -> None:
    """
    Halt the consensus process for this node.

    Stops the wrapped Raft node, which cancels all active timers.
    """
    # No logging here on purpose: the Raft node logs its own shutdown.
    raft_node = self._raft_node
    raft_node.stop()

RaftMode

Bases: Enum

Enumeration for Raft operation modes.

CLASSIC: Classic Raft mode

  • uses fixed cluster size for all calculations

FAULT_TOLERANT: Fault-tolerant Raft mode

  • uses active node count for majority calculations
Source code in gradysim/protocol/plugin/raft/raft_config.py
class RaftMode(Enum):
    """
    Operation modes for the Raft consensus plugin.

    CLASSIC: Classic Raft mode

    - uses fixed cluster size for all calculations

    FAULT_TOLERANT: Fault-tolerant Raft mode

    - uses active node count for majority calculations
    """
    # Classic mode: calculations use the fixed cluster size.
    CLASSIC = "classic"
    # Fault-tolerant mode: majorities are computed from the active node count.
    FAULT_TOLERANT = "fault_tolerant"

RaftState

Bases: Enum

Raft node states enumeration.

Each state represents a different role and behavior in the Raft consensus algorithm.

Source code in gradysim/protocol/plugin/raft/raft_state.py
class RaftState(Enum):
    """
    Roles a node can occupy in the Raft consensus algorithm.

    Each member corresponds to a distinct role with its own behaviour.
    """

    FOLLOWER = auto()
    """
    Follower state - Default state for all nodes.

    Responsibilities:

    - Respond to requests from leaders and candidates

    - Start an election if the election timeout elapses

    - Vote for candidates during elections

    - Accept replicated values from the leader
    """

    CANDIDATE = auto()
    """
    Candidate state - Used during leader election.

    Responsibilities:

    - Increment the current term

    - Vote for itself

    - Request votes from other nodes

    - Become leader once a majority of votes is received

    - Return to follower if another leader is discovered
    """

    LEADER = auto()
    """
    Leader state - Handles client requests and value replication.

    Responsibilities:

    - Send periodic heartbeats to all followers

    - Handle client requests

    - Replicate values to followers

    - Commit values once a majority has replicated them

    - Step down if a higher term is discovered
    """

    def __str__(self) -> str:
        """Return the bare member name, e.g. ``LEADER``."""
        name = self.name
        return name

    def __repr__(self) -> str:
        """Return the qualified member name, e.g. ``RaftState.LEADER``."""
        return "RaftState.{}".format(self.name)

CANDIDATE = auto() class-attribute instance-attribute

Candidate state - Used during leader election.

Responsibilities:

  • Increment current term

  • Vote for self

  • Request votes from other nodes

  • Become leader if majority votes received

  • Return to follower if another leader discovered

FOLLOWER = auto() class-attribute instance-attribute

Follower state - Default state for all nodes.

Responsibilities:

  • Respond to requests from leaders and candidates

  • Start election if election timeout elapses

  • Vote for candidates during elections

  • Accept log entries from leader

LEADER = auto() class-attribute instance-attribute

Leader state - Handles all client requests and log replication.

Responsibilities:

  • Send periodic heartbeats to all followers

  • Handle client requests

  • Replicate log entries to followers

  • Commit entries when majority replicated

  • Step down if higher term discovered

__repr__()

Return detailed string representation of the state.

Source code in gradysim/protocol/plugin/raft/raft_state.py
def __repr__(self) -> str:
    """Return the qualified member name, e.g. ``RaftState.LEADER``."""
    return "RaftState.{}".format(self.name)

__str__()

Return string representation of the state.

Source code in gradysim/protocol/plugin/raft/raft_state.py
def __str__(self) -> str:
    """Return the bare member name, e.g. ``LEADER``."""
    name = self.name
    return name