# Normalization

Data normalization allows consuming market data feeds from various exchanges in consistent format.

`tardis-dev` has following built-in normalizers that can be provided to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers) functions:

* [`normalizeTrades`](#normalizetrades) - provides normalized `trade` data
* [`normalizeBookChanges`](#normalizebookchanges) - provides normalized [`book_change`](#normalizebookchanges) data
* [`normalizeDerivativeTickers`](#normalizederivativetickers) - provides normalized funding, index and mark price data
* [`normalizeBookTickers`](#normalizebooktickers) - provides normalized [`book_ticker`](#normalizebooktickers) (best bid/ask) data from exchange-native BBO feeds
* [`normalizeLiquidations`](#normalizeliquidations) - provides normalized `liquidation` data
* [`normalizeOptionsSummary`](#normalizeoptionssummary) - provides normalized `option_summary` data (greeks, IV, bid/ask for options)

If you're interested in how exactly data is mapped from [exchange-native format](https://docs.tardis.dev/faq/data#what-is-a-difference-between-exchange-native-and-normalized-data-format) to normalized one, please follow code in [`tardis-dev` GitHub repository](https://github.com/tardis-dev/tardis-node/tree/master/src/mappers) for each exchange and if you determined that mapping should be done differently please read "[modifying built-in and adding custom normalizers](#modifying-built-in-and-adding-custom-normalizers)" section.

```javascript
import { streamNormalized, normalizeTrades, normalizeBookChanges, normalizeDerivativeTickers } from 'tardis-dev'

// or replayNormalized to replay normalized historical data
const messages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT']
  },
  normalizeTrades,
  normalizeBookChanges,
  normalizeDerivativeTickers
)

for await (const message of messages) {
  if (message.type === 'book_change') {
    // process normalized book change
  }
  if (message.type === 'trade') {
    // process normalized trade
  }
  if (message.type === 'derivative_ticker') {
    // process normalized derivative_ticker
  }
}
```

### `normalizeTrades`

When passed as an arg to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers), it provides normalized `trade` data for all supported exchanges.

{% tabs %}
{% tab title="sample message" %}

```javascript
{
  type: 'trade',
  symbol: 'BTCUSDT',
  exchange: 'binance',
  id: '3445374963',
  price: 61130.98,
  amount: 0.00145,
  side: 'sell',
  timestamp: 2024-02-29T23:59:59.998Z,
  localTimestamp: 2024-03-01T00:00:00.001Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'trade'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  id: string | undefined // trade id if provided by exchange
  price: number // trade price as provided by exchange
  amount: number // trade amount as provided by exchange
  side: 'buy' | 'sell' | 'unknown' // liquidity taker side (aggressor)
  timestamp: Date // trade timestamp provided by exchange
  localTimestamp: Date // message arrival timestamp
}
```

{% endtab %}
{% endtabs %}

### `normalizeBookChanges`

When passed as an arg to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers), it provides normalized `book_change` data for all supported exchanges.

Provides initial [L2 (market by price)](https://docs.tardis.dev/faq/order-books#what-l2-order-book-data-can-be-used-for) order book snapshots (`isSnapshot=true`) plus incremental updates for each order book change. Please note that `amount` is the updated amount at that price level, not a delta. An `amount` of `0` indicates the price level can be removed.

{% tabs %}
{% tab title="sample message" %}

```javascript
{
  type: 'book_change',
  symbol: 'BTCUSDT',
  exchange: 'binance',
  isSnapshot: false,
  bids: [{ price: 61141.09, amount: 2.02515 }],
  asks: [{ price: 61141.1, amount: 4.09368 }],
  timestamp: 2024-03-01T00:00:01.489Z,
  localTimestamp: 2024-03-01T00:00:01.491Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'book_change'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  isSnapshot: boolean // if true marks initial order book snapshot
  bids: { price: number; amount: number }[] // updated bids price-amount levels
  asks: { price: number; amount: number }[] // updated asks price-amount levels
  timestamp: Date // order book update timestamp if provided by exchange,
                  // otherwise equals to localTimestamp
  localTimestamp: Date // message arrival timestamp
}
```

{% endtab %}
{% endtabs %}

When processing `book_change` updates: an `amount` of `0` means remove that price level. If you receive a removal for a price level not in your local book, ignore it. Snapshot levels may include zero-amount entries — these should also be treated as absent levels.

### `normalizeBookTickers`

When passed as an arg to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers), it provides normalized `book_ticker` data — top of the book (best bid/ask) data from exchanges' native best bid/offer channels (e.g., Binance `bookTicker`, Bybit `orderbook.1`). Unlike `quote`, which is derived from L2 order book data via [`computeBookSnapshots`](#computebooksnapshots-options), `book_ticker` is sourced from the native exchange-provided WebSocket best bid/offer feed.

Because it is a standalone feed, `book_ticker` replay can start from any point in time, whereas `quote` requires starting from 00:00 UTC (when the initial L2 order book snapshot is provided).

{% tabs %}
{% tab title="sample message" %}

```javascript
{
  type: 'book_ticker',
  symbol: 'BTCUSDT',
  exchange: 'binance-futures',
  askPrice: 63125.5,
  askAmount: 2.5,
  bidPrice: 63125.4,
  bidAmount: 1.8,
  timestamp: 2024-01-15T10:30:00.123Z,
  localTimestamp: 2024-01-15T10:30:00.234Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'book_ticker'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  askPrice: number | undefined // best ask price
  askAmount: number | undefined // best ask amount
  bidPrice: number | undefined // best bid price
  bidAmount: number | undefined // best bid amount
  timestamp: Date // message timestamp provided by exchange
  localTimestamp: Date // message arrival timestamp
}
```

{% endtab %}
{% endtabs %}

### `normalizeDerivativeTickers`

When passed as an arg to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers), it provides normalized `derivative_ticker` data for supported exchanges that trade derivative instruments.

{% tabs %}
{% tab title="sample message" %}

```javascript
{
  type: 'derivative_ticker',
  symbol: 'BTCUSDT',
  exchange: 'binance-futures',
  lastPrice: 61131.0,
  openInterest: 1458236000,
  fundingRate: 0.0001,
  indexPrice: 61130.9,
  markPrice: 61131.1,
  timestamp: 2024-03-01T00:00:00.250Z,
  localTimestamp: 2024-03-01T00:00:00.251Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'derivative_ticker'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  lastPrice: number | undefined // last instrument price if provided by exchange
  openInterest: number | undefined // last open interest if provided by exchange
  fundingRate: number | undefined // last funding rate if provided by exchange
  indexPrice: number | undefined // last index price if provided by exchange
  markPrice: number | undefined // last mark price if provided by exchange
  fundingTimestamp: Date | undefined // next funding event timestamp if provided by exchange
  predictedFundingRate: number | undefined // predicted next funding rate if provided by exchange
  timestamp: Date // message timestamp provided by exchange
  localTimestamp: Date // message arrival timestamp
}
```

{% endtab %}
{% endtabs %}

### `normalizeLiquidations`

When passed as an arg to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers), it provides normalized `liquidation` data for exchanges that publish liquidation events. See [which exchanges support liquidations](https://docs.tardis.dev/faq/data#which-exchanges-support-liquidations-data-type).

{% tabs %}
{% tab title="sample message" %}

```javascript
{
  type: 'liquidation',
  symbol: 'BTCUSDT',
  exchange: 'binance-futures',
  id: '2045830192',
  price: 61110.5,
  amount: 0.25,
  side: 'sell',
  timestamp: 2024-03-01T00:05:12.123Z,
  localTimestamp: 2024-03-01T00:05:12.124Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'liquidation'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  id: string | undefined // liquidation id if provided by exchange
  price: number // liquidation price
  amount: number // liquidation amount
  side: 'buy' | 'sell' | 'unknown' // liquidation side
  timestamp: Date // message timestamp provided by exchange; some exchanges do not provide a separate liquidation timestamp, in which case this equals localTimestamp
  localTimestamp: Date // message arrival timestamp
}
```

{% endtab %}
{% endtabs %}

### `normalizeOptionsSummary`

When passed as an arg to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers), it provides normalized `option_summary` data for exchanges that provide options data (e.g., Deribit, OKX Options, Binance Options). Includes greeks, implied volatility, and best bid/ask.

{% tabs %}
{% tab title="sample message" %}

```javascript
{
  type: 'option_summary',
  symbol: 'BTC-28JUN24-70000-C',
  exchange: 'deribit',
  optionType: 'call',
  strikePrice: 70000,
  expirationDate: 2024-06-28T08:00:00.000Z,
  bestBidPrice: 0.035,
  bestBidAmount: 5,
  bestBidIV: 0.55,
  bestAskPrice: 0.04,
  bestAskAmount: 10,
  bestAskIV: 0.58,
  lastPrice: 0.0375,
  openInterest: 150,
  markPrice: 0.0372,
  markIV: 0.565,
  delta: 0.25,
  gamma: 0.00002,
  vega: 45.5,
  theta: -15.2,
  rho: 0.05,
  underlyingPrice: 63500,
  underlyingIndex: 'BTC-USD',
  timestamp: 2024-01-15T10:30:00.123Z,
  localTimestamp: 2024-01-15T10:30:00.234Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'option_summary'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  optionType: 'put' | 'call' // option type
  strikePrice: number // strike price
  expirationDate: Date // option expiration date
  bestBidPrice: number | undefined // best bid price
  bestBidAmount: number | undefined // best bid amount
  bestBidIV: number | undefined // best bid implied volatility
  bestAskPrice: number | undefined // best ask price
  bestAskAmount: number | undefined // best ask amount
  bestAskIV: number | undefined // best ask implied volatility
  lastPrice: number | undefined // last trade price
  openInterest: number | undefined // open interest
  markPrice: number | undefined // mark price
  markIV: number | undefined // mark implied volatility
  delta: number | undefined // delta greek
  gamma: number | undefined // gamma greek
  vega: number | undefined // vega greek
  theta: number | undefined // theta greek
  rho: number | undefined // rho greek
  underlyingPrice: number | undefined // underlying asset price
  underlyingIndex: string // underlying index name
  timestamp: Date // message timestamp provided by exchange
  localTimestamp: Date // message arrival timestamp
}
```

{% endtab %}
{% endtabs %}

### `disconnect` message

When [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) or [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers) have the `withDisconnectMessages` flag set to `true` and a disconnect event occurs (e.g., WebSocket connection close), a `disconnect` message is returned.

{% tabs %}
{% tab title="sample message" %}

```typescript
{
  type: 'disconnect',
  exchange: 'binance',
  localTimestamp: 2024-03-01T00:00:00.001Z
}
```

{% endtab %}

{% tab title="type definition" %}

```typescript
{
  type: 'disconnect'
  exchange: string // exchange id
  localTimestamp: Date // timestamp when disconnect occurred
  symbols: string[] | undefined
}
```

{% endtab %}
{% endtabs %}

### Modifying built-in and adding custom normalizers

In `tardis-dev` data normalization is implemented via `normalize` factory functions provided to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) and [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers) functions. This design gives lots of flexibility by allowing replacing, extending and modifying built-in normalizers or adding new ones for new normalized data types without the need of forking the whole library.

Any normalize function provided to [`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) and [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers) functions needs to have following signature:

```
(exchange: string, localTimestamp: Date) => Mapper
```

`Exchange` is the exchange id for which a mapper object must be returned, `localTimestamp` is the date for which the mapper is created (recreated after each disconnection). In most cases `localTimestamp` is not necessary for anything, but in certain cases like for example exchange API change it can be used to switch to different mapping logic like using new data channel that wasn't available until certain date.

Returned `Mapper` object has following signature:

```typescript
{
  canHandle: (message: any) => boolean

  map(message: any, localTimestamp: Date): IterableIterator<Data> | undefined

  getFilters: (symbols?: string[]) => {channel: string, symbols?: string[]}[]
}
```

On every disconnection event, normalize factory functions are called again to provide new `Mapper` objects with clean state if required (stateful mapping needs to persist metadata such as price level IDs and reset for each new connection). If a mapper object is stateful, it is required to always return a new clean state object from the normalize factory function or reset its state in one way or another.

Normalized data returned by [`iterable iterator`](https://developer.mozilla.org/pl/docs/Web/JavaScript/Guide/Iterators_and_Generators) of `Mapper.map` method is expected to have a shape that has at least fields as described in [normalized data type section](#normalized-data-type) below to play well with other `tardis-dev` functions like [`combine`](#combining-data-streams) or [`compute`](#computing-derived-data-locally).

#### normalized data type

```typescript
{
  type: string
  symbol: string
  exchange: string
  timestamp: Date
  localTimestamp: Date
  name? : string | undefined
}
```

#### Adding custom `normalizeLiquidations` normalizer

Example implementation of a custom `normalizeLiquidations` function that normalizes liquidations data for `deribit` exchange. Implementations for other exchanges are left as an exercise for the reader.

**type of messages provided by `normalizeLiquidations`**

```typescript
{
  type: 'liquidation'
  symbol: string
  exchange: string
  side: 'buy' | 'sell'
  amount: number
  price: number
  timestamp: Date
  localTimestamp: Date
}
```

implementation of `deribitLiquidations` mapper and `normalizeLiquidations`

```javascript
// object that maps from deribit trades real-time channel to our custom
// liquidations messages if determines that trade was caused by liquidation

const deribitLiquidationsMapper = {

  // function that given message in deribit native format
  // https://docs.deribit.com/v2/#trades-instrument_name-interval
  // determines if such message can be mapped/normalized
  canHandle(message) {
    const params = message.params
    const channel = params !== undefined ? params.channel : undefined
    if (channel === undefined) {
      return false
    }
    return (
      channel.startsWith('trades') &&
      params.data.some(trade => trade.liquidation !== undefined)
    )
  },

  // given symbols returns filters that are provided
  // as filters to replay & stream functions options
  // in our case we're interested in trades deribit channel
  // but it could be multiple channels in certain scenarions as well
  getFilters(symbols) {
    return [
      {
        channel: 'trades',
        symbols
      }
    ]
  },

  // map message that was determined that is can be handled via canHandle
  // to normalized liquidation message
  *map(message, localTimestamp) {
    for (const deribitLiquidationTrade of message.params.data) {
      if (deribitLiquidationTrade.liquidation === undefined) {
        continue
      }

      yield {
        type: 'liquidation',
        symbol: deribitLiquidationTrade.instrument_name,
        exchange: 'deribit',
        price: deribitLiquidationTrade.price,
        amount: deribitLiquidationTrade.amount,
        side: deribitLiquidationTrade.direction,
        timestamp: new Date(deribitLiquidationTrade.timestamp),
        localTimestamp: localTimestamp
      }
    }
  }
}

// provides factory function that given exchange returns mapper for it
// if such mapper if implemented
// in our case deribitLiquidationsMapper is stateless so we can return the same
// instance every time

const normalizeLiquidations = (exchange, localTimestamp) => {
  if (exchange === 'deribit') {
    return deribitLiquidationsMapper
  }
  throw new Error(`normalizeLiquidations: ${exchange} not supported`)
}
```

**`normalizeLiquidations` usage example**

We could as well provide the same `normalizeLiquidations` function to [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers) function or use it together it with other normalizers (`normalizeTrades` etc.).

```javascript
import { replayNormalized } from 'tardis-dev'

const liquidations = replayNormalized(
  {
    exchange: 'deribit',
    symbols: ['BTC-PERPETUAL'],
    from: '2024-03-01',
    to: '2024-03-02'
  },
  normalizeLiquidations
)

for await (const liquidation of liquidations) {
  console.log(liquidation)
}
```

#### Changing `normalizeTrades` for Binance exchange

Let's assume that default normalization of Binance exchange trades data doesn't fit our use case and we need to use [`@aggTrade`](https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#aggregate-trade-streams) stream as a source of trade data instead of the default [`@trade`](https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#trade-streams) stream.

```javascript
// add custom binance trades mapper that used @aggTrade stream as source of
// normalized trades
const customBinanceTradesMapper = {
  canHandle(message) {
    return message.stream && message.stream.endsWith('@aggTrade')
  },

  getFilters(symbols) {
    if (symbols !== undefined) {
      symbols = symbols.map(s => s.toLocaleLowerCase())
    }
    // binance api expects all symbols to be lower cased
    return [
      {
        channel: 'aggTrade',
        symbols
      }
    ]
  },

  *map(message, localTimestamp) {
    const binanceAggTrade = message.data

    yield {
      type: 'trade',
      symbol: binanceAggTrade.s,
      exchange: 'binance',
      id: String(binanceAggTrade.a),
      price: Number(binanceAggTrade.p),
      amount: Number(binanceAggTrade.q),
      side: binanceAggTrade.m ? 'sell' : 'buy',
      timestamp: new Date(binanceAggTrade.T),
      localTimestamp: localTimestamp
    }
  }
}

// add new normalize function that only for binance exchange
// uses custom trades mapper and defaults to default normalizeTrades
// for other exchanges

const normalizeTradesWithBinancePatch = (exchange, localTimestamp) => {
  if (exchange === 'binance') {
    return customBinanceTradesMapper
  }
  return normalizeTrades(exchange, localTimestamp)
}
```

**`normalizeTradesWithBinancePatch` usage example**

```javascript
import { streamNormalized } from 'tardis-dev'

const messages = streamNormalized(
  {
    exchange: 'binance',
    symbols: ['btcusdt']
  },
  normalizeTradesWithBinancePatch
)

for await (const message of messages) {
  console.log(message)
}
```

## Limit order book reconstruction

`tardis-dev` exports `OrderBook` class that, when instantiated, can process normalized [`book_change`](#normalizebookchanges) messages with order book snapshots and incremental updates and allows maintaining full local order book (level 2 - aggregated market-by-price) state both for real-time data and for reconstructing historical order book state at any past point in time. It waits for the first `book_change` message that is a snapshot (`isSnapshot = true`) and then applies subsequent updates to it. A single `orderBook` object can maintain order book state only for a single symbol/instrument. It uses a [Red-Black tree](https://github.com/vadimg/js_bintrees) data structure under the hood to efficiently maintain its local state in sorted order.

{% tabs %}
{% tab title="historical order book reconstruction" %}

```javascript
import { replayNormalized, normalizeBookChanges, OrderBook } from 'tardis-dev'

const books = {
  BTCUSDT: new OrderBook(),
  ETHUSDT: new OrderBook()
}

const messages = replayNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT', 'ETHUSDT'],
    from: '2024-03-01',
    to: '2024-03-02'
  },
  normalizeBookChanges
)

