Two streaming clients are available: MarketDataClient for public market feeds (depth, trades, klines) that require no authentication, and OrderBookClient for user-specific streams such as order status updates and personal trade events. Both clients expose async generators: iterate over them with async for, and press Ctrl-C to stop.
The two streaming clients
| Client | Streams | Auth needed? |
|---|---|---|
| MarketDataClient | stream_depth, stream_finalized_trades, stream_klines | No |
| OrderBookClient | stream_orders, stream_user_trade_events | Yes (requires a User) |
MarketDataClient is read-only and requires no user account. Point it at the market-data service endpoint (default: http://localhost:8011).

    import asyncio

    from tplus.client import MarketDataClient
    from tplus.model.asset_identifier import AssetIdentifier

    MARKET_DATA_URL = "http://localhost:8011"  # Replace with your endpoint


    async def main():
        asset = AssetIdentifier(200)  # registry index form
        async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
            print("MarketDataClient ready")
            # Streaming calls go here.


    asyncio.run(main())

stream_depth yields OrderBookDiff objects that describe incremental changes to the order book. Each update carries a sequence_number so you can detect gaps, plus updated asks and bids levels.

    import asyncio

    from tplus.client import MarketDataClient
    from tplus.model.asset_identifier import AssetIdentifier
    from tplus.model.orderbook import OrderBookDiff

    MARKET_DATA_URL = "http://localhost:8011"


    async def main():
        asset = AssetIdentifier(200)
        async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
            print("Listening for depth updates (Ctrl-C to stop).")
            async for update in md_client.stream_depth(asset):
                # Only OrderBookDiff messages are printed; anything else is skipped.
                if isinstance(update, OrderBookDiff):
                    print(
                        f"[Depth] Seq={update.sequence_number} "
                        f"Asks={len(update.asks)} Bids={len(update.bids)}"
                    )


    asyncio.run(main())

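The sequence_number is what makes gap detection possible when applying diffs to local state. The sketch below is a minimal illustration under stated assumptions, not the library's implementation: Diff is a hypothetical stand-in for OrderBookDiff, each level is assumed to be a (price, quantity) pair, a quantity of 0 is assumed to remove the level, and sequence numbers are assumed to increase by exactly 1. Check the real OrderBookDiff model before relying on any of these.

```python
from dataclasses import dataclass, field
from decimal import Decimal


@dataclass
class Diff:
    """Hypothetical stand-in for OrderBookDiff (field layout is assumed)."""
    sequence_number: int
    asks: list[tuple[Decimal, Decimal]] = field(default_factory=list)
    bids: list[tuple[Decimal, Decimal]] = field(default_factory=list)


class SequenceGap(Exception):
    """Raised when a diff does not directly follow the last applied one."""


@dataclass
class LocalBook:
    last_seq: int
    asks: dict[Decimal, Decimal] = field(default_factory=dict)
    bids: dict[Decimal, Decimal] = field(default_factory=dict)

    def apply(self, diff: Diff) -> None:
        # Assumption: sequence numbers are contiguous (previous + 1).
        if diff.sequence_number != self.last_seq + 1:
            raise SequenceGap(
                f"expected {self.last_seq + 1}, got {diff.sequence_number}"
            )
        for side, levels in ((self.asks, diff.asks), (self.bids, diff.bids)):
            for price, qty in levels:
                if qty == 0:
                    side.pop(price, None)  # assumed: zero quantity deletes the level
                else:
                    side[price] = qty
        self.last_seq = diff.sequence_number


book = LocalBook(last_seq=10)
book.apply(Diff(11, asks=[(Decimal("101"), Decimal("2"))],
                bids=[(Decimal("99"), Decimal("5"))]))
book.apply(Diff(12, asks=[(Decimal("101"), Decimal("0"))]))
print(book.last_seq, book.asks, book.bids)
```

When SequenceGap is raised, one reasonable recovery is to discard the local state and rebuild it before applying further diffs.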
stream_depth sends incremental diffs, not full snapshots. To build a local order book, call get_orderbook_snapshot(asset_id) once to seed your state, then apply each OrderBookDiff in sequence order on top of it.

stream_finalized_trades yields Trade objects for every matched trade on the exchange, across all assets.

    import asyncio

    from tplus.client import MarketDataClient
    from tplus.model.trades import Trade

    MARKET_DATA_URL = "http://localhost:8011"


    async def main():
        async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
            print("Listening for finalized trades (Ctrl-C to stop).")
            # No asset argument: the stream covers every asset.
            async for trade in md_client.stream_finalized_trades():
                if isinstance(trade, Trade):
                    print(
                        f"[Trade] ID={trade.trade_id} "
                        f"Price={trade.price} Qty={trade.quantity}"
                    )


    asyncio.run(main())

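As one illustration of consuming trade events, the sketch below keeps a running volume-weighted average price. FakeTrade is a hypothetical stand-in that models only the price and quantity attributes printed above, and the code assumes both values convert cleanly to Decimal, which may not match the real Trade model.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class FakeTrade:
    """Hypothetical stand-in for Trade; only price and quantity are modelled."""
    price: Decimal
    quantity: Decimal


class RunningVwap:
    """Accumulates sum(price * quantity) / sum(quantity) over observed trades."""

    def __init__(self) -> None:
        self.notional = Decimal(0)
        self.volume = Decimal(0)

    def add(self, trade) -> None:
        # Going through str() avoids binary float artefacts if values are floats.
        price = Decimal(str(trade.price))
        quantity = Decimal(str(trade.quantity))
        self.notional += price * quantity
        self.volume += quantity

    @property
    def value(self):
        # None until some non-zero volume has been seen.
        return self.notional / self.volume if self.volume else None


vwap = RunningVwap()
vwap.add(FakeTrade(Decimal("100"), Decimal("1")))
vwap.add(FakeTrade(Decimal("110"), Decimal("3")))
print(vwap.value)
```

Because the stream covers all assets, a real consumer would keep one accumulator per asset. How a Trade identifies its asset is not shown here, so that part is left out of the sketch.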
stream_klines yields KlineUpdate objects for a specific asset. Each update represents a candlestick bar with open, high, low, close, and volume data.

    import asyncio

    from tplus.client import MarketDataClient
    from tplus.model.asset_identifier import AssetIdentifier

    MARKET_DATA_URL = "http://localhost:8011"


    async def main():
        asset = AssetIdentifier(200)
        async with MarketDataClient(base_url=MARKET_DATA_URL) as md_client:
            print("Listening for kline updates (Ctrl-C to stop).")
            async for kline in md_client.stream_klines(asset):
                print(f"[Kline] {kline}")


    asyncio.run(main())

Use asyncio.gather to run several streams concurrently in the same process. Each stream runs in its own coroutine; they share the event loop but do not block each other.

    import asyncio

    from tplus.client import MarketDataClient, OrderBookClient
    from tplus.model.asset_identifier import AssetIdentifier
    from tplus.model.orderbook import OrderBookDiff
    from tplus.model.trades import Trade
    from tplus.utils.user import load_user

    MARKET_DATA_URL = "http://localhost:8011"
    OMS_URL = "http://127.0.0.1:8000"


    async def watch_depth(md_client: MarketDataClient, asset: AssetIdentifier):
        async for update in md_client.stream_depth(asset):
            if isinstance(update, OrderBookDiff):
                print(
                    f"[Depth] Seq={update.sequence_number} "
                    f"Asks={len(update.asks)} Bids={len(update.bids)}"
                )


    async def watch_trades(md_client: MarketDataClient):
        async for trade in md_client.stream_finalized_trades():
            if isinstance(trade, Trade):
                print(
                    f"[Trade] ID={trade.trade_id} "
                    f"Price={trade.price} Qty={trade.quantity}"
                )


    async def watch_my_orders(oms_client: OrderBookClient):
        async for event in oms_client.stream_orders():
            print(f"[Order Event] {event}")


    async def main():
        asset = AssetIdentifier(200)
        user = load_user("alice")
        # Parenthesized context managers require Python 3.10 or later.
        async with (
            MarketDataClient(base_url=MARKET_DATA_URL) as md_client,
            OrderBookClient(OMS_URL, default_user=user) as oms_client,
        ):
            print("All streams active (Ctrl-C to stop).")
            # gather awaits all three watchers; each runs until its stream ends.
            await asyncio.gather(
                watch_depth(md_client, asset),
                watch_trades(md_client),
                watch_my_orders(oms_client),
            )


    asyncio.run(main())

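To see the interleaving without a running exchange, the following sketch replaces the client streams with a hypothetical ticker async generator that yields on a timer. It illustrates asyncio.gather scheduling only and uses nothing from tplus; the names ticker, consume, and run_demo are invented for the example.

```python
import asyncio


async def ticker(name: str, interval: float, count: int):
    """Hypothetical stand-in for a stream: yields `count` labelled ticks."""
    for i in range(count):
        await asyncio.sleep(interval)
        yield f"{name}-{i}"


async def consume(stream, log: list) -> None:
    # Each consumer awaits its own stream; every await hands control
    # back to the event loop so the other consumer can make progress.
    async for item in stream:
        log.append(item)


async def run_demo() -> list:
    log: list = []
    await asyncio.gather(
        consume(ticker("fast", 0.01, 6), log),
        consume(ticker("slow", 0.02, 2), log),
    )
    return log


events = asyncio.run(run_demo())
print(events)
```

The slow ticks land in between the fast ones rather than after them, which is the behaviour the three watchers above rely on.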
User-specific streams via OrderBookClient
While MarketDataClient covers public market data, OrderBookClient exposes two streams that are scoped to your account:
| Method | Yields | Description |
|---|---|---|
| stream_orders() | OrderEvent | Status updates for your own orders (accepted, partially filled, cancelled, etc.) |
| stream_user_trade_events() | TradeEvent | Every fill event where you are a counterparty |
All stream methods are async generators. They run until the WebSocket connection is closed or the consuming task is cancelled; there is no built-in timeout. Press Ctrl-C in an interactive script, or cancel the task programmatically, when you are done.
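Since there is no built-in timeout, the caller can impose one. The sketch below wraps a consumer in asyncio.wait_for, which cancels the consumer when the deadline passes. endless_stream is a hypothetical stand-in for a stream method such as stream_orders(); nothing here comes from tplus.

```python
import asyncio


async def endless_stream():
    """Hypothetical stand-in for a stream that never ends on its own."""
    n = 0
    while True:
        await asyncio.sleep(0.01)
        yield n
        n += 1


async def consume(received: list) -> None:
    async for event in endless_stream():
        received.append(event)


async def run_for(seconds: float) -> list:
    received: list = []
    try:
        # wait_for cancels consume() once the timeout expires.
        await asyncio.wait_for(consume(received), timeout=seconds)
    except asyncio.TimeoutError:
        pass  # expected: the deadline is how this run ends
    return received


received = asyncio.run(run_for(0.2))
print(f"received {len(received)} events before the deadline")
```

The same effect is available without a deadline by keeping the handle returned by asyncio.create_task(...) and calling its cancel() method when some other stop condition is met.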
WebSocket Streaming Reference
Full reference for all streaming methods, message types, and reconnection behaviour.