Writing to Modbus Registers
Write data to Modbus devices - single and multiple registers, coils, data type conversion.
Writing to Modbus Registers
Write data to PLCs, controllers, and other Modbus devices. This guide covers single-register writes, multi-register writes, coils, and data type conversion.
Write Single Register
from pymodbus.client import ModbusTcpClient
client = ModbusTcpClient('192.168.1.100')
# connect() signals failure through its return value, so check it before writing
if not client.connect():
    raise ConnectionError("Could not connect to Modbus device at 192.168.1.100")
# Write value 1234 to register 100
result = client.write_register(address=100, value=1234, slave=1)
# A Modbus-level failure comes back as an error response, not an exception
if result.isError():
    print(f"Write failed: {result}")
client.close()

Write Multiple Registers
# Write multiple values starting at address 100
values = [100, 200, 300, 400, 500]
write_result = client.write_registers(address=100, values=values, slave=1)
if write_result.isError():
    print(f"Write failed: {write_result}")
# Verify by reading back. count is passed by keyword because newer
# pymodbus 3.x releases no longer accept it as a positional argument.
result = client.read_holding_registers(address=100, count=len(values), slave=1)
print(f"Written values: {result.registers}")

Write Coils (Digital Outputs)
# Write single coil
coil_result = client.write_coil(address=0, value=True, slave=1)
if coil_result.isError():
    print(f"Coil write failed: {coil_result}")
# Write multiple coils
coil_values = [True, False, True, True, False, False, True, False]
coils_result = client.write_coils(address=0, values=coil_values, slave=1)
if coils_result.isError():
    print(f"Coil write failed: {coils_result}")
# Read back to verify. count is passed by keyword because newer
# pymodbus 3.x releases no longer accept it as a positional argument.
result = client.read_coils(address=0, count=len(coil_values), slave=1)
print(f"Coil states: {result.bits[:8]}")

Writing Different Data Types
32-bit Integer
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.constants import Endian
def write_int32(client, address, value, slave=1):
    """Write a signed 32-bit integer to two consecutive registers.

    Args:
        client: Connected pymodbus client.
        address: Address of the first of the two registers.
        value: Integer to encode (big-endian byte and word order).
        slave: Unit ID of the target device.

    Returns:
        The response from ``client.write_registers`` so the caller can
        check ``isError()``.
    """
    builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.BIG)
    builder.add_32bit_int(value)
    # Hand the response back instead of discarding it, so failures are visible
    return client.write_registers(address, builder.to_registers(), slave=slave)
# Write value -123456
write_int32(client, address=100, value=-123456)

Float Values
def write_float(client, address, value, slave=1):
    """Write a 32-bit float to two consecutive registers.

    Args:
        client: Connected pymodbus client.
        address: Address of the first of the two registers.
        value: Number to encode as a 32-bit float (big-endian byte and
            word order).
        slave: Unit ID of the target device.

    Returns:
        The response from ``client.write_registers`` so the caller can
        check ``isError()``.
    """
    builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.BIG)
    builder.add_32bit_float(value)
    # Hand the response back instead of discarding it, so failures are visible
    return client.write_registers(address, builder.to_registers(), slave=slave)
# Write temperature 23.5°C
write_float(client, address=100, value=23.5)

String Values
def write_string(client, address, text, length=16, slave=1):
    """Write a fixed-length string to consecutive registers.

    Args:
        client: Connected pymodbus client.
        address: Address of the first register.
        text: String to write; truncated to ``length`` characters.
        length: Fixed field width in characters; shorter text is padded
            with NUL characters.
        slave: Unit ID of the target device.

    Returns:
        The response from ``client.write_registers`` so the caller can
        check ``isError()``.
    """
    builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.BIG)
    # Truncate or NUL-pad so the field always occupies the same width,
    # overwriting any longer string stored previously
    padded = text[:length].ljust(length, '\x00')
    builder.add_string(padded)
    return client.write_registers(address, builder.to_registers(), slave=slave)
# Write device name
write_string(client, address=100, text="PUMP_01", length=16)

Real Examples
Set Temperature Setpoint
def set_temperature_setpoint(client, temperature):
    """Set a temperature controller setpoint, scaled by 10.

    Many controllers store temperatures as scaled integers, e.g.
    23.5°C is written as 235.

    Args:
        client: Connected pymodbus client.
        temperature: Setpoint in °C.

    Returns:
        True if the device accepted the write, False if the value does
        not fit in one register or the write failed.
    """
    # round() rather than int(): float error can leave the product just
    # below the intended integer (e.g. 7.999999999999999), which int()
    # would truncate to the wrong setpoint
    scaled_value = round(temperature * 10)
    if not -0x8000 <= scaled_value <= 0xFFFF:
        print(f"Failed to set setpoint: {temperature}°C does not fit in a 16-bit register")
        return False
    # Write to setpoint register (device-specific address)
    result = client.write_register(
        address=1000,  # Setpoint register
        # Registers are unsigned on the wire; negative setpoints are sent
        # as 16-bit two's complement (assumes the device reads the
        # register as a signed INT16 - confirm against its manual)
        value=scaled_value & 0xFFFF,
        slave=1,
    )
    if result.isError():
        print(f"Failed to set setpoint: {result}")
        return False
    print(f"Setpoint set to {temperature}°C")
    return True
