PyModbus Quick Start Guide - Get Started in 5 Minutes
Start using PyModbus in 5 minutes. Simple examples for reading and writing Modbus registers with both TCP and RTU clients.
PyModbus Quick Start Guide
Get up and running with PyModbus in 5 minutes. This guide shows you how to create your first Modbus client and start communicating with devices immediately.
Your First PyModbus Client
Install PyModbus
pip install pymodbus
Create a Simple Client
from pymodbus.client import ModbusTcpClient

# Connect to a Modbus device
client = ModbusTcpClient('192.168.1.100')  # Replace with your device IP

# connect() signals failure through its return value, so check it before
# sending any request.
if client.connect():
    try:
        # Read 10 holding registers starting at address 0
        result = client.read_holding_registers(address=0, count=10)
        if not result.isError():
            print(f"Register values: {result.registers}")
        else:
            print(f"Error reading registers: {result}")
    finally:
        # Close connection, even if the read raised an exception
        client.close()
else:
    print("✗ Failed to connect")
Run Your Code (save the example above as modbus_client.py first)
python modbus_client.py
Complete Working Examples
Example 1: Read Temperature Sensor
#!/usr/bin/env python3
"""Read temperature from a Modbus sensor."""
from pymodbus.client import ModbusTcpClient


def read_temperature():
    """Read one holding register from the sensor and print it as °C.

    The host (192.168.1.100:502), the register address (100) and the
    divide-by-10 scaling are example values; adjust them for your device.
    Nothing is returned: the outcome is printed.
    """
    # Connect to device
    client = ModbusTcpClient('192.168.1.100', port=502)
    if not client.connect():
        print("✗ Failed to connect")
        return

    print("✓ Connected to device")
    try:
        # Read temperature (assuming it's at register 100)
        result = client.read_holding_registers(address=100, count=1)
        if result.isError():
            print(f"Error: {result}")
            return
        # Convert register value to temperature
        # Many devices store temperature * 10
        temperature = result.registers[0] / 10.0
        print(f"Temperature: {temperature}°C")
    finally:
        # Release the connection on every path, including exceptions.
        client.close()


if __name__ == "__main__":
    read_temperature()
Example 2: Control a Relay
#!/usr/bin/env python3
"""Control a Modbus relay/coil."""
from pymodbus.client import ModbusTcpClient
import time


def control_relay():
    """Switch coil 0 on, wait two seconds, switch it off, then print its state.

    Each write is checked before its success message is printed, so a
    rejected request is reported instead of being announced as done.
    Nothing is returned: the outcome is printed.
    """
    client = ModbusTcpClient('192.168.1.100')
    if not client.connect():
        print("✗ Failed to connect")
        return

    print("✓ Connected to relay controller")
    try:
        # Turn relay ON (coil 0)
        result = client.write_coil(address=0, value=True)
        if result.isError():
            print(f"Error: {result}")
            return
        print("Relay ON")
        time.sleep(2)

        # Turn relay OFF
        result = client.write_coil(address=0, value=False)
        if result.isError():
            print(f"Error: {result}")
            return
        print("Relay OFF")

        # Read relay status
        result = client.read_coils(address=0, count=1)
        if not result.isError():
            status = "ON" if result.bits[0] else "OFF"
            print(f"Relay status: {status}")
        else:
            print(f"Error: {result}")
    finally:
        # Release the connection on every path, including exceptions.
        client.close()


if __name__ == "__main__":
    control_relay()
Example 3: Read and Write Multiple Values
#!/usr/bin/env python3
"""Read and write multiple Modbus values."""
from pymodbus.client import ModbusTcpClient


