Creating a Modbus TCP Client
Build robust Modbus TCP clients: connection handling, timeouts, retry logic, and connection pooling.
Modbus TCP Client
Connect to Modbus devices over Ethernet. Handle connections, timeouts, and errors properly.
Basic TCP Client
import time

from pymodbus.client import ModbusTcpClient
# Create client. Nothing is sent on the wire until connect() is called.
# NOTE(review): retry_on_empty, close_comm_on_error and strict are accepted
# only by some pymodbus 3.x releases -- confirm against the installed version
# before copying these keyword arguments.
client = ModbusTcpClient(
    host='192.168.1.100',       # IP address
    port=502,                   # Default Modbus port
    timeout=3,                  # Socket timeout
    retries=3,                  # Retry count
    retry_on_empty=False,       # Retry on empty response
    close_comm_on_error=False,
    strict=True,                # Strict timing
    source_address=None         # Source IP (optional)
)

# Connect
if client.connect():
    print("Connected")
    try:
        # Read registers
        result = client.read_holding_registers(0, count=10)
        # A Modbus error comes back as a response object, not an exception,
        # so it has to be checked explicitly.
        if result.isError():
            print(f"Modbus error: {result}")
    finally:
        # Release the socket even when the read raises.
        client.close()
else:
    print("Connection failed")
