Working with Data Types in PyModbus
Convert between Modbus registers and Python data types with PyModbus. Handle integers, floats, strings, and bit manipulation.
Data Types in PyModbus
Modbus only knows 16-bit registers and single bits. Here's how to work with real data types.
Basic Conversions
16-bit Integer (Single Register)
# Read a single holding register; the raw value is unsigned (0-65535)
result = client.read_holding_registers(100, 1)
value = result.registers[0]

# Reinterpret as signed two's complement (-32768 to 32767):
# anything above 32767 has the sign bit set, so subtract 2**16.
signed_value = value - 65536 if value > 32767 else value

# Or use struct: pack as unsigned big-endian 16-bit ('>H') so the same
# two bytes can be unpacked as signed ('>h').
import struct

bytes_val = struct.pack('>H', value)
signed = struct.unpack('>h', bytes_val)[0]
32-bit Integer (Two Registers)
from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
from pymodbus.constants import Endian
# Read 32-bit integer: one value spans two consecutive 16-bit registers
result = client.read_holding_registers(100, 2)
# byteorder is the byte order inside each register, wordorder the order of
# the registers themselves; both must match what the device uses.
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
value = decoder.decode_32bit_int() # or decode_32bit_uint() for unsigned
# Write 32-bit integer: build the payload with the same byte/word order
# used for reading, then turn it into a list of register values.
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_32bit_int(-123456)
registers = builder.to_registers()
client.write_registers(100, registers)
Float (32-bit IEEE 754)
# Read float: a 32-bit float occupies two consecutive registers
result = client.read_holding_registers(100, 2)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
float_value = decoder.decode_32bit_float()
# Write float: encode with the same byte/word order used for reading
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_32bit_float(3.14159)
# to_registers() yields the register values to send in one write
registers = builder.to_registers()
client.write_registers(100, registers)
Double (64-bit Float)
# Read double (4 registers): 64 bits / 16 bits per register
result = client.read_holding_registers(100, 4)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
double_value = decoder.decode_64bit_float()
# Write double: encode with the same byte/word order used for reading
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG,
wordorder=Endian.BIG
)
builder.add_64bit_float(3.141592653589793)
# to_registers() yields the register values to send in one write
registers = builder.to_registers()
client.write_registers(100, registers)
Strings
# Read string (8 registers = 16 characters): each register carries two bytes
result = client.read_holding_registers(100, 8)
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG
)
# decode_string() returns raw bytes: decode them as ASCII, then drop the
# NUL characters used as padding at either end.
string_value = decoder.decode_string(16).decode('ascii').strip('\x00')
# Write string
builder = BinaryPayloadBuilder(
byteorder=Endian.BIG
)
text = "HELLO WORLD"
# Pad to fixed length with NULs so the write always covers all 16
# characters and overwrites whatever longer text was stored before.
text = text.ljust(16, '\x00')
builder.add_string(text)
registers = builder.to_registers()
client.write_registers(100, registers)
Bits (From Registers)
# Read register and extract bits
result = client.read_holding_registers(100, 1)
value = result.registers[0]
# Extract individual bits: AND with a mask of (1 << n) isolates bit n,
# where bit 0 is the least significant bit.
bit_0 = bool(value & 0x0001)
bit_1 = bool(value & 0x0002)
bit_2 = bool(value & 0x0004)
bit_7 = bool(value & 0x0080)
bit_15 = bool(value & 0x8000)
# Extract all bits: bits[i] is bit i, so the list starts with the
# least significant bit.
bits = [(value >> i) & 1 for i in range(16)]
print(f"Bits: {bits}")
# Set specific bits, starting from an all-zero register value
value = 0
value |= (1 << 0) # Set bit 0
value |= (1 << 5) # Set bit 5
value |= (1 << 15) # Set bit 15
# value is now 0x8021 (bits 0, 5 and 15 set)
client.write_register(100, value)
Endianness (Byte Order)
Different devices use different byte orders. Try all combinations:
def try_all_endianness(registers):
    """Print a 32-bit float decoding of *registers* for every byte/word order.

    The same registers are decoded four times, once per combination of
    big/little byte order and big/little word order. Each result is printed
    as "<byte order>-<word order>: <value>" (e.g. "Big-Little: 3.14") so the
    plausible reading can be picked out by eye.

    Args:
        registers: Register values to decode, as returned in
            ``result.registers``.
    """
    from pymodbus.constants import Endian

    orders = ((Endian.BIG, "Big"), (Endian.LITTLE, "Little"))
    # Byte order varies in the outer loop, word order in the inner one, giving
    # Big-Big, Big-Little, Little-Big, Little-Little.
    for byte_order, byte_label in orders:
        for word_order, word_label in orders:
            decoder = BinaryPayloadDecoder.fromRegisters(
                registers,
                byteorder=byte_order,
                wordorder=word_order,
            )
            value = decoder.decode_32bit_float()
            print(f"{byte_label}-{word_label}: {value}")
