Asynchronous Client Operations
Build high-performance asynchronous Modbus clients with asyncio, covering concurrent operations and performance optimization.
Async Modbus Client
Use Python's asyncio for high-performance, concurrent Modbus operations.
Basic Async Client
import asyncio
from pymodbus.client import AsyncModbusTcpClient
async def read_data():
    """Read ten holding registers from one device over Modbus TCP.

    Returns:
        The list of register values, or None when the device answered
        with an error response.
    """
    async with AsyncModbusTcpClient('192.168.1.100') as modbus:
        result = await modbus.read_holding_registers(0, 10, slave=1)
        # Guard clause: nothing to print or return on an error response.
        if result.isError():
            return None
        print(f"Data: {result.registers}")
        return result.registers
# Run async function: asyncio.run() drives read_data() to completion on a
# new event loop and hands back whatever the coroutine returned.
data = asyncio.run(read_data())
Concurrent Operations
async def read_multiple_devices():
    """Read holding registers 0-9 from several devices concurrently.

    Each device gets its own connection and all reads run at the same
    time. A device that fails does not affect the others.

    Returns:
        A list of ``(host, registers)`` tuples, in the same order as the
        device list. ``registers`` is None when the device returned an
        error response or the read raised an exception.
    """
    devices = [
        ('192.168.1.10', 1),
        ('192.168.1.11', 1),
        ('192.168.1.12', 1),
    ]

    async def read_device(host, slave):
        async with AsyncModbusTcpClient(host) as client:
            # Pass the unit id by keyword so the call does not depend on
            # the position of the parameter in the client API.
            result = await client.read_holding_registers(0, 10, slave=slave)
            return host, result.registers if not result.isError() else None

    # Run all reads concurrently. return_exceptions=True keeps a single
    # unreachable device from aborting the batch and discarding the data
    # already collected from the other devices.
    tasks = [read_device(host, slave) for host, slave in devices]
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    results = []
    for (host, _slave), outcome in zip(devices, outcomes):
        if isinstance(outcome, BaseException):
            print(f"{host}: read failed: {outcome}")
            outcome = (host, None)
        else:
            print(f"{host}: {outcome[1]}")
        results.append(outcome)
    return results
# Run: start an event loop and execute read_multiple_devices() until it
# has finished.
asyncio.run(read_multiple_devices())
Async Serial RTU
from pymodbus.client import AsyncModbusSerialClient
async def async_rtu_example():
    """Poll slaves 1-4 on one serial line with the async RTU client.

    Reads holding registers 0-9 from each slave in turn and prints the
    values of every slave that answers without an error. The client is
    closed even if connecting or a read raises.
    """
    client = AsyncModbusSerialClient(
        port='/dev/ttyUSB0',
        baudrate=9600,
        timeout=1
    )
    try:
        await client.connect()
        # Read from multiple slaves; they share one bus, so the requests
        # are issued one after another rather than concurrently.
        for slave_id in range(1, 5):
            result = await client.read_holding_registers(0, 10, slave=slave_id)
            if not result.isError():
                print(f"Slave {slave_id}: {result.registers}")
    finally:
        # Always close the client, otherwise an exception above would
        # leave the connection open.
        client.close()
