Skip to content

AsyncPyStrandsClient

The asynchronous client for PyStrands, using Python's asyncio for concurrent message handling.

Overview

AsyncPyStrandsClient is ideal for:

- Modern async Python applications
- Integration with async frameworks (FastAPI, Sanic, Tornado)
- High-concurrency scenarios
- Applications already using asyncio

import asyncio
from pystrands import AsyncPyStrandsClient

class MyBackend(AsyncPyStrandsClient):
    """Backend that echoes every message to the room it came from."""

    async def on_message(self, message, context):
        """Send "echo: <message>" to the room of the sending client."""
        reply = f"echo: {message}"
        await self.send_room_message(context.room_id, reply)


# Connect to the broker on localhost:8081 and run until disconnect.
client = MyBackend(host="localhost", port=8081)
asyncio.run(client.run_forever())

Constructor

AsyncPyStrandsClient.__init__

AsyncPyStrandsClient(
    host: str = "localhost",
    port: int = 8081,
    auto_reconnect: bool = True,
    reconnect_delay: float = 1.0,
    max_reconnect_delay: float = 30.0,
    reconnect_backoff: float = 2.0
)

Parameters:

Parameter Type Default Description
host str "localhost" Hostname or IP of the Go broker
port int 8081 TCP port for the Go broker
auto_reconnect bool True Whether to automatically reconnect on connection loss
reconnect_delay float 1.0 Initial delay between reconnection attempts (seconds)
max_reconnect_delay float 30.0 Maximum delay cap for reconnection (seconds)
reconnect_backoff float 2.0 Exponential backoff multiplier

Example:

client = AsyncPyStrandsClient(
    host="broker.example.com",
    port=8081,
    auto_reconnect=True,
    reconnect_delay=1.0,
    max_reconnect_delay=30.0,
    reconnect_backoff=2.0
)

Methods

connect()

Connect to the TCP server.

async def connect(self) -> bool

Returns: bool — True if the connection was successful, False otherwise.

Example:

client = AsyncPyStrandsClient()
if await client.connect():
    print("Connected successfully")
else:
    print("Connection failed")

disconnect()

Cleanly disconnect from the server. Waits for in-flight handlers to finish.

async def disconnect(self, timeout: float = 5.0)

Parameters:

Parameter Type Default Description
timeout float 5.0 Max seconds to wait for in-flight handlers before force-closing

Example:

await client.disconnect(timeout=10.0)

run_forever()

Connect to the server and block until disconnect. Use with asyncio.run() or await.

async def run_forever(self)

Example:

client = AsyncPyStrandsClient()
asyncio.run(client.run_forever())

Or within an existing async context:

async def main():
    """Create a client with default settings and await run_forever()."""
    await AsyncPyStrandsClient().run_forever()

asyncio.run(main())

Auto-connect

run_forever() automatically calls connect() if not already connected.


broadcast_message()

Broadcast a message to all connected WebSocket clients.

async def broadcast_message(self, message: str)

Parameters:

Parameter Type Description
message str The message to broadcast

Example:

async def on_message(self, message, context):
    """Broadcast a system notice when a client sends "!announce"."""
    # Guard clause: every other message is ignored by this handler.
    if message != "!announce":
        return
    await self.broadcast_message("Important system update!")

send_room_message()

Send a message to all clients in a specific room.

async def send_room_message(self, room_id: str, message: str)

Parameters:

Parameter Type Description
room_id str The room ID to send to
message str The message to send

Example:

async def on_message(self, message, context):
    """Echo the incoming message back to the sender's own room."""
    reply = f"echo: {message}"
    await self.send_room_message(context.room_id, reply)

send_private_message()

Send a message to a specific client.

async def send_private_message(self, client_id: str, message: str)

Parameters:

Parameter Type Description
client_id str The target client's ID
message str The message to send

Example:

async def on_new_connection(self, context):
    """Privately greet the newly connected client, naming its room."""
    # Private message: only the new client receives the greeting.
    greeting = f"Welcome to {context.room_id}!"
    await self.send_private_message(context.client_id, greeting)

Callbacks (Override These)

on_connection_request()

Handle new WebSocket connection requests. Return True to accept, False to reject.

async def on_connection_request(self, context: ConnectionRequestContext) -> bool

Parameters:

Parameter Type Description
context ConnectionRequestContext Contains headers, URL, remote_addr, and mutable context

Returns: bool — True to accept the connection, False to reject it. Alternatively, modify context.accepted directly.

Example:

async def on_connection_request(self, request):
    """Authenticate a connection request via its Authorization header.

    The first Authorization header value (an empty string when the header
    is absent) is looked up through ``self.db.get_user_by_token``. When a
    user is found, the client is routed to the room named by the request
    URL with surrounding slashes stripped, the user's id is stored in the
    connection metadata, and the request is accepted.

    Returns:
        True to accept the connection, False to reject it.
    """
    # Async auth check (e.g., database lookup)
    auth_values = request.headers.get("Authorization", [""])
    user = await self.db.get_user_by_token(auth_values[0])

    if user:
        request.context.room_id = request.url.strip("/")
        request.context.metadata = {"user_id": user.id}
        return True

    return False

Room Assignment

Always set request.context.room_id in this callback to route clients to the correct room.


on_new_connection()

Called when a WebSocket client successfully connects.

async def on_new_connection(self, context: Context)

Parameters:

