PyModbus
PyModbusDocs

Error Handling Best Practices

Handle Modbus errors properly - exception codes, timeouts, retries, logging, recovery strategies.

Error Handling in PyModbus

Handle all the things that go wrong: timeouts, exceptions, connection failures, and invalid data.

Exception Types

from pymodbus.exceptions import (
    ModbusException,          # Base exception
    ModbusIOException,        # I/O errors
    ParameterException,       # Invalid parameters
    NoSuchSlaveException,     # Slave doesn't exist
    NotImplementedException, # Function not supported
    ConnectionException,      # Connection issues
    InvalidMessageReceivedException  # Corrupt message
)

try:
    result = client.read_holding_registers(0, 10)
except ModbusIOException as e:
    print(f"I/O error: {e}")
except ConnectionException as e:
    print(f"Connection error: {e}")
except ModbusException as e:
    print(f"Modbus error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Modbus Exception Codes

def decode_exception(result):
    """Decode Modbus exception response."""
    if not result.isError():
        return None
    
    exceptions = {
        0x01: "Illegal Function - Function code not supported",
        0x02: "Illegal Data Address - Address not allowed",
        0x03: "Illegal Data Value - Value out of range",
        0x04: "Slave Device Failure - Device error",
        0x05: "Acknowledge - Request accepted, processing",
        0x06: "Slave Device Busy - Try again later",
        0x08: "Memory Parity Error - Device memory error",
        0x0A: "Gateway Path Unavailable - Gateway error",
        0x0B: "Gateway Target Failed - Target not responding"
    }
    
    code = getattr(result, 'exception_code', None)
    if code:
        return exceptions.get(code, f"Unknown exception: {code}")
    return str(result)

# Use it
result = client.read_holding_registers(9999, 10)
error = decode_exception(result)
if error:
    print(f"Error: {error}")

Retry Logic

import time
from functools import wraps

def retry_on_error(max_retries=3, delay=1.0, backoff=2.0):
    """Decorator for automatic retry."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            
            while retries < max_retries:
                try:
                    result = func(*args, **kwargs)
                    
                    # Check if Modbus error
                    if hasattr(result, 'isError') and result.isError():
                        raise ModbusException(f"Modbus error: {result}")
                    
                    return result
                    
                except Exception as e:
                    retries += 1
                    if retries >= max_retries:
                        print(f"Failed after {max_retries} retries: {e}")
                        raise
                    
                    print(f"Retry {retries}/{max_retries} after {current_delay}s")
                    time.sleep(current_delay)
                    current_delay *= backoff
            
        return wrapper
    return decorator

# Use decorator
@retry_on_error(max_retries=3, delay=0.5)
def read_temperature(client):
    result = client.read_holding_registers(100, 1)
    return result.registers[0] / 10.0

# Will retry automatically
temp = read_temperature(client)

Connection Recovery

class ResilientModbusClient:
    """Client that recovers from connection failures."""
    
    def __init__(self, host, port=502):
        self.host = host
        self.port = port
        self.client = None
        self.connected = False
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 30.0
    
    def connect(self):
        """Connect with exponential backoff."""
        delay = self.reconnect_delay
        
        while not self.connected:
            try:
                if self.client:
                    self.client.close()
                
                self.client = ModbusTcpClient(self.host, self.port)
                self.connected = self.client.connect()
                
                if self.connected:
                    print(f"Connected to {self.host}")
                    self.reconnect_delay = 1.0  # Reset delay
                    return True
                
            except Exception as e:
                print(f"Connection failed: {e}")
            
            print(f"Retrying in {delay}s...")
            time.sleep(delay)
            delay = min(delay * 2, self.max_reconnect_delay)
        
        return False
    
    def read(self, address, count, slave=1):
        """Read with automatic reconnection."""
        if not self.connected:
            self.connect()
        
        try:
            result = self.client.read_holding_registers(address, count, slave)
            
            if result.isError():
                # Check if connection error
                if result.exception_code in [0x0A, 0x0B]:
                    self.connected = False
                    return self.read(address, count, slave)  # Retry
                
            return result
            
        except Exception as e:
            print(f"Read error: {e}")
            self.connected = False
            return self.read(address, count, slave)  # Retry

Timeout Handling

import signal
from contextlib import contextmanager

class TimeoutError(Exception):
    pass

@contextmanager
def timeout(seconds):
    """Context manager for operation timeout."""
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")
    
    # Set alarm
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    
    try:
        yield
    finally:
        signal.alarm(0)  # Cancel alarm

# Use timeout
try:
    with timeout(5):
        # This must complete within 5 seconds
        result = client.read_holding_registers(0, 100)
        print(result.registers)
except TimeoutError as e:
    print(f"Timeout: {e}")

Logging Errors

import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('modbus_errors.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('modbus')

class LoggingModbusClient:
    """Client with comprehensive logging."""
    
    def __init__(self, host):
        self.host = host
        self.client = ModbusTcpClient(host)
        self.error_count = 0
        self.success_count = 0
    
    def read(self, address, count, slave=1):
        """Read with logging."""
        start_time = time.time()
        
        try:
            result = self.client.read_holding_registers(address, count, slave)
            elapsed = time.time() - start_time
            
            if result.isError():
                self.error_count += 1
                logger.error(
                    f"Read error: addr={address}, count={count}, "
                    f"slave={slave}, error={result}, time={elapsed:.3f}s"
                )
                return None
            
            self.success_count += 1
            logger.debug(
                f"Read success: addr={address}, count={count}, "
                f"slave={slave}, time={elapsed:.3f}s"
            )
            
            return result.registers
            
        except Exception as e:
            self.error_count += 1
            elapsed = time.time() - start_time
            logger.exception(
                f"Read exception: addr={address}, count={count}, "
                f"slave={slave}, error={e}, time={elapsed:.3f}s"
            )
            return None
    
    def get_stats(self):
        """Get error statistics."""
        total = self.success_count + self.error_count
        if total > 0:
            error_rate = (self.error_count / total) * 100
            logger.info(
                f"Stats: {self.success_count} success, "
                f"{self.error_count} errors ({error_rate:.1f}%)"
            )
        return self.success_count, self.error_count

Validation

def validate_register_value(value, min_val=None, max_val=None, name="Value"):
    """Validate register value."""
    if value is None:
        raise ValueError(f"{name} is None")
    
    if not isinstance(value, (int, float)):
        raise TypeError(f"{name} must be numeric, got {type(value)}")
    
    if min_val is not None and value < min_val:
        raise ValueError(f"{name} {value} below minimum {min_val}")
    
    if max_val is not None and value > max_val:
        raise ValueError(f"{name} {value} above maximum {max_val}")
    
    return True

def safe_read_temperature(client):
    """Read temperature with validation."""
    try:
        result = client.read_holding_registers(100, 1)
        
        if result.isError():
            raise ModbusException(f"Read error: {result}")
        
        raw_value = result.registers[0]
        temperature = raw_value / 10.0
        
        # Validate temperature range
        validate_register_value(
            temperature,
            min_val=-50,
            max_val=200,
            name="Temperature"
        )
        
        return temperature
        
    except (ModbusException, ValueError) as e:
        logger.error(f"Temperature read failed: {e}")
        return None

Circuit Breaker Pattern

class CircuitBreaker:
    """Circuit breaker for failing connections."""
    
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call(self, func, *args, **kwargs):
        """Call function with circuit breaker."""
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.recovery_timeout:
                self.state = 'HALF_OPEN'
                print("Circuit breaker: Trying recovery")
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
                print("Circuit breaker: Recovered")
            
            return result
            
        except Exception as e:
            self.failure_count += 1
            self.last_failure = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                print(f"Circuit breaker: OPEN after {self.failure_count} failures")
            
            raise

# Use circuit breaker
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def read_with_breaker(client, address, count):
    try:
        return breaker.call(client.read_holding_registers, address, count)
    except Exception as e:
        print(f"Failed: {e}")
        return None

Health Check

class ModbusHealthCheck:
    """Monitor Modbus device health."""
    
    def __init__(self, client, check_register=0):
        self.client = client
        self.check_register = check_register
        self.consecutive_failures = 0
        self.max_failures = 3
    
    def is_healthy(self):
        """Check if device is responding."""
        try:
            result = self.client.read_holding_registers(
                self.check_register, 1
            )
            
            if result.isError():
                self.consecutive_failures += 1
            else:
                self.consecutive_failures = 0
                return True
                
        except Exception:
            self.consecutive_failures += 1
        
        if self.consecutive_failures >= self.max_failures:
            logger.warning(f"Device unhealthy: {self.consecutive_failures} failures")
            return False
        
        return True
    
    def wait_until_healthy(self, timeout=60):
        """Wait for device to become healthy."""
        start = time.time()
        
        while time.time() - start < timeout:
            if self.is_healthy():
                return True
            time.sleep(2)
        
        return False

Always implement proper error handling in production. Unhandled errors can crash your application or corrupt data.

Error Recovery Strategies

  1. Retry with backoff - Don't hammer failing devices
  2. Circuit breaker - Stop trying when device is down
  3. Fallback values - Use last known good value
  4. Graceful degradation - Continue with reduced functionality
  5. Alert on persistent errors - Notify operators

Next Steps

How is this guide?