Working with Data Types in PyModbus
Convert between Modbus registers and Python data types - integers, floats, strings, bits.
Data Types in PyModbus
Modbus only knows 16-bit registers and single bits. Here's how to work with real data types.
Basic Conversions
16-bit Integer (Single Register)
# Read single register as unsigned int (0-65535)
result = client.read_holding_registers(100, 1)
value = result.registers[0]
# Convert to signed int (-32768 to 32767)
if value > 32767:
signed_value = value - 65536
else:
signed_value = value
# Or use struct
import struct
bytes_val = struct.pack('>H', value)
signed = struct.unpack('>h', bytes_val)[0]
32-bit Integer (Two Registers)
from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
from pymodbus.constants import Endian
# Read 32-bit integer
result = client.read_holding_registers(100, 2)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
value = decoder.decode_32bit_int() # or decode_32bit_uint() for unsigned
# Write 32-bit integer
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_32bit_int(-123456)
registers = builder.to_registers()
client.write_registers(100, registers)
Float (32-bit IEEE 754)
# Read float
result = client.read_holding_registers(100, 2)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
float_value = decoder.decode_32bit_float()
# Write float
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_32bit_float(3.14159)
registers = builder.to_registers()
client.write_registers(100, registers)
Double (64-bit Float)
# Read double (4 registers)
result = client.read_holding_registers(100, 4)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
double_value = decoder.decode_64bit_float()
# Write double
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_64bit_float(3.141592653589793)
registers = builder.to_registers()
client.write_registers(100, registers)
Strings
# Read string (8 registers = 16 characters)
result = client.read_holding_registers(100, 8)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG
)
string_value = decoder.decode_string(16).decode('ascii').strip('\x00')
# Write string
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG
)
text = "HELLO WORLD"
# Pad to fixed length
text = text.ljust(16, '\x00')
builder.add_string(text)
registers = builder.to_registers()
client.write_registers(100, registers)
Bits (From Registers)
# Read register and extract bits
result = client.read_holding_registers(100, 1)
value = result.registers[0]
# Extract individual bits
bit_0 = bool(value & 0x0001)
bit_1 = bool(value & 0x0002)
bit_2 = bool(value & 0x0004)
bit_7 = bool(value & 0x0080)
bit_15 = bool(value & 0x8000)
# Extract all bits
bits = [(value >> i) & 1 for i in range(16)]
print(f"Bits: {bits}")
# Set specific bits
value = 0
value |= (1 << 0) # Set bit 0
value |= (1 << 5) # Set bit 5
value |= (1 << 15) # Set bit 15
client.write_register(100, value)
Endianness (Byte Order)
Different devices use different byte orders. Try all combinations:
def try_all_endianness(registers):
"""Try all byte/word order combinations."""
from pymodbus.constants import Endian
combinations = [
(Endian.BIG, Endian.BIG, "Big-Big"),
(Endian.BIG, Endian.LITTLE, "Big-Little"),
(Endian.LITTLE, Endian.BIG, "Little-Big"),
(Endian.LITTLE, Endian.LITTLE, "Little-Little"),
]
for byte_order, word_order, name in combinations:
decoder = BinaryPayloadDecoder.fromRegisters(
registers,
byteorder=byte_order,
wordorder=word_order
)
value = decoder.decode_32bit_float()
print(f"{name}: {value}")
# Read and test
result = client.read_holding_registers(100, 2)
try_all_endianness(result.registers)
Common Scaled Values
Many devices use scaled integers instead of floats:
# Temperature scaled by 10 (23.5°C = 235)
result = client.read_holding_registers(100, 1)
temperature = result.registers[0] / 10.0
# Pressure scaled by 100 (1.23 bar = 123)
result = client.read_holding_registers(101, 1)
pressure = result.registers[0] / 100.0
# Power in kilowatts (1234 = 1.234 kW)
result = client.read_holding_registers(102, 1)
power = result.registers[0] / 1000.0
# Write scaled values
temp_scaled = int(23.5 * 10) # 235
client.write_register(100, temp_scaled)
BCD (Binary Coded Decimal)
Some devices use BCD encoding:
def bcd_to_int(bcd_value):
"""Convert BCD to integer."""
result = 0
multiplier = 1
while bcd_value > 0:
digit = bcd_value & 0x0F
result += digit * multiplier
multiplier *= 10
bcd_value >>= 4
return result
def int_to_bcd(value):
"""Convert integer to BCD."""
result = 0
shift = 0
while value > 0:
digit = value % 10
result |= (digit << shift)
shift += 4
value //= 10
return result
# Read BCD value
result = client.read_holding_registers(100, 1)
bcd = result.registers[0]
value = bcd_to_int(bcd)
print(f"BCD {bcd:04X} = {value}")
# Write BCD value
bcd = int_to_bcd(1234)
client.write_register(100, bcd)
Custom Data Structures
from dataclasses import dataclass
from typing import List
@dataclass
class MotorData:
speed: float # RPM
current: float # Amps
temperature: float # Celsius
status: int # Status bits
@classmethod
def from_registers(cls, registers: List[int]):
"""Create from Modbus registers."""
decoder = BinaryPayloadDecoder.fromRegisters(
registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
return cls(
speed=decoder.decode_32bit_float(),
current=decoder.decode_32bit_float(),
temperature=decoder.decode_32bit_float(),
status=decoder.decode_16bit_uint()
)
def to_registers(self) -> List[int]:
"""Convert to Modbus registers."""
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_32bit_float(self.speed)
builder.add_32bit_float(self.current)
builder.add_32bit_float(self.temperature)
builder.add_16bit_uint(self.status)
return builder.to_registers()
# Read motor data (7 registers total)
result = client.read_holding_registers(100, 7)
motor = MotorData.from_registers(result.registers)
print(f"Motor: Speed={motor.speed} RPM, Current={motor.current} A")
# Write motor data
motor = MotorData(speed=1500.0, current=2.5, temperature=45.0, status=1)
client.write_registers(100, motor.to_registers())
Time and Date
from datetime import datetime
def read_datetime(client, address):
"""Read date/time from 3 registers (BCD format)."""
result = client.read_holding_registers(address, 3)
# Register 0: Year (BCD)
# Register 1: Month (high byte) and Day (low byte)
# Register 2: Hour (high byte) and Minute (low byte)
year = bcd_to_int(result.registers[0]) + 2000
month = bcd_to_int(result.registers[1] >> 8)
day = bcd_to_int(result.registers[1] & 0xFF)
hour = bcd_to_int(result.registers[2] >> 8)
minute = bcd_to_int(result.registers[2] & 0xFF)
return datetime(year, month, day, hour, minute)
def write_datetime(client, address, dt):
"""Write date/time to 3 registers (BCD format)."""
year_bcd = int_to_bcd(dt.year - 2000)
month_bcd = int_to_bcd(dt.month)
day_bcd = int_to_bcd(dt.day)
hour_bcd = int_to_bcd(dt.hour)
minute_bcd = int_to_bcd(dt.minute)
registers = [
year_bcd,
(month_bcd << 8) | day_bcd,
(hour_bcd << 8) | minute_bcd
]
client.write_registers(address, registers)
# Example usage
dt = read_datetime(client, 100)
print(f"Device time: {dt}")
write_datetime(client, 100, datetime.now())
Array Handling
def read_float_array(client, address, count):
"""Read array of floats."""
# Each float is 2 registers
result = client.read_holding_registers(address, count * 2)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
values = []
for _ in range(count):
values.append(decoder.decode_32bit_float())
return values
def write_float_array(client, address, values):
"""Write array of floats."""
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
for value in values:
builder.add_32bit_float(value)
client.write_registers(address, builder.to_registers())
# Read 10 temperature values
temperatures = read_float_array(client, 100, 10)
print(f"Temperatures: {temperatures}")
# Write setpoints
setpoints = [20.0, 21.5, 22.0, 23.5, 25.0]
write_float_array(client, 200, setpoints)
Quick Reference
# All available decoders
decoder.decode_8bit_int()
decoder.decode_8bit_uint()
decoder.decode_16bit_int()
decoder.decode_16bit_uint()
decoder.decode_32bit_int()
decoder.decode_32bit_uint()
decoder.decode_64bit_int()
decoder.decode_64bit_uint()
decoder.decode_32bit_float()
decoder.decode_64bit_float()
decoder.decode_string(size)
decoder.decode_bits()
# All available builders
builder.add_8bit_int(value)
builder.add_8bit_uint(value)
builder.add_16bit_int(value)
builder.add_16bit_uint(value)
builder.add_32bit_int(value)
builder.add_32bit_uint(value)
builder.add_64bit_int(value)
builder.add_64bit_uint(value)
builder.add_32bit_float(value)
builder.add_64bit_float(value)
builder.add_string(value)
builder.add_bits(values)
Always check device documentation for data type, scaling, and byte order. When in doubt, try all combinations.
Next Steps
- Error Handling - Handle conversion errors
- TCP Client - Read/write over Ethernet
- RTU Client - Read/write over serial
How is this guide?