Parameter Type Description
context Context Contains client_id, room_id, and metadata

Example:

async def on_new_connection(self, context):
    """Log the new client and its room, then welcome it privately."""
    new_client = context.client_id
    print(f"New client: {new_client} in room: {context.room_id}")
    await self.send_private_message(new_client, "Welcome!")

on_message()

Called when a client sends a message.

async def on_message(self, message: str, context: Context)

Parameters:

Parameter Type Description
message str The message content
context Context Contains client_id, room_id, and metadata

Example:

async def on_message(self, message, context):
    """Log, persist, and echo a client's message.

    The message is printed with its room and sender, saved through
    ``self.db.save_message``, and then echoed to the sender's room.
    """
    room, sender = context.room_id, context.client_id
    print(f"[{room}] {sender}: {message}")

    # Persist first (async, e.g. a database write), then echo to the room.
    await self.db.save_message(room, sender, message)
    await self.send_room_message(room, f"echo: {message}")

on_disconnect()

Called when a WebSocket client disconnects.

async def on_disconnect(self, context: Context)

Parameters:

Parameter Type Description
context Context Contains client_id, room_id, and metadata

Example:

async def on_disconnect(self, context):
    """Log the disconnect, mark the user offline, and notify the room.

    Reads ``context.metadata["user_id"]``, so the metadata must carry a
    "user_id" entry by the time this callback runs.
    """
    departed = context.client_id
    print(f"Client disconnected: {departed}")

    user_id = context.metadata["user_id"]
    await self.db.update_user_status(user_id, "offline")

    farewell = f"User {departed} left"
    await self.send_room_message(context.room_id, farewell)

on_error()

Called when an error occurs.

async def on_error(self, error: str, context: Context)

Parameters:

Parameter Type Description
error str Error message
context Context Contains client_id, room_id, and metadata

Example:

async def on_error(self, error, context):
    """Print the error for the affected client, then pass it to log_error."""
    affected = context.client_id
    print(f"Error for {affected}: {error}")
    await self.log_error(affected, error)

Attributes

Attribute Type Description
host str Server hostname
port int Server port
connected bool Current connection state
auto_reconnect bool Whether auto-reconnect is enabled
reconnect_delay float Initial reconnection delay
max_reconnect_delay float Maximum reconnection delay cap
reconnect_backoff float Exponential backoff multiplier

Complete Example

import asyncio
import hmac
import logging

from pystrands import AsyncPyStrandsClient

logging.basicConfig(level=logging.INFO)

class ChatServer(AsyncPyStrandsClient):
    """Chat backend that announces joins and leaves and relays messages.

    Clients authenticate with an ``X-Token`` header and are placed in the
    room named by the request URL ("lobby" when the stripped URL is empty).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.users = {}  # client_id -> username

    async def on_connection_request(self, request):
        """Accept the connection only when the X-Token header matches.

        Returns:
            True to accept the connection, False to reject it.
        """
        token = request.headers.get("X-Token", [None])[0]
        # The header is untrusted input: compare in constant time so the
        # response timing does not reveal how much of a guess matched.
        # Bytes are compared because compare_digest rejects non-ASCII str.
        # "secret" is a placeholder; load the real value from configuration.
        if not isinstance(token, str) or not hmac.compare_digest(
            token.encode("utf-8"), b"secret"
        ):
            return False

        request.context.room_id = request.url.strip("/") or "lobby"
        request.context.metadata = {"token": token}
        return True

    async def on_new_connection(self, context):
        """Assign the client a display name and announce it to the room."""
        self.users[context.client_id] = f"User_{context.client_id[:8]}"
        await self.send_room_message(
            context.room_id,
            f"{self.users[context.client_id]} joined"
        )

    async def on_message(self, message, context):
        """Relay the message to the sender's room, prefixed by username."""
        username = self.users.get(context.client_id, "Unknown")
        formatted = f"{username}: {message}"
        await self.send_room_message(context.room_id, formatted)

    async def on_disconnect(self, context):
        """Forget the client and tell its room that the user left."""
        username = self.users.pop(context.client_id, "Unknown")
        await self.send_room_message(
            context.room_id,
            f"{username} left"
        )

async def main():
    """Run a ChatServer against localhost:8081 with auto-reconnect on."""
    server = ChatServer(host="localhost", port=8081, auto_reconnect=True)
    await server.run_forever()


if __name__ == "__main__":
    asyncio.run(main())

Integration with FastAPI

from fastapi import FastAPI
from pystrands import AsyncPyStrandsClient
import asyncio

app = FastAPI()


class FastAPIBackend(AsyncPyStrandsClient):
    """PyStrands backend that echoes each message back to its room."""

    async def on_message(self, message, context):
        await self.send_room_message(context.room_id, f"echo: {message}")


backend = FastAPIBackend()

# Strong reference to the background client task. The event loop keeps only
# weak references to tasks, so a task whose handle is discarded may be
# garbage-collected before it finishes.
_backend_task = None


@app.on_event("startup")
async def startup():
    """Start the PyStrands client in the background when the app starts."""
    global _backend_task
    _backend_task = asyncio.create_task(backend.run_forever())


@app.on_event("shutdown")
async def shutdown():
    """Disconnect the PyStrands client when the app shuts down."""
    await backend.disconnect()


@app.get("/")
async def root():
    """Report that the app is running and whether the client is connected."""
    return {"status": "running", "connected": backend.connected}

See Also