Node.js Client

Convenient access to tick-level historical and real-time cryptocurrency market data via Node.js

Introduction

The Node.js tardis-dev library provides convenient access to tick-level historical and real-time cryptocurrency market data, both in exchange-native and normalized formats. Instead of callbacks it relies on async iteration (for await ...of), enabling composability features like seamless switching between real-time data streaming and historical data replay, or computing derived data locally.

const tardis = require('tardis-dev')
const { replayNormalized, normalizeTrades, normalizeBookChanges } = tardis

const messages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades,
  normalizeBookChanges
)

for await (const message of messages) {
  console.log(message)
}

Features

Tardis-dev GitHub repository

Examples (TOC)

Installation

Requires Node.js v12+ installed.

npm install tardis-dev --save

Debugging and logging

The tardis-dev lib uses the debug package for verbose logging and debugging purposes. It can be enabled via the DEBUG environment variable set to tardis-dev*, e.g., DEBUG=tardis-dev* node index.js.

Usage with TypeScript

Simply change from require

const { replay, stream } = require('tardis-dev')

to ES Modules import

import { replay, stream } from 'tardis-dev'

to enjoy first class TypeScript typings.

Replaying historical market data

See the historical data details page for detailed information about the historical market data available for each exchange.

replay(options)

Replays historical market data messages for the given replay options in exchange-native format. Historical market data is fetched efficiently (in parallel) from the Tardis.dev HTTP API and cached locally. Returns an async iterable.

const { replay } = require('tardis-dev')

const messages = replay({
  exchange: 'bitmex',
  filters: [
    { channel: 'trade', symbols: ['XBTUSD'] },
    { channel: 'orderBookL2', symbols: ['XBTUSD'] }
  ],
  from: '2019-05-01',
  to: '2019-05-02'
})

for await (const message of messages) {
  console.log(message)
}

stream(options) is the real-time counterpart of the replay function, returning real-time market data in the same format.

replay options

| name | type | default | description |
| --- | --- | --- | --- |
| exchange | string | - | requested exchange id - one of allowed values |
| filters | {channel: string, symbols?: string[]}[] | [] | optional filters of requested historical data feed - use getExchangeDetails function to get allowed channels and symbols ids for requested exchange |
| from | string | - | replay period start date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-01 |
| to | string | - | replay period end date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-02 |
| skipDecoding | boolean \| undefined | undefined | when set to true returns messages as buffers instead of decoding them to objects |
| withDisconnects | boolean \| undefined | undefined | when set to true returns message with value undefined for events when connection that was recording the historical data got disconnected |
| apiKey | string \| undefined | undefined | API key for Tardis.dev HTTP API - if not provided only first day of each month of historical data is accessible. It can also be set via init function for all replay calls. |

type of messages provided by replay iterator (for await ...of)

type Message =
  | {
      localTimestamp: Date // local timestamp when message has been received
      message: any // message in exchange-native data format
    }
    // when skipDecoding is set to true
  | {
      localTimestamp: Buffer
      message: Buffer
    }

// when withDisconnects is set to true whole message can be undefined (disconnect)
Message | undefined

sample message

{
  localTimestamp: 2019-05-01T00:00:07.013Z,
  message: {
    table: 'orderBookL2',
    action: 'update',
    data: [ { symbol: 'XBTUSD', id: 8799474100, side: 'Buy', size: 167340 } ]
  }
}
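
When raw message payloads are all that's needed (for example when writing them straight to disk), decoding can be skipped entirely. A minimal sketch, assuming the same BitMEX trade channel as in the example above:

const { replay } = require('tardis-dev')

const rawMessages = replay({
  exchange: 'bitmex',
  filters: [{ channel: 'trade', symbols: ['XBTUSD'] }],
  from: '2019-05-01',
  to: '2019-05-02',
  skipDecoding: true
})

for await (const { localTimestamp, message } of rawMessages) {
  // with skipDecoding enabled both fields are Buffers
  console.log(localTimestamp.toString(), message.toString())
}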

replayNormalized(options, ...normalizers)

Replays historical market data messages for the given replay options and normalizes them using the normalizers provided as rest arguments. Historical market data is fetched efficiently (in parallel) from the Tardis.dev HTTP API and cached locally. Returns an async iterable.

const tardis = require('tardis-dev')
const { replayNormalized, normalizeTrades, normalizeBookChanges } = tardis

const messages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades,
  normalizeBookChanges
)

for await (const message of messages) {
  console.log(message)
}

streamNormalized(options, ...normalizers) is the real-time counterpart of the replayNormalized function, returning real-time market data in the same format.

replay normalized options

| name | type | default | description |
| --- | --- | --- | --- |
| exchange | string | - | requested exchange id - one of allowed values |
| symbols | string[] \| undefined | undefined | optional symbols for requested data feed - use getExchangeDetails function to get allowed symbols ids for requested exchange |
| from | string | - | replay period start date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-01 |
| to | string | - | replay period end date (UTC) in a format recognized by Date.parse(), e.g., 2019-04-02 |
| withDisconnectMessages | boolean \| undefined | undefined | when set to true returns disconnect messages for events when connection that was recording the historical data got disconnected |
| apiKey | string \| undefined | undefined | API key for Tardis.dev HTTP API - if not provided only first day of each month of historical data is accessible. It can also be set via init function for all replayNormalized calls. |

Built-in normalizers

The replayNormalized function accepts any number of normalizers as rest parameters that map from the exchange-native format to the normalized data format. tardis-dev ships with built-in ones that normalize trades, order book and derivative ticker data, and also allows adding custom ones.

types of messages provided by replayNormalized iterator (for await ...of)

Message types and formats depend on specific normalizers provided to replayNormalized function and are documented in detail in data normalization section.

sample message

Sample message produced by normalizeBookChanges

{
  type: 'book_change',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  isSnapshot: false,
  bids: [{ price: 5263.5, amount: 1780043 }],
  asks: [],
  timestamp: 2019-05-01T00:00:04.430Z,
  localTimestamp: 2019-05-01T00:00:04.430Z
}

Streaming real-time market data

stream(options)

Streams real-time market data messages for the given stream options in exchange-native format. It connects directly to exchanges' WebSocket APIs and transparently restarts closed, broken or stale connections (open connections without data being sent for a specified amount of time). Returns an async iterable.

const { stream } = require('tardis-dev')

const messages = stream({
  exchange: 'bitmex',
  filters: [
    { channel: 'trade', symbols: ['XBTUSD'] },
    { channel: 'orderBookL2', symbols: ['XBTUSD'] }
  ]
})

for await (const message of messages) {
  console.log(message)
}

replay(options) is the historical market data counterpart of the stream function, returning historical market data in the same format.

stream options

| name | type | default | description |
| --- | --- | --- | --- |
| exchange | string | - | requested exchange id - one of allowed values |
| filters | {channel: string, symbols?: string[]}[] | [] | optional filters of requested real-time data feed - use getExchangeDetails to get allowed channels and symbols ids for requested exchange |
| skipDecoding | boolean \| undefined | undefined | when set to true returns messages as buffers instead of decoding them to objects |
| withDisconnects | boolean \| undefined | undefined | when set to true returns message with value undefined for real-time stream disconnect events |
| timeoutIntervalMS | number | 10000 | specifies time in milliseconds after which connection is restarted if no message has been received from the exchange |
| onError | ((err) => void) \| undefined | undefined | optional callback invoked when real-time WebSocket connection error occurs, useful for custom error logging etc. |

type of messages provided by stream iterator (for await ...of)

type Message =
  | {
      localTimestamp: Date // local timestamp when message has been received
      message: any // message in exchange-native data format
    }
    // when skipDecoding is set to true
  | {
      localTimestamp: Buffer
      message: Buffer
    }

// when withDisconnects is set to true whole message can be undefined
Message | undefined

sample message

{
  localTimestamp: 2019-10-22T09:24:51.025Z,
  message: {
    table: 'orderBookL2',
    action: 'update',
    data: [
      { symbol: 'XBTUSD', id: 8799172400, side: 'Sell', size: 369501 }
    ]
  }
}
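
The connection-management options can be tuned per stream. A minimal sketch (assuming the same BitMEX trade channel as above) that restarts stale connections after 30 seconds and logs connection errors via the onError callback:

const { stream } = require('tardis-dev')

const messages = stream({
  exchange: 'bitmex',
  filters: [{ channel: 'trade', symbols: ['XBTUSD'] }],
  timeoutIntervalMS: 30 * 1000,
  onError: error => {
    // the connection is restarted automatically, this is just for custom logging
    console.error('real-time stream error', error)
  }
})

for await (const message of messages) {
  console.log(message)
}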

streamNormalized(options, ...normalizers)

Streams real-time market data messages for the given stream options and normalizes them using the normalizers provided as rest arguments. It connects directly to exchanges' WebSocket APIs and transparently restarts closed, broken or stale connections (open connections without data being sent for a specified amount of time). Returns an async iterable.

const tardis = require('tardis-dev')
const { streamNormalized, normalizeTrades, normalizeBookChanges } = tardis

const messages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD']
  },
  normalizeTrades,
  normalizeBookChanges
)

for await (const message of messages) {
  console.log(message)
}

replayNormalized(options, ...normalizers) is the historical counterpart of the streamNormalized function, returning historical market data in the same format.

stream normalized options

| name | type | default | description |
| --- | --- | --- | --- |
| exchange | string | - | requested exchange id - one of allowed values |
| symbols | string[] \| undefined | undefined | instruments symbols for requested data feed |
| withDisconnectMessages | boolean \| undefined | undefined | when set to true returns disconnect messages for real-time stream disconnect events |
| timeoutIntervalMS | number | 10000 | specifies time in milliseconds after which connection is restarted if no message has been received from the exchange |
| onError | ((err) => void) \| undefined | undefined | optional callback invoked when real-time WebSocket connection or mapping error occurs, useful for custom error logging etc. |

Built-in normalizers

The streamNormalized function can accept any number of custom normalizers as rest parameters that map from the exchange-native format to the normalized data format. tardis-dev ships with built-in ones that normalize trades, order book and derivative ticker data, and also allows adding custom ones.

types of messages provided by streamNormalized iterator (for await ...of)

Message types and formats depend on specific normalizers provided to streamNormalized function and are documented in detail in data normalization section.

sample message

Sample message produced by normalizeTrades

{
  type: 'trade',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  id: 'b1f4b309-80e2-1ffb-340b-2f7576f6ef0d',
  price: 7447.5,
  amount: 100,
  side: 'buy',
  timestamp: 2019-10-24T13:28:07.867Z,
  localTimestamp: 2019-10-24T13:28:07.938Z
}

Historical market data helpers

init(options)

This function doesn't affect real-time streaming functionality in any way; it's useful only for historical data replay.

When working with historical market data via the replay and replayNormalized functions, by default only the first day of each month of historical data is available for replay, and locally cached historical data is stored in the default location on disk (the OS temp dir).

The init function allows providing the apiKey received via email after ordering historical market data access via the Tardis.dev website, as well as a custom cacheDir. The apiKey can also be provided directly via the options of the replay and replayNormalized functions - that overrides anything provided via init.

const { init } = require('tardis-dev')

init({
  apiKey: 'YOUR API KEY',
  cacheDir: 'CUSTOM CACHE DIR PATH'
})

init options

| name | type | default | description |
| --- | --- | --- | --- |
| apiKey | string \| undefined | undefined | API key for Tardis.dev HTTP API - if not provided only first day of each month of historical data is accessible |
| cacheDir | string | <os.tmpdir>/.tardis-cache | path to local dir that will be used as cache location - if not provided default temp dir for given OS will be used |

getExchangeDetails(exchange)

Given an exchange id, provides exchange details (available symbols, availability dates, available channels, pricing info etc.) as returned by the exchanges/:exchange API endpoint.

const { getExchangeDetails } = require('tardis-dev')

const bitmexExchangeDetails = await getExchangeDetails('bitmex')

type of response returned by awaiting on getExchangeDetails

{
  id: string
  name: string
  enabled: boolean
  filterable: boolean
  availableSince: string
  availableSymbols: {
    id: string
    type: 'spot' | 'future' | 'perpetual' | 'option'
    availableSince: string
    availableTo?: string
  }[]
  availableChannels: string[]
  incidentReports: {
    from: string
    to: string
    status: string
    details: string
  }
}
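
For example, continuing the snippet above, the response can be used to discover which channels and symbols can be passed as replay or stream filters:

// channels that can be used in replay/stream filters
console.log(bitmexExchangeDetails.availableChannels)

// symbol ids together with their availability ranges
for (const symbol of bitmexExchangeDetails.availableSymbols) {
  console.log(symbol.id, symbol.type, symbol.availableSince)
}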

getApiKeyAccessInfo(apiKey?)

Given an apiKey provided as an optional parameter or set via the init function, provides information about what historical data is available for it - exchanges, date ranges and symbols.

const { getApiKeyAccessInfo } = require('tardis-dev')

const details = await getApiKeyAccessInfo('YOUR_API_KEY')

type of response returned by awaiting on getApiKeyAccessInfo()

{
  exchange: string
  from: string
  to: string
  symbols: string[]
}[]
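
For example, continuing the snippet above, the response can be printed to see which exchanges and date ranges the key unlocks:

for (const access of details) {
  console.log(access.exchange, access.from, access.to, access.symbols)
}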

clearCache()

Clears local data cache dir.

const { clearCache } = require('tardis-dev')

await clearCache()

Data normalization

Data normalization allows consuming market data feeds from various exchanges in a consistent format.

tardis-dev has the following built-in normalizers that can be provided to the replayNormalized or streamNormalized functions: normalizeTrades, normalizeBookChanges and normalizeDerivativeTickers, each described below.

If you're interested in how exactly data is mapped from the exchange-native format to the normalized one, please follow the code in the tardis-dev GitHub repository for each exchange, and if you determine that the mapping should be done differently, please read the "Modifying built-in and adding custom normalizers" section.

const { streamNormalized, normalizeTrades, normalizeBookChanges,
  normalizeDerivativeTickers } = require('tardis-dev')

// or replayNormalized to replay normalized historical data
const messages = streamNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL']
  },
  normalizeTrades,
  normalizeBookChanges,
  normalizeDerivativeTickers
)

for await (const message of messages) {
  if (message.type === 'book_change') {
    // process normalized book change
  }
  if (message.type === 'trade') {
    // process normalized trade
  }
  if (message.type === 'derivative_ticker') {
    // process normalized derivative_ticker
  }
}

normalizeTrades

When passed as an arg to the replayNormalized or streamNormalized function, provides normalized trade data for all supported exchanges.

{
  type: 'trade',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  id: '282a0445-0e3a-abeb-f403-11003204ea1b',
  price: 7996,
  amount: 50,
  side: 'sell',
  timestamp: 2019-10-23T10:32:49.669Z,
  localTimestamp: 2019-10-23T10:32:49.740Z
}

normalizeBookChanges

When passed as an arg to the replayNormalized or streamNormalized function, provides normalized book_change data for all supported exchanges.

Provides initial L2 (market by price) order book snapshots (isSnapshot=true) plus incremental updates for each order book change. Please note that amount is the updated amount at that price level, not a delta. An amount of 0 indicates the price level can be removed.

{
  type: 'book_change',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  isSnapshot: false,
  bids: [],
  asks: [{ price: 7985, amount: 283318 }],
  timestamp: 2019-10-23T11:29:53.469Z,
  localTimestamp: 2019-10-23T11:29:53.469Z
}
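
To illustrate those semantics, here is a minimal sketch (not part of the library) that applies a book_change message to plain Maps keyed by price; the OrderBook class described later does the same for both sides while keeping levels sorted:

// absolute-amount semantics: set the level to the new amount,
// remove it when the amount is 0, clear everything on a snapshot
const applyBookChange = (book, bookChange) => {
  if (bookChange.isSnapshot) {
    book.bids.clear()
    book.asks.clear()
  }

  for (const [side, levels] of [['bids', bookChange.bids], ['asks', bookChange.asks]]) {
    for (const { price, amount } of levels) {
      if (amount === 0) {
        book[side].delete(price)
      } else {
        book[side].set(price, amount)
      }
    }
  }
}

const book = { bids: new Map(), asks: new Map() }

// given the sample message above this sets the 7985 ask level to 283318
applyBookChange(book, {
  isSnapshot: false,
  bids: [],
  asks: [{ price: 7985, amount: 283318 }]
})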

normalizeDerivativeTickers

When passed as an arg to the replayNormalized or streamNormalized function, provides normalized derivative_ticker data for supported exchanges that trade derivative instruments.

{
  type: 'derivative_ticker',
  symbol: 'BTC-PERPETUAL',
  exchange: 'deribit',
  lastPrice: 7987.5,
  openInterest: 84129491,
  fundingRate: -0.00001568,
  indexPrice: 7989.28,
  markPrice: 7987.56,
  timestamp: 2019-10-23T11:34:29.302Z,
  localTimestamp: 2019-10-23T11:34:29.416Z
}

disconnect message

When the replayNormalized or streamNormalized function options have the withDisconnectMessages flag set to true and a disconnect event occurs (e.g., a WebSocket connection close), a disconnect message is returned.

{
  type: 'disconnect',
  exchange: 'deribit',
  localTimestamp: 2019-10-23T11:34:29.416Z
}
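
One way to use these messages is to reset any locally maintained state when they arrive, since exchanges send a fresh order book snapshot after reconnecting. A minimal sketch, assuming a single symbol and the OrderBook class described later:

const { streamNormalized, normalizeBookChanges, OrderBook } = require('tardis-dev')

const messages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD'],
    withDisconnectMessages: true
  },
  normalizeBookChanges
)

let orderBook = new OrderBook()

for await (const message of messages) {
  if (message.type === 'disconnect') {
    // start from a clean book and wait for the next snapshot
    orderBook = new OrderBook()
    continue
  }

  if (message.type === 'book_change') {
    orderBook.update(message)
  }
}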

Modifying built-in and adding custom normalizers

In tardis-dev, data normalization is implemented via normalize factory functions provided to the replayNormalized and streamNormalized functions. This design gives lots of flexibility, allowing replacing, extending and modifying built-in normalizers or adding new ones for new normalized data types without forking the whole library.

Any normalize function provided to the replayNormalized and streamNormalized functions needs to have the following signature:

(exchange: string, localTimestamp: Date) => Mapper

exchange is the exchange id for which the mapper object needs to be returned; localTimestamp is the date for which the mapper is created (a new mapper is created after each disconnection). In most cases localTimestamp isn't needed, but in certain cases, for example an exchange API change, it can be used to switch to different mapping logic, like using a new data channel that wasn't available until a certain date.

The returned Mapper object has the following signature:

{
  canHandle: (message: any) => boolean

  map(message: any, localTimestamp: Date): IterableIterator<Data> | undefined

  getFilters: (symbols?: string[]) => {channel: string, symbols?: string[]}[]
}

On every disconnection event, normalize factory functions are called again to provide new Mapper objects with clean state if required (stateful mapping, like BitMEX order book data, needs to persist the mapping between price level id and price level and has to 'reset' for each new connection). If a mapper object is stateful, it's required to always return a new clean-state object from the normalize factory function or reset its state in one way or another.

Normalized data returned by the iterable iterator of the Mapper.map method is expected to have at least the fields described in the normalized data type section below, in order to play well with other tardis-dev functions like combine or compute.

normalized data type

{
  type: string
  symbol: string
  exchange: string
  timestamp: Date
  localTimestamp: Date
  name?: string | undefined
}
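
To illustrate the state-reset requirement, here is a minimal sketch of a hypothetical stateful normalizer (using the deribit trades channel format from the example below) that emits only the first trade per symbol seen on each connection - because the factory returns a brand new mapper every time it's called, the alreadySeenSymbols set is recreated after every disconnection:

const normalizeFirstTrades = (exchange, localTimestamp) => {
  if (exchange !== 'deribit') {
    throw new Error(`normalizeFirstTrades: ${exchange} not supported`)
  }

  // per-connection state - recreated every time the factory is called
  const alreadySeenSymbols = new Set()

  return {
    canHandle(message) {
      const channel = message.params !== undefined ? message.params.channel : undefined
      return channel !== undefined && channel.startsWith('trades')
    },

    getFilters(symbols) {
      return [{ channel: 'trades', symbols }]
    },

    *map(message, localTimestamp) {
      for (const trade of message.params.data) {
        if (alreadySeenSymbols.has(trade.instrument_name)) {
          continue
        }
        alreadySeenSymbols.add(trade.instrument_name)

        yield {
          type: 'first_trade',
          symbol: trade.instrument_name,
          exchange: 'deribit',
          price: trade.price,
          amount: trade.amount,
          timestamp: new Date(trade.timestamp),
          localTimestamp
        }
      }
    }
  }
}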

Adding custom normalizeLiquidations normalizer

Example implementation of a custom normalizeLiquidations function that normalizes liquidations data for the deribit exchange. Implementations for other exchanges are left as an exercise for the reader.

type of messages provided by normalizeLiquidations

{
  type: 'liquidation'
  symbol: string
  exchange: string
  side: 'buy' | 'sell'
  amount: number
  price: number
  timestamp: Date
  localTimestamp: Date
}

implementation of deribitLiquidations mapper and normalizeLiquidations

// object that maps from deribit trades real-time channel to our custom
// liquidations messages if determines that trade was caused by liquidation

const deribitLiquidationsMapper = {
  
  // function that given message in deribit native format
  // https://docs.deribit.com/v2/#trades-instrument_name-interval
  // determines if such message can be mapped/normalized
  canHandle(message) {
    const params = message.params
    const channel = params !== undefined ? params.channel : undefined
    if (channel === undefined) {
      return false
    }
    return (
      channel.startsWith('trades') &&
      params.data.some(trade => trade.liquidation !== undefined)
    )
  },
  
  // given symbols returns filters that are provided 
  // as filters to replay & stream functions options
  // in our case we're interested in the deribit trades channel
  // but it could be multiple channels in certain scenarios as well
  getFilters(symbols) {
    return [
      {
        channel: 'trades',
        symbols
      }
    ]
  },

  // map a message that canHandle determined can be handled
  // to normalized liquidation messages
  *map(message, localTimestamp) {
    for (const deribitLiquidationTrade of message.params.data) {
      if (deribitLiquidationTrade.liquidation === undefined) {
        continue
      }

      yield {
        type: 'liquidation',
        symbol: deribitLiquidationTrade.instrument_name,
        exchange: 'deribit',
        price: deribitLiquidationTrade.price,
        amount: deribitLiquidationTrade.amount,
        side: deribitLiquidationTrade.direction,
        timestamp: new Date(deribitLiquidationTrade.timestamp),
        localTimestamp: localTimestamp
      }
    }
  }
}

// provides factory function that given an exchange returns the mapper for it
// if such mapper is implemented
// in our case deribitLiquidationsMapper is stateless so we can return the same 
// instance every time

const normalizeLiquidations = (exchange, localTimestamp) => {
  if (exchange === 'deribit') {
    return deribitLiquidationsMapper
  }
  throw new Error(`normalizeLiquidations: ${exchange} not supported`)
}

normalizeLiquidations usage example

We could as well provide the same normalizeLiquidations function to the streamNormalized function or use it together with other normalizers (normalizeTrades etc.) - see the sketch after the example below.

const { replayNormalized } = require('tardis-dev')

const liquidations = replayNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL'],
    from: '2019-07-01',
    to: '2019-07-02'
  },
  normalizeLiquidations
)

for await (const liquidation of liquidations) {
  console.log(liquidation)
}
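
And the real-time variant mentioned above, combining it with the built-in normalizeTrades:

const { streamNormalized, normalizeTrades } = require('tardis-dev')

const realTimeMessages = streamNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL']
  },
  normalizeTrades,
  normalizeLiquidations
)

for await (const message of realTimeMessages) {
  if (message.type === 'liquidation') {
    console.log('liquidation', message)
  }
}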

Changing normalizeTrades for Binance exchange

Let's assume that the default normalization of Binance trades data doesn't fit our use case and we need to use the @aggTrade stream as the source of trade data instead of the @trade stream used by default.

// add custom binance trades mapper that uses the @aggTrade stream as the
// source of normalized trades
const customBinanceTradesMapper = {
  canHandle(message) {
    return message.stream && message.stream.endsWith('@aggTrade')
  },

  getFilters(symbols) {
    if (symbols !== undefined) {
      symbols = symbols.map(s => s.toLocaleLowerCase())
    }
    // binance api expects all symbols to be lower cased
    return [
      {
        channel: 'aggTrade',
        symbols
      }
    ]
  },

  *map(message, localTimestamp) {
    const binanceAggTrade = message.data

    yield {
      type: 'trade',
      symbol: binanceAggTrade.s,
      exchange: 'binance',
      id: String(binanceAggTrade.a),
      price: Number(binanceAggTrade.p),
      amount: Number(binanceAggTrade.q),
      side: binanceAggTrade.m ? 'sell' : 'buy',
      timestamp: new Date(binanceAggTrade.T),
      localTimestamp: localTimestamp
    }
  }
}

// add new normalize function that, only for the binance exchange,
// uses the custom trades mapper and falls back to the default
// normalizeTrades for other exchanges

const { normalizeTrades } = require('tardis-dev')

const normalizeTradesWithBinancePatch = (exchange, localTimestamp) => {
  if (exchange === 'binance') {
    return customBinanceTradesMapper
  }
  return normalizeTrades(exchange, localTimestamp)
}

normalizeTradesWithBinancePatch usage example

const { streamNormalized } = require('tardis-dev')

const messages = streamNormalized(
  {
    exchange: 'binance',
    symbols: ['btcusdt']
  },
  normalizeTradesWithBinancePatch
)

for await (const message of messages) {
  console.log(message)
}

Limit order book reconstruction

tardis-dev exports an OrderBook class that can process normalized book_change messages (order book snapshots and incremental updates) and maintain full local order book (level 2 - aggregated market-by-price) state, both for real-time data and for reconstructing historical order book state at any past point in time. It waits for the first book_change message that is a snapshot (isSnapshot = true) and then applies subsequent updates to it. A single OrderBook object can maintain order book state for only a single symbol/instrument. It uses a Red-Black tree data structure under the hood to efficiently maintain its local state in sorted order.

const { replayNormalized, normalizeBookChanges, OrderBook } = require('tardis-dev')

const books = {
  XBTUSD: new OrderBook(),
  ETHUSD: new OrderBook()
}

const messages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD', 'ETHUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeBookChanges
)

for await (const message of messages) {
  const orderBook = books[message.symbol]
  if (message.type === 'book_change') {
    orderBook.update(message)
  }
  const timestamp = message.localTimestamp.toISOString()
  // print best bid/ask for every exchange tick
  console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}

orderBook.update(bookChange)

Processes normalized book_change messages to update its internal local state that maintains ordered bids and asks sets. It ignores any non-snapshot book_change messages before the initial snapshot is received. It should be called for every book_change message received for the symbol we'd like to reconstruct the order book for.

orderBook.update({
  type: 'book_change',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  isSnapshot: false,
  bids: [],
  asks: [{ price: 7985, amount: 283318 }],
  timestamp: new Date(),
  localTimestamp: new Date()
})

orderBook.bestBid()

Returns the book price level object for the highest bid (best bid) in the order book, or undefined if the book doesn't have any bids (not yet initialized with the initial snapshot).

const { price, amount } = orderBook.bestBid()

orderBook.bestAsk()

Returns the book price level object for the lowest ask (best ask) in the order book, or undefined if the book doesn't have any asks (not yet initialized with the initial snapshot).

const { price, amount } = orderBook.bestAsk()

orderBook.asks()

Returns iterable iterator of book price level objects for all asks available ordered from the lowest to highest ask.

for (const ask of orderBook.asks()) {
    // process asks levels one by one from lowest to highest without 
    // creating in memory array for all levels
}

const orderedAsksArray = Array.from(orderBook.asks())

orderBook.bids()

Returns iterable iterator of book price level objects for all bids available ordered from highest to lowest bid.

for (const bid of orderBook.bids()) {
    // process bid levels one by one from highest to lowest without 
    // creating in memory array for all levels
}

const orderedBidsArray = Array.from(orderBook.bids())

book price level type

{
  price: number
  amount: number
}
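
For example, once the book has received its initial snapshot, the best bid/ask levels can be turned into a mid price and spread (guarding against an uninitialized book):

const bestBid = orderBook.bestBid()
const bestAsk = orderBook.bestAsk()

if (bestBid !== undefined && bestAsk !== undefined) {
  const midPrice = (bestBid.price + bestAsk.price) / 2
  const spread = bestAsk.price - bestBid.price
  console.log({ midPrice, spread })
}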

Combining data streams

combine(...iterators)

The combine function, given multiple async iterators, combines them into a single one. This allows synchronized historical market data replay and consolidated real-time data streaming for multiple exchanges via a single for await ...of loop.

It accepts async iterables of normalized messages as rest parameters and combines them, returning a single async iterable.

For historical data replay it combines the input async iterables' messages by sorting them by localTimestamp in ascending order, which allows synchronized/ordered market data replay across multiple exchanges.

For real-time market data streaming it combines the input async iterables' messages in FIFO order using Promise.race.

const { replayNormalized, normalizeTrades, combine } = require('tardis-dev')

const bitmexMessages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades
)

const deribitMessages = replayNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL'],
    from: '2019-05-01',
    to: '2019-05-02'
  },
  normalizeTrades
)

const combinedStream = combine(bitmexMessages, deribitMessages)

// order at which messages have historically arrived is preserved
for await (const message of combinedStream) {
  if (message.exchange === 'deribit') {
    // process deribit trades
    console.log(message)
  }

  if (message.exchange === 'bitmex') {
    // process bitmex trades
    console.log(message)
  }
}

Computing derived data locally

compute(iterator, ...computables)

The compute function allows computing various derived data locally via so-called computables, like trade bars (computeTradeBars) and order book snapshots (computeBookSnapshots), both described below.

If you're interested in adding custom computables, for example order book imbalance, volume imbalance, open interest or funding rate based bars, please read the "Adding custom computable" section.

The compute function accepts an async iterable producing normalized messages together with computables as rest parameters, and returns an async iterable with the normalized messages produced by the provided iterable plus all computed messages based on the provided computable functions. It computes and produces separate computed normalized messages for each symbol and exchange combination. When a disconnect message is returned by the provided async iterable, it discards the pending computed state and starts computing from scratch.

const { streamNormalized, normalizeTrades, normalizeBookChanges,
  compute, computeTradeBars, computeBookSnapshots } = require('tardis-dev')

const bitmexMessages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD']
  },
  normalizeTrades,
  normalizeBookChanges
)

const messagesWithComputedTypes = compute(
  bitmexMessages,
  // 10 seconds time bars
  computeTradeBars({ kind: 'time', interval: 10 * 1000 }),
  // top 20 levels 50 millisecond order book snapshots
  computeBookSnapshots({ depth: 20, interval: 50 }),
  // volume based trade bar - 1 million vol buckets
  computeTradeBars({ kind: 'volume', interval: 1000 * 1000 })
)

for await (const message of messagesWithComputedTypes) {
  if (message.type === 'book_snapshot' || message.type === 'trade_bar') {
    console.log(message)
  }
}

computeTradeBars(options)

When provided to the compute function, computes normalized trade_bar messages based on normalized trade data.

const { computeTradeBars } = require('tardis-dev')

computeTradeBars({ kind: 'volume', interval: 1000 })

compute trade bars options

| name | type | default | description |
| --- | --- | --- | --- |
| kind | 'time' \| 'volume' \| 'tick' | - | determines the way trades within a bar will be aggregated: time - classic OHLC candles aggregated by time, volume - volume based trade bars aggregated by sum of trades amount, tick - tick based trade bars aggregated by trades count |
| interval | number | - | determines interval to aggregate by - for time based bars it's number of milliseconds, for volume based bars it's accumulated volume, for tick based bars it's count of trades |
| name | string \| undefined | undefined | optional custom name of trade_bar, if not specified computed name will be provided based on kind and interval options |

type of message provided by computeTradeBars

{
  type: 'trade_bar'
  symbol: string
  exchange: string
  name: string
  interval: number
  kind: 'time' | 'volume' | 'tick'
  open: number
  high: number
  low: number
  close: number
  volume: number
  buyVolume: number
  sellVolume: number
  trades: number
  vwap: number
  openTimestamp: Date
  closeTimestamp: Date
  timestamp: Date
  localTimestamp: Date
}

sample normalized trade_bar message

{
  type: 'trade_bar',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  name: 'trade_bar_10000ms',
  interval: 10000,
  kind: 'time',
  open: 7623.5,
  high: 7623.5,
  low: 7623,
  close: 7623.5,
  volume: 37034,
  buyVolume: 24244,
  sellVolume: 12790,
  trades: 9,
  vwap: 7623.327320840309,
  openTimestamp: 2019-10-25T13:11:31.574Z,
  closeTimestamp: 2019-10-25T13:11:39.212Z,
  localTimestamp: 2019-10-25T13:11:40.369Z,
  timestamp: 2019-10-25T13:11:40.000Z,
}

computeBookSnapshots(options)

When provided to the compute function, computes normalized book_snapshot messages based on normalized order book data. It produces new snapshots only if there is an actual change in order book state for the requested depth.

const { computeBookSnapshots } = require('tardis-dev')

computeBookSnapshots({ depth: 20, interval: 50 })

compute book snapshots options

| name | type | default | description |
| --- | --- | --- | --- |
| depth | number | - | number of closest bids and asks levels to provide snapshot for |
| interval | number | - | snapshot interval in milliseconds, if 0 is provided it computes snapshots in real-time any time there is a change in order book state for requested depth |
| name | string \| undefined | undefined | optional custom name of book_snapshot, if not specified computed name will be provided based on depth and interval options |

type of message provided by computeBookSnapshots

{
  type: 'book_snapshot'
  symbol: string
  exchange: string
  name: string
  depth: number
  interval: number
  bids: { price: number; amount: number }[]
  asks: { price: number; amount: number }[]
  timestamp: Date
  localTimestamp: Date
}

sample normalized book_snapshot message

{
  type: 'book_snapshot',
  symbol: 'XBTUSD',
  exchange: 'bitmex',
  name: 'book_snapshot_2_50ms',
  depth: 2,
  interval: 50,
  bids: [
    { price: 7633.5, amount: 1906067 },
    { price: 7633, amount: 65319 }
  ],
  asks: [
    { price: 7634, amount: 1467849 },
    { price: 7634.5, amount: 67939 }
  ],
  timestamp: 2019-10-25T13:39:46.950Z,
  localTimestamp: 2019-10-25T13:39:46.961Z
}

Adding custom computable

Any computable provided to the compute function needs to be a factory function with the following signature:

() => Computable

where the returned Computable object has the following signature:

{
  sourceDataTypes: string[]
  compute(message: NormalizedData): IterableIterator<NormalizedData>
}

The iterator returned by Computable.compute is expected to provide objects that have at least the fields described in the normalized data type section, to play well with other tardis-dev functions like combine.

computeOrderBookImbalanceRatio()

Example implementation of a custom computeOrderBookImbalanceRatio function that uses book snapshots as its source data type and computes the ratio of asks amounts (sell orders) to bids amounts (buy orders) for the given book_snapshot depth. It may be used to determine relative buy or sell pressure.

type of messages produced by computeOrderBookImbalanceRatio

{
  type: 'book_imbalance'
  symbol: string
  exchange: string
  asksToBidsRatio: number
  timestamp: Date
  localTimestamp: Date
}

implementation of BookImbalanceRatioComputable computable and computeOrderBookImbalanceRatio factory function.

class BookImbalanceRatioComputable {
  constructor() {
    this.sourceDataTypes = ['book_snapshot']
  }

  *compute(bookSnapshot) {
    let bidsAmount = 0
    let asksAmount = 0

    for (let i = 0; i < bookSnapshot.depth; i++) {
      bidsAmount += bookSnapshot.bids[i].amount
      asksAmount += bookSnapshot.asks[i].amount
    }

    const asksToBidsRatio = asksAmount / bidsAmount
    yield {
      type: 'book_imbalance',
      symbol: bookSnapshot.symbol,
      exchange: bookSnapshot.exchange,
      asksToBidsRatio,
      timestamp: bookSnapshot.timestamp,
      localTimestamp: bookSnapshot.localTimestamp
    }
  }
}

const computeOrderBookImbalanceRatio = () => new BookImbalanceRatioComputable()

computeOrderBookImbalanceRatio usage example

Given the implementation above, we can compute the book imbalance ratio for the BitMEX real-time XBTUSD message stream. For this example we use top 5 levels, 2 second book snapshots as the source for our custom computable. We need an async iterable that produces book snapshots as the source for our book imbalance computable, hence the two invocations of compute.

const { streamNormalized, normalizeBookChanges, 
        compute, computeBookSnapshots } = require('tardis-dev')

const bitmexMessages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD']
  },
  normalizeBookChanges
)

const messagesWithBookSnapshots = compute(
  bitmexMessages,
  computeBookSnapshots({ depth: 5, interval: 2 * 1000 })
)

const messagesWithComputedData = compute(
  messagesWithBookSnapshots,
  computeOrderBookImbalanceRatio
)

for await (const message of messagesWithComputedData) {
  if (message.type === 'book_imbalance') {
    console.log(message)
  }
}

Examples

Real-time spread across multiple exchanges

Example showing how to easily display real-time spread and best bid/ask info across multiple exchanges at once. It can be easily adapted to do the same for historical data (replayNormalized instead of streamNormalized).

const { streamNormalized, normalizeBookChanges, combine, 
        compute, computeBookSnapshots } = require('tardis-dev')

const exchangesToStream = [
  { exchange: 'bitmex', symbols: ['XBTUSD'] },
  { exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
  { exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] }
]

// for each specified exchange call streamNormalized for it
// so we have multiple real-time streams for all specified exchanges
const realTimeStreams = exchangesToStream.map(e => {
  return streamNormalized(e, normalizeBookChanges)
})

// combine all real-time message streams into one
const messages = combine(...realTimeStreams)

// create book snapshots with depth 1 that are produced
// every time best bid/ask info is changed
// effectively computing real-time quotes
const realTimeQuoteComputable = computeBookSnapshots({
  depth: 1,
  interval: 0,
  name: 'realtime_quote'
})

// compute real-time quotes for the combined real-time messages
const messagesWithQuotes = compute(messages, realTimeQuoteComputable)

const spreads = {}

// print spreads info every 100ms
setInterval(() => {
  console.clear()
  console.log(spreads)
}, 100)

// update spreads info real-time
for await (const message of messagesWithQuotes) {
  if (message.type === 'book_snapshot') {
    spreads[message.exchange] = {
      spread: message.asks[0].price - message.bids[0].price,
      bestBid: message.bids[0],
      bestAsk: message.asks[0]
    }
  }
}

Replay large historical trades across multiple exchanges

Example showing how to replay large historical trades across multiple exchanges as they happened.

const { replayNormalized, normalizeTrades, combine } = require('tardis-dev')

const from = '2019-07-01'
const to = '2019-07-02'
const exchanges = [
  { exchange: 'bitmex', symbols: ['XBTUSD'] },
  { exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
  { exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] }
]

const historicalTrades = exchanges.map(exchange => {
  return replayNormalized({ from, to, ...exchange }, normalizeTrades)
})

const LARGE_TRADE_THRESHOLD = 100 * 1000 // 100k  contracts

for await (const trade of combine(...historicalTrades)) {
  if (trade.amount >= LARGE_TRADE_THRESHOLD) {
    console.log(trade.exchange, trade)
  }
}

Seamless switching between real-time streaming and historical market data replay

Example showing a simple pattern of providing an async iterable of market data messages to a function that can process them no matter if it's real-time or historical market data. That effectively enables having the same 'data pipeline' for backtesting and live trading.

const { replayNormalized, streamNormalized, normalizeTrades,
       compute, computeTradeBars } = require('tardis-dev')

const historicalMessages = replayNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD'],
    from: '2019-08-01',
    to: '2019-08-02'
  },
  normalizeTrades
)

const realTimeMessages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD']
  },
  normalizeTrades
)

async function produceVolumeBasedTradeBars(messages) {
  const withVolumeTradeBars = compute(
    messages,
    computeTradeBars({
      kind: 'volume',
      interval: 100 * 1000 // aggregate by 100k contracts volume
    })
  )

  for await (const message of withVolumeTradeBars) {
    if (message.type === 'trade_bar') {
      console.log(message.name, message)
    }
  }
}

produceVolumeBasedTradeBars(historicalMessages)

// or for real time data
//  await produceVolumeBasedTradeBars(realTimeMessages)

Real-time funding rate and open interest across multiple exchanges

Example showing how to quickly display real-time funding rate and open interest info across multiple exchanges at once.

const { streamNormalized, normalizeDerivativeTickers, 
       combine } = require('tardis-dev')

const exchangesToStream = [
  { exchange: 'bitmex', symbols: ['XBTUSD'] },
  { exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
  { exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] },
  { exchange: 'okex-swap', symbols: ['BTC-USD-SWAP'] },
  { exchange: 'binance-futures', symbols: ['BTCUSDT'] },
  { exchange: 'bitfinex-derivatives', symbols: ['BTCF0:USTF0'] }
]

const realTimeStreams = exchangesToStream.map(e => {
  return streamNormalized(e, normalizeDerivativeTickers)
})

// combine all real-time message streams into one
const messages = combine(...realTimeStreams)

const funding = {}

// print funding info every 100ms
setInterval(() => {
  console.clear()
  console.log(new Date().toISOString(), funding)
}, 100)

// update funding info real-time
for await (const message of messages) {
  if (message.type === 'derivative_ticker') {
    funding[message.exchange] = {
      symbol: message.symbol,
      fundingRate: message.fundingRate,
      lastPrice: message.lastPrice,
      openInterest: message.openInterest,
      markPrice: message.markPrice
    }
  }
}

Saving historical funding, index and open interest data to CSV file

Example showing how to write Deribit exchange historical funding, index and open interest data into CSV.

const { replayNormalized, normalizeDerivativeTickers } = require('tardis-dev')
const fs = require('fs')
const csv = require('fast-csv')
const { once } = require('events')
const fileStream = fs.createWriteStream('./deribit_funding.csv')
const csvStream = csv.format({ headers: true })
csvStream.pipe(fileStream)

const messages = replayNormalized(
  {
    exchange: 'deribit',
    from: '2019-04-01',
    to: '2019-04-02',
    symbols: ['BTC-PERPETUAL']
  },
  normalizeDerivativeTickers
)

for await (const message of messages) {
  if (message.type === 'derivative_ticker') {
    const ok = csvStream.write({
      fundingRate: message.fundingRate,
      lastPrice: message.lastPrice,
      openInterest: message.openInterest,
      indexPrice: message.indexPrice,
      timestamp: message.timestamp.toISOString()
    })

    if (!ok) {
      await once(csvStream, 'drain')
    }
  }
}
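
Since csvStream is a regular Node.js stream, it's worth ending it and waiting for the underlying file stream to finish once the replay loop completes, so all buffered rows are flushed to disk:

csvStream.end()
await once(fileStream, 'finish')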

Computing simple moving average of volume based trade bars

Example showing an implementation of SimpleMovingAverageComputable that calculates the average of trade bar close prices over a specified rolling window in an incremental way. It uses a CircularBuffer under the hood.

class CircularBuffer {
  constructor(_bufferSize) {
    this._bufferSize = _bufferSize
    this._buffer = []
    this._index = 0
  }
  append(value) {
    const isFull = this._buffer.length === this._bufferSize
    let poppedValue
    if (isFull) {
      poppedValue = this._buffer[this._index]
    }
    this._buffer[this._index] = value
    this._index = (this._index + 1) % this._bufferSize
    return poppedValue
  }
  *items() {
    for (let i = 0; i < this._buffer.length; i++) {
      const index = (this._index + i) % this._buffer.length
      yield this._buffer[index]
    }
  }
  get count() {
    return this._buffer.length
  }
}

class SimpleMovingAverageComputable {
  constructor({ periods }) {
    this.sourceDataTypes = ['trade_bar']
    this._average = 0
    this._circularBuffer = new CircularBuffer(periods)
  }

  *compute(tradeBar) {
    const result = this._circularBuffer.append(tradeBar.close)
    const poppedVal = result !== undefined ? result : this._average
    const increment = (tradeBar.close - poppedVal) / this._circularBuffer.count
    this._average = this._average + increment

    yield {
      type: 'sma',
      symbol: tradeBar.symbol,
      exchange: tradeBar.exchange,
      name: `sma_${tradeBar.name}`,
      average: this._average,
      interval: tradeBar.interval,
      kind: tradeBar.kind,
      timestamp: tradeBar.timestamp,
      localTimestamp: tradeBar.localTimestamp
    }
  }
}

const computeSimpleMovingAverages = options => () =>
  new SimpleMovingAverageComputable(options)

Usage

const { streamNormalized, normalizeTrades, 
       compute, computeTradeBars } = require('tardis-dev')

const messages = streamNormalized(
  {
    exchange: 'bitmex',
    symbols: ['XBTUSD']
  },
  normalizeTrades
)

const withTradeBars = compute(
  messages,
  computeTradeBars({
    kind: 'volume',
    interval: 1000
  })
)
const withSimpleMovingAverage = compute(
  withTradeBars,
  computeSimpleMovingAverages({ periods: 5 })
)

for await (const message of withSimpleMovingAverage) {
  if (message.type === 'sma') {
    console.log(message)
  }
}
