It's recommended to first start with the Order tracking and Limit order book examples.
Queue position of an order
See also
Overview
In this example, we will use a complete limit order book to track the queue position of an order. This example assumes a First-In-First-Out (FIFO) matching algorithm is being utilized. FIFO is the most common algorithm used in modern matching engines.
When market participants have active orders in different instruments and contract months, knowing exact queue positions can be important in managing execution risk.
The ability to track orders as they move throughout the limit order book can be utilized as a trading signal in automated strategies.
Design
We will use the MBO schema ("L3" data), which assigns a unique ID to every order in the book, to exactly track this queue position. The logic for this calculation is a straightforward extension of our limit order book example.
We will also use the MBP-10 schema ("L2" data) to estimate queue position, assuming we did not have access to MBO data.
Suppose $p(x)$ denotes the probability of a cancel in front when your order is positioned at $0 \le x \le 1$ in the queue.
- When you first join the queue, p(1)=1, since all cancels must happen in front of you.
- When you are at the front of the queue, p(0)=0, since all cancels must happen from behind.
A naive assumption is that orders cancel uniformly through the queue, i.e. p(x)=x. In reality, cancels are more likely to happen behind you vs. in front of you, since the orders in front of you have value from their time priority.
We can model this cancellation bias with a parameter k, ranging from -1 to 1. k=0 results in an unbiased, uniform distribution.
$$
p_k(x) =
\begin{cases}
x^{1+k} & \text{if } k < 0 \\
x & \text{if } k = 0 \\
1 - (1 - x)^{1-k} & \text{if } k > 0
\end{cases}
$$

We'll take a look at this uniform cancellation estimation ($k = 0$), as well as a few weighted estimations that assume a higher probability of cancels from behind ($k = -0.8$ and $k = -0.95$).
Example
from __future__ import annotations
import os
import random
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import takewhile
import databento as db
import matplotlib.pyplot as plt
import pandas as pd
from sortedcontainers import SortedDict
@dataclass
class PriceLevel:
    """
    Aggregated view of a single price level: price, total size, order count.
    """

    # Raw price; `__str__` divides it by `db.FIXED_PRICE_SCALE` for display
    price: int
    # Total size at this price
    size: int = 0
    # Number of orders at this price
    count: int = 0

    def __str__(self) -> str:
        """
        Render the level as "<size> @ <price> | <count> order(s)".
        """
        price = self.price / db.FIXED_PRICE_SCALE
        return f"{self.size:4} @ {price:6.2f} | {self.count:2} order(s)"
@dataclass
class LevelOrders:
    """
    The individual orders held at one price, in list order.
    """

    price: int
    # Excluded from dataclass equality (compare=False)
    orders: list[db.MBOMsg] = field(default_factory=list, compare=False)

    def __bool__(self) -> bool:
        """
        Return True when at least one order is held at this price.
        """
        return True if self.orders else False

    @property
    def level(self) -> PriceLevel:
        """
        Summarize this price as a `PriceLevel`.

        Size is the sum over every order; orders carrying the F_TOB flag are
        left out of the order count.
        """
        order_count = 0
        total_size = 0
        for resting in self.orders:
            if not resting.flags & db.RecordFlags.F_TOB:
                order_count += 1
            total_size += resting.size
        return PriceLevel(price=self.price, size=total_size, count=order_count)