for await (const message of messages) {
  const orderBook = books[message.symbol]
  if (message.type === 'book_change') {
    orderBook.update(message)
  }
  const timestamp = message.localTimestamp.toISOString()
  // print best bid/ask for every exchange tick
  console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}
```

{% endtab %}

{% tab title="maintaining order book for real-time stream" %}

```javascript
import { streamNormalized, normalizeBookChanges, OrderBook } from 'tardis-dev'

const books = {
  BTCUSDT: new OrderBook(),
  ETHUSDT: new OrderBook()
}

const messages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT', 'ETHUSDT']
  },
  normalizeBookChanges
)

for await (const message of messages) {
  const orderBook = books[message.symbol]
  if (message.type === 'book_change') {
    orderBook.update(message)
  }
  const timestamp = message.localTimestamp.toISOString()
  // print best bid/ask for every exchange tick
  console.log(timestamp, orderBook.bestAsk(), orderBook.bestBid())
}
```

{% endtab %}
{% endtabs %}

#### OrderBook constructor options

`new OrderBook()` accepts an optional options object:

| name                        | type                | default   | description                                                                                                                                                     |
| --------------------------- | ------------------- | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **`removeCrossedLevels`**   | boolean (optional)  | undefined | when set to `true`, automatically detects and removes crossed levels (where best bid >= best ask) that can occur when exchanges fail to publish delete messages |
| **`onCrossedLevelRemoved`** | function (optional) | undefined | optional callback `(bookChange, bestBidBefore, bestBidAfter, bestAskBefore, bestAskAfter) => void` invoked whenever a crossed level is removed                  |

### `orderBook.update(bookChange)`

Processes normalized [`book_change`](#normalizebookchanges) messages to update its internal local state that maintains ordered bids and asks sets. It ignores any non-snapshot `book_change` messages before an initial snapshot is received. It should be called for every `book_change` message received for the symbol for which you'd like to reconstruct the order book.

```javascript
orderBook.update({
  type: 'book_change',
  symbol: 'BTCUSDT',
  exchange: 'binance-futures',
  isSnapshot: false,
  bids: [],
  asks: [{ price: 7985, amount: 283318 }],
  timestamp: new Date(),
  localTimestamp: new Date()
})
```

### `orderBook.bestBid()`

Returns [book price level object](#book-price-level-type) for highest bid order (best bid) in order book or `undefined` if book doesn't have any bids (not initialized yet with initial snapshot).

```javascript
const { price, amount } = orderBook.bestBid()
```

### `orderBook.bestAsk()`

Returns [book price level object](#book-price-level-type) for lowest ask order (best ask) in order book or `undefined` if book doesn't have any asks (not initialized yet with initial snapshot).

```javascript
const { price, amount } = orderBook.bestAsk()
```

### `orderBook.asks()`

Returns [iterable iterator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Iterators_and_Generators) of [book price level](#book-price-level-type) objects for all **asks** available ordered from the lowest to highest ask.

```typescript
for (const ask of orderBook.asks()) {
    // process asks levels one by one from lowest to highest without
    // creating in memory array for all levels
}

