Creating a Modbus TCP Client
Build robust Modbus TCP clients with PyModbus. Connection handling, timeouts, retry logic, and connection pooling best practices.
Modbus TCP Client
Connect to Modbus devices over Ethernet. Handle connections, timeouts, and errors properly.
Basic TCP Client
from pymodbus.client import ModbusTcpClient

# Create client
# NOTE(review): retry_on_empty, close_comm_on_error and strict may not be
# accepted by every pymodbus release - confirm against the installed version.
client = ModbusTcpClient(
    host='192.168.1.100',       # IP address
    port=502,                   # Default Modbus port
    timeout=3,                  # Socket timeout
    retries=3,                  # Retry count
    retry_on_empty=False,       # Retry on empty response
    close_comm_on_error=False,
    strict=True,                # Strict timing
    source_address=None         # Source IP (optional)
)

# Connect
if client.connect():
    print("Connected")
    try:
        # Read registers
        result = client.read_holding_registers(0, 10)
        # A response can be an error object; inspect it before using the data
        if result.isError():
            print(f"Modbus error: {result}")
        else:
            print(result.registers)
    finally:
        # Release the socket even when the read raises
        client.close()
else:
    print("Connection failed")

# Connection Management
Auto-reconnect Client
class ReconnectingModbusClient:
    """TCP client with automatic reconnection.

    A connection attempt is made on construction; its outcome is not checked
    there because every read re-validates the socket and reconnects on demand.
    """

    def __init__(self, host, port=502, timeout=3):
        """Store connection settings and make a first connection attempt."""
        self.host = host
        self.port = port
        self.timeout = timeout
        self.client = None
        self._connect()

    def _connect(self):
        """Create a fresh client and connect it.

        Returns:
            Whatever ``ModbusTcpClient.connect()`` returns (truthy on success).
        """
        # Close the previous client first so repeated reconnects do not
        # leave old sockets behind.
        self.close()
        self.client = ModbusTcpClient(
            self.host,
            port=self.port,
            timeout=self.timeout
        )
        return self.client.connect()

    def _ensure_connected(self):
        """Reconnect if there is no client or its socket is not open.

        Raises:
            ConnectionError: if the reconnect attempt fails.
        """
        if not self.client or not self.client.is_socket_open():
            print(f"Reconnecting to {self.host}:{self.port}")
            if not self._connect():
                raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")

    def read_holding_registers(self, address, count, slave=1):
        """Read holding registers, reconnecting first if necessary.

        Returns:
            The response object from the underlying client. Error responses
            are printed and still returned so the caller can inspect them.

        Raises:
            ConnectionError: if the connection cannot be (re)established.
            Exception: any exception raised by the underlying read is
                re-raised after the connection has been dropped.
        """
        self._ensure_connected()
        try:
            result = self.client.read_holding_registers(address, count, slave)
            if result.isError():
                print(f"Modbus error: {result}")
            return result
        except Exception as e:
            print(f"Read failed: {e}")
            # Drop the connection so the next call starts from a clean state
            self.close()
            raise

    def close(self):
        """Close the connection and forget the client; safe to call repeatedly."""
        if self.client:
            self.client.close()
            self.client = None
# Use it
import time  # needed for the polling delay below

client = ReconnectingModbusClient('192.168.1.100')
try:
    for i in range(100):
        result = client.read_holding_registers(0, 10)
        time.sleep(1)
finally:
    # Close the connection even if a read raises part-way through the loop
    client.close()

# Context Manager
from contextlib import contextmanager
@contextmanager
def modbus_client(host, port=502):
    """Yield a connected ModbusTcpClient and close it when the block exits.

    Raises:
        ConnectionError: if ``connect()`` reports failure.
    """
    connection = ModbusTcpClient(host, port=port)
    try:
        connected = connection.connect()
        if connected:
            yield connection
        else:
            raise ConnectionError(f"Cannot connect to {host}:{port}")
    finally:
        # Runs on normal exit, on an error in the with-body, and on a failed connect
        connection.close()
# Use it
with modbus_client('192.168.1.100') as client:
    result = client.read_holding_registers(0, 10)
    # Check for an error response before touching the register payload
    if result.isError():
        print(f"Modbus error: {result}")
    else:
        print(result.registers)

