Skip to main content
T+ exposes real-time WebSocket feeds for market data and user-specific order events. The SDK provides two clients for streaming: MarketDataClient for public market feeds (depth, trades, klines) that require no authentication, and OrderBookClient for user-specific streams such as order status updates and personal trade events. Both clients expose async generators — you iterate over them with async for and press Ctrl-C to stop.

The two streaming clients

ClientStreamsAuth needed?
MarketDataClientstream_depth, stream_finalized_trades, stream_klinesNo
OrderBookClientstream_orders, stream_user_trade_eventsYes — requires a User
1
Set up MarketDataClient
2
MarketDataClient is read-only and requires no user account. Point it at the market-data service endpoint (default: http://localhost:8011).
3
import asyncio
from tplus.client import MarketDataClient
from tplus.model.asset_identifier import AssetIdentifier

MARKET_DATA_URL = "http://localhost:8011"  # Replace with your endpoint

async def main():
    asset = AssetIdentifier(200)  # registry index form

    async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
        print("MarketDataClient ready")
        # Streaming calls go here.

asyncio.run(main())
4
5
Stream order book depth diffs
6
stream_depth yields OrderBookDiff objects that describe incremental changes to the order book. Each update carries a sequence_number so you can detect gaps, plus updated asks and bids levels.
7
import asyncio
from tplus.client import MarketDataClient
from tplus.model.asset_identifier import AssetIdentifier
from tplus.model.orderbook import OrderBookDiff

MARKET_DATA_URL = "http://localhost:8011"

async def main():
    asset = AssetIdentifier(200)

    async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
        print("Listening for depth updates — Ctrl-C to stop.")
        async for update in md_client.stream_depth(asset):
            if isinstance(update, OrderBookDiff):
                print(
                    f"[Depth] Seq={update.sequence_number}  "
                    f"Asks={len(update.asks)}  Bids={len(update.bids)}"
                )

asyncio.run(main())
8
stream_depth sends incremental diffs, not full snapshots. To build a local order book, call get_orderbook_snapshot(asset_id) once to seed your state, then apply each OrderBookDiff in sequence order on top of it.
9
10
Stream finalized trades
11
stream_finalized_trades yields Trade objects for every matched trade on the exchange — across all assets.
12
import asyncio
from tplus.client import MarketDataClient
from tplus.model.trades import Trade

MARKET_DATA_URL = "http://localhost:8011"

async def main():
    async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
        print("Listening for finalized trades — Ctrl-C to stop.")
        async for trade in md_client.stream_finalized_trades():
            if isinstance(trade, Trade):
                print(
                    f"[Trade] ID={trade.trade_id}  "
                    f"Price={trade.price}  Qty={trade.quantity}"
                )

asyncio.run(main())
13
14
Stream klines
15
stream_klines yields KlineUpdate objects for a specific asset. Each update represents a candlestick bar with open, high, low, close, and volume data.
16
import asyncio
from tplus.client import MarketDataClient
from tplus.model.asset_identifier import AssetIdentifier

MARKET_DATA_URL = "http://localhost:8011"

async def main():
    asset = AssetIdentifier(200)

    async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
        print("Listening for kline updates — Ctrl-C to stop.")
        async for kline in md_client.stream_klines(asset):
            print(f"[Kline] {kline}")

asyncio.run(main())
17
18
Combine multiple streams with asyncio.gather
19
Use asyncio.gather to run several streams concurrently in the same process. Each stream runs in its own coroutine; they share the event loop but do not block each other.
20
import asyncio
from tplus.client import MarketDataClient, OrderBookClient
from tplus.model.asset_identifier import AssetIdentifier
from tplus.model.orderbook import OrderBookDiff
from tplus.model.trades import Trade
from tplus.utils.user import load_user

MARKET_DATA_URL = "http://localhost:8011"
OMS_URL = "http://127.0.0.1:8000"

async def watch_depth(md_client: MarketDataClient, asset: AssetIdentifier):
    async for update in md_client.stream_depth(asset):
        if isinstance(update, OrderBookDiff):
            print(
                f"[Depth] Seq={update.sequence_number}  "
                f"Asks={len(update.asks)}  Bids={len(update.bids)}"
            )

async def watch_trades(md_client: MarketDataClient):
    async for trade in md_client.stream_finalized_trades():
        if isinstance(trade, Trade):
            print(
                f"[Trade] ID={trade.trade_id}  "
                f"Price={trade.price}  Qty={trade.quantity}"
            )

async def watch_my_orders(oms_client: OrderBookClient):
    async for event in oms_client.stream_orders():
        print(f"[Order Event] {event}")

async def main():
    asset = AssetIdentifier(200)
    user = load_user("alice")

    async with (
        MarketDataClient(base_url=MARKET_DATA_URL) as md_client,
        OrderBookClient(OMS_URL, default_user=user) as oms_client,
    ):
        print("All streams active — Ctrl-C to stop.")
        await asyncio.gather(
            watch_depth(md_client, asset),
            watch_trades(md_client),
            watch_my_orders(oms_client),
        )

asyncio.run(main())
21
When using asyncio.gather, an exception in one coroutine propagates to the gather call. Wrap individual stream coroutines in try/except if you want one stream failure to be non-fatal.

User-specific streams via OrderBookClient

While MarketDataClient covers public market data, OrderBookClient exposes two streams that are scoped to your account:
MethodYieldsDescription
stream_orders()OrderEventStatus updates for your own orders (accepted, partially filled, cancelled, etc.)
stream_user_trade_events()TradeEventEvery fill event where you are a counterparty
import asyncio
from tplus.client import OrderBookClient
from tplus.utils.user import load_user

OMS_URL = "http://127.0.0.1:8000"

async def main():
    user = load_user("alice")

    async with OrderBookClient(OMS_URL, default_user=user) as client:
        # Stream your order status updates.
        async for event in client.stream_orders():
            print(f"[My Order] {event}")

asyncio.run(main())
import asyncio
from tplus.client import OrderBookClient
from tplus.utils.user import load_user

OMS_URL = "http://127.0.0.1:8000"

async def main():
    user = load_user("alice")

    async with OrderBookClient(OMS_URL, default_user=user) as client:
        # Stream every trade event where you are a participant.
        async for event in client.stream_user_trade_events():
            print(f"[My Trade] {event}")

asyncio.run(main())
All stream methods are async generators. They run until the WebSocket connection is closed or the loop is cancelled — there is no built-in timeout. Press Ctrl-C in an interactive script or cancel the task programmatically when you are done.

WebSocket Streaming Reference

Full reference for all streaming methods, message types, and reconnection behaviour.