const orderedAsksArray = Array.from(orderBook.asks())
```

### `orderBook.bids()`

Returns [iterable iterator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Iterators_and_Generators) of [book price level](#book-price-level-type) objects for all **bids** available ordered from highest to lowest bid.

```typescript
for (const bid of orderBook.bids()) {
    // process bid levels one by one from highest to lowest without
    // creating in memory array for all levels
}

const orderedBidsArray = Array.from(orderBook.bids())
```

#### book price level type

```typescript
{
  price: number
  amount: number
}
```

## Utility functions

### `filter(asyncIterable, fn)`

Filters an async iterable, returning only messages that match the predicate function.

```javascript
import { replayNormalized, normalizeTrades, filter } from 'tardis-dev'

const messages = replayNormalized(
  { exchange: 'binance', from: '2024-03-01', to: '2024-03-02', symbols: ['btcusdt'] },
  normalizeTrades
)
const largeTrades = filter(messages, (trade) => trade.amount >= 10000)

for await (const trade of largeTrades) {
  console.log(trade)
}
```

### `uniqueTradesOnly(options?)`

Returns a predicate function for use with [`filter()`](#filter-asynciterable-fn) that removes duplicate trades (same trade ID) and optionally skips stale trades.

**Options**

| Name                        | Type                     | Default | Description                                                                                 |
| --------------------------- | ------------------------ | ------- | ------------------------------------------------------------------------------------------- |
| `maxWindow`                 | `number`                 | `500`   | Maximum number of trade IDs to track per symbol for deduplication                           |
| `skipStaleOlderThanSeconds` | `number`                 | —       | If set, skips trades where `localTimestamp - timestamp` exceeds this threshold (in seconds) |
| `onDuplicateFound`          | `(trade: Trade) => void` | —       | Optional callback invoked when a duplicate or stale trade is detected                       |

```javascript
import { replayNormalized, normalizeTrades, filter, uniqueTradesOnly } from 'tardis-dev'

