A websocket streaming server for NuBot
General description
NuBot can connect to a streaming server (if available) over a websocket, then listen and react to commands received from the server. In this scenario there are two kinds of actors: the server (Streamer) and the clients (Subscribers).
Streamer details
- The Streamer is a stand-alone runnable jar that listens on a predetermined port
- The Streamer reads its options from an options file
- The Streamer's main purpose is to monitor the exchange rate of a list of currencies vs USD
- The list of price feeds is defined in currencies.csv as in NuBot
- For each currency, at least 3 price feeds are required: 1 mainFeed and 2 or more backup feeds.
- For each currency, a separate socket is opened
- For each currency, the Streamer keeps two prices in memory: the lastPriceFetched from feeds, and the lastPriceSent to Subscribers
- For each currency, the Streamer has a background task that fetches prices from the external price feeds at the highest possible frequency that won't exceed the remote API limits.
- For each currency, every time a new price is read from the remote feeds (different from the previous lastPriceFetched stored in memory), if that price deviates by more than a defined threshold from lastPriceSent, the Streamer should stream out a shiftOrders command, after performing sanityChecks
- Each time the shiftOrders command is sent, the Streamer should also compute and stream suggested prices based on a predefined spread
- The Streamer responds to ping? with pong! on a determined socket
- On the first execution of the Streamer, the first price fetched should be sent out to Subscribers for all tracked currencies.
- The Streamer should be fail safe: in case the sanityCheck fails and the priceFeeds are not responsive, it should transmit a standard message to Subscribers with an accurate description of the problem.
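The threshold rule in the list above could be sketched as follows. This is a minimal Java illustration, not the Streamer's actual code: the class ShiftDecision, its method names, and the choice to measure the deviation relative to lastPriceSent are all assumptions made for the example.

```java
import java.math.BigDecimal;
import java.math.MathContext;

// Hypothetical helper; names are illustrative and not taken from the Streamer code base.
final class ShiftDecision {

    /** Absolute deviation of the fetched price from the sent price, as a percentage of the sent price. */
    static BigDecimal deviationPercent(BigDecimal lastPriceSent, BigDecimal lastPriceFetched) {
        return lastPriceFetched.subtract(lastPriceSent)
                .abs()
                .multiply(BigDecimal.valueOf(100))
                .divide(lastPriceSent, MathContext.DECIMAL64);
    }

    /** True when a shiftOrders command should be considered (sanity checks still apply afterwards). */
    static boolean shouldShift(BigDecimal lastPriceSent, BigDecimal lastPriceFetched,
                               BigDecimal thresholdPercent) {
        // On first execution nothing has been sent yet, so the first price always goes out.
        if (lastPriceSent == null) {
            return true;
        }
        return deviationPercent(lastPriceSent, lastPriceFetched).compareTo(thresholdPercent) > 0;
    }

    public static void main(String[] args) {
        BigDecimal sent = new BigDecimal("237.50");
        BigDecimal threshold = new BigDecimal("0.5"); // assumed threshold, in percent
        System.out.println(shouldShift(sent, new BigDecimal("238.00"), threshold)); // false (about 0.21%)
        System.out.println(shouldShift(sent, new BigDecimal("240.00"), threshold)); // true (about 1.05%)
    }
}
```

The sketch assumes lastPriceSent is never zero; a real implementation would guard against that before dividing.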
Subscriber details
- The Subscriber is integrated within NuBot's logic
- NuBot exposes a configuration parameter that points at the remote price server.
- NuBot exposes a configuration parameter (boolean flag, default false) that allows bypassing the Streamer. When true, NuBot behaves as in v<0.3.0, using PriceMonitorTriggerTask. When false, PriceMonitorTriggerTask is not used at all.
- The first time the Subscriber connects to the Streamer, there is a handshake procedure, specified in the next subsection.
- The Subscriber connects to the Streamer websocket(s), then listens, parses and reacts to commands.
- The Subscriber constantly checks the availability of the Streamer by sending pings.
- The Subscriber independently verifies the prices before shifting walls. In case the deviation is too high (above Settings.MAX_DEVIANCE_REMOTEFEED %), it switches to the local price tracking mechanism.
- If the price received from the Streamer passes the client's sanityCheck, then StrategyTask.notifyPriceChanged is triggered
- The Subscriber is fail safe: in case the Streamer sends a command with errors, they must be handled properly.
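The Subscriber-side verification described above might look like the following Java sketch. The document does not say how the Subscriber verifies a price, so comparing it against a locally fetched price is an assumption, as are the class RemotePriceCheck, the Mode enum, and the method name. The parameter maxDeviancePercent stands in for Settings.MAX_DEVIANCE_REMOTEFEED.

```java
// Hypothetical sketch; class and method names are illustrative, not NuBot's actual API.
final class RemotePriceCheck {

    enum Mode { REMOTE_STREAM, LOCAL_TRACKING }

    /**
     * Compares the streamed price with a locally fetched one (assumed verification method).
     * maxDeviancePercent stands in for Settings.MAX_DEVIANCE_REMOTEFEED.
     */
    static Mode chooseMode(double remotePrice, double localPrice, double maxDeviancePercent) {
        // A non-positive price can never pass the check, so fall back to local tracking.
        if (localPrice <= 0 || remotePrice <= 0) {
            return Mode.LOCAL_TRACKING;
        }
        double deviationPercent = Math.abs(remotePrice - localPrice) / localPrice * 100.0;
        return deviationPercent > maxDeviancePercent ? Mode.LOCAL_TRACKING : Mode.REMOTE_STREAM;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(237.99, 237.52, 1.0)); // REMOTE_STREAM (about 0.2% apart)
        System.out.println(chooseMode(250.00, 237.52, 1.0)); // LOCAL_TRACKING (about 5.3% apart)
    }
}
```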
Subscriber/Streamer connection handshake
The Streamer has an open socket on a port [mainport], set via the streamer.properties file, that accepts incoming connections from clients.
The Streamer has a socket for keepalive ping?/pong! messages open on [pingport] = [mainport] - 1. The Streamer will respond with a pong! to ping? requests.
The Streamer has a pair of sockets open for each currency being tracked: [currencyPort] is used to stream price updates, while [initialCurrencyPort] is used for first-time connections or disconnections. [initialCurrencyPort] = [currencyPort] + 100.
Sample ports map:
pingport:5555
mainport:5556
BTCfeed:7561, BTCFirstTimePort:7661, BTCToken:BTC_df106d
EURfeed:8563, EURFirstTimePort:8663, EURToken:EUR_ee19f1
PPCfeed:9562, PPCFirstTimePort:9662, PPCToken:PPC_70e82f
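The two port conventions stated above ([pingport] = [mainport] - 1 and [initialCurrencyPort] = [currencyPort] + 100) can be checked against the sample map with a few lines of Java. The class StreamerPorts and its method names are hypothetical and exist only for this illustration.

```java
// Hypothetical helper that encodes the two port conventions from the text.
final class StreamerPorts {

    // Offset between a currency's streaming port and its first-time connection port.
    static final int INITIAL_PORT_OFFSET = 100;

    /** [pingport] = [mainport] - 1 */
    static int pingPort(int mainPort) {
        return mainPort - 1;
    }

    /** [initialCurrencyPort] = [currencyPort] + 100 */
    static int initialCurrencyPort(int currencyPort) {
        return currencyPort + INITIAL_PORT_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(pingPort(5556));            // 5555, matches pingport in the sample map
        System.out.println(initialCurrencyPort(7561)); // 7661, matches BTCFirstTimePort
        System.out.println(initialCurrencyPort(8563)); // 8663, matches EURFirstTimePort
        System.out.println(initialCurrencyPort(9562)); // 9662, matches PPCFirstTimePort
    }
}
```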
Handshake:
0. Subscriber checks connection
The Subscriber checks connectivity with the Streamer by sending a message on [pingport]:
ping?
If alive, the Streamer responds with:
pong!
1. Subscriber requests port and token
To obtain the port number of a specific currency, the Subscriber must send a message to the Streamer on [mainport] formatted as follows:
<uniqueSessionId> <token> <currencycode>
The token is an authorization code. For the official Streamer, contact the development team to request one.
2. Streamer responds with information
If all information is correct, the Streamer will respond with an initSocket message containing two key parameters as args: the [currencyPort] and a [currencyToken] that must be used to request prices.
3. Subscriber requests price stream
The Subscriber sends a message to [initialCurrencyPort] formatted as follows:
<uniqueSessionId> <currencyToken> start
4. Streamer responds with last price
If all information is correct, the Streamer will respond with a shiftWalls message containing the latest price
5. Subscriber subscribes
The Subscriber connects to [currencyPort] and waits for shiftWalls messages
6. Streamer closes connection
On shutdown, the Subscriber must send an interruption signal to the Streamer on [initialCurrencyPort]:
<uniqueSessionId> <currencyToken> stop
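The plain-text messages used in the handshake steps above are simple space-separated strings. A minimal Java sketch of how a Subscriber might build them is shown below. The class HandshakeMessages and its methods are hypothetical, and generating the session id from a UUID is an assumption, since the document does not specify how <uniqueSessionId> is produced.

```java
import java.util.UUID;

// Hypothetical builder for the space-separated handshake messages described in the text.
final class HandshakeMessages {

    static final String PING = "ping?"; // sent by the Subscriber on [pingport]
    static final String PONG = "pong!"; // expected reply from the Streamer

    /** Assumption: any unique string works as a session id; a random UUID is used here. */
    static String newSessionId() {
        return UUID.randomUUID().toString();
    }

    /** Step 1, sent on [mainport]: <uniqueSessionId> <token> <currencycode> */
    static String portRequest(String sessionId, String token, String currencyCode) {
        return String.join(" ", sessionId, token, currencyCode);
    }

    /** Step 3, sent on [initialCurrencyPort]: <uniqueSessionId> <currencyToken> start */
    static String startRequest(String sessionId, String currencyToken) {
        return String.join(" ", sessionId, currencyToken, "start");
    }

    /** Shutdown, sent on [initialCurrencyPort]: <uniqueSessionId> <currencyToken> stop */
    static String stopRequest(String sessionId, String currencyToken) {
        return String.join(" ", sessionId, currencyToken, "stop");
    }

    public static void main(String[] args) {
        String session = newSessionId();
        System.out.println(portRequest(session, "myAuthToken", "BTC")); // "myAuthToken" is a placeholder
        System.out.println(startRequest(session, "BTC_df106d"));
        System.out.println(stopRequest(session, "BTC_df106d"));
    }
}
```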
Overview
[Figure: Overview of the process]
Auth token
TBD
Messaging protocol
Most messages sent from the Streamer to the Subscriber are JSON messages formatted as follows:
{
  "command": "cmdString",
  "commandArgs": [
    "arg1",
    "arg2",
    ...
  ],
  "error": "error description. Empty string if ok",
  "attachments": {},
  "serverTimestamp": 21231232123,
  "token": ""
}
Refer to com.nubits.nubotstream.AbstractStreamMessage.java for the bean implementation.
NOTE: Authentication, ping, and first-time messages are exceptions: they differ in the direction of the connection and in the format of the messages exchanged.
Commands
shiftOrders
The shiftOrders command indicates to Subscribers that it is time to shift the orders.
It has 9 args:
args[0] = currencyCode; //Pegged currency. The three-letter uppercase currency code (BTC, PPC, EUR, JPY, ...)
args[1] = midprice; //The price of 1 unit [expressed in USD]
args[2] = suggestedSellPricePEG; //A suggested** sell price [expressed in Pegged currency]
args[3] = suggestedBuyPricePEG; //A suggested** buy price [expressed in Pegged currency]
args[4] = suggestedSellPriceUSD; //A suggested** sell price [expressed in USD]
args[5] = suggestedBuyPriceUSD; //A suggested** buy price [expressed in USD]
args[6] = sourceName; //the name of the source feed
args[7] = shiftThreshold; //the threshold being used [expressed in absolute percentage]
args[8] = spread; //the spread being used in recommended prices [expressed in absolute percentage]
All prices have 8 decimal digits of precision when available. Rounding is done using RoundingMode.HALF_DOWN.
**The suggested prices are computed using a pre-determined formula containing the default spread.
Refer to com.nubits.nubotstream.ShiftOrdersMessage.java for the bean implementation.
Sample shiftOrders command message:
{
  "command": "shiftWalls",
  "args": [
    "btcnbt",
    "237.99504",
    "237.04496",
    "0.00421859",
    "0.00420175",
    "237.52",
    "bitfinex"
  ],
  "errorMessage": "",
  "token": "",
  "serverTimestamp": "2015-05-22 09:25:31",
  "attachments": []
}
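The args layout and the rounding rule for shiftOrders could be captured in Java roughly as follows. This is a sketch only: the class ShiftOrdersArgs and its constants are hypothetical names that mirror the index list in this section, and it is not the ShiftOrdersMessage bean. Only BigDecimal.setScale and RoundingMode.HALF_DOWN come from the Java standard library.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical constants mirroring the args index list; not the actual bean.
final class ShiftOrdersArgs {

    static final int CURRENCY_CODE = 0;
    static final int MID_PRICE = 1;
    static final int SUGGESTED_SELL_PRICE_PEG = 2;
    static final int SUGGESTED_BUY_PRICE_PEG = 3;
    static final int SUGGESTED_SELL_PRICE_USD = 4;
    static final int SUGGESTED_BUY_PRICE_USD = 5;
    static final int SOURCE_NAME = 6;
    static final int SHIFT_THRESHOLD = 7;
    static final int SPREAD = 8;

    static final int PRICE_SCALE = 8; // 8 decimal digits of precision

    /** Rounds to 8 decimals with HALF_DOWN, leaving shorter values untouched ("when available"). */
    static BigDecimal roundPrice(BigDecimal price) {
        return price.scale() > PRICE_SCALE
                ? price.setScale(PRICE_SCALE, RoundingMode.HALF_DOWN)
                : price;
    }

    public static void main(String[] args) {
        // An exact tie at the 9th decimal is rounded down under HALF_DOWN.
        System.out.println(roundPrice(new BigDecimal("0.004218595")).toPlainString()); // 0.00421859
        // Anything above the tie is rounded up.
        System.out.println(roundPrice(new BigDecimal("0.004218596")).toPlainString()); // 0.00421860
        // Values with fewer than 8 decimals are not padded.
        System.out.println(roundPrice(new BigDecimal("237.52")).toPlainString());      // 237.52
    }
}
```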
initSocket
The initSocket command gives the Subscriber meta information about the socket to use for the specific currency.
It has 2 args:
args[0] = port; //the specific port for the currency
args[1] = token; //the specific token for the currency
In case of an error, read the error description in the message.
Refer to com.nubits.nubotstream.InitSocketMessage.java for the bean implementation.
Sample initSocket command message:
{
  "command": "initSocket",
  "args": [
    "5566",
    "TOksDASuhd"
  ],
  "errorMessage": "",
  "token": "",
  "serverTimestamp": "2015-05-22 09:25:31",
  "attachments": []
}
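Once the JSON above has been parsed, a Subscriber might interpret the two initSocket args along these lines. The sketch assumes the args and error text have already been extracted from the JSON by whatever parser the client uses. The class InitSocketArgs is a hypothetical name, not the InitSocketMessage bean.

```java
// Hypothetical holder for the two initSocket args; JSON parsing is assumed to happen elsewhere.
final class InitSocketArgs {

    final int currencyPort;
    final String currencyToken;

    private InitSocketArgs(int currencyPort, String currencyToken) {
        this.currencyPort = currencyPort;
        this.currencyToken = currencyToken;
    }

    /** Builds the holder from already-parsed fields, failing loudly on an error or malformed args. */
    static InitSocketArgs fromArgs(String[] args, String errorMessage) {
        if (errorMessage != null && !errorMessage.isEmpty()) {
            throw new IllegalStateException("Streamer reported an error: " + errorMessage);
        }
        if (args == null || args.length != 2) {
            throw new IllegalArgumentException("initSocket expects exactly 2 args");
        }
        // args[0] = port, args[1] = token; parseInt throws NumberFormatException on a bad port.
        return new InitSocketArgs(Integer.parseInt(args[0]), args[1]);
    }

    /** [initialCurrencyPort] = [currencyPort] + 100, as defined in the handshake section. */
    int initialCurrencyPort() {
        return currencyPort + 100;
    }

    public static void main(String[] args) {
        InitSocketArgs parsed = fromArgs(new String[] {"5566", "TOksDASuhd"}, "");
        System.out.println(parsed.currencyPort);          // 5566
        System.out.println(parsed.initialCurrencyPort()); // 5666
        System.out.println(parsed.currencyToken);         // TOksDASuhd
    }
}
```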