@dataclass
class Book:
    """
    Limit order book built by applying MBO messages one at a time.

    Orders are indexed by order ID, and each side keeps its price levels in a
    `SortedDict` keyed by price. Within a level, orders are stored in a list
    and new arrivals are appended at the back.
    """

    orders_by_id: dict[int, db.MBOMsg] = field(default_factory=dict)
    offers: SortedDict[int, LevelOrders] = field(default_factory=SortedDict)
    bids: SortedDict[int, LevelOrders] = field(default_factory=SortedDict)

    def bbo(self) -> tuple[PriceLevel | None, PriceLevel | None]:
        """
        Return the (best bid, best ask) pair; either entry may be None.
        """
        best_bid = self.get_bid_level()
        best_ask = self.get_ask_level()
        return best_bid, best_ask

    def get_bid_level(self, idx: int = 0) -> PriceLevel | None:
        """
        Return the bid level `idx` steps from the highest price, or None.
        """
        if not self.bids or len(self.bids) <= idx:
            return None
        # Keys sort ascending, so count from the end to get highest prices first
        _, resting = self.bids.peekitem(-(idx + 1))
        return resting.level

    def get_ask_level(self, idx: int = 0) -> PriceLevel | None:
        """
        Return the ask level `idx` steps from the lowest price, or None.
        """
        if not self.offers or len(self.offers) <= idx:
            return None
        _, resting = self.offers.peekitem(idx)
        return resting.level

    def get_bid_level_by_px(self, px: int) -> PriceLevel | None:
        """
        Return the bid level at exactly `px`, or None if there is none.
        """
        try:
            summary = self._get_level(px, "B").level
        except KeyError:
            summary = None
        return summary

    def get_ask_level_by_px(self, px: int) -> PriceLevel | None:
        """
        Return the ask level at exactly `px`, or None if there is none.
        """
        try:
            summary = self._get_level(px, "A").level
        except KeyError:
            summary = None
        return summary

    def get_order(self, id: int) -> db.MBOMsg | None:
        """
        Look up an order by its ID; returns None when it is not tracked.
        """
        return self.orders_by_id.get(id)

    def get_queue_pos(self, id: int) -> int | None:
        """
        Return the total size stored ahead of order `id` at its price level.

        Returns None when the order is not tracked.
        """
        order = self.get_order(id)
        if not order:
            return None
        level = self._get_level(order.price, order.side)
        size_ahead = 0
        for resting in level.orders:
            if resting.order_id != id:
                size_ahead += resting.size
            else:
                # Reached the tracked order; everything after it is behind
                break
        return size_ahead

    def get_snapshot(self, level_count: int = 1) -> list[db.BidAskPair]:
        """
        Return `level_count` bid/ask pairs, starting from the top of book.

        Sides with no level at a given depth keep the `BidAskPair` defaults.
        """
        pairs = []
        for depth in range(level_count):
            pair = db.BidAskPair()
            bid = self.get_bid_level(depth)
            ask = self.get_ask_level(depth)
            if bid:
                pair.bid_px = bid.price
                pair.bid_sz = bid.size
                pair.bid_ct = bid.count
            if ask:
                pair.ask_px = ask.price
                pair.ask_sz = ask.size
                pair.ask_ct = ask.count
            pairs.append(pair)
        return pairs

    def apply(self, mbo: db.MBOMsg) -> None:
        """
        Update the book with a single MBO message.

        Raises `ValueError` for an action that is not handled here.
        """
        action = mbo.action

        # Trade, Fill, or None: no change
        if action in ("T", "F", "N"):
            return

        # Clear book: remove all resting orders
        if action == "R":
            self._clear()
            return

        # side=N is only valid with Trade, Fill, and Clear actions
        assert mbo.side in ("A", "B")

        # UNDEF_PRICE indicates the book level should be removed
        if mbo.price == db.UNDEF_PRICE and mbo.flags & db.RecordFlags.F_TOB:
            self._side_levels(mbo.side).clear()
            return

        # Add: insert a new order
        if action == "A":
            self._add(mbo)
            return

        # Cancel: partially or fully cancel some size from a resting order
        if action == "C":
            self._cancel(mbo)
            return

        # Modify: change the price and/or size of a resting order
        if action == "M":
            self._modify(mbo)
            return

        raise ValueError(f"Unknown action={mbo.action}")

    def _clear(self) -> None:
        """
        Drop every level on both sides and forget all tracked orders.
        """
        self.bids.clear()
        self.offers.clear()
        self.orders_by_id.clear()

    def _add(self, mbo: db.MBOMsg) -> None:
        """
        Insert a new order.

        A message flagged F_TOB replaces the whole side with a single level
        holding just that message.
        """
        if not mbo.flags & db.RecordFlags.F_TOB:
            level = self._get_or_insert_level(mbo.price, mbo.side)
            assert mbo.order_id not in self.orders_by_id
            self.orders_by_id[mbo.order_id] = mbo
            # New orders join the back of the queue
            level.orders.append(mbo)
            return

        side_levels = self._side_levels(mbo.side)
        side_levels.clear()
        side_levels[mbo.price] = LevelOrders(price=mbo.price, orders=[mbo])

    def _cancel(self, mbo: db.MBOMsg) -> None:
        """
        Reduce a tracked order by the cancelled size, removing it at zero.
        """
        order = self.orders_by_id[mbo.order_id]
        level = self._get_level(mbo.price, mbo.side)
        assert order.size >= mbo.size
        order.size -= mbo.size

        # Partial cancel: the order stays where it is
        if order.size != 0:
            return

        # If the full size is cancelled, remove the order from the book
        self.orders_by_id.pop(mbo.order_id)
        level.orders.remove(order)
        # If the level is now empty, remove it from the book
        if not level:
            self._remove_level(mbo.price, mbo.side)

    def _modify(self, mbo: db.MBOMsg) -> None:
        """
        Change the price and/or size of a tracked order.

        An unknown order ID is treated as an add. A price change or a size
        increase moves the order to the back of its level; otherwise the order
        is replaced at its current list index.
        """
        order = self.orders_by_id.get(mbo.order_id)
        if order is None:
            # If order not found, treat it as an add
            self._add(mbo)
            return

        assert order.side == mbo.side, f"Order {order} changed side to {mbo.side}"
        level = self._get_level(order.price, order.side)

        if order.price != mbo.price:
            # Changing price loses priority
            level.orders.remove(order)
            if not level:
                self._remove_level(order.price, mbo.side)
            level = self._get_or_insert_level(mbo.price, mbo.side)
            level.orders.append(mbo)
        elif order.size < mbo.size:
            # Increasing size loses priority
            level.orders.remove(order)
            level.orders.append(mbo)
        else:
            # Update in place
            slot = level.orders.index(order)
            level.orders[slot] = mbo

        self.orders_by_id[mbo.order_id] = mbo

    def _side_levels(self, side: str) -> SortedDict:
        """
        Return the level container for side "A" (offers) or "B" (bids).

        Raises `ValueError` for any other side.
        """
        if side == "A":
            return self.offers
        if side == "B":
            return self.bids
        raise ValueError(f"Invalid {side =}")

    def _get_level(self, price: int, side: str) -> LevelOrders:
        """
        Return the existing level at `price`; raise `KeyError` if absent.
        """
        levels = self._side_levels(side)
        if price in levels:
            return levels[price]
        raise KeyError(f"No price level found for {price =} and {side =}")

    def _get_or_insert_level(self, price: int, side: str) -> LevelOrders:
        """
        Return the level at `price`, creating an empty one first if needed.
        """
        levels = self._side_levels(side)
        if price not in levels:
            levels[price] = LevelOrders(price=price)
        return levels[price]

    def _remove_level(self, price: int, side: str) -> None:
        """
        Delete the level at `price` from the given side.
        """
        self._side_levels(side).pop(price)
