PyModbus
PyModbusDocs

Creating a Modbus TCP Client

Build robust Modbus TCP clients: connection handling, timeouts, retry logic, and connection pooling.

Modbus TCP Client

Connect to Modbus devices over Ethernet. Handle connections, timeouts, and errors properly.

Basic TCP Client

from pymodbus.client import ModbusTcpClient

# Create client (no connection is opened until connect() is called below).
# NOTE(review): retry_on_empty, close_comm_on_error and strict are not accepted
# by every pymodbus release -- confirm against the installed version.
client = ModbusTcpClient(
    host='192.168.1.100',  # IP address
    port=502,               # Default Modbus port
    timeout=3,              # Socket timeout
    retries=3,              # Retry count
    retry_on_empty=False,   # Retry on empty response
    close_comm_on_error=False,
    strict=True,            # Strict timing
    source_address=None     # Source IP (optional)
)

# Connect
if client.connect():
    print("Connected")
    try:
        # Read registers; count is passed by keyword so the call does not
        # depend on the positional layout of the client method.
        result = client.read_holding_registers(0, count=10)
        if result.isError():
            print(f"Modbus error: {result}")
    finally:
        # Release the socket even if the read raised.
        client.close()
else:
    print("Connection failed")

Connection Management

Auto-reconnect Client

