Links

Node.js Client

Convenient access to tick-level historical and real-time cryptocurrency market data via Node.js

Introduction

Node.js tardis-dev library provides convenient access to tick-level historical and real-time cryptocurrency market data both in exchange-native and normalized formats. Instead of callbacks it relies on async iteration (for await ...of) enabling composability features like seamless switching between real-time data streaming and historical data replay or computing derived data locally.
historical market data replay
real-time market data streaming
const tardis = require('tardis-dev')
const { replayNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = replayNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD'],
from: '2019-05-01',
to: '2019-05-02'
},
normalizeTrades,
normalizeBookChanges
)
for await (const message of messages) {
console.log(message)
}
const tardis = require('tardis-dev')
const { streamNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD']
},
normalizeTrades,
normalizeBookChanges
)
for await (const message of messages) {
console.log(message)
}
Try this code live on RunKit

Features

Examples (TOC)

Installation

Requires Node.js v12+ installed.
npm install tardis-dev --save

Debugging and logging

tardis-dev lib uses debug package for verbose logging and debugging purposes that can be enabled via DEBUG environment variable set to tardis-dev*.

Usage with TypeScript

Simply change from require
const { replay, stream } = require('tardis-dev')
to ES Modules import
import { replay, stream } from 'tardis-dev'
to enjoy first class TypeScript typings.

Replaying historical market data

See historical data details page to get detailed information about historical market data available for each exchange.

replay(options)

Replays historical market data messages for given replay options in exchange-native format. Historical market data is being fetched efficiently (in parallel) from the Tardis.dev HTTP API and cached locally. Returns async iterable.
const { replay } = require('tardis-dev')
const messages = replay({
exchange: 'bitmex',
filters: [
{ channel: 'trade', symbols: ['XBTUSD'] },
{ channel: 'orderBookL2', symbols: ['XBTUSD'] }
],
from: '2019-05-01',
to: '2019-05-02'
})
for await (const message of messages) {
console.log(message)
}
Try this code live on RunKit
stream(options) is the real-time counterpart of replayfunction, returning real-time market data in the same format.

replay options

name
type
default
exchange
string
-
requested exchange id - one of allowed values
filters
{channel:string, symbols?: string[]}[]
[]
optional filters of requested historical data feed - use getExchangeDetails function to get allowed channels and symbols ids for requested exchange
from
string
-
replay period start date (UTC) in a format recognized by the Date.parse(), e.g., 2019-04-01
to
string
-
replay period end date (UTC) in a format recognized by the Date.parse(), e.g., 2019-04-02
skipDecoding
boolean | undefined
undefined
when set to true returns messages as buffers instead of decoding them to objects
withDisconnects
boolean | undefined
undefined
when set to true returns message with value undefined for events when connection that was recording the historical data got disconnected
apiKey
string | undefined
undefined
API key for Tardis.dev HTTP API - if not provided only first day of each month of historical data is accessible. It can also be set via init function for all replay calls.

type of messages provided by replay iterator (for await ...of)

type Message =
| {
localTimestamp: Date // local timestamp when message has been received
message: any // message in exchange-native data format
}
// when skipDecoding is set to true
| {
localTimestamp: Buffer
message: Buffer
}
// when withDisconnects is set to true whole message can be undefined (disconnect)
Message | undefined

sample message

{
localTimestamp: 2019-05-01T00:00:07.013Z,
message: {
table: 'orderBookL2',
action: 'update',
data: [ { symbol: 'XBTUSD', id: 8799474100, side: 'Buy', size: 167340 } ]
}
}

replayNormalized(options, ...normalizers)

Replays historical market data messages for given replay options and normalizes messages using normalizers provided as rest arguments. Historical market data is being fetched efficiently (in parallel) from the Tardis.dev HTTP API and cached locally. Returns async iterable.
const tardis = require('tardis-dev')
const { replayNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = replayNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD'],
from: '2019-05-01',
to: '2019-05-02'
},
normalizeTrades,
normalizeBookChanges
)
for await (const message of messages) {
console.log(message)
}
Try this code live on RunKit
streamNormalized(options, ...normalizers) is the real-time counterpart of replayNormalized function, returning real-time market data in the same format.

replay normalized options

name
type
default
exchange
string
-
requested exchange id - one of allowed values
symbols
string[] | undefined
undefined
optional symbols for requested data feed - use getExchangeDetails function to get allowed symbols ids for requested exchange
from
string
-
replay period start date (UTC) in a format recognized by the Date.parse(), e.g., 2019-04-01
to
string
-
replay period end date (UTC) in a format recognized by the Date.parse() e.g., 2019-04-02
withDisconnectMessages
boolean | undefined
undefined
when set to true returns disconnect messages for events when connection that was recording the historical data got disconnected
apiKey
string | undefined
undefined
API key for Tardis.dev HTTP API - if not provided only first day of each month of historical data is accessible. It can also be set via init function for all replayNormalized calls.

Built-in normalizers

replayNormalized function accepts any number of normalizers as rest parameters that map from exchange-native format to normalized data format. tardis-dev ships with built in ones that normalize trades, order book and derivative ticker data but also allows adding custom ones.

types of messages provided by replayNormalized iterator (for await ...of)

Message types and formats depend on specific normalizers provided to replayNormalized function and are documented in detail in data normalization section.