# Entry point: run the RTU example on a new event loop until it completes.
asyncio.run(async_rtu_example())
Continuous Monitoring
class AsyncModbusMonitor:
    """Continuously poll holding registers 0-9 of one Modbus TCP device.

    ``start()`` runs the polling loop and is meant to be wrapped in a task;
    ``stop()`` ends the loop and closes the connection. ``client`` holds
    the active connection and is None whenever the monitor is not
    connected.
    """

    def __init__(self, host, interval=1.0):
        """Store the device address and the delay (seconds) between polls."""
        self.host = host
        self.interval = interval
        self.running = False
        self.client = None

    async def start(self):
        """Connect and poll until stop() is called or the task is cancelled.

        Successful reads are passed to process_data(). An exception inside
        one polling cycle is printed and followed by a 5 second pause; it
        does not end the loop.
        """
        client = AsyncModbusTcpClient(self.host)
        self.client = client
        # Set the flag before connecting so that a stop() issued while the
        # connection is still being established is not overwritten.
        self.running = True
        try:
            await client.connect()
            while self.running:
                try:
                    # Read data
                    result = await client.read_holding_registers(0, 10)
                    if not result.isError():
                        await self.process_data(result.registers)
                    # Wait before next read
                    await asyncio.sleep(self.interval)
                except Exception as e:
                    if not self.running:
                        # stop() closed the connection while a request was
                        # in flight; that is a normal shutdown, not an error.
                        break
                    print(f"Error: {e}")
                    await asyncio.sleep(5)  # Wait longer on error
        finally:
            # Runs on normal exit, on errors and on task cancellation, so
            # the connection is never left open.
            self.running = False
            if self.client is client:
                # stop() has not closed this connection yet.
                self.client = None
                client.close()

    async def process_data(self, data):
        """Process received data (the list of register values)."""
        print(f"Data: {data}")
        # Add your processing here

    async def stop(self):
        """Stop monitoring and close the connection; safe to call twice."""
        self.running = False
        # Detach the client before closing so that neither a second stop()
        # nor the cleanup in start() closes it again.
        client, self.client = self.client, None
        if client:
            client.close()
# Use monitor
async def main():
    """Run the monitor for 10 seconds, then shut it down cleanly."""
    monitor = AsyncModbusMonitor('192.168.1.100')
    # Run for 10 seconds
    task = asyncio.create_task(monitor.start())
    try:
        await asyncio.sleep(10)
    finally:
        await monitor.stop()
        # Wait for the polling loop to finish. Without this the task is
        # abandoned when main() returns and any exception raised inside
        # start() is never seen.
        await task
# Entry point: run the monitor demo defined in main() above.
asyncio.run(main())
Queue-Based Processing
import asyncio
from asyncio import Queue
class AsyncModbusQueue:
    """Funnel Modbus requests from many coroutines through one connection.

    Callers submit work with read_holding() / write_register(); a single
    worker() task executes the requests one at a time, in submission
    order. Each request carries its own future, so every caller receives
    the reply (or exception) of its own request.
    """

    def __init__(self, host):
        """Remember the device address and create the request queues."""
        self.host = host
        self.request_queue = Queue()
        # Kept for raw 4-tuple requests placed on request_queue directly;
        # read_holding()/write_register() receive their reply via a future.
        self.result_queue = Queue()

    async def worker(self):
        """Process requests from the queue until a None signal arrives.

        A request is ``(func, address, value, slave)`` optionally followed
        by a future. With a future, the reply or the raised exception is
        delivered through it and the worker keeps running. Without one,
        the reply is put on result_queue and exceptions propagate.

        Raises:
            ValueError: for an unknown ``func`` in a request without a
                future.
        """
        async with AsyncModbusTcpClient(self.host) as client:
            while True:
                # Get request from queue
                request = await self.request_queue.get()
                if request is None:  # Shutdown signal
                    break
                func, address, count, slave, *extra = request
                future = extra[0] if extra else None
                try:
                    # Execute request
                    if func == 'read_holding':
                        reply = await client.read_holding_registers(
                            address, count, slave=slave
                        )
                    elif func == 'write_register':
                        reply = await client.write_register(
                            address, count, slave=slave  # count is value here
                        )
                    else:
                        # Previously this fell through and re-used a stale
                        # or unbound result.
                        raise ValueError(f"Unknown request type: {func!r}")
                except Exception as exc:
                    if future is None:
                        raise
                    # Hand the failure to the waiting caller instead of
                    # leaving it blocked forever; skip callers that have
                    # already been cancelled.
                    if not future.done():
                        future.set_exception(exc)
                    continue
                if future is None:
                    await self.result_queue.put(reply)
                elif not future.done():
                    future.set_result(reply)

    async def _submit(self, func, address, value, slave):
        """Queue one request and wait for the worker's reply to it."""
        future = asyncio.get_running_loop().create_future()
        await self.request_queue.put((func, address, value, slave, future))
        return await future

    async def read_holding(self, address, count, slave=1):
        """Queue a holding-register read and return the worker's reply."""
        return await self._submit('read_holding', address, count, slave)

    async def write_register(self, address, value, slave=1):
        """Queue a single-register write and return the worker's reply."""
        return await self._submit('write_register', address, value, slave)
