PyStrandsClient
The synchronous client for PyStrands, using Python's threading module for concurrent message handling.
Overview
PyStrandsClient is ideal for:
- Simple applications that don't need asyncio
- Integration with synchronous frameworks (Flask, Django)
- Quick prototypes and scripts
from pystrands import PyStrandsClient

class MyBackend(PyStrandsClient):
    def on_message(self, message, context):
        self.send_room_message(context.room_id, f"echo: {message}")

client = MyBackend(host="localhost", port=8081)
client.run_forever()
Constructor
PyStrandsClient.__init__
PyStrandsClient(
    host: str = "localhost",
    port: int = 8081,
    auto_reconnect: bool = True,
    reconnect_delay: float = 1.0,
    max_reconnect_delay: float = 30.0,
    reconnect_backoff: float = 2.0
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | Hostname or IP of the Go broker |
| port | int | 8081 | TCP port for the Go broker |
| auto_reconnect | bool | True | Whether to automatically reconnect on connection loss |
| reconnect_delay | float | 1.0 | Initial delay between reconnection attempts (seconds) |
| max_reconnect_delay | float | 30.0 | Maximum delay cap for reconnection (seconds) |
| reconnect_backoff | float | 2.0 | Exponential backoff multiplier |
Example:
client = PyStrandsClient(
    host="broker.example.com",
    port=8081,
    auto_reconnect=True,
    reconnect_delay=1.0,
    max_reconnect_delay=30.0,
    reconnect_backoff=2.0
)
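The three reconnect parameters read as capped exponential backoff. As a rough sketch, assuming the wait before each attempt is the previous wait multiplied by reconnect_backoff and capped at max_reconnect_delay (the exact schedule, including any jitter, is not documented here, and `reconnect_delays` is a hypothetical helper rather than part of PyStrands):

```python
from itertools import islice
from typing import Iterator


def reconnect_delays(
    reconnect_delay: float = 1.0,
    max_reconnect_delay: float = 30.0,
    reconnect_backoff: float = 2.0,
) -> Iterator[float]:
    """Yield the assumed wait (seconds) before each successive reconnect attempt."""
    delay = reconnect_delay
    while True:
        yield min(delay, max_reconnect_delay)
        # Grow the delay, but clamp it so it never exceeds the cap.
        delay = min(delay * reconnect_backoff, max_reconnect_delay)


# First seven waits with the default settings.
schedule = list(islice(reconnect_delays(), 7))
print(schedule)
```

Under this assumption the defaults give waits of 1, 2, 4, 8 and 16 seconds, then 30 seconds for every later attempt.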
Methods
connect()
Connect to the Go broker over TCP.
Returns: bool. True if the connection succeeded, False otherwise.
Example:
client = PyStrandsClient()
if client.connect():
    print("Connected successfully")
else:
    print("Connection failed")
disconnect()
Cleanly disconnect from the server, waiting up to timeout seconds for in-flight handlers to finish before force-closing.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| timeout | float | 5.0 | Max seconds to wait for in-flight handlers before force-closing |
Example:
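Going by the parameter table above, a call such as client.disconnect(timeout=2.0) would cap the wait at two seconds. The sketch below illustrates that kind of bounded wait with plain threads. It is not PyStrands's implementation: `wait_for_handlers` is a hypothetical helper, and the threads stand in for in-flight handlers.

```python
import threading
import time
from typing import List, Sequence


def wait_for_handlers(
    handlers: Sequence[threading.Thread], timeout: float = 5.0
) -> List[threading.Thread]:
    """Join handler threads against one shared deadline; return those still running."""
    deadline = time.monotonic() + timeout
    for thread in handlers:
        remaining = deadline - time.monotonic()
        # Clamp at zero so an expired deadline turns join() into a quick poll.
        thread.join(timeout=max(remaining, 0.0))
    return [thread for thread in handlers if thread.is_alive()]


# Two stand-in handlers: one finishes quickly, one outlives the timeout.
fast = threading.Thread(target=time.sleep, args=(0.05,), daemon=True)
slow = threading.Thread(target=time.sleep, args=(2.0,), daemon=True)
fast.start()
slow.start()

stragglers = wait_for_handlers([fast, slow], timeout=0.3)
print(f"{len(stragglers)} handler(s) still running at the deadline")
```

Using one deadline for all threads keeps the total wait bounded by timeout, rather than timeout per handler.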
run_forever()
Connect to the server and block the main thread until disconnect. Uses threading.Event for CPU-efficient waiting.
Example:
client = PyStrandsClient()
try:
    client.run_forever()  # Blocks until KeyboardInterrupt or disconnect
except KeyboardInterrupt:
    print("Shutting down...")
Auto-connect
run_forever() automatically calls connect() if not already connected.
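To show why waiting on a threading.Event is CPU-efficient, here is a small standalone sketch. The event, the timer, and `block_until_stopped` are illustrative names, not PyStrands internals. The waiting thread sleeps in the operating system until another thread sets the event, instead of polling a flag in a loop.

```python
import threading
from typing import Optional


def block_until_stopped(event: threading.Event, timeout: Optional[float] = None) -> bool:
    """Sleep until the event is set; return True if it was set, False on timeout."""
    return event.wait(timeout)


stop_event = threading.Event()

# Simulate a disconnect signalled from another thread after 0.1 s.
timer = threading.Timer(0.1, stop_event.set)
timer.start()

stopped = block_until_stopped(stop_event, timeout=2.0)
timer.join()
print("stopped by event:", stopped)
```

A busy loop such as `while not flag: pass` would burn a full core for the same result.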
broadcast_message()
Broadcast a message to all connected WebSocket clients.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| message | str | The message to broadcast |
Example:
def on_message(self, message, context):
    if message == "!announce":
        self.broadcast_message("Important system update!")
send_room_message()
Send a message to all clients in a specific room.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| room_id | str | The room ID to send to |
| message | str | The message to send |
Example:
def on_message(self, message, context):
    # Echo to the same room
    self.send_room_message(context.room_id, f"echo: {message}")
send_private_message()
Send a message to a specific client.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| client_id | str | The target client's ID |
| message | str | The message to send |
Example:
def on_new_connection(self, context):
    # Welcome message to the new client only
    self.send_private_message(
        context.client_id,
        f"Welcome to {context.room_id}!"
    )
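The three send methods differ only in who receives the message. The toy model below makes those scopes concrete. `ToyRouter` is a hypothetical in-memory stand-in that borrows the method names. It does not talk to a broker and says nothing about how PyStrands delivers messages internally.

```python
from collections import defaultdict
from typing import Dict, List


class ToyRouter:
    """In-memory stand-in for message fan-out; not part of PyStrands."""

    def __init__(self, rooms: Dict[str, str]) -> None:
        self.rooms = dict(rooms)  # client_id -> room_id
        self.inbox: Dict[str, List[str]] = defaultdict(list)

    def broadcast_message(self, message: str) -> None:
        # Every connected client receives the message.
        for client_id in self.rooms:
            self.inbox[client_id].append(message)

    def send_room_message(self, room_id: str, message: str) -> None:
        # Only clients currently in room_id receive the message.
        for client_id, room in self.rooms.items():
            if room == room_id:
                self.inbox[client_id].append(message)

    def send_private_message(self, client_id: str, message: str) -> None:
        # Exactly one client receives the message, if it is connected.
        if client_id in self.rooms:
            self.inbox[client_id].append(message)


router = ToyRouter({"alice": "lobby", "bob": "lobby", "carol": "dev"})
router.broadcast_message("to everyone")
router.send_room_message("lobby", "to lobby")
router.send_private_message("carol", "to carol")
print(dict(router.inbox))
```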
Callbacks (Override These)
on_connection_request()
Handle new WebSocket connection requests. Return True to accept, False to reject.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| context | ConnectionRequestContext | Request details: headers, URL, remote_addr, and a mutable context object. The example below names this argument request. |
Returns: bool. True accepts the connection, False rejects it. Alternatively, set context.accepted directly.
Example:
def on_connection_request(self, request):
    # Check authorization header
    auth = request.headers.get("Authorization", [None])[0]
    if auth != "Bearer valid-token":
        return False
    # Set room based on URL path
    request.context.room_id = request.url.strip("/")
    # Store user metadata
    request.context.metadata = {"user": "alice", "role": "admin"}
    return True
Room Assignment
Always set request.context.room_id in this callback to route clients to the correct room.
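The header and URL handling in this callback can be factored into small helpers that are easy to test without a broker. This is a sketch under assumptions inferred from the example above: headers map each name to a list of values, the header name is spelled exactly "Authorization", and the URL may carry a query string. `bearer_token`, `token_is_valid`, and `room_from_url` are hypothetical names, not PyStrands APIs.

```python
import hmac
from typing import Mapping, Optional, Sequence
from urllib.parse import urlsplit


def bearer_token(headers: Mapping[str, Sequence[str]]) -> Optional[str]:
    """Return the token from the first 'Authorization: Bearer ...' value, if any."""
    values = headers.get("Authorization") or []
    if not values:
        return None  # header missing or present with no values
    scheme, _, token = values[0].partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


def token_is_valid(token: Optional[str], expected: str) -> bool:
    """Compare in constant time so the check does not leak timing information."""
    return token is not None and hmac.compare_digest(token.encode(), expected.encode())


def room_from_url(url: str, default: str = "lobby") -> str:
    """Use the URL path, without query string or surrounding slashes, as the room ID."""
    return urlsplit(url).path.strip("/") or default


print(bearer_token({"Authorization": ["Bearer valid-token"]}))
print(room_from_url("/chat?debug=1"))
```

Inside on_connection_request, these would be called with request.headers and request.url before assigning request.context.room_id.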
on_new_connection()
Called when a WebSocket client successfully connects.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| context | Context | Contains client_id, room_id, and metadata |
Example:
def on_new_connection(self, context):
    print(f"New client: {context.client_id} in room: {context.room_id}")
    self.send_private_message(context.client_id, "Welcome!")
on_message()
Called when a client sends a message.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| message | str | The message content |
| context | Context | Contains client_id, room_id, and metadata |
Example:
def on_message(self, message, context):
    print(f"[{context.room_id}] {context.client_id}: {message}")
    # Echo back to room
    self.send_room_message(context.room_id, f"echo: {message}")
on_disconnect()
Called when a WebSocket client disconnects.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| context | Context | Contains client_id, room_id, and metadata |
Example:
def on_disconnect(self, context):
    print(f"Client disconnected: {context.client_id}")
    # Notify others in the room
    self.send_room_message(
        context.room_id,
        f"User {context.client_id} left"
    )
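Because the client handles messages with threads, connect and disconnect callbacks may plausibly run concurrently, so any shared bookkeeping should be guarded by a lock. The sketch below tracks who is in each room. It assumes callbacks can overlap, which this page does not spell out, and `Presence` is a hypothetical helper rather than a PyStrands class.

```python
import threading
from collections import defaultdict
from typing import Dict, Set


class Presence:
    """Thread-safe map of room_id -> connected client IDs (illustrative helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._rooms: Dict[str, Set[str]] = defaultdict(set)

    def join(self, room_id: str, client_id: str) -> int:
        """Record a connection; return the room's new size."""
        with self._lock:
            self._rooms[room_id].add(client_id)
            return len(self._rooms[room_id])

    def leave(self, room_id: str, client_id: str) -> int:
        """Record a disconnect; return how many clients remain in the room."""
        with self._lock:
            members = self._rooms.get(room_id, set())
            members.discard(client_id)
            remaining = len(members)
            if not remaining:
                self._rooms.pop(room_id, None)  # drop empty rooms
            return remaining

    def members(self, room_id: str) -> Set[str]:
        """Return a copy so callers cannot mutate shared state."""
        with self._lock:
            return set(self._rooms.get(room_id, ()))


presence = Presence()
# In a client subclass: on_new_connection would call presence.join(...),
# and on_disconnect would call presence.leave(...).
presence.join("lobby", "client-1")
presence.join("lobby", "client-2")
left = presence.leave("lobby", "client-1")
print(left, presence.members("lobby"))
```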
on_error()
Called when an error occurs.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| error | str | Error message |
| context | Context | Contains client_id, room_id, and metadata |
Example:
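A minimal sketch of an on_error override that logs the failure, using the signature from the parameter table. Whether context is always populated when an error fires is not stated here, so the helper reads its fields defensively. `describe_error`, `ErrorLoggingMixin`, and the logger name are hypothetical.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("pystrands.example")  # hypothetical logger name


def describe_error(error, context) -> str:
    """Build a one-line description, tolerating a missing or partial context."""
    client_id = getattr(context, "client_id", None)
    room_id = getattr(context, "room_id", None)
    return f"client={client_id} room={room_id}: {error}"


class ErrorLoggingMixin:
    """Mix into a PyStrandsClient subclass to log every reported error."""

    def on_error(self, error, context):
        logger.error("PyStrands error %s", describe_error(error, context))


# Exercise the handler without a broker, using a stand-in context object.
stand_in = SimpleNamespace(client_id="c1", room_id="lobby")
ErrorLoggingMixin().on_error("write failed", stand_in)
```

In a real backend the class would be declared as, for example, class MyBackend(ErrorLoggingMixin, PyStrandsClient), so the mixin's on_error takes precedence.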
Attributes
| Attribute | Type | Description |
|---|---|---|
| host | str | Server hostname |
| port | int | Server port |
| connected | bool | Current connection state |
| auto_reconnect | bool | Whether auto-reconnect is enabled |
| reconnect_delay | float | Initial reconnection delay |
| max_reconnect_delay | float | Maximum reconnection delay cap |
| reconnect_backoff | float | Exponential backoff multiplier |
Complete Example
import logging

from pystrands import PyStrandsClient

logging.basicConfig(level=logging.INFO)


class ChatServer(PyStrandsClient):
    def on_connection_request(self, request):
        # Simple auth check
        token = request.headers.get("X-Token", [None])[0]
        if token != "secret":
            return False
        # Set room from URL
        request.context.room_id = request.url.strip("/") or "lobby"
        request.context.metadata = {"token": token}
        return True

    def on_new_connection(self, context):
        self.send_room_message(
            context.room_id,
            f"User {context.client_id} joined"
        )

    def on_message(self, message, context):
        formatted = f"{context.client_id}: {message}"
        self.send_room_message(context.room_id, formatted)

    def on_disconnect(self, context):
        self.send_room_message(
            context.room_id,
            f"User {context.client_id} left"
        )


if __name__ == "__main__":
    client = ChatServer(
        host="localhost",
        port=8081,
        auto_reconnect=True
    )
    client.run_forever()
See Also
- AsyncPyStrandsClient — The async variant
- Context — Context and ConnectionRequestContext reference