const messages = replayNormalized(
  { exchange: 'binance', from: '2024-03-01', to: '2024-03-02', symbols: ['btcusdt'] },
  normalizeTrades
)
const uniqueTrades = filter(messages, uniqueTradesOnly())

for await (const trade of uniqueTrades) {
  console.log(trade)
}
```

**With options**

```javascript
const uniqueTrades = filter(messages, uniqueTradesOnly({
  maxWindow: 1000,
  skipStaleOlderThanSeconds: 30,
  onDuplicateFound: (trade) => console.warn('Duplicate trade:', trade.id)
}))
```

## Combining data streams

### `combine(...iterators)`

[`Combine`](#combining-data-streams) function given multiple `async iterators` combines them into single one. That allows synchronized historical market data replay and consolidated streaming of real-time data for multiple exchanges via single [`for await ...of`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) loop.

Accepts `async iterables` of [normalized messages](https://docs.tardis.dev/node-client/normalization) as [rest parameters](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/rest_parameters) and combines them returning single `async iteratable`.

For historical data replay it combines input `async iterables` messages by sorting them by `localTimestamp` in ascending order, this allows synchronized/ordered market data replay for multiple exchanges.

For real-time market data streaming it combines input `async iterables` messages in FIFO order by writing them into a shared Node.js object-mode stream (`PassThrough`) as they arrive.

{% tabs %}
{% tab title="combining historical market data from multiple exchanges" %}

```javascript
import { replayNormalized, normalizeTrades, combine } from 'tardis-dev'