sample message

Sample message produced by normalizeBookChanges
{
type: 'book_change',
symbol: 'XBTUSD',
exchange: 'bitmex',
isSnapshot: false,
bids: [{ price: 5263.5, amount: 1780043 }],
asks: [],
timestamp: 2019-05-01T00:00:04.430Z,
localTimestamp: 2019-05-01T00:00:04.430Z
}

Streaming real-time market data

stream(options)

Streams real-time market data messages for given stream options in exchange-native format. It connects directly to exchanges WebSocket APIs and transparently restarts closed, broken or stale connections (open connections without data being send for specified amount of time). Returns async iterable.
const { stream } = require('tardis-dev')
const messages = stream({
exchange: 'bitmex',
filters: [
{ channel: 'trade', symbols: ['XBTUSD'] },
{ channel: 'orderBookL2', symbols: ['XBTUSD'] }
]
})
for await (const message of messages) {
console.log(message)
}
Try this code live on RunKit
replay(options) is the historical market data counterpart of stream function, returning historical market data in the same format.

stream options

name
type
default
exchange
string
-
requested exchange id - one of allowed values
filters
{channel:string, symbols?: string[]}[]
[]
optional filters of requested real-time data feed - use getExchangeDetails to get allowed channels and symbols ids for requested exchange
skipDecoding
boolean | undefined
undefined
when set to true returns messages as buffers instead of decoding them to objects
withDisconnects
boolean | undefined
undefined
when set to true returns message with value undefined for real-time stream disconnect events
timeoutIntervalMS
number
10000
specifies time in milliseconds after which connection is restarted if no message has been received from the exchange
onError
(err) => void | undefined
undefined
Optional callback invoked when real-time WebSocket connection error occurs, useful for custom error logging etc.

type of messages provided by stream iterator (for await ...of)

type Message =
| {
localTimestamp: Date // local timestamp when message has been received
message: any // message in exchange-native data format
}
// when skipDecoding is set to true
| {
localTimestamp: Buffer
message: Buffer
}
// when withDisconnects is set to true whole message can be undefined
Message | undefined

sample message

{
localTimestamp: 2019-10-22T09:24:51.025Z,
message: {
table: 'orderBookL2',
action: 'update',
data: [
{ symbol: 'XBTUSD', id: 8799172400, side: 'Sell', size: 369501 }
]
}
}

streamNormalized(options, ...normalizers)

Streams real-time market data messages for given stream options and normalizes messages using provided normalizers provided as rest arguments. It connects directly to exchanges WebSocket APIs and transparently restarts closed, broken or stale connections (open connections without data being send for specified amount of time). Returns async iterable.
const tardis = require('tardis-dev')
const { streamNormalized, normalizeTrades, normalizeBookChanges } = tardis
const messages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD']
},
normalizeTrades,
normalizeBookChanges
)
for await (const message of messages) {
console.log(message)
}
Try this code live on RunKit
replayNormalized(options) is the historical counterpart of streamNormalized function, returning historical market data in the same format.

stream normalized options

name
type
default
exchange
string
-
requested exchange id - one of allowed values
symbols
string[] | undefined
undefined
instruments symbols for requested data feed
withDisconnectMessages
boolean | undefined
undefined
when set to true returns disconnect messages for real-time stream disconnect events
timeoutIntervalMS
number
10000
specifies time in milliseconds after which connection is restarted if no message has been received from the exchange
onError
((err) => void) | undefined
undefined
Optional callback invoked when real-time WebSocket connection or mapping error occurs, useful for custom error logging etc.

Built-in normalizers

streamNormalized function can accept any number of custom normalizers as rest parameters that map from exchange-native format to normalized data format. tardis-dev ships with built in ones that normalize trades, order book and derivative ticker data but also allows adding custom ones.

types of messages provided by streamNormalized iterator (for await ...of)

Message types and formats depend on specific normalizers provided to streamNormalized function and are documented in detail in data normalization section.

sample message

Sample message produced by normalizeTrades
{
type: 'trade',
symbol: 'XBTUSD',
exchange: 'bitmex',
id: 'b1f4b309-80e2-1ffb-340b-2f7576f6ef0d',
price: 7447.5,
amount: 100,
side: 'buy',
timestamp: 2019-10-24T13:28:07.867Z,
localTimestamp: 2019-10-24T13:28:07.938Z
}

Historical market data helpers

init(options)

This function doesn't affect real-time streaming functionality in any way, it's useful only for historical data replay.
When working with market data viareplay and replayNormalized functions by default only first day of each month of historical data is available for replay as well as locally cached historical data is stored in default location on disk (OS temp dir).
Init function allows providing apiKey received via email after ordering historical market data access via Tardis.dev website as well as customcacheDir. ApiKey can also be provided directly via options of replay and replayNormalized functions - that overrides anything that was provided via init.
const { init } = require('tardis-dev')
init({
apiKey: 'YOUR API KEY',
cacheDir: 'CUSTOM CACHE DIR PATH'
})

init options

name
type
defaults
apiKey
string | undefined
undefined
API key for Tardis.dev HTTP API - if not provided only first day of each month of historical data is accessible
cacheDir
string
<os.tmpdir>/.tardis-cache
path to local dir that will be used as cache location - if not provided default temp dir for given OS will be used

getExchangeDetails(exchange)