# Connection Pool
from queue import Queue
import threading
class ModbusConnectionPool:
    """Fixed-size pool of ModbusTcpClient objects shared through a Queue.

    Every slot is kept even if its initial connection attempt fails;
    ``get_client`` reconnects on checkout, so the pool does not shrink while
    the device is temporarily unreachable.
    """

    def __init__(self, host, port=502, pool_size=5):
        """Create ``pool_size`` clients and try to connect each one."""
        self.host = host
        self.port = port
        self.pool = Queue(maxsize=pool_size)
        self.lock = threading.Lock()  # not used by the pool itself; kept for callers
        # Create initial connections
        for _ in range(pool_size):
            client = ModbusTcpClient(host, port=port)
            # The result is intentionally not checked here: a failed slot is
            # retried when it is handed out by get_client().
            client.connect()
            self.pool.put(client)

    def get_client(self, timeout=10):
        """Take a client from the pool, reconnecting it if its socket is closed.

        Args:
            timeout: seconds to wait for a free client.

        Raises:
            queue.Empty: if no client becomes available within ``timeout``.
            ConnectionError: if the client cannot be reconnected. The client
                is put back first so the slot is not lost.
        """
        client = self.pool.get(timeout=timeout)
        # Check if still connected
        if not client.is_socket_open() and not client.connect():
            self.pool.put(client)
            raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")
        return client

    def return_client(self, client):
        """Return client to pool."""
        self.pool.put(client)

    def close_all(self):
        """Close every client currently sitting in the pool."""
        from queue import Empty  # local import: only Queue is imported at file level

        while True:
            try:
                # get_nowait avoids blocking if another thread empties the
                # queue between an emptiness check and the get
                client = self.pool.get_nowait()
            except Empty:
                break
            client.close()
# Use connection pool
pool = ModbusConnectionPool('192.168.1.100', pool_size=3)


def worker():
    """Borrow a pooled client, read ten registers, and hand the client back."""
    client = pool.get_client()
    try:
        result = client.read_holding_registers(0, 10)
        # Check for an error response before touching the register payload
        if result.isError():
            print(f"Modbus error: {result}")
        else:
            print(result.registers)
    finally:
        # Always give the client back so other workers are not starved
        pool.return_client(client)


# Run multiple workers
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
pool.close_all()

# Timeout Handling
import socket
from pymodbus.exceptions import ConnectionException, ModbusIOException
def read_with_timeout(host, address, count, timeout=5, port=502):
    """Read holding registers and report failures as a message instead of raising.

    Args:
        host: device address passed to ModbusTcpClient.
        address: first register to read.
        count: number of registers to read.
        timeout: timeout in seconds, passed to the client constructor.
        port: TCP port of the device (defaults to 502, the value that was
            previously hard-coded).

    Returns:
        ``(registers, None)`` on success, or ``(None, message)`` when the
        connection fails, the device answers with an error response, or one
        of the handled exceptions is raised.
    """
    # The timeout is configured once, here; no later adjustment is needed.
    client = ModbusTcpClient(
        host,
        port=port,
        timeout=timeout
    )
    try:
        if not client.connect():
            return None, "Connection failed"
        result = client.read_holding_registers(address, count)
        if result.isError():
            return None, f"Modbus error: {result}"
        return result.registers, None
    except socket.timeout:
        return None, f"Timeout after {timeout} seconds"
    except ConnectionException as e:
        return None, f"Connection error: {e}"
    except ModbusIOException as e:
        return None, f"I/O error: {e}"
    finally:
        # Close on every path, including the early returns above
        client.close()
# Example: read ten registers with a two-second timeout and report the outcome
data, error = read_with_timeout('192.168.1.100', 0, 10, timeout=2)
print(f"Failed: {error}" if error else f"Data: {data}")

