Analyzing live market data with Databento and RisingWave
In this tutorial, we'll combine Databento's market data streams with RisingWave's stream processing capabilities to create a live market analysis system.
Sourcing high-quality market data can be challenging, especially when dealing with multiple financial exchanges. Databento offers a powerful but lightweight solution with comprehensive coverage for live and historical market data, designed to be:
- Easy-to-use: A small API surface and straightforward protocols enable users to get started in minutes.
- Fast: 6.1-microsecond normalization latency and close to zero data gaps with FPGA-based capture.
- Versatile: Supports multiple asset classes and venues with a unified message format. Listen to every order book message, or to hundreds of thousands of symbols at once.
Databento provides normalized market data through a modern API. Think of it as your window into the markets: you get both historical and real-time data in consistent formats, regardless of which exchange you're interested in.
RisingWave helps you analyze streaming data in real-time using familiar SQL. If you've worked with databases before, you'll feel right at home. The Python SDK makes it even easier to integrate RisingWave into your Python applications.
Before we dive in, you'll need:
- A Databento account and a license for real-time data. Licensing is a fairly straightforward process and can be instant for non-professional users; see Databento's licensing guide for full details.
- A running RisingWave instance. The easiest way to get one is to create a free RisingWave Cloud account, which takes a couple of minutes. You can also install the open-source version and run it locally. For detailed instructions, see the quick start guide.
- Python 3.7+.
Install the required Python packages into your Python environment:

```shell
pip3 install databento
pip3 install risingwave-py
```
Let's start with E-mini S&P 500 futures data. It is one of the most liquid futures contracts, which makes it a good choice for learning. Here's how to get the live data:
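```python
import databento as db

# With no key argument, the client reads the DATABENTO_API_KEY environment variable
client = db.Live()
client.subscribe(
    dataset="GLBX.MDP3",  # CME Globex MDP 3.0
    schema="trades",
    stype_in="parent",  # "ES.FUT" expands to every ES contract and spread
    symbols="ES.FUT",
)

# Print incoming data for testing
for record in client:
    print(record)
```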
Now let's connect to RisingWave. The parameters below are for a local RisingWave instance. If you use RisingWave Cloud, you can find the connection details by navigating to your cluster and clicking Connect on your cluster card.
```python
from risingwave import RisingWave, RisingWaveConnOptions

rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)
```
To store the data from Databento, we need to create a table in RisingWave with the correct schema.
```python
with rw.getconn() as conn:
    # Column types match the complete script at the end of this tutorial
    conn.execute("""
        CREATE TABLE IF NOT EXISTS es_futures_live (
            timestamp TIMESTAMP,
            symbol VARCHAR,
            price DOUBLE PRECISION,
            size BIGINT
        );
    """)
```
We can define a materialized view for each metric we want to track in real time. RisingWave maintains materialized views incrementally, updating their results as new records arrive instead of recomputing them from scratch.
```python
with rw.getconn() as conn:
    conn.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS vwap_analysis_live AS
        SELECT
            window_start,
            symbol,
            SUM(price * size) / SUM(size) as vwap,
            AVG(price) as simple_avg_price,
            SUM(size) as total_volume,
            COUNT(*) as trade_count
        FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
        GROUP BY window_start, symbol;
    """)
```
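This materialized view computes VWAP over 5-second tumbling windows, separately for each symbol. Grouping by symbol matters because ES.FUT covers every ES contract as well as calendar spreads, whose prices are on very different scales. VWAP is a common benchmark for execution quality: institutional traders often aim to match or beat it to show they are getting good prices for their trades.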
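To make the view's semantics concrete, here is a plain-Python sketch of the bookkeeping vwap_analysis_live describes: each trade is assigned to a 5-second tumbling window (assumed here to be aligned to the Unix epoch, so windows start at :00, :05, :10 and so on) and folded into running sums per (window_start, symbol). It illustrates the SQL's arithmetic, not how RisingWave works internally; `WindowStats`, `window_start`, and `apply_trade` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(seconds=5)  # same width as INTERVAL '5 SECONDS'


@dataclass
class WindowStats:
    """Running aggregates for one (window_start, symbol) group."""
    notional: float = 0.0   # SUM(price * size)
    volume: int = 0         # SUM(size)
    price_sum: float = 0.0  # SUM(price), used for AVG(price)
    trades: int = 0         # COUNT(*)

    @property
    def vwap(self) -> float:
        return self.notional / self.volume

    @property
    def avg_price(self) -> float:
        return self.price_sum / self.trades


def window_start(ts: datetime) -> datetime:
    """Align a timezone-aware timestamp to the start of its tumbling window."""
    # timedelta // timedelta is an integer count of whole windows since the epoch
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def apply_trade(state: dict, ts: datetime, symbol: str, price: float, size: int) -> WindowStats:
    """Fold one trade into the running state and return its updated group."""
    stats = state.setdefault((window_start(ts), symbol), WindowStats())
    stats.notional += price * size
    stats.volume += size
    stats.price_sum += price
    stats.trades += 1
    return stats


# Usage: two ESZ4 trades that land in the same 20:03:30 window
state = {}
t0 = datetime(2024, 12, 13, 20, 3, 31, tzinfo=timezone.utc)
apply_trade(state, t0, "ESZ4", 6058.0, 2)
s = apply_trade(state, t0 + timedelta(seconds=2), "ESZ4", 6058.5, 6)
print(s.vwap, s.avg_price, s.volume, s.trades)  # 6058.375 6058.25 8 2
```

Keeping two running totals rather than a list of trades is what lets VWAP be refreshed per trade in constant time: SUM(price * size) / SUM(size) needs nothing else.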
We can define a function that formats each trade record as it arrives and inserts it into RisingWave. It reuses the client and rw objects created above.
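```python
import databento as db


def handle_trade(record) -> None:
    # The live stream also carries symbol mappings and system messages; skip them
    if not isinstance(record, db.TradeMsg):
        return
    # Trade records identify instruments by ID; look up the readable symbol
    symbol = client.symbology_map.get(record.instrument_id)
    if symbol is None:
        return
    params = {
        "timestamp": record.pretty_ts_recv,  # nanosecond receive time as a UTC timestamp
        "symbol": symbol,
        "price": record.pretty_price,  # fixed-point integer price as a float
        "size": record.size,
    }
    with rw.getconn() as conn:
        conn.execute(
            """
            INSERT INTO es_futures_live
            (timestamp, symbol, price, size)
            VALUES (:timestamp, :symbol, :price, :size)
            """,
            params,
        )
```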
Because the windows in the materialized view keep updating as trades arrive, we can subscribe to its changes to make the application event-driven. To do that, we first define a handler for change events and then pass it to rw.on_change to subscribe to the materialized view.
```python
import threading

import pandas as pd
from risingwave import OutputFormat


# Event handler for MV changes
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
    # Keep only rows carrying new values; UpdateDelete rows hold the values being replaced
    event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
    if event_df.empty:
        return
    # Format the dataframe for printing
    event_df = event_df.rename(
        {
            "window_start": "Timestamp",
            "symbol": "Symbol",
            "vwap": "VWAP",
            "simple_avg_price": "Avg Price",
            "total_volume": "Volume",
            "trade_count": "Trades",
        },
        axis=1,
    )
    event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
    event_df = event_df.set_index(["Timestamp", "Symbol"])
    print()
    print("VWAP Analysis Update:")
    print(event_df)


# Subscribe to MV changes in a background thread so the main thread stays free
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="vwap_analysis_live",
        handler=handle_vwap_changes,
        output_format=OutputFormat.DATAFRAME,
        persist_progress=False,
        max_batch_size=10,
    )
).start()
```
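The handler above discards UpdateDelete rows because, for a printed feed, only the new values matter. If you instead want an in-memory copy of the whole view, every op has to be applied: an update to an existing window typically arrives as an UpdateDelete row with the old values followed by an UpdateInsert row with the new ones. A minimal sketch, assuming events are plain dicts holding the view's columns plus op (in this tutorial they arrive as DataFrame rows instead); `apply_changes` is a hypothetical name:

```python
from typing import Dict, Iterable, Tuple

Key = Tuple[str, str]  # (window_start, symbol), the view's grouping key


def apply_changes(snapshot: Dict[Key, dict], events: Iterable[dict]) -> Dict[Key, dict]:
    """Apply change events, in order, to an in-memory copy of the view.

    Assumes each UpdateDelete precedes its matching UpdateInsert.
    """
    for event in events:
        key = (event["window_start"], event["symbol"])
        if event["op"] in ("Insert", "UpdateInsert"):
            # A new row, or the new version of an updated row
            snapshot[key] = {k: v for k, v in event.items() if k != "op"}
        elif event["op"] in ("Delete", "UpdateDelete"):
            # A removed row, or the old version of an updated row
            snapshot.pop(key, None)
    return snapshot


# Usage: a window is created, then updated once (window_start kept as a string for brevity)
events = [
    {"op": "Insert", "window_start": "20:03:30", "symbol": "ESZ4", "vwap": 6058.0, "trade_count": 1},
    {"op": "UpdateDelete", "window_start": "20:03:30", "symbol": "ESZ4", "vwap": 6058.0, "trade_count": 1},
    {"op": "UpdateInsert", "window_start": "20:03:30", "symbol": "ESZ4", "vwap": 6058.375, "trade_count": 2},
]
view = apply_changes({}, events)
print(view[("20:03:30", "ESZ4")]["vwap"])  # 6058.375
```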
Now let's put everything together and see what the results look like.
```python
from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import databento as db
import pandas as pd
import threading


# Setup database function
def setup_database(rw: RisingWave) -> None:
    with rw.getconn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS es_futures_live (
                timestamp TIMESTAMP,
                symbol VARCHAR,
                price DOUBLE PRECISION,
                size BIGINT
            )"""
        )
        conn.execute(
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS vwap_analysis_live AS
            SELECT
                window_start,
                symbol,
                SUM(price * size) / SUM(size) as vwap,
                AVG(price) as simple_avg_price,
                SUM(size) as total_volume,
                COUNT(*) as trade_count
            FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
            GROUP BY window_start, symbol;"""
        )


# Event handler for MV changes
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
    # Only include update operations
    event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
    if event_df.empty:
        return
    # Format the dataframe for printing
    event_df = event_df.rename(
        {
            "window_start": "Timestamp",
            "symbol": "Symbol",
            "vwap": "VWAP",
            "simple_avg_price": "Avg Price",
            "total_volume": "Volume",
            "trade_count": "Trades",
        },
        axis=1,
    )
    event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
    event_df = event_df.set_index(["Timestamp", "Symbol"])
    print()
    print("VWAP Analysis Update:")
    print(event_df)


def main() -> None:
    # Initialize RisingWave connection
    rw = RisingWave(
        RisingWaveConnOptions.from_connection_info(
            host="localhost", port=4566, user="root", password="root", database="dev"
        )
    )
    setup_database(rw)

    # Subscribe to MV changes
    threading.Thread(
        target=lambda: rw.on_change(
            subscribe_from="vwap_analysis_live",
            handler=handle_vwap_changes,
            output_format=OutputFormat.DATAFRAME,
            persist_progress=False,
            max_batch_size=10,
        )
    ).start()

    # Subscribe to CME data through Databento
    db.enable_logging()
    client = db.Live()
    client.subscribe(
        dataset="GLBX.MDP3",
        schema="trades",
        stype_in="parent",
        symbols="ES.FUT",
    )

    # Send trades to RisingWave
    with rw.getconn() as conn:
        for record in client:
            # Only handle Trade records
            if not isinstance(record, db.TradeMsg):
                continue
            # Get the human-readable symbol name
            symbol = client.symbology_map.get(record.instrument_id)
            if symbol is None:
                continue
            params = {
                "timestamp": record.pretty_ts_recv,
                "symbol": symbol,
                "price": record.pretty_price,
                "size": record.size,
            }
            conn.execute(
                """
                INSERT INTO es_futures_live
                (timestamp, symbol, price, size)
                VALUES (:timestamp, :symbol, :price, :size)""",
                params,
            )


if __name__ == "__main__":
    main()
```
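Below is sample output from the live VWAP analysis, formatted as a table for readability. The handler prints a new update each time a window's aggregates change as trades arrive. Rows such as ESZ4-ESH5 are calendar spreads between the December 2024 (Z4) and March 2025 (H5) contracts, so their prices reflect the difference between the two legs rather than the index level.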
| Timestamp           | Symbol    | VWAP     | Avg Price | Volume | Trades |
|---------------------|-----------|----------|-----------|--------|--------|
| 2024-12-13 20:03:30 | ESZ4-ESH5 | 69.3259  | 69.3250   | 297    | 44     |
| 2024-12-13 20:03:35 | ESZ4      | 6057.875 | 6057.875  | 22     | 22     |
| 2024-12-13 20:03:35 | ESZ4-ESH5 | 69.3313  | 69.3200   | 1265   | 55     |
| 2024-12-13 20:03:40 | ESH5      | 6127.587 | 6127.588  | 1199   | 407    |
| 2024-12-13 20:03:40 | ESZ4      | 6058.076 | 6058.034  | 3839   | 1045   |
Now that you have real-time market data flowing through your system, you could:
- Add more sophisticated analyses like order flow imbalance,
- Create trading signals based on VWAP crossovers,
- Build a dashboard to visualize your metrics.
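As a starting point for the VWAP crossover idea, the sketch below flags the points where a symbol's price moves from one side of its VWAP to the other. The (price, vwap) input pairs are an assumption: vwap_analysis_live has no last-price column, so you would need to supply prices yourself, for example from es_futures_live. The function name `vwap_crossovers` and the rule that a price exactly equal to VWAP is not a crossing are illustrative choices.

```python
from typing import Iterable, List, Optional, Tuple


def vwap_crossovers(points: Iterable[Tuple[float, float]]) -> List[Tuple[int, str]]:
    """Return (index, direction) wherever price crosses its VWAP.

    `points` holds (price, vwap) pairs for one symbol, in time order.
    A price exactly equal to VWAP neither triggers nor resets a crossing.
    """
    signals = []
    prev_side: Optional[int] = None
    for i, (price, vwap) in enumerate(points):
        diff = price - vwap
        side = (diff > 0) - (diff < 0)  # +1 above VWAP, -1 below, 0 equal
        if side == 0:
            continue
        if prev_side is not None and side != prev_side:
            signals.append((i, "cross_above" if side > 0 else "cross_below"))
        prev_side = side
    return signals


points = [(6058.0, 6058.2), (6058.5, 6058.3), (6058.4, 6058.4), (6058.1, 6058.35)]
print(vwap_crossovers(points))  # [(1, 'cross_above'), (3, 'cross_below')]
```

Skipping the equal case means a price that touches VWAP and bounces back does not emit a pair of opposite signals.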