tpluspy exposes all real-time data feeds as async generators — Python objects you consume with an async for loop. Under the hood, each generator opens and maintains a WebSocket connection to the relevant service, decodes incoming messages into typed Pydantic models, and yields them to your loop one at a time. You never touch raw WebSocket frames; the client handles framing, reconnection signalling, and model parsing for you.

The Async Generator Pattern

Every stream follows the same pattern:
async for event in client.some_stream(asset):
    # handle event — each iteration yields one typed message
    process(event)
The loop runs indefinitely until the connection closes, you break out, or you cancel the surrounding task. In interactive scripts, press Ctrl-C to stop.
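The ways of ending the loop can be tried without a running server. The sketch below replaces a real stream method with fake_stream, a hypothetical stand-in that is not part of tpluspy, and shows two of the exits described above: breaking out of the loop, and cancelling the consuming task (here via a timeout).

```python
import asyncio
from typing import AsyncIterator


async def fake_stream() -> AsyncIterator[int]:
    """Hypothetical stand-in for a tpluspy stream: yields 0, 1, 2, ... forever."""
    n = 0
    while True:
        await asyncio.sleep(0.01)  # simulate waiting for the next message
        yield n
        n += 1


async def take(limit: int) -> list[int]:
    """Stop by breaking out of the loop after `limit` events."""
    seen: list[int] = []
    async for event in fake_stream():
        seen.append(event)
        if len(seen) >= limit:
            break
    return seen


async def consume_forever(sink: list[int]) -> None:
    """Never returns on its own; only cancellation stops it."""
    async for event in fake_stream():
        sink.append(event)


async def main() -> tuple[list[int], list[int], bool]:
    first = await take(3)

    sink: list[int] = []
    timed_out = False
    try:
        # wait_for cancels the consumer when the timeout expires
        await asyncio.wait_for(consume_forever(sink), timeout=0.2)
    except asyncio.TimeoutError:
        timed_out = True
    return first, sink, timed_out


first, sink, timed_out = asyncio.run(main())
print(first, len(sink), timed_out)
```

With a real client the same two exits should apply, since cancellation is delivered at whichever await the loop is currently suspended on.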

OrderBookClient Streams

OrderBookClient provides two authenticated streams that carry user-specific events. Both require a valid User to be set on the client.

stream_orders — Live Order Events

Receive real-time status updates for your own orders: new acknowledgements, partial fills, full fills, cancellations, and replacements.
import asyncio
from tplus.client import OrderBookClient
from tplus.utils.user import load_user

API_BASE_URL = "http://127.0.0.1:8000"

async def watch_orders():
    user = load_user("alice")

    async with OrderBookClient(API_BASE_URL, default_user=user) as client:
        async for event in client.stream_orders():
            print(f"[OrderEvent] {event}")

asyncio.run(watch_orders())

stream_user_trade_events — Your Trade Feed

Receive a real-time feed of every trade execution that involves your orders.
async def watch_my_trades():
    user = load_user("alice")

    async with OrderBookClient(API_BASE_URL, default_user=user) as client:
        async for trade_event in client.stream_user_trade_events():
            print(f"[MyTrade] {trade_event}")

asyncio.run(watch_my_trades())

MarketDataClient Streams

MarketDataClient provides three public, unauthenticated streams. No user is required.

stream_finalized_trades — Public Trade Feed

Every matched and settled trade on the exchange, in real time.
import asyncio
from tplus.client import MarketDataClient
from tplus.model.trades import Trade

async def watch_trades():
    async with MarketDataClient() as md_client:
        async for trade in md_client.stream_finalized_trades():
            if isinstance(trade, Trade):
                print(
                    f"[Trade]  id={trade.trade_id}"
                    f"  price={trade.price}"
                    f"  qty={trade.quantity}"
                )

asyncio.run(watch_trades())

stream_depth — Incremental Order Book Updates

Receive OrderBookDiff messages containing only the price levels that changed since the last update. Apply them to a local snapshot to maintain a live full-depth order book.
from tplus.model.asset_identifier import AssetIdentifier
from tplus.model.orderbook import OrderBookDiff

asset = AssetIdentifier(200)

async def watch_depth():
    async with MarketDataClient() as md_client:
        # Bootstrap: fetch the current snapshot first
        snapshot = await md_client.get_orderbook_snapshot(asset)
        local_book = {"asks": snapshot.asks, "bids": snapshot.bids}
        last_seq = snapshot.sequence_number

        async for diff in md_client.stream_depth(asset):
            if isinstance(diff, OrderBookDiff):
                if diff.sequence_number <= last_seq:
                    continue  # stale — skip
                # apply the diff to local_book here
                last_seq = diff.sequence_number
                print(
                    f"[Depth]  seq={diff.sequence_number}"
                    f"  Δasks={len(diff.asks)}"
                    f"  Δbids={len(diff.bids)}"
                )

asyncio.run(watch_depth())
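How the "apply the diff" step looks depends on how OrderBookDiff encodes price levels, which this page does not specify. The sketch below assumes each side is a list of (price, quantity) pairs and that a quantity of zero removes the level, a common convention on exchange feeds; check the actual model before relying on it. apply_levels and apply_diff are hypothetical helpers, not part of tpluspy.

```python
from decimal import Decimal

Level = tuple[Decimal, Decimal]  # (price, quantity): assumed shape of one level


def apply_levels(side: dict[Decimal, Decimal], changes: list[Level]) -> None:
    """Merge changed levels into one side of the book, in place."""
    for price, qty in changes:
        if qty == 0:
            side.pop(price, None)  # assumption: zero quantity deletes the level
        else:
            side[price] = qty      # insert or overwrite the level


def apply_diff(book: dict, asks: list[Level], bids: list[Level], seq: int) -> bool:
    """Apply one diff; return False if it is stale and was skipped."""
    if seq <= book["seq"]:
        return False
    apply_levels(book["asks"], asks)
    apply_levels(book["bids"], bids)
    book["seq"] = seq
    return True


D = Decimal
book = {
    "seq": 10,
    "asks": {D("101"): D("2"), D("102"): D("5")},
    "bids": {D("100"): D("1"), D("99"): D("4")},
}

# Diff 11 removes the 101 ask and resizes the 100 bid.
applied = apply_diff(book, asks=[(D("101"), D("0"))], bids=[(D("100"), D("3"))], seq=11)
# A second message with the same sequence number is treated as stale.
stale_applied = apply_diff(book, asks=[(D("105"), D("1"))], bids=[], seq=11)

best_ask = min(book["asks"])
best_bid = max(book["bids"])
print(best_bid, best_ask, applied, stale_applied)
```

Keying each side by price keeps every update a constant-time dictionary operation; sort the keys only when you need to display or walk the book.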

stream_klines — Live Candlestick Updates

Receive a KlineUpdate each time a new trade closes a candle or updates the current one.
async def watch_klines():
    async with MarketDataClient() as md_client:
        async for update in md_client.stream_klines(asset):
            print(f"[Kline] {update}")

asyncio.run(watch_klines())
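To make "closes a candle or updates the current one" concrete, here is a minimal sketch of how a trade can be folded into an OHLCV candle. The service does this aggregation for you; the Candle class, fold_trade function, and 60-second interval are hypothetical illustrations and say nothing about the actual fields of KlineUpdate.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candle:
    open_time: int  # start of the interval, in seconds
    open: float
    high: float
    low: float
    close: float
    volume: float


def fold_trade(
    current: Optional[Candle], ts: int, price: float, qty: float, interval: int = 60
) -> tuple[Candle, Optional[Candle]]:
    """Return (current candle, candle that just closed or None) after one trade."""
    bucket = ts - ts % interval  # start of the interval this trade falls into
    if current is None or bucket > current.open_time:
        # The trade opens a new candle; the previous one (if any) is now closed.
        return Candle(bucket, price, price, price, price, qty), current
    # The trade belongs to the current candle: update it in place.
    current.high = max(current.high, price)
    current.low = min(current.low, price)
    current.close = price
    current.volume += qty
    return current, None


trades = [(0, 100.0, 1.0), (30, 102.0, 2.0), (59, 99.0, 1.0), (60, 101.0, 0.5)]
current: Optional[Candle] = None
closed: list[Candle] = []
for ts, price, qty in trades:
    current, finished = fold_trade(current, ts, price, qty)
    if finished is not None:
        closed.append(finished)

print(closed)
print(current)
```

The first three trades update one candle; the fourth falls into the next interval, so it closes the first candle and opens a new one.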

Running Multiple Streams Concurrently

Use asyncio.gather to consume several streams concurrently inside a single event loop. Wrap each async for loop in its own coroutine and pass the coroutines to gather, which schedules them as independent tasks. Streams opened on the same client share that client's connection pool.
import asyncio
from tplus.client import OrderBookClient, MarketDataClient
from tplus.model.asset_identifier import AssetIdentifier
from tplus.model.orderbook import OrderBookDiff
from tplus.model.trades import Trade
from tplus.utils.user import load_user

API_BASE_URL = "http://127.0.0.1:8000"
asset = AssetIdentifier(200)


async def watch_order_events(client: OrderBookClient):
    async for event in client.stream_orders():
        print(f"[OrderEvent] {event}")


async def watch_user_trades(client: OrderBookClient):
    async for trade_event in client.stream_user_trade_events():
        print(f"[MyTrade] {trade_event}")


async def watch_public_trades(md_client: MarketDataClient):
    async for trade in md_client.stream_finalized_trades():
        if isinstance(trade, Trade):
            print(f"[PublicTrade]  id={trade.trade_id}  px={trade.price}  qty={trade.quantity}")


async def watch_depth(md_client: MarketDataClient):
    async for diff in md_client.stream_depth(asset):
        if isinstance(diff, OrderBookDiff):
            print(f"[Depth]  seq={diff.sequence_number}")


async def main():
    user = load_user("alice")

    async with (
        OrderBookClient(API_BASE_URL, default_user=user) as client,
        MarketDataClient() as md_client,
    ):
        await asyncio.gather(
            watch_order_events(client),
            watch_user_trades(client),
            watch_public_trades(md_client),
            watch_depth(md_client),
        )


asyncio.run(main())
A full runnable version of this pattern lives in examples/websocket_usage.py in the tpluspy repository.
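One detail worth knowing when gathering long-lived streams: by default, asyncio.gather raises the first exception from any coroutine to the caller, while the remaining coroutines keep running. Passing return_exceptions=True returns failures as values instead, so you can inspect each stream's outcome. The sketch below shows this with fake_stream and watch, hypothetical stand-ins that are not part of tpluspy.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(name: str, count: int, fail: bool = False) -> AsyncIterator[str]:
    """Hypothetical stand-in for a stream method; yields `count` events."""
    for i in range(count):
        await asyncio.sleep(0.01)
        yield f"{name}-{i}"
    if fail:
        raise ConnectionError(f"{name} dropped")


async def watch(name: str, count: int, log: list[str], fail: bool = False) -> int:
    """Consume one stream and return how many events it delivered."""
    handled = 0
    async for event in fake_stream(name, count, fail):
        log.append(event)
        handled += 1
    return handled


async def main() -> tuple[list, list[str]]:
    log: list[str] = []
    # return_exceptions=True: a failure in one watcher is returned as a value
    # instead of being raised, so every watcher's outcome can be inspected.
    results = await asyncio.gather(
        watch("orders", 3, log),
        watch("depth", 2, log, fail=True),
        return_exceptions=True,
    )
    return results, log


results, log = asyncio.run(main())
print(results)
print(log)
```

Here the first result is the event count from the healthy watcher and the second is the ConnectionError raised by the failing one.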

Error Handling and Reconnection

WebSocket streams raise exceptions when the underlying connection fails. The async generator propagates the error out of the async for loop, at which point you can decide whether to reconnect.
import asyncio
from tplus.client import MarketDataClient
from tplus.model.trades import Trade

async def resilient_trade_stream():
    while True:
        try:
            async with MarketDataClient() as md_client:
                async for trade in md_client.stream_finalized_trades():
                    if isinstance(trade, Trade):
                        print(f"[Trade] {trade.trade_id}")
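        except Exception as err:
            print(f"Stream disconnected: {err}. Reconnecting in 2 s…")
        else:
            print("Stream ended. Reconnecting in 2 s…")
        # Pause on both paths so a clean close cannot cause a tight reconnect loop
        await asyncio.sleep(2)
        # loop restarts: a new client and stream are created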

asyncio.run(resilient_trade_stream())
Reconnection restarts the async generator from scratch. If you maintain local state derived from the stream (e.g. a running order book), you should re-fetch a fresh snapshot after reconnecting and re-synchronise before processing new diffs.
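A fixed retry delay can hammer a service that is down for a while. A common refinement is exponential backoff with a cap, sketched below. FlakySource, backoff_delay, and consume_with_backoff are hypothetical names for illustration and are not part of tpluspy; the sleep function is injectable so the demo finishes instantly.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


class FlakySource:
    """Hypothetical stand-in for a client: fails on its first two connections."""

    def __init__(self) -> None:
        self.connections = 0

    async def stream(self) -> AsyncIterator[int]:
        self.connections += 1
        if self.connections <= 2:
            raise ConnectionError("simulated drop")
        for n in range(3):
            yield n


async def consume_with_backoff(
    source: FlakySource,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    max_attempts: int = 5,
) -> tuple[list[int], list[float]]:
    received: list[int] = []
    delays: list[float] = []
    attempt = 0
    while attempt < max_attempts:
        try:
            async for item in source.stream():
                attempt = 0  # a delivered message resets the backoff
                received.append(item)
            return received, delays  # stream ended without an error
        except ConnectionError:
            delay = backoff_delay(attempt)
            delays.append(delay)
            await sleep(delay)
            attempt += 1
    raise RuntimeError("gave up after repeated connection failures")


async def no_sleep(_delay: float) -> None:
    """Demo-only replacement for asyncio.sleep so the example finishes instantly."""


received, delays = asyncio.run(consume_with_backoff(FlakySource(), sleep=no_sleep))
print(received, delays)
```

Unlike the loop above, this sketch returns when the stream ends without an error and gives up after max_attempts consecutive failures. Adding random jitter to each delay is a common further step when many clients may reconnect at once.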
| Stream | Client | Auth required |
| --- | --- | --- |
| stream_orders() | OrderBookClient | Yes |
| stream_user_trade_events() | OrderBookClient | Yes |
| stream_finalized_trades() | MarketDataClient | No |
| stream_depth(asset_id) | MarketDataClient | No |
| stream_klines(asset_id) | MarketDataClient | No |