Given exchange id provides exchange details (available symbols, availability dates, available channels, pricing info etc) provided by exchanges/:exchange API endpoint.
const { getExchangeDetails } = require('tardis-dev')
const bitmexExchangeDetails = await getExchangeDetails('bitmex')
Try this code live on RunKit

type of response returned by awaiting on getExchangeDetails

{
id: string
name: string
enabled: boolean
filterable: boolean
availableSince: string
availableSymbols: {
id: string
type: 'spot' | 'future' | 'perpetual' | 'option'
availableSince: string
availableTo?: string
}[]
availableChannels: string[]
incidentReports: {
from: string
to: string
status: string
details: string
}
}

getApiKeyAccessInfo(apiKey?)

Given apiKey provided as optional parameter or provided in init function provides information about what historical data is available for it - exchanges, date ranges, symbols.
const { getApiKeyAccessInfo } = require('tardis-dev')
const details = await getApiKeyAccessInfo('YOUR_API_KEY')

type of response returned by awaiting on getApiKeyAccessInfo()

{
exchange: string
from: string
to: string
symbols: string[]
}[]

clearCache()

Clears local data cache dir.
const { clearCache } = require('tardis-dev')
await clearCache()

Data normalization

Data normalization allows consuming market data feeds from various exchanges in consistent format.
tardis-dev has following built-in normalizers that can be provided to replayNormalized or streamNormalized functions:
If you're interested in how exactly data is mapped from exchange-native format to normalized one, please follow code in tardis-dev GitHub repository for each exchange and if you determined that mapping should be done differently please read "modifying built-in and adding custom normalizers" section.
const { streamNormalized, normalizeTrades, normalizeBookChanges,
normalizeDerivativeTickers } = require('tardis-dev')
// or replayNormalized to replay normalized historical data
const messages = streamNormalized(
{
exchange: 'deribit',
symbols: ['BTC-PERPETUAL']
},
normalizeTrades,
normalizeBookChanges,
normalizeDerivativeTickers
)
for await (const message of messages) {
if (message.type === 'book_change') {
// process normalized book change
}
if (message.type === 'trade') {
// process normalized trade
}
if (message.type === 'derivative_ticker') {
// process normalized derivative_ticker
}
}
Try this code live on RunKit

normalizeTrades

When passed as an arg toreplayNormalized or streamNormalized function provides normalized trade data for all supported exchanges.
sample message
type definition
{
type: 'trade',
symbol: 'XBTUSD',
exchange: 'bitmex',
id: '282a0445-0e3a-abeb-f403-11003204ea1b',
price: 7996,
amount: 50,
side: 'sell',
timestamp: 2019-10-23T10:32:49.669Z,
localTimestamp: 2019-10-23T10:32:49.740Z
}
{
type: 'trade'
symbol: string
exchange: string
id: string | undefined
price: number
amount: number
side: 'buy' | 'sell' | 'unknown' // liquidity taker side (aggressor)
timestamp: Date
localTimestamp: Date
}

normalizeBookChanges

When passed as an arg toreplayNormalized or streamNormalized function provides normalized book_change data for all supported exchanges.
Provides initial L2 (market by price) order book snapshots (isSnapshot=true) plus incremental updates for each order book change. Please note that amount is the updated amount at that price level, not a delta. An amount of 0 indicates the price level can be removed.
sample message
type definition
{
type: 'book_change',
symbol: 'XBTUSD',
exchange: 'bitmex',
isSnapshot: false,
bids: [],
asks: [{ price: 7985, amount: 283318 }],
timestamp: 2019-10-23T11:29:53.469Z,
localTimestamp: 2019-10-23T11:29:53.469Z
}
{
type: 'book_change'
symbol: string
exchange: string
isSnapshot: boolean
bids: { price: number; amount: number }[]
asks: { price: number; amount: number }[]
timestamp: Date
localTimestamp: Date
}

normalizeDerivativeTickers

When passed as an arg toreplayNormalized or streamNormalized function provides normalized derivative_ticker data for supported exchanges that trade derivative instruments.
sample message
type definition
{
type: 'derivative_ticker',
symbol: 'BTC-PERPETUAL',
exchange: 'deribit',
lastPrice: 7987.5,
openInterest: 84129491,
fundingRate: -0.00001568,
indexPrice: 7989.28,
markPrice: 7987.56,
timestamp: 2019-10-23T11:34:29.302Z,
localTimestamp: 2019-10-23T11:34:29.416Z
}
{
type: 'derivative_ticker'
symbol: string
exchange: string
lastPrice: number | undefined
openInterest: number | undefined
fundingRate: number | undefined
indexPrice: number | undefined
markPrice: number | undefined
timestamp: Date
localTimestamp: Date
}

disconnect message

When replayNormalized or streamNormalized functions options have withDisconnectMessages flag set to true and disconnect event occurred (eg.: WebSocket connection close) then disconnect message is being returned.
sample message
type definition
{
type: 'disconnect',
exchange: 'deribit',
localTimestamp: 2019-10-23T11:34:29.416Z
}
{
type: 'disconnect'
exchange: string
localTimestamp: Date
}

Modifying built-in and adding custom normalizers