def read_write_example():
    """Read registers 0-4, write five values to 10-14, then read them back.

    The write result is checked, so the verification read only runs after
    a successful write. Nothing is returned: the outcome is printed.
    """
    client = ModbusTcpClient('192.168.1.100')
    if not client.connect():
        print("✗ Connection failed")
        return

    print("✓ Connected successfully\n")
    try:
        # Read multiple holding registers
        print("Reading 5 registers from address 0:")
        result = client.read_holding_registers(address=0, count=5)
        if not result.isError():
            for i, value in enumerate(result.registers):
                print(f" Register {i}: {value}")
        else:
            print(f"Error: {result}")

        print("\nWriting values to registers:")
        # Write multiple registers
        values = [100, 200, 300, 400, 500]
        result = client.write_registers(address=10, values=values)
        if result.isError():
            # Nothing was stored, so there is nothing to verify.
            print(f"Error: {result}")
            return
        print(f" Wrote {values} to addresses 10-14")

        # Read them back to verify
        print("\nVerifying written values:")
        result = client.read_holding_registers(address=10, count=5)
        if not result.isError():
            for i, value in enumerate(result.registers):
                print(f" Register {10+i}: {value}")
        else:
            print(f"Error: {result}")
    finally:
        # Release the connection on every path, including exceptions.
        client.close()


if __name__ == "__main__":
    read_write_example()
Modbus RTU (Serial) Quick Start
Serial Support: Modbus RTU requires the pyserial package. Install it with: pip install pyserial
from pymodbus.client import ModbusSerialClient

# Create RTU client
client = ModbusSerialClient(
    port='/dev/ttyUSB0',  # Linux
    # port='COM3',  # Windows
    baudrate=9600,
    parity='N',
    stopbits=1,
    bytesize=8,
    timeout=1,
)

if client.connect():
    try:
        # Read holding registers
        result = client.read_holding_registers(
            address=0,
            count=10,
            slave=1,  # Slave ID
        )
        if not result.isError():
            print(f"Data: {result.registers}")
        else:
            # Report the failure instead of silently printing nothing.
            print(f"Error: {result}")
    finally:
        # Release the serial port on every path, including exceptions.
        client.close()
else:
    print("✗ Failed to connect")
Async Client Quick Start
import asyncio
from pymodbus.client import AsyncModbusTcpClient


async def async_example():
    """Async client example for better performance."""
    # The async context manager owns the client's lifetime, so no explicit
    # close() call is needed here.
    async with AsyncModbusTcpClient('192.168.1.100') as client:
        # Read registers asynchronously
        result = await client.read_holding_registers(
            address=0,
            count=10,
        )
        if not result.isError():
            print(f"Async read: {result.registers}")
        else:
            print(f"Error: {result}")

        # Write registers asynchronously
        result = await client.write_registers(address=10, values=[1, 2, 3, 4, 5])
        # Only announce success when the device accepted the write.
        if not result.isError():
            print("Async write complete")
        else:
            print(f"Error: {result}")


# Run async function
asyncio.run(async_example())
Common Operations Reference
Reading Operations
# NOTE(review): these reference lines assume `client` is an already-connected
# client object and leave out error handling. Check result.isError() before
# using .bits / .registers, as the complete examples in this guide do.
# Read coils (discrete outputs) - Function Code 01
result = client.read_coils(address=0, count=8)
bits = result.bits # List of boolean values
# Read discrete inputs - Function Code 02
result = client.read_discrete_inputs(address=0, count=8)
bits = result.bits
# Read holding registers - Function Code 03
result = client.read_holding_registers(address=0, count=10)
registers = result.registers # List of 16-bit values
# Read input registers - Function Code 04
result = client.read_input_registers(address=0, count=10)
registers = result.registers
Writing Operations
# NOTE(review): these reference lines assume `client` is an already-connected
# client object. The return values are discarded here; presumably each call
# returns a response exposing isError(), like the read calls - confirm and
# check it in real code.
# Write single coil - Function Code 05
client.write_coil(address=0, value=True)
# Write single register - Function Code 06
client.write_register(address=0, value=12345)
# Write multiple coils - Function Code 15
client.write_coils(address=0, values=[True, False, True, True])
# Write multiple registers - Function Code 16
client.write_registers(address=0, values=[100, 200, 300])
Error Handling Basics
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException

