Support

Live data: Introduction

Overview

In this example, we'll show how to:

  • Create subscriptions for live data
  • Start and stop streams
  • Handle different record types
Info
Info

A subscription is required to access live data. See the Plans and live data page to sign up for a subscription.

The first step in using the Live API is to create an instance of the Live client. This requires a valid API key, which can be found on the API Keys page. You can also pass your API key into your environment directly. This is recommended for production applications.

import databento as db

live_client = db.Live("YOUR_API_KEY")

Creating subsciptions

Before any live data is sent, you must first create subscriptions for the data you are interested in with Live.subscribe.

First, you will need to determine the dataset you are interested in. The dataset ID can be found in the dataset documentation, or from the online portal. All subscriptions must be for the same dataset for a given instance of the Live client.

A subscription can be for one or many symbols in a dataset. Databento supports many symbology types, which are described in our symbology documentation. Our live client supports streaming all symbols on a single connection with ALL_SYMBOLS.

A subscription must include a schema. The available schemas for a dataset can be found with metadata.list_schemas in the Historical API. A list of all schemas and the fields for each schema can be found in the schema documentation.

Multiple subscriptions for different schemas can be made with the same instance of the Live client, as long as they are all for the same dataset.

A live connection may optionally be started at a specified time within the last 24 hours by setting the start parameter in Live.subscribe to utilize intraday replay. The stream will replay all messages beginning at the specified time, then continue with real-time data once caught up.

In the example below, we will create two subscriptions using continuous contract symbology. One subscription is for the OHLCV-1s schema for the continuous contract for both ES and NQ. The other subscription is for the BBO-1s schema for the CL continuous contract.

See also
See also

Along with continuous contract symbology, Databento support many other symbology types.

import databento as db

live_client = db.Live("YOUR_API_KEY")

live_client.subscribe(
    dataset="GLBX.MDP3",
    symbols=["ES.v.0", "NQ.v.0"],
    stype_in="continuous",
    schema="ohlcv-1s",
)

live_client.subscribe(
    dataset="GLBX.MDP3",
    symbols="CL.v.0",
    stype_in="continuous",
    schema="bbo-1s",
)

Starting and stopping streams

Before starting a stream, you can add a callback function with Live.add_callback. This function will be called for every record in the stream.

A stream can be started with Live.start. Once a stream has been started, data will begin flowing.

See also
See also

Live streams can also be started by iterating over the live client, which will yield records as they arrive.

A stream can be stopped with Live.stop. A stream can be also be stopped after a specified period of time with Live.block_for_close. Omitting a timeout will keep the session open indefinitely.

It is possible for a stream to end unexpectedly, due to a client-side network issue or error sent from the gateway. The error detection guide outlines different methods for clients to monitor their connection.

Info
Info

Visit the recovering from a disconnection guide for more information on reconnecting.

Handling different record types

After the stream has started, there are multiple record types that you can expect to see, regardless of the schemas you subscribed to.

  • SymbolMappingMsg: These records are sent for each symbol subscribed to. These will be sent before any other records for that instrument are sent. These messages contain the subscription input symbol, as well as the resolved instrument_id and raw_symbol. Since every record contains an instrument_id fields, it's suggested to keep a dictionary of these symbol mappings.
  • SystemMsg: These records are used to indicate non-error information, such as heartbeats and successful subscriptions.
  • ErrorMsg: These records are used to indicate errors, such as failed authentiation or exceeding connection limits.
import databento as db

live_client = db.Live("YOUR_API_KEY")

live_client.subscribe(
    dataset="GLBX.MDP3",
    symbols=["ES.v.0", "NQ.v.0"],
    stype_in="continuous",
    schema="ohlcv-1s",
)

live_client.subscribe(
    dataset="GLBX.MDP3",
    symbols="CL.v.0",
    stype_in="continuous",
    schema="bbo-1s",
)

symbol_map: dict[int, str] = {}

def on_msg(msg: db.DBNRecord):
    match msg.rtype:
        case db.RType.SYMBOL_MAPPING:
            symbol_map[msg.instrument_id] = msg.stype_out_symbol
        case db.RType.SYSTEM:
            print(f"System message: {msg.msg}")
        case db.RType.ERROR:
            print(f"Error message : {msg.err}")
        case db.RType.OHLCV_1S:
            symbol = symbol_map[msg.instrument_id]
            print(f"Received OHLCV message for {symbol}: {msg}")
        case db.RType.BBO_1S:
            symbol = symbol_map[msg.instrument_id]
            print(f"Received BBO message for {symbol}: {msg}")
        case _:
            print(f"Record type not handled: {msg.rtype}")

live_client.add_callback(on_msg)

live_client.start()

live_client.block_for_close(15)
System message: Subscription request 0 for ohlcv-1s data succeeded
System message: Subscription request 1 for bbo-1s data succeeded
Received OHLCV message for ESH6: OhlcvMsg { hd: RecordHeader { length: 14, rtype: Ohlcv1S, publisher_id: GlbxMdp3Glbx, instrument_id: 42140878, ts_event: 1770126711000000000 }, open: 7015.500000000, high: 7015.500000000, low: 7015.500000000, close: 7015.500000000, volume: 11 }
Received OHLCV message for NQH6: OhlcvMsg { hd: RecordHeader { length: 14, rtype: Ohlcv1S, publisher_id: GlbxMdp3Glbx, instrument_id: 42002475, ts_event: 1770126711000000000 }, open: 25969.250000000, high: 25969.250000000, low: 25968.500000000, close: 25968.500000000, volume: 8 }
System message: End of interval for ohlcv-1s
Received BBO message for CLH6: BboMsg { hd: RecordHeader { length: 20, rtype: Bbo1S, publisher_id: GlbxMdp3Glbx, instrument_id: 191166, ts_event: 1770126708912707389 }, price: 62.550000000, size: 1, side: 'B', flags: LAST (128), ts_recv: 1770126712000000000, sequence: 123373636, levels: [BidAskPair { bid_px: 62.550000000, ask_px: 62.560000000, bid_sz: 3, ask_sz: 10, bid_ct: 3, ask_ct: 8 }] }
...
System message: End of interval for bbo-1s
Received OHLCV message for ESH6: OhlcvMsg { hd: RecordHeader { length: 14, rtype: Ohlcv1S, publisher_id: GlbxMdp3Glbx, instrument_id: 42140878, ts_event: 1770126712000000000 }, open: 7015.500000000, high: 7015.500000000, low: 7015.500000000, close: 7015.500000000, volume: 17 }
Received OHLCV message for NQH6: OhlcvMsg { hd: RecordHeader { length: 14, rtype: Ohlcv1S, publisher_id: GlbxMdp3Glbx, instrument_id: 42002475, ts_event: 1770126712000000000 }, open: 25968.500000000, high: 25969.500000000, low: 25968.500000000, close: 25969.500000000, volume: 2 }
System message: End of interval for ohlcv-1s
Received BBO message for CLH6: BboMsg { hd: RecordHeader { length: 20, rtype: Bbo1S, publisher_id: GlbxMdp3Glbx, instrument_id: 191166, ts_event: 1770126708912707389 }, price: 62.550000000, size: 1, side: 'B', flags: LAST (128), ts_recv: 1770126713000000000, sequence: 123373933, levels: [BidAskPair { bid_px: 62.550000000, ask_px: 62.560000000, bid_sz: 5, ask_sz: 10, bid_ct: 5, ask_ct: 8 }] }
System message: End of interval for bbo-1s