TCP Client
Connect to Modbus TCP devices with PyModbus. Connection parameters, context managers, timeouts, batch reads, and TLS encryption.
Connect to Modbus devices over Ethernet using TCP on port 502.
Basic Connection
from pymodbus.client import ModbusTcpClient
client = ModbusTcpClient(
host='192.168.1.100',
port=502,
timeout=3,
retries=3
)
if client.connect():
result = client.read_holding_registers(0, 10, slave=1)
if not result.isError():
print(f"Registers: {result.registers}")
client.close()
else:
print("Connection failed")Context Manager
PyModbus clients support with statements, so the connection closes automatically:
from pymodbus.client import ModbusTcpClient
with ModbusTcpClient('192.168.1.100', port=502, timeout=3) as client:
result = client.read_holding_registers(0, 10, slave=1)
if not result.isError():
print(result.registers)If you want to raise on connection failure, wrap it:
from contextlib import contextmanager
from pymodbus.client import ModbusTcpClient
@contextmanager
def modbus_tcp(host, port=502, timeout=3):
client = ModbusTcpClient(host, port=port, timeout=timeout)
if not client.connect():
raise ConnectionError(f"Cannot connect to {host}:{port}")
try:
yield client
finally:
client.close()
with modbus_tcp('192.168.1.100') as client:
result = client.read_holding_registers(0, 10, slave=1)
print(result.registers)Log Modbus data automatically
TofuPilot records test results from your PyModbus scripts, tracks pass/fail rates, and generates compliance reports. Free to start.
Timeout Handling
import socket
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ConnectionException, ModbusIOException
def read_with_timeout(host, address, count, slave=1, timeout=5):
"""Read registers with explicit timeout handling."""
client = ModbusTcpClient(host, port=502, timeout=timeout)
try:
if not client.connect():
return None, "Connection failed"
result = client.read_holding_registers(address, count, slave=slave)
if result.isError():
return None, f"Modbus error: {result}"
return result.registers, None
except socket.timeout:
return None, f"Timeout after {timeout}s"
except ConnectionException as e:
return None, f"Connection error: {e}"
except ModbusIOException as e:
return None, f"I/O error: {e}"
finally:
client.close()
data, error = read_with_timeout('192.168.1.100', 0, 10)
if error:
print(f"Failed: {error}")
else:
print(f"Data: {data}")Reconnection
For long-running scripts, handle dropped connections by reconnecting before each operation:
import time
from pymodbus.client import ModbusTcpClient
def poll_device(host, address, count, slave=1, interval=1.0, readings=100):
"""Poll a device with automatic reconnection."""
client = ModbusTcpClient(host, port=502, timeout=3)
for i in range(readings):
if not client.is_socket_open():
if not client.connect():
print(f"[{i}] Reconnection failed, retrying in {interval}s")
time.sleep(interval)
continue
try:
result = client.read_holding_registers(address, count, slave=slave)
if not result.isError():
print(f"[{i}] {result.registers}")
else:
print(f"[{i}] Error: {result}")
except Exception as e:
print(f"[{i}] Exception: {e}")
client.close()
time.sleep(interval)
client.close()
poll_device('192.168.1.100', 0, 10)Batch Reads
Reading multiple register ranges in a single session reduces overhead:
import time
from pymodbus.client import ModbusTcpClient
def batch_read(client, operations, slave=1):
"""Read multiple register ranges and return results."""
results = {}
for name, address, count in operations:
start = time.time()
result = client.read_holding_registers(address, count, slave=slave)
elapsed = time.time() - start
if not result.isError():
results[name] = {'data': result.registers, 'time_ms': elapsed * 1000}
else:
results[name] = {'error': str(result), 'time_ms': elapsed * 1000}
return results
operations = [
('temperature', 100, 1),
('pressure', 101, 1),
('flow_rate', 102, 2),
('status', 200, 1),
]
with ModbusTcpClient('192.168.1.100') as client:
results = batch_read(client, operations)
for name, data in results.items():
if 'data' in data:
print(f"{name}: {data['data']} ({data['time_ms']:.1f}ms)")
else:
print(f"{name}: ERROR - {data['error']}")TLS/SSL Connection
import ssl
from pymodbus.client import ModbusTcpClient
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
client = ModbusTcpClient(
'192.168.1.100',
port=802,
sslctx=ssl_context
)
if client.connect():
result = client.read_holding_registers(0, 10, slave=1)
if not result.isError():
print(result.registers)
client.close()Network Diagnostics
Check connectivity without third-party dependencies:
import socket
from pymodbus.client import ModbusTcpClient
def diagnose_connection(host, port=502):
"""Check network and Modbus connectivity."""
print(f"Diagnosing {host}:{port}")
# Check if port is open
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex((host, port))
sock.close()
if result != 0:
print(f"Port {port} closed or host unreachable")
return False
print(f"Port {port} open")
# Try Modbus connection
client = ModbusTcpClient(host, port=port, timeout=3)
if not client.connect():
print("Modbus connection failed")
return False
print("Modbus connection OK")
# Try a read
result = client.read_holding_registers(0, 1, slave=1)
if not result.isError():
print(f"Read OK: register 0 = {result.registers[0]}")
else:
print(f"Read failed: {result}")
client.close()
return True
diagnose_connection('192.168.1.100')Most industrial networks use static IPs. Document the IP address, port, and slave ID for every device on the bus.
Connection Parameters
| Parameter | Default | Description |
|---|---|---|
host | (required) | Device IP address |
port | 502 | TCP port |
timeout | 3 | Socket timeout in seconds |
retries | 3 | Number of retries on failure |
retry_on_empty | False | Retry when response is empty |
close_comm_on_error | False | Close socket on error |
strict | True | Use strict Modbus timing |
source_address | None | Bind to specific local IP |