AsyncPyStrandsClient
The asynchronous client for PyStrands, using Python's asyncio for concurrent message handling.
Overview
AsyncPyStrandsClient is ideal for:
- Modern async Python applications
- Integration with async frameworks (FastAPI, Sanic, Tornado)
- High-concurrency scenarios
- Applications already using asyncio
import asyncio

from pystrands import AsyncPyStrandsClient


class MyBackend(AsyncPyStrandsClient):
    async def on_message(self, message, context):
        await self.send_room_message(context.room_id, f"echo: {message}")


client = MyBackend(host="localhost", port=8081)
asyncio.run(client.run_forever())
Constructor
AsyncPyStrandsClient.__init__
AsyncPyStrandsClient(
    host: str = "localhost",
    port: int = 8081,
    auto_reconnect: bool = True,
    reconnect_delay: float = 1.0,
    max_reconnect_delay: float = 30.0,
    reconnect_backoff: float = 2.0
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | Hostname or IP of the Go broker |
| port | int | 8081 | TCP port for the Go broker |
| auto_reconnect | bool | True | Whether to automatically reconnect on connection loss |
| reconnect_delay | float | 1.0 | Initial delay between reconnection attempts (seconds) |
| max_reconnect_delay | float | 30.0 | Maximum delay cap for reconnection (seconds) |
| reconnect_backoff | float | 2.0 | Exponential backoff multiplier |
Example:
client = AsyncPyStrandsClient(
    host="broker.example.com",
    port=8081,
    auto_reconnect=True,
    reconnect_delay=1.0,
    max_reconnect_delay=30.0,
    reconnect_backoff=2.0
)
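To see how the three reconnect parameters interact, the sketch below computes the wait before each attempt. It assumes the delay starts at reconnect_delay, is multiplied by reconnect_backoff after every failed attempt, and is capped at max_reconnect_delay. The helper `reconnect_delays` is hypothetical and not part of PyStrands, and this page does not say whether the library adds jitter or resets the delay after a successful reconnect.

```python
from itertools import islice


def reconnect_delays(reconnect_delay=1.0, max_reconnect_delay=30.0,
                     reconnect_backoff=2.0):
    """Yield the wait (in seconds) before each reconnection attempt.

    Assumed model: multiply by the backoff factor after every failure
    and never exceed the cap. Not taken from the PyStrands source.
    """
    delay = reconnect_delay
    while True:
        yield min(delay, max_reconnect_delay)
        delay = min(delay * reconnect_backoff, max_reconnect_delay)


# First seven waits with the default constructor values.
schedule = list(islice(reconnect_delays(), 7))
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under this model the defaults reach the 30-second cap on the sixth attempt. A smaller reconnect_backoff spreads the ramp over more attempts.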
Methods
connect()
Open the TCP connection to the Go broker.
Returns: bool. True if the connection succeeded, False otherwise.
Example:
client = AsyncPyStrandsClient()
if await client.connect():
    print("Connected successfully")
else:
    print("Connection failed")
disconnect()
Cleanly disconnect from the server. Waits for in-flight handlers to finish.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| timeout | float | 5.0 | Max seconds to wait for in-flight handlers before force-closing |
Example:
await client.disconnect(timeout=5.0)
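The "wait for in-flight handlers, then force-close" behavior can be modeled with plain asyncio. The sketch below is only an illustration of that pattern: `drain_handlers` is a hypothetical helper, and the library's internal bookkeeping of handler tasks is not documented here.

```python
import asyncio


async def drain_handlers(tasks, timeout=5.0):
    """Wait up to `timeout` seconds for handler tasks, then cancel the rest.

    Returns a (finished, cancelled) pair of counts.
    """
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Give cancelled tasks a chance to unwind before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def main():
    fast = asyncio.create_task(asyncio.sleep(0.01))  # finishes in time
    slow = asyncio.create_task(asyncio.sleep(10))    # exceeds the timeout
    return await drain_handlers({fast, slow}, timeout=0.2)


finished, cancelled = asyncio.run(main())
print(finished, cancelled)  # 1 1
```

A longer timeout lets slow handlers finish at the cost of a slower shutdown. A shorter one bounds shutdown time but may interrupt work in progress.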
run_forever()
Connect to the server and block until the client disconnects. From synchronous code, start it with asyncio.run(); inside a running event loop, await it.
Example:
asyncio.run(client.run_forever())
Or within an existing async context:
await client.run_forever()
Auto-connect
run_forever() automatically calls connect() if not already connected.
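The auto-connect behavior can be pictured with a small stand-in class. `SketchClient` below is hypothetical: it copies the method names from this page but never opens a socket, so it shows the control flow only, not the real implementation.

```python
import asyncio


class SketchClient:
    """Stand-in with the documented method names; performs no network I/O."""

    def __init__(self):
        self.connected = False
        self.connect_calls = 0
        self._closed = None

    async def connect(self):
        self.connect_calls += 1
        self.connected = True
        return True

    async def disconnect(self):
        self.connected = False
        if self._closed is not None:
            self._closed.set()

    async def run_forever(self):
        self._closed = asyncio.Event()
        if not self.connected:          # auto-connect when needed
            await self.connect()
        await self._closed.wait()       # block until disconnect()


async def main():
    client = SketchClient()
    runner = asyncio.create_task(client.run_forever())
    await asyncio.sleep(0)              # let run_forever() start
    was_connected = client.connected
    await client.disconnect()
    await runner                        # returns once disconnected
    return was_connected, client.connect_calls, client.connected


print(asyncio.run(main()))  # (True, 1, False)
```

Because run_forever() blocks until disconnect, running it as a task (as in `main` above) is the usual way to do other work in the same event loop.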
broadcast_message()
Broadcast a message to all connected WebSocket clients.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| message | str | The message to broadcast |
Example:
async def on_message(self, message, context):
    if message == "!announce":
        await self.broadcast_message("Important system update!")
send_room_message()
Send a message to all clients in a specific room.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| room_id | str | The room ID to send to |
| message | str | The message to send |
Example:
async def on_message(self, message, context):
    # Echo to the same room
    await self.send_room_message(context.room_id, f"echo: {message}")
send_private_message()
Send a message to a specific client.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| client_id | str | The target client's ID |
| message | str | The message to send |
Example:
async def on_new_connection(self, context):
    # Welcome message to the new client only
    await self.send_private_message(
        context.client_id,
        f"Welcome to {context.room_id}!"
    )
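The three send methods differ only in who receives the message. The toy model below makes that scope explicit. `DeliveryModel` and its synchronous methods `broadcast`, `to_room`, and `to_client` are hypothetical stand-ins for broadcast_message(), send_room_message(), and send_private_message(). Real delivery happens in the Go broker, not in Python.

```python
from collections import defaultdict


class DeliveryModel:
    """In-memory model of which clients receive each kind of message."""

    def __init__(self):
        self.rooms = {}                  # client_id -> room_id
        self.inbox = defaultdict(list)   # client_id -> received messages

    def join(self, client_id, room_id):
        self.rooms[client_id] = room_id

    def broadcast(self, message):
        # Every connected client, regardless of room.
        for client_id in self.rooms:
            self.inbox[client_id].append(message)

    def to_room(self, room_id, message):
        # Only clients currently assigned to room_id.
        for client_id, room in self.rooms.items():
            if room == room_id:
                self.inbox[client_id].append(message)

    def to_client(self, client_id, message):
        # Exactly one client, if it is connected.
        if client_id in self.rooms:
            self.inbox[client_id].append(message)


model = DeliveryModel()
model.join("alice", "lobby")
model.join("bob", "lobby")
model.join("carol", "games")

model.broadcast("maintenance at noon")
model.to_room("lobby", "hello lobby")
model.to_client("carol", "welcome")

print(dict(model.inbox))
```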
Callbacks (Override These)
on_connection_request()
Handle new WebSocket connection requests. Return True to accept, False to reject.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| context | ConnectionRequestContext | Contains headers, URL, remote_addr, and mutable context |
Returns: bool. True to accept, False to reject. Alternatively, set context.accepted directly.
Example:
async def on_connection_request(self, request):
    # Async auth check (e.g., database lookup).
    # self.db is assumed to be your own helper, not a PyStrands attribute.
    user = await self.db.get_user_by_token(
        request.headers.get("Authorization", [""])[0]
    )
    if not user:
        return False
    request.context.room_id = request.url.strip("/")
    request.context.metadata = {"user_id": user.id}
    return True
Room Assignment
Always set request.context.room_id in this callback to route clients to the correct room.
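The header lookup and room derivation used in the examples on this page can be pulled into two small helpers. This sketch assumes, as those examples imply, that header values arrive as lists and that the URL is a plain path string. `first_header` and `room_from_url` are hypothetical names, and the request object is faked with SimpleNamespace.

```python
from types import SimpleNamespace


def first_header(headers, name, default=""):
    """Return the first value for `name`, tolerating a missing or empty list."""
    values = headers.get(name) or [default]
    return values[0]


def room_from_url(url, default="lobby"):
    """Map '/chat/' to 'chat'; fall back to `default` for '/' or ''.

    str.strip('/') only trims the ends, so '/a/b' becomes 'a/b'.
    """
    return url.strip("/") or default


# Fake request with the fields the callback examples read.
request = SimpleNamespace(headers={"X-Token": ["secret"]}, url="/chat/")

token = first_header(request.headers, "X-Token")
room = room_from_url(request.url)
print(token, room)  # secret chat
```

Unlike `headers.get(name, [""])[0]`, the helper does not raise IndexError when a header is present with an empty list.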
on_new_connection()
Called when a WebSocket client successfully connects.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| context | Context | Contains client_id, room_id, and metadata |
Example:
async def on_new_connection(self, context):
    print(f"New client: {context.client_id} in room: {context.room_id}")
    await self.send_private_message(context.client_id, "Welcome!")
on_message()
Called when a client sends a message.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| message | str | The message content |
| context | Context | Contains client_id, room_id, and metadata |
Example:
async def on_message(self, message, context):
    print(f"[{context.room_id}] {context.client_id}: {message}")
    # Process async (e.g., save to database).
    # self.db is assumed to be your own helper, not a PyStrands attribute.
    await self.db.save_message(context.room_id, context.client_id, message)
    # Echo back to room
    await self.send_room_message(context.room_id, f"echo: {message}")
on_disconnect()
Called when a WebSocket client disconnects.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| context | Context | Contains client_id, room_id, and metadata |
Example:
async def on_disconnect(self, context):
    print(f"Client disconnected: {context.client_id}")
    # self.db is assumed to be your own helper; "user_id" must have been
    # stored in metadata by on_connection_request().
    await self.db.update_user_status(context.metadata["user_id"], "offline")
    await self.send_room_message(
        context.room_id,
        f"User {context.client_id} left"
    )
on_error()
Called when an error occurs.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| error | str | Error message |
| context | Context | Contains client_id, room_id, and metadata |
Example:
async def on_error(self, error, context):
    print(f"Error for {context.client_id}: {error}")
    # self.log_error is assumed to be a method you define on your subclass.
    await self.log_error(context.client_id, error)
Attributes
| Attribute | Type | Description |
|---|---|---|
| host | str | Server hostname |
| port | int | Server port |
| connected | bool | Current connection state |
| auto_reconnect | bool | Whether auto-reconnect is enabled |
| reconnect_delay | float | Initial reconnection delay |
| max_reconnect_delay | float | Maximum reconnection delay cap |
| reconnect_backoff | float | Exponential backoff multiplier |
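When run_forever() is started as a background task, the connected attribute is a simple way to check readiness before sending. The helper below is a hypothetical sketch that polls any object exposing a boolean connected attribute. It assumes the attribute is updated as the connection state changes, and it uses a SimpleNamespace stand-in instead of a real client.

```python
import asyncio
from types import SimpleNamespace


async def wait_until_connected(client, timeout=5.0, poll_interval=0.05):
    """Poll `client.connected` until it is True or `timeout` elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not client.connected:
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_interval)
    return True


async def main():
    client = SimpleNamespace(connected=False)   # stand-in for a real client
    loop = asyncio.get_running_loop()
    # Simulate the connection coming up after 0.1 seconds.
    loop.call_later(0.1, setattr, client, "connected", True)
    became_ready = await wait_until_connected(client, timeout=1.0)

    offline = SimpleNamespace(connected=False)  # never connects
    never_ready = await wait_until_connected(offline, timeout=0.1)
    return became_ready, never_ready


print(asyncio.run(main()))  # (True, False)
```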
Complete Example
import asyncio
import logging

from pystrands import AsyncPyStrandsClient

logging.basicConfig(level=logging.INFO)


class ChatServer(AsyncPyStrandsClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.users = {}  # client_id -> username

    async def on_connection_request(self, request):
        token = request.headers.get("X-Token", [None])[0]
        if token != "secret":
            return False
        request.context.room_id = request.url.strip("/") or "lobby"
        request.context.metadata = {"token": token}
        return True

    async def on_new_connection(self, context):
        self.users[context.client_id] = f"User_{context.client_id[:8]}"
        await self.send_room_message(
            context.room_id,
            f"{self.users[context.client_id]} joined"
        )

    async def on_message(self, message, context):
        username = self.users.get(context.client_id, "Unknown")
        formatted = f"{username}: {message}"
        await self.send_room_message(context.room_id, formatted)

    async def on_disconnect(self, context):
        username = self.users.pop(context.client_id, "Unknown")
        await self.send_room_message(
            context.room_id,
            f"{username} left"
        )


async def main():
    client = ChatServer(
        host="localhost",
        port=8081,
        auto_reconnect=True
    )
    await client.run_forever()


if __name__ == "__main__":
    asyncio.run(main())
Integration with FastAPI
import asyncio

from fastapi import FastAPI
from pystrands import AsyncPyStrandsClient

app = FastAPI()


class FastAPIBackend(AsyncPyStrandsClient):
    async def on_message(self, message, context):
        await self.send_room_message(context.room_id, f"echo: {message}")


backend = FastAPIBackend()


@app.on_event("startup")
async def startup():
    # Keep a reference to the task; asyncio holds only a weak reference,
    # so an unreferenced task can be garbage-collected mid-run.
    app.state.backend_task = asyncio.create_task(backend.run_forever())


@app.on_event("shutdown")
async def shutdown():
    await backend.disconnect()


@app.get("/")
async def root():
    return {"status": "running", "connected": backend.connected}
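FastAPI's documentation marks on_event as deprecated in favor of a lifespan context manager. The same start and stop logic can be written in that style, as sketched below. So that the block runs without FastAPI or a broker, it uses a hypothetical `StubBackend` with the documented method names and enters the context manager by hand. In a real application the function would be passed as `FastAPI(lifespan=lifespan)`.

```python
import asyncio
from contextlib import asynccontextmanager


class StubBackend:
    """Stand-in for an AsyncPyStrandsClient subclass; no network I/O."""

    def __init__(self):
        self.connected = False
        self._stop = None

    async def run_forever(self):
        self._stop = asyncio.Event()
        self.connected = True
        await self._stop.wait()

    async def disconnect(self):
        self.connected = False
        if self._stop is not None:
            self._stop.set()


backend = StubBackend()


@asynccontextmanager
async def lifespan(app):
    # Startup: hold a reference so the task is not garbage-collected.
    task = asyncio.create_task(backend.run_forever())
    try:
        yield
    finally:
        # Shutdown: ask the client to stop, then wait for the task to end.
        await backend.disconnect()
        await task


async def main():
    async with lifespan(app=None):      # FastAPI would pass the app here
        await asyncio.sleep(0)          # let the background task start
        during = backend.connected
    return during, backend.connected


print(asyncio.run(main()))  # (True, False)
```

Keeping startup and shutdown in one function makes it harder to forget the cleanup half, and the task reference lives exactly as long as the application.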
See Also
- PyStrandsClient — The synchronous variant
- Context — Context and ConnectionRequestContext reference