class ReconnectingModbusClient:
    """TCP client with automatic reconnection.

    Wraps a ``ModbusTcpClient`` and, before every read, checks whether the
    socket is still open; if not, a fresh client is created and connected.
    """

    def __init__(self, host, port=502, timeout=3):
        """Store the connection settings and attempt a first connection.

        The result of the initial attempt is ignored on purpose: a failure is
        retried (and reported) by the first read.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.client = None
        self._connect()

    def _connect(self):
        """Create a new client and connect it.

        Any previously held client is closed first so its socket is not
        leaked when it gets replaced.

        Returns:
            The value returned by ``ModbusTcpClient.connect()``.
        """
        self.close()
        self.client = ModbusTcpClient(
            self.host,
            port=self.port,
            timeout=self.timeout
        )
        return self.client.connect()

    def _ensure_connected(self):
        """Reconnect if there is no client or its socket is closed.

        Raises:
            ConnectionError: if the reconnection attempt fails.
        """
        if not self.client or not self.client.is_socket_open():
            print(f"Reconnecting to {self.host}:{self.port}")
            if not self._connect():
                raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")

    def read_holding_registers(self, address, count, slave=1):
        """Read holding registers, reconnecting first if necessary.

        Args:
            address: Start address passed to the underlying client.
            count: Number of registers to read.
            slave: Unit identifier passed to the underlying client.

        Returns:
            The response object from the underlying client. Error responses
            are printed but still returned; callers must check ``isError()``.

        Raises:
            ConnectionError: if the connection cannot be (re)established.
            Exception: anything raised by the underlying read is re-raised
                after the client has been closed and discarded.
        """
        self._ensure_connected()
        try:
            # Keyword arguments keep the call independent of the positional
            # layout of the underlying client method.
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )
            if result.isError():
                print(f"Modbus error: {result}")
            return result
        except Exception as e:
            print(f"Read failed: {e}")
            # Drop the client so the next call starts from a fresh connection.
            self.close()
            raise

    def close(self):
        """Close the connection and forget the client (safe to call twice)."""
        if self.client:
            self.client.close()
            self.client = None

# Use it: poll ten registers once per second for 100 iterations.
import time  # needed for the sleep below

client = ReconnectingModbusClient('192.168.1.100')
try:
    for i in range(100):
        result = client.read_holding_registers(0, 10)
        time.sleep(1)
finally:
    # Release the socket even if a read raised part-way through the loop.
    client.close()

Context Manager

from contextlib import contextmanager

@contextmanager
def modbus_client(host, port=502):
    """Yield a connected ModbusTcpClient and close it when the block exits.

    Raises:
        ConnectionError: if ``connect()`` reports failure. The client is
            still closed in that case.
    """
    conn = ModbusTcpClient(host, port=port)
    try:
        connected = conn.connect()
        if connected:
            yield conn
        else:
            raise ConnectionError(f"Cannot connect to {host}:{port}")
    finally:
        # Runs on normal exit, on errors inside the with-body, and on a
        # failed connect alike.
        conn.close()

# Use it
with modbus_client('192.168.1.100') as client:
    result = client.read_holding_registers(0, count=10)
    # Check for an error response before touching the register payload.
    if result.isError():
        print(f"Modbus error: {result}")
    else:
        print(result.registers)

Connection Pool

from queue import Queue
import threading

class ModbusConnectionPool:
    """Fixed-size pool of ModbusTcpClient objects handed out via a Queue.

    The pool always holds ``pool_size`` clients. A client that is not
    connected when it is borrowed is reconnected by ``get_client``.
    """

    def __init__(self, host, port=502, pool_size=5):
        """Create ``pool_size`` clients for ``host:port`` and queue them."""
        self.host = host
        self.port = port
        self.pool = Queue(maxsize=pool_size)
        # Not used by the pool itself (Queue does its own locking); kept so
        # existing code that references pool.lock keeps working.
        self.lock = threading.Lock()

        # Create initial connections
        for _ in range(pool_size):
            client = ModbusTcpClient(host, port=port)
            # Best-effort connect: the client is queued even when this fails,
            # otherwise the pool would silently shrink (possibly to zero) and
            # get_client() would only ever time out.
            client.connect()
            self.pool.put(client)

    def get_client(self, timeout=10):
        """Borrow a connected client from the pool.

        Args:
            timeout: Seconds to wait for a free client.

        Returns:
            A client whose socket is open.

        Raises:
            queue.Empty: if no client becomes free within ``timeout``.
            ConnectionError: if the borrowed client is disconnected and cannot
                be reconnected. The client is put back first, so the pool
                keeps its size.
        """
        client = self.pool.get(timeout=timeout)

        try:
            usable = client.is_socket_open() or client.connect()
        except Exception:
            # Do not lose the pool slot if the reconnect attempt blows up.
            self.pool.put(client)
            raise

        if not usable:
            self.pool.put(client)
            raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")

        return client

    def return_client(self, client):
        """Return a borrowed client to the pool."""
        self.pool.put(client)

    def close_all(self):
        """Close every client currently in the pool and drain the queue.

        Clients that are still borrowed are not affected.
        """
        from queue import Empty

        while True:
            try:
                # get_nowait avoids blocking forever if another thread takes
                # the last client between an emptiness check and the get.
                client = self.pool.get_nowait()
            except Empty:
                break
            client.close()

# Use connection pool
pool = ModbusConnectionPool('192.168.1.100', pool_size=3)

def worker():
    """Borrow a pooled client, read ten registers, and always hand it back."""
    client = pool.get_client()
    try:
        result = client.read_holding_registers(0, count=10)
        # Check for an error response before touching the register payload.
        if result.isError():
            print(f"Modbus error: {result}")
        else:
            print(result.registers)
    finally:
        pool.return_client(client)

# Run multiple workers: ten threads share the three pooled clients, so some
# of them wait inside get_client() until a client is returned.
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pool.close_all()

Timeout Handling

import socket
from pymodbus.exceptions import ConnectionException, ModbusIOException

def read_with_timeout(host, address, count, timeout=5):
    """Read holding registers from ``host`` and report failures as text.

    A new client is created, used for a single read and closed again.

    Args:
        host: Address of the Modbus TCP device (port 502 is used).
        address: Start address of the read.
        count: Number of registers to read.
        timeout: Timeout handed to the ``ModbusTcpClient`` constructor.

    Returns:
        ``(registers, None)`` on success, or ``(None, message)`` when the
        connection, the read, or the response failed.
    """
    # The timeout is configured once, through the constructor; nothing is
    # patched on the client after it has connected.
    client = ModbusTcpClient(
        host,
        port=502,
        timeout=timeout
    )

    try:
        if not client.connect():
            return None, "Connection failed"

        result = client.read_holding_registers(address, count=count)

        if result.isError():
            return None, f"Modbus error: {result}"

        return result.registers, None

    except socket.timeout:
        return None, f"Timeout after {timeout} seconds"
    except ConnectionException as e:
        return None, f"Connection error: {e}"
    except ModbusIOException as e:
        return None, f"I/O error: {e}"
    finally:
        # Runs on every path above, including the early returns.
        client.close()

# Use it: the helper returns (data, error); error is None on success.
data, error = read_with_timeout('192.168.1.100', 0, 10, timeout=2)
print(f"Failed: {error}" if error else f"Data: {data}")

Multiple Device Management

class ModbusDeviceManager:
    """Manage multiple Modbus TCP devices, addressed by a caller-chosen name.

    ``devices`` maps each name to a dict with the keys ``host``, ``port``,
    ``slave`` and ``client`` (``None`` until the device is connected).
    """

    def __init__(self):
        self.devices = {}

    def add_device(self, name, host, port=502, slave=1):
        """Register a device; no connection is opened yet.

        Re-using an existing name replaces the stored entry.
        """
        self.devices[name] = {
            'host': host,
            'port': port,
            'slave': slave,
            'client': None
        }

    def connect(self, name):
        """Connect to the named device.

        Returns:
            True when the connection succeeded, False when the name is
            unknown or the connection failed.
        """
        if name not in self.devices:
            return False

        device = self.devices[name]

        # Close a previously stored client so its socket is not leaked when
        # connect() is called again for the same device.
        if device['client']:
            device['client'].close()
            device['client'] = None

        client = ModbusTcpClient(device['host'], port=device['port'])

        if client.connect():
            device['client'] = client
            return True

        # Release whatever the failed attempt may still hold.
        client.close()
        return False

    def read_registers(self, name, address, count):
        """Read holding registers from the named device, connecting on demand.

        Returns:
            The response object from the client, or None when the name is
            unknown or the device could not be connected. Callers should
            check ``isError()`` on the response.
        """
        if name not in self.devices:
            return None

        device = self.devices[name]
        if not device['client']:
            if not self.connect(name):
                return None

        # Keyword arguments keep the call independent of the positional
        # layout of the underlying client method.
        return device['client'].read_holding_registers(
            address, count=count, slave=device['slave']
        )

    def disconnect_all(self):
        """Close every open client and clear it from its device entry."""
        for device in self.devices.values():
            if device['client']:
                device['client'].close()
                device['client'] = None

# Use it
manager = ModbusDeviceManager()
manager.add_device('plc1', '192.168.1.10', slave=1)
manager.add_device('plc2', '192.168.1.11', slave=1)
manager.add_device('meter', '192.168.1.20', slave=2)

# Read from different devices
plc1_data = manager.read_registers('plc1', 0, 10)
plc2_data = manager.read_registers('plc2', 100, 5)
meter_data = manager.read_registers('meter', 0, 20)

manager.disconnect_all()

TLS/SSL Secure Connection

import ssl

# TLS needs the dedicated TLS client class; the plain TCP client does not
# perform a TLS handshake.
from pymodbus.client import ModbusTlsClient

# Create SSL context
# WARNING: the next two settings switch off hostname checking and certificate
# verification, so the server's identity is NOT authenticated. Acceptable only
# on an isolated test bench; in production keep the defaults and load the
# plant's CA with ssl_context.load_verify_locations(...).
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Client certificate and key for mutual TLS. Drop this call if the server
# does not ask for a client certificate.
ssl_context.load_cert_chain(certfile='client.crt', keyfile='client.key')

# Create secure client
client = ModbusTlsClient(
    '192.168.1.100',
    port=802,  # Common secure Modbus port
    sslctx=ssl_context,
)

if client.connect():
    try:
        result = client.read_holding_registers(0, count=10)
    finally:
        # Release the socket even if the read raised.
        client.close()

Performance Optimization

Keep-Alive Connection

class PersistentModbusClient:
    """Keep one connection open across reads instead of reconnecting each time.

    Attributes are plain and public: ``connected`` (last known state),
    ``last_activity`` (``time.time()`` of the last successful exchange) and
    ``keepalive_interval`` (idle seconds after which a probe read is sent).
    """

    def __init__(self, host, port=502):
        self.client = ModbusTcpClient(host, port=port)
        self.connected = False
        self.last_activity = 0
        self.keepalive_interval = 30  # seconds

    def _keepalive(self):
        """Probe the link with a one-register read if it has been idle too long.

        On any exception from the probe, ``connected`` is set to False so the
        caller can reconnect; nothing is raised.
        """
        import time

        if time.time() - self.last_activity <= self.keepalive_interval:
            return

        try:
            # Read one register as keepalive
            self.client.read_holding_registers(0, count=1)
            self.last_activity = time.time()
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate.
            self.connected = False

    def _reconnect(self):
        """(Re)connect the client and record the result in ``connected``."""
        import time

        self.connected = self.client.connect()
        if self.connected:
            # A fresh connection counts as activity, so no probe is sent
            # right before the first read.
            self.last_activity = time.time()
        return self.connected

    def read(self, address, count):
        """Read holding registers over the persistent connection.

        Returns:
            The response object from the client, or None when the connection
            could not be established or re-established.
        """
        import time

        if not self.connected and not self._reconnect():
            return None

        self._keepalive()
        if not self.connected:
            # The probe failed: do not read on a dead link. Close it and try
            # once to bring the connection back.
            self.client.close()
            if not self._reconnect():
                return None

        result = self.client.read_holding_registers(address, count=count)
        self.last_activity = time.time()
        return result

Batch Operations

def batch_read(client, operations):
    """Run a list of holding-register reads one after another and time each.

    Args:
        client: Object exposing ``read_holding_registers(address, count=...)``.
        operations: Iterable of ``(name, address, count)`` tuples.

    Returns:
        Dict mapping each name to ``{'data', 'error', 'time'}``: ``data`` is
        the register list (None on error), ``error`` is the response rendered
        as text (None on success), ``time`` is the elapsed seconds.
    """
    import time

    results = {}

    for name, address, count in operations:
        # perf_counter is monotonic, so the measurement cannot go negative
        # if the wall clock is adjusted during the read.
        start = time.perf_counter()
        result = client.read_holding_registers(address, count=count)
        elapsed = time.perf_counter() - start

        failed = result.isError()
        results[name] = {
            'data': None if failed else result.registers,
            'error': str(result) if failed else None,
            'time': elapsed
        }

    return results

# Define batch operations as (name, start address, register count) tuples
operations = [
    ('temperature', 100, 1),
    ('pressure', 101, 1),
    ('flow_rate', 102, 2),
    ('status', 200, 1),
]

# Execute batch and report each entry on its own line
with modbus_client('192.168.1.100') as client:
    results = batch_read(client, operations)

    for name, data in results.items():
        if not data['data']:
            print(f"{name}: ERROR - {data['error']}")
            continue
        print(f"{name}: {data['data']} ({data['time']:.3f}s)")

Network Diagnostics

import ping3

def diagnose_connection(host, port=502):
    """Diagnose connection issues in three steps, printing each result.

    Steps: ICMP ping, raw TCP connect to ``port``, then a Modbus connect
    followed by a one-register read.

    Returns:
        False if the ping, the TCP connect or the Modbus connect fails.
        True once the Modbus connection succeeds, even if the test read
        comes back as an error response (that is only printed as a warning).
    """
    import socket

    print(f"Diagnosing {host}:{port}")

    # Check if host is reachable. ping3 reports a timeout as None and other
    # failures as False; identity checks are used so that a genuine 0.0 s
    # round trip is not mistaken for a failure.
    ping_time = ping3.ping(host)
    if ping_time is None or ping_time is False:
        print("✗ Host unreachable")
        return False
    print(f"✓ Ping: {ping_time*1000:.1f}ms")

    # Check if port is open; the with-block closes the socket even if
    # connect_ex raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(3)
        result = sock.connect_ex((host, port))

    if result == 0:
        print(f"✓ Port {port} open")
    else:
        print(f"✗ Port {port} closed")
        return False

    # Try Modbus connection
    client = ModbusTcpClient(host, port=port, timeout=3)
    try:
        if not client.connect():
            print("✗ Modbus connection failed")
            return False
        print("✓ Modbus connection successful")

        # Try reading
        result = client.read_holding_registers(0, count=1)
        if not result.isError():
            print("✓ Modbus communication working")
        else:
            print(f"⚠ Modbus error: {result}")

        return True
    finally:
        # Release the client on every path, including a failed connect or a
        # read that raised.
        client.close()

# Run the diagnosis against the example device (default port 502)
diagnose_connection(host='192.168.1.100')

Most industrial networks use static IPs. Always document IP addresses and port numbers.

Error Recovery

class RobustModbusClient:
    """Client that never raises from ``read_safe`` and resets itself on trouble.

    ``error_count`` counts consecutive error responses; when it reaches
    ``max_errors`` the connection is dropped and rebuilt on the next read.
    """

    def __init__(self, host, port=502):
        self.host = host
        self.port = port
        self.client = None
        self.error_count = 0
        self.max_errors = 3

    def read_safe(self, address, count, slave=1):
        """Read holding registers, returning None instead of raising.

        Args:
            address: Start address of the read.
            count: Number of registers to read.
            slave: Unit identifier passed to the underlying client.

        Returns:
            The list of registers on success, otherwise None. Failures are
            printed with an ``Error:`` prefix.
        """
        try:
            # Ensure connected
            if not self.client:
                # port by keyword so the call does not depend on the
                # positional layout of the constructor.
                self.client = ModbusTcpClient(self.host, port=self.port)
                if not self.client.connect():
                    raise ConnectionError("Cannot connect")

            # Read data
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )
        except Exception as e:
            # Connection or transport failure: start over with a new client
            # on the next call.
            print(f"Error: {e}")
            self.reset()
            return None

        # Check result
        if result.isError():
            self.error_count += 1
            if self.error_count >= self.max_errors:
                # Too many errors, reset connection
                print(f"Error: Too many errors: {result}")
                self.reset()
            return None

        # Success, reset error count
        self.error_count = 0
        return result.registers

    def reset(self):
        """Drop the current client (if any) and clear the error counter."""
        if self.client:
            try:
                self.client.close()
            except Exception:
                # Best-effort close: the client is being discarded anyway, so
                # a failure while closing must not mask the original problem.
                pass
            self.client = None
        self.error_count = 0

Next Steps

How is this guide?