# Multiple Device Management
class ModbusDeviceManager:
    """Manage multiple Modbus TCP devices, addressed by name.

    ``devices`` maps each name to a dict with the keys ``host``, ``port``,
    ``slave`` and ``client`` (``None`` until connected).
    """

    def __init__(self):
        self.devices = {}

    @staticmethod
    def _close_client(device):
        """Close and clear the client stored in a device entry, if any."""
        if device and device['client']:
            device['client'].close()
            device['client'] = None

    def add_device(self, name, host, port=502, slave=1):
        """Register a device; re-using a name replaces the previous entry."""
        # Close the old entry's connection so replacing it does not orphan a socket
        self._close_client(self.devices.get(name))
        self.devices[name] = {
            'host': host,
            'port': port,
            'slave': slave,
            'client': None
        }

    def connect(self, name):
        """Connect to a specific device.

        Returns:
            True on success; False if the name is unknown or connecting fails.
        """
        device = self.devices.get(name)
        if device is None:
            return False
        # Drop any earlier connection before opening a new one
        self._close_client(device)
        client = ModbusTcpClient(device['host'], port=device['port'])
        if client.connect():
            device['client'] = client
            return True
        client.close()
        return False

    def read_registers(self, name, address, count):
        """Read holding registers from a specific device.

        Connects first when the device has no client or its socket is no
        longer open.

        Returns:
            The client's response object, or None if the name is unknown or
            the device cannot be connected.
        """
        device = self.devices.get(name)
        if device is None:
            return None
        client = device['client']
        if not client or not client.is_socket_open():
            if not self.connect(name):
                return None
        return device['client'].read_holding_registers(
            address, count, device['slave']
        )

    def disconnect_all(self):
        """Disconnect all devices."""
        for device in self.devices.values():
            self._close_client(device)
# Use it
manager = ModbusDeviceManager()
manager.add_device('plc1', '192.168.1.10', slave=1)
manager.add_device('plc2', '192.168.1.11', slave=1)
manager.add_device('meter', '192.168.1.20', slave=2)

try:
    # Read from different devices
    plc1_data = manager.read_registers('plc1', 0, 10)
    plc2_data = manager.read_registers('plc2', 100, 5)
    meter_data = manager.read_registers('meter', 0, 20)
finally:
    # Close every connection even if one of the reads raises
    manager.disconnect_all()

# TLS/SSL Secure Connection
import ssl

# TLS needs the dedicated TLS client; ModbusTcpClient is the plain-TCP client.
from pymodbus.client import ModbusTlsClient

# Create SSL context
ssl_context = ssl.create_default_context()
# WARNING: the next two lines turn off hostname checking and certificate
# verification, so the server's identity is NOT authenticated. Acceptable for
# a lab test only; remove them and load the proper CA for production use.
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Create secure client
# NOTE(review): certfile/keyfile handling differs between pymodbus releases
# and may be ignored when sslctx is supplied - confirm against the installed
# version, or load the pair with ssl_context.load_cert_chain() instead.
client = ModbusTlsClient(
    '192.168.1.100',
    port=802,                # Common secure Modbus port
    sslctx=ssl_context,
    certfile='client.crt',   # Optional client cert
    keyfile='client.key'     # Optional client key
)
if client.connect():
    try:
        result = client.read_holding_registers(0, 10)
    finally:
        # Release the socket even when the read raises
        client.close()

# Performance Optimization
Keep-Alive Connection
import time  # used for idle-time bookkeeping below


class PersistentModbusClient:
    """Keep one connection open across reads and probe it after idle periods."""

    def __init__(self, host, port=502):
        """Create the client; the connection is opened lazily by ``read``."""
        self.client = ModbusTcpClient(host, port=port)
        self.connected = False
        self.last_activity = 0
        self.keepalive_interval = 30  # seconds

    def _keepalive(self):
        """Probe the link with a one-register read if it has been idle too long.

        On failure ``self.connected`` is set to False so the caller can
        reconnect before issuing the real request.
        """
        if time.time() - self.last_activity > self.keepalive_interval:
            try:
                # Read one register as keepalive
                self.client.read_holding_registers(0, 1)
                self.last_activity = time.time()
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit still propagate
                self.connected = False

    def read(self, address, count):
        """Read holding registers over the persistent connection.

        Returns:
            The client's response object, or None when no connection can be
            established.
        """
        if not self.connected:
            self.connected = self.client.connect()
            if not self.connected:
                return None
            # A fresh connection counts as activity; no probe needed yet
            self.last_activity = time.time()

        self._keepalive()
        if not self.connected:
            # The probe found the link dead: reconnect once instead of
            # sending the real request over a broken connection
            self.connected = self.client.connect()
            if not self.connected:
                return None

        result = self.client.read_holding_registers(address, count)
        self.last_activity = time.time()
        return result