# Read and test
result = client.read_holding_registers(100, 2)
try_all_endianness(result.registers)
Common Scaled Values
Many devices use scaled integers instead of floats:
# Temperature scaled by 10 (23.5°C = 235)
result = client.read_holding_registers(100, 1)
temperature = result.registers[0] / 10.0

# Pressure scaled by 100 (1.23 bar = 123)
result = client.read_holding_registers(101, 1)
pressure = result.registers[0] / 100.0

# Power in kilowatts (1234 = 1.234 kW)
result = client.read_holding_registers(102, 1)
power = result.registers[0] / 1000.0

# Write scaled values
# Use round(), not int(): int() truncates, and a float product can land just
# below the intended integer (e.g. 0.29 * 100 == 28.999999999999996 -> 28).
temp_scaled = round(23.5 * 10)  # 235
client.write_register(100, temp_scaled)
BCD (Binary Coded Decimal)
Some devices use BCD encoding:
def bcd_to_int(bcd_value):
    """Convert a packed BCD value to the integer it spells out.

    Each 4-bit nibble holds one decimal digit, least significant nibble
    first, so ``0x1234`` decodes to ``1234``.

    Args:
        bcd_value: Packed BCD value, e.g. a raw register value.

    Returns:
        The decoded integer. Zero or negative input yields ``0``. Nibbles
        above 9 are not validated; they are folded in as-is.
    """
    result = 0
    multiplier = 1  # decimal weight of the current nibble: 1, 10, 100, ...
    while bcd_value > 0:
        digit = bcd_value & 0x0F  # lowest nibble = next decimal digit
        result += digit * multiplier
        multiplier *= 10
        bcd_value >>= 4  # move on to the next nibble
    return result
def int_to_bcd(value):
    """Convert a non-negative integer to packed BCD.

    Each decimal digit is stored in its own 4-bit nibble, least significant
    digit in the lowest nibble, so ``1234`` encodes to ``0x1234``.

    Args:
        value: Integer to encode.

    Returns:
        The packed BCD value. Zero or negative input yields ``0``. Values
        above 9999 need more than four nibbles and will not fit in a single
        16-bit register.
    """
    result = 0
    shift = 0  # bit position of the nibble being written
    while value > 0:
        digit = value % 10  # lowest decimal digit
        result |= digit << shift
        shift += 4
        value //= 10
    return result
# Read BCD value
result = client.read_holding_registers(100, 1)
bcd = result.registers[0]
value = bcd_to_int(bcd)
print(f"BCD {bcd:04X} = {value}")
# Write BCD value
bcd = int_to_bcd(1234)
client.write_register(100, bcd)
Custom Data Structures
from dataclasses import dataclass
from typing import List
@dataclass
class MotorData:
    """Motor readings packed into seven consecutive Modbus registers.

    Register layout (big-endian byte and word order): three 32-bit floats
    (speed, current, temperature; two registers each) followed by one
    16-bit unsigned status word.
    """

    speed: float  # RPM
    current: float  # Amps
    temperature: float  # Celsius
    status: int  # Status bits

    @classmethod
    def from_registers(cls, registers: List[int]) -> "MotorData":
        """Create an instance from raw Modbus registers.

        Args:
            registers: Register values in the layout described on the class.

        Returns:
            A new ``MotorData`` holding the decoded values.
        """
        decoder = BinaryPayloadDecoder.fromRegisters(
            registers,
            byteorder=Endian.BIG,
            wordorder=Endian.BIG,
        )
        # The decoder consumes the payload sequentially, so the calls below
        # must stay in register-layout order.
        return cls(
            speed=decoder.decode_32bit_float(),
            current=decoder.decode_32bit_float(),
            temperature=decoder.decode_32bit_float(),
            status=decoder.decode_16bit_uint(),
        )

    def to_registers(self) -> List[int]:
        """Encode this instance as a list of Modbus register values.

        Returns:
            Register values in the same layout ``from_registers`` reads.
        """
        builder = BinaryPayloadBuilder(
            byteorder=Endian.BIG,
            wordorder=Endian.BIG,
        )
        for reading in (self.speed, self.current, self.temperature):
            builder.add_32bit_float(reading)
        builder.add_16bit_uint(self.status)
        return builder.to_registers()