@dataclass
class Market:
    """
    Collection of books keyed by instrument ID, then by publisher ID.

    Both mapping layers are `defaultdict`s, so looking up an unseen
    instrument or publisher creates an empty entry.
    """

    books: defaultdict[int, defaultdict[int, Book]] = field(
        default_factory=lambda: defaultdict(lambda: defaultdict(Book)),
    )

    def get_books_by_pub(self, instrument_id: int) -> defaultdict[int, Book]:
        """
        Return the publisher-keyed books for one instrument.
        """
        return self.books[instrument_id]

    def get_book(self, instrument_id: int, publisher_id: int) -> Book:
        """
        Return the book for one instrument on one publisher.
        """
        by_publisher = self.books[instrument_id]
        return by_publisher[publisher_id]

    def bbo(
        self,
        instrument_id: int,
        publisher_id: int,
    ) -> tuple[PriceLevel | None, PriceLevel | None]:
        """
        Return the (best bid, best ask) of a single publisher's book.
        """
        book = self.books[instrument_id][publisher_id]
        return book.bbo()

    def aggregated_bbo(
        self,
        instrument_id: int,
    ) -> tuple[PriceLevel | None, PriceLevel | None]:
        """
        Return the best bid and ask across all publishers of an instrument.

        Size and count are summed over every publisher quoting the best price.
        A side with no levels on any publisher is returned as None.
        """
        combined: list[PriceLevel | None] = [None, None]
        by_publisher = self.books[instrument_id]

        # Slot 0 is the bid (highest price wins); slot 1 the ask (lowest wins)
        for slot, pick_best in enumerate((max, min)):
            tops = [book.bbo()[slot] for book in by_publisher.values()]
            candidates = [top for top in tops if top]
            if candidates:
                best_price = pick_best(c.price for c in candidates)
                at_best = [c for c in candidates if c.price == best_price]
                combined[slot] = PriceLevel(
                    price=best_price,
                    size=sum(c.size for c in at_best),
                    count=sum(c.count for c in at_best),
                )

        return tuple(combined)

    def apply(self, mbo: db.MBOMsg) -> None:
        """
        Route an MBO message to the book for its instrument and publisher.
        """
        self.books[mbo.instrument_id][mbo.publisher_id].apply(mbo)