# Set to 23.5°C
set_temperature_setpoint(client, 23.5)

Control Motor Speed
def set_motor_speed(client, speed_rpm, poles=2):
    """Set a VFD frequency reference and then issue the start command.

    The register addresses and the start word below are examples; they
    are device-specific and must be checked against the drive's manual.

    Args:
        client: Connected pymodbus client.
        speed_rpm: Desired synchronous speed in RPM (non-negative).
        poles: Motor pole count used for the RPM-to-Hz conversion.

    Returns:
        True if both writes succeeded, otherwise False. The start
        command is never sent when the frequency write fails.
    """
    if speed_rpm < 0 or poles <= 0:
        print(f"Invalid speed/poles: {speed_rpm} RPM, {poles} poles")
        return False
    # Synchronous speed: rpm = 120 * f / poles  ->  f = rpm * poles / 120
    # (rpm / 60 for a 2-pole motor)
    frequency_hz = speed_rpm * poles / 120
    # VFD expects frequency in Hz * 100; round to the nearest step
    scaled_freq = round(frequency_hz * 100)
    if scaled_freq > 0xFFFF:
        print(f"Frequency {frequency_hz} Hz does not fit in a 16-bit register")
        return False
    # Write to frequency register
    freq_result = client.write_register(
        address=8192,  # Common VFD frequency register
        value=scaled_freq,
        slave=1,
    )
    # Safety: do not start the motor on a stale or unknown frequency reference
    if freq_result.isError():
        print(f"Failed to set frequency: {freq_result}")
        return False
    # Start motor (write to control word)
    start_result = client.write_register(
        address=8193,  # Control word
        value=0x047F,  # Start command
        slave=1,
    )
    if start_result.isError():
        print(f"Failed to start motor: {start_result}")
        return False
    return True
# Set motor to 1500 RPM
set_motor_speed(client, 1500)

Configure PLC Timer
def set_timer(client, timer_id, seconds):
    """Configure a PLC timer preset as a 32-bit millisecond value.

    Args:
        client: Connected pymodbus client.
        timer_id: Zero-based timer index.
        seconds: Preset duration in seconds.

    Returns:
        True if the preset was written, False if the duration does not
        fit in an unsigned 32-bit millisecond count or the write failed.
    """
    # Convert seconds to milliseconds (nearest millisecond)
    ms_value = round(seconds * 1000)
    if not 0 <= ms_value <= 0xFFFFFFFF:
        print(f"Timer T{timer_id} preset out of range: {seconds} seconds")
        return False
    # Timer preset addresses often follow a pattern: T0=4000, T1=4003, etc.
    timer_address = 4000 + (timer_id * 3)
    # Write timer preset (often 32-bit, so it spans two registers)
    builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.BIG)
    builder.add_32bit_uint(ms_value)
    result = client.write_registers(timer_address, builder.to_registers())
    # Only report success once the device has actually accepted the write
    if result.isError():
        print(f"Failed to set timer T{timer_id}: {result}")
        return False
    print(f"Timer T{timer_id} set to {seconds} seconds")
    return True
# Set Timer 0 to 30 seconds
set_timer(client, timer_id=0, seconds=30)

Write with Validation
def write_and_verify(client, address, value, slave=1):
    """Write one register, read it back, and confirm the stored value.

    Args:
        client: Connected pymodbus client.
        address: Register address to write.
        value: 16-bit value to write.
        slave: Unit ID of the target device.

    Returns:
        True only if the write succeeded, the read-back succeeded, and
        the value read equals ``value``; otherwise False.
    """
    # Keyword arguments throughout: newer pymodbus 3.x releases reject
    # slave/count when they are passed positionally
    write_result = client.write_register(address=address, value=value, slave=slave)
    if write_result.isError():
        print(f"Write failed: {write_result}")
        return False

    # Read back to verify
    read_result = client.read_holding_registers(address=address, count=1, slave=slave)
    if read_result.isError():
        print(f"Read verification failed: {read_result}")
        return False

    actual_value = read_result.registers[0]
    if actual_value != value:
        print(f"✗ Write failed! Expected {value}, got {actual_value}")
        return False
    print(f"✓ Successfully wrote {value} to address {address}")
    return True
# Use it
write_and_verify(client, address=100, value=12345)

Batch Writing
def batch_write_configs(client, configs, slave=1):
    """Write a set of configuration registers, one request per register.

    Args:
        client: Connected pymodbus client.
        configs: Mapping of register address to the value to write.
        slave: Unit ID of the target device.

    Returns:
        True if every write succeeded (or ``configs`` is empty). False as
        soon as one write fails; remaining entries are not attempted.
    """
    for address, value in configs.items():
        result = client.write_register(address, value, slave=slave)
        if result.isError():
            print(f"Failed to write address {address}: {result}")
            return False
    return True
