/*
* Copyright (C) 2015 Nu Development Team
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package com.nubits.nubot.tasks;
import com.nubits.nubot.bot.Global;
import com.nubits.nubot.bot.NuBotConnectionException;
import com.nubits.nubot.bot.SessionManager;
import com.nubits.nubot.global.Constant;
import com.nubits.nubot.global.Settings;
import com.nubits.nubot.models.BidAskPair;
import com.nubits.nubot.models.LastPrice;
import com.nubits.nubot.notifications.GitterNotifications;
import com.nubits.nubot.notifications.MailNotifications;
import com.nubits.nubot.pricefeeds.FeedQuality;
import com.nubits.nubot.pricefeeds.PriceBatch;
import com.nubits.nubot.pricefeeds.PriceFeedManager;
import com.nubits.nubot.strategy.Secondary.StrategySecondaryPegTask;
import com.nubits.nubot.trading.TradeUtils;
import com.nubits.nubot.utils.FilesystemUtils;
import com.nubits.nubot.utils.Utils;
import io.evanwong.oss.hipchat.v2.rooms.MessageColor;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* A task for monitoring prices and triggering actions
*/
public class PriceMonitorTriggerTask extends TimerTask {
//Milliseconds added to currentTime in updateLastPrice's elapsed-time check against the refresh interval
private static final int REFRESH_OFFSET = 1000; //this is how close to the refresh interval is considered a fail (millisecond)
private static final Logger LOG = LoggerFactory.getLogger(PriceMonitorTriggerTask.class.getName());
//Number of upcoming run() executions to skip; set by gracefulPause() and decremented by run().
//Static, so the counter is shared by every instance of this task.
private static int SLEEP_COUNT = 0;
private final int MOVING_AVERAGE_SIZE = 30; //this is how many elements the Moving average queue holds
/**
* threshold for signaling a deviation of prices; handed to FeedQuality.handleNormal / handleFail
*/
private final double DISTANCE_TRESHHOLD = Settings.PRICE_DISTANCE_MAX_PERCENT; //Expressed in absolute percentage
//Highest attempt number executeUpdatePrice() accepts before it throws FeedPriceException
private final int MAX_ATTEMPTS = 5;
//Source of price batches; must be assigned through setPriceFeedManager() before run() does any work
protected PriceFeedManager pfm = null;
//set up a Queue to hold the prices used to calculate the moving average of prices
protected Queue<Double> queueMA = new LinkedList<>();
//Most recent accepted price and the batch it was selected from (both written by updateLastPrice)
protected LastPrice lastPrice;
protected ArrayList<LastPrice> lastPrices;
//Percentage move, relative to currentWallPEGPrice, at or above which needToMoveWalls() asks for a shift
private double wallchangeThreshold;
//options
//USD sell/buy prices computed once in initStrategy() and reused by computeNewPrices()
private double sellPriceUSD, buyPriceUSD;
//Constant.UP or Constant.DOWN, written by computeNewPrices(); no read of it is visible in this file
private String pegPriceDirection;
//Price the walls were last placed from; reference point for needToMoveWalls()
private LastPrice currentWallPEGPrice;
//True while a wall shift is in progress (see setWallsBeingShifted); verifyPegPrices() postpones new shifts meanwhile
private boolean wallsBeingShifted = false;
//Latest bid/ask pair expressed in the peg currency
private BidAskPair bidask;
//Strategy notified of new prices; must be assigned through setStrategy() before run() does any work
private StrategySecondaryPegTask strategy = null;
//Attempt number of the current price update; reset to 1 by run() and incremented by unableToUpdatePrice()
private int count;
//True until updateLastPrice() has called initStrategy() once
private boolean isFirstTimeExecution = true;
//Output files for wall shifts; the paths are built once, at class initialisation, from Global.sessionPath
public static String wallshiftsFilePathCSV = Global.sessionPath + "/" + Settings.WALLSHIFTS_FILENAME + ".csv";
public static String wallshiftsFilePathJSON = Global.sessionPath + "/" + Settings.WALLSHIFTS_FILENAME + ".json";
//Every CSV row written so far, appended to each price-change mail; static and never trimmed in this file
private static String emailHistory = "";
//Millisecond timestamp used as the start of the elapsed-time check in updateLastPrice(); null until run() sets it
private Long currentTime = null;
//True until run() has called init()
private boolean first = true;
//NOTE(review): not referenced anywhere else in the visible part of this class
private final int PR = Settings.DEFAULT_PRECISION;
/**
 * Hands the CSV and JSON wall-shift file paths to
 * {@code Utils.initSupplementaryLogFiles}. Called by {@link #run()} on its first execution.
 */
public void init() {
    Utils.initSupplementaryLogFiles(wallshiftsFilePathCSV, wallshiftsFilePathJSON);
}
/**
 * Assigns the feed manager this task fetches prices from.
 * {@link #run()} only logs an error and does nothing while this is {@code null}.
 *
 * @param pfm the price feed manager to use
 */
public void setPriceFeedManager(PriceFeedManager pfm) {
    this.pfm = pfm;
}
/**
 * Timer entry point: performs one price refresh per scheduled execution.
 * <p>
 * Order of work: return if the session was interrupted, call {@link #init()} on the very first
 * execution, sit out the execution while {@code SLEEP_COUNT} is positive, then stamp
 * {@code currentTime} and start a price update at attempt 1. A {@code FeedPriceException}
 * triggers the critical notification and clears the orders of the configured pair.
 */
@Override
public void run() {
    if (SessionManager.sessionInterrupted()) {
        return; //external interruption
    }

    LOG.debug("Executing " + this.getClass());

    if (first) {
        LOG.info("running PriceMonitorTrigger for first time");
        init();
        first = false;
    }

    //a previous problem asked us to skip a number of executions
    if (SLEEP_COUNT > 0) {
        LOG.error("error occurred. sleep " + SLEEP_COUNT);
        SLEEP_COUNT--;
        currentTime = System.currentTimeMillis();
        return;
    }

    if (SessionManager.sessionInterrupted()) {
        return; //external interruption
    }

    //Remember when this refresh started: a sudden price change makes the bot re-request
    //the price data repeatedly until the moving average catches up, and that process
    //must not take longer than the price refresh interval.
    currentTime = System.currentTimeMillis();
    LOG.debug("Executing task : PriceMonitorTriggerTask ");

    boolean ready = pfm != null && strategy != null;
    if (!ready) {
        LOG.error("PriceMonitorTriggerTask task needs a PriceFeedManager and a Strategy to work. Please assign it before running it");
        return;
    }

    count = 1;
    try {
        executeUpdatePrice(count);
    } catch (FeedPriceException e) {
        LOG.error("" + e);
        sendErrorNotification();
        Global.trade.clearOrders(Global.options.getPair());
    }

    //kept from the original flow: the interruption state is queried once more before leaving
    if (SessionManager.sessionInterrupted()) {
        return; //external interruption
    }
}
/**
 * Sets up the secondary-peg strategy from the first accepted peg price.
 * <p>
 * Stores the price in {@code Global.conversion}, computes the USD and peg bid/ask pairs,
 * pushes the peg prices into the secondary peg task (bid and ask are exchanged when
 * {@code Global.swappedPair} is set), starts that task if it is not running (otherwise flags
 * it as first-time again) and finally mails a "price tracking started" notification.
 * Returns early, doing nothing further, when the session has been interrupted.
 *
 * @param peg_price price of one unit of the payment currency, in USD
 * @throws NuBotConnectionException declared for the calls made here; which call raises it is
 *                                  not visible in this file
 */
private void initStrategy(double peg_price) throws NuBotConnectionException {
    if (SessionManager.sessionInterrupted()) return; //external interruption

    Global.conversion = peg_price; //used then for liquidity info

    //Compute the buy/sell prices in USD
    BidAskPair usdPrices = TradeUtils.computeUSDPrices(peg_price);

    //The ask is the sell price (see sellPriceUSD below); it used to be logged with a "buy @" label.
    String message = "Computing USD prices with sellOffset:" + Global.options.getBookSellOffset() + "$ and buyOffset:" + Global.options.getBookBuyOffset() + " : sell @ " + usdPrices.getAsk();
    if (Global.options.isDualSide()) {
        message += " buy @ " + usdPrices.getBid();
    }
    LOG.info(message);

    sellPriceUSD = usdPrices.getAsk();
    buyPriceUSD = usdPrices.getBid();

    BidAskPair pegPrices = TradeUtils.computePEGPrices(usdPrices, peg_price);

    //store first value
    this.bidask = new BidAskPair(pegPrices.getBid(), pegPrices.getAsk());

    String paymentCode = Global.options.getPair().getPaymentCurrency().getCode();
    String message2 = "Converted price (using 1 " + paymentCode + " = " + peg_price + " USD)"
            + " : sell @ " + pegPrices.getAsk() + " " + paymentCode + "";
    if (Global.options.isDualSide()) {
        message2 += "; buy @ " + pegPrices.getBid() + " " + paymentCode;
    }
    LOG.info(message2);

    if (SessionManager.sessionInterrupted()) return; //external interruption

    //Assign prices
    StrategySecondaryPegTask secTask = (StrategySecondaryPegTask) Global.taskManager.getSecondaryPegTask().getTask();
    if (!Global.swappedPair) {
        secTask.setBuyPricePEG(pegPrices.getBid());
        secTask.setSellPricePEG(pegPrices.getAsk());
    } else {
        secTask.setBuyPricePEG(pegPrices.getAsk());
        secTask.setSellPricePEG(pegPrices.getBid());
    }

    //Start/restart strategy
    if (!Global.taskManager.getSecondaryPegTask().isRunning()) {
        Global.taskManager.getSecondaryPegTask().start();
    } else {
        secTask.setIsFirstTime(true);
    }

    //Send email notification
    String orderCode = pfm.getPair().getOrderCurrency().getCode().toUpperCase();
    String title = " production (" + Global.options.getExchangeName() + ") [" + pfm.getPair().toString() + "] price tracking started";
    String tldr = orderCode + " price tracking started at " + peg_price + " " + pfm.getPair().getPaymentCurrency().getCode().toUpperCase() + ".\n"
            + "Will send a new mail notification everytime the price of " + orderCode + " changes more than " + Global.options.getWallchangeThreshold() + "%.";
    MailNotifications.send(Global.options.getMailRecipient(), title, tldr);
}
/**
 * Fetches a fresh price batch and either applies the selected price or schedules a retry.
 * <p>
 * When every configured feed answered, the price is chosen by {@code FeedQuality.handleNormal};
 * otherwise by {@code FeedQuality.handleFail}. A non-null result is passed to
 * {@link #updateLastPrice(LastPrice, PriceBatch)} exactly once; a missing result (or a failure
 * inside {@code handleFail}) leads to exactly one call of {@link #unableToUpdatePrice()}.
 * Previously the partial-batch path applied a good price twice and retried twice on failure.
 * Does nothing when the session has been interrupted.
 *
 * @param countTrials number of the current attempt, starting at 1
 * @throws FeedPriceException when {@code countTrials} exceeds {@code MAX_ATTEMPTS}
 */
private void executeUpdatePrice(int countTrials) throws FeedPriceException {
    if (SessionManager.sessionInterrupted()) return; //external interruption

    if (countTrials > MAX_ATTEMPTS) {
        //Tried more than MAX_ATTEMPTS times without success
        throw new FeedPriceException("The price has failed updating more than " + MAX_ATTEMPTS + " times in a row");
    }

    pfm.fetchLastPrices();
    PriceBatch batch = pfm.getPriceBatch();

    int availableFeeds = pfm.getFeedList().size();
    LOG.debug("CheckLastPrice received values from " + batch.size() + "/" + availableFeeds + " available remote feeds. ");

    LastPrice goodPrice = null;
    if (batch.size() == availableFeeds) {
        goodPrice = FeedQuality.handleNormal(DISTANCE_TRESHHOLD, batch);
    } else {
        try {
            goodPrice = FeedQuality.handleFail(DISTANCE_TRESHHOLD, batch);
        } catch (Exception e) {
            //leave goodPrice null so the single retry below is taken, but keep the cause visible
            LOG.error("Unable to select a price from the incomplete batch: " + e, e);
        }
    }

    if (goodPrice != null) {
        this.updateLastPrice(goodPrice, batch);
    } else {
        unableToUpdatePrice();
    }
}
/**
 * Backs off and retries the price update after a failed attempt.
 * <p>
 * Increments the attempt counter, sleeps {@code count} minutes and calls
 * {@link #executeUpdatePrice(int)} again. The {@code FeedPriceException} raised once the
 * attempts are exhausted is now propagated instead of being logged and dropped, so that it
 * can reach the handler in {@link #run()} (critical notification and order clearing).
 * If the thread is interrupted while sleeping, the interrupt flag is restored and the retry
 * is abandoned.
 *
 * @throws FeedPriceException when the retry exceeds {@code MAX_ATTEMPTS}
 */
private void unableToUpdatePrice() throws FeedPriceException {
    count++;
    try {
        Thread.sleep(count * 60L * 1000L);
    } catch (InterruptedException ex) {
        LOG.error(ex.toString());
        //preserve the interruption for whoever owns this thread and stop retrying
        Thread.currentThread().interrupt();
        return;
    }
    executeUpdatePrice(count);
}
/**
 * Logs a problem and forwards it to Gitter and to the configured mail recipient.
 *
 * @param subject           mail subject
 * @param logMessage        text written to the log at error level
 * @param notification      text sent to Gitter and used as the mail body
 * @param notificationColor unused by this method; kept so existing callers compile unchanged
 */
private void notify(String subject, String logMessage, String notification, MessageColor notificationColor) {
    LOG.error(logMessage);
    //the message goes to Gitter; the log line used to name HipChat
    LOG.error("Notifying Gitter");
    GitterNotifications.sendMessage(notification);
    LOG.error("Sending Email");
    MailNotifications.send(Global.options.getMailRecipient(), subject, notification);
}
/**
 * Reports a connection problem lasting one price-check interval.
 * <p>
 * The same text is written to the log and sent as the Gitter message / mail body.
 * Previously the notification text was an empty string, so recipients received a
 * message without content.
 */
public void notifyNotConnected() {
    String logMessage = "There has been a connection issue for " + Settings.CHECK_PRICE_INTERVAL + " seconds\n"
            + "Consider restarting the bot if the connection issue persists";
    String notification = logMessage;
    MessageColor notificationColor = MessageColor.YELLOW;
    String subject = Global.options.getExchangeName() + " Bot is suffering a connection issue";
    notify(subject, logMessage, notification, notificationColor);
}
/**
 * Announces that the fetched price stayed outside the allowed band around the moving average
 * and that the orders are going to be replaced after a pause.
 *
 * @param lp        the last obtained price, quoted in the notification
 * @param sleepTime length of the announced pause, printed as given in front of "seconds"
 */
public void notifyShutdown(LastPrice lp, double sleepTime) {
    final String pauseSuffix = sleepTime + "seconds.";

    final String logMessage = "The Fetched Exchange rate data has remained outside of the required price band for "
            + Settings.CHECK_PRICE_INTERVAL + "seconds.\nThe bot will notify and restart in "
            + pauseSuffix;

    final String exchange = Global.options.getExchangeName();

    final String notification = "A large price difference was detected at " + exchange
            + ".\nThe Last obtained price of " + lp.getPrice().getQuantity() + " was outside of "
            + Settings.PRICE_DISTANCE_MAX_PERCENT + "% of the moving average figure of " + getMovingAverage()
            + ".\nNuBot will remove the current orders and replace them in "
            + pauseSuffix;

    final String subject = exchange + " Moving Average issue. Bot will replace orders in "
            + pauseSuffix;

    notify(subject, logMessage, notification, MessageColor.PURPLE);
}
/**
 * Pauses price tracking after an abnormal price persisted for a whole refresh period.
 * <p>
 * Sends the shutdown notification, clears the orders of the configured pair, empties the
 * moving-average queue so tracking restarts fresh, and makes {@link #run()} skip its next
 * three executions.
 *
 * @param lp the last obtained price, forwarded to {@link #notifyShutdown(LastPrice, double)}
 */
public void gracefulPause(LastPrice lp) {
    //number of refresh periods to sit out
    final int skippedPeriods = 3;
    final double sleepTime = Settings.CHECK_PRICE_INTERVAL * skippedPeriods;

    //tell Gitter and the mail recipient what is going on
    notifyShutdown(lp, sleepTime);

    //cancel all orders to avoid arbitrage against the bot
    LOG.error("Cancelling Orders to avoid Arbitrage against the bot");
    Global.trade.clearOrders(Global.options.getPair());

    //clear the moving average so the restart is fresh
    queueMA.clear();

    LOG.error("Sleeping for " + sleepTime);
    SLEEP_COUNT = skippedPeriods;
    currentTime = System.currentTimeMillis();
}
/**
 * Validates a freshly selected price against the moving average and, if accepted, applies it.
 * <p>
 * Unless multiple custodians are configured:
 * <ul>
 * <li>a price further than {@code Settings.PRICE_DISTANCE_MAX_PERCENT} percent from the moving
 * average is only added to the average, the feeds are fetched again, and the method returns;</li>
 * <li>a price inside the band is added to the average as long as less than one
 * {@code Settings.CHECK_PRICE_INTERVAL} has elapsed since {@code currentTime} (plus
 * {@code REFRESH_OFFSET}); otherwise {@link #gracefulPause(LastPrice)} is triggered and the
 * method returns.</li>
 * </ul>
 * An accepted price is stored, logged and then either initialises the strategy (first call) or
 * is checked for a wall shift. Failures of the re-fetch and of the strategy initialisation are
 * now logged instead of being silently discarded; control flow is otherwise unchanged.
 *
 * @param lp    the price selected from the batch
 * @param batch the batch {@code lp} was selected from
 */
public void updateLastPrice(LastPrice lp, PriceBatch batch) {
    if (SessionManager.sessionInterrupted()) return; //external interruption

    //We need to fill up the moving average queue so that MOVING_AVERAGE_SIZE data points exist.
    if (queueMA.size() < MOVING_AVERAGE_SIZE) {
        initMA(lp.getPrice().getQuantity());
    }

    if (!Global.options.isMultipleCustodians()) {
        //we check against the moving average
        double current = lp.getPrice().getQuantity();
        double MA = getMovingAverage();

        //calculate the percentage difference
        double percentageDiff = (((MA - current) / ((MA + current) / 2)) * 100);
        if ((percentageDiff > Settings.PRICE_DISTANCE_MAX_PERCENT) || (percentageDiff < -Settings.PRICE_DISTANCE_MAX_PERCENT)) {
            //The potential price is more than % different to the moving average
            //add it to the MA-Queue to raise the Moving Average and re-request the currency data
            //in this way we can react to a large change in price when we are sure it is not an anomaly
            LOG.warn("Latest price " + Objects.toString(current) + " is " + Objects.toString(percentageDiff) + "% outside of the moving average of " + Objects.toString(MA) + "."
                    + "\nShifting moving average and re-fetching exchange rate data.");
            updateMovingAverageQueue(current);
            try {
                int trials = 1;
                executeUpdatePrice(trials);
            } catch (FeedPriceException ex) {
                //this method cannot rethrow a checked exception; at least leave a trace
                LOG.error("Re-fetching exchange rate data failed: " + ex, ex);
            }
            return;
        }

        //the potential price is within the % boundary.
        //add it to the MA-Queue to keep the moving average moving
        // Only do this if the standard update interval hasn't passed
        if (((System.currentTimeMillis() - (currentTime + REFRESH_OFFSET)) / 1000L) < Settings.CHECK_PRICE_INTERVAL) {
            updateMovingAverageQueue(current);
        } else {
            //If we get here, we haven't had a price within % of the average for as long as a standard update period
            //the action is to send notifications, cancel all orders and pause the bot
            gracefulPause(lp);
            return;
        }
    }

    if (SessionManager.sessionInterrupted()) return; //external interruption

    //carry on with updating the wall price shift
    this.lastPrice = lp;
    this.lastPrices = batch.priceList;

    LOG.info("Price Updated. " + lp.getSource() + ":1 " + lp.getCurrencyMeasured().getCode() + " = "
            + "" + lp.getPrice().getQuantity() + " " + lp.getPrice().getCurrency().getCode());

    if (isFirstTimeExecution) {
        try {
            initStrategy(lp.getPrice().getQuantity());
        } catch (NuBotConnectionException e) {
            LOG.error("Unable to initialise the strategy: " + e, e);
        }
        currentWallPEGPrice = lp;
        isFirstTimeExecution = false;
    } else {
        verifyPegPrices();
    }
}
/**
 * Decides whether the walls have to be moved to the latest price and, if so, triggers the
 * recomputation.
 * <p>
 * With multiple custodians a shift is always requested; otherwise only when
 * {@link #needToMoveWalls(LastPrice)} says the price moved far enough. A requested shift is
 * postponed while another shift is still in progress.
 */
private void verifyPegPrices() {
    if (SessionManager.sessionInterrupted()) return; //external interruption

    LOG.debug("Executing tryMoveWalls");

    //check if price moved more than x% from when the wall was setup (skipped for multiple custodians)
    final boolean shiftWanted = Global.options.isMultipleCustodians() || needToMoveWalls(lastPrice);
    final boolean shiftRunning = shiftWanted && isWallsBeingShifted();

    //prevent a wall shift trigger if the strategy is already shifting walls.
    if (shiftWanted && !shiftRunning) {
        LOG.info("Walls need to be shifted");
        //Compute price for walls
        currentWallPEGPrice = lastPrice;
        computeNewPrices();
        return;
    }

    LOG.debug("No need to move walls");
    currentTime = System.currentTimeMillis();
    if (shiftRunning) {
        LOG.warn("Wall shift is postponed: another process is already shifting existing walls. Will try again on next execution.");
    }
}
/**
 * Tells whether a price moved far enough from the price the walls were placed at.
 *
 * @param last the price to compare with {@code currentWallPEGPrice}
 * @return {@code false} when the distance, as a percentage of the wall price rounded to four
 *         places, is below {@code wallchangeThreshold}; {@code true} otherwise
 */
private boolean needToMoveWalls(LastPrice last) {
    final double wallPrice = currentWallPEGPrice.getPrice().getQuantity();
    final double absoluteMove = Math.abs(last.getPrice().getQuantity() - wallPrice);
    final double percentageMove = Utils.roundPlaces((absoluteMove * 100) / wallPrice, 4);

    LOG.debug("delta =" + percentageMove + "% (old : " + wallPrice + " new " + last.getPrice().getQuantity() + ")");

    //written as a negated "<" so that a NaN distance keeps answering true
    return !(percentageMove < wallchangeThreshold);
}
/**
 * Converts the stored USD sell/buy prices into peg prices at the latest price, records the
 * direction of the move, informs the strategy and logs the wall shift.
 * <p>
 * The USD prices are multiplied by the peg price when {@code Global.swappedPair} is set and
 * divided by it otherwise. The direction is {@code Constant.UP} only when the new ask is
 * strictly above the previous one. Does nothing when the session has been interrupted.
 */
private void computeNewPrices() {
    if (SessionManager.sessionInterrupted()) return; //external interruption

    final double peg_price = lastPrice.getPrice().getQuantity();

    //swapped pair: NBT as paymentCurrency, so multiply; otherwise convert the USD price to PEG by dividing
    final boolean swapped = Global.swappedPair;
    final double sellPricePEG_new = Utils.round(swapped ? sellPriceUSD * peg_price : sellPriceUSD / peg_price);
    final double buyPricePEG_new = Utils.round(swapped ? buyPriceUSD * peg_price : buyPriceUSD / peg_price);

    final BidAskPair newPrice = new BidAskPair(buyPricePEG_new, sellPricePEG_new);

    //check if the price increased or decreased compared to last
    final double askChange = newPrice.getAsk() - this.bidask.getAsk();
    this.pegPriceDirection = askChange > 0 ? Constant.UP : Constant.DOWN;

    //Store new value
    this.bidask = newPrice;

    LOG.info("Sell Price " + sellPricePEG_new + " | "
            + "Buy Price " + buyPricePEG_new);

    //Call Strategy and notify the price change
    strategy.notifyPriceChanged(sellPricePEG_new, buyPricePEG_new, peg_price);

    //output for the csv/json wall-shift files and the mail notification
    final double price = currentWallPEGPrice.getPrice().getQuantity();
    Global.conversion = price;
    writeShiftToFileAndNotify(
            currentWallPEGPrice.getSource(),
            price,
            currentWallPEGPrice.getPrice().getCurrency().getCode(),
            pfm.getPair().getOrderCurrency().getCode(),
            this.lastPrices,
            sellPricePEG_new,
            buyPricePEG_new,
            wallchangeThreshold,
            false);
}
/**
 * Records a wall shift in the CSV and JSON session files and, if mails are enabled, sends a
 * price-change notification.
 * <p>
 * Fixes over the previous version:
 * <ul>
 * <li>each feed now gets its own JSON object; before, one shared object was added repeatedly,
 * so every entry showed the last feed;</li>
 * <li>an existing JSON file whose root is not an object, or which lacks a
 * {@code "wall_shifts"} array, no longer causes an exception - a fresh structure is used;</li>
 * <li>the parse-error log names the wall-shift file instead of {@code order_history.json};</li>
 * <li>a {@code null} price list is treated as empty.</li>
 * </ul>
 * Nothing is written when the session is interrupted before the CSV row is appended, and no
 * mail is sent when it is interrupted after the JSON file is saved.
 *
 * @param source              name of the feed the price came from
 * @param price               the price that triggered the shift
 * @param currency            code of the currency {@code price} is expressed in
 * @param crypto              code of the currency being priced
 * @param lastPrices          prices of all feeds at this time; may be {@code null}
 * @param sellPricePEG        new sell price
 * @param buyPricePEG         new buy price
 * @param wallchangeThreshold threshold percentage quoted in the mail
 * @param remote              not used by this method
 */
public static void writeShiftToFileAndNotify(String source, double price, String currency, String crypto, ArrayList<LastPrice> lastPrices, double sellPricePEG, double buyPricePEG, double wallchangeThreshold, boolean remote) {
    Date currentDate = new Date();
    String row = currentDate + ","
            + source + ","
            + crypto + ","
            + price + ","
            + currency + ","
            + sellPricePEG + ","
            + buyPricePEG + ",";

    JSONArray otherPricesAtThisTime = new JSONArray();
    if (lastPrices != null) {
        for (LastPrice feedPrice : lastPrices) {
            //a new object per feed: JSONArray stores references, not copies
            JSONObject feedEntry = new JSONObject();
            feedEntry.put("feed", feedPrice.getSource());
            feedEntry.put("price", feedPrice.getPrice().getQuantity());
            otherPricesAtThisTime.add(feedEntry);
        }
    }
    LOG.info("New price computed [" + row + "]");

    if (SessionManager.sessionInterrupted()) return; //external interruption

    row += otherPricesAtThisTime.toString() + "\n";
    //the JSON output keeps its existing shape: an array wrapping the array of feed prices
    JSONArray backup_feeds = new JSONArray();
    backup_feeds.add(otherPricesAtThisTime);
    logrow(row, wallshiftsFilePathCSV, true);

    //Also update a json version of the output file
    //build the latest data into a JSONObject
    JSONObject wall_shift = new JSONObject();
    wall_shift.put("timestamp", currentDate.getTime());
    wall_shift.put("feed", source);
    wall_shift.put("crypto", crypto);
    wall_shift.put("price", price);
    wall_shift.put("currency", currency);
    wall_shift.put("sell_price", sellPricePEG);
    wall_shift.put("buy_price", buyPricePEG);
    wall_shift.put("backup_feeds", backup_feeds);

    //now read the existing object if one exists
    JSONParser parser = new JSONParser();
    JSONObject wall_shift_info = new JSONObject();
    JSONArray wall_shifts = new JSONArray();
    try {
        Object parsed = parser.parse(FilesystemUtils.readFromFile(wallshiftsFilePathJSON));
        if (parsed instanceof JSONObject) { //object already exists in file
            wall_shift_info = (JSONObject) parsed;
            Object existingShifts = wall_shift_info.get("wall_shifts");
            if (existingShifts instanceof JSONArray) {
                wall_shifts = (JSONArray) existingShifts;
            }
        }
    } catch (ParseException pe) {
        LOG.error("Unable to parse " + wallshiftsFilePathJSON);
    }

    //add the latest shift to the array
    wall_shifts.add(wall_shift);
    wall_shift_info.put("wall_shifts", wall_shifts);
    //then save
    logWallShift(wall_shift_info.toJSONString());

    if (SessionManager.sessionInterrupted()) return; //external interruption

    if (Global.options.sendMails()) {
        String title = " production (" + Global.options.getExchangeName() + ") [" + currency + "] price changed more than " + wallchangeThreshold + "%";

        String messageNow = row;
        emailHistory += messageNow;

        String tldr = crypto + " price changed more than " + wallchangeThreshold + "% since last notification: "
                + "now is " + price + " " + currency + ".\n"
                + "Here are the prices the bot used in the new orders : \n"
                + "Sell at " + sellPricePEG + " " + crypto + " "
                + "and buy at " + buyPricePEG + " " + crypto + "\n"
                + "\n#########\n"
                + "Below you can see the history of price changes. You can copy paste to create a csv report."
                + "For each row the bot should have shifted the sell/buy walls.\n\n";

        if (!Global.options.isMultipleCustodians()) {
            MailNotifications.send(Global.options.getMailRecipient(), title, tldr + emailHistory);
        }
    }
}
/**
 * Sets the percentage the price has to move, relative to the price the walls were placed at,
 * before a wall shift is requested.
 *
 * @param wallchangeThreshold threshold in percent
 */
public void setWallchangeThreshold(double wallchangeThreshold) {
    this.wallchangeThreshold = wallchangeThreshold;
}
/**
 * Assigns the strategy that is notified of new prices.
 * {@link #run()} only logs an error and does nothing while this is {@code null}.
 *
 * @param strategy the secondary peg strategy task
 */
public void setStrategy(StrategySecondaryPegTask strategy) {
    this.strategy = strategy;
}
/**
 * @return {@code true} while a wall shift has been flagged as in progress through
 *         {@link #setWallsBeingShifted(boolean)}
 */
public boolean isWallsBeingShifted() {
    return this.wallsBeingShifted;
}
/**
 * Flags whether a wall shift is currently in progress and restarts the elapsed-time
 * reference ({@code currentTime}) used by the price check.
 *
 * @param wallsBeingShifted {@code true} when a shift starts, {@code false} when it is over
 */
public void setWallsBeingShifted(boolean wallsBeingShifted) {
    //reset the clock first, then publish the new state
    this.currentTime = System.currentTimeMillis();
    this.wallsBeingShifted = wallsBeingShifted;
}
/**
 * Sends the critical "cannot find a reliable feed" notification by mail and Gitter and writes
 * the same text to the log. The message carries the current session id.
 */
private void sendErrorNotification() {
    final String orderCode = pfm.getPair().getOrderCurrency().getCode();

    final String title = "Problems while updating " + orderCode + " price. Cannot find a reliable feed.";

    final String message = "NuBot timed out after " + MAX_ATTEMPTS + " failed attempts to update " + orderCode
            + " price. Please restart the bot and get in touch with Nu Dev team "
            + "[**" + SessionManager.getSessionId() + "**]";

    MailNotifications.sendCritical(Global.options.getMailRecipient(), title, message);

    final String combined = title + message;
    GitterNotifications.sendMessageCritical(combined);
    LOG.error(combined);
}
// ----- price utils ------
/**
 * Computes the arithmetic mean of the prices currently held in the moving-average queue.
 *
 * @return the mean of the queued prices; {@code NaN} when the queue is empty (0.0 divided by 0)
 */
public double getMovingAverage() {
    double total = 0;
    for (Double sample : queueMA) {
        total += sample;
    }
    return total / queueMA.size();
}
/**
 * Appends a price to the moving-average queue and drops the oldest entry once the queue
 * holds more than {@code MOVING_AVERAGE_SIZE} prices. A price of exactly 0 is ignored.
 *
 * @param price the price to add
 */
public void updateMovingAverageQueue(double price) {
    if (price != 0) { //don't add 0
        queueMA.add(price);
        //trim the queue so that it is a moving average over the correct number of data points
        if (queueMA.size() > MOVING_AVERAGE_SIZE) {
            queueMA.remove();
        }
    }
}
/**
 * Initialises the moving-average queue by filling it with a single price.
 * <p>
 * The price is offered {@code MOVING_AVERAGE_SIZE} times, which is enough to replace any
 * earlier content because the queue never holds more than that many entries. The loop
 * bound was previously a hard-coded {@code 30} with an inclusive comparison (31 iterations).
 * A price of 0 leaves the queue untouched, since
 * {@link #updateMovingAverageQueue(double)} ignores it.
 *
 * @param price the price every slot of the queue is set to
 */
protected void initMA(double price) {
    for (int i = 0; i < MOVING_AVERAGE_SIZE; i++) {
        updateMovingAverageQueue(price);
    }
}
/*protected void notifyDeviation(PriceBatch priceBatch) {
String title = "Problems while updating " + pfm.getPair().getOrderCurrency().getCode() + " price. Cannot find a reliable feed.";
String message = "Positive response from " + priceBatch.size() + "/" + pfm.getFeedList().size() + " feeds\n";
message += "[<strong>" + SessionManager.getSessionId() + "</strong>]";
for (int i = 0; i < priceBatch.size(); i++) {
LastPrice tempPrice = priceBatch.get(i);
message += (tempPrice.getSource() + ":1 " + tempPrice.getCurrencyMeasured().getCode() + " = "
+ tempPrice.getPrice().getQuantity() + " " + tempPrice.getPrice().getCurrency().getCode()) + "\n";
}
MailNotifications.sendCritical(Global.options.getMailRecipient(), title, message);
GitterNotifications.sendMessageCritical(title + message);
LOG.error(title + message);
}*/
/**
 * Passes a row of text to {@code FilesystemUtils.writeToFile}.
 *
 * @param row        the text to write
 * @param outputPath path of the target file
 * @param append     forwarded unchanged as the third argument of {@code writeToFile}
 */
private static void logrow(String row, String outputPath, boolean append) {
    FilesystemUtils.writeToFile(row, outputPath, append);
}
/**
 * Writes the complete wall-shift JSON document to {@code wallshiftsFilePathJSON}, passing
 * {@code false} as the append argument of {@code FilesystemUtils.writeToFile}.
 *
 * @param wall_shift the serialised JSON document
 */
private static void logWallShift(String wall_shift) {
    final boolean append = false;
    FilesystemUtils.writeToFile(wall_shift, wallshiftsFilePathJSON, append);
}
}