Error Handling Best Practices
Handle Modbus errors properly - exception codes, timeouts, retries, logging, recovery strategies.
Error Handling in PyModbus
Handle all the things that go wrong: timeouts, exceptions, connection failures, and invalid data.
Exception Types
from pymodbus.exceptions import (
ModbusException, # Base exception
ModbusIOException, # I/O errors
ParameterException, # Invalid parameters
NoSuchSlaveException, # Slave doesn't exist
NotImplementedException, # Function not supported
ConnectionException, # Connection issues
InvalidMessageReceivedException # Corrupt message
)
# Order matters: the specific pymodbus exceptions are handled first, the
# ModbusException base class next, and the generic Exception catch-all last.
try:
    # `count` is passed by keyword: recent pymodbus releases make every
    # argument after `address` keyword-only, and the keyword form also works
    # on older 3.x releases.
    result = client.read_holding_registers(0, count=10)
except ModbusIOException as e:
    print(f"I/O error: {e}")
except ConnectionException as e:
    print(f"Connection error: {e}")
except ModbusException as e:
    print(f"Modbus error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
Modbus Exception Codes
def decode_exception(result):
    """Turn a failed Modbus result into a human-readable message.

    Args:
        result: Response object exposing ``isError()`` and, for Modbus
            exception responses, an ``exception_code`` attribute.

    Returns:
        ``None`` when ``result`` is not an error; the description of the
        exception code when one is present; ``str(result)`` when the result
        is an error but carries no (truthy) exception code.
    """
    if not result.isError():
        return None

    code = getattr(result, 'exception_code', None)
    if not code:
        # Error without an exception code: fall back to the object's own text.
        return str(result)

    descriptions = {
        0x01: "Illegal Function - Function code not supported",
        0x02: "Illegal Data Address - Address not allowed",
        0x03: "Illegal Data Value - Value out of range",
        0x04: "Slave Device Failure - Device error",
        0x05: "Acknowledge - Request accepted, processing",
        0x06: "Slave Device Busy - Try again later",
        0x08: "Memory Parity Error - Device memory error",
        0x0A: "Gateway Path Unavailable - Gateway error",
        0x0B: "Gateway Target Failed - Target not responding",
    }
    if code in descriptions:
        return descriptions[code]
    return f"Unknown exception: {code}"
# Use it: read from an address and print the decoded error, if the read
# produced one.
# `count` is passed by keyword because recent pymodbus releases make every
# argument after `address` keyword-only.
result = client.read_holding_registers(9999, count=10)
error = decode_exception(result)
if error:
    print(f"Error: {error}")
Retry Logic
import time
from functools import wraps
def retry_on_error(max_retries=3, delay=1.0, backoff=2.0):
    """Build a decorator that retries a call with exponential backoff.

    The wrapped function is called up to ``max_retries`` times. An attempt
    counts as failed when it raises, or when it returns an object whose
    ``isError()`` is true (which is converted into a ``ModbusException``).

    Args:
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once
            (previously the call was skipped and ``None`` was returned).
        delay: Seconds to sleep before the second attempt.
        backoff: Factor the delay is multiplied by after each failed attempt.

    Returns:
        A decorator; the decorated function returns the first successful
        result.

    Raises:
        Whatever the final failed attempt raised, once attempts are exhausted.
    """
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, attempts + 1):
                try:
                    result = func(*args, **kwargs)
                    # pymodbus reports many failures as error responses
                    # rather than exceptions; raise so they are retried too.
                    if hasattr(result, 'isError') and result.isError():
                        raise ModbusException(f"Modbus error: {result}")
                    return result
                except Exception as e:
                    if attempt >= attempts:
                        print(f"Failed after {attempts} retries: {e}")
                        raise
                    print(f"Retry {attempt}/{attempts} after {current_delay}s")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
# Use decorator
@retry_on_error(max_retries=3, delay=0.5)
def read_temperature(client):
    """Read holding register 100 and return its value divided by 10.

    Error responses are turned into exceptions (and retried) by the
    ``retry_on_error`` decorator before ``.registers`` is accessed.
    """
    # `count` is passed by keyword: recent pymodbus releases make every
    # argument after `address` keyword-only.
    result = client.read_holding_registers(100, count=1)
    return result.registers[0] / 10.0