def apply_bias(x: float, k: float) -> float:
    """
    Skew a value `x` from [0, 1] by the bias parameter `k` from [-1, 1].

    - k == -1 always returns 1
    - -1 < k < 0 pushes the result towards 1
    - k == 0 returns `x` unchanged (uniform)
    - 0 < k < 1 pushes the result towards 0
    - k == 1 always returns 0
    """
    assert 0 <= x <= 1  # Random number between 0 and 1
    assert -1 <= k <= 1  # Bias between -1 and 1

    if k > 0:
        # Positive bias: mirror, raise to a power below one, mirror back
        return 1 - (1 - x) ** (1 - k)
    # Zero or negative bias: an exponent in [0, 1] lifts x towards 1
    return x ** (1 + k)
if __name__ == "__main__":
    # Create a historical client
    client = db.Historical("$YOUR_API_KEY")

    # Set parameters
    dataset = "GLBX.MDP3"
    start_date = "2025-02-12"
    end_date = "2025-02-13"
    symbol = "ES.v.0"

    # For this example, we will retroactively pick an order to track
    order_id = 6414447712675
    order_add_time = 1739375799442261697
    order_fill_time = 1739375974175277429
    order_price = 6064000000000
    order_size = 1

    # Now we will request MBO data, reusing the local file if it already exists
    mbo_data_path = "glbx-mdp3-20250212.mbo.dbn.zst"
    if os.path.exists(mbo_data_path):
        mbo_data = db.DBNStore.from_file(mbo_data_path)
    else:
        mbo_data = client.timeseries.get_range(
            dataset=dataset,
            start=start_date,
            end=end_date,
            symbols=symbol,
            stype_in="continuous",
            schema="mbo",
            path=mbo_data_path,
        )

    # Next, we will record all updates at this price while the order was active
    price_level_msgs: list[dict] = []
    book = Book()
    for mbo_msg in mbo_data:
        # Process MBO msg (every message, so the book is correct once tracking starts)
        book.apply(mbo_msg)

        # Only record updates when order is active
        if not (order_add_time <= mbo_msg.ts_event < order_fill_time):
            continue

        # Wait for end of event
        if not mbo_msg.flags & db.RecordFlags.F_LAST:
            continue

        # Skip the sample rather than crash if the book has no level at this price
        ask_level = book.get_ask_level_by_px(order_price)
        if ask_level is None:
            continue

        # Get total size at price and actual queue position
        price_level_msgs.append(
            {
                "ts_event": mbo_msg.ts_event,
                "Depth at price level": ask_level.size,
                "Queue position": book.get_queue_pos(order_id),
            },
        )

    # Construct a DataFrame from MBO updates
    df = pd.DataFrame.from_records(price_level_msgs)
    df["Order lifetime (s)"] = (df["ts_event"] - df["ts_event"].iloc[0]) / 1e9
    df = df.set_index("Order lifetime (s)")

    # Plot actual queue position using MBO data
    ax = df[["Depth at price level", "Queue position"]].plot(
        drawstyle="steps-post",
        ylabel="Contracts",
    )

    def generate_simulations(df: pd.DataFrame, k: float) -> pd.Series:
        """
        Estimate queue position from the depth at the price level alone.

        Runs 100 randomized simulations over the "Depth at price level" column
        of `df` and returns the row-wise median of the simulated queue
        positions, indexed like `df`. `k` is the bias passed to `apply_bias`.
        The tracked order's size is read from the enclosing `order_size`.
        """
        total_size_list = df["Depth at price level"].to_list()

        # Order is starting at the back of the queue
        start_queue_pos = total_size_list[0]

        # Only simulated paths go in here: the time axis is attached as the
        # index below so it cannot be counted as a sample in the median
        sim_list = []

        # Run 100 simulations
        for _ in range(100):
            c_total_size = start_queue_pos  # Current size at price level
            c_queue_pos = start_queue_pos - order_size  # Current estimated queue position

            estimated_queue_pos = []
            for total_size in total_size_list:
                # Decrease in size at price level. A cancel occurred
                if total_size < c_total_size:
                    cancel_size = c_total_size - total_size
                    # Size of rest of level, floored at zero
                    others_size = max(total_size - order_size, 0)
                    random_number = apply_bias(random.random(), k)

                    # With nothing else left at the level there is no ratio to
                    # compare against (and no division by zero); the clamp
                    # below then puts the order at the front
                    if others_size > 0 and random_number < (c_queue_pos / others_size):
                        c_queue_pos -= cancel_size

                    c_queue_pos = min(max(c_queue_pos, 0), others_size)

                c_total_size = total_size
                estimated_queue_pos.append(c_queue_pos)

            sim_list.append(estimated_queue_pos)

        df_sim = pd.DataFrame(
            {f"sim_{i}": sim for i, sim in enumerate(sim_list)},
            index=df.index,
        )

        return df_sim.quantile(0.50, axis=1)

    # Next, we will also download MBP-10 data for estimating queue position
    mbp10_data_path = "glbx-mdp3-20250212.mbp-10.dbn.zst"
    if os.path.exists(mbp10_data_path):
        mbp10_data = db.DBNStore.from_file(mbp10_data_path)
    else:
        mbp10_data = client.timeseries.get_range(
            dataset=dataset,
            start=start_date,
            end=end_date,
            symbols=symbol,
            stype_in="continuous",
            schema="mbp-10",
            path=mbp10_data_path,
        )

    # Now we will use the MBP-10 data to record similar information as above
    mbp10_price_level_msgs: list[dict] = []
    for mbp10_msg in mbp10_data:
        if order_add_time <= mbp10_msg.ts_event < order_fill_time:
            for level_pair in mbp10_msg.levels:
                if level_pair.ask_px == order_price:
                    # Get total size at price
                    mbp_price_level_msg = {
                        "ts_event": mbp10_msg.ts_event,
                        "Depth at price level": level_pair.ask_sz,
                    }
                    mbp10_price_level_msgs.append(mbp_price_level_msg)

    # Construct a DataFrame from MBP-10 updates
    df_mbp10 = pd.DataFrame.from_records(mbp10_price_level_msgs)
    df_mbp10["Order lifetime (s)"] = (df_mbp10["ts_event"] - df_mbp10["ts_event"].iloc[0]) / 1e9
    df_mbp10 = df_mbp10.set_index("Order lifetime (s)")

    # Run simulations using various weights
    for k in (0, -0.8, -0.95):
        df_mbp10[f"Estimated queue position (k={k})"] = generate_simulations(df_mbp10, k)

    # Add estimated queue position using MBP-10 data to plot
    df_mbp10.filter(regex="Estimated", axis=1).plot(
        ax=ax,
        drawstyle="steps-post",
        title="Actual vs. estimated queue position",
        style=["C3", "C5", "C2"],
        ylim=(0, 85),
    )

    plt.show()
Results
We can show how the exact queue position of the order within its price level decreases over time (orange), even as orders are added and removed from the book (purple). The naive model assuming uniform cancellation (green) does a poor job estimating the position, but the biased models perform better.