Messaging
PyStrands supports multiple messaging patterns: room-based messages, private messages, and broadcasts. This guide covers each pattern and best practices.
Messaging Patterns Overview
┌─────────────────────────────────────────────────────────────────┐
│                            Go Broker                            │
│                                                                 │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐             │
│  │ Client A│  │ Client B│  │ Client C│  │ Client D│             │
│  │ Room: 1 │  │ Room: 1 │  │ Room: 2 │  │ Room: 1 │             │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘             │
│       │            │            │            │                  │
│       └────────────┴────────────┘            │                  │
│                 Room 1                       │                  │
│                                              │                  │
└──────────────────────────────────────────────┼──────────────────┘
                                               │
                                        ┌──────▼──────┐
                                        │   Python    │
                                        │   Backend   │
                                        └─────────────┘
Room Messages
Room messages are sent to all clients in a specific room. This is the primary messaging pattern for most applications (chat rooms, game lobbies, collaboration spaces).
Sending Room Messages
Example: Echo Server
from pystrands import AsyncPyStrandsClient
class EchoServer(AsyncPyStrandsClient):
    """Relay every incoming message to all clients in the sender's room."""

    async def on_connection_request(self, request):
        """Derive the client's room from the request URL and return True.

        Clients connect to ws://broker/room-name; the room is the URL with
        leading and trailing slashes removed. An empty result falls back to
        the "lobby" room.
        """
        room_name = request.url.strip("/")
        request.context.room_id = room_name if room_name else "lobby"
        return True

    async def on_message(self, message, context):
        """Prefix the message with the sender's id and send it to the room."""
        await self.send_room_message(
            context.room_id,
            f"{context.client_id}: {message}",
        )


client = EchoServer()
Example: Chat Room with History
import asyncio
from collections import defaultdict, deque


class ChatServer(AsyncPyStrandsClient):
    """Chat room that replays recent history to newly connected clients."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Maximum number of messages remembered per room.
        self.max_history = 50
        # Recent messages per room. A bounded deque discards its oldest entry
        # in O(1) once full, unlike list.pop(0), which shifts every element.
        # NOTE: maxlen is fixed when a room's deque is first created, so a
        # later change to max_history only affects rooms created afterwards.
        self.room_history = defaultdict(
            lambda: deque(maxlen=self.max_history)
        )

    async def on_new_connection(self, context):
        """Send the room's recent history privately to the new client."""
        # Iterate over a snapshot: on_message may append to the same history
        # while this coroutine is suspended in an await, and mutating a deque
        # during iteration raises RuntimeError.
        history = tuple(self.room_history.get(context.room_id, ()))
        for msg in history:
            await self.send_private_message(context.client_id, msg)

    async def on_message(self, message, context):
        """Record the message in the room's history, then send it to the room."""
        formatted = f"[{context.client_id}] {message}"
        # Store in history (the deque drops the oldest entry when full)
        self.room_history[context.room_id].append(formatted)
        # Broadcast to room
        await self.send_room_message(context.room_id, formatted)
Private Messages
Send messages to specific clients using their client_id:
Sending Private Messages
Example: Direct Messages
async def on_message(self, message, context):
    """Deliver "@client_id text" as a direct message; send anything else to the room.

    A direct message is sent privately to the target and echoed back to the
    sender as confirmation. A message that starts with "@" but lacks a
    target id or a body is answered with a private usage hint to the sender
    instead of being dropped silently or delivered empty.
    """
    if not message.startswith("@"):
        # Regular room message
        await self.send_room_message(context.room_id, f"{context.client_id}: {message}")
        return

    # Format: @client_id message
    target_id, _, dm_content = message[1:].partition(" ")
    if not target_id or not dm_content.strip():
        await self.send_private_message(
            context.client_id,
            "Usage: @client_id message",
        )
        return

    await self.send_private_message(
        target_id,
        f"[DM from {context.client_id}] {dm_content}"
    )
    # Confirm to sender
    await self.send_private_message(
        context.client_id,
        f"[DM to {target_id}] {dm_content}"
    )