Intardis-dev data normalization is implemented via normalize factory functions provided to replayNormalized and streamNormalized functions. This design gives lots of flexibility by allowing replacing, extending and modifying built-in normalizers or adding new ones for new normalized data types without the need of forking the whole library.
Any normalize function provided to replayNormalized and streamNormalized functions needs to have following signature:
(exchange: string, localTimestamp: Date) => Mapper
Exchange is an exchange id for which mapper object needs to be returned for, localTimestamp is a date for which mapper is created (and is created after each disconnection). In most cases localTimestamp is not necessary for anything, but in certain cases like for example exchange API change it can be used to switch to different mapping logic like using new data channel that wasn't available until certain date.
Returned Mapper object has following signature:
{
canHandle: (message: any) => boolean
map(message: any, localTimestamp: Date): IterableIterator<Data> | undefined
getFilters: (symbols?: string[]) => {channel: string, symbols?: string[]}[]
}
On every disconnection event that occurs normalize factory functions are called again to provide new Mapper objects with clean state if required (stateful mapping like BitMEX order book data that needs to persist state of mapping between price level id and price level and needs to 'reset' for each new connection). If mapper object is stateful it's required to always return new clean state object from normalize factory function or reset it's state in one way or another.
Normalized data returned by iterable iterator of Mapper.map method is expected to have a shape that has at least fields as described in normalized data type section below to play well with other tardis-dev functions like combine or compute.

normalized data type

{
type: string
symbol: string
exchange: string
timestamp: Date
localTimestamp: Date
name? : string | undefined
}

Adding custom normalizeLiquidations normalizer

Example implementation of custom normalizeLiquidations function that normalizes liquidations data for deribit exchange. Implementations for for other exchanges are left as an exercise for the reader.
type of messages provided by normalizeLiquidations
{
type: 'liquidation'
symbol: string
exchange: string
side: 'buy' | 'sell'
amount: number
price: number
timestamp: Date
localTimestamp: Date
}
implementation of deribitLiquidations mapper and normalizeLiquidations
// object that maps from deribit trades real-time channel to our custom
// liquidations messages if determines that trade was caused by liquidation
const deribitLiquidationsMapper = {
// function that given message in deribit native format
// https://docs.deribit.com/v2/#trades-instrument_name-interval
// determines if such message can be mapped/normalized
canHandle(message) {
const params = message.params
const channel = params !== undefined ? params.channel : undefined
if (channel === undefined) {
return false
}
return (
channel.startsWith('trades') &&
params.data.some(trade => trade.liquidation !== undefined)
)
},
// given symbols returns filters that are provided
// as filters to replay & stream functions options
// in our case we're interested in trades deribit channel
// but it could be multiple channels in certain scenarions as well
getFilters(symbols) {
return [
{
channel: 'trades',
symbols
}
]
},
// map message that was determined that is can be handled via canHandle
// to normalized liquidation message
*map(message, localTimestamp) {
for (const deribitLiquidationTrade of message.params.data) {
if (deribitLiquidationTrade.liquidation === undefined) {
continue
}
yield {
type: 'liquidation',
symbol: deribitLiquidationTrade.instrument_name,
exchange: 'deribit',
price: deribitLiquidationTrade.price,
amount: deribitLiquidationTrade.amount,
side: deribitLiquidationTrade.direction,
timestamp: new Date(deribitLiquidationTrade.timestamp),
localTimestamp: localTimestamp
}
}
}
}
// provides factory function that given exchange returns mapper for it
// if such mapper if implemented
// in our case deribitLiquidationsMapper is stateless so we can return the same
// instance every time
const normalizeLiquidations = (exchange, localTimestamp) => {
if (exchange === 'deribit') {
return deribitLiquidationsMapper
}
throw new Error(`normalizeLiquidations: ${exchange} not supported`)
}
normalizeLiquidations usage example
We could as well provide the same normalizeLiquidations function to streamNormalized function or use it together it with other normalizers (normalizeTrades etc.).
const { replayNormalized } = require('tardis-dev')
const liquidations = replayNormalized(
{
exchange: 'deribit',
symbols: ['BTC-PERPETUAL'],
from: '2019-07-01',
to: '2019-07-02'
},
normalizeLiquidations
)
for await (const liquidation of liquidations) {
console.log(liquidation)
}
Try this code live on RunKit

Changing normalizeTrades for Binance exchange

Let's assume that default normalization of Binance exchange trades data doesn't fit our use case and we need to use @aggTrade stream as source of trade data instead of used by default@trade stream.
// add custom binance trades mapper that used @aggTrade stream as source of
// normalized trades
const customBinanceTradesMapper = {
canHandle(message) {
return message.stream && message.stream.endsWith('@aggTrade')
},
getFilters(symbols) {
if (symbols !== undefined) {
symbols = symbols.map(s => s.toLocaleLowerCase())
}
// binance api expects all symbols to be lower cased
return [
{
channel: 'aggTrade',
symbols
}
]
},
*map(message, localTimestamp) {
const binanceAggTrade = message.data
yield {
type: 'trade',
symbol: binanceAggTrade.s,
exchange: 'binance',
id: String(binanceAggTrade.a),
price: Number(binanceAggTrade.p),
amount: Number(binanceAggTrade.q),
side: binanceAggTrade.m ? 'sell' : 'buy',
timestamp: new Date(binanceAggTrade.T),
localTimestamp: localTimestamp
}
}
}
// add new normalize function that only for binance exchange
// uses custom trades mapper and defaults to default normalizeTrades
// for other exchanges
const normalizeTradesWithBinancePatch = (exchange, localTimestamp) => {
if (exchange === 'binance') {
return customBinanceTradesMapper
}
return normalizeTrades(exchange)
}
normalizeTradesWithBinancePatch usage example
const { streamNormalized } = require('tardis-dev')
const messages = streamNormalized(
{
exchange: 'binance',
symbols: ['btcusdt']
},
normalizeTradesWithBinancePatch
)
for await (const message of messages) {
console.log(message)
}
Try this code live on RunKit