const binanceMessages = replayNormalized(
  {
    exchange: 'binance',
    symbols: ['btcusdt'],
    from: '2024-03-01',
    to: '2024-03-02'
  },
  normalizeTrades
)

const binanceFuturesMessages = replayNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT'],
    from: '2024-03-01',
    to: '2024-03-02'
  },
  normalizeTrades
)

const combinedStream = combine(binanceMessages, binanceFuturesMessages)

// order at which messages have historically arrived is preserved
for await (const message of combinedStream) {
  if (message.exchange === 'binance-futures') {
    // process binance futures trades
    console.log(message)
  }

  if (message.exchange === 'binance') {
    // process binance spot trades
    console.log(message)
  }
}
```

{% endtab %}

{% tab title="combining real-time stream of market data from multiple exchanges" %}

```javascript
import { streamNormalized, normalizeTrades, combine } from 'tardis-dev'

const binanceMessages = streamNormalized(
  {
    exchange: 'binance',
    symbols: ['btcusdt']
  },
  normalizeTrades
)

const binanceFuturesMessages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT']
  },
  normalizeTrades
)

const combinedStream = combine(binanceMessages, binanceFuturesMessages)

// messages are provided in FIFO order as they arrive
for await (const message of combinedStream) {
  if (message.exchange === 'binance-futures') {
    // process binance futures trades
    console.log(message)
  }

  if (message.exchange === 'binance') {
    // process binance spot trades
    console.log(message)
  }
}
```

{% endtab %}
{% endtabs %}

## Computing derived data locally

### `compute(iterator, ...computables)`

[`Compute`](#computing-derived-data-locally) function allows computing various derived data locally via so called [`computables`](#adding-custom-computable) like:

* [`computeTradeBars`](#computetradebars-options) - computes various trade bars (OHLC, volume based bars, tick based bars) based on [normalized trade](#normalizetrades) data
* [`computeBookSnapshots`](#computebooksnapshots-options) - computes various order book snapshots based on normalized [order book data](#normalizebookchanges)

If you're interested in adding custom [`computables`](#adding-custom-computable) like for example order book imbalance, volume imbalance, open interest or funding rate based bars please read ["adding custom computable"](#adding-custom-computable) section.

[`Compute`](#computing-derived-data-locally) function accepts an `async iterable` producing [normalized messages](https://docs.tardis.dev/node-client/normalization) together with [`computables`](#adding-custom-computable) as [rest parameters](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/rest_parameters), and returns an `async iterable` with normalized messages produced by the provided iterable plus all computed messages based on provided [`computable`](#adding-custom-computable) functions. It computes and produces separate normalized computed messages for each symbol and exchange combination. When a [`disconnect message`](#disconnect-message) is returned by the provided `async iterable`, it discards existing pending computables and starts computing them from scratch.

```javascript
import { streamNormalized, normalizeTrades, normalizeBookChanges, compute, computeTradeBars, computeBookSnapshots } from 'tardis-dev'

const binanceFuturesMessages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT']
  },
  normalizeTrades,
  normalizeBookChanges
)

const messagesWithComputedTypes = compute(
  binanceFuturesMessages,
  // 10 seconds time bars
  computeTradeBars({ kind: 'time', interval: 10 * 1000 }),
  // top 20 levels 50 millisecond order book snapshots
  computeBookSnapshots({ depth: 20, interval: 50 }),
  // volume based trade bar - 1 million vol buckets
  computeTradeBars({ kind: 'volume', interval: 1000 * 1000 })
)

for await (const message of messagesWithComputedTypes) {
  if (message.type === 'book_snapshot' || message.type === 'trade_bar') {
    console.log(message)
  }
}
```

### `computeTradeBars(options)`

When provided to [`compute`](#computing-derived-data-locally) function, it computes normalized [`trade_bar`](#type-of-message-provided-by-computetradebars) messages based on [normalized trade](#normalizetrades) data.

```javascript
import { computeTradeBars } from 'tardis-dev'

computeTradeBars({ kind: 'volume', interval: 1000 })
```

#### compute trade bars options

| name         | type                              | default   | description                                                                                                                                                                                                                                                 |
| ------------ | --------------------------------- | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **kind**     | `'time'`, `'volume'`, or `'tick'` | -         | determines the way trades within a bar will be aggregated. `time` creates classic OHLC candles aggregated by time. `volume` creates volume-based trade bars aggregated by the sum of trades `amount`. `tick` creates trade bars aggregated by trades count. |
| **interval** | number                            | -         | determines interval to aggregate by - for time based bars it's number of milliseconds, for volume based bars it's accumulated volume, for tick it's count of trades                                                                                         |
| **name**     | string (optional)                 | undefined | optional custom name of `trade_bar`, if not specified computed name will be provided based on kind and interval options                                                                                                                                     |

#### type of message provided by `computeTradeBars`

```typescript
{
  type: 'trade_bar'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  name: string // name with format trade_bar_{interval}
  interval: number // requested trade bar interval
  kind: 'time' | 'volume' | 'tick' // trade bar kind
  open: number // open price
  high: number // high price
  low: number // low price
  close: number // close price
  volume: number // total volume traded in given interval
  buyVolume: number // buy volume traded in given interval
  sellVolume: number // sell volume traded in given interval
  trades: number // trades count in given interval
  vwap: number // volume weighted average price
  openTimestamp: Date // timestamp of first trade for given bar
  closeTimestamp: Date // timestamp of last trade for given bar
  timestamp: Date // end of interval period timestamp
  localTimestamp: Date // message arrival timestamp
                       // that triggered given bar computation
}
```

#### sample normalized `trade_bar` message

```javascript
{
  type: 'trade_bar',
  symbol: 'BTCUSDT',
  exchange: 'binance-futures',
  name: 'trade_bar_10000ms',
  interval: 10000,
  kind: 'time',
  open: 61130.98,
  high: 61142.3,
  low: 61129.75,
  close: 61138.12,
  volume: 12.845,
  buyVolume: 7.421,
  sellVolume: 5.424,
  trades: 18,
  vwap: 61135.84,
  openTimestamp: 2024-03-01T00:00:00.112Z,
  closeTimestamp: 2024-03-01T00:00:09.881Z,
  localTimestamp: 2024-03-01T00:00:10.004Z,
  timestamp: 2024-03-01T00:00:10.000Z,
}
```

### `computeBookSnapshots(options)`

When provided to [`compute`](#computing-derived-data-locally) function, computes normalized [`book_snapshot`](#type-of-message-provided-by-computebooksnaphots) messages based on normalized [order book data](#normalizebookchanges). It produces new snapshots only if there is an actual change in order book state for requested `depth`.

```javascript
import { computeBookSnapshots } from 'tardis-dev'