Example: Server Notifications
async def on_new_connection(self, context):
    """Greet the new client privately, then announce the arrival to its room."""
    # Welcome message to the new user
    welcome_text = f"Welcome! You're in room: {context.room_id}"
    await self.send_private_message(context.client_id, welcome_text)

    # Notify others in the room
    join_notice = f"User {context.client_id} joined the room"
    await self.send_room_message(context.room_id, join_notice)
async def on_disconnect(self, context):
    """Announce to the client's room that the client has left."""
    # Notify room of departure
    departure_notice = f"User {context.client_id} left the room"
    await self.send_room_message(context.room_id, departure_notice)
Broadcast Messages
Broadcasts send messages to all connected clients, regardless of room:
Sending Broadcasts
Example: System Announcements
class GameServer(AsyncPyStrandsClient):
    """Server where clients with the "admin" role can broadcast announcements."""

    # Admin command prefix; the announcement text is whatever follows it.
    ANNOUNCE_PREFIX = "/announce "

    async def handle_admin_command(self, command):
        """Run an admin slash command.

        Only "/announce <text>" is recognised; any other command is ignored.
        A blank announcement is skipped so that an empty system message is
        not broadcast to every connected client.
        """
        if not command.startswith(self.ANNOUNCE_PREFIX):
            return
        # Slice by the prefix length rather than a hard-coded offset so the
        # two cannot drift apart if the command is renamed.
        announcement = command[len(self.ANNOUNCE_PREFIX):]
        if not announcement.strip():
            return
        await self.broadcast_message(f"📢 SYSTEM: {announcement}")

    async def on_message(self, message, context):
        """Run slash commands from admins; send every other message to the room."""
        # Check for admin commands
        is_admin = context.metadata.get("role") == "admin"
        if is_admin and message.startswith("/"):
            await self.handle_admin_command(message)
            return
        # Regular message
        await self.send_room_message(context.room_id, message)
Use Broadcasts Sparingly
Broadcasts reach every connected client. For large applications, prefer room-based messaging to reduce load.
Message Queuing
When all Python backends are disconnected, the Go broker can queue incoming client messages, provided --queue-size is set:
How Queuing Works
1. All Python backends disconnect
2. Client sends message to Go Broker
3. Go Broker stores message in queue
4. Python backend reconnects
5. Go Broker flushes queued messages to backend
Handling Queued Messages
import time


class ResilientServer(AsyncPyStrandsClient):
    """Backend that flags messages arriving shortly after a (re)connect."""

    # Seconds after connecting during which an incoming message is treated
    # as probably queued. This is only a time-based guess.
    QUEUE_FLUSH_WINDOW = 5

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Wall-clock time of the last successful connect; None until then.
        self.reconnect_time = None

    async def connect(self):
        """Connect via the parent class and record when it succeeded."""
        result = await super().connect()
        if result:
            self.reconnect_time = time.time()
            print(f"Connected at {self.reconnect_time}")
        return result

    async def on_message(self, message, context):
        """Log messages received just after a reconnect, then forward to the room."""
        # Check if this might be a queued message
        # (received shortly after reconnect)
        if (
            self.reconnect_time is not None
            and time.time() - self.reconnect_time < self.QUEUE_FLUSH_WINDOW
        ):
            print(f"Processing queued message: {message}")
        await self.send_room_message(context.room_id, message)
Queue Limitations
- Queue is in-memory only (lost on broker restart)
- FIFO order is preserved
- When the queue is full, new messages are dropped
Message Format Best Practices
JSON Messages
For complex applications, use JSON for structured messages:
import json
async def on_message(self, message, context):
    """Dispatch a JSON message to the handler matching its "type" field.

    Text that is not valid JSON, or valid JSON that is not an object (for
    example "42" or a list), is treated as plain text and sent to the room
    unchanged. JSON objects with an unrecognised "type" are ignored.
    """
    # Only the parse is guarded, so a JSONDecodeError raised inside a handler
    # is not mistaken for a plain-text message.
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        data = None

    if not isinstance(data, dict):
        # Handle plain text fallback
        await self.send_room_message(context.room_id, message)
        return

    msg_type = data.get("type")
    if msg_type == "chat":
        await self.handle_chat(data, context)
    elif msg_type == "typing":
        await self.handle_typing(data, context)
    elif msg_type == "command":
        await self.handle_command(data, context)