# Read motor data (7 registers total)
result = client.read_holding_registers(100, 7)
motor = MotorData.from_registers(result.registers)
print(f"Motor: Speed={motor.speed} RPM, Current={motor.current} A")
# Write motor data
motor = MotorData(speed=1500.0, current=2.5, temperature=45.0, status=1)
client.write_registers(100, motor.to_registers())
Time and Date
from datetime import datetime
def read_datetime(client, address):
    """Read a BCD-encoded date/time from three consecutive holding registers.

    Register layout, starting at *address*:
        +0: year as a BCD offset from 2000
        +1: month (high byte) and day (low byte), both BCD
        +2: hour (high byte) and minute (low byte), both BCD

    Args:
        client: Modbus client providing ``read_holding_registers``.
        address: Address of the first of the three registers.

    Returns:
        A naive ``datetime``; seconds are not stored and come back as 0.

    Raises:
        ValueError: Raised by ``datetime`` when the decoded fields do not
            form a valid date/time.
    """
    result = client.read_holding_registers(address, 3)
    year_word = result.registers[0]
    month_day_word = result.registers[1]
    hour_minute_word = result.registers[2]

    return datetime(
        bcd_to_int(year_word) + 2000,
        bcd_to_int(month_day_word >> 8),
        bcd_to_int(month_day_word & 0xFF),
        bcd_to_int(hour_minute_word >> 8),
        bcd_to_int(hour_minute_word & 0xFF),
    )
def write_datetime(client, address, dt):
    """Write a date/time as BCD to three consecutive holding registers.

    Register layout, starting at *address*:
        +0: year as a BCD offset from 2000
        +1: month (high byte) and day (low byte), both BCD
        +2: hour (high byte) and minute (low byte), both BCD

    Seconds and smaller units of *dt* are not written.

    Args:
        client: Modbus client providing ``write_registers``.
        address: Address of the first of the three registers.
        dt: The ``datetime`` to encode.

    Raises:
        ValueError: If ``dt.year`` is before 2000, which the offset encoding
            cannot represent.
    """
    # int_to_bcd() returns 0 for negative input, so without this guard a
    # pre-2000 date would be silently written as year 2000.
    if dt.year < 2000:
        raise ValueError(
            f"year {dt.year} is before 2000 and cannot be encoded"
        )

    registers = [
        int_to_bcd(dt.year - 2000),
        (int_to_bcd(dt.month) << 8) | int_to_bcd(dt.day),
        (int_to_bcd(dt.hour) << 8) | int_to_bcd(dt.minute),
    ]
    client.write_registers(address, registers)
# Example usage
dt = read_datetime(client, 100)
print(f"Device time: {dt}")
write_datetime(client, 100, datetime.now())
Array Handling
def read_float_array(client, address, count):
    """Read *count* consecutive 32-bit floats starting at *address*.

    All values are fetched in a single read of ``count * 2`` registers and
    decoded with big-endian byte and word order.

    NOTE: the Modbus specification caps one holding-register read at 125
    registers, so a single call can return at most 62 floats.

    Args:
        client: Modbus client providing ``read_holding_registers``.
        address: Address of the first register.
        count: Number of floats to read.

    Returns:
        A list of *count* floats, in register order.
    """
    # Each float is 2 registers
    result = client.read_holding_registers(address, count * 2)
    decoder = BinaryPayloadDecoder.fromRegisters(
        result.registers,
        byteorder=Endian.BIG,
        wordorder=Endian.BIG,
    )
    # The decoder advances through the payload on every call.
    return [decoder.decode_32bit_float() for _ in range(count)]
def write_float_array(client, address, values):
    """Write *values* as consecutive 32-bit floats starting at *address*.

    The floats are encoded with big-endian byte and word order (two
    registers each) and sent in a single multi-register write.

    NOTE: the Modbus specification caps one multi-register write at 123
    registers, so a single call can carry at most 61 floats.

    Args:
        client: Modbus client providing ``write_registers``.
        address: Address of the first register.
        values: Iterable of floats to write, in register order.
    """
    builder = BinaryPayloadBuilder(
        byteorder=Endian.BIG,
        wordorder=Endian.BIG,
    )
    for value in values:
        builder.add_32bit_float(value)
    client.write_registers(address, builder.to_registers())
# Read 10 temperature values
temperatures = read_float_array(client, 100, 10)
print(f"Temperatures: {temperatures}")
# Write setpoints
setpoints = [20.0, 21.5, 22.0, 23.5, 25.0]
write_float_array(client, 200, setpoints)
Quick Reference
# All available decoders
decoder.decode_8bit_int()
decoder.decode_8bit_uint()
decoder.decode_16bit_int()
decoder.decode_16bit_uint()
decoder.decode_32bit_int()
decoder.decode_32bit_uint()
decoder.decode_64bit_int()
decoder.decode_64bit_uint()
decoder.decode_32bit_float()
decoder.decode_64bit_float()
decoder.decode_string(size)
decoder.decode_bits()
# All available builders
builder.add_8bit_int(value)
builder.add_8bit_uint(value)
builder.add_16bit_int(value)
builder.add_16bit_uint(value)
builder.add_32bit_int(value)
builder.add_32bit_uint(value)
builder.add_64bit_int(value)
builder.add_64bit_uint(value)
builder.add_32bit_float(value)
builder.add_64bit_float(value)
builder.add_string(value)
builder.add_bits(values)
Always check device documentation for data type, scaling, and byte order. When in doubt, try all combinations.
Next Steps
- Error Handling - Handle conversion errors
- TCP Client - Read/write over Ethernet
- RTU Client - Read/write over serial