Skip to content
This repository was archived by the owner on Dec 22, 2020. It is now read-only.

Commit d750f35

Browse files
committed
first commit
1 parent a4b87ba commit d750f35

File tree

9 files changed

+319
-4
lines changed

9 files changed

+319
-4
lines changed

.eslintignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
node_modules
2+
coverage

.eslintrc.js

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
module.exports = {
2+
"extends": "standard",
3+
"plugins": [
4+
"standard"
5+
]
6+
};

.istanbul.yml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
instrumentation:
2+
excludes: ['test', 'node_modules']
3+
check:
4+
global:
5+
lines: 100
6+
branches: 100
7+
statements: 100
8+
functions: 100

package.json

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
{
22
"name": "skiff",
33
"version": "1.0.0",
4-
"description": "",
5-
"main": "index.js",
4+
"description": "Raft for Node.js",
5+
"main": "src/index.js",
66
"scripts": {
7-
"test": "echo \"Error: no test specified\" && exit 1"
7+
"test": "node --harmony node_modules/istanbul/lib/cli.js cover -- lab -vl && istanbul check-coverage",
8+
"style": "eslint src"
89
},
910
"repository": {
1011
"type": "git",
@@ -20,5 +21,23 @@
2021
"bugs": {
2122
"url": "https://github.com/pgte/skiff/issues"
2223
},
23-
"homepage": "https://github.com/pgte/skiff#readme"
24+
"homepage": "https://github.com/pgte/skiff#readme",
25+
"devDependencies": {
26+
"eslint": "^3.1.1",
27+
"eslint-config-standard": "^5.3.5",
28+
"eslint-plugin-promise": "^2.0.0",
29+
"eslint-plugin-standard": "^2.0.0",
30+
"istanbul": "^0.4.4",
31+
"pre-commit": "^1.1.3"
32+
},
33+
"pre-commit": [
34+
"style",
35+
"test"
36+
],
37+
"dependencies": {
38+
"debug": "^2.2.0",
39+
"msgpack5": "^3.4.0",
40+
"multiaddr": "^2.0.2",
41+
"reconnect-core": "^1.3.0"
42+
}
2443
}

src/network/index.js

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
const Network = require('./network')
2+
3+
module.exports = createNetwork
4+
5+
function createNetwork (options) {
6+
return new Network(options)
7+
}

src/network/network.js

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
'use strict'
2+
3+
const debug = require('debug')('skiff.network')
4+
const Duplex = require('stream').Duplex
5+
const Peer = require('./peer')
6+
7+
const defaultOptions = {
8+
objectMode: true,
9+
highWaterMark: 50
10+
}
11+
12+
class Network extends Duplex {
13+
14+
constructor (_options) {
15+
const options = Object.assign({}, _options || {}, defaultOptions)
16+
debug('creating network with options %j', options)
17+
super(options)
18+
this._peers = {}
19+
this._options = options
20+
21+
this.once('finish', this._finish.bind(this))
22+
}
23+
24+
_read (size) {
25+
// do nothing
26+
}
27+
28+
_write (message, encoding, callback) {
29+
const peer = this._ensurePeer(message.to)
30+
peer.write(message, callback)
31+
}
32+
33+
_ensurePeer (address) {
34+
debug('ensuring peer %s', address)
35+
let peer = this._peers[address]
36+
if (!peer) {
37+
peer = this._peers[address] = new Peer(address, this._options)
38+
peer.on('error', (err) => this.emit('error', err))
39+
}
40+
41+
peer.pipe(this, { end: false }).pipe(peer, { end: false })
42+
return peer
43+
}
44+
45+
_finish () {
46+
debug('finishing')
47+
Object.keys(this._peers).forEach((addr) => this._peers[addr].end())
48+
this._peers = {}
49+
}
50+
51+
}
52+
53+
module.exports = Network

src/network/peer.js

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
'use strict'
2+
3+
const debug = require('debug')('skiff.peer')
4+
const Duplex = require('stream').Duplex
5+
const Multiaddr = require('multiaddr')
6+
const Msgpack = require('msgpack5')
7+
8+
const reconnect = require('./reconnect')
9+
10+
const defaultOptions = {
11+
objectMode: true,
12+
highWaterMark: 50
13+
}
14+
15+
const interestingEvents = [
16+
'connect',
17+
'reconnect',
18+
'disconnect',
19+
'error'
20+
]
21+
22+
const reconnectOptions = {
23+
immediate: true
24+
}
25+
26+
const OK_ERRORS = [
27+
'ECONNREFUSED'
28+
]
29+
30+
class Peer extends Duplex {
31+
32+
constructor (address, options) {
33+
debug('constructing peer from address %j', address)
34+
super(Object.assign({}, options || {}, defaultOptions))
35+
this._address = Multiaddr(address)
36+
37+
this.once('finish', this._finish.bind(this))
38+
this._connect()
39+
}
40+
41+
_connect () {
42+
debug('connecting to %s', this._address)
43+
44+
this._reconnect = reconnect(reconnectOptions, (peer) => {
45+
debug('got stream to peer %s', this._address)
46+
const msgpack = Msgpack()
47+
48+
const toPeer = msgpack.encoder()
49+
toPeer.pipe(peer)
50+
51+
const fromPeer = this._out = msgpack.decoder()
52+
fromPeer.pipe(this, { end: false })
53+
fromPeer.on('data', (data) => this.push(data))
54+
55+
peer.on('error', (err) => {
56+
if (OK_ERRORS.indexOf(err.code) === -1) {
57+
this.emit('error', err)
58+
}
59+
})
60+
})
61+
.connect(this._address)
62+
63+
interestingEvents.forEach((event) => {
64+
this._reconnect.on(event, (payload) => {
65+
this.emit(event, payload)
66+
})
67+
})
68+
69+
this._reconnect
70+
.on('connect', (con) => {
71+
debug('connected to %s', this._address)
72+
con.on('error', (err) => {
73+
debug('error from peer:\n%s', err.stack)
74+
this.emit('warning', err)
75+
})
76+
})
77+
.on('disconnect', () => {
78+
debug('disconnected from %s', this._address)
79+
this._out = undefined
80+
})
81+
}
82+
83+
_read (size) {
84+
// do nothing, we'll emit data when the peer emits data
85+
}
86+
87+
_write (message, encoding, callback) {
88+
debug('writing %j to %s', message, this._address)
89+
if (this._out) {
90+
return this._out.write(message, callback)
91+
} else {
92+
debug('not connected yet to peer %s', this._address)
93+
// if we're not connected we discard the message
94+
callback()
95+
}
96+
}
97+
98+
_finish () {
99+
debug('finishing connection to peer %s', this._address)
100+
this._reconnect.disconnect()
101+
}
102+
103+
}
104+
105+
module.exports = Peer

src/network/reconnect.js

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
'use strict'
2+
3+
const debug = require('debug')('skiff.reconnect')
4+
const Reconnect = require('reconnect-core')
5+
const net = require('net')
6+
7+
module.exports = Reconnect((maddr) => {
8+
const nodeAddr = maddr.nodeAddress()
9+
const addr = {
10+
port: nodeAddr.port,
11+
host: nodeAddr.address
12+
}
13+
debug('connecting to %j', addr)
14+
return net.connect(addr)
15+
})

test/network.js

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
'use strict'
2+
3+
const Lab = require('lab')
4+
const lab = exports.lab = Lab.script()
5+
const describe = lab.experiment
6+
const before = lab.before
7+
const after = lab.after
8+
const it = lab.it
9+
const Code = require('code')
10+
const expect = Code.expect
11+
12+
const Multiaddr = require('multiaddr')
13+
const net = require('net')
14+
const timers = require('timers')
15+
const Msgpack = require('msgpack5')
16+
17+
const Network = require('../src/network')
18+
19+
const serverAddresses = [
20+
'/ip4/127.0.0.1/tcp/8080',
21+
'/ip4/127.0.0.1/tcp/8081',
22+
'/ip4/127.0.0.1/tcp/8082',
23+
]
24+
25+
describe('network', () => {
26+
27+
let network, servers
28+
let serverData = serverAddresses.map(() => [])
29+
30+
before(done => {
31+
let listening = 0
32+
33+
servers = serverAddresses.map((addr, index) => {
34+
const maddr = Multiaddr(addr)
35+
const server = net.createServer(onServerConnection)
36+
const listenAddr = maddr.nodeAddress()
37+
server.listen({port: listenAddr.port, host: listenAddr.address}, onceListening)
38+
return server
39+
40+
function onServerConnection(conn) {
41+
const msgpack = Msgpack()
42+
conn.pipe(msgpack.decoder()).on('data', onServerData)
43+
}
44+
45+
function onServerData(data) {
46+
serverData[index].push(data)
47+
}
48+
})
49+
50+
51+
function onceListening() {
52+
if (++ listening === servers.length) {
53+
done()
54+
}
55+
}
56+
})
57+
58+
after(done => {
59+
let closed = 0
60+
servers.forEach(server => server.close(onceClosed))
61+
network.end()
62+
63+
function onceClosed() {
64+
if (++ closed === servers.length) {
65+
done()
66+
}
67+
}
68+
})
69+
70+
it('can be instantiated', done => {
71+
network = Network()
72+
done()
73+
})
74+
75+
it('can be used to send a message to a peer', done => {
76+
network.write({to: serverAddresses[0], what: 'hey'}, done)
77+
})
78+
79+
it('peer gets the message', done => {
80+
expect(serverData[0]).to.equal([{to: serverAddresses[0], what: 'hey'}])
81+
done();
82+
})
83+
84+
it('allows message to unconnected peer', done => {
85+
network.write({to: '/ip4/127.0.0.1/tcp/8083', what: 'hey'}, done)
86+
})
87+
88+
it('waits a bit', done => {
89+
timers.setTimeout(done, 1000)
90+
})
91+
92+
// it('allows peer to disconnect', done => {
93+
94+
// })
95+
96+
// it('allows peer to reconnect', done => {
97+
98+
// })
99+
100+
})

0 commit comments

Comments
 (0)