PyModbus
PyModbusDocs

TCP Client

Connect to Modbus TCP devices with PyModbus. Connection parameters, context managers, timeouts, batch reads, and TLS encryption.

Connect to Modbus devices over Ethernet using TCP on port 502.

Basic Connection

from pymodbus.client import ModbusTcpClient

# Connection settings for the target device.
client = ModbusTcpClient(
    host='192.168.1.100',
    port=502,
    timeout=3,
    retries=3,
)

if client.connect():
    try:
        result = client.read_holding_registers(0, 10, slave=1)
        if not result.isError():
            print(f"Registers: {result.registers}")
        else:
            # Report the error response instead of dropping it silently.
            print(f"Read failed: {result}")
    finally:
        # Close the connection even if the read raises.
        client.close()
else:
    print("Connection failed")

Context Manager

PyModbus clients support with statements, so the connection closes automatically:

from pymodbus.client import ModbusTcpClient

# The with-block closes the connection automatically on exit.
with ModbusTcpClient('192.168.1.100', port=502, timeout=3) as modbus:
    response = modbus.read_holding_registers(0, 10, slave=1)
    read_ok = not response.isError()
    if read_ok:
        print(response.registers)

To raise an exception when the connection fails, wrap the client in your own context manager:

from contextlib import contextmanager
from pymodbus.client import ModbusTcpClient

@contextmanager
def modbus_tcp(host, port=502, timeout=3):
    """Yield a connected Modbus TCP client and close it when the block exits.

    Args:
        host: Device address passed to ModbusTcpClient.
        port: TCP port to connect to.
        timeout: Client timeout passed to ModbusTcpClient.

    Raises:
        ConnectionError: If the client's connect() call reports failure.
    """
    conn = ModbusTcpClient(host, port=port, timeout=timeout)
    connected = conn.connect()
    if not connected:
        raise ConnectionError(f"Cannot connect to {host}:{port}")
    try:
        yield conn
    finally:
        # Runs on normal exit and when the with-body raises.
        conn.close()

with modbus_tcp('192.168.1.100') as client:
    result = client.read_holding_registers(0, 10, slave=1)
    # Check for an error response before touching .registers, as the other
    # examples on this page do.
    if result.isError():
        print(f"Read failed: {result}")
    else:
        print(result.registers)

Log Modbus data automatically

TofuPilot records test results from your PyModbus scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

Timeout Handling

import socket
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ConnectionException, ModbusIOException

def read_with_timeout(host, address, count, slave=1, timeout=5, port=502):
    """Read holding registers, reporting failures as a message instead of raising.

    Args:
        host: Device address passed to ModbusTcpClient.
        address: First holding register to read.
        count: Number of registers to read.
        slave: Modbus slave ID of the target device.
        timeout: Client timeout in seconds.
        port: TCP port of the device; defaults to the standard port 502.

    Returns:
        A ``(registers, error)`` tuple: ``(registers, None)`` on success, or
        ``(None, message)`` when the connection or the read fails.
    """
    client = ModbusTcpClient(host, port=port, timeout=timeout)

    try:
        if not client.connect():
            return None, "Connection failed"

        result = client.read_holding_registers(address, count, slave=slave)

        if result.isError():
            return None, f"Modbus error: {result}"

        return result.registers, None

    except socket.timeout:
        return None, f"Timeout after {timeout}s"
    except ConnectionException as e:
        return None, f"Connection error: {e}"
    except ModbusIOException as e:
        return None, f"I/O error: {e}"
    finally:
        # Always release the client, whichever return path was taken.
        client.close()

# The helper returns (registers, None) on success or (None, message) on failure.
registers, failure = read_with_timeout('192.168.1.100', 0, 10)
if not failure:
    print(f"Data: {registers}")
else:
    print(f"Failed: {failure}")

Reconnection

For long-running scripts, handle dropped connections by checking the socket before each operation and reconnecting when it is closed:

import time
from pymodbus.client import ModbusTcpClient

def poll_device(host, address, count, slave=1, interval=1.0, readings=100):
    """Poll holding registers repeatedly, reconnecting when the socket is closed.

    Args:
        host: Device address passed to ModbusTcpClient.
        address: First holding register to read.
        count: Number of registers to read per poll.
        slave: Modbus slave ID of the target device.
        interval: Seconds to sleep between polls and after a failed reconnect.
        readings: Number of loop iterations; a failed reconnect uses one up.
    """
    client = ModbusTcpClient(host, port=502, timeout=3)

    try:
        for i in range(readings):
            if not client.is_socket_open() and not client.connect():
                print(f"[{i}] Reconnection failed, retrying in {interval}s")
                time.sleep(interval)
                continue

            try:
                result = client.read_holding_registers(address, count, slave=slave)
            except Exception as e:
                print(f"[{i}] Exception: {e}")
                # Drop the connection so the next iteration reconnects.
                client.close()
            else:
                if not result.isError():
                    print(f"[{i}] {result.registers}")
                else:
                    print(f"[{i}] Error: {result}")

            time.sleep(interval)
    finally:
        # Close the client even if polling is interrupted (e.g. Ctrl-C).
        client.close()