Limit order book reconstruction

tardis-dev exports OrderBook class that when instantiated can process normalized book_change messages with order book snapshots and incremental updates and allows maintaining full local order book (level 2 - aggregated market-by-price) state both for real-time data as well as reconstructing historical order book state at any past point in time. It waits for first book_change message that is a snapshot (isSnaphot = true) and then applies subsequent updates to it. Single orderBook object can maintain order book state only for single symbol/instrument. It uses Red-Black tree data structure under the hood to efficiently maintain it's local state in sorted order.
historical order book reconstruction
maintaining order book for real-time stream
const { replayNormalized, normalizeBookChanges, OrderBook } = require('tardis-dev')
const books = {
XBTUSD: new OrderBook(),
ETHUSD: new OrderBook()
}
const messages = replayNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD'],
from: '2019-05-01',
to: '2019-05-02'
},
normalizeBookChanges
)
for await (const message of messages) {
const orderBook = books[message.symbol]
if (message.type === 'book_change') {
orderBook.update(message)
}
const timestamp = message.localTimestamp.toISOString()
// print best bid/ask for every exchange tick
console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}
const { streamNormalized, normalizeBookChanges, OrderBook } = require('tardis-dev')
const books = {
XBTUSD: new OrderBook(),
ETHUSD: new OrderBook()
}
const messages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD', 'ETHUSD']
},
normalizeBookChanges
)
for await (const message of messages) {
const orderBook = books[message.symbol]
if (message.type === 'book_change') {
orderBook.update(message)
}
const timestamp = message.localTimestamp.toISOString()
// print best bid/ask for every exchange tick
console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}
Try this code live on RunKit

orderBook.update(bookChange)

Processes normalized book_change messages to update it's internal local state that maintains ordered bids and asks sets. It ignores any non snapshot book_change messages before initial snapshot is received. It should be called for every book_change message received for given symbol we'd like to reconstruct order book for.
orderBook.update({
type: 'book_change',
symbol: 'XBTUSD',
exchange: 'bitmex',
isSnapshot: false,
bids: [],
asks: [{ price: 7985, amount: 283318 }],
timestamp: new Date(),
localTimestamp: new Date()
})

orderBook.bestBid()

Returns book price level object for highest bid order (best bid) in order book or undefined if book doesn't have any bids (not initialized yet with initial snapshot).
const { price, amount } = orderBook.bestBid()

orderBook.bestAsk()

Returns book price level object for lowest ask order (best ask) in order book or undefined if book doesn't have any asks (not initialized yet with initial snapshot).
const { price, amount } = orderBook.bestAsk()

orderBook.asks()

Returns iterable iterator of book price level objects for all asks available ordered from the lowest to highest ask.
for (const ask of orderBook.asks()) {
// process asks levels one by one from lowest to highest without
// creating in memory array for all levels
}
const orderedAsksArray = Array.from(orderBook.asks())

orderBook.bids()

Returns iterable iterator of book price level objects for all bids available ordered from highest to lowest bid.
for (const bid of orderBook.bids()) {
// process bid levels one by one from highest to lowest without
// creating in memory array for all levels
}
const orderedBidsArray = Array.from(orderBook.bids())

book price level type

{
price: number
amount: number
}

Combining data streams

combine(...iterators)

Combine function given multiple async iterators combines them into single one. That allows synchronized historical market data replay and consolidated streaming of real-time data for multiple exchanges via single for await ...of loop.
Accepts async iterables of normalized messages as rest parameters and combines them returning single async iteratable.
For historical data replay it combines input async iterables messages by sorting them by localTimestamp in ascending order, this allows synchronized/ordered market data replay for multiple exchanges.
For real-time market data streaming it combines input async iterables messages in FIFO order by using Promise.race.
combining historical market data from multiple exchanges
combining real-time stream of market data from multiple exchanges
const { replayNormalized, normalizeTrades, combine } = require('tardis-dev')
const bitmexMessages = replayNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD'],
from: '2019-05-01',
to: '2019-05-02'
},
normalizeTrades
)
const deribitMessages = replayNormalized(
{
exchange: 'deribit',
symbols: ['BTC-PERPETUAL'],
from: '2019-05-01',
to: '2019-05-02'
},
normalizeTrades
)
const combinedStream = combine(bitmexMessages, deribitMessages)
// order at which messages have historically arrived is preserved
for await (const message of combinedStream) {
if (message.exchange === 'deribit') {
// process deribit trades
console.log(message)
}
if (message.exchange === 'bitmex') {
// process bitmex trades
console.log(message)
}
}
const { streamNormalized, normalizeTrades, combine } = require('tardis-dev')
const bitmexMessages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD']
},
normalizeTrades
)
const deribitMessages = streamNormalized(
{
exchange: 'deribit',
symbols: ['BTC-PERPETUAL']
},
normalizeTrades
)
const combinedStream = combine(bitmexMessages, deribitMessages)
// messages are provided in FIFO order as they arrive
for await (const message of combinedStream) {
if (message.exchange === 'deribit') {
// process deribit trades
console.log(message)
}
if (message.exchange === 'bitmex') {
// process bitmex trades
console.log(message)
}
}
Try this code live on RunKit

