Node.js

Fast and convenient access to tick-level historical and real-time cryptocurrency market data via Node.js

Introduction

The Node.js tardis-dev library provides convenient access to tick-level historical and real-time cryptocurrency market data in both exchange-native and normalized formats. Instead of callbacks it relies on async iteration (for await ...of), which enables composability features such as seamless switching between real-time data streaming and historical data replay, or computing derived data locally.

historical market data replay

const tardis = require('tardis-dev')
const { replayNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades,
  normalizeBookChanges
)
for await (const message of messages) {
  console.log(message)
}

real-time market data streaming

const tardis = require('tardis-dev')
const { streamNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD']
  },
  normalizeTrades,
  normalizeBookChanges
)
for await (const message of messages) {
  console.log(message)
}
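
Because every data source is an async iterable, a processing step can be written once as an async generator and reused for both replay and streaming. The following is a minimal sketch of that idea. fakeMessages is a hypothetical stand-in for the iterable returned by replayNormalized or streamNormalized (so the example runs without network access), and onlyType and collect are illustrative helper names, not part of tardis-dev.

```javascript
// Stand-in for the async iterable returned by replayNormalized/streamNormalized
// (hypothetical sample data, not real market data).
async function* fakeMessages() {
  yield { type: 'trade', symbol: 'XBTUSD', price: 5300, amount: 10 }
  yield { type: 'book_change', symbol: 'XBTUSD', bids: [], asks: [] }
  yield { type: 'trade', symbol: 'ETHUSD', price: 160, amount: 5 }
}

// Reusable processing step: works with any async iterable of messages,
// no matter whether they come from a historical replay or a live stream.
async function* onlyType(messages, type) {
  for await (const message of messages) {
    if (message.type === type) {
      yield message
    }
  }
}

// Drains an async iterable into an array (suitable for small demos only).
async function collect(messages) {
  const result = []
  for await (const message of messages) {
    result.push(message)
  }
  return result
}

async function main() {
  const trades = await collect(onlyType(fakeMessages(), 'trade'))
  console.log(trades.map((trade) => trade.symbol))
}

main()
```

Swapping fakeMessages() for a real replay or stream iterable should not require any change to onlyType, which is the point of the async iteration design.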

Try this code live on RunKit

Features

  • real-time streaming of tick-level market data with unified API for connecting directly to exchanges public WebSocket APIs without any intermediary/3rd party proxy

  • historical tick-level market data replay backed by tardis.dev HTTP API — includes full order book depth snapshots plus incremental updates, tick-by-tick trades, historical open interest, funding, index, mark prices, liquidations and more

  • support for both exchange native and normalized market data formats (consistent format for accessing market data across multiple exchanges — normalized trades, order book and ticker data)

  • transparent historical local data caching (cached data is stored on disk in compressed GZIP format and decompressed on demand when reading the data)

  • support for top cryptocurrency exchanges: BitMEX, Deribit, Binance, Binance Futures, FTX, OKEx, Huobi Global, Huobi DM, bitFlyer, Bitstamp, Coinbase Pro, Crypto Facilities, Gemini, Kraken, Bitfinex, Huobi US and Bybit.

  • automatic reconnection logic for closed and stale connections in real-time streams

  • combining multiple exchanges feeds into single one via combine helper function — synchronized historical market data replay and consolidated real-time data streaming from multiple exchanges

  • computing derived data locally like order book imbalance, custom trade bars, book snapshots and more via compute helper function and computables, e.g., volume based bars, top 20 levels order book snapshots taken every 10 ms etc.

  • full limit order book reconstruction both for real-time and historical data via OrderBook object

  • fast and lightweight architecture — low memory footprint and no heavy in-memory buffering

  • extensible mapping logic that allows adjusting normalized formats for specific needs

  • built-in TypeScript support

Installation

Requires Node.js v12+ installed.

npm install tardis-dev --save

Debugging and logging

The tardis-dev library uses the debug package for verbose logging and debugging. Logging can be enabled by setting the DEBUG environment variable to tardis-dev*.

Usage with TypeScript

Simply change from require:

const { replay, stream } = require('tardis-dev')

to ES Modules import:

import { replay, stream } from 'tardis-dev'

to enjoy first class TypeScript typings.

Replaying historical market data

See historical data details page to get detailed information about historical market data available for each exchange.

replay(options)

Replays historical market data messages for given replay options in exchange native format. Historical market data is being fetched efficiently (in parallel) from the tardis.dev HTTP API and cached locally. Returns async iterable.

const { replay } = require('tardis-dev')
const messages = replay({
exchange: 'bitmex',
filters: [
{ channel: 'trade', symbols: ['XBTUSD'] },
{ channel: 'orderBookL2', symbols: ['XBTUSD'] }
],
from: '2019-05-01',
to: '2019-05-02'
})
for await (const message of messages) {
console.log(message)
}

Try this code live on RunKit

stream(options) is the real-time counterpart of the replay function. It returns market data in the same format, only real-time instead of historical.

replay options

  • exchange (type: string, default: -): requested exchange id, one of the allowed values

  • filters (type: {channel:string, symbols?: string[]}[], default: []): optional filters of the requested historical data feed; use the getExchangeDetails function to get allowed channels and symbol ids for the requested exchange

  • from (type: string, default: -): replay period start date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-01

  • to (type: string, default: -): replay period end date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-02

  • skipDecoding (type: boolean | undefined, default: undefined): when set to true, returns messages as buffers instead of decoding them to objects

  • withDisconnects (type: boolean | undefined, default: undefined): when set to true, returns a message with value undefined for events when the connection that was recording the historical data was closed

  • apiKey (type: string | undefined, default: undefined): API key for the tardis.dev API; if not provided, only the first day of each month of historical data is accessible. It can also be set via the init function for all replay calls.
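
Since from and to are interpreted with Date.parse(), it can help to validate them before starting a replay. The helper below is a minimal sketch: parseReplayRange is a hypothetical name, not a tardis-dev function, and the rule that from must be earlier than to is an assumption about what makes a useful range.

```javascript
// Hypothetical helper (not part of tardis-dev): validates replay dates
// using the same Date.parse() rules mentioned in the options above.
function parseReplayRange(from, to) {
  const fromMs = Date.parse(from)
  const toMs = Date.parse(to)

  if (Number.isNaN(fromMs) || Number.isNaN(toMs)) {
    throw new Error(`Invalid replay range: from=${from}, to=${to}`)
  }
  // Assumption: a replay range should span a positive amount of time.
  if (fromMs >= toMs) {
    throw new Error('"from" must be earlier than "to"')
  }
  return { from: new Date(fromMs), to: new Date(toMs) }
}

// Date-only ISO strings are interpreted by Date.parse() as UTC midnight.
const range = parseReplayRange('2019-04-01', '2019-04-02')
console.log(range.from.toISOString(), range.to.toISOString())
```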

type of messages provided by replay iterator (for await ...of)

type Message =
| {
localTimestamp: Date // local timestamp when message has been received
message: any // data message native exchange format
}
// when skipDecoding is set to true
| {
localTimestamp: Buffer
message: Buffer
}
// when withDisconnects is set to true whole message can be undefined
Message | undefined

sample message

{
localTimestamp: 2019-05-01T00:00:07.013Z,
message: {
table: 'orderBookL2',
action: 'update',
data: [ { symbol: 'XBTUSD', id: 8799474100, side: 'Buy', size: 167340 } ]
}
}
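
Depending on skipDecoding and withDisconnects, a consumer of replay may see three different shapes: undefined, raw buffers, or decoded objects. The sketch below shows one way to branch on them. describeMessage is a hypothetical helper, the sample values are made up, and decoding the buffer with JSON.parse assumes the raw payload is JSON text, which is an assumption rather than something this page states.

```javascript
// Hypothetical helper (not part of tardis-dev) that handles every shape
// a replay message can take according to the Message type above.
function describeMessage(item) {
  // withDisconnects: true -> disconnects show up as undefined
  if (item === undefined) {
    return { kind: 'disconnect' }
  }
  // skipDecoding: true -> payload is delivered as a Buffer
  if (Buffer.isBuffer(item.message)) {
    // Assumption: the raw payload is JSON text.
    return { kind: 'raw', decoded: JSON.parse(item.message.toString('utf8')) }
  }
  // default: payload is already decoded to an object
  return { kind: 'decoded', decoded: item.message }
}

// Made-up sample of a non-decoded message.
const rawSample = {
  localTimestamp: Buffer.from('2019-05-01T00:00:07.013Z'),
  message: Buffer.from('{"table":"trade","action":"insert"}')
}

console.log(describeMessage(undefined).kind)
console.log(describeMessage(rawSample).decoded.table)
```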

replayNormalized(options, ...normalizers)

Replays historical market data messages for given replay options and normalizes messages using normalizers provided as rest arguments. Historical market data is being fetched efficiently (in parallel) from the tardis.dev HTTP API and cached locally. Returns async iterable.

const tardis = require('tardis-dev')
const { replayNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = replayNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD'],
from: '2019-05-01',
to: '2019-05-02'
},
normalizeTrades,
normalizeBookChanges
)
for await (const message of messages) {
console.log(message)
}

Try this code live on RunKit

streamNormalized(options, ...normalizers) is the real-time counterpart of replayNormalized function — returns market data in the same format, just real-time one not historical

replay normalized options

  • exchange (type: string, default: -): requested exchange id, one of the allowed values

  • symbols (type: string[] | undefined, default: undefined): optional symbols for the requested data feed; use the getExchangeDetails function to get allowed symbol ids for the requested exchange

  • from (type: string, default: -): replay period start date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-01

  • to (type: string, default: -): replay period end date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-02

  • withDisconnectMessages (type: boolean | undefined, default: undefined): when set to true, returns disconnect messages for events when the connection that was recording the historical data was closed

  • apiKey (type: string | undefined, default: undefined): API key for the tardis.dev API; if not provided, only the first day of each month of historical data is accessible. It can also be set via the init function for all replayNormalized calls.

Built-in normalizers

The replayNormalized function accepts any number of normalizers as rest parameters that map from the native exchange format to a normalized data format. tardis-dev ships with built-in normalizers for trades, order book and derivative ticker data, and also allows adding custom ones.

types of messages provided by replayNormalized iterator (for await ...of)

Message types and formats depend on specific normalizers provided to replayNormalized function and are documented in detail in data normalization section.

sample message

Sample message produced by normalizeBookChanges

{
type: 'book_change',
symbol: 'XBTUSD',
exchange: 'bitmex',
isSnapshot: false,
bids: [{ price: 5263.5, amount: 1780043 }],
asks: [],
timestamp: 2019-05-01T00:00:04.430Z,
localTimestamp: 2019-05-01T00:00:04.430Z
}

Streaming real-time market data

stream(options)

Streams real-time market data messages for given stream options in exchange native format. It connects directly to exchanges' WebSocket APIs and transparently restarts closed, broken or stale connections (open connections without data being sent for a specified amount of time). Returns async iterable.

const { stream } = require('tardis-dev')
const messages = stream({
exchange: 'bitmex',
filters: [
{ channel: 'trade', symbols: ['XBTUSD'] },
{ channel: 'orderBookL2', symbols: ['XBTUSD'] }
]
})
for await (const message of messages) {
console.log(message)
}

Try this code live on RunKit

replay(options) is a historical market data counterpart of stream function — returns market data in the same format, just historical one not real-time

stream options

  • exchange (type: string, default: -): requested exchange id, one of the allowed values

  • filters (type: {channel:string, symbols?: string[]}[], default: []): optional filters of the requested real-time data feed; use getExchangeDetails to get allowed channels and symbol ids for the requested exchange

  • skipDecoding (type: boolean | undefined, default: undefined): when set to true, returns messages as buffers instead of decoding them to objects

  • withDisconnects (type: boolean | undefined, default: undefined): when set to true, returns a message with value undefined for real-time stream disconnect events

  • timeoutIntervalMS (type: number, default: 10000): time in milliseconds after which the connection is restarted if no message has been received from the exchange

  • onError (type: ((err) => void) | undefined, default: undefined): optional callback invoked when a real-time WebSocket connection error occurs, useful for custom error logging etc.

type of messages provided by stream iterator (for await ...of)

type Message =
| {
localTimestamp: Date // local timestamp when message has been received
message: any // data message in exchange native data format
}
// when skipDecoding is set to true
| {
localTimestamp: Buffer
message: Buffer
}
// when withDisconnects is set to true whole message can be undefined
Message | undefined

sample message

{
localTimestamp: 2019-10-22T09:24:51.025Z,
message: {
table: 'orderBookL2',
action: 'update',
data: [
{ symbol: 'XBTUSD', id: 8799172400, side: 'Sell', size: 369501 }
]
}
}

streamNormalized(options, ...normalizers)

Streams real-time market data messages for given stream options and normalizes messages using normalizers provided as rest arguments. It connects directly to exchanges' WebSocket APIs and transparently restarts closed, broken or stale connections (open connections without data being sent for a specified amount of time). Returns async iterable.

const tardis = require('tardis-dev')
const { streamNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD']
},
normalizeTrades,
normalizeBookChanges
)
for await (const message of messages) {
console.log(message)
}

Try this code live on RunKit

replayNormalized(options) is a historical counterpart of streamNormalized function — returns market data in the same format, just historical one not real-time

stream normalized options

  • exchange (type: string, default: -): requested exchange id, one of the allowed values

  • symbols (type: string[] | undefined, default: undefined): instrument symbols for the requested data feed

  • withDisconnectMessages (type: boolean | undefined, default: undefined): when set to true, returns disconnect messages for real-time stream disconnect events

  • timeoutIntervalMS (type: number, default: 10000): time in milliseconds after which the connection is restarted if no message has been received from the exchange

  • onError (type: ((err) => void) | undefined, default: undefined): optional callback invoked when a real-time WebSocket connection or mapping error occurs, useful for custom error logging etc.

Built-in normalizers

The streamNormalized function accepts any number of normalizers as rest parameters that map from the native exchange format to a normalized data format. tardis-dev ships with built-in normalizers for trades, order book and derivative ticker data, and also allows adding custom ones.

types of messages provided by streamNormalized iterator (for await ...of)

Message types and formats depend on specific normalizers provided to streamNormalized function and are documented in detail in data normalization section.

sample message

Sample message produced by normalizeTrades

{
type: 'trade',
symbol: 'XBTUSD',
exchange: 'bitmex',
id: 'b1f4b309-80e2-1ffb-340b-2f7576f6ef0d',
price: 7447.5,
amount: 100,
side: 'buy',
timestamp: 2019-10-24T13:28:07.867Z,
localTimestamp: 2019-10-24T13:28:07.938Z
}

Historical market data helpers

init(options)

When working with market data via the replay and replayNormalized functions, by default only the first day of each month of historical data is available for replay, and locally cached historical data is stored in the default location on disk (OS temp dir).

The init function allows providing the apiKey received via email after ordering historical market data access via the tardis.dev website, as well as a custom cacheDir. The apiKey can also be provided directly via the options of the replay and replayNormalized functions, which overrides anything that was provided via init.

This function doesn't affect real-time streaming functionality in any way, it's useful only for historical data replay.

const { init } = require('tardis-dev')
init({
apiKey: '<YOUR API KEY>',
cacheDir: '<CUSTOM CACHE DIR>'
})

init options

  • apiKey (type: string | undefined, default: undefined): API key for the tardis.dev API; if not provided, only the first day of each month of historical data is accessible

  • cacheDir (type: string, default: <os.tmpdir>/.tardis-cache): path to the local directory that will be used as the cache location; if not provided, the default temp dir for the given OS will be used
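
One way to avoid hard-coding the API key is to build the init options from environment variables. This is only a sketch: buildInitOptions is a hypothetical helper, and TARDIS_API_KEY and TARDIS_CACHE_DIR are made-up variable names that tardis-dev itself does not read. The fallback path mirrors the documented default cache location.

```javascript
const os = require('os')
const path = require('path')

// Hypothetical helper (not part of tardis-dev): builds an options object
// for init() from environment variables. TARDIS_API_KEY and TARDIS_CACHE_DIR
// are made-up variable names used only for this illustration.
function buildInitOptions(env = process.env) {
  return {
    // undefined means only the first day of each month is accessible
    apiKey: env.TARDIS_API_KEY || undefined,
    // falls back to the documented default location
    cacheDir: env.TARDIS_CACHE_DIR || path.join(os.tmpdir(), '.tardis-cache')
  }
}

console.log(buildInitOptions({ TARDIS_API_KEY: 'my-key' }))
```

The returned object has the same shape as the init options, so it could be passed as init(buildInitOptions()).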

getExchangeDetails(exchange)

Given exchange id provides exchange details (available symbols, availability dates, available channels, pricing info etc) provided by exchanges/:exchange API endpoint.

const { getExchangeDetails } = require('tardis-dev')
const bitmexExchangeDetails = await getExchangeDetails('bitmex')

Try this code live on RunKit

type of response returned by awaiting on getExchangeDetails

{
id: string
name: string
enabled: boolean
filterable: boolean
availableSince: string
availableSymbols: {
id: string
type: 'spot' | 'future' | 'perpetual' | 'option'
availableSince: string
availableTo?: string
}[]
availableChannels: string[]
incidentReports: {
from: string
to: string
status: string
details: string
}
}
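
The response can be used to pick symbols before starting a replay. The sketch below filters availableSymbols by instrument type and availability on a given day. symbolsAvailableOn is a hypothetical helper, the sample object is made up (not real API output), and treating availableTo as an exclusive upper bound is an assumption.

```javascript
// Hypothetical helper (not part of tardis-dev): returns ids of symbols of
// a given type that were available on a given day, based on the response
// shape documented above.
function symbolsAvailableOn(details, type, day) {
  const dayMs = Date.parse(day)
  return details.availableSymbols
    .filter((symbol) => symbol.type === type)
    .filter((symbol) => Date.parse(symbol.availableSince) <= dayMs)
    // availableTo is optional; when missing the symbol is treated as still
    // available. Assumption: availableTo is an exclusive upper bound.
    .filter((symbol) => symbol.availableTo === undefined || dayMs < Date.parse(symbol.availableTo))
    .map((symbol) => symbol.id)
}

// Made-up sample in the documented shape (not real API output).
const sampleDetails = {
  id: 'bitmex',
  availableSymbols: [
    { id: 'XBTUSD', type: 'perpetual', availableSince: '2019-03-30T00:00:00.000Z' },
    {
      id: 'XBTM19',
      type: 'future',
      availableSince: '2019-03-30T00:00:00.000Z',
      availableTo: '2019-06-29T00:00:00.000Z'
    },
    { id: 'ETHUSD', type: 'perpetual', availableSince: '2019-03-30T00:00:00.000Z' }
  ]
}

console.log(symbolsAvailableOn(sampleDetails, 'perpetual', '2019-05-01'))
```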

getApiKeyAccessInfo(apiKey?)

Given apiKey provided as optional parameter or provided in init function provides information about what historical data is available for it - exchanges, date ranges, symbols.

const { getApiKeyAccessInfo } = require('tardis-dev')
const details = await getApiKeyAccessInfo()

type of response returned by awaiting on getApiKeyAccessInfo()

{
exchange: string
from: string
to: string
symbols: string[]
}[]

clearCache()

Clears local data cache dir.

const { clearCache } = require('tardis-dev')
await clearCache()

Data normalization

Data normalization allows consuming market data from various exchanges in consistent format.

tardis-dev has the following built-in normalizers:

  • normalizeTrades

  • normalizeBookChanges

  • normalizeDerivativeTickers

If you're interested in how exactly data is mapped from the exchange native format to the normalized one, please follow the code in the tardis-dev GitHub repository for each exchange. If you determine that the mapping should be done differently, please read the "modifying built-in and adding custom normalizers" section.

const { streamNormalized, normalizeTrades, normalizeBookChanges,
normalizeDerivativeTickers } = require('tardis-dev')
// or replayNormalized to replay normalized historical data
const messages = streamNormalized(
{
exchange: 'deribit',
symbols: ['BTC-PERPETUAL']
},
normalizeTrades,
normalizeBookChanges,
normalizeDerivativeTickers
)
for await (const message of messages) {
if (message.type === 'book_change') {
// process normalized book change
}
if (message.type === 'trade') {
// process normalized trade
}
if (message.type === 'derivative_ticker') {
// process normalized derivative_ticker
}
}

Try this code live on RunKit

normalizeTrades

Provides normalized trade data for all exchanges supported by tardis-dev. It's a function that returns mapper for requested exchange - when provided to replayNormalized or streamNormalized functions, provides necessary filters for replaying historical or subscribing to real-time trade data and mapping from exchange native trades data into normalized trade messages.

type of message provided by normalizeTrades

{
type: 'trade'
symbol: string
exchange: string
id?: string
price: number
amount: number
side: 'buy' | 'sell' | 'unknown' // liquidity taker side (aggressor)
timestamp: Date
localTimestamp: Date
}

sample normalized trade message

{
type: 'trade',
symbol: 'XBTUSD',
exchange: 'bitmex',
id: '282a0445-0e3a-abeb-f403-11003204ea1b',
price: 7996,
amount: 50,
side: 'sell',
timestamp: 2019-10-23T10:32:49.669Z,
localTimestamp: 2019-10-23T10:32:49.740Z
}
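
Because normalized trades share one shape across exchanges, aggregation code does not need exchange-specific branches. The following sketch sums volume per aggressor side and computes a volume-weighted average price. summarizeTrades is a hypothetical helper and the sample trades are made up.

```javascript
// Hypothetical helper (not part of tardis-dev): accumulates volume per
// aggressor side and a volume-weighted average price (VWAP).
function summarizeTrades(trades) {
  const summary = { buy: 0, sell: 0, unknown: 0, vwap: undefined }
  let notional = 0
  let volume = 0

  for (const trade of trades) {
    summary[trade.side] += trade.amount
    notional += trade.price * trade.amount
    volume += trade.amount
  }
  if (volume > 0) {
    summary.vwap = notional / volume
  }
  return summary
}

// Made-up sample trades in the normalized format.
const sampleTrades = [
  { type: 'trade', symbol: 'XBTUSD', exchange: 'bitmex', price: 8000, amount: 50, side: 'sell' },
  { type: 'trade', symbol: 'XBTUSD', exchange: 'bitmex', price: 8010, amount: 150, side: 'buy' }
]

console.log(summarizeTrades(sampleTrades))
```

Note that the unit of amount differs between exchanges and instruments, so mixing symbols in one summary is usually not meaningful.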

normalizeBookChanges

Provides normalized book_change data for all exchanges supported by tardis-dev. It's a function that returns mapper for requested exchange - when provided to replayNormalized or streamNormalized functions, provides necessary filters for replaying historical or subscribing to real-time order book data and mapping from exchange native order book snapshots and updates to normalized book_change messages. It provides initial full order book snapshots (isSnapshot=true) plus incremental updates for each order book change as published by exchange.

type of message provided by normalizeBookChanges

{
type: 'book_change'
symbol: string
exchange: string
isSnapshot: boolean
bids: { price: number; amount: number }[]
asks: { price: number; amount: number }[]
timestamp: Date
localTimestamp: Date
}

sample normalized book_change message

{
type: 'book_change',
symbol: 'XBTUSD',
exchange: 'bitmex',
isSnapshot: false,
bids: [],
asks: [{ price: 7985, amount: 283318 }],
timestamp: 2019-10-23T11:29:53.469Z,
localTimestamp: 2019-10-23T11:29:53.469Z
}

normalizeDerivativeTickers

Provides normalized derivative_ticker data for exchanges that trade derivative instruments and are supported by tardis-dev. It's a function that returns mapper for requested exchange - when provided to replayNormalized or streamNormalized functions, provides necessary filters for replaying historical or subscribing to real-time data necessary for providing normalized derivative_ticker messages.

type of message provided by normalizeDerivativeTickers

{
type: 'derivative_ticker'
symbol: string
exchange: string
lastPrice?: number
openInterest?: number
fundingRate?: number
indexPrice?: number
markPrice?: number
timestamp: Date
localTimestamp: Date
}

sample normalized derivative_ticker message

{
type: 'derivative_ticker',
symbol: 'BTC-PERPETUAL',
exchange: 'deribit',
lastPrice: 7987.5,
openInterest: 84129491,
fundingRate: -0.00001568,
indexPrice: 7989.28,
markPrice: 7987.56,
timestamp: 2019-10-23T11:34:29.302Z,
localTimestamp: 2019-10-23T11:34:29.416Z
}

disconnect message

When the options of the replayNormalized or streamNormalized functions have the withDisconnectMessages flag set to true and a disconnect event occurs (e.g., a WebSocket connection close), a disconnect message is returned by the async iterable of those functions.

type of disconnect message

{
type: 'disconnect'
exchange: string
localTimestamp: Date
}

sample disconnect message

{
type: 'disconnect',
exchange: 'deribit',
localTimestamp: 2019-10-23T11:34:29.416Z
}
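
A typical reason to enable withDisconnectMessages is to reset locally derived state, since data may have been missed while the connection was down. The sketch below illustrates this with a hypothetical applyMessage function and made-up messages from a single exchange; it is not part of tardis-dev.

```javascript
// Hypothetical consumer state (not part of tardis-dev): keeps the last
// trade price per symbol and forgets it when a disconnect is reported,
// since data may have been missed while the connection was down.
function applyMessage(lastPrices, message) {
  if (message.type === 'disconnect') {
    lastPrices.clear()
    return
  }
  if (message.type === 'trade') {
    lastPrices.set(message.symbol, message.price)
  }
}

const lastPrices = new Map()
// Made-up messages in the normalized format, all from one exchange.
const sampleMessages = [
  { type: 'trade', symbol: 'BTC-PERPETUAL', exchange: 'deribit', price: 7987.5 },
  { type: 'disconnect', exchange: 'deribit', localTimestamp: new Date() },
  { type: 'trade', symbol: 'BTC-PERPETUAL', exchange: 'deribit', price: 7990 }
]

for (const message of sampleMessages) {
  applyMessage(lastPrices, message)
  console.log(message.type, lastPrices.size)
}
```

When several exchanges are consumed together, the state would need to be keyed by message.exchange so that a disconnect only resets the affected exchange.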

Modifying built-in and adding custom normalizers

In tardis-dev, data normalization is implemented via normalize factory functions provided to the replayNormalized and streamNormalized functions. This design gives a lot of flexibility: built-in normalizers can be replaced, extended or modified, and new ones can be added for new normalized data types, without the need to fork the whole library.

Any normalize function provided to replayNormalized and streamNormalized functions needs to have following signature:

(exchange: string, localTimestamp: Date) => Mapper

exchange is the id of the exchange for which a mapper object needs to be returned, and localTimestamp is the date for which the mapper is created (a new mapper is created after each disconnection). In most cases localTimestamp is not needed, but in certain cases, for example an exchange API change, it can be used to switch to different mapping logic, such as using a new data channel that wasn't available until a certain date.

Returned Mapper object has following signature:

{
canHandle: (message: any) => boolean
map(message: any, localTimestamp: Date): IterableIterator<Data> | undefined
getFilters: (symbols?: string[]) => {channel: string, symbols?: string[]}[]
}

On every disconnection event, normalize factory functions are called again to provide new Mapper objects with clean state where required (for example, stateful mapping of BitMEX order book data needs to persist the mapping between price level id and price level, and needs to be reset for each new connection). If a mapper object is stateful, the normalize factory function must always return a new clean-state object or reset its state in one way or another.

Normalized data returned by iterable iterator of Mapper.map method is expected to have a shape that has at least fields as described in normalized data type section to play well with other tardis-dev functions like combine or compute.

normalized data type

{
type: string
symbol: string
exchange: string
timestamp: Date
localTimestamp: Date
name? : string | undefined
}
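
To illustrate the clean-state requirement for stateful mappers, here is a minimal sketch of a normalize factory that creates a fresh mapper on every call. Everything in it is hypothetical: 'example-exchange' and its native message shape are invented for the illustration, and createDedupingTradesMapper and normalizeDedupedTrades are not part of tardis-dev.

```javascript
// Hypothetical stateful mapper for an imaginary 'example-exchange' whose
// native trade messages look like { channel: 'trades', symbol, id, px, qty, ts }.
// It remembers seen trade ids, so it must not be shared between connections.
function createDedupingTradesMapper(exchange) {
  const seenIds = new Set() // per-connection state

  return {
    canHandle(message) {
      return message.channel === 'trades'
    },
    getFilters(symbols) {
      return [{ channel: 'trades', symbols }]
    },
    *map(message, localTimestamp) {
      if (seenIds.has(message.id)) {
        return // duplicate, produce nothing
      }
      seenIds.add(message.id)
      yield {
        type: 'trade',
        symbol: message.symbol,
        exchange,
        id: String(message.id),
        price: message.px,
        amount: message.qty,
        side: 'unknown',
        timestamp: new Date(message.ts),
        localTimestamp
      }
    }
  }
}

// Normalize factory with the documented (exchange, localTimestamp) => Mapper
// signature. A new mapper is created on every call, so state starts clean
// after each disconnect.
const normalizeDedupedTrades = (exchange, localTimestamp) => {
  return createDedupingTradesMapper(exchange)
}

const mapper = normalizeDedupedTrades('example-exchange', new Date())
const nativeTrade = { channel: 'trades', symbol: 'ABC', id: 1, px: 10, qty: 2, ts: 0 }
console.log([...mapper.map(nativeTrade, new Date())].length) // first time: mapped
console.log([...mapper.map(nativeTrade, new Date())].length) // repeated id: skipped
```

Returning a shared module-level mapper instead would carry the seenIds set across reconnections, which is exactly the situation the paragraph above warns about.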

Adding custom normalizeLiquidations normalizer

Example implementation of a custom normalizeLiquidations function that normalizes liquidations data for the deribit exchange. Implementations for other exchanges are left as an exercise for the reader.

type of messages provided by normalizeLiquidations

{
type: 'liquidation'
symbol: string
exchange: string
side: 'buy' | 'sell'
amount: number
price: number
timestamp: Date
localTimestamp: Date
}

implementation of deribitLiquidations mapper and normalizeLiquidations

// object that maps from deribit trades real-time channel to our custom
// liquidation messages if it determines that a trade was caused by liquidation
const deribitLiquidationsMapper = {
// function that given message in deribit native format
// https://docs.deribit.com/v2/#trades-instrument_name-interval
// determines if such message can be mapped/normalized
canHandle(message) {
const params = message.params
const channel = params !== undefined ? params.channel : undefined
if (channel === undefined) {
return false
}
return (
channel.startsWith('trades') &&
params.data.some(trade => trade.liquidation !== undefined)
)
},
// given symbols returns filters that are provided
// as filters to replay & stream functions options
// in our case we're interested in trades deribit channel
// but it could be multiple channels in certain scenarios as well
getFilters(symbols) {
return [
{
channel: 'trades',
symbols
}
]
},
// map a message that canHandle has accepted
// to normalized liquidation messages
*map(message, localTimestamp) {
for (const deribitLiquidationTrade of message.params.data) {
if (deribitLiquidationTrade.liquidation === undefined) {
continue
}
yield {
type: 'liquidation',
symbol: deribitLiquidationTrade.instrument_name,
exchange: 'deribit',
price: deribitLiquidationTrade.price,
amount: deribitLiquidationTrade.amount,
side: deribitLiquidationTrade.direction,
timestamp: new Date(deribitLiquidationTrade.timestamp),
localTimestamp: localTimestamp
}
}
}
}
// factory function that given an exchange returns a mapper for it
// if such a mapper is implemented
// in our case deribitLiquidationsMapper is stateless so we can return the same
// instance every time
const normalizeLiquidations = (exchange, localTimestamp) => {
if (exchange === 'deribit') {
return deribitLiquidationsMapper
}
throw new Error(`normalizeLiquidations: ${exchange} not supported`)
}

normalizeLiquidations usage example

The same normalizeLiquidations function could also be provided to the streamNormalized function, or used together with other normalizers (normalizeTrades etc.).

const { replayNormalized } = require('tardis-dev')
const liquidations = replayNormalized(
{
exchange: 'deribit',
symbols: ['BTC-PERPETUAL'],
from: '2019-07-01',
to: '2019-07-02'
},
normalizeLiquidations
)
for await (const liquidation of liquidations) {
console.log(liquidation)
}

Try this code live on RunKit

Changing normalizeTrades for Binance exchange

Let's assume that the default normalization of Binance exchange trades data doesn't fit our use case and we need to use the @aggTrade stream as the source of trade data instead of the @trade stream used by default.

// add custom binance trades mapper that uses @aggTrade stream as source of
// normalized trades
const customBinanceTradesMapper = {
canHandle(message) {
return message.stream.endsWith('@aggTrade')
},
getFilters(symbols) {
if (symbols !== undefined) {
symbols = symbols.map(s => s.toLocaleLowerCase())
}
// binance api expects all symbols to be lower cased
return [
{
channel: 'aggTrade',
symbols
}
]
},
*map(message, localTimestamp) {
const binanceAggTrade = message.data
yield {
type: 'trade',
symbol: binanceAggTrade.s,
exchange: 'binance',
id: String(binanceAggTrade.a),
price: Number(binanceAggTrade.p),
amount: Number(binanceAggTrade.q),
side: binanceAggTrade.m ? 'sell' : 'buy',
timestamp: new Date(binanceAggTrade.T),
localTimestamp: localTimestamp
}
}
}
// add new normalize function that uses the custom trades mapper only for
// the binance exchange and falls back to the default normalizeTrades
// (exported by tardis-dev) for other exchanges
const normalizeTradesWithBinancePatch = (exchange, localTimestamp) => {
if (exchange === 'binance') {
return customBinanceTradesMapper
}
return normalizeTrades(exchange, localTimestamp)
}

normalizeTradesWithBinancePatch usage example

const { streamNormalized, normalizeTrades } = require('tardis-dev')
const messages = streamNormalized(
{
exchange: 'binance',
symbols: ['btcusdt']
},
normalizeTradesWithBinancePatch
)
for await (const message of messages) {
console.log(message)
}

Try this code live on RunKit

Limit order book reconstruction

tardis-dev exports an OrderBook class that, when instantiated, can process normalized book_change messages with order book snapshots and incremental updates. It allows maintaining full local order book state (level 2, aggregated market-by-price) both for real-time data and for reconstructing historical order book state at any past point in time. It waits for the first book_change message that is a snapshot (isSnapshot = true) and then applies subsequent updates to it. A single OrderBook object can maintain order book state only for a single symbol/instrument. It uses a B+ tree data structure under the hood to efficiently maintain its local state.

historical order book reconstruction

const { replayNormalized, normalizeBookChanges, OrderBook } = require('tardis-dev')
const books = {
  XBTUSD: new OrderBook(),
  ETHUSD: new OrderBook()
}
const messages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeBookChanges
)
for await (const message of messages) {
  const orderBook = books[message.symbol]
  if (message.type === 'book_change') {
    orderBook.update(message)
  }
  const timestamp = message.localTimestamp.toISOString()
  // print best bid/ask for every exchange tick
  console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}

maintaining order book for real-time stream

const { streamNormalized, normalizeBookChanges, OrderBook } = require('tardis-dev')
const books = {
  XBTUSD: new OrderBook(),
  ETHUSD: new OrderBook()
}
const messages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD']
  },
  normalizeBookChanges
)
for await (const message of messages) {
  const orderBook = books[message.symbol]
  if (message.type === 'book_change') {
    orderBook.update(message)
  }
  const timestamp = message.localTimestamp.toISOString()
  // print best bid/ask for every exchange tick
  console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}

Try this code live on RunKit

orderBook.update(bookChange)

Processes normalized book_change messages to update its internal local state that maintains ordered sets of bids and asks. It ignores any non-snapshot book_change messages received before the initial snapshot. It should be called for every book_change message received for the symbol whose order book we'd like to reconstruct.

orderBook.update({
type: 'book_change',
symbol: 'XBTUSD',
exchange: 'bitmex',
isSnapshot: false,
bids: [],
asks: [{ price: 7985, amount: 283318 }],
timestamp: new Date(),
localTimestamp: new Date()
})
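
To make the update semantics more concrete, here is a deliberately simplified stand-in that follows the rules described above (wait for a snapshot, then apply incremental changes). It is not the library's implementation: NaiveOrderBook is a hypothetical class that uses plain Maps instead of a B+ tree, and it assumes that a snapshot replaces the whole book and that a level with amount 0 means the price level was removed.

```javascript
// Simplified stand-in for OrderBook (not the library's implementation):
// price levels are kept in Maps and scanned when the best level is read.
class NaiveOrderBook {
  constructor() {
    this.bidLevels = new Map()
    this.askLevels = new Map()
    this.receivedSnapshot = false
  }

  update(bookChange) {
    if (bookChange.isSnapshot) {
      // Assumption: a snapshot replaces the whole book.
      this.bidLevels.clear()
      this.askLevels.clear()
      this.receivedSnapshot = true
    } else if (!this.receivedSnapshot) {
      return // ignore incremental updates until the first snapshot arrives
    }
    this.applySide(this.bidLevels, bookChange.bids)
    this.applySide(this.askLevels, bookChange.asks)
  }

  applySide(levels, changes) {
    for (const { price, amount } of changes) {
      // Assumption: amount of 0 means the price level was removed.
      if (amount === 0) {
        levels.delete(price)
      } else {
        levels.set(price, amount)
      }
    }
  }

  bestBid() {
    if (this.bidLevels.size === 0) return undefined
    const price = Math.max(...this.bidLevels.keys())
    return { price, amount: this.bidLevels.get(price) }
  }

  bestAsk() {
    if (this.askLevels.size === 0) return undefined
    const price = Math.min(...this.askLevels.keys())
    return { price, amount: this.askLevels.get(price) }
  }
}

const naiveBook = new NaiveOrderBook()
// ignored: no snapshot has been received yet
naiveBook.update({ isSnapshot: false, bids: [{ price: 1, amount: 1 }], asks: [] })
naiveBook.update({
  isSnapshot: true,
  bids: [{ price: 7984, amount: 10 }],
  asks: [{ price: 7985, amount: 20 }]
})
naiveBook.update({
  isSnapshot: false,
  bids: [{ price: 7984.5, amount: 5 }],
  asks: [{ price: 7985, amount: 0 }]
})
console.log(naiveBook.bestBid(), naiveBook.bestAsk())
```

Scanning all keys on every read is linear in the number of levels, which is why a sorted structure such as the B+ tree mentioned above is the better fit for real workloads.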

orderBook.bestBid()

Returns book price level object for highest bid order (best bid) in order book or undefined if book doesn't have any bids (not initialized yet with initial snapshot).

const { price, amount } = orderBook.bestBid()

orderBook.bestAsk()

Returns book price level object for lowest ask order (best ask) in order book or undefined if book doesn't have any asks (not initialized yet with initial snapshot).

const { price, amount } = orderBook.bestAsk()

orderBook.asks()

Returns iterable iterator of book price level objects for all asks available ordered from the lowest to highest ask.

for (const ask of orderBook.asks()) {
// process asks levels one by one without
// creating in memory array for all levels
}
const orderedAsksArray = Array.from(orderBook.asks())

orderBook.bids()

Returns iterable iterator of book price level objects for all bids available ordered from highest to lowest bid.

for (const bid of orderBook.bids()) {
// process bid levels one by one without
// creating in memory array for all levels
}
const orderedBidsArray = Array.from(orderBook.bids())

book price level type

{
price: number
amount: number
}
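
Since bids() and asks() return iterators in best-first order, top-of-book metrics can be computed without copying the whole book. The sketch below takes the first few levels from each side and computes a simple volume imbalance. takeLevels and topOfBookImbalance are hypothetical helpers, and fakeBook is a made-up stand-in exposing the same two iterator methods so the example runs without the library.

```javascript
// Hypothetical helper (not part of tardis-dev): takes the first `count`
// items from any iterable without consuming the rest.
function takeLevels(levels, count) {
  const result = []
  if (count <= 0) return result
  for (const level of levels) {
    result.push(level)
    if (result.length === count) break
  }
  return result
}

// Hypothetical helper: volume imbalance in [-1, 1] over the top `depth` levels.
function topOfBookImbalance(orderBook, depth) {
  const sum = (levels) => levels.reduce((total, level) => total + level.amount, 0)
  const bidVolume = sum(takeLevels(orderBook.bids(), depth))
  const askVolume = sum(takeLevels(orderBook.asks(), depth))
  const total = bidVolume + askVolume
  return total === 0 ? undefined : (bidVolume - askVolume) / total
}

// Stand-in object exposing bids()/asks() iterators like OrderBook does
// (made-up levels; bids highest first, asks lowest first).
const fakeBook = {
  *bids() {
    yield { price: 7984, amount: 300 }
    yield { price: 7983.5, amount: 100 }
    yield { price: 7983, amount: 999 }
  },
  *asks() {
    yield { price: 7985, amount: 100 }
    yield { price: 7985.5, amount: 100 }
    yield { price: 7986, amount: 999 }
  }
}

console.log(topOfBookImbalance(fakeBook, 2))
```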

Combining data streams

combine(...iterators)

The combine function takes multiple async iterables and combines them into a single one. That allows synchronized historical market data replay and consolidated streaming of real-time data for multiple exchanges via a single for await ...of loop.

It accepts async iterables of normalized messages as rest parameters and returns a single async iterable.

For historical data replay it combines input async iterables messages by sorting them by localTimestamp in ascending order, this allows synchronized/ordered market data replay for multiple exchanges.

For real-time market data streaming it combines input async iterables messages in FIFO order by using Promise.race.

combining historical market data from multiple exchanges

const { replayNormalized, normalizeTrades, combine } = require('tardis-dev')
const bitmexMessages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades
)
const deribitMessages = replayNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades
)
const combinedStream = combine(bitmexMessages, deribitMessages)
// order at which messages have historically arrived is preserved
for await (const message of combinedStream) {
  if (message.exchange === 'deribit') {
    // process deribit trades
    console.log(message)
  }
  if (message.exchange === 'bitmex') {
    // process bitmex trades
    console.log(message)
  }
}

combining real-time stream of market data from multiple exchanges

const { streamNormalized, normalizeTrades, combine } = require('tardis-dev')
const bitmexMessages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD']
  },
  normalizeTrades
)
const deribitMessages = streamNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL']
  },
  normalizeTrades
)
const combinedStream = combine(bitmexMessages, deribitMessages)
// messages are provided in FIFO order as they arrive
for await (const message of combinedStream) {
  if (message.exchange === 'deribit') {
    // process deribit trades
    console.log(message)
  }
  if (message.exchange === 'bitmex') {
    // process bitmex trades
    console.log(message)
  }
}

Computing derived data locally

compute(iterator, ...computables)

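The compute function allows computing various derived data locally via so-called computables, like:

  • computeTradeBars - computes trade bars (time, volume or tick based) from normalized trade data

  • computeBookSnapshots - computes order book snapshots from normalized order book data

If you're interested in adding custom computables, for example order book imbalance, volume imbalance, open interest or funding rate based bars, please read the "adding custom computable" section.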

The compute function accepts an async iterable producing normalized messages together with computables as rest parameters. It returns an async iterable that yields the normalized messages produced by the provided iterable and, additionally, all messages computed by the provided computables. It computes and produces separate computed normalized messages for each symbol and exchange combination. When a disconnect message is returned by the provided async iterable, it discards existing pending computables and starts computing them from scratch.

const { streamNormalized, normalizeTrades, normalizeBookChanges,
  compute, computeTradeBars, computeBookSnapshots } = require('tardis-dev')
const bitmexMessages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD']
  },
  normalizeTrades,
  normalizeBookChanges
)
const messagesWithComputedTypes = compute(
  bitmexMessages,
  // 10 seconds time bars
  computeTradeBars({ kind: 'time', interval: 10 * 1000 }),
  // top 20 levels 50 millisecond order book snapshots
  computeBookSnapshots({ depth: 20, interval: 50 }),
  // volume based trade bar - 1 million vol buckets
  computeTradeBars({ kind: 'volume', interval: 1000 * 1000 })
)
for await (const message of messagesWithComputedTypes) {
  if (message.type === 'book_snapshot' || message.type === 'trade_bar') {
    console.log(message)
  }
}
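
To make the behaviour described above more concrete (pass-through of source messages, separate state per symbol and exchange, reset on disconnect), here is a simplified sketch. Everything in it is an assumption made for illustration: computeSketch, tradeCounter, the { onMessage } shape and the trade_count message type are hypothetical and do not reflect the computable interface used by tardis-dev. Forwarding the disconnect message and resetting state only for the disconnected exchange are likewise assumptions.

```javascript
// Hypothetical computable factory: counts trades and emits a message every
// `every` trades. Illustration only, NOT the tardis-dev computable interface.
const tradeCounter = (every) => () => {
  let count = 0
  return {
    *onMessage(message) {
      if (message.type !== 'trade') return
      count++
      if (count % every === 0) {
        yield {
          type: 'trade_count',
          symbol: message.symbol,
          exchange: message.exchange,
          count
        }
      }
    }
  }
}

// Passes source messages through and appends computed messages, keeping
// separate computable state per exchange and symbol combination.
async function* computeSketch(messages, ...factories) {
  const stateByKey = new Map()
  for await (const message of messages) {
    yield message
    if (message.type === 'disconnect') {
      // Assumption: drop all pending state for the disconnected exchange.
      for (const key of stateByKey.keys()) {
        if (key.startsWith(`${message.exchange}:`)) stateByKey.delete(key)
      }
      continue
    }
    const key = `${message.exchange}:${message.symbol}`
    if (!stateByKey.has(key)) {
      stateByKey.set(key, factories.map((create) => create()))
    }
    for (const computable of stateByKey.get(key)) {
      yield* computable.onMessage(message)
    }
  }
}

// Hypothetical sample stream with a disconnect in the middle.
async function* sampleMessages() {
  yield { type: 'trade', exchange: 'bitmex', symbol: 'XBTUSD' }
  yield { type: 'trade', exchange: 'bitmex', symbol: 'ETHUSD' }
  yield { type: 'disconnect', exchange: 'bitmex' }
  yield { type: 'trade', exchange: 'bitmex', symbol: 'XBTUSD' }
  yield { type: 'trade', exchange: 'bitmex', symbol: 'XBTUSD' }
}

async function collectTypes() {
  const types = []
  for await (const message of computeSketch(sampleMessages(), tradeCounter(2))) {
    types.push(message.type)
  }
  return types
}

collectTypes().then((types) => console.log(types))
```

In this sketch the XBTUSD counter restarts after the disconnect, so the computed message appears only after the two trades that follow it.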

computeTradeBars(options)

When provided to the compute function, computes normalized trade_bar messages from normalized trade data according to the provided options.

const { computeTradeBars } = require('tardis-dev')
computeTradeBars({ kind: 'volume', interval: 1000 })

compute trade bars options

| name | type | default | description |
| --- | --- | --- | --- |
| kind | 'time' \| 'volume' \| 'tick' | - | determines the way trades within a bar will be aggregated: time - classic OHLC candles aggregated by time; volume - volume based trade bars aggregated by sum of trades amount; tick - tick based trade bars aggregated by trades count |
| interval | number | - | determines the interval to aggregate by: for time based bars it's the number of milliseconds, for volume based bars it's the accumulated volume, for tick based bars it's the count of trades |
| name | string \| undefined | undefined | optional custom name of trade_bar; if not specified, a computed name will be provided based on kind and interval options |

type of message provided by computeTradeBars

{
  type: 'trade_bar'
  symbol: string
  exchange: string
  name: string
  interval: number
  kind: 'time' | 'volume' | 'tick'
  open: number
  high: number
  low: number
  close: number
  volume: number
  buyVolume: number
  sellVolume: number
  trades: number
  vwap: number
  openTimestamp: Date
  closeTimestamp: Date
  timestamp: Date
  localTimestamp: Date
}
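
The numeric fields of a trade_bar can be derived from the trades that fall into the bar. The sketch below shows one plausible way to do this for tick based bars. It is not the library's implementation: aggregateTrades and tickBars are hypothetical names, the sample trades are made up, and each trade is assumed to carry price, amount and side fields as in the normalized trade format.

```javascript
// Aggregates a non-empty array of trades into the numeric bar fields.
// Assumes each trade has price, amount and side ('buy' | 'sell' | other).
function aggregateTrades(trades) {
  const bar = {
    open: trades[0].price,
    high: -Infinity,
    low: Infinity,
    close: trades[trades.length - 1].price,
    volume: 0,
    buyVolume: 0,
    sellVolume: 0,
    trades: trades.length,
    vwap: 0
  }
  let notional = 0 // running sum of price * amount
  for (const { price, amount, side } of trades) {
    bar.high = Math.max(bar.high, price)
    bar.low = Math.min(bar.low, price)
    bar.volume += amount
    if (side === 'buy') bar.buyVolume += amount
    if (side === 'sell') bar.sellVolume += amount
    notional += price * amount
  }
  // Volume weighted average price of the bar.
  bar.vwap = notional / bar.volume
  return bar
}

// Tick based bars: close a bar after every `interval` trades.
function* tickBars(trades, interval) {
  let bucket = []
  for (const trade of trades) {
    bucket.push(trade)
    if (bucket.length === interval) {
      yield aggregateTrades(bucket)
      bucket = []
    }
  }
}

// Hypothetical sample trades.
const sampleTrades = [
  { price: 100, amount: 1, side: 'buy' },
  { price: 102, amount: 1, side: 'sell' },
  { price: 101, amount: 3, side: 'buy' },
  { price: 99, amount: 1, side: 'sell' }
]

const bars = Array.from(tickBars(sampleTrades, 2))
console.log(bars)
```

Volume and time based bars would differ only in the condition that closes a bucket (accumulated amount or elapsed time instead of trade count).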

sample normalized trade_bar message

{
  type: 'trade_bar',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  name: 'trade_bar_10000ms',
  interval: 10000,
  kind: 'time',
  open: 7623.5,
  high: 7623.5,
  low: 7623,
  close: 7623.5,
  volume: 37034,
  buyVolume: 24244,
  sellVolume: 12790,
  trades: 9,
  vwap: 7623.327320840309,
  openTimestamp: 2019-10-25T13:11:31.574Z,
  closeTimestamp: 2019-10-25T13:11:39.212Z,
  localTimestamp: 2019-10-25T13:11:40.369Z,
  timestamp: 2019-10-25T13:11:40.000Z,
}
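
A few relationships between the fields can be read off the sample message. The check below copies values from the sample and verifies them. The windowEnd helper is hypothetical and encodes an assumption inferred from this one sample only: that a time based bar is stamped with the end of the interval-aligned window containing its trades. Buy and sell volume add up to the total volume in this sample, which may not hold when some trades have an unknown side.

```javascript
// Field values copied from the sample trade_bar message above.
const sampleBar = {
  interval: 10000,
  volume: 37034,
  buyVolume: 24244,
  sellVolume: 12790,
  openTimestamp: new Date('2019-10-25T13:11:31.574Z'),
  closeTimestamp: new Date('2019-10-25T13:11:39.212Z'),
  timestamp: new Date('2019-10-25T13:11:40.000Z')
}

// Assumption: end of the interval-aligned window that contains `date`.
function windowEnd(date, intervalMs) {
  return new Date((Math.floor(date.getTime() / intervalMs) + 1) * intervalMs)
}

const volumeAddsUp =
  sampleBar.buyVolume + sampleBar.sellVolume === sampleBar.volume

const openInWindow =
  windowEnd(sampleBar.openTimestamp, sampleBar.interval).getTime() ===
  sampleBar.timestamp.getTime()

const closeInWindow =
  windowEnd(sampleBar.closeTimestamp, sampleBar.interval).getTime() ===
  sampleBar.timestamp.getTime()

console.log({ volumeAddsUp, openInWindow, closeInWindow })
```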

computeBookSnapshots(options)

When provided to the compute function, computes normalized book_snapshot messages from normalized order book data according to the provided options. It produces new snapshots only if there is an actual change in order book state for the requested depth.

const { computeBookSnapshots } = require('tardis-dev')
computeBookSnapshots({ depth: 20, interval: 50 })

compute book snapshots options

| name | type | default | description |
| --- | --- | --- | --- |
| depth | number | - | number of closest bid and ask levels to provide a snapshot for |
| interval | number | - | snapshot interval in milliseconds; if 0 is provided, snapshots are computed in real time any time there is a change in order book state for the requested depth |
| name | string \| undefined | undefined | optional custom name of book_snapshot; if not specified, a computed name will be provided based on depth and interval options |

type of message provided by computeBookSnapshots

{
  type: 'book_snapshot'
  symbol: string
  exchange: string
  name: string
  depth: number
  interval: number
  bids: { price: number; amount: number }[]
  asks: { price: number; amount: number }[]
  timestamp: Date
  localTimestamp: Date
}
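
The rule that a snapshot is produced only when the requested depth actually changed can be sketched as a comparison against the previously emitted snapshot. This is an illustration under stated assumptions, not the library's code: topLevels, sameLevels and createSnapshotter are hypothetical helpers, the sample levels are made up, and interval based throttling is left out.

```javascript
// Takes the first `depth` levels from an iterable of { price, amount } levels.
function topLevels(levels, depth) {
  const result = []
  for (const level of levels) {
    if (result.length === depth) break
    result.push({ price: level.price, amount: level.amount })
  }
  return result
}

// True when both arrays hold the same prices and amounts in the same order.
const sameLevels = (a, b) =>
  a.length === b.length &&
  a.every(
    (level, i) => level.price === b[i].price && level.amount === b[i].amount
  )

// Returns a function that produces a new snapshot only when the top `depth`
// levels differ from the previously returned snapshot, otherwise undefined.
function createSnapshotter(depth) {
  let previous
  return function snapshotIfChanged(bids, asks) {
    const next = { bids: topLevels(bids, depth), asks: topLevels(asks, depth) }
    const unchanged =
      previous !== undefined &&
      sameLevels(previous.bids, next.bids) &&
      sameLevels(previous.asks, next.asks)
    if (unchanged) return undefined
    previous = next
    return next
  }
}

// Hypothetical book states, already sorted best level first.
const snapshotIfChanged = createSnapshotter(1)
const asks = [
  { price: 7634, amount: 10 },
  { price: 7634.5, amount: 5 }
]
const first = snapshotIfChanged(
  [{ price: 7633.5, amount: 100 }, { price: 7633, amount: 50 }],
  asks
)
// Only the second bid level changes, which is outside the requested depth of 1.
const second = snapshotIfChanged(
  [{ price: 7633.5, amount: 100 }, { price: 7633, amount: 60 }],
  asks
)
// The best bid amount changes, so a new snapshot is produced.
const third = snapshotIfChanged(
  [{ price: 7633.5, amount: 90 }, { price: 7633, amount: 60 }],
  asks
)

console.log({ first, second, third })
```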

sample normalized book_snapshot message

{
type: 'book_snapshot',
symbol: 'XBTUSD',
exchange: 'bitmex',
name: 'book_snapshot_2_50ms',
depth: 2,
interval: 50,
bids: [
{ price: 7633.5, amount: 1906067 },
{ price: 7633, amount: 65319 }
],
asks: [
{ price: 7634, amount: 1467849 },
{ price: 7634.5, amount: 67939 }
],
timestamp: 2019-10-25T13:39:46.950Z,