# Parameters to configure: register address -> raw value to write
configs = {
    1000: 235,   # Temperature setpoint: 23.5°C
    1001: 750,   # Pressure setpoint: 75.0 bar
    1002: 1500,  # Flow setpoint: 1500 L/h
    1003: 60,    # Time setpoint: 60 seconds
}
batch_write_configs(client, configs)

Async Writing
import asyncio
from pymodbus.client import AsyncModbusTcpClient
async def async_write_registers():
    """Write three registers through the async client and report each result.

    The writes are submitted together with ``asyncio.gather``; every
    outcome is printed, including failures.
    """
    # Address -> value; keeping them paired avoids deriving the address
    # from the position of a result in the list
    writes = {100: 123, 101: 456, 102: 789}
    async with AsyncModbusTcpClient('192.168.1.100') as client:
        results = await asyncio.gather(
            *(client.write_register(address, value) for address, value in writes.items())
        )
    for address, result in zip(writes, results):
        if result.isError():
            print(f"Register {address} write failed: {result}")
        else:
            print(f"Register {address} written successfully")
asyncio.run(async_write_registers())

Error Handling
from pymodbus.exceptions import ModbusException
def safe_write(client, address, value, max_retries=3, retry_delay=0.5):
    """Write one register, retrying on transient failures.

    Args:
        client: Connected pymodbus client.
        address: Register address to write.
        value: 16-bit value to write.
        max_retries: Maximum number of write attempts.
        retry_delay: Seconds to wait between attempts.

    Returns:
        True on the first successful write. False when the device reports
        an illegal address (code 2) or illegal value (code 3), which are
        not retried, or when every attempt has failed.
    """
    # Local import: no `import time` is visible at file level, so the
    # retry path would otherwise fail with a NameError
    import time

    for attempt in range(max_retries):
        try:
            result = client.write_register(address, value)
        except ModbusException as e:
            print(f"Modbus exception: {e}")
        else:
            if not result.isError():
                return True
            print(f"Attempt {attempt + 1} failed: {result}")
            # Permanent errors: retrying cannot change the outcome
            exception_code = getattr(result, 'exception_code', None)
            if exception_code == 2:  # Illegal address
                print(f"Address {address} doesn't exist")
                return False
            if exception_code == 3:  # Illegal value
                print(f"Value {value} out of range")
                return False
        # Wait before retrying, but not after the final attempt
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
    return False


# Write Protection
Some devices have write protection:
def unlock_and_write(client, password_register, password, address, value, unlock_delay=0.1):
    """Unlock a write-protected device, write one register, then re-lock.

    Args:
        client: Connected pymodbus client.
        password_register: Register that accepts the unlock password.
        password: Value that unlocks the device.
        address: Register address to write once unlocked.
        value: 16-bit value to write.
        unlock_delay: Seconds to wait after unlocking before writing.

    Returns:
        True if the protected write succeeded. False if unlocking failed
        (the protected write is then not attempted) or the write failed.
    """
    # Local import: no `import time` is visible at file level
    import time

    # Write password to unlock
    unlock_result = client.write_register(password_register, password)
    if unlock_result.isError():
        print(f"Unlock failed: {unlock_result}")
        return False
    try:
        time.sleep(unlock_delay)  # Wait for unlock
        # Write actual value
        result = client.write_register(address, value)
    finally:
        # Lock again (write 0 to password register), even if the write
        # raised, so the device is never left unprotected
        client.write_register(password_register, 0)
    return not result.isError()
# Example: unlock with password 1234, then write
unlock_and_write(client, 9999, 1234, 100, 500)

Safety: Always validate values before writing to production equipment. Wrong values can damage equipment or cause safety issues.
Common Issues
Value Out of Range
# A single register holds an unsigned 16-bit value (0-65535)
# This will fail or wrap around
client.write_register(100, 70000) # Too big!
# Fix: clamp to the register range before writing
# (your_value is a placeholder for the number you intend to write)
value = min(65535, max(0, your_value))
client.write_register(100, value)

Wrong Data Type
# Device expects a scaled integer, not a float
# WRONG: registers carry integers, so the fractional part cannot be sent
client.write_register(100, 23.5) # NOTE(review): may raise instead of truncating to 23, depending on the pymodbus version - confirm
# RIGHT: scale first, then write the integer (23.5 -> 235)
scaled = int(23.5 * 10) # Scale by 10
client.write_register(100, scaled)

Byte Order Issues
# If device shows wrong values, try different byte order
builder = BinaryPayloadBuilder(
byteorder=Endian.LITTLE, # Try BIG if wrong
wordorder=Endian.LITTLE # Try BIG if wrong
)

Next Steps
- Data Types - Convert complex data types
- Error Handling - Handle write failures
- Server Development - Build your own Modbus server
How is this guide?