Computing derived data locally

compute(iterator, ...computables)

Compute function allows computing various derived data locally via so called computables like:
If you're interested in adding custom computables like for example order book imbalance, volume imbalance, open interest or funding rate based bars please read "adding custom computable" section.
Compute function accepts async iterable producing normalized messages together withcomputables as a rest parameters and returns async iterable with normalized messages produced by provided iterable and additionally all computed messages based on provided computable functions. It computes and produces separate computed normalized messages for each symbol and exchange combination. When disconnect message is returned by provided async iterable it discards existing pending computables and starts from computing them from scratch.
const { streamNormalized, normalizeTrades, normalizeBookChanges,
compute, computeTradeBars, computeBookSnapshots } = require('tardis-dev')
const bitmexMessages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD']
},
normalizeTrades,
normalizeBookChanges
)
const messagesWithComputedTypes = compute(
bitmexMessages,
// 10 seconds time bars
computeTradeBars({ kind: 'time', interval: 10 * 1000 }),
// top 20 levels 50 millisecond order book snapshots
computeBookSnapshots({ depth: 20, interval: 50 }),
// volume based trade bar - 1 million vol buckets
computeTradeBars({ kind: 'volume', interval: 1000 * 1000 })
)
for await (const message of messagesWithComputedTypes) {
if (message.type === 'book_snapshot' || message.type === 'trade_bar') {
console.log(message)
}
}
Try this code live on RunKit

computeTradeBars(options)

When provided to compute function, computes normalized trade_bar messages based on normalized trade data.
const { computeTradeBars } = require('tardis-dev')
computeTradeBars({ kind: 'volume', interval: 1000 })

compute trade bars options

name
type
default
kind
| 'time'
| 'volume'
| 'tick'
-
determines the way trades within a bar will be aggregated.
time - classic OHLC candles aggregated by time
volume - volume based trade bars agg by sum of trades amount tick - tick based trade bars, aggregated by trades count
interval
number
-
determines interval to aggregate by - for time based bars it's number of milliseconds, for volume based bars it's accumulated volume, for tick it's count of trades
name
string | undefined
undefined
optional custom name of trade_bar, if not specified computed name will be provided based on kind and interval options

type of message provided by computeTradeBars

{
type: 'trade_bar'
symbol: string
exchange: string
name: string
interval: number
kind: 'time' | 'volume' | 'tick'
open: number
high: number
low: number
close: number
volume: number
buyVolume: number
sellVolume: number
trades: number
vwap: number
openTimestamp: Date
closeTimestamp: Date
timestamp: Date
localTimestamp: Date
}

sample normalized trade_bar message

{
type: 'trade_bar',
symbol: 'XBTUSD',
exchange: 'bitmex',
name: 'trade_bar_10000ms',
interval: 10000,
kind: 'time',
open: 7623.5,
high: 7623.5,
low: 7623,
close: 7623.5,
volume: 37034,
buyVolume: 24244,
sellVolume: 12790,
trades: 9,
vwap: 7623.327320840309,
openTimestamp: 2019-10-25T13:11:31.574Z,
closeTimestamp: 2019-10-25T13:11:39.212Z,
localTimestamp: 2019-10-25T13:11:40.369Z,
timestamp: 2019-10-25T13:11:40.000Z,
}

computeBookSnapshots(options)

When provided to compute function, computes normalized book_snapshot messages based on normalized order book data. It produces new snapshots only if there is an actual change in order book state for requested depth.
const { computeBookSnapshots } = require('tardis-dev')
computeBookSnapshots({ depth: 20, interval: 50 })

compute book snapshots options

name
type
default
depth
number
-
number of closest bids and asks levels to provide snaphot for
interval
number
-
snapshot interval in milliseconds, if 0 is provided it computes snapshots real-time any time there is a change in order book state for requested depth
name
string | undefined
undefined
optional custom name of book_snapshot, if not specified computed name will be provided based on depth and interval options

type of message provided by computeBookSnaphots

{
type: 'book_snapshot'
symbol: string
exchange: string
name: string
depth: number
interval: number
bids: { price: number; amount: number }[]
asks: { price: number; amount: number }[]
timestamp: Date
localTimestamp: Date
}

sample normalized book_snapshot message

{
type: 'book_snapshot',
symbol: 'XBTUSD',
exchange: 'bitmex',
name: 'book_snapshot_2_50ms',
depth: 2,
interval: 50,
bids: [
{ price: 7633.5, amount: 1906067 },
{ price: 7633, amount: 65319 }
],
asks: [
{ price: 7634, amount: 1467849 },
{ price: 7634.5, amount: 67939 }
],
timestamp: 2019-10-25T13:39:46.950Z,
localTimestamp: 2019-10-25T13:39:46.961Z
}

Adding custom computable