Connection Management
Auto-reconnect Client
class ReconnectingModbusClient:
    """TCP client that re-establishes its connection before each read.

    The wrapped ``ModbusTcpClient`` is stored in ``self.client``; it is
    ``None`` whenever no connection object is currently held.
    """

    def __init__(self, host, port=502, timeout=3):
        """Remember the connection settings and attempt a first connect.

        A failed first connect is not an error here: the next read retries
        it through ``_ensure_connected`` and raises if it still fails.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.client = None
        self._connect()

    def _connect(self):
        """Create a fresh client and connect it; return the connect result."""
        # Release the previous client first so reconnecting never leaks a socket.
        self.close()
        self.client = ModbusTcpClient(
            self.host,
            port=self.port,
            timeout=self.timeout,
        )
        return self.client.connect()

    def _ensure_connected(self):
        """Reconnect when no open socket is available.

        Raises:
            ConnectionError: if the reconnect attempt fails.
        """
        if self.client is None or not self.client.is_socket_open():
            print(f"Reconnecting to {self.host}:{self.port}")
            if not self._connect():
                raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")

    def read_holding_registers(self, address, count, slave=1):
        """Read holding registers, reconnecting first if necessary.

        Returns the response object from the underlying client; a Modbus
        error response is printed and still returned. Any exception raised
        by the read drops the connection and is re-raised to the caller.
        """
        self._ensure_connected()
        try:
            # Keyword arguments keep the call independent of parameter order.
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )
            if result.isError():
                print(f"Modbus error: {result}")
            return result
        except Exception as e:
            print(f"Read failed: {e}")
            # Drop the connection so the next call starts from a clean state.
            self.close()
            raise

    def close(self):
        """Close the connection, if any, and forget the client. Idempotent."""
        if self.client is not None:
            try:
                self.client.close()
            finally:
                self.client = None
# Use it: poll ten registers once per second for 100 iterations.
client = ReconnectingModbusClient('192.168.1.100')
try:
    for i in range(100):
        result = client.read_holding_registers(0, 10)
        time.sleep(1)
finally:
    # read_holding_registers re-raises on failure; close regardless.
    client.close()
Context Manager
from contextlib import contextmanager
@contextmanager
def modbus_client(host, port=502):
    """Yield a connected ModbusTcpClient and close it when the block exits.

    Raises:
        ConnectionError: if the connection cannot be established.
    """
    connection = ModbusTcpClient(host, port=port)
    try:
        is_connected = connection.connect()
        if is_connected:
            yield connection
        else:
            raise ConnectionError(f"Cannot connect to {host}:{port}")
    finally:
        # Runs on normal exit, on a failed connect and on errors in the body.
        connection.close()
# Use it
with modbus_client('192.168.1.100') as client:
    result = client.read_holding_registers(0, count=10)
    # Check for an error response before touching the register list.
    if result.isError():
        print(f"Modbus error: {result}")
    else:
        print(result.registers)
Connection Pool
from queue import Queue
import threading
class ModbusConnectionPool:
    """Fixed-size pool of ModbusTcpClient objects shared through a Queue."""

    def __init__(self, host, port=502, pool_size=5):
        """Create ``pool_size`` clients for ``host:port`` and queue them all."""
        self.host = host
        self.port = port
        self.pool = Queue(maxsize=pool_size)
        # Not used by the pool itself; kept so existing callers that reference
        # ``pool.lock`` keep working.
        self.lock = threading.Lock()
        # Create initial connections
        for _ in range(pool_size):
            client = ModbusTcpClient(host, port=port)
            # Keep the slot even when this first connect fails: get_client()
            # retries the connection, whereas dropping the client would shrink
            # the pool (possibly to zero, making every get_client() block).
            client.connect()
            self.pool.put(client)

    def get_client(self, timeout=10):
        """Borrow a connected client, waiting up to ``timeout`` seconds.

        Raises:
            queue.Empty: if no client becomes available in time.
            ConnectionError: if the borrowed client cannot reconnect; the
                client is put back first so the pool keeps its size.
        """
        client = self.pool.get(timeout=timeout)
        try:
            # Check if still connected
            if not client.is_socket_open() and not client.connect():
                raise ConnectionError(
                    f"Cannot connect to {self.host}:{self.port}"
                )
        except Exception:
            # Never lose a slot because a reconnect failed.
            self.pool.put(client)
            raise
        return client

    def return_client(self, client):
        """Return a borrowed client to the pool."""
        self.pool.put(client)

    def close_all(self):
        """Close every client currently sitting in the pool."""
        from queue import Empty

        # get_nowait() avoids the empty()/get() race with other threads.
        while True:
            try:
                client = self.pool.get_nowait()
            except Empty:
                break
            client.close()
# Use connection pool
pool = ModbusConnectionPool('192.168.1.100', pool_size=3)


def worker():
    """Borrow a pooled client, read ten holding registers and print them."""
    client = pool.get_client()
    try:
        result = client.read_holding_registers(0, count=10)
        # Check for an error response before touching the register list.
        if result.isError():
            print(f"Modbus error: {result}")
        else:
            print(result.registers)
    finally:
        # Always hand the client back, otherwise the pool runs dry.
        pool.return_client(client)


# Run multiple workers (more workers than connections: extras wait their turn)
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
pool.close_all()
Timeout Handling
import socket
from pymodbus.exceptions import ConnectionException, ModbusIOException
def read_with_timeout(host, address, count, timeout=5):
    """Read holding registers, reporting failures as text instead of raising.

    Args:
        host: Address of the Modbus TCP device (port 502 is used).
        address: First register address to read.
        count: Number of registers to read.
        timeout: Timeout in seconds handed to the client.

    Returns:
        ``(registers, None)`` on success, otherwise ``(None, message)`` where
        ``message`` describes the connection, timeout, I/O or Modbus error.
    """
    # The timeout is applied through the constructor, before the socket is
    # opened; patching it onto the client after connect() is unnecessary.
    client = ModbusTcpClient(
        host,
        port=502,
        timeout=timeout
    )
    try:
        if not client.connect():
            return None, "Connection failed"
        result = client.read_holding_registers(address, count=count)
        if result.isError():
            return None, f"Modbus error: {result}"
        return result.registers, None
    except socket.timeout:
        return None, f"Timeout after {timeout} seconds"
    except ConnectionException as e:
        return None, f"Connection error: {e}"
    except ModbusIOException as e:
        return None, f"I/O error: {e}"
    finally:
        # Runs on every path, including the early returns above.
        client.close()
# Use it: exactly one of data / error is set, so report whichever applies.
data, error = read_with_timeout('192.168.1.100', 0, 10, timeout=2)
print(f"Failed: {error}" if error else f"Data: {data}")
Multiple Device Management
class ModbusDeviceManager:
    """Manage multiple Modbus TCP devices, addressed by name.

    ``self.devices`` maps each name to a dict with the keys ``'host'``,
    ``'port'``, ``'slave'`` and ``'client'`` (``None`` until connected).
    """

    def __init__(self):
        self.devices = {}

    def add_device(self, name, host, port=502, slave=1):
        """Register (or replace) a device; no connection is opened yet."""
        # Replacing an entry must not orphan a connection that is still open.
        previous = self.devices.get(name)
        if previous is not None and previous['client'] is not None:
            previous['client'].close()
        self.devices[name] = {
            'host': host,
            'port': port,
            'slave': slave,
            'client': None
        }

    def connect(self, name):
        """Connect to a registered device.

        Returns:
            True when connected; False for an unknown name or failed connect.
        """
        if name not in self.devices:
            return False
        device = self.devices[name]
        # Close any earlier client so a repeated connect() does not leak it.
        if device['client'] is not None:
            device['client'].close()
            device['client'] = None
        client = ModbusTcpClient(device['host'], port=device['port'])
        if client.connect():
            device['client'] = client
            return True
        client.close()
        return False

    def read_registers(self, name, address, count):
        """Read holding registers from a named device.

        Connects on first use and reconnects when the socket has dropped.
        Returns the client's response object, or None when the device is
        unknown or cannot be connected.
        """
        if name not in self.devices:
            return None
        device = self.devices[name]
        client = device['client']
        if client is None or not client.is_socket_open():
            if not self.connect(name):
                return None
        return device['client'].read_holding_registers(
            address, count=count, slave=device['slave']
        )

    def disconnect_all(self):
        """Close every open connection and clear the stored clients."""
        for device in self.devices.values():
            if device['client'] is not None:
                device['client'].close()
                device['client'] = None
# Use it
manager = ModbusDeviceManager()
manager.add_device('plc1', '192.168.1.10', slave=1)
manager.add_device('plc2', '192.168.1.11', slave=1)
manager.add_device('meter', '192.168.1.20', slave=2)
try:
    # Read from different devices (each result is None if the device
    # could not be reached)
    plc1_data = manager.read_registers('plc1', 0, 10)
    plc2_data = manager.read_registers('plc2', 100, 5)
    meter_data = manager.read_registers('meter', 0, 20)
finally:
    # Close every connection even if one of the reads raises.
    manager.disconnect_all()
TLS/SSL Secure Connection
import ssl
# TLS needs the dedicated TLS client class; ModbusTcpClient speaks plain TCP.
from pymodbus.client import ModbusTlsClient

# Create SSL context
ssl_context = ssl.create_default_context()
# WARNING: the next two lines turn off server certificate verification, so
# the connection is encrypted but NOT authenticated. Only acceptable on an
# isolated test bench; in production load the device's CA certificate
# (ssl_context.load_verify_locations) and remove both lines.
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Optional client certificate and key for mutual TLS. Load them into the
# context itself so they are used together with it; drop this line if the
# server does not request a client certificate.
ssl_context.load_cert_chain(certfile='client.crt', keyfile='client.key')

# Create secure client
client = ModbusTlsClient(
    '192.168.1.100',
    port=802,  # Common secure Modbus port
    sslctx=ssl_context,
)
if client.connect():
    try:
        result = client.read_holding_registers(0, count=10)
    finally:
        # Release the socket even when the read raises.
        client.close()
Performance Optimization
Keep-Alive Connection
class PersistentModbusClient:
    """Keep one connection open across reads and probe it after idle periods."""

    def __init__(self, host, port=502):
        self.client = ModbusTcpClient(host, port=port)
        self.connected = False
        self.last_activity = 0  # time.time() of the last successful exchange
        self.keepalive_interval = 30  # seconds

    def _connect(self):
        """Open the connection and restart the idle timer; return success."""
        self.connected = bool(self.client.connect())
        if self.connected:
            # A link that was just opened does not need a keepalive probe.
            self.last_activity = time.time()
        return self.connected

    def _keepalive(self):
        """Probe the link with a one-register read when it has been idle.

        Sets ``self.connected`` to False when the probe raises.
        """
        if time.time() - self.last_activity <= self.keepalive_interval:
            return
        try:
            # Read one register as keepalive
            self.client.read_holding_registers(0, count=1)
            self.last_activity = time.time()
        except Exception:
            self.connected = False

    def read(self, address, count):
        """Read holding registers over the persistent connection.

        Returns the client's response object, or None when no connection
        can be established. An exception raised by the read itself marks
        the link as down and is re-raised.
        """
        if not self.connected and not self._connect():
            return None
        self._keepalive()
        if not self.connected:
            # The probe found a dead link: reopen it once before giving up.
            self.client.close()
            if not self._connect():
                return None
        try:
            result = self.client.read_holding_registers(address, count=count)
        except Exception:
            # Force a reconnect on the next call.
            self.connected = False
            raise
        self.last_activity = time.time()
        return result

    def close(self):
        """Close the connection; a later read() reconnects."""
        self.client.close()
        self.connected = False
Batch Operations
def batch_read(client, operations):
    """Run several holding-register reads on one client and time each one.

    Args:
        client: Connected client exposing ``read_holding_registers``.
        operations: Iterable of ``(name, address, count)`` tuples.

    Returns:
        Dict mapping each name to ``{'data', 'error', 'time'}``: ``data`` is
        the register list (None on error), ``error`` is the stringified error
        response (None on success) and ``time`` is the duration in seconds.
    """
    results = {}
    for name, address, count in operations:
        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start = time.perf_counter()
        result = client.read_holding_registers(address, count=count)
        elapsed = time.perf_counter() - start
        failed = result.isError()
        results[name] = {
            'data': None if failed else result.registers,
            'error': str(result) if failed else None,
            'time': elapsed,
        }
    return results
# Define batch operations as (name, first register address, register count)
operations = [
    ('temperature', 100, 1),
    ('pressure', 101, 1),
    ('flow_rate', 102, 2),
    ('status', 200, 1),
]
# Execute batch
with modbus_client('192.168.1.100') as client:
    results = batch_read(client, operations)
for name, data in results.items():
    # Decide on the 'error' field: testing the register list for truthiness
    # would report an empty (but successful) read as a failure.
    if data['error'] is None:
        print(f"{name}: {data['data']} ({data['time']:.3f}s)")
    else:
        print(f"{name}: ERROR - {data['error']}")
Network Diagnostics
import ping3
def diagnose_connection(host, port=502):
    """Diagnose connection issues in three steps, printing each outcome.

    Steps: ICMP ping, raw TCP connect to ``port``, then a Modbus connect
    followed by a one-register read.

    Returns:
        True when the Modbus connection is established (even if the test
        read returns a Modbus error); False as soon as a step fails.
    """
    import socket

    print(f"Diagnosing {host}:{port}")

    # Check if host is reachable
    ping_time = ping3.ping(host)
    # ping3 reports a timeout as None and an error such as an unresolvable
    # host as False; neither is a usable round-trip time.
    if ping_time is None or ping_time is False:
        print("✗ Host unreachable")
        return False
    print(f"✓ Ping: {ping_time*1000:.1f}ms")

    # Check if port is open; the with-block closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(3)
        result = sock.connect_ex((host, port))
    if result != 0:
        print(f"✗ Port {port} closed")
        return False
    print(f"✓ Port {port} open")

    # Try Modbus connection
    client = ModbusTcpClient(host, port=port, timeout=3)
    try:
        if not client.connect():
            print("✗ Modbus connection failed")
            return False
        print("✓ Modbus connection successful")
        # Try reading
        result = client.read_holding_registers(0, count=1)
        if not result.isError():
            print("✓ Modbus communication working")
        else:
            print(f"⚠ Modbus error: {result}")
        return True
    finally:
        # Release the client whether the checks passed, failed or raised.
        client.close()
# Diagnose: run the ping, TCP port and Modbus checks against one device
diagnose_connection('192.168.1.100')
Most industrial networks use static IPs. Always document IP addresses and port numbers.
Error Recovery
class RobustModbusClient:
    """Client whose reads never raise: failures are printed and yield None."""

    def __init__(self, host, port=502):
        self.host = host
        self.port = port
        self.client = None      # created lazily by read_safe()
        self.error_count = 0    # consecutive Modbus error responses
        self.max_errors = 3     # error responses tolerated before a reset

    def read_safe(self, address, count, slave=1):
        """Read holding registers with full error recovery.

        Connects on demand. After ``max_errors`` consecutive Modbus error
        responses, or after any exception, the connection is reset so the
        next call starts with a fresh client.

        Returns:
            The list of register values, or None on any failure.
        """
        try:
            # Ensure connected
            if self.client is None:
                # port is passed by keyword, like every other client
                # construction in this guide.
                self.client = ModbusTcpClient(self.host, port=self.port)
                if not self.client.connect():
                    raise ConnectionError("Cannot connect")
            # Read data
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )
            # Check result
            if result.isError():
                self.error_count += 1
                if self.error_count >= self.max_errors:
                    # Too many errors, reset connection
                    print(f"Error: Too many errors: {result}")
                    self.reset()
                return None
            # Success, reset error count
            self.error_count = 0
            return result.registers
        except Exception as e:
            print(f"Error: {e}")
            self.reset()
            return None

    def reset(self):
        """Drop the current connection and clear the error counter."""
        if self.client is not None:
            try:
                self.client.close()
            except Exception:
                # Best effort: the client is discarded below either way.
                pass
        self.client = None
        self.error_count = 0
Next Steps
- RTU Client - Serial communication
- Async Client - High-performance async
- Error Handling - Handle all error cases
How is this guide?