async def handle_chat(self, data, context):
    """Send a chat message to the sender's room as a JSON payload.

    The payload carries the sender's client id, the text, and a timestamp
    taken from time.time(). If "text" is missing or is not a string, the
    sender receives a private JSON error instead of the handler raising
    KeyError on client-supplied data.
    """
    import time  # local import keeps this snippet self-contained

    text = data.get("text")
    if not isinstance(text, str):
        await self.send_private_message(
            context.client_id,
            json.dumps({"type": "error", "message": "Chat message requires a text field"})
        )
        return

    payload = {
        "type": "chat",
        "from": context.client_id,
        "text": text,
        "timestamp": time.time()
    }
    await self.send_room_message(
        context.room_id,
        json.dumps(payload)
    )
Binary Data
To send binary data (images, audio), base64-encode it and embed the result in a JSON message:
import base64
import json


async def on_message(self, message, context):
    """Validate a base64-encoded image message and re-send it to the room.

    Expects a JSON object such as {"type": "image", "image_data": "<base64>"}.
    Messages of any other type are ignored. If the message is not a JSON
    object, lacks a required key, or carries invalid base64, the sender
    receives a private JSON error instead of the handler raising.
    """
    try:
        data = json.loads(message)
        if data["type"] != "image":
            return
        # image_data is base64 encoded. validate=True rejects characters
        # outside the base64 alphabet instead of silently discarding them.
        image_bytes = base64.b64decode(data["image_data"], validate=True)
    except (ValueError, KeyError, TypeError):
        # ValueError covers both json.JSONDecodeError and binascii.Error;
        # KeyError/TypeError cover missing keys and non-object payloads.
        await self.send_private_message(
            context.client_id,
            json.dumps({"type": "error", "message": "Invalid image message"})
        )
        return

    # Process image...
    # Broadcast processed image
    await self.send_room_message(context.room_id, json.dumps({
        "type": "image",
        "from": context.client_id,
        "image_data": data["image_data"]
    }))
Error Handling
Handle messaging errors gracefully:
import json
import logging

logger = logging.getLogger(__name__)


class RobustServer(AsyncPyStrandsClient):
    """Server that logs processing failures and reports them to the sender."""

    async def on_message(self, message, context):
        """Process a message and send the result to the room.

        On any failure the error is logged with its traceback and the sender
        is told privately that processing failed.
        NOTE(review): process_message is not defined in this example and is
        presumably supplied by the application.
        """
        try:
            # Process message
            result = self.process_message(message)
            # Send response
            await self.send_room_message(context.room_id, result)
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error drops.
            logger.exception("Error processing message: %s", e)
            # Notify sender of error
            await self.send_private_message(
                context.client_id,
                json.dumps({"type": "error", "message": "Failed to process message"})
            )

    async def on_error(self, error, context):
        """Log a server error and pass it on to notify_admins.

        NOTE(review): notify_admins is not defined in this example and is
        presumably supplied by the application.
        """
        logger.error("Server error: %s", error)
        # Optionally notify admins
        await self.notify_admins(f"Error for {context.client_id}: {error}")
Message Flow Summary
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────→│  Go Broker  │────→│   Python    │
│   (sends)   │     │  (routes)   │     │  (processes)│
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                      ┌────────────────────────┼────────────────────────┐
                      │                        │                        │
                      ▼                        ▼                        ▼
               ┌─────────────┐          ┌─────────────┐          ┌─────────────┐
               │  Broadcast  │          │    Room     │          │   Private   │
               │    (all)    │          │  (room_id)  │          │  (client_id)│
               └─────────────┘          └─────────────┘          └─────────────┘
Performance Tips
- Batch messages — When many small updates are produced in quick succession, combine them into a single message instead of sending each one separately
- Use rooms — Organize clients into rooms to reduce message fanout
- Limit message size — Keep messages under a few KB for best performance
- Handle backpressure — Monitor and handle cases where clients can't keep up
Load Testing
Use tools like artillery or k6 to test your messaging patterns under load before deploying to production.