Any computables provided to compute function need to be a factory functions with following signature:
() => Computable
where returned Computable object has following signature:
{
sourceDataTypes: string[]
compute(message: NormalizedData): IterableIterator<NormalizedData>
}
Computable.compute returned iterator is expected to provide objects that at least have fields as described in normalized data type section to play well with other tardis-dev functions like combine.

computeOrderBookImbalanceRatio()

Example implementation of custom computeOrderBookImbalanceRatio function that as a source data type uses book snapshots and based on it computes ratio of asks amounts (sell orders) to bids amounts (buy orders) for given book_snapshot depth. It may be used to determine relative buy or sell pressure.
type of messages produced by computeOrderBookImbalanceRatio
{
type: 'book_imbalance'
symbol: string
exchange: string
asksToBidsRatio: number
timestamp: Date
localTimestamp: Date
}

implementation of BookImbalanceRatioComputable computable and computeOrderBookImbalanceRatio factory function.

class BookImbalanceRatioComputable {
constructor() {
this.sourceDataTypes = ['book_snapshot']
}
*compute(bookSnapshot) {
let bidsAmount = 0
let asksAmount = 0
for (let i = 0; i < bookSnapshot.depth; i++) {
bidsAmount += bookSnapshot.bids[i].amount
asksAmount += bookSnapshot.asks[i].amount
}
const asksToBidsRatio = asksAmount / bidsAmount
yield {
type: 'book_imbalance',
symbol: bookSnapshot.symbol,
exchange: bookSnapshot.exchange,
asksToBidsRatio,
timestamp: bookSnapshot.timestamp,
localTimestamp: bookSnapshot.localTimestamp
}
}
}
const computeOrderBookImbalanceRatio = () => new BookImbalanceRatioComputable()

computeOrderBookImbalanceRatio usage example

Given implementation above we can compute book imbalance ratio for BitMEX real-time XBTUSD message stream. For this example we compute top 5 levels, 2 second book snapshots as a source to our custom computable. We need to have async iterable that produces book snapshots as a source to our book imbalance computable, hence two invocations of compute.
const { streamNormalized, normalizeBookChanges,
compute, computeBookSnapshots } = require('tardis-dev')
const bitmexMessages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD']
},
normalizeBookChanges
)
const messagesWithBookSnapshots = compute(
bitmexMessages,
computeBookSnapshots({ depth: 5, interval: 2 * 1000 })
)
const messagesWithComputedData = compute(
messagesWithBookSnapshots,
computeOrderBookImbalanceRatio
)
for await (const message of messagesWithComputedData) {
if (message.type === 'book_imbalance') {
console.log(message)
}
}
Try this code live on RunKit

Examples

Real-time spread across multiple exchanges

Example showing how to very easy display real-time spread and best bid/ask info across multiple exchanges at once. It can be easily adapted to do the same for historical data (replayNormalized instead of streamNormalized).
const { streamNormalized, normalizeBookChanges, combine,
compute, computeBookSnapshots } = require('tardis-dev')
const exchangesToStream = [
{ exchange: 'bitmex', symbols: ['XBTUSD'] },
{ exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
{ exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] }
]
// for each specified exchange call streamNormalized for it
// so we have multiple real-time streams for all specified exchanges
const realTimeStreams = exchangesToStream.map(e => {
return streamNormalized(e, normalizeBookChanges)
})
// combine all real-time message streams into one
const messages = combine(...realTimeStreams)
// create book snapshots with depth1 that are produced
// every time best bid/ask info is changed
// effectively computing real-time quotes
const realTimeQuoteComputable = computeBookSnapshots({
depth: 1,
interval: 0,
name: 'realtime_quote'
})
// compute real-time quotes for combines real-time messages
const messagesWithQuotes = compute(messages, realTimeQuoteComputable)
const spreads = {}
// print spreads info every 100ms
setInterval(() => {
console.clear()
console.log(spreads)
}, 100)
// update spreads info real-time
for await (const message of messagesWithQuotes) {
if (message.type === 'book_snapshot') {
spreads[message.exchange] = {
spread: message.asks[0].price - message.bids[0].price,
bestBid: message.bids[0],
bestAsk: message.asks[0]
}
}
}
Try this code live on RunKit

Replay large historical trades across multiple exchanges

Example showing replaying large historical trades across multiple exchanges as those happened.
const { replayNormalized, normalizeTrades, combine } = require('tardis-dev')
const from = '2019-07-01'
const to = '2019-07-02'
const exchanges = [
{ exchange: 'bitmex', symbols: ['XBTUSD'] },
{ exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
{ exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] }
]
const historicalTrades = exchanges.map(exchange => {
return replayNormalized({ from, to, ...exchange }, normalizeTrades)
})
const LARGE_TRADE_THRESHOLD = 100 * 1000 // 100k contracts
for await (const trade of combine(...historicalTrades)) {
if (trade.amount >= LARGE_TRADE_THRESHOLD) {
console.log(trade.exchange, trade)
}
}
Try this code live on RunKit

Seamless switching between real-time streaming and historical market data replay

