This repository was archived by the owner on Apr 30, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata-api.js
120 lines (108 loc) · 4.87 KB
/
data-api.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
const http = require('http')
const cors = require('cors')
const express = require('express')
const ws = require('sc-uws')
const Optimist = require('optimist')
const StreamFetcher = require('./src/StreamFetcher')
const WebsocketServer = require('./src/WebsocketServer')
const RedisUtil = require('./src/RedisUtil')
const { startCassandraStorage } = require('./src/Storage')
const StreamrKafkaProducer = require('./src/KafkaUtil')
const Partitioner = require('./src/Partitioner')
const Publisher = require('./src/Publisher')
const MetricsLoggerConsole = require('./src/utils/MetricsLoggerConsole')
const MetricsLoggerStream = require('./src/utils/MetricsLoggerStream')
const MetricsLoggerMulti = require('./src/utils/MetricsLoggerMulti')
const dataQueryEndpoints = require('./src/rest/DataQueryEndpoints')
const dataProduceEndpoints = require('./src/rest/DataProduceEndpoints')
const volumeEndpoint = require('./src/rest/VolumeEndpoint')
module.exports = async (externalConfig) => {
const config = (externalConfig || Optimist.usage(`You must pass the following command line options:
--data-topic <topic>
--zookeeper <conn_string>
--redis <redis_hosts_separated_by_commas>
--redis-pwd <password>
--cassandra <cassandra_hosts_separated_by_commas>
--cassandra-username <cassandra_username>
--cassandra-pwd <cassandra_password>
--keyspace <cassandra_keyspace>
--streamr <streamr>
--port <port>
--apiKey <apiKey>
--streamId <streamId>
--nodeId <nodeId>`)
.demand(['data-topic', 'zookeeper', 'redis', 'redis-pwd', 'cassandra', 'cassandra-username', 'cassandra-pwd', 'keyspace', 'streamr', 'port'])
.argv)
// Metrics loggers
const nodeId = config.nodeId ? config.nodeId : `data-api:${config.port}`
const consoleLogger = new MetricsLoggerConsole(60, nodeId)
const streamLogger = new MetricsLoggerStream(3, nodeId, config.apiKey, config.streamId)
const combinedLogger = new MetricsLoggerMulti([consoleLogger, streamLogger])
// Create some utils
const streamFetcher = new StreamFetcher(config.streamr)
const redis = new RedisUtil(config.redis.split(','), config['redis-pwd'])
const storage = await startCassandraStorage(
config.cassandra.split(','), 'datacenter1', config.keyspace,
config['cassandra-username'], config['cassandra-pwd'],
)
const kafka = new StreamrKafkaProducer(config['data-topic'], Partitioner, config.zookeeper)
const publisher = new Publisher(kafka, Partitioner, combinedLogger)
// Create HTTP server
const app = express()
const httpServer = http.Server(app)
// Add CORS headers
app.use(cors())
// Websocket endpoint is handled by WebsocketServer
const websocketServer = new WebsocketServer(
new ws.Server({
server: httpServer,
path: '/api/v1/ws',
/**
* Gracefully reject clients sending invalid headers. Without this change, the connection gets abruptly closed,
* which makes load balancers such as nginx think the node is not healthy.
* This blocks ill-behaving clients sending invalid headers, as well as very old websocket implementations
* using draft 00 protocol version (https://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-00)
*/
verifyClient: (info, cb) => {
if (info.req.headers['sec-websocket-key']) {
cb(true)
} else {
cb(false, 400, 'Invalid headers on websocket request. Please upgrade your browser or websocket library!')
}
},
}),
redis,
storage,
streamFetcher,
publisher,
combinedLogger
)
// Rest endpoints
app.use('/api/v1', dataQueryEndpoints(storage, streamFetcher, combinedLogger))
app.use('/api/v1', dataProduceEndpoints(streamFetcher, publisher, combinedLogger))
app.use('/api/v1', volumeEndpoint(consoleLogger)) // respond from consoleLogger because it is less noisy due to longer interval
// Start the server
httpServer.listen(config.port, () => {
console.log(`Configured with Redis: ${config.redis}`)
console.log(`Configured with Cassandra: ${config.cassandra}`)
console.log(`Configured with Kafka: ${config.zookeeper} and topic '${config['data-topic']}'`)
console.log(`Configured with Streamr: ${config.streamr}`)
console.log(`Listening on port ${config.port}`)
httpServer.emit('listening')
})
return {
httpServer,
websocketServer,
close: () => {
httpServer.close()
redis.quit()
storage.close()
kafka.close()
combinedLogger.stop()
},
}
}
// Start the server if we're not being required from another module
if (require.main === module) {
    // The factory is async: without a rejection handler a startup failure (e.g. storage being
    // unreachable) would surface only as an unhandled promise rejection. Log it and exit with a
    // non-zero status so process supervisors can detect the failed start.
    module.exports().catch((err) => {
        console.error(err)
        process.exit(1)
    })
}