# Will retry automatically
temp = read_temperature(client)
Connection Recovery
class ResilientModbusClient:
    """Modbus TCP client wrapper that reconnects after connection failures."""

    def __init__(self, host, port=502):
        """Store connection settings; no connection is opened here.

        Args:
            host: Host name or address of the Modbus TCP device.
            port: TCP port (502 by default).
        """
        self.host = host
        self.port = port
        self.client = None
        self.connected = False
        self.reconnect_delay = 1.0       # initial delay between attempts (s)
        self.max_reconnect_delay = 30.0  # upper bound for the backoff (s)

    def connect(self, max_attempts=None):
        """Connect, retrying with exponential backoff.

        Args:
            max_attempts: Maximum number of connection attempts. ``None``
                (the default) keeps retrying until the connection succeeds,
                matching the original behaviour.

        Returns:
            ``True`` once connected (including when already connected),
            ``False`` if ``max_attempts`` was exhausted.
        """
        if self.connected:
            # Previously this fell through the loop and returned False.
            return True

        delay = self.reconnect_delay
        attempts = 0
        while max_attempts is None or attempts < max_attempts:
            attempts += 1
            try:
                if self.client:
                    self.client.close()
                # `port` by keyword: works whether or not the installed
                # pymodbus release makes it keyword-only.
                self.client = ModbusTcpClient(self.host, port=self.port)
                self.connected = bool(self.client.connect())
            except Exception as e:
                self.connected = False
                print(f"Connection failed: {e}")

            if self.connected:
                print(f"Connected to {self.host}")
                return True

            if max_attempts is not None and attempts >= max_attempts:
                break
            print(f"Retrying in {delay}s...")
            time.sleep(delay)
            delay = min(delay * 2, self.max_reconnect_delay)
        return False

    def read(self, address, count, slave=1, max_retries=3):
        """Read holding registers, reconnecting and retrying on failure.

        Args:
            address: First register address.
            count: Number of registers to read.
            slave: Device/unit identifier forwarded to pymodbus.
            max_retries: How many times to retry after the first attempt.
                Bounded so a permanently failing device cannot cause
                unbounded recursion, as the original self-call did.

        Returns:
            The pymodbus response. If gateway errors (0x0A / 0x0B) persist
            through every attempt, the last error response is returned so
            the caller can inspect it with ``isError()``.

        Raises:
            The exception raised by the final attempt when every attempt
            raised.
        """
        result = None
        retries = max(0, max_retries)
        for attempt in range(retries + 1):
            if not self.connected:
                self.connect()
            try:
                result = self.client.read_holding_registers(
                    address, count=count, slave=slave
                )
            except Exception as e:
                print(f"Read error: {e}")
                self.connected = False
                if attempt >= retries:
                    raise
                continue

            # Not every error object carries an exception code, so use
            # getattr instead of direct attribute access.
            code = getattr(result, 'exception_code', None)
            if result.isError() and code in (0x0A, 0x0B):
                # Gateway path/target failure: force a reconnect and retry.
                self.connected = False
                continue
            return result
        return result
Timeout Handling
import signal
from contextlib import contextmanager
class TimeoutError(Exception):
    """Raised by ``timeout`` when the guarded block overruns its deadline.

    NOTE: the name is kept for compatibility; within this module it shadows
    the built-in ``TimeoutError``.
    """