# Batch Operations
def batch_read(client, operations):
    """Run a series of holding-register reads and collect the outcomes.

    Args:
        client: object exposing ``read_holding_registers(address, count)``.
        operations: iterable of ``(name, address, count)`` tuples.

    Returns:
        Dict keyed by operation name. Each value has ``'data'`` (register
        list, or None on error), ``'error'`` (``str(result)`` on error, else
        None) and ``'time'`` (elapsed seconds for that read).
    """
    import time  # local import: the module is not imported at file level

    results = {}
    for name, address, count in operations:
        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments
        start = time.perf_counter()
        result = client.read_holding_registers(address, count)
        elapsed = time.perf_counter() - start
        failed = result.isError()
        results[name] = {
            'data': None if failed else result.registers,
            'error': str(result) if failed else None,
            'time': elapsed
        }
    return results
# Define batch operations
operations = [
    ('temperature', 100, 1),
    ('pressure', 101, 1),
    ('flow_rate', 102, 2),
    ('status', 200, 1),
]

# Execute batch
with modbus_client('192.168.1.100') as client:
    results = batch_read(client, operations)
    for name, data in results.items():
        # batch_read stores None under 'data' for a failed read; test for
        # that explicitly so an empty payload is not reported as an error
        if data['data'] is not None:
            print(f"{name}: {data['data']} ({data['time']:.3f}s)")
        else:
            print(f"{name}: ERROR - {data['error']}")

# Network Diagnostics
import ping3
def diagnose_connection(host, port=502):
    """Check reachability step by step: ping, TCP port, then a Modbus read.

    Progress is printed for each step.

    Returns:
        True if a Modbus connection could be opened (even when the test read
        returned an error response); False if the ping, the port check, or
        the Modbus connect failed.
    """
    import socket

    print(f"Diagnosing {host}:{port}")

    # Check if host is reachable
    ping_time = ping3.ping(host)
    # ping3 signals failure with None (timeout) or False (error), so both
    # must be treated as unreachable before the value is used as a number
    if ping_time is None or ping_time is False:
        print("✗ Host unreachable")
        return False
    print(f"✓ Ping: {ping_time*1000:.1f}ms")

    # Check if port is open; the with-block closes the socket on every path
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(3)
        result = sock.connect_ex((host, port))
    if result == 0:
        print(f"✓ Port {port} open")
    else:
        print(f"✗ Port {port} closed")
        return False

    # Try Modbus connection
    client = ModbusTcpClient(host, port=port, timeout=3)
    if not client.connect():
        print("✗ Modbus connection failed")
        return False
    try:
        print("✓ Modbus connection successful")
        # Try reading
        result = client.read_holding_registers(0, 1)
        if not result.isError():
            print("✓ Modbus communication working")
        else:
            print(f"⚠ Modbus error: {result}")
    finally:
        # Close the client even if the test read raises
        client.close()
    return True
# Diagnose
diagnose_connection('192.168.1.100')
# Most industrial networks use static IPs. Always document IP addresses and port numbers.
Error Recovery
class RobustModbusClient:
    """Client that turns every failure into a ``None`` result and self-heals.

    Consecutive error responses are counted; once ``max_errors`` is reached
    the connection is reset. Any exception also resets the connection.
    """

    def __init__(self, host, port=502):
        """Store settings; the connection is opened lazily by ``read_safe``."""
        self.host = host
        self.port = port
        self.client = None
        self.error_count = 0
        self.max_errors = 3

    def read_safe(self, address, count, slave=1):
        """Read holding registers without ever raising ``Exception``.

        Returns:
            The list of register values on success, or None on a connection
            failure, an error response, or any exception during the read.
        """
        try:
            # Ensure connected
            if not self.client:
                # port is passed by keyword, matching the other examples
                self.client = ModbusTcpClient(self.host, port=self.port)
                if not self.client.connect():
                    raise ConnectionError("Cannot connect")

            # Read data
            result = self.client.read_holding_registers(address, count, slave)

            # Check result
            if result.isError():
                self.error_count += 1
                if self.error_count >= self.max_errors:
                    # Too many errors, reset connection
                    print(f"Error: Too many errors: {result}")
                    self.reset()
                return None

            # Success, reset error count
            self.error_count = 0
            return result.registers
        except Exception as e:
            print(f"Error: {e}")
            self.reset()
            return None

    def reset(self):
        """Close and discard the client and clear the error counter."""
        if self.client:
            try:
                self.client.close()
            except Exception:
                # Best-effort close: the client is discarded either way.
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit still propagate.
                pass
        self.client = None
        self.error_count = 0


# Next Steps
- RTU Client - Serial communication
- Async Client - High-performance async
- Error Handling - Handle all error cases