Skip to content

PyStrandsClient

The synchronous client for PyStrands, using Python's threading module for concurrent message handling.

Overview

PyStrandsClient is ideal for: - Simple applications that don't need asyncio - Integration with synchronous frameworks (Flask, Django) - Quick prototypes and scripts

from pystrands import PyStrandsClient

class MyBackend(PyStrandsClient):
    def on_message(self, message, context):
        self.send_room_message(context.room_id, f"echo: {message}")

client = MyBackend(host="localhost", port=8081)
client.run_forever()

Constructor

PyStrandsClient.__init__

PyStrandsClient(
    host: str = "localhost",
    port: int = 8081,
    auto_reconnect: bool = True,
    reconnect_delay: float = 1.0,
    max_reconnect_delay: float = 30.0,
    reconnect_backoff: float = 2.0
)

Parameters:

Parameter Type Default Description
host str "localhost" Hostname or IP of the Go broker
port int 8081 TCP port for the Go broker
auto_reconnect bool True Whether to automatically reconnect on connection loss
reconnect_delay float 1.0 Initial delay between reconnection attempts (seconds)
max_reconnect_delay float 30.0 Maximum delay cap for reconnection (seconds)
reconnect_backoff float 2.0 Exponential backoff multiplier

Example:

client = PyStrandsClient(
    host="broker.example.com",
    port=8081,
    auto_reconnect=True,
    reconnect_delay=1.0,
    max_reconnect_delay=30.0,
    reconnect_backoff=2.0
)

Methods

connect()

Connect to the TCP server.

def connect(self) -> bool

Returns: boolTrue if connection successful, False otherwise

Example:

client = PyStrandsClient()
if client.connect():
    print("Connected successfully")
else:
    print("Connection failed")

disconnect()

Cleanly disconnect from the server. Waits for in-flight handlers to finish.

def disconnect(self, timeout: float = 5.0)

Parameters:

Parameter Type Default Description
timeout float 5.0 Max seconds to wait for in-flight handlers before force-closing

Example:

client.disconnect(timeout=10.0)

run_forever()

Connect to the server and block the main thread until disconnect. Uses threading.Event for CPU-efficient waiting.

def run_forever(self)

Example:

client = PyStrandsClient()
try:
    client.run_forever()  # Blocks until KeyboardInterrupt or disconnect
except KeyboardInterrupt:
    print("Shutting down...")

Auto-connect

run_forever() automatically calls connect() if not already connected.


broadcast_message()

Broadcast a message to all connected WebSocket clients.

def broadcast_message(self, message: str)

Parameters:

Parameter Type Description
message str The message to broadcast

Example:

def on_message(self, message, context):
    if message == "!announce":
        self.broadcast_message("Important system update!")

send_room_message()

Send a message to all clients in a specific room.

def send_room_message(self, room_id: str, message: str)

Parameters:

Parameter Type Description
room_id str The room ID to send to
message str The message to send

Example:

def on_message(self, message, context):
    # Echo to the same room
    self.send_room_message(context.room_id, f"echo: {message}")

send_private_message()

Send a message to a specific client.

def send_private_message(self, client_id: str, message: str)

Parameters:

Parameter Type Description
client_id str The target client's ID
message str The message to send

Example:

def on_new_connection(self, context):
    # Welcome message to the new client only
    self.send_private_message(
        context.client_id,
        f"Welcome to {context.room_id}!"
    )

Callbacks (Override These)

on_connection_request()

Handle new WebSocket connection requests. Return True to accept, False to reject.

def on_connection_request(self, context: ConnectionRequestContext) -> bool

Parameters:

Parameter Type Description
context ConnectionRequestContext Contains headers, URL, remote_addr, and mutable context

Returns: boolTrue to accept, False to reject. Or modify context.accepted directly.

Example:

def on_connection_request(self, request):
    # Check authorization header
    auth = request.headers.get("Authorization", [None])[0]
    if auth != "Bearer valid-token":
        return False

    # Set room based on URL path
    request.context.room_id = request.url.strip("/")

    # Store user metadata
    request.context.metadata = {"user": "alice", "role": "admin"}

    return True

Room Assignment

Always set request.context.room_id in this callback to route clients to the correct room.


on_new_connection()

Called when a WebSocket client successfully connects.

def on_new_connection(self, context: Context)

Parameters:

Parameter Type Description
context Context Contains client_id, room_id, and metadata

Example:

def on_new_connection(self, context):
    print(f"New client: {context.client_id} in room: {context.room_id}")
    self.send_private_message(context.client_id, "Welcome!")

on_message()

Called when a client sends a message.

def on_message(self, message: str, context: Context)

Parameters:

Parameter Type Description
message str The message content
context Context Contains client_id, room_id, and metadata

Example:

def on_message(self, message, context):
    print(f"[{context.room_id}] {context.client_id}: {message}")

    # Echo back to room
    self.send_room_message(context.room_id, f"echo: {message}")

on_disconnect()

Called when a WebSocket client disconnects.

def on_disconnect(self, context: Context)

Parameters:

Parameter Type Description
context Context Contains client_id, room_id, and metadata

Example:

def on_disconnect(self, context):
    print(f"Client disconnected: {context.client_id}")
    # Notify others in the room
    self.send_room_message(
        context.room_id,
        f"User {context.client_id} left"
    )

on_error()

Called when an error occurs.

def on_error(self, error: str, context: Context)

Parameters:

Parameter Type Description
error str Error message
context Context Contains client_id, room_id, and metadata

Example:

def on_error(self, error, context):
    print(f"Error for {context.client_id}: {error}")

Attributes

Attribute Type Description
host str Server hostname
port int Server port
connected bool Current connection state
auto_reconnect bool Whether auto-reconnect is enabled
reconnect_delay float Initial reconnection delay
max_reconnect_delay float Maximum reconnection delay cap
reconnect_backoff float Exponential backoff multiplier

Complete Example

import logging
from pystrands import PyStrandsClient

logging.basicConfig(level=logging.INFO)

class ChatServer(PyStrandsClient):
    def on_connection_request(self, request):
        # Simple auth check
        token = request.headers.get("X-Token", [None])[0]
        if token != "secret":
            return False

        # Set room from URL
        request.context.room_id = request.url.strip("/") or "lobby"
        request.context.metadata = {"token": token}
        return True

    def on_new_connection(self, context):
        self.send_room_message(
            context.room_id,
            f"User {context.client_id} joined"
        )

    def on_message(self, message, context):
        formatted = f"{context.client_id}: {message}"
        self.send_room_message(context.room_id, formatted)

    def on_disconnect(self, context):
        self.send_room_message(
            context.room_id,
            f"User {context.client_id} left"
        )

if __name__ == "__main__":
    client = ChatServer(
        host="localhost",
        port=8081,
        auto_reconnect=True
    )
    client.run_forever()

See Also