computeBookSnapshots({ depth: 20, interval: 50 })
```

#### compute book snapshots options

| name                      | type                | default   | description                                                                                                                                                                                                                             |
| ------------------------- | ------------------- | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **depth**                 | number              | -         | number of closest bids and asks levels to provide snaphot for                                                                                                                                                                           |
| **interval**              | number              | -         | snapshot interval in milliseconds, if `0` is provided it computes snapshots real-time any time there is a change in order book state for requested `depth`                                                                              |
| **name**                  | string (optional)   | undefined | optional custom name of `book_snapshot`, if not specified computed name will be provided based on depth and interval options                                                                                                            |
| **grouping**              | number (optional)   | undefined | when provided, aggregates order book price levels into groups of the specified price increment (e.g., `10` groups all levels within each $10 range into a single level). Bids are floored and asks are ceiled to the grouping boundary. |
| **removeCrossedLevels**   | boolean (optional)  | undefined | when set to `true`, automatically detects and removes crossed levels (where best bid >= best ask) that can occur when exchanges fail to publish delete messages                                                                         |
| **onCrossedLevelRemoved** | function (optional) | undefined | optional callback `(bookChange, bestBidBefore, bestBidAfter, bestAskBefore, bestAskAfter) => void` invoked whenever a crossed level is removed                                                                                          |

#### type of message provided by `computeBookSnaphots`

```typescript
{
  type: 'book_snapshot'
  symbol: string // instrument symbol as provided by exchange
  exchange: string // exchange id
  name: string // name with format book_snapshot_{depth}_{interval}{time_unit}
  depth: number // requested number of levels (top bids/asks)
  interval: number // requested snapshot interval in milliseconds
  bids: { price: number | undefined; amount: number | undefined }[] // top "depth" bids price-amount levels
  asks: { price: number | undefined; amount: number | undefined }[] // top "depth" asks price-amount levels
  grouping: number | undefined // price grouping increment, if grouping was requested
  timestamp: Date // snapshot timestamp based on last book_change message
                  // processed timestamp adjusted to snapshot interval
  localTimestamp: Date // message arrival timestamp
                       // that triggered snapshot
}
```

#### sample normalized `book_snapshot` message

```javascript
{
  type: 'book_snapshot',
  symbol: 'BTCUSDT',
  exchange: 'binance-futures',
  name: 'book_snapshot_2_50ms',
  depth: 2,
  interval: 50,
  bids: [
    { price: 61141.09, amount: 2.02515 },
    { price: 61141.08, amount: 0.55321 }
  ],
  asks: [
    { price: 61141.1, amount: 4.09368 },
    { price: 61141.11, amount: 1.20457 }
  ],
  timestamp: 2024-03-01T00:00:01.450Z,
  localTimestamp: 2024-03-01T00:00:01.491Z
}
```

### Adding custom `computable`

Any `computables` provided to [`compute function`](#computing-derived-data-locally) need to be factory functions with following signature:

```
() => Computable
```

where returned `Computable` object has following signature:

```typescript
{
  sourceDataTypes: string[]
  compute(message: NormalizedData): IterableIterator<NormalizedData>
}
```

`Computable.compute` returned iterator is expected to provide objects that at least have fields as described in [normalized data type section](#normalized-data-type) to play well with other `tardis-dev` functions like [`combine`](#combining-data-streams).

#### `computeOrderBookImbalanceRatio()`

Example implementation of custom `computeOrderBookImbalanceRatio` function that as a source data type uses book snapshots and based on it computes ratio of asks amounts (sell orders) to bids amounts (buy orders) for given [`book_snapshot`](#computebooksnapshots-options) depth. It may be used to determine relative buy or sell pressure.

**type of messages produced by** `computeOrderBookImbalanceRatio`

```typescript
{
  type: 'book_imbalance'
  symbol: string
  exchange: string
  asksToBidsRatio: number
  timestamp: Date
  localTimestamp: Date
}
```

#### implementation of `BookImbalanceRatioComputable` `computable` and `computeOrderBookImbalanceRatio` factory function.

```javascript
class BookImbalanceRatioComputable {
  constructor() {
    this.sourceDataTypes = ['book_snapshot']
  }

  *compute(bookSnapshot) {
    let bidsAmount = 0
    let asksAmount = 0

    for (let i = 0; i < bookSnapshot.depth; i++) {
      bidsAmount += bookSnapshot.bids[i].amount
      asksAmount += bookSnapshot.asks[i].amount
    }

    const asksToBidsRatio = asksAmount / bidsAmount
    yield {
      type: 'book_imbalance',
      symbol: bookSnapshot.symbol,
      exchange: bookSnapshot.exchange,
      asksToBidsRatio,
      timestamp: bookSnapshot.timestamp,
      localTimestamp: bookSnapshot.localTimestamp
    }
  }
}

const computeOrderBookImbalanceRatio = () => new BookImbalanceRatioComputable()
```

#### `computeOrderBookImbalanceRatio` **usage example**

Given implementation above we can compute book imbalance ratio for Binance Futures real-time `BTCUSDT` message stream. For this example we compute top 5 levels, 2 second book snapshots as a source to our custom computable. We need to have `async iterable` that produces book snapshots as a source to our book imbalance computable, hence two invocations of `compute.`

```javascript
import { streamNormalized, normalizeBookChanges, compute, computeBookSnapshots } from 'tardis-dev'

const binanceFuturesMessages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT']
  },
  normalizeBookChanges
)

const messagesWithBookSnapshots = compute(
  binanceFuturesMessages,
  computeBookSnapshots({ depth: 5, interval: 2 * 1000 })
)

const messagesWithComputedData = compute(
  messagesWithBookSnapshots,
  computeOrderBookImbalanceRatio
)

