Normalization and Local Processing

This page contains tardis-dev normalization, local processing and advanced examples that were split out of Getting Started. Detailed API usage for replay and streaming lives on Replaying Historical Data and Streaming Real-Time Data.

Data normalization

Data normalization allows consuming market data feeds from various exchanges in a consistent format.

tardis-dev ships with the built-in normalizers described below; any of them can be provided to the replayNormalized or streamNormalized functions.

If you're interested in how exactly data is mapped from the exchange-native format to the normalized one, please follow the code in the tardis-dev GitHub repository for each exchange. If you determine that the mapping should be done differently, please read the "Modifying built-in and adding custom normalizers" section.

const { streamNormalized, normalizeTrades, normalizeBookChanges,
  normalizeDerivativeTickers } = require('tardis-dev')

// or replayNormalized to replay normalized historical data
const messages = streamNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL']
  },
  normalizeTrades,
  normalizeBookChanges,
  normalizeDerivativeTickers
)

for await (const message of messages) {
  if (message.type === 'book_change') {
    // process normalized book change
  }
  if (message.type === 'trade') {
    // process normalized trade
  }
  if (message.type === 'derivative_ticker') {
    // process normalized derivative_ticker
  }
}

normalizeTrades

When passed as an arg to replayNormalized or streamNormalized, it provides normalized trade data for all supported exchanges.

normalizeBookChanges

When passed as an arg to replayNormalized or streamNormalized, it provides normalized book_change data for all supported exchanges.

Provides initial L2 (market by price) order book snapshots (isSnapshot=true) plus incremental updates for each order book change. Please note that amount is the absolute updated amount at a given price level, not a delta: an amount of 0 means the price level should be removed from your local book. If you receive a removal for a price level that is not present in your local book, ignore it. Snapshot levels may also include zero-amount entries; treat those as absent levels as well.

normalizeBookTickers

When passed as an arg to replayNormalized or streamNormalized, it provides normalized book_ticker data — best bid/ask sourced from exchange-native BBO feeds (e.g., Binance bookTicker, Bybit orderbook.1). Unlike quote, which can be computed from L2 order book data via computeBookSnapshots, book_ticker uses the exchange's dedicated BBO stream when available.

Because it is a standalone feed, book_ticker replay can start from any point in time, whereas quote requires starting from 00:00 UTC (when the initial L2 order book snapshot is provided).

normalizeDerivativeTickers

When passed as an arg to replayNormalized or streamNormalized, it provides normalized derivative_ticker data for supported exchanges that trade derivative instruments.

normalizeLiquidations

When passed as an arg to replayNormalized or streamNormalized, it provides normalized liquidation data for exchanges that publish liquidation events. See which exchanges support liquidations.

normalizeOptionsSummary

When passed as an arg to replayNormalized or streamNormalized, it provides normalized option_summary data for exchanges that provide options data (e.g., Deribit, OKX Options). Includes greeks, implied volatility, and best bid/ask.

disconnect message

When the replayNormalized or streamNormalized options have the withDisconnectMessages flag set to true and a disconnect event occurs (e.g., a WebSocket connection close), a disconnect message is returned.

Modifying built-in and adding custom normalizers

In tardis-dev, data normalization is implemented via normalize factory functions provided to the replayNormalized and streamNormalized functions. This design gives a lot of flexibility, allowing you to replace, extend, and modify the built-in normalizers, or add new ones for new normalized data types, without forking the whole library.

Any normalize function provided to the replayNormalized and streamNormalized functions needs to have the following signature:

Exchange is the exchange id for which a mapper object needs to be returned; localTimestamp is the date for which the mapper is created (a new mapper is created after each disconnection). In most cases localTimestamp isn't needed, but in certain cases, such as an exchange API change, it can be used to switch to different mapping logic, for example using a new data channel that wasn't available until a certain date.

The returned Mapper object has the following signature:

On every disconnection event, the normalize factory functions are called again to provide new Mapper objects with clean state where required (stateful mapping, like BitMEX order book data, needs to persist the mapping between price level id and price level, and reset it for each new connection). If a mapper object is stateful, the normalize factory function must always return a new object with clean state, or reset its state in some other way.

Normalized data returned by the iterable iterator of the Mapper.map method is expected to have a shape with at least the fields described in the normalized data type section below, so it plays well with other tardis-dev functions like combine or compute.
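As a concrete illustration of the factory and Mapper signatures described above, here is a minimal, self-contained sketch for a hypothetical 'ping' data type. The message shape and field names are invented for illustration; the library's actual typings may differ.

```javascript
// Sketch of the normalize factory signature described above:
// (exchange, localTimestamp) => Mapper. The 'ping' data type and its
// native message shape are hypothetical.
const normalizePings = (exchange, localTimestamp) => {
  // return a fresh Mapper on every call so state is clean after reconnects
  let count = 0

  return {
    // canHandle decides whether this mapper is interested in a given
    // exchange-native message
    canHandle(message) {
      return message.channel === 'ping'
    },

    // map yields zero or more normalized messages for a native message;
    // normalized messages carry at least type/symbol/exchange/timestamp/
    // localTimestamp (see "normalized data type" below)
    *map(message, localTimestamp) {
      count++
      yield {
        type: 'ping',
        symbol: message.symbol,
        exchange,
        sequence: count,
        timestamp: new Date(message.timestamp),
        localTimestamp
      }
    }
  }
}
```

Such a factory can then be passed to replayNormalized or streamNormalized alongside the built-in normalizers.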

normalized data type

Adding custom normalizeLiquidations normalizer

Example implementation of a custom normalizeLiquidations function that normalizes liquidations data for deribit exchange. Implementations for other exchanges are left as an exercise for the reader.

type of messages provided by normalizeLiquidations

implementation of deribitLiquidations mapper and normalizeLiquidations

normalizeLiquidations usage example

We could just as well provide the same normalizeLiquidations function to the streamNormalized function, or use it together with other normalizers (normalizeTrades etc.).
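For reference, one possible shape of such a deribit normalizeLiquidations implementation is sketched below. It assumes deribit flags liquidation trades via a liquidation field on its trades channel messages ('M' = maker liquidated, 'T' = taker liquidated, 'MT' = both); verify the exact native message shape against deribit's API documentation before relying on it.

```javascript
// Hypothetical normalizeLiquidations mapper for deribit, assuming
// subscription messages with params.channel like 'trades.BTC-PERPETUAL.raw'
// and a `liquidation` field on liquidation trades.
const normalizeLiquidations = (exchange, localTimestamp) => ({
  canHandle(message) {
    return (
      message.params !== undefined &&
      typeof message.params.channel === 'string' &&
      message.params.channel.startsWith('trades')
    )
  },

  *map(message, localTimestamp) {
    for (const trade of message.params.data) {
      if (trade.liquidation === undefined) continue // regular trade, skip

      yield {
        type: 'liquidation',
        symbol: trade.instrument_name,
        exchange,
        id: String(trade.trade_id),
        price: trade.price,
        amount: trade.amount,
        side: trade.direction === 'buy' ? 'buy' : 'sell',
        timestamp: new Date(trade.timestamp),
        localTimestamp
      }
    }
  }
})
```

It can then be passed to replayNormalized or streamNormalized like any built-in normalizer.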

Changing normalizeTrades for Binance exchange

Let's assume that the default normalization of Binance exchange trades data doesn't fit our use case and we need to use the @aggTrade stream as the source of trade data instead of the default @trade stream.

normalizeTradesWithBinancePatch usage example
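A sketch of what normalizeTradesWithBinancePatch could look like is shown below. It assumes Binance combined-stream payloads of the shape { stream, data }, with aggTrade fields s (symbol), a (aggregate id), p (price), q (quantity), m (buyer is maker), and T (trade time); in real code the non-binance branch would delegate to the built-in normalizeTrades.

```javascript
// Sketch: source binance trade data from the @aggTrade stream instead of
// the default @trade stream. Payload field names follow Binance's public
// WebSocket docs; double-check them before use.
const normalizeTradesWithBinancePatch = (exchange, localTimestamp) => {
  if (exchange !== 'binance') {
    // in real code, delegate here to the built-in normalizeTrades
    throw new Error('this sketch only handles binance')
  }

  return {
    canHandle(message) {
      // binance combined streams wrap payloads as { stream, data }
      return message.stream !== undefined && message.stream.endsWith('@aggTrade')
    },

    *map(message, localTimestamp) {
      const aggTrade = message.data
      yield {
        type: 'trade',
        symbol: aggTrade.s,
        exchange,
        id: String(aggTrade.a), // aggregate trade id
        price: Number(aggTrade.p),
        amount: Number(aggTrade.q),
        side: aggTrade.m ? 'sell' : 'buy', // buyer is maker => aggressor sold
        timestamp: new Date(aggTrade.T),
        localTimestamp
      }
    }
  }
}
```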

Limit order book reconstruction

tardis-dev exports an OrderBook class that processes normalized book_change messages (order book snapshots and incremental updates) and maintains full local order book (level 2, aggregated market-by-price) state, both for real-time data and for reconstructing historical order book state at any past point in time. It waits for the first book_change message that is a snapshot (isSnapshot = true) and then applies subsequent updates to it. A single OrderBook instance maintains order book state for a single symbol/instrument only. It uses a red-black tree data structure under the hood to efficiently maintain its local state in sorted order.

OrderBook constructor options

new OrderBook() accepts an optional options object:

removeCrossedLevels (boolean, optional, default: undefined)

when set to true, automatically detects and removes crossed levels (where best bid >= best ask) that can occur when exchanges fail to publish delete messages

onCrossedLevelRemoved (function, optional, default: undefined)

optional callback (bookChange, bestBidBefore, bestBidAfter, bestAskBefore, bestAskAfter) => void invoked whenever a crossed level is removed

orderBook.update(bookChange)

Processes normalized book_change messages to update its internal local state that maintains ordered bids and asks sets. It ignores any non-snapshot book_change messages before an initial snapshot is received. It should be called for every book_change message received for the symbol for which you'd like to reconstruct the order book.

orderBook.bestBid()

Returns the book price level object for the highest bid order (best bid) in the order book, or undefined if the book doesn't have any bids (not yet initialized with an initial snapshot).

orderBook.bestAsk()

Returns the book price level object for the lowest ask order (best ask) in the order book, or undefined if the book doesn't have any asks (not yet initialized with an initial snapshot).

orderBook.asks()

Returns an iterable iterator of book price level objects for all available asks, ordered from lowest to highest ask.

orderBook.bids()

Returns an iterable iterator of book price level objects for all available bids, ordered from highest to lowest bid.

book price level type
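To make the snapshot and update semantics concrete, here is a toy, self-contained model of what OrderBook.update does, using plain Maps instead of the red-black tree the real class uses. Prefer the exported OrderBook in actual code; this sketch only illustrates the rules.

```javascript
// Toy illustration of the book_change semantics OrderBook implements:
// wait for a snapshot, apply absolute amounts, remove zero-amount levels.
class ToyOrderBook {
  constructor() {
    this.initialized = false
    this.bidsMap = new Map() // price -> amount
    this.asksMap = new Map()
  }

  update(bookChange) {
    if (bookChange.isSnapshot) {
      // a snapshot replaces the whole book
      this.bidsMap.clear()
      this.asksMap.clear()
      this.initialized = true
    }
    // ignore incremental updates that arrive before the first snapshot
    if (!this.initialized) return

    for (const { price, amount } of bookChange.bids) {
      if (amount === 0) this.bidsMap.delete(price) // 0 amount removes level
      else this.bidsMap.set(price, amount) // absolute amount, not a delta
    }
    for (const { price, amount } of bookChange.asks) {
      if (amount === 0) this.asksMap.delete(price)
      else this.asksMap.set(price, amount)
    }
  }

  bestBid() {
    const prices = [...this.bidsMap.keys()]
    if (prices.length === 0) return undefined
    const price = Math.max(...prices)
    return { price, amount: this.bidsMap.get(price) }
  }

  bestAsk() {
    const prices = [...this.asksMap.keys()]
    if (prices.length === 0) return undefined
    const price = Math.min(...prices)
    return { price, amount: this.asksMap.get(price) }
  }
}
```

Unlike this sketch, the real class keeps levels in sorted order at all times, which is what makes bids()/asks() iteration efficient.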

Combining data streams

combine(...iterators)

The combine function, given multiple async iterators, combines them into a single one. This allows synchronized historical market data replay and consolidated streaming of real-time data for multiple exchanges via a single for await ...of loop.

It accepts async iterables of normalized messages as rest parameters and combines them, returning a single async iterable.

For historical data replay it combines the messages of the input async iterables by sorting them by localTimestamp in ascending order, which allows synchronized, ordered market data replay for multiple exchanges.

For real-time market data streaming it combines the messages of the input async iterables in FIFO order using Promise.race.
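The historical-mode merge can be modeled with a simplified, self-contained sketch (the real combine also handles the live FIFO mode and disconnect messages):

```javascript
// Simplified illustration of combine's historical mode: merge multiple
// async iterators of normalized messages in ascending localTimestamp order.
async function* combineHistorical(...iterators) {
  // pull the first message from each source
  const heads = await Promise.all(
    iterators.map(async (it) => ({ it, current: await it.next() }))
  )

  while (true) {
    // pick the non-exhausted source with the earliest localTimestamp
    let next
    for (const head of heads) {
      if (head.current.done) continue
      if (
        next === undefined ||
        head.current.value.localTimestamp < next.current.value.localTimestamp
      ) {
        next = head
      }
    }
    if (next === undefined) return // all sources exhausted

    yield next.current.value
    next.current = await next.it.next()
  }
}
```

Because each source yields messages in ascending localTimestamp order already, picking the minimum head at every step produces a globally ordered stream.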

Computing derived data locally

compute(iterator, ...computables)

The compute function allows computing various derived data locally via so-called computables, such as the trade bars and book snapshots described below.

If you're interested in adding custom computables, for example order book imbalance, volume imbalance, open interest, or funding rate based bars, please read the "adding custom computable" section.

The compute function accepts an async iterable producing normalized messages together with computables as rest parameters, and returns an async iterable with the normalized messages produced by the provided iterable plus all messages computed from the provided computable functions. It computes and produces separate normalized computed messages for each symbol and exchange combination. When a disconnect message is returned by the provided async iterable, it discards existing pending computables and starts computing them from scratch.

computeTradeBars(options)

When provided to compute function, it computes normalized trade_bar messages based on normalized trade data.

compute trade bars options

kind ('time' | 'volume' | 'tick', required)

determines the way trades within a bar are aggregated: time creates classic OHLC candles aggregated by time, volume creates volume-based trade bars aggregated by the sum of trade amounts, tick creates trade bars aggregated by trade count

interval (number, required)

determines the interval to aggregate by: for time based bars it's the number of milliseconds, for volume based bars it's the accumulated volume, for tick based bars it's the count of trades

name (string, optional, default: undefined)

optional custom name of the trade_bar; if not specified, a name is computed based on the kind and interval options

type of message provided by computeTradeBars

sample normalized trade_bar message
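To illustrate what a time-based bar contains, here is a minimal, self-contained sketch of OHLC aggregation over the trades of a single interval window. It covers only a subset of the fields; the real trade_bar message also includes fields such as vwap, buy/sell volumes, and open/close timestamps.

```javascript
// Illustration of time-based aggregation as performed by
// computeTradeBars({ kind: 'time', interval }): OHLC + volume per window.
// Assumes all trades fall into one window and are sorted by timestamp.
function aggregateTimeBar(trades) {
  const prices = trades.map((t) => t.price)
  return {
    type: 'trade_bar',
    open: prices[0], // first trade price in the window
    high: Math.max(...prices),
    low: Math.min(...prices),
    close: prices[prices.length - 1], // last trade price in the window
    volume: trades.reduce((sum, t) => sum + t.amount, 0),
    trades: trades.length // trade count within the bar
  }
}
```

For volume and tick bars the window boundary is defined by accumulated amount or trade count instead of elapsed time, but the per-bar aggregation is analogous.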

computeBookSnapshots(options)

When provided to compute function, computes normalized book_snapshot messages based on normalized order book data. It produces new snapshots only if there is an actual change in order book state for requested depth.

compute book snapshots options

depth (number, required)

number of closest bid and ask levels to provide the snapshot for

interval (number, required)

snapshot interval in milliseconds; if 0 is provided, snapshots are computed in real time, any time there is a change in order book state for the requested depth

name (string, optional, default: undefined)

optional custom name of the book_snapshot; if not specified, a name is computed based on the depth and interval options

grouping (number, optional, default: undefined)

when provided, aggregates order book price levels into groups of the specified price increment (e.g., 10 groups all levels within each $10 range into a single level); bids are floored and asks are ceiled to the grouping boundary

removeCrossedLevels (boolean, optional, default: undefined)

when set to true, automatically detects and removes crossed levels (where best bid >= best ask) that can occur when exchanges fail to publish delete messages

onCrossedLevelRemoved (function, optional, default: undefined)

optional callback (bookChange, bestBidBefore, bestBidAfter, bestAskBefore, bestAskAfter) => void invoked whenever a crossed level is removed

type of message provided by computeBookSnapshots

sample normalized book_snapshot message
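The grouping option's rounding rule can be sketched in isolation as follows (the helper names are hypothetical, not part of the library API):

```javascript
// Bids are floored, asks are ceiled to the grouping price increment.
function groupPrice(price, grouping, side) {
  return side === 'bid'
    ? Math.floor(price / grouping) * grouping
    : Math.ceil(price / grouping) * grouping
}

// All levels that round to the same grouped price are merged into a
// single level with their amounts summed.
function groupLevels(levels, grouping, side) {
  const grouped = new Map()
  for (const { price, amount } of levels) {
    const key = groupPrice(price, grouping, side)
    grouped.set(key, (grouped.get(key) || 0) + amount)
  }
  return [...grouped].map(([price, amount]) => ({ price, amount }))
}
```

Flooring bids and ceiling asks is the conservative choice: a grouped level never advertises a better price than any of the raw levels it contains.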

Adding custom computable

Any computable provided to the compute function needs to be a factory function with the following signature:

where the returned Computable object has the following signature:

The iterator returned by the Computable.compute method is expected to provide objects that have at least the fields described in the normalized data type section, so they play well with other tardis-dev functions like combine.
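A minimal, self-contained sketch of a computable factory following the contract described above (the trade_count data type and its fields are invented for illustration; the library's exact typings may differ):

```javascript
// Hypothetical computable: emits a running trade count after every second
// trade. compute() calls the factory once per symbol/exchange combination,
// so each instance's state is isolated.
const computeTradeCount = () => ({
  // normalized message types this computable consumes
  sourceDataTypes: ['trade'],

  count: 0,

  *compute(message) {
    this.count++
    if (this.count % 2 === 0) {
      // computed messages keep the normalized-data-type base fields
      yield {
        type: 'trade_count',
        symbol: message.symbol,
        exchange: message.exchange,
        count: this.count,
        timestamp: message.timestamp,
        localTimestamp: message.localTimestamp
      }
    }
  }
})
```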

computeOrderBookImbalanceRatio()

Example implementation of a custom computeOrderBookImbalanceRatio function that uses book snapshots as its source data type and computes the ratio of ask amounts (sell orders) to bid amounts (buy orders) for a given book_snapshot depth. It may be used to determine relative buy or sell pressure.

type of messages produced by computeOrderBookImbalanceRatio

implementation of BookImbalanceRatioComputable computable and computeOrderBookImbalanceRatio factory function.

computeOrderBookImbalanceRatio usage example

Given the implementation above, we can compute the book imbalance ratio for the BitMEX real-time XBTUSD message stream. For this example we use top 5 levels, 2 second interval book snapshots as the source for our custom computable. We need an async iterable that produces book snapshots to feed our book imbalance computable, hence the two invocations of compute.
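One possible self-contained implementation of the imbalance computable described above (exact library typings may differ):

```javascript
// For every book_snapshot, emit the ratio of summed ask amounts to summed
// bid amounts over the levels present in the snapshot. Pass this factory
// to the compute function so it creates a fresh instance per
// symbol/exchange combination.
const computeOrderBookImbalanceRatio = () => ({
  sourceDataTypes: ['book_snapshot'],

  *compute(bookSnapshot) {
    const asksAmount = bookSnapshot.asks.reduce((sum, level) => sum + level.amount, 0)
    const bidsAmount = bookSnapshot.bids.reduce((sum, level) => sum + level.amount, 0)
    if (bidsAmount === 0) return // avoid division by zero on one-sided books

    yield {
      type: 'book_imbalance_ratio',
      symbol: bookSnapshot.symbol,
      exchange: bookSnapshot.exchange,
      ratio: asksAmount / bidsAmount, // > 1 means more resting sell size
      timestamp: bookSnapshot.timestamp,
      localTimestamp: bookSnapshot.localTimestamp
    }
  }
})
```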

Examples

Real-time spread across multiple exchanges

Example showing how to easily display real-time spread and best bid/ask info across multiple exchanges at once. It can be adapted to do the same for historical data (replayNormalized instead of streamNormalized).

Replay large historical trades across multiple exchanges

Example showing how to replay large historical trades across multiple exchanges as they happened.

Seamless switching between real-time streaming and historical market data replay

Example showing a simple pattern of providing an async iterable of market data messages to a function that can process them whether it is real-time or historical market data. That effectively enables having the same data pipeline for backtesting and live trading.

Real-time funding rate and open interest across multiple exchanges

Example showing how to quickly display real-time funding rate and open interest info across multiple exchanges at once.

Saving historical funding, index and open interest data to CSV file

Example showing how to write Deribit exchange historical funding, index and open interest data into CSV.

Computing simple moving average of volume based trade bars

Example showing an implementation of SimpleMovingAverageComputable that calculates the average of trade bar close prices over a specified rolling window in an incremental way. It uses a CircularBuffer under the hood.

Usage
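A self-contained sketch of the incremental approach described above, with a minimal CircularBuffer stand-in (the real example uses tardis-dev's CircularBuffer; the names here are illustrative):

```javascript
// Minimal fixed-size circular buffer: append returns the value that was
// overwritten once the buffer is full, enabling O(1) rolling sums.
class CircularBuffer {
  constructor(capacity) {
    this.capacity = capacity
    this.items = []
    this.writeIndex = 0
  }

  append(value) {
    const replaced =
      this.items.length === this.capacity ? this.items[this.writeIndex] : undefined
    this.items[this.writeIndex] = value
    this.writeIndex = (this.writeIndex + 1) % this.capacity
    return replaced
  }
}

// Computable factory: rolling average of trade_bar close prices,
// updated incrementally instead of re-summing the whole window.
const computeSimpleMovingAverage = (windowSize) => ({
  sourceDataTypes: ['trade_bar'],
  buffer: new CircularBuffer(windowSize),
  sum: 0,

  *compute(tradeBar) {
    const replaced = this.buffer.append(tradeBar.close)
    this.sum += tradeBar.close
    if (replaced !== undefined) this.sum -= replaced // drop value leaving window
    if (this.buffer.items.length < windowSize) return // window not full yet

    yield {
      type: 'simple_moving_average',
      symbol: tradeBar.symbol,
      exchange: tradeBar.exchange,
      value: this.sum / windowSize,
      timestamp: tradeBar.timestamp,
      localTimestamp: tradeBar.localTimestamp
    }
  }
})
```

The incremental sum keeps each update O(1) regardless of window size, which matters for long rolling windows over high-frequency bars.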

Environment variables (advanced)

The following environment variables configure streaming and network behavior. Network variables (HTTP_PROXY, SOCKS_PROXY) apply to both real-time streaming and historical data downloads. Exchange credentials and WebSocket URL overrides apply only to real-time streaming.

Network


HTTP_PROXY

HTTP/HTTPS proxy URL for all outgoing requests

SOCKS_PROXY

SOCKS proxy URL (used if HTTP_PROXY is not set)

WSS_URL_<EXCHANGE>

Override the default WebSocket URL for a specific exchange. Exchange name is uppercased with dashes replaced by underscores (e.g., WSS_URL_BINANCE_US)

Exchange credentials

Required for exchanges that need authentication to access real-time WebSocket feeds.


OKX_API_KEY

OKX API key — required for restricted real-time channels such as books-l2-tbt (VIP-only)

OKX_API_SECRET_KEY

OKX API secret key

OKX_API_PASSPHRASE

OKX API passphrase

OKX_API_VIP_5

Set to true if OKX account has VIP 5+ tier — enables access to books-l2-tbt channel

OKX_API_COLO

Set to true if using OKX colocation — enables access to books-l2-tbt channel

DERIBIT_API_CLIENT_ID

Deribit API client ID — required for authenticated real-time channels

DERIBIT_API_CLIENT_SECRET

Deribit API client secret

COINBASE_API_KEY

Coinbase API key — required for authenticated WebSocket feeds

COINBASE_API_SECRET

Coinbase API secret

COINBASE_API_PASSPHRASE

Coinbase API passphrase

COINBASE_INTERNATIONAL_API_KEY

Coinbase International API key

COINBASE_INTERNATIONAL_API_SECRET

Coinbase International API secret

COINBASE_INTERNATIONAL_API_PASSPHRASE

Coinbase International API passphrase

Binance rate limit tuning

Control rate limiting for Binance depth snapshot requests and openInterest REST polling during real-time streaming. Variable names use the exchange name uppercased with dashes replaced by underscores (e.g., BINANCE_FUTURES_REQUEST_WEIGHT_LIMIT).

<EXCHANGE>_REQUEST_WEIGHT_LIMIT (default: from API; applies to: depth + OI)

Override the request weight limit per minute

<EXCHANGE>_MIN_AVAILABLE_WEIGHT_BUFFER (default: auto; applies to: depth + OI)

Minimum request weight buffer before throttling kicks in

<EXCHANGE>_CONCURRENCY_LIMIT (default: 4; applies to: depth)

Number of concurrent depth snapshot requests

<EXCHANGE>_SNAPSHOTS_DELAY_MS (default: none; applies to: depth)

Delay in ms between individual depth snapshot requests

<EXCHANGE>_OPEN_INTEREST_POLLING_INTERVAL_MS (default: 5000; applies to: openInterest)

Minimum polling interval in ms for generated openInterest snapshots