# Use queue
async def main():
    """Push three requests through one queue worker, then shut it down."""
    processor = AsyncModbusQueue('192.168.1.100')

    # Start the worker first so that queued requests get serviced.
    background = asyncio.create_task(processor.worker())

    # Submit two reads and one write and wait for all three replies.
    await asyncio.gather(
        processor.read_holding(0, 10),
        processor.read_holding(100, 5),
        processor.write_register(200, 123),
    )

    # None is the shutdown signal; wait until the worker has exited.
    await processor.request_queue.put(None)
    await background
# Entry point: run the queue demo defined in main() above.
asyncio.run(main())
Rate Limiting
class RateLimitedClient:
    """Async Modbus TCP client that spaces requests at least
    ``1 / max_requests_per_second`` seconds apart.

    The limit also holds when read() is called from several coroutines at
    once. The connection is opened on the first read; call close() when
    done. Use one instance per event loop.
    """

    def __init__(self, host, max_requests_per_second=10):
        """Store the device address and derive the minimum request spacing."""
        self.host = host
        self.max_rps = max_requests_per_second
        self.min_interval = 1.0 / max_requests_per_second
        # Event-loop time of the most recently reserved request slot.
        self.last_request = 0
        self.client = None
        # Created on first use, inside the running event loop.
        self._connect_lock = None

    async def _rate_limit(self):
        """Enforce rate limit by waiting for the next free request slot."""
        loop = asyncio.get_running_loop()
        now = loop.time()
        # Reserve the slot before awaiting anything. There is no suspension
        # point between reading and writing last_request, so concurrent
        # callers each get a distinct slot instead of all sleeping the same
        # amount and then firing together.
        slot = max(now, self.last_request + self.min_interval)
        self.last_request = slot
        if slot > now:
            await asyncio.sleep(slot - now)

    async def _ensure_connected(self):
        """Return the shared client, connecting it on first use."""
        if self.client:
            return self.client
        if self._connect_lock is None:
            self._connect_lock = asyncio.Lock()
        async with self._connect_lock:
            # Re-check under the lock: another caller may have connected
            # while this one was waiting.
            if not self.client:
                client = AsyncModbusTcpClient(self.host)
                await client.connect()
                # Publish only after connect() returned, so nobody sends
                # a request on a half-open client.
                self.client = client
        return self.client

    async def read(self, address, count, slave=1):
        """Rate-limited read of ``count`` holding registers at ``address``."""
        await self._rate_limit()
        client = await self._ensure_connected()
        return await client.read_holding_registers(address, count, slave=slave)

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        client, self.client = self.client, None
        if client:
            client.close()
# Use rate-limited client
async def main():
    """Issue 20 reads through the rate-limited client and print each one."""
    client = RateLimitedClient('192.168.1.100', max_requests_per_second=5)
    try:
        # These will be rate-limited to 5 per second
        for i in range(20):
            result = await client.read(0, 10)
            # Check for an error response before touching .registers.
            if result.isError():
                print(f"Request {i+1}: error: {result}")
                continue
            print(f"Request {i+1}: {result.registers[:3]}...")
    finally:
        # Release the connection opened by the first read.
        if client.client:
            client.client.close()
