|
| 1 | +var tb = require('timebucket') |
| 2 | + , crypto = require('crypto') |
| 3 | + , objectifySelector = require('./lib/objectify-selector') |
| 4 | + , collectionService = require('./lib/services/collection-service') |
| 5 | + |
| 6 | +var conf = require('./conf') |
| 7 | + |
| 8 | +let zenbot = {} |
| 9 | +zenbot.conf = conf |
| 10 | + |
| 11 | +var authStr = '', authMechanism, connectionString |
| 12 | + |
| 13 | +if(zenbot.conf.mongo.username){ |
| 14 | + authStr = encodeURIComponent(zenbot.conf.mongo.username) |
| 15 | + |
| 16 | + if(zenbot.conf.mongo.password) authStr += ':' + encodeURIComponent(zenbot.conf.mongo.password) |
| 17 | + |
| 18 | + authStr += '@' |
| 19 | + |
| 20 | + // authMechanism could be a conf.js parameter to support more mongodb authentication methods |
| 21 | + authMechanism = zenbot.conf.mongo.authMechanism || 'DEFAULT' |
| 22 | +} |
| 23 | + |
| 24 | +var connectionString = 'mongodb://' + authStr + zenbot.conf.mongo.host + ':' + zenbot.conf.mongo.port + '/' + zenbot.conf.mongo.db + '?' + |
| 25 | + (zenbot.conf.mongo.replicaSet ? '&replicaSet=' + zenbot.conf.mongo.replicaSet : '' ) + |
| 26 | + (authMechanism ? '&authMechanism=' + authMechanism : '' ) |
| 27 | + |
| 28 | + |
| 29 | +require('mongodb').MongoClient.connect(connectionString, { useNewUrlParser: true }, function (err, client) { |
| 30 | + if (err) { |
| 31 | + console.error('WARNING: MongoDB Connection Error: ', err) |
| 32 | + console.error('WARNING: without MongoDB some features (such as backfilling/simulation) may be disabled.') |
| 33 | + console.error('Attempted authentication string: ' + connectionString) |
| 34 | + cb(null, zenbot) |
| 35 | + return |
| 36 | + } |
| 37 | + |
| 38 | + |
| 39 | + |
| 40 | + var db = client.db('zenbot4') |
| 41 | + |
| 42 | + var EventEmitter = require('events') |
| 43 | + var eventBus = new EventEmitter() |
| 44 | + var conf = require('./conf') |
| 45 | + let zenbot = {} |
| 46 | + zenbot.conf = conf |
| 47 | + conf.eventBus = eventBus |
| 48 | + conf.db = {} |
| 49 | + conf.db.mongo = db |
| 50 | + console.log(conf.db) |
| 51 | + |
| 52 | + let cmd = {} |
| 53 | + cmd.days = 10 |
| 54 | + |
| 55 | + selector = objectifySelector(conf.selector) |
| 56 | + var exchange = require(`./extensions/exchanges/${selector.exchange_id}/exchange`)(conf) |
| 57 | + if (!exchange) { |
| 58 | + console.error('cannot backfill ' + selector.normalized + ': exchange not implemented') |
| 59 | + process.exit(1) |
| 60 | + } |
| 61 | + |
| 62 | + var collectionServiceInstance = collectionService(conf) |
| 63 | + var tradesCollection = collectionServiceInstance.getTrades() |
| 64 | + var resume_markers = collectionServiceInstance.getResumeMarkers() |
| 65 | + |
| 66 | + var marker = { |
| 67 | + id: crypto.randomBytes(4).toString('hex'), |
| 68 | + selector: selector.normalized, |
| 69 | + from: null, |
| 70 | + to: null, |
| 71 | + oldest_time: null, |
| 72 | + newest_time: null |
| 73 | + } |
| 74 | + marker._id = marker.id |
| 75 | + var trade_counter = 0 |
| 76 | + var day_trade_counter = 0 |
| 77 | + var get_trade_retry_count = 0 |
| 78 | + var days_left = cmd.days + 1 |
| 79 | + var target_time, start_time |
| 80 | + var mode = exchange.historyScan |
| 81 | + var last_batch_id, last_batch_opts |
| 82 | + var offset = exchange.offset |
| 83 | + var markers, trades |
| 84 | + if (!mode) { |
| 85 | + console.error('cannot backfill ' + selector.normalized + ': exchange does not offer historical data') |
| 86 | + process.exit(0) |
| 87 | + } |
| 88 | + if (mode === 'backward') { |
| 89 | + target_time = new Date().getTime() - (86400000 * cmd.days) |
| 90 | + } else { |
| 91 | + if (cmd.start >= 0 && cmd.end >= 0) { |
| 92 | + start_time = cmd.start |
| 93 | + target_time = cmd.end |
| 94 | + } else { |
| 95 | + target_time = new Date().getTime() |
| 96 | + start_time = new Date().getTime() - (86400000 * cmd.days) |
| 97 | + } |
| 98 | + } |
| 99 | + resume_markers.find({selector: selector.normalized}).toArray(function (err, results) { |
| 100 | + if (err) throw err |
| 101 | + markers = results.sort(function (a, b) { |
| 102 | + if (mode === 'backward') { |
| 103 | + if (a.to > b.to) return -1 |
| 104 | + if (a.to < b.to) return 1 |
| 105 | + } else { |
| 106 | + if (a.from < b.from) return -1 |
| 107 | + if (a.from > b.from) return 1 |
| 108 | + } |
| 109 | + return 0 |
| 110 | + }) |
| 111 | + getNext() |
| 112 | + }) |
| 113 | + |
| 114 | + function getNext() { |
| 115 | + var opts = {product_id: selector.product_id} |
| 116 | + if (mode === 'backward') { |
| 117 | + opts.to = marker.from |
| 118 | + } else { |
| 119 | + if (marker.to) opts.from = marker.to + 1 |
| 120 | + else opts.from = exchange.getCursor(start_time) |
| 121 | + } |
| 122 | + if (offset) { |
| 123 | + opts.offset = offset |
| 124 | + } |
| 125 | + last_batch_opts = opts |
| 126 | + |
| 127 | + console.log("opts: ",opts) |
| 128 | + exchange.getTrades(opts, function (err, results) { |
| 129 | + trades = results |
| 130 | + if (err) { |
| 131 | + console.error('err backfilling selector: ' + selector.normalized) |
| 132 | + console.error(err) |
| 133 | + if (err.code === 'ETIMEDOUT' || err.code === 'ENOTFOUND' || err.code === 'ECONNRESET') { |
| 134 | + console.error('retrying...') |
| 135 | + setImmediate(getNext) |
| 136 | + return |
| 137 | + } |
| 138 | + console.error('aborting!') |
| 139 | + process.exit(1) |
| 140 | + } |
| 141 | + if (mode !== 'backward' && !trades.length) { |
| 142 | + if (trade_counter) { |
| 143 | + console.log('\ndownload complete!\n') |
| 144 | + process.exit(0) |
| 145 | + } else { |
| 146 | + if (get_trade_retry_count < 5) { |
| 147 | + console.error('\ngetTrades() returned no trades, retrying with smaller interval.') |
| 148 | + get_trade_retry_count++ |
| 149 | + start_time += (target_time - start_time) * 0.4 |
| 150 | + setImmediate(getNext) |
| 151 | + return |
| 152 | + } else { |
| 153 | + console.error('\ngetTrades() returned no trades, --start may be too remotely in the past.') |
| 154 | + process.exit(1) |
| 155 | + } |
| 156 | + } |
| 157 | + } else if (!trades.length) { |
| 158 | + console.log('\ngetTrades() returned no trades, we may have exhausted the historical data range.') |
| 159 | + process.exit(0) |
| 160 | + } |
| 161 | + trades.sort(function (a, b) { |
| 162 | + if (mode === 'backward') { |
| 163 | + if (a.time > b.time) return -1 |
| 164 | + if (a.time < b.time) return 1 |
| 165 | + } else { |
| 166 | + if (a.time < b.time) return -1 |
| 167 | + if (a.time > b.time) return 1 |
| 168 | + } |
| 169 | + return 0 |
| 170 | + }) |
| 171 | + if (last_batch_id && last_batch_id === trades[0].trade_id) { |
| 172 | + console.error('\nerror: getTrades() returned duplicate results') |
| 173 | + console.error(opts) |
| 174 | + console.error(last_batch_opts) |
| 175 | + process.exit(0) |
| 176 | + } |
| 177 | + last_batch_id = trades[0].trade_id |
| 178 | + runTasks(trades) |
| 179 | + }) |
| 180 | + } |
| 181 | + |
| 182 | + function runTasks(trades) { |
| 183 | + Promise.all(trades.map((trade) => saveTrade(trade))).then(function (/*results*/) { |
| 184 | + var oldest_time = marker.oldest_time |
| 185 | + var newest_time = marker.newest_time |
| 186 | + markers.forEach(function (other_marker) { |
| 187 | + // for backward scan, if the oldest_time is within another marker's range, skip to the other marker's start point. |
| 188 | + // for forward scan, if the newest_time is within another marker's range, skip to the other marker's end point. |
| 189 | + if (mode === 'backward' && marker.id !== other_marker.id && marker.from <= other_marker.to && marker.from > other_marker.from) { |
| 190 | + marker.from = other_marker.from |
| 191 | + marker.oldest_time = other_marker.oldest_time |
| 192 | + } else if (mode !== 'backward' && marker.id !== other_marker.id && marker.to >= other_marker.from && marker.to < other_marker.to) { |
| 193 | + marker.to = other_marker.to |
| 194 | + marker.newest_time = other_marker.newest_time |
| 195 | + } |
| 196 | + }) |
| 197 | + var diff |
| 198 | + if (oldest_time !== marker.oldest_time) { |
| 199 | + diff = tb(oldest_time - marker.oldest_time).resize('1h').value |
| 200 | + console.log('\nskipping ' + diff + ' hrs of previously collected data') |
| 201 | + } else if (newest_time !== marker.newest_time) { |
| 202 | + diff = tb(marker.newest_time - newest_time).resize('1h').value |
| 203 | + console.log('\nskipping ' + diff + ' hrs of previously collected data') |
| 204 | + } |
| 205 | + resume_markers.save(marker) |
| 206 | + .then(setupNext) |
| 207 | + .catch(function (err) { |
| 208 | + if (err) throw err |
| 209 | + }) |
| 210 | + }).catch(function (err) { |
| 211 | + if (err) { |
| 212 | + console.error(err) |
| 213 | + console.error('retrying...') |
| 214 | + return setTimeout(runTasks, 10000, trades) |
| 215 | + } |
| 216 | + }) |
| 217 | + } |
| 218 | + |
| 219 | + function setupNext() { |
| 220 | + trade_counter += trades.length |
| 221 | + day_trade_counter += trades.length |
| 222 | + var current_days_left = 1 + (mode === 'backward' ? tb(marker.oldest_time - target_time).resize('1d').value : tb(target_time - marker.newest_time).resize('1d').value) |
| 223 | + if (current_days_left >= 0 && current_days_left != days_left) { |
| 224 | + console.log('\n' + selector.normalized, 'saved', day_trade_counter, 'trades', current_days_left, 'days left') |
| 225 | + day_trade_counter = 0 |
| 226 | + days_left = current_days_left |
| 227 | + } else { |
| 228 | + process.stdout.write('.') |
| 229 | + } |
| 230 | + |
| 231 | + if (mode === 'backward' && marker.oldest_time <= target_time) { |
| 232 | + console.log('\ndownload complete!\n') |
| 233 | + process.exit(0) |
| 234 | + } else if (cmd.start >= 0 && cmd.end >= 0 && target_time <= marker.newest_time) { |
| 235 | + console.log('\ndownload of span (' + cmd.start + ' - ' + cmd.end + ') complete!\n') |
| 236 | + process.exit(0) |
| 237 | + } |
| 238 | + |
| 239 | + if (exchange.backfillRateLimit) { |
| 240 | + setTimeout(getNext, exchange.backfillRateLimit) |
| 241 | + } else { |
| 242 | + setImmediate(getNext) |
| 243 | + } |
| 244 | + } |
| 245 | + |
| 246 | + function saveTrade(trade) { |
| 247 | + trade.id = selector.normalized + '-' + String(trade.trade_id) |
| 248 | + trade._id = trade.id |
| 249 | + trade.selector = selector.normalized |
| 250 | + var cursor = exchange.getCursor(trade) |
| 251 | + if (mode === 'backward') { |
| 252 | + if (!marker.to) { |
| 253 | + marker.to = cursor |
| 254 | + marker.oldest_time = trade.time |
| 255 | + marker.newest_time = trade.time |
| 256 | + } |
| 257 | + marker.from = marker.from ? Math.min(marker.from, cursor) : cursor |
| 258 | + marker.oldest_time = Math.min(marker.oldest_time, trade.time) |
| 259 | + } else { |
| 260 | + if (!marker.from) { |
| 261 | + marker.from = cursor |
| 262 | + marker.oldest_time = trade.time |
| 263 | + marker.newest_time = trade.time |
| 264 | + } |
| 265 | + marker.to = marker.to ? Math.max(marker.to, cursor) : cursor |
| 266 | + marker.newest_time = Math.max(marker.newest_time, trade.time) |
| 267 | + } |
| 268 | + return tradesCollection.save(trade) |
| 269 | + } |
| 270 | +}) |
0 commit comments