Queue position of an order

See also

We recommend working through the Order tracking and Limit order book examples first.

Overview

In this example, we will use a complete limit order book to track the queue position of an order. This example assumes the venue uses a First-In-First-Out (FIFO) matching algorithm, under which orders resting at the same price are filled in the order they arrived. FIFO is the most common algorithm used in modern matching engines.
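To make the FIFO assumption concrete, here is a minimal sketch of how an incoming fill might be allocated across a single price level. The `match_fifo` helper and the `(order_id, size)` tuples are hypothetical and are not part of the example code or of any exchange's matching engine.

```python
from collections import deque


def match_fifo(queue: deque, fill_size: int) -> list:
    """Fill `fill_size` contracts against resting orders, oldest first."""
    fills = []
    while fill_size > 0 and queue:
        order_id, size = queue[0]
        traded = min(size, fill_size)
        fills.append((order_id, traded))
        fill_size -= traded
        if traded == size:
            queue.popleft()  # Fully filled: the order leaves the queue
        else:
            queue[0] = (order_id, size - traded)  # Partially filled: keeps its place
    return fills


# Hypothetical resting orders at one price level, in arrival order
level = deque([(101, 3), (102, 2), (103, 5)])
fills = match_fifo(level, 4)
print(fills)        # [(101, 3), (102, 1)]
print(list(level))  # [(102, 1), (103, 5)]
```

Order 103 is only reached after everything that arrived before it has been filled or cancelled, which is why the size resting ahead of an order matters.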

When market participants have active orders in different instruments and contract months, knowing exact queue positions can be important in managing execution risk.

The ability to track orders as they move throughout the limit order book can be utilized as a trading signal in automated strategies.

Design

We will use the MBO schema ("L3" data), which assigns a unique ID to every order in the book, to exactly track this queue position. The logic for this calculation is a straightforward extension of our limit order book example.
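The core of the exact calculation can be sketched in isolation: with every order at a level identified by its ID and kept in arrival order, the queue position is the total size resting ahead of the tracked order. The snippet below is a simplified illustration that uses plain `(order_id, size)` tuples instead of MBO records; the `queue_position` name is an assumption made for this sketch.

```python
from itertools import takewhile


def queue_position(orders, order_id):
    """Total size resting ahead of `order_id`, or None if it is not at this level."""
    if all(oid != order_id for oid, _ in orders):
        return None
    # Sum the sizes of all orders that arrived before the tracked order
    ahead = takewhile(lambda o: o[0] != order_id, orders)
    return sum(size for _, size in ahead)


# Hypothetical orders at one price level, in arrival order
level = [(101, 3), (102, 2), (103, 5)]
print(queue_position(level, 103))  # 5
level.pop(0)  # Order 101 is cancelled or filled
print(queue_position(level, 103))  # 2
print(queue_position(level, 999))  # None
```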

We will also use the MBP-10 schema ("L2" data) to estimate queue position, as if we did not have access to MBO data.

Suppose $p(x)$ denotes the probability of a cancel in front when your order is positioned $x$ in the queue, where $x$ is the fraction of the other resting size that sits ahead of you.

  • When you first join the queue, $p(x) = 1$, since all cancels must happen in front of you.
  • When you are at the front of the queue, $p(x) = 0$, since all cancels must happen from behind.

A naive assumption is that orders cancel uniformly through the queue, i.e. $p(x) = x$. In reality, cancels are more likely to happen behind you than in front of you, since the orders in front of you have value from their time priority.

We can model this cancellation bias with a parameter $k$, ranging from -1 to 1. $k = 0$ results in an unbiased, uniform distribution.

We'll take a look at this uniform cancellation estimation ($k = 0$), as well as a few weighted estimations that assume a higher probability of cancels from behind ($k = -0.8$ and $k = -0.95$).
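As a rough check on what the bias parameter does, the sketch below reuses the `apply_bias` transform from the example and compares a simulated cancel-in-front probability against a closed form. Here `x` stands for the fraction of the other resting size ahead of the order and `k` for the bias parameter. The closed form $x^{1/(1+k)}$ for $-1 < k \le 0$ is derived here from the transform and is not stated in the original text; the helper names are hypothetical.

```python
import random


def apply_bias(x: float, k: float) -> float:
    # Same transform as in the example code
    if k <= 0:
        return x ** (1 + k)
    return 1 - (1 - x) ** (1 - k)


def p_front_closed_form(x: float, k: float) -> float:
    # For -1 < k <= 0 and uniform U: P(U**(1+k) < x) = x**(1/(1+k))
    return x ** (1 / (1 + k))


def p_front_simulated(x: float, k: float, trials: int = 100_000, seed: int = 0) -> float:
    # Fraction of biased draws that land below x, i.e. cancels attributed to the front
    rng = random.Random(seed)
    hits = sum(apply_bias(rng.random(), k) < x for _ in range(trials))
    return hits / trials


x = 0.5  # Order sits in the middle of the queue
for k in (0, -0.8, -0.95):
    print(k, p_front_closed_form(x, k), p_front_simulated(x, k))
```

Under these assumptions, an order halfway through the queue sees a cancel attributed to the front with probability 0.5 when k is 0, but only about 0.03 when k is -0.8, so more negative values of k push the estimated queue position down more slowly.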

Example

from __future__ import annotations
import os
import random
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import takewhile
import databento as db
import matplotlib.pyplot as plt
import pandas as pd
from sortedcontainers import SortedDict

@dataclass
class PriceLevel:
    price: int
    size: int = 0
    count: int = 0

    def __str__(self) -> str:
        price = self.price / db.FIXED_PRICE_SCALE
        return f"{self.size:4} @ {price:6.2f} | {self.count:2} order(s)"


@dataclass
class LevelOrders:
    price: int
    orders: list[db.MBOMsg] = field(default_factory=list, compare=False)

    def __bool__(self) -> bool:
        return bool(self.orders)

    @property
    def level(self) -> PriceLevel:
        return PriceLevel(
            price=self.price,
            count=sum(1 for o in self.orders if not o.flags & db.RecordFlags.F_TOB),
            size=sum(o.size for o in self.orders),
        )


@dataclass
class Book:
    orders_by_id: dict[int, db.MBOMsg] = field(default_factory=dict)
    offers: SortedDict[int, LevelOrders] = field(default_factory=SortedDict)
    bids: SortedDict[int, LevelOrders] = field(default_factory=SortedDict)

    def bbo(self) -> tuple[PriceLevel | None, PriceLevel | None]:
        return self.get_bid_level(), self.get_ask_level()

    def get_bid_level(self, idx: int = 0) -> PriceLevel | None:
        if self.bids and len(self.bids) > idx:
            # Reverse for bids to get highest prices first
            return self.bids.peekitem(-(idx + 1))[1].level
        return None

    def get_ask_level(self, idx: int = 0) -> PriceLevel | None:
        if self.offers and len(self.offers) > idx:
            return self.offers.peekitem(idx)[1].level
        return None

    def get_bid_level_by_px(self, px: int) -> PriceLevel | None:
        try:
            return self._get_level(px, "B").level
        except KeyError:
            return None

    def get_ask_level_by_px(self, px: int) -> PriceLevel | None:
        try:
            return self._get_level(px, "A").level
        except KeyError:
            return None

    def get_order(self, id: int) -> db.MBOMsg | None:
        return self.orders_by_id.get(id)

    def get_queue_pos(self, id: int) -> int | None:
        order = self.get_order(id)
        if not order:
            return None
        level = self._get_level(order.price, order.side)
        return sum(
            order.size for order in takewhile(lambda o: o.order_id != id, level.orders)
        )

    def get_snapshot(self, level_count: int = 1) -> list[db.BidAskPair]:
        snapshots = []
        for level in range(level_count):
            ba_pair = db.BidAskPair()
            bid = self.get_bid_level(level)
            if bid:
                ba_pair.bid_px = bid.price
                ba_pair.bid_sz = bid.size
                ba_pair.bid_ct = bid.count
            ask = self.get_ask_level(level)
            if ask:
                ba_pair.ask_px = ask.price
                ba_pair.ask_sz = ask.size
                ba_pair.ask_ct = ask.count
            snapshots.append(ba_pair)
        return snapshots

    def apply(self, mbo: db.MBOMsg) -> None:
        # Trade, Fill, or None: no change
        if mbo.action in ("T", "F", "N"):
            return
        # Clear book: remove all resting orders
        if mbo.action == "R":
            self._clear()
            return
        # side=N is only valid with Trade, Fill, and Clear actions
        assert mbo.side in ("A", "B")
        # UNDEF_PRICE indicates the book level should be removed
        if mbo.price == db.UNDEF_PRICE and mbo.flags & db.RecordFlags.F_TOB:
            self._side_levels(mbo.side).clear()
            return
        # Add: insert a new order
        if mbo.action == "A":
            self._add(mbo)
        # Cancel: partially or fully cancel some size from a resting order
        elif mbo.action == "C":
            self._cancel(mbo)
        # Modify: change the price and/or size of a resting order
        elif mbo.action == "M":
            self._modify(mbo)
        else:
            raise ValueError(f"Unknown action={mbo.action}")

    def _clear(self) -> None:
        self.orders_by_id.clear()
        self.offers.clear()
        self.bids.clear()

    def _add(self, mbo: db.MBOMsg) -> None:
        if mbo.flags & db.RecordFlags.F_TOB:
            levels = self._side_levels(mbo.side)
            levels.clear()
            levels[mbo.price] = LevelOrders(price=mbo.price, orders=[mbo])
        else:
            level = self._get_or_insert_level(mbo.price, mbo.side)
            assert mbo.order_id not in self.orders_by_id
            self.orders_by_id[mbo.order_id] = mbo
            level.orders.append(mbo)

    def _cancel(self, mbo: db.MBOMsg) -> None:
        order = self.orders_by_id[mbo.order_id]
        level = self._get_level(mbo.price, mbo.side)
        assert order.size >= mbo.size
        order.size -= mbo.size
        # If the full size is cancelled, remove the order from the book
        if order.size == 0:
            self.orders_by_id.pop(mbo.order_id)
            level.orders.remove(order)
            # If the level is now empty, remove it from the book
            if not level:
                self._remove_level(mbo.price, mbo.side)

    def _modify(self, mbo: db.MBOMsg) -> None:
        order = self.orders_by_id.get(mbo.order_id)
        if order is None:
            # If order not found, treat it as an add
            self._add(mbo)
            return
        assert order.side == mbo.side, f"Order {order} changed side to {mbo.side}"
        level = self._get_level(order.price, order.side)
        if order.price != mbo.price:
            # Changing price loses priority
            level.orders.remove(order)
            if not level:
                self._remove_level(order.price, mbo.side)
            level = self._get_or_insert_level(mbo.price, mbo.side)
            level.orders.append(mbo)
        elif order.size < mbo.size:
            # Increasing size loses priority
            level.orders.remove(order)
            level.orders.append(mbo)
        else:
            # Update in place
            level.orders[level.orders.index(order)] = mbo
        self.orders_by_id[mbo.order_id] = mbo

    def _side_levels(self, side: str) -> SortedDict:
        if side == "A":
            return self.offers
        if side == "B":
            return self.bids
        raise ValueError(f"Invalid {side =}")

    def _get_level(self, price: int, side: str) -> LevelOrders:
        levels = self._side_levels(side)
        if price not in levels:
            raise KeyError(f"No price level found for {price =} and {side =}")
        return levels[price]

    def _get_or_insert_level(self, price: int, side: str) -> LevelOrders:
        levels = self._side_levels(side)
        if price in levels:
            return levels[price]
        level = LevelOrders(price=price)
        levels[price] = level
        return level

    def _remove_level(self, price: int, side: str) -> None:
        levels = self._side_levels(side)
        levels.pop(price)


@dataclass
class Market:
    books: defaultdict[int, defaultdict[int, Book]] = field(
        default_factory=lambda: defaultdict(lambda: defaultdict(Book)),
    )

    def get_books_by_pub(self, instrument_id: int) -> defaultdict[int, Book]:
        return self.books[instrument_id]

    def get_book(self, instrument_id: int, publisher_id: int) -> Book:
        return self.books[instrument_id][publisher_id]

    def bbo(
        self,
        instrument_id: int,
        publisher_id: int,
    ) -> tuple[PriceLevel | None, PriceLevel | None]:
        return self.books[instrument_id][publisher_id].bbo()

    def aggregated_bbo(
        self,
        instrument_id: int,
    ) -> tuple[PriceLevel | None, PriceLevel | None]:
        agg_bbo: list[PriceLevel | None] = [None, None]
        # max for bids, min for asks
        for idx, reducer in [(0, max), (1, min)]:
            all_best = [b.bbo()[idx] for b in self.books[instrument_id].values()]
            all_best = [b for b in all_best if b]
            if not all_best:
                continue
            best_price = reducer(b.price for b in all_best)
            best = [b for b in all_best if b.price == best_price]
            agg_bbo[idx] = PriceLevel(
                price=best_price,
                size=sum(b.size for b in best),
                count=sum(b.count for b in best),
            )
        return tuple(agg_bbo)

    def apply(self, mbo: db.MBOMsg) -> None:
        book = self.books[mbo.instrument_id][mbo.publisher_id]
        book.apply(mbo)


def apply_bias(x: float, k: float) -> float:
    """
    Apply bias to the random number `x` in the range [0, 1].

    -1 < k < 0 results in a bias towards 1.
    k == 0 results in a uniform distribution
    0 < k < 1 results in a bias towards 0

    k == -1 always returns 1
    k == 1 always returns 0
    """
    assert 0 <= x <= 1  # Random number between 0 and 1
    assert -1 <= k <= 1  # Bias between -1 and 1

    if k <= 0:
        return x ** (1 + k)
    else:
        return 1 - (1 - x) ** (1 - k)


if __name__ == "__main__":
    # Create a historical client
    client = db.Historical("$YOUR_API_KEY")

    # Set parameters
    dataset = "GLBX.MDP3"
    start_date = "2025-02-12"
    end_date = "2025-02-13"
    symbol = "ES.v.0"

    # For this example, we will retroactively pick an order to track
    order_id = 6414447712675
    order_add_time = 1739375799442261697
    order_fill_time = 1739375974175277429
    order_price = 6064000000000
    order_size = 1

    # Now we will request MBO data.
    mbo_data_path = "glbx-mdp3-20250212.mbo.dbn.zst"
    if os.path.exists(mbo_data_path):
        mbo_data = db.DBNStore.from_file(mbo_data_path)
    else:
        mbo_data = client.timeseries.get_range(
            dataset=dataset,
            start=start_date,
            end=end_date,
            symbols=symbol,
            stype_in="continuous",
            schema="mbo",
            path=mbo_data_path,
        )

    # Next, we will record all updates at this price while the order was active
    price_level_msgs: list[dict] = []
    book = Book()
    for mbo_msg in mbo_data:
        # Process MBO msg
        book.apply(mbo_msg)

        # Only record updates when order is active
        if order_add_time <= mbo_msg.ts_event < order_fill_time:
            # Wait for end of event
            if mbo_msg.flags & db.RecordFlags.F_LAST:
                # Get total size at price and actual queue position
                price_level_msg = {
                    "ts_event": mbo_msg.ts_event,
                    "Depth at price level": book.get_ask_level_by_px(order_price).size,
                    "Queue position": book.get_queue_pos(order_id),
                }
                price_level_msgs.append(price_level_msg)

    # Construct a DataFrame from MBO updates
    df = pd.DataFrame.from_records(price_level_msgs)
    df["Order lifetime (s)"] = (df["ts_event"] - df["ts_event"][0]) / 1e9
    df = df.set_index("Order lifetime (s)")

    # Plot actual queue position using MBO data
    ax = df[["Depth at price level", "Queue position"]].plot(
        drawstyle="steps-post",
        ylabel="Contracts",
    )

    def generate_simulations(df: pd.DataFrame, k: float) -> pd.Series:
        """
        Generate simulations for estimated queue position.
        """
        # Collect one estimated queue position path per simulation.
        # Start empty so the time index is not counted as a simulation
        # when taking the median below.
        sim_list = []

        # Order is starting at the back of the queue
        total_size_list = df["Depth at price level"].to_list()
        start_queue_pos = total_size_list[0]

        # Run 100 simulations
        for i in range(100):
            c_total_size = start_queue_pos  # Current size at price level
            c_queue_pos = start_queue_pos - order_size  # Current estimated queue position

            estimated_queue_pos = []
            for total_size in total_size_list:
                # Decrease in size at price level. A cancel occurred
                if total_size < c_total_size:
                    cancel_size = c_total_size - total_size
                    others_size = total_size - order_size  # Size of rest of level

                    random_number = apply_bias(random.random(), k)
                    # Guard against dividing by zero when only our order remains
                    if others_size > 0 and random_number < (c_queue_pos / others_size):
                        c_queue_pos -= cancel_size

                    c_queue_pos = min(max(c_queue_pos, 0), others_size)

                c_total_size = total_size
                estimated_queue_pos.append(c_queue_pos)

            sim_list.append(estimated_queue_pos)

        col_names = [f"sim_{i}" for i in range(len(sim_list))]
        df_sim = pd.DataFrame(dict(zip(col_names, sim_list)))
        df_sim = df_sim.set_index(df.index)

        return df_sim.quantile(0.50, axis=1)


    # Next, we will also download MBP-10 data for estimating queue position
    mbp10_data_path = "glbx-mdp3-20250212.mbp-10.dbn.zst"
    if os.path.exists(mbp10_data_path):
        mbp10_data = db.DBNStore.from_file(mbp10_data_path)
    else:
        mbp10_data = client.timeseries.get_range(
            dataset=dataset,
            start=start_date,
            end=end_date,
            symbols=symbol,
            stype_in="continuous",
            schema="mbp-10",
            path=mbp10_data_path,
        )

    # Now we will use the MBP-10 data to record similar information as above
    mbp10_price_level_msgs: list[dict] = []
    for mbp10_msg in mbp10_data:
        if order_add_time <= mbp10_msg.ts_event < order_fill_time:
            for level_pair in mbp10_msg.levels:
                if level_pair.ask_px == order_price:
                    # Get total size at price
                    mbp_price_level_msg = {
                        "ts_event": mbp10_msg.ts_event,
                        "Depth at price level": level_pair.ask_sz,
                    }
                    mbp10_price_level_msgs.append(mbp_price_level_msg)

    # Construct a DataFrame from MBP-10 updates
    df_mbp10 = pd.DataFrame.from_records(mbp10_price_level_msgs)
    df_mbp10["Order lifetime (s)"] = (df_mbp10["ts_event"] - df_mbp10["ts_event"][0]) / 1e9
    df_mbp10 = df_mbp10.set_index("Order lifetime (s)")

    # Run simulations using various weights
    for k in (0, -0.8, -0.95):
        df_mbp10[f"Estimated queue position (k={k})"] = generate_simulations(df_mbp10, k)

    # Add estimated queue position using MBP-10 data to plot
    df_mbp10.filter(regex="Estimated", axis=1).plot(
        ax=ax,
        drawstyle="steps-post",
        title="Actual vs. estimated queue position",
        style=["C3", "C5", "C2"],
        ylim=(0, 85),
    )
    plt.show()

Results

The plot shows how the exact queue position of the order within its price level decreases over time (orange), even as orders are added to and removed from the book (purple). The naive model assuming uniform cancellation (green) does a poor job estimating the position, but the biased models perform better.

Figure: Actual vs. estimated queue position
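One way to put a number on how well each estimate tracks the actual position is a mean absolute error, sampling the estimate as a step function at the timestamps of the actual series. The sketch below is a minimal illustration under that assumption; the `asof` and `mean_abs_error` helpers and the toy series are hypothetical and are not results from the data above.

```python
from bisect import bisect_right


def asof(times, values, t):
    """Most recent value at or before time t, treating the series as a step function."""
    i = bisect_right(times, t) - 1
    return values[i] if i >= 0 else None


def mean_abs_error(actual, estimate):
    """Mean absolute error of `estimate`, sampled at the timestamps of `actual`."""
    a_times, a_vals = actual
    e_times, e_vals = estimate
    errors = []
    for t, a in zip(a_times, a_vals):
        e = asof(e_times, e_vals, t)
        if e is not None:  # Skip times before the estimate starts
            errors.append(abs(a - e))
    return sum(errors) / len(errors)


# Toy series only: (times in seconds, queue position in contracts)
actual = ([0.0, 1.0, 2.0, 3.0], [10, 8, 5, 0])
estimate = ([0.0, 1.5, 2.5], [10, 7, 2])
print(mean_abs_error(actual, estimate))  # 1.5
```

The as-of lookup matters because the MBO and MBP-10 series do not necessarily share timestamps, so the two cannot simply be compared row by row.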