for await (const message of messagesWithComputedData) {
  if (message.type === 'book_imbalance') {
    console.log(message)
  }
}
```

## Examples

### Real-time spread across multiple exchanges

Example showing how to display real-time spread and best bid/ask info across multiple exchanges at once. It can be easily adapted to do the same for historical data ([`replayNormalized`](https://docs.tardis.dev/replaying-historical-data#replaynormalized-options-...normalizers) instead of [`streamNormalized`](https://docs.tardis.dev/streaming-real-time-data#streamnormalized-options-...normalizers)).

```javascript
import { streamNormalized, normalizeBookChanges, combine, compute, computeBookSnapshots } from 'tardis-dev'

const exchangesToStream = [
  { exchange: 'binance', symbols: ['btcusdt'] },
  { exchange: 'binance-futures', symbols: ['BTCUSDT'] },
  { exchange: 'bybit', symbols: ['BTCUSDT'] }
]

// for each specified exchange call streamNormalized for it
// so we have multiple real-time streams for all specified exchanges
const realTimeStreams = exchangesToStream.map(e => {
  return streamNormalized(e, normalizeBookChanges)
})

// combine all real-time message streams into one
const messages = combine(...realTimeStreams)

// create book snapshots with depth1 that are produced
// every time best bid/ask info is changed
// effectively computing real-time quotes
const realTimeQuoteComputable = computeBookSnapshots({
  depth: 1,
  interval: 0,
  name: 'realtime_quote'
})

// compute real-time quotes for combines real-time messages
const messagesWithQuotes = compute(messages, realTimeQuoteComputable)

const spreads = {}

// print spreads info every 100ms
setInterval(() => {
  console.clear()
  console.log(spreads)
}, 100)

// update spreads info real-time
for await (const message of messagesWithQuotes) {
  if (message.type === 'book_snapshot') {
    spreads[message.exchange] = {
      spread: message.asks[0].price - message.bids[0].price,
      bestBid: message.bids[0],
      bestAsk: message.asks[0]
    }
  }
}
```

### Replay large historical trades across multiple exchanges

Example showing replaying large historical trades across multiple exchanges as those happened.

```javascript
import { replayNormalized, normalizeTrades, combine } from 'tardis-dev'

const from = '2024-03-01'
const to = '2024-03-02'
const exchanges = [
  { exchange: 'binance', symbols: ['btcusdt'] },
  { exchange: 'binance-futures', symbols: ['BTCUSDT'] },
  { exchange: 'bybit', symbols: ['BTCUSDT'] }
]

const historicalTrades = exchanges.map(exchange => {
  return replayNormalized({ from, to, ...exchange }, normalizeTrades)
})

const LARGE_TRADE_THRESHOLD = 100 * 1000 // 100k  contracts

for await (const trade of combine(...historicalTrades)) {
  if (trade.amount >= LARGE_TRADE_THRESHOLD) {
    console.log(trade.exchange, trade)
  }
}
```

### Seamless switching between real-time streaming and historical market data replay

Example showing a simple pattern of providing an `async iterable` of market data messages to a function that can process them whether it is real-time or historical market data. That effectively enables having the same data pipeline for backtesting and live trading.

```javascript
import { replayNormalized, streamNormalized, normalizeTrades, compute, computeTradeBars } from 'tardis-dev'

const historicalMessages = replayNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT'],
    from: '2024-03-01',
    to: '2024-03-02'
  },
  normalizeTrades
)

const realTimeMessages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT']
  },
  normalizeTrades
)

async function produceVolumeBasedTradeBars(messages) {
  const withVolumeTradeBars = compute(
    messages,
    computeTradeBars({
      kind: 'volume',
      interval: 100 * 1000 // aggregate by 100k contracts volume
    })
  )

  for await (const message of withVolumeTradeBars) {
    if (message.type === 'trade_bar') {
      console.log(message.name, message)
    }
  }
}

produceVolumeBasedTradeBars(historicalMessages)

// or for real time data
//  await produceVolumeBasedTradeBars(realTimeMessages)
```

### Real-time funding rate and open interest across multiple exchanges

Example showing how to quickly display real-time funding rate and open interest info across multiple exchanges at once.

```javascript
import { streamNormalized, normalizeDerivativeTickers, combine } from 'tardis-dev'

const exchangesToStream = [
  { exchange: 'binance-futures', symbols: ['BTCUSDT'] },
  { exchange: 'bybit', symbols: ['BTCUSDT'] },
  { exchange: 'okex-swap', symbols: ['BTC-USD-SWAP'] },
  { exchange: 'deribit', symbols: ['BTC-PERPETUAL'] },
  { exchange: 'bitfinex-derivatives', symbols: ['BTCF0:USTF0'] }
]

const realTimeStreams = exchangesToStream.map(e => {
  return streamNormalized(e, normalizeDerivativeTickers)
})

// combine all real-time message streams into one
const messages = combine(...realTimeStreams)

const funding = {}

// print funding info every 100ms
setInterval(() => {
  console.clear()
  console.log(new Date().toISOString(), funding)
}, 100)

// update funding info real-time
for await (const message of messages) {
  if (message.type === 'derivative_ticker') {
    funding[message.exchange] = {
      symbol: message.symbol,
      fundingRate: message.fundingRate,
      lastPrice: message.lastPrice,
      openInterest: message.openInterest,
      markPrice: message.markPrice
    }
  }
}
```

### Saving historical funding, index and open interest data to CSV file

Example showing how to write Deribit exchange historical funding, index and open interest data into CSV.

```javascript
import { replayNormalized, normalizeDerivativeTickers } from 'tardis-dev'
import fs from 'fs'
import csv from 'fast-csv'
import { once } from 'events'
const fileStream = fs.createWriteStream('./deribit_funding.csv')
const csvStream = csv.format({ headers: true })
csvStream.pipe(fileStream)

const messages = replayNormalized(
  {
    exchange: 'deribit',
    from: '2024-03-01',
    to: '2024-03-02',
    symbols: ['BTC-PERPETUAL']
  },
  normalizeDerivativeTickers
)