Example showing simple pattern of providing async iterable of market data messages to the function that can process them no matter if it's is real-time or historical market data. That effectively enables having the same 'data pipeline' for backtesting and live trading.
const { replayNormalized, streamNormalized, normalizeTrades,
compute, computeTradeBars } = require('tardis-dev')
const historicalMessages = replayNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD'],
from: '2019-08-01',
to: '2019-08-02'
},
normalizeTrades
)
const realTimeMessages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD']
},
normalizeTrades
)
async function produceVolumeBasedTradeBars(messages) {
const withVolumeTradeBars = compute(
messages,
computeTradeBars({
kind: 'volume',
interval: 100 * 1000 // aggregate by 100k contracts volume
})
)
for await (const message of withVolumeTradeBars) {
if (message.type === 'trade_bar') {
console.log(message.name, message)
}
}
}
produceVolumeBasedTradeBars(historicalMessages)
// or for real time data
// await produceVolumeBasedTradeBars(realTimeMessages)
Try this code live on RunKit

Real-time funding rate and open interest across multiple exchanges

Example showing how to quickly display real-time funding rate and open interest info across multiple exchanges at once.
const { streamNormalized, normalizeDerivativeTickers,
combine } = require('tardis-dev')
const exchangesToStream = [
{ exchange: 'bitmex', symbols: ['XBTUSD'] },
{ exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
{ exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] },
{ exchange: 'okex-swap', symbols: ['BTC-USD-SWAP'] },
{ exchange: 'binance-futures', symbols: ['BTCUSDT'] },
{ exchange: 'bitfinex-derivatives', symbols: ['BTCF0:USTF0'] }
]
const realTimeStreams = exchangesToStream.map(e => {
return streamNormalized(e, normalizeDerivativeTickers)
})
// combine all real-time message streams into one
const messages = combine(...realTimeStreams)
const funding = {}
// print funding info every 100ms
setInterval(() => {
console.clear()
console.log(new Date().toISOString(), funding)
}, 100)
// update funding info real-time
for await (const message of messages) {
if (message.type === 'derivative_ticker') {
funding[message.exchange] = {
symbol: message.symbol,
fundingRate: message.fundingRate,
lastPrice: message.lastPrice,
openInterest: message.openInterest,
markPrice: message.markPrice
}
}
}
Try this code live on RunKit

Saving historical funding, index and open interest data to CSV file

Example showing how to write Deribit exchange historical funding, index and open interest data into CSV.
const { replayNormalized, normalizeDerivativeTickers } = require('tardis-dev')
const fs = require('fs')
const csv = require('fast-csv')
const { once } = require('events')
const fileStream = fs.createWriteStream('./deribit_funding.csv')
const csvStream = csv.format({ headers: true })
csvStream.pipe(fileStream)
const messages = replayNormalized(
{
exchange: 'deribit',
from: '2019-04-01',
to: '2019-04-02',
symbols: ['BTC-PERPETUAL']
},
normalizeDerivativeTickers
)
for await (const message of messages) {
if (message.type === 'derivative_ticker') {
const ok = csvStream.write({
fundingRate: message.fundingRate,
lastPrice: message.lastPrice,
openInterest: message.openInterest,
indexPrice: message.indexPrice,
timestamp: message.timestamp.toISOString()
})
if (!ok) {
await once(csvStream, 'drain')
}
}
}
Try this code live on RunKit

Computing simple moving average of volume based trade bars

Example showing implementation of SimpleMovingAverageComputable that calculates average of trade bar closes prices for specified rolling window in incremental way. It uses CircularBuffer under the hood.
class CircularBuffer {
constructor(_bufferSize) {
this._bufferSize = _bufferSize
this._buffer = []
this._index = 0
}
append(value) {
const isFull = this._buffer.length === this._bufferSize
let poppedValue
if (isFull) {
poppedValue = this._buffer[this._index]
}
this._buffer[this._index] = value
this._index = (this._index + 1) % this._bufferSize
return poppedValue
}
*items() {
for (let i = 0; i < this._buffer.length; i++) {
const index = (this._index + i) % this._buffer.length
yield this._buffer[index]
}
}
get count() {
return this._buffer.length
}
}
class SimpleMovingAverageComputable {
constructor({ periods }) {
this.sourceDataTypes = ['trade_bar']
this._average = 0
this._circularBuffer = new CircularBuffer(periods)
}
*compute(tradeBar) {
const result = this._circularBuffer.append(tradeBar.close)
const poppedVal = result !== undefined ? result : this._average
const increment = (tradeBar.close - poppedVal) / this._circularBuffer.count
this._average = this._average + increment
yield {
type: 'sma',
symbol: tradeBar.symbol,
exchange: tradeBar.exchange,
name: `sma_${tradeBar.name}`,
average: this._average,
interval: tradeBar.interval,
kind: tradeBar.kind,
timestamp: tradeBar.timestamp,
localTimestamp: tradeBar.localTimestamp
}
}
}
const computeSimpleMovingAverages = options => () =>
new SimpleMovingAverageComputable(options)

Usage

const { streamNormalized, normalizeTrades,
compute, computeTradeBars } = require('tardis-dev')
const messages = streamNormalized(
{
exchange: 'bitmex',
symbols: ['XBTUSD']
},
normalizeTrades
)
const withTradeBars = compute(
messages,
computeTradeBars({
kind: 'volume',
interval: 1000
})
)
const withSimpleMovingAverage = compute(
withTradeBars,
computeSimpleMovingAverages({ period: 5 })
)
for await (message of withSimpleMovingAverage) {
if (message.type === 'sma') {
console.log(message)
}
}
Try this code live on RunKit