@contextmanager
def timeout(seconds):
    """Context manager that aborts the enclosed block after ``seconds``.

    Implemented with ``SIGALRM``, so it is only usable on platforms that
    provide that signal and from the main thread (where Python allows
    installing signal handlers).

    Args:
        seconds: Time limit. Fractional values are supported; ``0`` disarms
            the timer, i.e. no timeout is applied.

    Raises:
        TimeoutError: Inside the ``with`` body when the limit is exceeded.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")

    # Remember the handler that was installed so it can be restored on exit;
    # the original left timeout_handler in place permanently.
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        # setitimer accepts fractional seconds; signal.alarm() is int-only.
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # Cancel the timer
        # signal.signal() returns None for handlers not installed from
        # Python; those cannot be re-installed, so leave ours in place.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
# Use timeout
try:
    with timeout(5):
        # This must complete within 5 seconds.
        # `count` is passed by keyword: recent pymodbus releases make every
        # argument after `address` keyword-only.
        result = client.read_holding_registers(0, count=100)
        # Check for an error response before touching `.registers`.
        if result.isError():
            print(f"Error: {result}")
        else:
            print(result.registers)
except TimeoutError as e:
    print(f"Timeout: {e}")
Logging Errors
import logging
from datetime import datetime
# Configure logging: INFO and above go to both 'modbus_errors.log' and the
# default stream handler, using one shared record format.
logging.basicConfig(
    handlers=[
        logging.FileHandler('modbus_errors.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger used by the Modbus helpers in this guide.
logger = logging.getLogger('modbus')
class LoggingModbusClient:
    """Modbus TCP client wrapper that logs every read and counts outcomes."""

    def __init__(self, host):
        """Create the underlying ``ModbusTcpClient`` and zero the counters.

        Args:
            host: Host name or address of the Modbus TCP device.
        """
        self.host = host
        self.client = ModbusTcpClient(host)
        self.error_count = 0
        self.success_count = 0

    def read(self, address, count, slave=1):
        """Read holding registers, logging the outcome and elapsed time.

        Args:
            address: First register address.
            count: Number of registers to read.
            slave: Device/unit identifier forwarded to pymodbus.

        Returns:
            The list of register values on success, or ``None`` when the
            device returned an error response or the call raised.
        """
        # perf_counter is monotonic, so the measured duration is not skewed
        # by wall-clock adjustments the way time.time() differences can be.
        start_time = time.perf_counter()
        try:
            # Keyword arguments: recent pymodbus releases make everything
            # after `address` keyword-only.
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )
            elapsed = time.perf_counter() - start_time
            if result.isError():
                self.error_count += 1
                logger.error(
                    f"Read error: addr={address}, count={count}, "
                    f"slave={slave}, error={result}, time={elapsed:.3f}s"
                )
                return None
            self.success_count += 1
            logger.debug(
                f"Read success: addr={address}, count={count}, "
                f"slave={slave}, time={elapsed:.3f}s"
            )
            return result.registers
        except Exception as e:
            self.error_count += 1
            elapsed = time.perf_counter() - start_time
            # logger.exception also records the traceback.
            logger.exception(
                f"Read exception: addr={address}, count={count}, "
                f"slave={slave}, error={e}, time={elapsed:.3f}s"
            )
            return None

    def get_stats(self):
        """Log a summary (when any reads happened) and return the counters.

        Returns:
            Tuple ``(success_count, error_count)``.
        """
        total = self.success_count + self.error_count
        if total > 0:
            error_rate = (self.error_count / total) * 100
            logger.info(
                f"Stats: {self.success_count} success, "
                f"{self.error_count} errors ({error_rate:.1f}%)"
            )
        return self.success_count, self.error_count
Validation
def validate_register_value(value, min_val=None, max_val=None, name="Value"):
    """Validate that a decoded register value is numeric and within range.

    Args:
        value: The value to check.
        min_val: Inclusive lower bound, or ``None`` for no lower bound.
        max_val: Inclusive upper bound, or ``None`` for no upper bound.
        name: Label used in error messages.

    Returns:
        ``True`` when the value passes every check.

    Raises:
        ValueError: If ``value`` is ``None``, is NaN, or lies outside the
            given bounds.
        TypeError: If ``value`` is not an ``int`` or ``float``.
    """
    if value is None:
        raise ValueError(f"{name} is None")
    if not isinstance(value, (int, float)):
        raise TypeError(f"{name} must be numeric, got {type(value)}")
    # NaN is the only numeric value unequal to itself. It compares False
    # against every bound, so without this check it would pass both range
    # tests below.
    if value != value:
        raise ValueError(f"{name} is NaN")
    if min_val is not None and value < min_val:
        raise ValueError(f"{name} {value} below minimum {min_val}")
    if max_val is not None and value > max_val:
        raise ValueError(f"{name} {value} above maximum {max_val}")
    return True
def safe_read_temperature(client):
    """Read holding register 100 as a temperature and range-check it.

    The raw register value is divided by 10 and must lie within
    [-50, 200].

    Args:
        client: Object providing ``read_holding_registers``.

    Returns:
        The temperature as a float, or ``None`` when the read returned an
        error response, raised a ``ModbusException``, or the value failed
        validation (the failure is logged).
    """
    try:
        # `count` is passed by keyword: recent pymodbus releases make every
        # argument after `address` keyword-only.
        result = client.read_holding_registers(100, count=1)
        if result.isError():
            raise ModbusException(f"Read error: {result}")
        raw_value = result.registers[0]
        temperature = raw_value / 10.0
        # Validate temperature range
        validate_register_value(
            temperature,
            min_val=-50,
            max_val=200,
            name="Temperature",
        )
        return temperature
    except (ModbusException, ValueError) as e:
        logger.error(f"Temperature read failed: {e}")
        return None
Circuit Breaker Pattern
class CircuitBreaker:
    """Circuit breaker that stops calling a function after repeated failures.

    States:
        CLOSED    - calls pass through; consecutive failures are counted.
        OPEN      - calls are rejected until ``recovery_timeout`` has elapsed
                    since the last failure.
        HALF_OPEN - one trial call is allowed; success closes the circuit,
                    failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """
        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay OPEN before a trial call.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure = None  # time.time() of the most recent failure
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` under circuit-breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            RuntimeError: If the circuit is OPEN and the recovery timeout has
                not elapsed (``func`` is not called). ``RuntimeError`` is a
                subclass of ``Exception``, so existing ``except Exception``
                handlers keep working.
            Exception: Any exception raised by ``func`` is re-raised after
                being counted as a failure.
        """
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.recovery_timeout:
                self.state = 'HALF_OPEN'
                print("Circuit breaker: Trying recovery")
            else:
                raise RuntimeError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure = time.time()
            # A failed trial call re-opens immediately; otherwise open once
            # the consecutive-failure threshold is reached.
            if (self.state == 'HALF_OPEN'
                    or self.failure_count >= self.failure_threshold):
                self.state = 'OPEN'
                print(f"Circuit breaker: OPEN after {self.failure_count} failures")
            raise

        if self.state == 'HALF_OPEN':
            self.state = 'CLOSED'
            print("Circuit breaker: Recovered")
        # Reset on every success so only *consecutive* failures count;
        # previously isolated failures accumulated forever while CLOSED.
        self.failure_count = 0
        return result