# Bind the name before entering the try block: if the constructor itself
# raises, the finally clause can still test `client` without a NameError.
client = None
try:
    client = ModbusTcpClient('192.168.1.100')
    if not client.connect():
        # ConnectionError is still caught by the generic handler below.
        raise ConnectionError("Connection failed")
    # Keyword arguments match the rest of this guide.
    result = client.read_holding_registers(address=0, count=10)
    if result.isError():
        # Handle Modbus errors
        print(f"Modbus error: {result}")
    else:
        # Process successful result
        print(f"Success: {result.registers}")
except ModbusException as e:
    print(f"Modbus exception: {e}")
except Exception as e:
    print(f"General error: {e}")
finally:
    # Close only when the client object was actually created.
    if client is not None:
        client.close()
Quick Debugging Tips
1. Test Connection
# Open a TCP connection once to confirm the device is reachable.
client = ModbusTcpClient('192.168.1.100', port=502)
if not client.connect():
    print("✗ Connection failed - check IP and port")
else:
    print("✓ Connection successful")
    # The connection was only opened as a probe, so release it again.
    client.close()
2. Check Slave ID
# Try different slave IDs if communication fails
# Probe each candidate in turn and stop at the first one that answers.
for slave_id in [0, 1, 247]:
    result = client.read_holding_registers(address=0, count=1, slave=slave_id)
    if result.isError():
        # No valid answer for this ID; move on to the next candidate.
        continue
    print(f"✓ Slave ID {slave_id} works")
    break
3. Verify Register Address
# Some devices use different addressing
# Try both 0-based and 1-based addressing
addresses_to_try = [0, 1, 40000, 40001]
for addr in addresses_to_try:
    # Pass address/count by keyword, as the rest of this guide does.
    # NOTE(review): newer pymodbus releases appear to make `count`
    # keyword-only - confirm against the installed version.
    result = client.read_holding_registers(address=addr, count=1)
    if not result.isError():
        print(f"✓ Address {addr} works")
        break
Project Structure Template
my_modbus_project/
├── config.py # Configuration settings
├── client.py # Modbus client wrapper
├── devices/
│ ├── __init__.py
│ ├── sensor.py # Sensor device class
│ └── controller.py # Controller device class
├── utils.py # Helper functions
└── main.py # Main application
Sample config.py:
# config.py
# Connection settings imported by the client wrapper (example values).
MODBUS_CONFIG = dict(
    host='192.168.1.100',
    port=502,
    timeout=3,
    retry_count=3,
    slave_id=1,
)

# Lookup from a measurement name to its register address (example values).
REGISTER_MAP = dict(
    temperature=100,
    humidity=101,
    pressure=102,
    status=200,
)
Sample client.py:
# client.py
from pymodbus.client import ModbusTcpClient
from config import MODBUS_CONFIG
class ModbusClient:
    """Thin wrapper around ModbusTcpClient configured from MODBUS_CONFIG."""

    def __init__(self):
        """Build the underlying TCP client from the shared configuration."""
        # timeout and retry_count are declared in MODBUS_CONFIG; forward them
        # so the configured values take effect. The fallbacks keep configs
        # that omit these keys working.
        self.client = ModbusTcpClient(
            MODBUS_CONFIG['host'],
            port=MODBUS_CONFIG['port'],
            timeout=MODBUS_CONFIG.get('timeout', 3),
            retries=MODBUS_CONFIG.get('retry_count', 3),
        )
        self.slave_id = MODBUS_CONFIG['slave_id']

    def connect(self):
        """Open the connection and return the underlying client's result."""
        return self.client.connect()

    def disconnect(self):
        """Close the underlying connection."""
        self.client.close()

    def read_register(self, address, count=1):
        """Read `count` holding registers starting at `address`.

        Returns the list of register values, or None when the response
        reports an error.
        """
        result = self.client.read_holding_registers(
            address=address,
            count=count,
            slave=self.slave_id,
        )
        if not result.isError():
            return result.registers
        return None
Next Steps
Now that you've got the basics, explore:
How is this guide?