Messaging

PyStrands supports multiple messaging patterns: room-based messages, private messages, and broadcasts. This guide covers each pattern and best practices.

Messaging Patterns Overview

┌─────────────────────────────────────────────────────────────────┐
│                        Go Broker                                 │
│                                                                  │
│   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │
│   │ Client A│  │ Client B│  │ Client C│  │ Client D│           │
│   │ Room: 1 │  │ Room: 1 │  │ Room: 1 │  │ Room: 2 │           │
│   └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘           │
│        │            │            │            │                  │
│        └────────────┴────────────┘            │                  │
│                   Room 1                      │                  │
│                                              │                  │
└──────────────────────────────────────────────┼──────────────────┘
                                        ┌──────▼──────┐
                                        │   Python    │
                                        │   Backend   │
                                        └─────────────┘

Room Messages

Room messages are sent to all clients in a specific room. This is the primary messaging pattern for most applications (chat rooms, game lobbies, collaboration spaces).

Sending Room Messages

# Async client
await self.send_room_message(room_id, message)

# Sync client
self.send_room_message(room_id, message)

Example: Echo Server

from pystrands import AsyncPyStrandsClient

class EchoServer(AsyncPyStrandsClient):
    async def on_connection_request(self, request):
        # Clients connect to ws://broker/room-name
        # Extract room from URL path
        request.context.room_id = request.url.strip("/") or "lobby"
        return True

    async def on_message(self, message, context):
        # Echo the message to everyone in the same room
        echo = f"{context.client_id}: {message}"
        await self.send_room_message(context.room_id, echo)

client = EchoServer()
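
The room extraction in on_connection_request can be pulled out into a small pure function, which makes it easy to test without a broker. The sketch below assumes request.url may be either a bare path or a full URL with a query string; room_from_url is a hypothetical helper name, not part of PyStrands.

```python
from urllib.parse import urlsplit


def room_from_url(url: str, default: str = "lobby") -> str:
    """Return the room name encoded in a URL path (hypothetical helper)."""
    # urlsplit handles both full URLs ("ws://broker/room") and bare paths ("/room"),
    # and keeps any query string out of the path component
    path = urlsplit(url).path
    room = path.strip("/")
    # Fall back to the default room when the path is empty or just "/"
    return room or default
```

Inside on_connection_request this would be used as request.context.room_id = room_from_url(request.url).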

Example: Chat Room with History

from collections import defaultdict

from pystrands import AsyncPyStrandsClient

class ChatServer(AsyncPyStrandsClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Store recent messages per room
        self.room_history = defaultdict(list)
        self.max_history = 50

    async def on_new_connection(self, context):
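        # Send recent history to the new client. Iterate over a copy so that
        # messages arriving during the awaits cannot mutate the list mid-loop.
        history = list(self.room_history.get(context.room_id, []))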
        for msg in history:
            await self.send_private_message(context.client_id, msg)

    async def on_message(self, message, context):
        formatted = f"[{context.client_id}] {message}"

        # Store in history
        room_msgs = self.room_history[context.room_id]
        room_msgs.append(formatted)
        if len(room_msgs) > self.max_history:
            room_msgs.pop(0)

        # Broadcast to room
        await self.send_room_message(context.room_id, formatted)
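
As an alternative to trimming the list by hand, the per-room history can be kept in a bounded deque from the standard library. This is a minimal sketch of that variation, not the approach the example above uses; remember and recent are hypothetical helper names.

```python
from collections import defaultdict, deque

MAX_HISTORY = 50

# Each room gets its own bounded deque; once it is full, appending a new
# entry silently discards the oldest one.
room_history = defaultdict(lambda: deque(maxlen=MAX_HISTORY))


def remember(room_id: str, formatted: str) -> None:
    """Record a formatted message in the room's history."""
    room_history[room_id].append(formatted)


def recent(room_id: str) -> list:
    """Return a snapshot of the room's history, oldest first."""
    # .get avoids creating an empty deque for rooms that have no history yet
    history = room_history.get(room_id)
    return list(history) if history else []
```

Because recent returns a copy, it is safe to await between sends while replaying history to a new client.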

Private Messages

Send messages to specific clients using their client_id:

Sending Private Messages

# Async client
await self.send_private_message(client_id, message)

# Sync client
self.send_private_message(client_id, message)

Example: Direct Messages

async def on_message(self, message, context):
    # Format: @client_id message
    if message.startswith("@"):
        parts = message.split(" ", 1)
        if len(parts) == 2:
            target_id = parts[0][1:]  # Remove @
            dm_content = parts[1]

            await self.send_private_message(
                target_id,
                f"[DM from {context.client_id}] {dm_content}"
            )

            # Confirm to sender
            await self.send_private_message(
                context.client_id,
                f"[DM to {target_id}] {dm_content}"
            )
        else:
            # No message body after the target: tell the sender the expected format
            await self.send_private_message(
                context.client_id,
                "Usage: @client_id message"
            )
    else:
        # Regular room message
        await self.send_room_message(context.room_id, f"{context.client_id}: {message}")
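
The "@client_id message" parsing can also be isolated in a pure function so that edge cases (no body, empty target) are handled in one place. A minimal sketch, where parse_dm is a hypothetical helper name:

```python
from typing import Optional, Tuple


def parse_dm(message: str) -> Optional[Tuple[str, str]]:
    """Return (target_id, body) for '@target body', or None if it is not a valid DM."""
    if not message.startswith("@"):
        return None
    # partition splits on the first space only, so the body may contain spaces
    target, sep, body = message[1:].partition(" ")
    body = body.strip()
    if not target or not sep or not body:
        return None
    return target, body
```

A handler can then branch on the result: None for a message that starts with "@" signals a malformed DM, and any other message is treated as a regular room message.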

Example: Server Notifications

async def on_new_connection(self, context):
    # Welcome message to the new user
    await self.send_private_message(
        context.client_id,
        f"Welcome! You're in room: {context.room_id}"
    )

    # Notify others in the room
    await self.send_room_message(
        context.room_id,
        f"User {context.client_id} joined the room"
    )

async def on_disconnect(self, context):
    # Notify room of departure
    await self.send_room_message(
        context.room_id,
        f"User {context.client_id} left the room"
    )

Broadcast Messages

Broadcasts send messages to all connected clients, regardless of room:

Sending Broadcasts

# Async client
await self.broadcast_message(message)

# Sync client
self.broadcast_message(message)

Example: System Announcements

class GameServer(AsyncPyStrandsClient):
    async def handle_admin_command(self, command):
        if command.startswith("/announce "):
            # Strip the "/announce " prefix without a hard-coded length
            announcement = command[len("/announce "):]
            await self.broadcast_message(f"📢 SYSTEM: {announcement}")

    async def on_message(self, message, context):
        # Check for admin commands
        if context.metadata.get("role") == "admin":
            if message.startswith("/"):
                await self.handle_admin_command(message)
                return

        # Regular message
        await self.send_room_message(context.room_id, message)

Use Broadcasts Sparingly

Broadcasts reach every connected client. For large applications, prefer room-based messaging to reduce load.

Message Queuing

When every Python backend is disconnected, the Go broker can queue incoming client messages, provided --queue-size is set:

python -m pystrands server --queue-size 1000

How Queuing Works

1. All Python backends disconnect
2. Client sends message to Go Broker
3. Go Broker stores message in queue
4. Python backend reconnects
5. Go Broker flushes queued messages to backend

Handling Queued Messages

import time

from pystrands import AsyncPyStrandsClient

class ResilientServer(AsyncPyStrandsClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.reconnect_time = None

    async def connect(self):
        result = await super().connect()
        if result:
            self.reconnect_time = time.time()
            print(f"Connected at {self.reconnect_time}")
        return result

    async def on_message(self, message, context):
        # Heuristic only: a message that arrives within 5 seconds of a
        # reconnect was probably queued by the broker while we were offline
        if self.reconnect_time and time.time() - self.reconnect_time < 5:
            print(f"Processing queued message: {message}")

        await self.send_room_message(context.room_id, message)

Queue Limitations

  • Queue is in-memory only (lost on broker restart)
  • FIFO order is preserved
  • When queue is full, new messages are dropped
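
These semantics can be modelled in a few lines of Python. This is only an illustration of the behaviour listed above (in-memory, FIFO, new messages dropped when full); the broker itself is written in Go and its actual implementation may differ. BoundedQueue is a hypothetical name.

```python
from collections import deque


class BoundedQueue:
    """Toy model of a FIFO queue that drops new messages when full."""

    def __init__(self, size: int):
        self.size = size
        self.items = deque()
        self.dropped = 0

    def push(self, message: str) -> bool:
        """Store a message; return False if it was dropped because the queue is full."""
        if len(self.items) >= self.size:
            # The NEW message is discarded; already queued messages are kept
            self.dropped += 1
            return False
        self.items.append(message)
        return True

    def flush(self) -> list:
        """Return all queued messages in arrival order and empty the queue."""
        drained = list(self.items)
        self.items.clear()
        return drained
```

With a queue size of 2, a third message pushed while the backend is offline is lost, which is why --queue-size should be chosen with the expected outage length and message rate in mind.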

Message Format Best Practices

JSON Messages

For complex applications, use JSON for structured messages:

import json
import time

async def on_message(self, message, context):
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        data = None

    if not isinstance(data, dict):
        # Plain text, or valid JSON that is not an object (e.g. "42"):
        # fall back to relaying the message unchanged
        await self.send_room_message(context.room_id, message)
        return

    msg_type = data.get("type")
    if msg_type == "chat":
        await self.handle_chat(data, context)
    elif msg_type == "typing":
        await self.handle_typing(data, context)
    elif msg_type == "command":
        await self.handle_command(data, context)

async def handle_chat(self, data, context):
    payload = {
        "type": "chat",
        "from": context.client_id,
        "text": data["text"],
        "timestamp": time.time()
    }
    await self.send_room_message(
        context.room_id,
        json.dumps(payload)
    )
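
Because handle_chat reads data["text"] directly, a chat message without that field would raise KeyError. One way to guard against this is to validate the envelope before dispatching. The sketch below is an assumption about how such a check could look; parse_envelope and REQUIRED_FIELDS are hypothetical names, and only the "text" requirement for chat messages comes from the example above.

```python
import json
from typing import Optional

# Fields each message type must carry. Only "chat" -> "text" is taken from the
# handler above; the empty tuples are placeholders for this sketch.
REQUIRED_FIELDS = {"chat": ("text",), "typing": (), "command": ()}


def parse_envelope(raw: str) -> Optional[dict]:
    """Return the decoded message if it is a well-formed envelope, else None."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Valid JSON is not necessarily an object: "42" decodes to an int
    if not isinstance(data, dict):
        return None
    msg_type = data.get("type")
    if not isinstance(msg_type, str) or msg_type not in REQUIRED_FIELDS:
        return None  # missing, non-string, or unknown type
    if any(field not in data for field in REQUIRED_FIELDS[msg_type]):
        return None
    return data
```

A handler can call parse_envelope first and fall back to plain-text handling, or send an error to the client, whenever it returns None.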

Binary Data

For binary data (images, audio), base64 encode:

import base64
import json

async def on_message(self, message, context):
    data = json.loads(message)

    if data["type"] == "image":
        # image_data is base64 encoded
        image_bytes = base64.b64decode(data["image_data"])
        # Process image...

        # Broadcast processed image
        await self.send_room_message(context.room_id, json.dumps({
            "type": "image",
            "from": context.client_id,
            "image_data": data["image_data"]
        }))
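
The example above shows only the receiving side. For completeness, here is a sketch of the full round trip using the same field names; encode_image_message and decode_image_message are hypothetical helpers.

```python
import base64
import json


def encode_image_message(sender: str, image_bytes: bytes) -> str:
    """Wrap raw bytes in a JSON message using the field names from the example above."""
    payload = {
        "type": "image",
        "from": sender,
        # b64encode returns bytes; decode to str so json.dumps can serialize it
        "image_data": base64.b64encode(image_bytes).decode("ascii"),
    }
    return json.dumps(payload)


def decode_image_message(raw: str) -> bytes:
    """Extract the raw bytes from a message built by encode_image_message."""
    data = json.loads(raw)
    # validate=True raises on characters outside the base64 alphabet
    # instead of silently skipping them
    return base64.b64decode(data["image_data"], validate=True)
```

Base64 grows the payload by roughly one third (300 bytes become 400 characters), so large images quickly exceed the few-KB guideline in the performance tips.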

Error Handling

Handle messaging errors gracefully:

import json
import logging

logger = logging.getLogger(__name__)

class RobustServer(AsyncPyStrandsClient):
    async def on_message(self, message, context):
        try:
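            # Process message (process_message stands for your application's own logic)
            result = self.process_message(message)

            # Send response
            await self.send_room_message(context.room_id, result)

        except Exception:
            # logger.exception records the full traceback, not just the message
            logger.exception("Error processing message")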

            # Notify sender of error
            await self.send_private_message(
                context.client_id,
                json.dumps({"type": "error", "message": "Failed to process message"})
            )

    async def on_error(self, error, context):
        logger.error(f"Server error: {error}")
        # Optionally notify admins (notify_admins is an application-defined helper)
        await self.notify_admins(f"Error for {context.client_id}: {error}")

Message Flow Summary

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────→│  Go Broker  │────→│   Python    │
│  (sends)    │     │  (routes)   │     │  (processes)│
└─────────────┘     └─────────────┘     └──────┬──────┘
                       ┌────────────────────────┼────────────────────────┐
                       │                        │                        │
                       ▼                        ▼                        ▼
                ┌─────────────┐         ┌─────────────┐         ┌─────────────┐
                │  Broadcast  │         │   Room      │         │   Private   │
                │  (all)      │         │  (room_id)  │         │  (client_id)│
                └─────────────┘         └─────────────┘         └─────────────┘
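
The three outgoing paths in the diagram can be exercised without a broker by routing through a stand-in client that records calls. This is a testing sketch under stated assumptions: RecordingClient and deliver are hypothetical names, and only the three send method names come from this guide.

```python
import asyncio


class RecordingClient:
    """Stand-in for a PyStrands client that records calls instead of sending."""

    def __init__(self):
        self.sent = []

    async def send_room_message(self, room_id, message):
        self.sent.append(("room", room_id, message))

    async def send_private_message(self, client_id, message):
        self.sent.append(("private", client_id, message))

    async def broadcast_message(self, message):
        self.sent.append(("broadcast", None, message))


async def deliver(client, scope, target, message):
    """Route one outgoing message by scope: 'broadcast', 'room', or 'private'."""
    if scope == "broadcast":
        await client.broadcast_message(message)
    elif scope == "room":
        await client.send_room_message(target, message)
    elif scope == "private":
        await client.send_private_message(target, message)
    else:
        raise ValueError(f"unknown scope: {scope!r}")


def demo():
    """Send one room message through the stand-in and return what was recorded."""
    client = RecordingClient()
    asyncio.run(deliver(client, "room", "lobby", "hello"))
    return client.sent
```

Keeping the routing decision in one function makes it straightforward to unit-test handler logic before connecting to a real broker.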

Performance Tips

  1. Batch messages: if sending to many clients, consider combining updates into fewer messages
  2. Use rooms: organize clients into rooms to reduce message fanout
  3. Limit message size: keep messages under a few KB for best performance
  4. Handle backpressure: monitor for clients that can't keep up and decide how to handle them
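
As an illustration of the batching tip, the sketch below collects outgoing messages per room and emits them as a single JSON payload. It is one possible approach, not a PyStrands feature: MessageBatcher and the "batch" message type are assumptions, and clients would need to understand that type.

```python
import json


class MessageBatcher:
    """Collect outgoing messages per room and emit one JSON payload per batch."""

    def __init__(self, max_batch: int = 20):
        self.max_batch = max_batch
        self.pending = {}  # room_id -> list of pending messages

    def add(self, room_id: str, message: str):
        """Queue a message; return a ready payload if the batch is full, else None."""
        batch = self.pending.setdefault(room_id, [])
        batch.append(message)
        if len(batch) >= self.max_batch:
            return self.flush(room_id)
        return None

    def flush(self, room_id: str):
        """Return the room's pending messages as one JSON string, or None if empty."""
        batch = self.pending.pop(room_id, [])
        if not batch:
            return None
        return json.dumps({"type": "batch", "messages": batch})
```

Whenever add or flush returns a payload, it can be sent with send_room_message. Calling flush on a timer as well keeps latency bounded when traffic is low.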

Load Testing

Use tools like artillery or k6 to test your messaging patterns under load before deploying to production.