for await (const message of messages) {
  if (message.type === 'derivative_ticker') {
    const ok = csvStream.write({
      fundingRate: message.fundingRate,
      lastPrice: message.lastPrice,
      openInterest: message.openInterest,
      indexPrice: message.indexPrice,
      timestamp: message.timestamp.toISOString()
    })

    if (!ok) {
      await once(csvStream, 'drain')
    }
  }
}
```

### Computing simple moving average of volume based trade bars

Example showing implementation of `SimpleMovingAverageComputable` that calculates average of trade bar closes prices for specified rolling window in incremental way. It uses `CircularBuffer` under the hood.

```javascript
class CircularBuffer {
  constructor(_bufferSize) {
    this._bufferSize = _bufferSize
    this._buffer = []
    this._index = 0
  }
  append(value) {
    const isFull = this._buffer.length === this._bufferSize
    let poppedValue
    if (isFull) {
      poppedValue = this._buffer[this._index]
    }
    this._buffer[this._index] = value
    this._index = (this._index + 1) % this._bufferSize
    return poppedValue
  }
  *items() {
    for (let i = 0; i < this._buffer.length; i++) {
      const index = (this._index + i) % this._buffer.length
      yield this._buffer[index]
    }
  }
  get count() {
    return this._buffer.length
  }
}

class SimpleMovingAverageComputable {
  constructor({ periods }) {
    this.sourceDataTypes = ['trade_bar']
    this._average = 0
    this._circularBuffer = new CircularBuffer(periods)
  }

  *compute(tradeBar) {
    const result = this._circularBuffer.append(tradeBar.close)
    const poppedVal = result !== undefined ? result : this._average
    const increment = (tradeBar.close - poppedVal) / this._circularBuffer.count
    this._average = this._average + increment

    yield {
      type: 'sma',
      symbol: tradeBar.symbol,
      exchange: tradeBar.exchange,
      name: `sma_${tradeBar.name}`,
      average: this._average,
      interval: tradeBar.interval,
      kind: tradeBar.kind,
      timestamp: tradeBar.timestamp,
      localTimestamp: tradeBar.localTimestamp
    }
  }
}

const computeSimpleMovingAverages = options => () =>
  new SimpleMovingAverageComputable(options)
```

#### Usage

```javascript
import { streamNormalized, normalizeTrades, compute, computeTradeBars } from 'tardis-dev'

const messages = streamNormalized(
  {
    exchange: 'binance-futures',
    symbols: ['BTCUSDT']
  },
  normalizeTrades
)

const withTradeBars = compute(
  messages,
  computeTradeBars({
    kind: 'volume',
    interval: 1000
  })
)
const withSimpleMovingAverage = compute(
  withTradeBars,
  computeSimpleMovingAverages({ periods: 5 })
)

for await (const message of withSimpleMovingAverage) {
  if (message.type === 'sma') {
    console.log(message)
  }
}
```

## Environment variables (advanced)

The following environment variables configure streaming and network behavior. Network variables (`HTTP_PROXY`, `SOCKS_PROXY`) apply to both real-time streaming and historical data downloads. Exchange credentials and WebSocket URL overrides apply only to real-time streaming.

#### Network

| Name                     | Description                                                                                                                                              |
| ------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **`HTTP_PROXY`**         | HTTP/HTTPS proxy URL for all outgoing requests                                                                                                           |
| **`SOCKS_PROXY`**        | SOCKS proxy URL (used if `HTTP_PROXY` is not set)                                                                                                        |
| **`WSS_URL_<EXCHANGE>`** | Override the default WebSocket URL for a specific exchange. Exchange name is uppercased with dashes replaced by underscores (e.g., `WSS_URL_BINANCE_US`) |

#### Exchange credentials

Required for exchanges that need authentication to access real-time WebSocket feeds.

| Name                                        | Description                                                                                |
| ------------------------------------------- | ------------------------------------------------------------------------------------------ |
| **`OKX_API_KEY`**                           | OKX API key — required for restricted real-time channels such as `books-l2-tbt` (VIP-only) |
| **`OKX_API_SECRET_KEY`**                    | OKX API secret key                                                                         |
| **`OKX_API_PASSPHRASE`**                    | OKX API passphrase                                                                         |
| **`OKX_API_VIP_5`**                         | Set to `true` if OKX account has VIP 5+ tier — enables access to `books-l2-tbt` channel    |
| **`OKX_API_COLO`**                          | Set to `true` if using OKX colocation — enables access to `books-l2-tbt` channel           |
| **`DERIBIT_API_CLIENT_ID`**                 | Deribit API client ID — required for authenticated real-time channels                      |
| **`DERIBIT_API_CLIENT_SECRET`**             | Deribit API client secret                                                                  |
| **`COINBASE_API_KEY`**                      | Coinbase API key — required for authenticated WebSocket feeds                              |
| **`COINBASE_API_SECRET`**                   | Coinbase API secret                                                                        |
| **`COINBASE_API_PASSPHRASE`**               | Coinbase API passphrase                                                                    |
| **`COINBASE_INTERNATIONAL_API_KEY`**        | Coinbase International API key                                                             |
| **`COINBASE_INTERNATIONAL_API_SECRET`**     | Coinbase International API secret                                                          |
| **`COINBASE_INTERNATIONAL_API_PASSPHRASE`** | Coinbase International API passphrase                                                      |

#### Binance rate limit tuning

Control rate limiting for Binance depth snapshot requests and `openInterest` REST polling during real-time streaming. Variable names use the exchange name uppercased with dashes replaced by underscores (e.g., `BINANCE_FUTURES_REQUEST_WEIGHT_LIMIT`).

| Name                                               | Default  | Applies to     | Description                                                           |
| -------------------------------------------------- | -------- | -------------- | --------------------------------------------------------------------- |
| **`<EXCHANGE>_REQUEST_WEIGHT_LIMIT`**              | from API | depth + OI     | Override the request weight limit per minute                          |
| **`<EXCHANGE>_MIN_AVAILABLE_WEIGHT_BUFFER`**       | auto     | depth + OI     | Minimum request weight buffer before throttling kicks in              |
| **`<EXCHANGE>_CONCURRENCY_LIMIT`**                 | `4`      | depth          | Number of concurrent depth snapshot requests                          |
| **`<EXCHANGE>_SNAPSHOTS_DELAY_MS`**                | none     | depth          | Delay in ms between individual depth snapshot requests                |
| **`<EXCHANGE>_OPEN_INTEREST_POLLING_INTERVAL_MS`** | `5000`   | `openInterest` | Minimum polling interval in ms for generated `openInterest` snapshots |
