PyModbus
PyModbusDocs

Asynchronous Client Operations

Build high-performance async Modbus clients with asyncio - concurrent operations, performance optimization.

Async Modbus Client

Use Python's asyncio for high-performance, concurrent Modbus operations.

Basic Async Client

import asyncio
from pymodbus.client import AsyncModbusTcpClient

async def read_data():
    """Basic async read operation."""
    async with AsyncModbusTcpClient('192.168.1.100') as client:
        result = await client.read_holding_registers(0, 10, slave=1)
        if not result.isError():
            print(f"Data: {result.registers}")
        return result.registers

# Run async function
data = asyncio.run(read_data())

Concurrent Operations

async def read_multiple_devices():
    """Read from multiple devices concurrently."""
    devices = [
        ('192.168.1.10', 1),
        ('192.168.1.11', 1),
        ('192.168.1.12', 1),
    ]
    
    async def read_device(host, slave):
        async with AsyncModbusTcpClient(host) as client:
            result = await client.read_holding_registers(0, 10, slave)
            return host, result.registers if not result.isError() else None
    
    # Run all reads concurrently
    tasks = [read_device(host, slave) for host, slave in devices]
    results = await asyncio.gather(*tasks)
    
    for host, data in results:
        print(f"{host}: {data}")
    
    return results

# Run
asyncio.run(read_multiple_devices())

Async Serial RTU

from pymodbus.client import AsyncModbusSerialClient

async def async_rtu_example():
    """Async RTU client example."""
    client = AsyncModbusSerialClient(
        port='/dev/ttyUSB0',
        baudrate=9600,
        timeout=1
    )
    
    await client.connect()
    
    # Read from multiple slaves
    for slave_id in range(1, 5):
        result = await client.read_holding_registers(0, 10, slave=slave_id)
        if not result.isError():
            print(f"Slave {slave_id}: {result.registers}")
    
    client.close()

asyncio.run(async_rtu_example())

Continuous Monitoring

class AsyncModbusMonitor:
    """Continuously monitor Modbus devices."""
    
    def __init__(self, host, interval=1.0):
        self.host = host
        self.interval = interval
        self.running = False
        self.client = None
    
    async def start(self):
        """Start monitoring."""
        self.client = AsyncModbusTcpClient(self.host)
        await self.client.connect()
        self.running = True
        
        while self.running:
            try:
                # Read data
                result = await self.client.read_holding_registers(0, 10)
                if not result.isError():
                    await self.process_data(result.registers)
                
                # Wait before next read
                await asyncio.sleep(self.interval)
                
            except Exception as e:
                print(f"Error: {e}")
                await asyncio.sleep(5)  # Wait longer on error
    
    async def process_data(self, data):
        """Process received data."""
        print(f"Data: {data}")
        # Add your processing here
    
    async def stop(self):
        """Stop monitoring."""
        self.running = False
        if self.client:
            self.client.close()

# Use monitor
async def main():
    monitor = AsyncModbusMonitor('192.168.1.100')
    
    # Run for 10 seconds
    task = asyncio.create_task(monitor.start())
    await asyncio.sleep(10)
    await monitor.stop()

asyncio.run(main())

Queue-Based Processing

import asyncio
from asyncio import Queue

class AsyncModbusQueue:
    """Queue-based async Modbus processor."""
    
    def __init__(self, host):
        self.host = host
        self.request_queue = Queue()
        self.result_queue = Queue()
    
    async def worker(self):
        """Process requests from queue."""
        async with AsyncModbusTcpClient(self.host) as client:
            while True:
                # Get request from queue
                request = await self.request_queue.get()
                
                if request is None:  # Shutdown signal
                    break
                
                func, address, count, slave = request
                
                # Execute request
                if func == 'read_holding':
                    result = await client.read_holding_registers(
                        address, count, slave
                    )
                elif func == 'write_register':
                    result = await client.write_register(
                        address, count, slave  # count is value here
                    )
                
                # Put result in queue
                await self.result_queue.put(result)
    
    async def read_holding(self, address, count, slave=1):
        """Queue a read request."""
        await self.request_queue.put(
            ('read_holding', address, count, slave)
        )
        return await self.result_queue.get()
    
    async def write_register(self, address, value, slave=1):
        """Queue a write request."""
        await self.request_queue.put(
            ('write_register', address, value, slave)
        )
        return await self.result_queue.get()

# Use queue
async def main():
    queue = AsyncModbusQueue('192.168.1.100')
    
    # Start worker
    worker_task = asyncio.create_task(queue.worker())
    
    # Queue multiple requests
    tasks = [
        queue.read_holding(0, 10),
        queue.read_holding(100, 5),
        queue.write_register(200, 123),
    ]
    
    results = await asyncio.gather(*tasks)
    
    # Shutdown
    await queue.request_queue.put(None)
    await worker_task

asyncio.run(main())

Rate Limiting

