API reference - Live

You can receive live data from Databento via our live APIs, namely the Databento Raw API.

The Raw API is a simple socket-based, subscription-style protocol. Clients communicate with our live data gateways through a regular TCP/IP socket. To make it easier to integrate the API, we also provide official client libraries that simplify the code you need to write.

You can use our live APIs to subscribe to real-time data in your application. You can also use the APIs to request intraday historical data and instrument definitions for any number of products in a venue.

Client libraries: Python, C++, Rust
APIs: Raw

Install the Python client library:

$ pip install -U databento

Basics

Overview

Databento's live API offers both real-time subscriptions and intraday replay within the last 24 hours across various market data schemas and symbols. Multiple subscriptions, such as trades, subsampled aggregates (second, minute, hour, daily), and definitions (expirations, settlement, etc.), may be combined into a single session. For an easy transition from backtesting to live trading, the live API supports the same schemas, datasets, and symbology as the historical API and returns the same record structures. Our live clients use our binary DBN encoding for performant zero-copy market data decoding.
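
For example, multiple subscriptions can be combined on a single client before starting the session. A minimal sketch (the dataset and symbols are illustrative placeholders):

import databento as db

client = db.Live(key="$YOUR_API_KEY")

# Trades for all ES futures on one session
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Instrument definitions on the same session
client.subscribe(
    dataset="GLBX.MDP3",
    schema="definition",
    stype_in="parent",
    symbols="ES.FUT",
)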

Authentication

Databento uses API keys to authenticate requests. You can view and manage your keys on the API Keys page of your portal.

Each API key is a 32-character string. By default, our library uses the environment variable DATABENTO_API_KEY as your API key. However, if you pass an API key to the Live constructor through the key parameter, then this value will be used instead.

Our Live API uses a challenge-response authentication mechanism to ensure that your API key is never sent over the network.

Related: Securing your API keys.

Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Authentication happens on the first subscribe
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

Sessions

Each instance of the Live client manages a single Raw API session. Each session is associated with one dataset.

A session will begin streaming when Live.start is called. A session can be stopped gracefully with Live.stop or forcefully with Live.terminate.

A session can also be stopped by specifying a timeout with Live.block_for_close for synchronous applications and Live.wait_for_close for asynchronous applications.
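
For instance, a simple synchronous session might look like the following sketch (the 10-second timeout is illustrative):

import databento as db

client = db.Live(key="$YOUR_API_KEY")

client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Begin streaming
client.start()

# Stream for up to 10 seconds, then terminate the session
client.block_for_close(timeout=10)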

See also

Connection limits for more details.

Schemas and conventions

A schema is a data record format represented as a collection of different data fields. Our datasets support multiple schemas, such as order book, trades, bar aggregates, and so on. You can get a dictionary describing the fields of each schema from our List of market data schemas.

You can get a list of all supported schemas for any given dataset using the Historical client's list_schemas method. The same information can also be found on the dataset details pages on the user portal.

The following table provides details about the data types and conventions used for various fields that you will commonly encounter in the data.

Name Field Description
Dataset dataset A unique string name assigned to each dataset by Databento. Full list of datasets can be found from the metadata.
Publisher ID publisher_id A unique 16-bit unsigned integer assigned to each publisher by Databento. Full list of publisher IDs can be found from the metadata.
Instrument ID instrument_id A unique 32-bit unsigned integer assigned to each instrument by the venue. Information about instrument IDs for any given dataset can be found in the symbology.
Order ID order_id A unique 64-bit unsigned integer assigned to each order by the venue.
Timestamp (event) ts_event The matching-engine-received timestamp expressed as the number of nanoseconds since the UNIX epoch.
Timestamp (receive) ts_recv The capture-server-received timestamp expressed as the number of nanoseconds since the UNIX epoch.
Timestamp delta (in) ts_in_delta The matching-engine-sending timestamp expressed as the number of nanoseconds before ts_recv. See timestamping guide.
Timestamp out ts_out The Databento gateway-sending timestamp expressed as the number of nanoseconds since the UNIX epoch. See timestamping guide.
Price price The price expressed as a signed integer where every 1 unit corresponds to 1e-9, i.e. 1/1,000,000,000 or 0.000000001.
Book side side The side that initiates the event. Can be Ask for a sell order (or sell aggressor in a trade), Bid for a buy order (or buy aggressor in a trade), or None where no side is specified by the original source.
Size size The order quantity.
Flags flags A bit field indicating event end, message characteristics, and data quality.
Action action The event type or order book operation. Can be Add, Cancel, Modify, cleaR book, Trade, Fill, or None.
Sequence number sequence The original message sequence number from the venue.

Datasets

Databento provides time series datasets for a variety of markets, sourced from different publishers. Our available datasets can be browsed through the search feature on our site.

Each dataset is assigned a unique string identifier (dataset ID) in the form PUBLISHER.DATASET, such as GLBX.MDP3. For publishers that are also markets, we use standard four-character ISO 10383 Market Identifier Codes (MIC). Otherwise, Databento arbitrarily assigns a four-character identifier for the publisher.

These dataset IDs are also found on the Data catalog and Download request features of the Databento user portal.

When a publisher provides multiple data products with different levels of granularity, Databento subscribes to the most-granular product. We then provide this dataset with alternate schemas to make it easy to work with the level of detail most appropriate for your application.

More information about different types of venues and publishers is available in our FAQs.

Symbology

Databento's live API supports several ways to select an instrument in a dataset. An instrument is specified using a symbol and a symbology type, also referred to as an stype. The supported symbology types are:

  • Raw symbology (raw_symbol): the original string symbols used by the publisher in the source data.
  • Instrument ID symbology (instrument_id): unique numeric IDs assigned to each instrument by the publisher.
  • Parent symbology (parent): groups of instruments related to the market for the same underlying.
  • Continuous contract symbology (continuous): proprietary symbology that specifies instruments based on certain systematic rules.
Info

In the live API, existing subscriptions to continuous contracts will not be remapped to different instruments. However, submitting a new identical subscription may result in a new mapping.

When subscribing to live data, an input symbology type can be specified. By default, our client libraries will use raw symbology for the input type. Not all symbology types are supported for every dataset.

For live data, symbology mappings are provided through SymbolMappingMsg records. These records are sent after the session has started and can be used to map the instrument_id from a data record's header to a text symbol.
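
As a sketch, a callback can maintain this mapping as records arrive. This assumes SymbolMappingMsg is re-exported by the Python client library and exposes instrument_id and stype_out_symbol attributes:

import databento as db

instrument_map: dict[int, str] = {}

def on_record(record: db.DBNRecord) -> None:
    # Update the instrument_id-to-symbol mapping as it arrives
    if isinstance(record, db.SymbolMappingMsg):
        instrument_map[record.instrument_id] = record.stype_out_symbol
        return
    symbol = instrument_map.get(record.instrument_id, "<unmapped>")
    print(f"{symbol}: {record}")

client = db.Live(key="$YOUR_API_KEY")
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)
client.add_callback(on_record)
client.start()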

For more about symbology at Databento, see our Standards and conventions.

Dates and times

Our Python client library has several functions with timestamp arguments. These arguments will have type pandas.Timestamp | datetime.date | str | int and support a variety of formats.

It's recommended to use pandas.Timestamp, which fully supports timezones and nanosecond precision. If a datetime.date is used, the time is set to midnight UTC. If an int is provided, the value is interpreted as UNIX nanoseconds.

The client library also handles several string-based timestamp formats based on ISO 8601.

  • yyyy-mm-dd, e.g. "2022-02-28" (midnight UTC)
  • yyyy-mm-ddTHH:MM, e.g. "2022-02-28T23:50"
  • yyyy-mm-ddTHH:MM:SS, e.g. "2022-02-28T23:50:59"
  • yyyy-mm-ddTHH:MM:SS.NNNNNNNNN, e.g. "2022-02-28T23:50:59.123456789"

Timezone specification is also supported.

  • yyyy-mm-ddTHH:MMZ
  • yyyy-mm-ddTHH:MM±hh
  • yyyy-mm-ddTHH:MM±hhmm
  • yyyy-mm-ddTHH:MM±hh:mm
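
For illustration, the following values all express the same instant and can be used interchangeably as timestamp arguments (a sketch using only pandas and the standard library):

import datetime
import pandas as pd

# Equivalent representations of 2022-02-28 23:50 UTC
ts_pandas = pd.Timestamp("2022-02-28T23:50", tz="UTC")
ts_string = "2022-02-28T23:50Z"
ts_nanos = ts_pandas.value  # UNIX nanoseconds since the epoch

# A bare datetime.date is interpreted as midnight UTC
ts_date = datetime.date(2022, 2, 28)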

Bare dates

Some parameters require a bare date, without a time. These arguments have type datetime.date | str and must either be a datetime.date object, or a string in yyyy-mm-dd format, e.g. "2022-02-28".

Intraday replay

Our live API offers intraday replay within the last 24 hours. Users can connect to the live service and request data from a particular start time. Data will be filtered on ts_event for all schemas except CBBO and BBO, which will be filtered on ts_recv.

A different start time can be specified for each subscription. There can be multiple subscriptions for the same schema, with each subscription having a different start time. When the session starts, records will be immediately replayed for each schema. A replay completed SystemMsg will be sent for each replayed schema once it catches up to real-time data. Once a session has started, newly added subscriptions are not eligible for intraday replay.

As a special case for the GLBX.MDP3 dataset, we also provide replay of the entire weekly session in the MBO and definition schemas outside of the 24-hour window to aid with recovery, as these schemas are highly stateful.

Pass start=0 to easily request the full replay history available for each schema.

Our Python client library supports several convenient date formats, such as pd.Timestamp, Python datetime, Python date, ISO 8601 strings, or UNIX timestamps in nanoseconds. Refer to the Dates and times article for more information on how our client library handles timestamps.

Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe with a specified start time for intraday replay
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    symbols="ES.FUT",
    stype_in="parent",
    start="2023-04-17T09:00:00",
)

Snapshot

Users can request a snapshot for a live subscription to obtain the recent order book state without replaying the whole trading session. This is only supported for the MBO schema.

More details can be found in this article.

Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to live snapshot
client.subscribe(
    dataset="GLBX.MDP3",
    schema="mbo",
    symbols="ES.FUT",
    stype_in="parent",
    snapshot=True,
)

System messages

Our live API uses a system record (SystemMsg) to indicate non-error information to clients.

One use is heartbeating, to ensure the TCP connection remains open and to help clients detect a connection issue. A heartbeat will only be sent if no other data record was sent to the client during the heartbeat interval. The interval between heartbeat messages can be configured with the heartbeat_interval_s parameter to Live. The is_heartbeat() method will return True if the record is a heartbeat.
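
As a sketch, a client can set a shorter heartbeat interval and filter heartbeats out in a callback (the 10-second interval is illustrative, and SystemMsg is assumed to be re-exported by the client library):

import databento as db

def on_record(record: db.DBNRecord) -> None:
    # Skip heartbeat system messages; process everything else
    if isinstance(record, db.SystemMsg) and record.is_heartbeat():
        return
    print(record)

client = db.Live(
    key="$YOUR_API_KEY",
    heartbeat_interval_s=10,
)
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)
client.add_callback(on_record)
client.start()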

Field Type Description
msg str The message from the gateway.
code int Describes the type of system message. See table below.
Info

For datasets on DBN version 1 where code is not populated (255), you should parse the msg field to determine the type of message.

System code variants

Corresponds with the SystemCode enum.

Variant code Description
HEARTBEAT 0 A message sent in the absence of other records to indicate the connection remains open.
SUBSCRIPTION_ACK 1 An acknowledgement of a subscription request.
SLOW_READER_WARNING 2 The gateway has detected this session is falling behind real-time data.
REPLAY_COMPLETED 3 Indicates a replay subscription has caught up with real-time data. This message will be sent per schema.
END_OF_INTERVAL 4 Signals that all records for interval-based schemas have been published for the given timestamp.

Errors

Our live API uses an error record (ErrorMsg) to indicate failures to clients. Error records are processed like any other record, and as such will be passed on to any callbacks, streams, and iterators for the Live client.

Field Type Description
err str The error message.
code int Describes the type of error message. See table below.
is_last bool True if this is the last in a series of error records.
Info

For datasets on DBN version 1 where code is not populated (255), you should parse the err field to determine the type of message.

Such errors will close the connection to the gateway.
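
A sketch of consuming error records in a callback, assuming ErrorMsg is re-exported by the client library:

import databento as db

def on_record(record: db.DBNRecord) -> None:
    # Error records are dispatched like any other record
    if isinstance(record, db.ErrorMsg):
        print(f"gateway error ({record.code}): {record.err}")
        return
    print(record)

client = db.Live(key="$YOUR_API_KEY")
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)
client.add_callback(on_record)
client.start()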

Error code variants

Corresponds with the ErrorCode enum.

Variant code Description
AuthFailed 1 The authentication step failed.
ApiKeyDeactivated 2 The user account or API key was deactivated.
ConnectionLimitExceeded 3 The user has exceeded their open connection limit.
SymbolResolutionFailed 4 One or more symbols failed to resolve.
InvalidSubscription 5 There was an issue with a subscription request (other than symbol resolution).
InternalError 6 An error occurred in the gateway.

Logging

Our Python client library is fully compatible with the built-in logging module.

When using the live client to build an application, it is recommended to enable logging in the databento module. This logging is disabled by default.
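
A minimal sketch using the built-in logging module (the log level is illustrative):

import logging

import databento as db

# Route databento log records to the console
logging.basicConfig(level=logging.INFO)
logging.getLogger("databento").setLevel(logging.DEBUG)

client = db.Live(key="$YOUR_API_KEY")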

Connection limits

With our live API, there is a limit of 10 simultaneous connections (sessions) per dataset per team for Usage-based and Standard plans. Unlimited and Enterprise plans are limited to 50 simultaneous connections per dataset per team. Creating additional API keys will not affect the maximum number of connections per team.

In addition, a single gateway will allow at most five incoming connections per second from the same IP address. If an IP address goes over this limit, incoming connections will be immediately closed by the gateway; existing connections will not be affected. If this happens, clients should wait one second before retrying.

Subscription rate limits

Symbol resolution is a relatively slow operation; as such, subscription requests are throttled to prevent abuse and accidental performance impact on other users. Subscriptions above the limit of three per second will not be rejected; instead, the gateway will wait until the session is back under the rate limit before processing them. The gateway will send a subscription acknowledgement when it has finished processing a subscription request.

Metered pricing

Databento only charges for the data that you use. You can find rates (per MB) for the various datasets and estimate pricing on our Data catalog. We meter the data by its uncompressed size in binary encoding.

When you stream the data, you are billed incrementally for each outbound byte of data sent from our live subscription gateway. If your connection is interrupted while streaming and our live gateway detects a connection timeout of more than 5 seconds, it will immediately stop sending data and you will not be billed for the remainder of your request.

Duplicate subscriptions within the same client will be deduplicated and not incur additional charges. Separate sessions with identical subscriptions will incur repeated charges.

Access to metadata, symbology, and account management is free.

Related: Billing management.

Error detection

When maintaining a connected Live client, clients should monitor their connection for errors.

There are three main ways in which a session can enter an error state:

  • Hung connection: The client is not receiving any data from the gateway
  • Disconnect without error: The client is explicitly disconnected by the gateway, without receiving an error message
  • Disconnect with error: The client is explicitly disconnected by the gateway. Immediately prior to being disconnected, the client will receive an error record

Hung connection

To detect hung connections, clients are instructed to make use of system heartbeats. Clients can configure a heartbeat interval when creating the Live client by setting the heartbeat_interval_s parameter. If the heartbeat interval is not set by the client, it will default to 30 seconds.

Once a session is started, if no data is sent by the gateway for the entirety of a heartbeat interval, the gateway will send a system message to the client to indicate a heartbeat. If the gateway is regularly sending other messages to the client (for example, MboMsg), heartbeats will not be sent.

Once a session is started, if a client does not receive any messages from the gateway for the duration of one heartbeat interval plus two seconds, the session can be considered hung. Clients are instructed to disconnect from the gateway and reconnect upon detecting hung connections.

Clients with unstable internet connections may need larger intervals than two seconds to ensure the connection is truly hung, as opposed to merely delayed.
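
A sketch of such a watchdog for a synchronous application, assuming a 30-second heartbeat interval and a two-second allowance:

import time

import databento as db

HEARTBEAT_INTERVAL_S = 30
ALLOWANCE_S = 2

last_received = time.monotonic()

def on_record(record: db.DBNRecord) -> None:
    global last_received
    # Any record, including a heartbeat, shows the connection is alive
    last_received = time.monotonic()

client = db.Live(
    key="$YOUR_API_KEY",
    heartbeat_interval_s=HEARTBEAT_INTERVAL_S,
)
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)
client.add_callback(on_record)
client.start()

# Watchdog: treat the session as hung if nothing arrives in time
while True:
    time.sleep(1)
    if time.monotonic() - last_received > HEARTBEAT_INTERVAL_S + ALLOWANCE_S:
        client.terminate()  # disconnect, then reconnect with a new client
        break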

Disconnect without error

From the point of view of the Live client, a disconnect is detected when the underlying TCP session is closed. When using Live.block_for_close or Live.wait_for_close, an exception will be raised if the TCP session was closed without an error. Upon being disconnected, clients are instructed to wait one second and initiate a new connection. Waiting too short an interval to reconnect may trigger the gateway's rate limiter.
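
A sketch of this reconnect loop for a synchronous application, assuming BentoError is importable from the top-level package:

import time

import databento as db

while True:
    client = db.Live(key="$YOUR_API_KEY")
    client.subscribe(
        dataset="GLBX.MDP3",
        schema="trades",
        stype_in="parent",
        symbols="ES.FUT",
    )
    client.add_callback(print)
    client.start()
    try:
        # Block until the gateway closes the connection
        client.block_for_close()
    except db.BentoError:
        pass  # closed without an error record
    # Wait one second before reconnecting to avoid the rate limiter
    time.sleep(1)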

See also

Connection limits for more details.

Disconnect with error

From the point of view of the Live client, a disconnect with error is detected when the underlying TCP session is closed after an ErrorMsg or an API error response is received. Any ErrorMsg records received by the client can be consumed by the application, such as when iterating or using a callback with Live.add_callback. When using Live.block_for_close or Live.wait_for_close, any ErrorMsg records received will be raised in an exception.

Clients disconnected with an error are instructed to not reconnect automatically. In the vast majority of cases, reconnecting and resubscribing with the same parameters will lead to the same errors being received again. The error sent to the client will indicate the issue to be fixed prior to resubscribing.

Versioning

Our historical and live APIs and their client libraries adopt the MAJOR.MINOR.PATCH format for version numbers. These version numbers conform to semantic versioning. We are using major version 0 for initial development, where our API is not considered stable.

Once we release major version 1, our public API will be stable. This means that you will be able to upgrade minor or patch versions to pick up new functionality, without breaking your integration.

For major versions after 1, we will provide support for previous versions for one year after the date of the subsequent major release. For example, if version 2.0.0 is released on January 1, 2024, then all versions 1.x.y of the API and client libraries will be deprecated. However, they will remain supported until January 1, 2025.

We may introduce backwards-compatible changes between minor versions.

Our Release notes will contain information about both breaking and backwards-compatible changes in each release.

Our API and official client libraries are kept in sync with same-day releases for major versions. For instance, 1.x.y of the C++ client library will use the same functionality found in any 1.x.y version of the Python client.

Related: Release notes.

Recovering after a disconnection

When reconnecting to the gateway, clients should resubscribe to all desired symbols. In order to avoid missing data after a reconnection, there are three main approaches to recovery:

  • Natural refresh
  • Intraday replay
  • Snapshot (MBO only)

The best approach to recovery will depend on the client's use case and specific needs.

Natural refresh

To recover via natural refresh, clients can resubscribe to all desired symbols without the start or snapshot parameters. This means no data will be replayed, and the client will immediately receive the newest messages upon subscribing.

This recovery approach is the fastest (since there's no data replay), and is recommended for stateless schemas such as MBP-10, in cases where the client only requires the current state of each instrument.

Intraday replay

To recover via intraday replay, clients should store the last ts_event and the number of records received with that last timestamp, per schema and instrument. The ts_event and record count should be continuously updated when processing incoming records.

When reconnecting, clients should set the start parameter of the resubscription to the lowest stored ts_event across all instruments for that schema. The gateway will then send all records starting from that timestamp (including records with the exact same ts_event).

The resubscription may lead to duplicated data being sent to the client. Clients who require that each message is delivered exactly once are instructed to:

  • Discard all records with a lower ts_event than the stored one for the corresponding instrument
  • Discard the first N records with the same ts_event as the stored one for the corresponding instrument, where N is the number of records already seen with that ts_event prior to the disconnection. This is important in case there are multiple events with the same ts_event and the client is disconnected halfway through processing those events

This recovery approach is recommended when clients require the uninterrupted history of records for the desired schema (for example, when using the Trades schema to construct a ticker tape). However, this approach can take a longer time to synchronize with the live stream, depending on how long the client was disconnected.

For the CBBO and BBO schemas where filtering is based on ts_recv, clients should store the last ts_recv per instrument. When reconnecting, clients should set the start parameter of the resubscription to the lowest stored ts_recv across all instruments. The gateway will then send all records starting from that timestamp (including records with the same ts_recv). Since CBBO and BBO are stateless schemas, you should always refer to the most recent record per instrument.
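
A sketch of the bookkeeping and deduplication described above for a ts_event-filtered schema, keeping per-instrument state in plain dicts (the helper names are illustrative):

import databento as db

# Per-instrument state: last ts_event and how many records shared it
last_ts: dict[int, int] = {}
n_at_ts: dict[int, int] = {}

def track(record: db.DBNRecord) -> None:
    # Update recovery state on every incoming record
    iid, ts = record.instrument_id, record.ts_event
    if ts == last_ts.get(iid):
        n_at_ts[iid] += 1
    else:
        last_ts[iid], n_at_ts[iid] = ts, 1

# After reconnecting with start=min(last_ts.values()), discard the
# replayed duplicates using the state captured at disconnect time:
def is_duplicate(
    record: db.DBNRecord,
    seen_ts: dict[int, int],
    seen_n: dict[int, int],
    replayed: dict[int, int],
) -> bool:
    iid, ts = record.instrument_id, record.ts_event
    if iid not in seen_ts or ts > seen_ts[iid]:
        return False  # genuinely new record
    if ts < seen_ts[iid]:
        return True  # older than the stored ts_event: duplicate
    # Same ts_event: skip the first N records already processed
    replayed[iid] = replayed.get(iid, 0) + 1
    return replayed[iid] <= seen_n[iid]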

Snapshot (MBO only)

When resubscribing to the MBO schema, clients can request a snapshot to receive the current state of the book after a disconnection. This eliminates the need to replay the missed messages and leads to faster synchronization with the live stream. This recovery approach is generally recommended over intraday replay when using the MBO schema.

Maintenance schedule

We restart our live gateways on Sunday at the following times:

  • CME Globex: 09:30 UTC
  • All ICE venues: 09:45 UTC
  • All other datasets: 10:30 UTC

All clients will be disconnected during this time.

Additionally, we may restart our gateways mid-week. While we generally post these mid-week restarts on our status page, we may perform these restarts without notice due to an urgent fix. You should configure your client to handle reconnecting automatically.

While exchanges are closed on Saturday, our systems are still connected to the exchange feeds. The exchange may send test data, and our gateways will disseminate this data to all connected clients. If you are not interested in receiving this test data, we recommend you disconnect after the Friday close and reconnect on Sunday after the scheduled restart.

Info

Any test data sent through the Live API will not be seen in our historical data.

Client

Live

To access Databento's live API, first create an instance of the Live client. The entire API for a streaming session is exposed through instance methods of the client.

Databento's Live client is built with Python's asyncio module and can be easily integrated into asynchronous applications.

Note that the API key can be passed as a parameter, which is not recommended for production applications. Instead, you can omit this parameter and pass your API key via the DATABENTO_API_KEY environment variable, as shown in the example below.

Parameters

key
optional | str
32-character API key. Found on your API keys page. If None, the DATABENTO_API_KEY environment variable is used.
gateway
optional | str
The URL of the remote gateway to connect to. This is for advanced use.
port
optional | int
The port to connect to the remote gateway on. This is for advanced use.
ts_out
optional | bool, default False
Whether the gateway should append the send timestamp (ts_out) to each record.
heartbeat_interval_s
optional | int, default None
The interval in seconds at which the gateway will send heartbeat records if no other data records are sent. By default heartbeats will be sent at the gateway's default interval.
reconnect_policy
optional | ReconnectPolicy or str, default None
The reconnection policy for automatically resuming a live session when an unexpected disconnection occurs.
API method
class Live(
    key: str | None = None,
    gateway: str | None = None,
    port: int | None = None,
    ts_out: bool = False,
    heartbeat_interval_s: int | None = None,
    reconnect_policy: ReconnectPolicy | str = ReconnectPolicy.NONE,
)
Example usage
import databento as db

# Pass as parameter
client = db.Live(key="$YOUR_API_KEY")

# Or, pass as `DATABENTO_API_KEY` environment variable
client = db.Live()

Live.add_callback

Add a callback to the live client. This callback must take a single record argument. Refer to the What's a schema article for documentation on the fields contained within each record type.

A callback will receive error messages from the gateway with the ErrorMsg record type. This indicates a problem occurred with the session. Exceptions raised in a callback can be handled explicitly by specifying an exception_callback.

The callbacks are executed in the order they are added.

Info

It is recommended that callback functions be non-blocking to ensure the networking event loop remains running.

Parameters

record_callback
required | Callable[[DBNRecord], None]
A callback to dispatch records to. The callback must take a single record argument.
exception_callback
optional | Callable[[Exception], None]
A callback to dispatch exceptions raised while executing record_callback. The callback must take a single exception argument.
API method
Live.add_callback(
    record_callback: Callable[[DBNRecord], None],
    exception_callback: Callable[[Exception], None] | None = None,
) -> None
Example usage
import databento as db

# Create a callback to handle DBN records
def user_callback(record: db.DBNRecord) -> None:
    print(f"callback run for {record}")


# Create a callback to handle exceptions from `user_callback`
def error_handler(exception: Exception) -> None:
    print(f"an error occurred {exception}")


# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Add the user_callback to the client
client.add_callback(
    record_callback=user_callback,
    exception_callback=error_handler,  # optional error handler
)

Live.add_stream

Add an output stream to the live client. The client will write binary DBN records to the stream. This must be a stream writable in bytes mode, or a path to a writable file.

A stream will receive error messages from the gateway with the ErrorMsg record type. This indicates a problem occurred with the session. Exceptions raised while writing to the stream can be handled explicitly by specifying an exception_callback.

The writes are performed in the order the streams were added.

See also

DBN records are optimized for stream-like processing.

While pandas DataFrames are not well-suited for this due to their column-oriented format, they can still be used by first streaming DBN data to a file, then converting to a DataFrame with DBNStore.from_file().to_df(). See this example for more information.

Parameters

stream
required | IO[bytes] or PathLike[str] or str
An IO stream to write records to, or a writable file path. The stream must support writing bytes.
exception_callback
optional | Callable[[Exception], None]
A callback to dispatch exceptions raised while writing to the stream. The callback must take a single exception argument.
API method
Live.add_stream(
    stream: IO[bytes] | PathLike[str] | str,
    exception_callback: Callable[[Exception], None] | None = None,
) -> None
Example usage
import databento as db

# Open a file for writing
output = open("output.dbn", mode="wb")


# Create a callback to handle exceptions when writing to `output`
def error_handler(exception: Exception) -> None:
    print(f"an error occurred {exception}")


# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Add the output file stream to the client
client.add_stream(
    stream=output,
    exception_callback=error_handler,  # optional error handler
)

Live.add_reconnect_callback

Add a reconnect callback to the live client. When a reconnection policy is set on the Live client, this callback will be executed upon reconnection with two arguments:

  • The last received record's ts_event timestamp, or the start timestamp from the session Metadata if no records were received yet.
  • The start timestamp from the reconnected session's Metadata.

These values can be used to record any gaps in the received data due to a disconnection. Exceptions raised in a callback can be handled explicitly by specifying an exception_callback.

The callbacks are executed in the order they are added.

Info

It is recommended that callback functions be non-blocking to ensure the networking event loop remains running.

Parameters

reconnect_callback
required | Callable[[pd.Timestamp, pd.Timestamp], None]
A callback for handling reconnections. The callback must take two arguments.
exception_callback
optional | Callable[[Exception], None]
A callback to dispatch exceptions raised while executing reconnect_callback. The callback must take a single exception argument.
API method
Live.add_reconnect_callback(
    reconnect_callback: Callable[[pd.Timestamp, pd.Timestamp], None],
    exception_callback: Callable[[Exception], None] | None = None,
) -> None
Example usage
import databento as db

# Create a callback to handle reconnections
def reconnect_callback(start, end) -> None:
    print(f"reconnection gap from {start} to {end}")


# Create a callback to handle exceptions from `reconnect_callback`
def error_handler(exception: Exception) -> None:
    print(f"an error occurred {exception}")


# Create a live client with a reconnect policy
client = db.Live(
    key="$YOUR_API_KEY",
    reconnect_policy="reconnect",
)

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Add the reconnect callback to the client
client.add_reconnect_callback(
    reconnect_callback=reconnect_callback,
    exception_callback=error_handler,  # optional error handler
)

Live.block_for_close

Block until the session closes or a timeout is reached. A session will close when the remote gateway disconnects or after Live.stop or Live.terminate are called. In the event the connection is closed unexpectedly, a BentoError will be raised.

If a timeout is specified, Live.terminate will be called when the timeout is reached.

When this method unblocks, the session is guaranteed to be closed.

Parameters

timeout
optional | float
A duration in seconds to wait for the session to close. If None, wait forever.
API method
Live.block_for_close(
    timeout: float | None = None,
) -> None
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Start the session
client.start()

# Block for the session to close
client.block_for_close()

Live.start

Start the session.

A client can only be started once, and only after a successful connection has been made by calling Live.subscribe.

A client can be stopped gracefully by calling Live.stop or forcefully with Live.terminate.

When iterating records with Live.__iter__ or Live.__aiter__, it is not necessary to call Live.start; doing so before iterating will raise a ValueError.

Info

A session cannot be started more than once.

API method
Live.start() -> None
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Print records as they arrive
client.add_callback(print)

# Start the streaming session
client.start()

# Block until disconnection
client.block_for_close()

Live.stop

Stop the session and finish processing received records.

A client can only be stopped after a successful connection is made by calling Live.subscribe.

This method does not block waiting for the connection to close. If this is desired, use Live.block_for_close or Live.wait_for_close to wait for the client to disconnect.

The connection will eventually close after calling this method. Once the connection has closed, the Live client can be reused but the session state is not preserved.

API method
Live.stop() -> None
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Start the streaming session
client.start()

# Stop the streaming session
client.stop()

Live.subscribe

Add a new subscription to the session. All subscriptions must be for the same dataset.

Supports multiple subscriptions for different schemas, which enables rich data streams containing mixed record types.

Specify an optional start time for intraday replay with subscriptions made before starting the session.

Please note there is no unsubscribe method. Subscriptions end when the client disconnects or when Live.stop or Live.terminate is called.

Parameters

dataset
required | Dataset or str
The dataset code (string identifier). Must be one of the values from list_datasets.
schema
required | Schema or str
The data record schema. Must be one of the values from list_schemas.
symbols
optional | Iterable[str | int] or str or int
The product symbols to subscribe to. If 'ALL_SYMBOLS' or None, all symbols will be selected.
stype_in
optional | SType or str, default 'raw_symbol'
The symbology type of input symbols. Must be one of 'raw_symbol', 'instrument_id', 'parent', or 'continuous'.
start
optional | pd.Timestamp, datetime, date, str or int
The inclusive start of subscription replay. Filters on ts_event except for the CBBO and BBO schemas, which filter on ts_recv. Takes pd.Timestamp, Python datetime, Python date, ISO 8601 string, or UNIX timestamp in nanoseconds. Assumes UTC as timezone unless otherwise specified. Must be within the last 24 hours. Pass 0 to request all available data. Cannot be specified after the session is started.
snapshot
optional | bool, default False
Request the subscription with a snapshot. Only supported with the mbo schema. Defaults to False. Conflicts with the start parameter.
API method
Live.subscribe(
    dataset: Dataset | str,
    schema: Schema | str,
    symbols: Iterable[str | int] | str | int = "ALL_SYMBOLS",
    stype_in: SType | str = "raw_symbol",
    start: pd.Timestamp | datetime | date | str | int | None = None,
    snapshot: bool = False,
) -> None
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

Live.terminate

Terminate the session and stop processing records immediately.

A client can only be terminated after a successful connection is made by calling Live.subscribe.

Unlike Live.stop, the session will end immediately and received records will no longer be processed.

Once terminated, the Live client can be reused but the session state is not preserved.

API method
Live.terminate() -> None
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Start the streaming session
client.start()

# Terminate the streaming session
client.terminate()

Live.wait_for_close

Wait until the session closes or a timeout is reached. A session will close when the remote gateway disconnects, or after Live.stop or Live.terminate are called. In the event the connection is closed unexpectedly, a BentoError will be raised.

If a timeout is specified, Live.terminate will be called when the timeout is reached.

When this method unblocks, the session is guaranteed to be closed.

Info

This method is a coroutine and must be used with an await expression.

Parameters

timeout
optional | float
A duration in seconds to wait for the session to close. If None, wait forever.
API method
Live.wait_for_close(
    timeout: float | None = None,
) -> None
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to the trades schema for all ES futures
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Start the session
client.start()

# Wait for the session to end
await client.wait_for_close()

Live.__aiter__

When using async for, records will be yielded as they arrive. Iteration will stop when the connection is closed and all records are processed. This is best for integration into asynchronous applications.

Asynchronous iteration is only supported inside a coroutine.

Asynchronous iteration will automatically call Live.start if the session is connected but it has not been started. Starting iteration after the session has started will cause a ValueError.

Asynchronous iteration will automatically call Live.stop if the iterator is destroyed, such as when an async for loop is escaped with an exception or break statement. To prevent this behavior, an iterator can be created explicitly using aiter().

API method
Live.__aiter__() -> AsyncIterator[DBNRecord]
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to a data stream
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Start streaming and asynchronously iterate the records
async for record in client:
    print(record)

Live.__iter__

When using for, records will be yielded as they arrive. Iteration will stop when the connection is closed and all records are processed. This is best for integration into simple synchronous applications.

Iteration will automatically call Live.start if the session is connected but it has not been started. Starting iteration after the session has started will cause a ValueError.

Iteration will automatically call Live.stop if the iterator is destroyed, such as when a for loop is escaped with an exception or break statement. To prevent this behavior, an iterator can be created explicitly using iter().

API method
Live.__iter__() -> Iterator[DBNRecord]
Example usage
import databento as db

# Create a live client
client = db.Live(key="$YOUR_API_KEY")

# Subscribe to a data stream
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# Start streaming and synchronously iterate the records
for record in client:
    print(record)