tpluspy exposes all real-time data feeds as async generators — Python objects you consume with an async for loop. Under the hood, each generator opens and maintains a WebSocket connection to the relevant service, decodes incoming messages into typed Pydantic models, and yields them to your loop one at a time. You never touch raw WebSocket frames; the client handles framing, reconnection signalling, and model parsing for you.
The Async Generator Pattern
Every stream follows the same pattern:
async for event in client.some_stream(asset):
# handle event — each iteration yields one typed message
process(event)
The loop runs indefinitely until the connection closes, you break out, or you cancel the surrounding task. In interactive scripts, press Ctrl-C to stop.
OrderBookClient Streams
OrderBookClient provides two authenticated streams that carry user-specific events. Both require a valid User to be set on the client.
stream_orders — Live Order Events
Receive real-time status updates for your own orders: new acknowledgements, partial fills, full fills, cancellations, and replacements.
import asyncio
from tplus.client import OrderBookClient
from tplus.utils.user import load_user
API_BASE_URL = "http://127.0.0.1:8000"
async def watch_orders():
user = load_user("alice")
async with OrderBookClient(API_BASE_URL, default_user=user) as client:
async for event in client.stream_orders():
print(f"[OrderEvent] {event}")
asyncio.run(watch_orders())
stream_user_trade_events — Your Trade Feed
Receive a real-time feed of every trade execution that involves your orders.
async def watch_my_trades():
user = load_user("alice")
async with OrderBookClient(API_BASE_URL, default_user=user) as client:
async for trade_event in client.stream_user_trade_events():
print(f"[MyTrade] {trade_event}")
asyncio.run(watch_my_trades())
MarketDataClient Streams
MarketDataClient provides three public, unauthenticated streams. No user is required.
stream_finalized_trades — Public Trade Feed
Every matched and settled trade on the exchange, in real time.
import asyncio
from tplus.client import MarketDataClient
from tplus.model.trades import Trade
async def watch_trades():
async with MarketDataClient() as md_client:
async for trade in md_client.stream_finalized_trades():
if isinstance(trade, Trade):
print(
f"[Trade] id={trade.trade_id}"
f" price={trade.price}"
f" qty={trade.quantity}"
)
asyncio.run(watch_trades())
stream_depth — Incremental Order Book Updates
Receive OrderBookDiff messages containing only the price levels that changed since the last update. Apply them to a local snapshot to maintain a live full-depth order book.
from tplus.model.asset_identifier import AssetIdentifier
from tplus.model.orderbook import OrderBookDiff
asset = AssetIdentifier(200)
async def watch_depth():
async with MarketDataClient() as md_client:
# Bootstrap: fetch the current snapshot first
snapshot = await md_client.get_orderbook_snapshot(asset)
local_book = {"asks": snapshot.asks, "bids": snapshot.bids}
last_seq = snapshot.sequence_number
async for diff in md_client.stream_depth(asset):
if isinstance(diff, OrderBookDiff):
if diff.sequence_number <= last_seq:
continue # stale — skip
# apply the diff to local_book here
last_seq = diff.sequence_number
print(
f"[Depth] seq={diff.sequence_number}"
f" Δasks={len(diff.asks)}"
f" Δbids={len(diff.bids)}"
)
asyncio.run(watch_depth())
stream_klines — Live Candlestick Updates
Receive a KlineUpdate each time a new trade closes a candle or updates the current one.
async def watch_klines():
async with MarketDataClient() as md_client:
async for update in md_client.stream_klines(asset):
print(f"[Kline] {update}")
asyncio.run(watch_klines())
Running Multiple Streams Concurrently
Use asyncio.gather to run multiple async generators at the same time inside a single event loop. Each stream runs as an independent coroutine; they share the same client connection pool.
import asyncio
from tplus.client import OrderBookClient, MarketDataClient
from tplus.model.asset_identifier import AssetIdentifier
from tplus.model.orderbook import OrderBookDiff
from tplus.model.trades import Trade
from tplus.utils.user import load_user
API_BASE_URL = "http://127.0.0.1:8000"
asset = AssetIdentifier(200)
async def watch_order_events(client: OrderBookClient):
async for event in client.stream_orders():
print(f"[OrderEvent] {event}")
async def watch_user_trades(client: OrderBookClient):
async for trade_event in client.stream_user_trade_events():
print(f"[MyTrade] {trade_event}")
async def watch_public_trades(md_client: MarketDataClient):
async for trade in md_client.stream_finalized_trades():
if isinstance(trade, Trade):
print(f"[PublicTrade] id={trade.trade_id} px={trade.price} qty={trade.quantity}")
async def watch_depth(md_client: MarketDataClient):
async for diff in md_client.stream_depth(asset):
if isinstance(diff, OrderBookDiff):
print(f"[Depth] seq={diff.sequence_number}")
async def main():
user = load_user("alice")
async with (
OrderBookClient(API_BASE_URL, default_user=user) as client,
MarketDataClient() as md_client,
):
await asyncio.gather(
watch_order_events(client),
watch_user_trades(client),
watch_public_trades(md_client),
watch_depth(md_client),
)
asyncio.run(main())
A full runnable version of this pattern lives in examples/websocket_usage.py in the tpluspy repository.
Error Handling and Reconnection
WebSocket streams raise exceptions when the underlying connection fails. The async generator propagates the error out of the async for loop, at which point you can decide whether to reconnect.
import asyncio
from tplus.client import MarketDataClient
from tplus.model.trades import Trade
async def resilient_trade_stream():
while True:
try:
async with MarketDataClient() as md_client:
async for trade in md_client.stream_finalized_trades():
if isinstance(trade, Trade):
print(f"[Trade] {trade.trade_id}")
except Exception as err:
print(f"Stream disconnected: {err}. Reconnecting in 2 s…")
await asyncio.sleep(2)
# loop restarts: a new client and stream are created
asyncio.run(resilient_trade_stream())
Reconnection restarts the async generator from scratch. If you maintain local state derived from the stream (e.g. a running order book), you should re-fetch a fresh snapshot after reconnecting and re-synchronise before processing new diffs.
| Stream | Client | Auth required |
|---|
stream_orders() | OrderBookClient | Yes |
stream_user_trade_events() | OrderBookClient | Yes |
stream_finalized_trades() | MarketDataClient | No |
stream_depth(asset_id) | MarketDataClient | No |
stream_klines(asset_id) | MarketDataClient | No |