class RateLimitedClient:
    """Async client with rate limiting."""
    
    def __init__(self, host, max_requests_per_second=10):
        self.host = host
        self.max_rps = max_requests_per_second
        self.min_interval = 1.0 / max_requests_per_second
        self.last_request = 0
        self.client = None
    
    async def _rate_limit(self):
        """Enforce rate limit."""
        now = asyncio.get_event_loop().time()
        time_since_last = now - self.last_request
        
        if time_since_last < self.min_interval:
            await asyncio.sleep(self.min_interval - time_since_last)
        
        self.last_request = asyncio.get_event_loop().time()
    
    async def read(self, address, count, slave=1):
        """Rate-limited read."""
        await self._rate_limit()
        
        if not self.client:
            self.client = AsyncModbusTcpClient(self.host)
            await self.client.connect()
        
        return await self.client.read_holding_registers(address, count, slave)

# Use rate-limited client
async def main():
    client = RateLimitedClient('192.168.1.100', max_requests_per_second=5)
    
    # These will be rate-limited to 5 per second
    for i in range(20):
        result = await client.read(0, 10)
        print(f"Request {i+1}: {result.registers[:3]}...")

asyncio.run(main())

Error Handling

async def robust_async_read(host, address, count, max_retries=3):
    """Async read with retry logic."""
    for attempt in range(max_retries):
        try:
            async with AsyncModbusTcpClient(
                host,
                timeout=5
            ) as client:
                result = await asyncio.wait_for(
                    client.read_holding_registers(address, count),
                    timeout=3.0
                )
                
                if not result.isError():
                    return result.registers
                
                print(f"Attempt {attempt + 1} error: {result}")
                
        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timeout")
        except Exception as e:
            print(f"Attempt {attempt + 1} exception: {e}")
        
        if attempt < max_retries - 1:
            await asyncio.sleep(1)  # Wait before retry
    
    return None

Performance Comparison

import time

async def benchmark_async(host, num_requests=100):
    """Benchmark async performance."""
    start = time.time()
    
    async with AsyncModbusTcpClient(host) as client:
        tasks = [
            client.read_holding_registers(0, 10)
            for _ in range(num_requests)
        ]
        results = await asyncio.gather(*tasks)
    
    elapsed = time.time() - start
    successful = sum(1 for r in results if not r.isError())
    
    print(f"Async: {num_requests} requests in {elapsed:.2f}s")
    print(f"  {num_requests/elapsed:.1f} req/s")
    print(f"  {successful}/{num_requests} successful")

def benchmark_sync(host, num_requests=100):
    """Benchmark sync performance."""
    from pymodbus.client import ModbusTcpClient
    
    start = time.time()
    client = ModbusTcpClient(host)
    client.connect()
    
    successful = 0
    for _ in range(num_requests):
        result = client.read_holding_registers(0, 10)
        if not result.isError():
            successful += 1
    
    client.close()
    elapsed = time.time() - start
    
    print(f"Sync: {num_requests} requests in {elapsed:.2f}s")
    print(f"  {num_requests/elapsed:.1f} req/s")
    print(f"  {successful}/{num_requests} successful")

# Compare performance
asyncio.run(benchmark_async('192.168.1.100'))
benchmark_sync('192.168.1.100')

WebSocket Bridge

import websockets
import json

class ModbusWebSocketBridge:
    """Bridge Modbus to WebSocket for real-time web apps."""
    
    def __init__(self, modbus_host, ws_port=8765):
        self.modbus_host = modbus_host
        self.ws_port = ws_port
        self.clients = set()
    
    async def handle_client(self, websocket, path):
        """Handle WebSocket client."""
        self.clients.add(websocket)
        try:
            async for message in websocket:
                data = json.loads(message)
                
                if data['action'] == 'read':
                    result = await self.read_modbus(
                        data['address'],
                        data['count']
                    )
                    await websocket.send(json.dumps({
                        'type': 'data',
                        'values': result
                    }))
                    
        finally:
            self.clients.remove(websocket)
    
    async def read_modbus(self, address, count):
        """Read from Modbus device."""
        async with AsyncModbusTcpClient(self.modbus_host) as client:
            result = await client.read_holding_registers(address, count)
            return result.registers if not result.isError() else None
    
    async def broadcast_updates(self):
        """Broadcast Modbus data to all clients."""
        while True:
            data = await self.read_modbus(0, 10)
            
            if data and self.clients:
                message = json.dumps({
                    'type': 'update',
                    'values': data
                })
                
                # Send to all connected clients
                await asyncio.gather(
                    *[client.send(message) for client in self.clients],
                    return_exceptions=True
                )
            
            await asyncio.sleep(1)
    
    async def start(self):
        """Start WebSocket server."""
        server = await websockets.serve(
            self.handle_client,
            'localhost',
            self.ws_port
        )
        
        # Run server and broadcast concurrently
        await asyncio.gather(
            server.wait_closed(),
            self.broadcast_updates()
        )

# Run bridge
# bridge = ModbusWebSocketBridge('192.168.1.100')
# asyncio.run(bridge.start())

Async is best for: multiple devices, high-frequency polling, web applications, concurrent operations.

Best Practices

  1. Use connection pooling for multiple devices
  2. Implement rate limiting to avoid overwhelming devices
  3. Handle timeouts with asyncio.wait_for()
  4. Use queues for request/response patterns
  5. Close connections properly in finally blocks

Next Steps

How is this guide?