# Poll 10 registers starting at address 0 with the default interval and count.
poll_device(host='192.168.1.100', address=0, count=10)

Batch Reads

Reading multiple register ranges in a single session reduces overhead:

import time
from pymodbus.client import ModbusTcpClient

def batch_read(client, operations, slave=1):
    """Read several holding-register ranges through one client.

    Args:
        client: Object exposing ``read_holding_registers(address, count, slave=...)``.
        operations: Iterable of ``(name, address, count)`` tuples.
        slave: Modbus slave ID used for every read.

    Returns:
        Dict mapping each name to ``{'data': registers, 'time_ms': ms}`` on
        success, or ``{'error': text, 'time_ms': ms}`` on an error response.
    """
    results = {}
    for name, address, count in operations:
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed (or go negative) if the wall clock is adjusted mid-read.
        start = time.perf_counter()
        result = client.read_holding_registers(address, count, slave=slave)
        elapsed_ms = (time.perf_counter() - start) * 1000

        if result.isError():
            results[name] = {'error': str(result), 'time_ms': elapsed_ms}
        else:
            results[name] = {'data': result.registers, 'time_ms': elapsed_ms}

    return results

# Each entry is (label, starting register address, register count).
operations = [
    ('temperature', 100, 1),
    ('pressure', 101, 1),
    ('flow_rate', 102, 2),
    ('status', 200, 1),
]

with ModbusTcpClient('192.168.1.100') as client:
    results = batch_read(client, operations)

    for name in results:
        entry = results[name]
        if 'data' not in entry:
            print(f"{name}: ERROR - {entry['error']}")
            continue
        print(f"{name}: {entry['data']} ({entry['time_ms']:.1f}ms)")

TLS/SSL Connection

import ssl
from pymodbus.client import ModbusTcpClient, ModbusTlsClient

# Keep certificate and hostname verification enabled (the defaults of
# create_default_context). If the device certificate is signed by a private
# CA, pass cafile="<path to CA bundle>" here rather than switching to
# ssl.CERT_NONE, which would accept any certificate and allow
# man-in-the-middle attacks.
ssl_context = ssl.create_default_context()

# Use the TLS client class: it is the one that takes an SSL context.
client = ModbusTlsClient(
    '192.168.1.100',
    port=802,
    sslctx=ssl_context,
)

if client.connect():
    try:
        result = client.read_holding_registers(0, 10, slave=1)
        if not result.isError():
            print(result.registers)
        else:
            print(f"Read failed: {result}")
    finally:
        # Close the connection even if the read raises.
        client.close()
else:
    print("Connection failed")

Network Diagnostics

Check connectivity without third-party dependencies:

import socket
from pymodbus.client import ModbusTcpClient

def diagnose_connection(host, port=502):
    """Check TCP reachability, then Modbus connectivity, printing each step.

    Args:
        host: Device address to probe.
        port: TCP port to probe.

    Returns:
        False when the TCP port cannot be reached or the Modbus connect
        fails. True once the Modbus connection succeeds, even if the test
        read of register 0 reports an error.
    """
    print(f"Diagnosing {host}:{port}")

    # Check if port is open. The with-block closes the probe socket even if
    # connect_ex raises (for example when the host name cannot be resolved).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(3)
        status = probe.connect_ex((host, port))

    if status != 0:
        print(f"Port {port} closed or host unreachable")
        return False
    print(f"Port {port} open")

    # Try Modbus connection
    client = ModbusTcpClient(host, port=port, timeout=3)
    try:
        if not client.connect():
            print("Modbus connection failed")
            return False
        print("Modbus connection OK")

        # Try a read
        result = client.read_holding_registers(0, 1, slave=1)
        if not result.isError():
            print(f"Read OK: register 0 = {result.registers[0]}")
        else:
            print(f"Read failed: {result}")
    finally:
        # Release the client on every path, including when the read raises.
        client.close()

    return True

# Run the checks against the device on the default port.
diagnose_connection(host='192.168.1.100')

Most industrial networks use static IPs. Document the IP address, port, and slave ID for every device on the network.

Connection Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| host | (required) | Device IP address |
| port | 502 | TCP port |
| timeout | 3 | Socket timeout in seconds |
| retries | 3 | Number of retries on failure |
| retry_on_empty | False | Retry when response is empty |
| close_comm_on_error | False | Close socket on error |
| strict | True | Use strict Modbus timing |
| source_address | None | Bind to specific local IP |