# Use circuit breaker
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)


def read_with_breaker(client, address, count):
    """Read holding registers through the shared ``breaker``.

    Args:
        client: Object providing ``read_holding_registers``.
        address: First register address.
        count: Number of registers to read.

    Returns:
        The read result, or ``None`` if the call raised or the breaker
        rejected it (the failure is printed).
    """
    try:
        # `count` is forwarded by keyword (CircuitBreaker.call passes
        # **kwargs through): recent pymodbus releases make it keyword-only,
        # and a positional call would be counted as a device failure.
        return breaker.call(client.read_holding_registers, address, count=count)
    except Exception as e:
        print(f"Failed: {e}")
        return None
Health Check
class ModbusHealthCheck:
    """Monitor Modbus device health by probing a single holding register."""

    def __init__(self, client, check_register=0):
        """
        Args:
            client: Object providing ``read_holding_registers``.
            check_register: Address of the register read by each probe.
        """
        self.client = client
        self.check_register = check_register
        self.consecutive_failures = 0
        self.max_failures = 3  # failures in a row before "unhealthy"

    def _probe(self):
        """Perform one read, update the failure counter, return success."""
        try:
            # `count` by keyword: recent pymodbus releases make every
            # argument after `address` keyword-only.
            result = self.client.read_holding_registers(
                self.check_register, count=1
            )
            succeeded = not result.isError()
        except Exception:
            succeeded = False

        if succeeded:
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1
        return succeeded

    def is_healthy(self):
        """Probe the device once and report whether it is still healthy.

        Failed probes are tolerated until ``max_failures`` consecutive
        failures have accumulated.

        Returns:
            ``False`` once ``consecutive_failures`` reaches ``max_failures``
            (a warning is logged), otherwise ``True``.
        """
        if self._probe():
            return True
        if self.consecutive_failures >= self.max_failures:
            logger.warning(f"Device unhealthy: {self.consecutive_failures} failures")
            return False
        return True

    def wait_until_healthy(self, timeout=60, poll_interval=2):
        """Block until a probe succeeds or ``timeout`` seconds elapse.

        Unlike ``is_healthy``, this requires an actual successful read: the
        original returned ``True`` after a failed probe whenever the failure
        counter was still below ``max_failures``.

        Args:
            timeout: Maximum time to wait, in seconds.
            poll_interval: Seconds to sleep between probes.

        Returns:
            ``True`` if a probe succeeded in time, ``False`` otherwise.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._probe():
                return True
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(poll_interval, remaining)))
        return False
Always implement proper error handling in production. Unhandled errors can crash your application or corrupt data.
Error Recovery Strategies
- Retry with backoff - Don't hammer failing devices
- Circuit breaker - Stop trying when device is down
- Fallback values - Use last known good value
- Graceful degradation - Continue with reduced functionality
- Alert on persistent errors - Notify operators
Next Steps
- TCP Server - Handle errors in servers
- Debugging - Debug Modbus issues
- Testing - Test error scenarios
How is this guide?