# Entry point: run the rate-limiting demo defined in main() above.
asyncio.run(main())
Error Handling
async def robust_async_read(host, address, count, max_retries=3):
    """Read holding registers, retrying on errors, timeouts and exceptions.

    Every attempt opens a fresh connection and allows the read 3 seconds
    to complete. Failed attempts are reported with print() and followed
    by a 1 second pause, except after the final attempt.

    Returns:
        The register values of the first successful attempt, or None when
        all ``max_retries`` attempts failed.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            async with AsyncModbusTcpClient(host, timeout=5) as client:
                pending = client.read_holding_registers(address, count)
                result = await asyncio.wait_for(pending, timeout=3.0)
                if result.isError():
                    print(f"Attempt {attempt + 1} error: {result}")
                else:
                    return result.registers
        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timeout")
        except Exception as e:
            print(f"Attempt {attempt + 1} exception: {e}")

        # No point in pausing once the last attempt has been used up.
        if attempt == final_attempt:
            break
        await asyncio.sleep(1)  # Wait before retry
    return None
Performance Comparison
import time
async def benchmark_async(host, num_requests=100):
    """Benchmark async performance.

    Issues ``num_requests`` reads of holding registers 0-9 concurrently on
    one connection and prints the elapsed time, the request rate and the
    number of successful replies. The measured time includes opening and
    closing the connection.
    """
    # perf_counter() is a monotonic, high-resolution clock; time.time() is
    # wall-clock time and can jump when the system clock is adjusted.
    start = time.perf_counter()
    async with AsyncModbusTcpClient(host) as client:
        tasks = [
            client.read_holding_registers(0, 10)
            for _ in range(num_requests)
        ]
        # Collect exceptions as results so that one failed request does
        # not abort the whole benchmark.
        results = await asyncio.gather(*tasks, return_exceptions=True)
    elapsed = time.perf_counter() - start
    successful = sum(
        1 for r in results
        if not isinstance(r, BaseException) and not r.isError()
    )
    print(f"Async: {num_requests} requests in {elapsed:.2f}s")
    print(f" {num_requests/elapsed:.1f} req/s")
    print(f" {successful}/{num_requests} successful")
def benchmark_sync(host, num_requests=100):
    """Benchmark sync performance.

    Issues ``num_requests`` reads of holding registers 0-9 one after
    another on a blocking client and prints the elapsed time, the request
    rate and the number of successful replies. The measured time includes
    opening and closing the connection.
    """
    from pymodbus.client import ModbusTcpClient

    # perf_counter() is a monotonic, high-resolution clock; time.time() is
    # wall-clock time and can jump when the system clock is adjusted.
    start = time.perf_counter()
    client = ModbusTcpClient(host)
    successful = 0
    try:
        client.connect()
        for _ in range(num_requests):
            result = client.read_holding_registers(0, 10)
            if not result.isError():
                successful += 1
    finally:
        # Close the connection even if a request raised.
        client.close()
    elapsed = time.perf_counter() - start
    print(f"Sync: {num_requests} requests in {elapsed:.2f}s")
    print(f" {num_requests/elapsed:.1f} req/s")
    print(f" {successful}/{num_requests} successful")
# Compare performance: run the async benchmark first, then the blocking
# one, against the same device so the printed req/s figures can be
# compared directly.
asyncio.run(benchmark_async('192.168.1.100'))
benchmark_sync('192.168.1.100')
WebSocket Bridge
import websockets
import json
class ModbusWebSocketBridge:
    """Bridge Modbus to WebSocket for real-time web apps.

    Web clients send JSON requests such as
    ``{"action": "read", "address": 0, "count": 10}`` and receive
    ``{"type": "data", "values": [...]}``. Malformed requests are answered
    with ``{"type": "error", "message": "..."}``. Independently, holding
    registers 0-9 are broadcast to every connected client once per second
    as ``{"type": "update", "values": [...]}``.
    """

    # The Modbus protocol limits a single Read Holding Registers request
    # to 125 registers.
    MAX_READ_REGISTERS = 125

    def __init__(self, modbus_host, ws_port=8765):
        """Store the Modbus device address and the WebSocket listen port."""
        self.modbus_host = modbus_host
        self.ws_port = ws_port
        self.clients = set()

    async def handle_client(self, websocket, path=None):
        """Handle WebSocket client.

        ``path`` is optional because, depending on the websockets version,
        the handler is called either with ``(websocket, path)`` or with
        the connection only.
        """
        self.clients.add(websocket)
        try:
            async for message in websocket:
                reply = await self._handle_message(message)
                if reply is not None:
                    await websocket.send(json.dumps(reply))
        finally:
            # discard() cannot raise if the socket was already removed.
            self.clients.discard(websocket)

    async def _handle_message(self, message):
        """Turn one client message into a reply dict, or None for no reply."""
        # The message comes from an untrusted web client: bad JSON or a
        # missing key must not tear down the connection handler.
        try:
            data = json.loads(message)
            action = data['action']
        except (ValueError, KeyError, TypeError) as exc:
            return {'type': 'error', 'message': f"Invalid request: {exc}"}
        if action != 'read':
            return None  # Unknown actions are ignored.
        address = data.get('address')
        count = data.get('count')
        if not self._is_register_range(address, count):
            return {
                'type': 'error',
                'message': "Invalid request: 'address' must be 0-65535 and "
                           f"'count' must be 1-{self.MAX_READ_REGISTERS}",
            }
        values = await self.read_modbus(address, count)
        return {'type': 'data', 'values': values}

    @classmethod
    def _is_register_range(cls, address, count):
        """Return True if address/count are integers in the valid range."""
        for value in (address, count):
            # bool is a subclass of int, but true/false is not a number here.
            if isinstance(value, bool) or not isinstance(value, int):
                return False
        return 0 <= address <= 0xFFFF and 1 <= count <= cls.MAX_READ_REGISTERS

    async def read_modbus(self, address, count):
        """Read from Modbus device; returns the registers or None on error."""
        async with AsyncModbusTcpClient(self.modbus_host) as client:
            result = await client.read_holding_registers(address, count)
            return result.registers if not result.isError() else None

    async def broadcast_updates(self):
        """Broadcast Modbus data to all clients once per second, forever."""
        while True:
            try:
                data = await self.read_modbus(0, 10)
            except Exception as exc:
                # A transient Modbus failure must not end the broadcast loop.
                print(f"Broadcast read failed: {exc}")
                data = None
            if data and self.clients:
                message = json.dumps({
                    'type': 'update',
                    'values': data
                })
                # Send to all connected clients; return_exceptions keeps one
                # broken socket from stopping delivery to the others.
                await asyncio.gather(
                    *[client.send(message) for client in self.clients],
                    return_exceptions=True
                )
            await asyncio.sleep(1)

    async def start(self):
        """Start WebSocket server and the broadcast loop; runs until stopped."""
        server = await websockets.serve(
            self.handle_client,
            'localhost',
            self.ws_port
        )
        try:
            # Run server and broadcast concurrently
            await asyncio.gather(
                server.wait_closed(),
                self.broadcast_updates()
            )
        finally:
            # Release the listening socket on cancellation or error.
            server.close()
            await server.wait_closed()
# Run bridge
# bridge = ModbusWebSocketBridge('192.168.1.100')
# asyncio.run(bridge.start())
Async is best suited to workloads with many concurrent operations: polling multiple devices, high-frequency polling, and web applications.
Best Practices
- Use connection pooling for multiple devices
- Implement rate limiting to avoid overwhelming devices
- Handle timeouts with asyncio.wait_for()
- Use queues for request/response patterns
- Close connections properly: use `async with`, or call `close()` in a `finally` block
Next Steps
- Error Handling - Handle async errors
- TCP Server - Build async servers
- Performance - Optimize async code
How is this guide?