Skip to content

Commit 3787313

Browse files
committed
lets spread the gospel of streams
1 parent 04ff2b7 commit 3787313

10 files changed

+480
-150
lines changed

LICENSE

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2015 Timothy J Fontaine <[email protected]>
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is
8+
furnished to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in
11+
all copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.

connectstream.js

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"use strict";
2+
3+
var stream = require('stream');
4+
var util = require('util');
5+
6+
function ConnectStream(socket, options) {
7+
if (!(this instanceof ConnectStream))
8+
return new ConnectStream(socket, options);
9+
10+
stream.Readable.call(this, {
11+
objectMode: true,
12+
});
13+
14+
this.cs_socket = socket;
15+
16+
var self = this;
17+
18+
socket.on('connection', function socketOnConnect(client) {
19+
self.push(client);
20+
});
21+
}
22+
util.inherits(ConnectStream, stream.Readable);
23+
24+
// One day Node.js will allow me to accept here, for now we have no
25+
// listen backpressure controls
26+
ConnectStream.prototype._read = function connectStreamRead() {
27+
}
28+
29+
module.exports = ConnectStream;

dnsbl.js

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
"use strict";
2+
3+
var dns = require('dns');
4+
var EE = require('events').EventEmitter;
5+
var util = require('util');
6+
7+
var vasync = require('vasync');
8+
9+
10+
/*
11+
{
12+
ip: '192.168.1.1',
13+
timeout: 4000,
14+
servers: {
15+
'dnsbl.example.com': {
16+
zone: 'dnsbl.example.com',
17+
defaultScore: 1,
18+
defaultCloak: 'exdnsbl.oftc.net',
19+
records: {
20+
'127.0.0.1': {
21+
score: 1,
22+
cloak: 'robot.exdnsbl.oftc.net',
23+
stop: true,
24+
},
25+
},
26+
},
27+
},
28+
}
29+
*/
30+
31+
function DNSBLQuery(options, cb) {
32+
if (!(this instanceof DNSBLQuery))
33+
return new DNSBLQuery(options, cb);
34+
35+
if (cb)
36+
this.once('end', cb);
37+
38+
this.score = {
39+
total: 0,
40+
results: {},
41+
errors: {},
42+
};
43+
44+
var q = this.dnsbl_parallel = vasync.queue(dnsblQuery, options.parallel || 4);
45+
46+
// TODO IPv6
47+
var revip = options.ip.split('.').reverse().join('.');
48+
49+
var servers = Object.keys(options.servers);
50+
51+
for (var s in servers) {
52+
q.push({
53+
revip: revip,
54+
server: options.servers[servers[s]],
55+
timeout: options.timeout || 4000,
56+
query: this,
57+
});
58+
}
59+
60+
q.close();
61+
62+
var self = this;
63+
64+
q.on('end', function onQueueEnd() {
65+
self.emit('end', null, self.score);
66+
});
67+
}
68+
util.inherits(DNSBLQuery, EE);
69+
70+
71+
function dnsblQuery(args, cb) {
72+
var server = args.server;
73+
var query = args.query;
74+
var name = args.revip + '.' + server.zone;
75+
76+
var timeout = setTimeout(endSingleQuery, args.timeout);
77+
78+
function endSingleQuery() {
79+
clearTimeout(timeout);
80+
cb();
81+
}
82+
83+
// TODO use native-dns
84+
dns.resolve(name, function dnsResult(err, hosts) {
85+
if (err) {
86+
query.score.errors[server.zone] = err;
87+
} else {
88+
var ret = query.score.results[server.zone] = {};
89+
var anyScore = false;
90+
var anyCloak = false;
91+
92+
for (var h in hosts) {
93+
var entry = server.records[h];
94+
if (entry) {
95+
anyScore = true;
96+
ret[h] = query.score.total;
97+
query.score.total += entry.score;
98+
99+
100+
if (entry.cloak) {
101+
anyCloak = true;
102+
query.score.cloak = entry.cloak;
103+
}
104+
}
105+
}
106+
107+
if (!anyScore)
108+
query.score.total += server.defaultScore;
109+
110+
if (!anyCloak && server.defaultCloak)
111+
query.score.cloak = server.defaultCloak;
112+
}
113+
endSingleQuery();
114+
});
115+
}
116+
117+
module.exports = DNSBLQuery;

dnsfilter.js

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"use strict";
2+
3+
var dns = require('dns');
4+
var stream = require('stream');
5+
var util = require('util');
6+
7+
var vasync = require('vasync');
8+
9+
var DNSBLQuery = require('./dnsbl');
10+
11+
function DNSFilter(config) {
12+
if (!(this instanceof DNSFilter))
13+
return new DNSFilter(config);
14+
15+
stream.Transform.call(this, {
16+
objectMode: true,
17+
highWaterMark: 0,
18+
});
19+
20+
this.dnsfltr_config = config.dnsbl;
21+
}
22+
util.inherits(DNSFilter, stream.Transform);
23+
24+
25+
DNSFilter.prototype._transform = function dnsfltrTransform(client, enc, cb) {
26+
var self = this;
27+
28+
var work = vasync.parallel({
29+
funcs: [
30+
function reverseDns(next) {
31+
dns.resolve(client.remoteAddress, 'PTR', function dnsResult(err,
32+
hostnames) {
33+
console.error('reverse dns result', err, hostnames);
34+
next(err, hostnames);
35+
});
36+
},
37+
function dnsblQuery(next) {
38+
DNSBLQuery({
39+
ip: client.remoteAddress,
40+
servers: self.dnsfltr_config.servers,
41+
}, next);
42+
},
43+
],
44+
}, function (err, results) {
45+
var reverse = results.operations[0].result;
46+
47+
if (reverse && reverse.length)
48+
client.hostname = reverse[0];
49+
else
50+
client.hostname = client.remoteAddress;
51+
52+
var score = results.operations[1].result;
53+
54+
console.error('dnsbl results', score);
55+
56+
if (score) {
57+
client.score = score;
58+
59+
if (score.total > self.dnsfltr_config.maxScore) {
60+
// TODO log
61+
client.end('Administratively refused');
62+
return client.destroy();
63+
}
64+
65+
if (score.cloak)
66+
client.cloak = score.cloak;
67+
}
68+
69+
self.push(client);
70+
});
71+
72+
return cb();
73+
};
74+
75+
module.exports = DNSFilter;

index.js

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#!/usr/bin/env node
2+
3+
"use strict";
4+
5+
var net = require('net');
6+
var tls = require('tls');
7+
8+
var ConnectStream = require('./connectstream');
9+
var DNSFilter = require('./dnsfilter');
10+
var IRCProxy = require('./ircproxy');
11+
var Throttle = require('./throttle');
12+
var config = require('./config');
13+
14+
var throttle = new Throttle(config);
15+
16+
Object.keys(config.listeners).forEach(function eachListener(ip) {
17+
var listener = config.listeners[ip];
18+
var proto = undefined;
19+
20+
var options = {
21+
host: ip,
22+
port: listener.port,
23+
};
24+
25+
switch (listener.type) {
26+
case 'plain':
27+
proto = net;
28+
break;
29+
case 'ssl':
30+
proto = tls;
31+
break;
32+
case 'websocket':
33+
case 'socketio':
34+
throw new Exception('Not Implemented Yet');
35+
break;
36+
default:
37+
throw new Exception(
38+
'Must define listener type: [plain, ssl, websocket, socketio]'
39+
);
40+
break;
41+
}
42+
43+
var server = proto.createServer();
44+
45+
server.listen(options);
46+
47+
server.on('listening', function serverListening() {
48+
ConnectStream(server)
49+
.pipe(throttle)
50+
.pipe(DNSFilter(config))
51+
.pipe(IRCProxy(config))
52+
.resume(); // Don't stop accepting new clients
53+
});
54+
55+
server.on('error', function serverError(err) {
56+
console.error('listener failed', err);
57+
// TODO restart listener?
58+
});
59+
});

ircproxy.js

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"use strict";
2+
3+
var net = require('net');
4+
var stream = require('stream');
5+
var tls = require('tls');
6+
var util = require('util');
7+
8+
var lstream = require('lstream');
9+
10+
var WebIRC = require('./webirc');
11+
12+
function IRCProxy(config) {
13+
if (!(this instanceof IRCProxy))
14+
return new IRCProxy(config);
15+
16+
stream.Transform.call(this, {
17+
objectMode: true,
18+
highWaterMark: 0,
19+
});
20+
21+
this.ircp_config = config;
22+
23+
switch (config.destination.type) {
24+
case 'plain':
25+
this.ircp_proto = net;
26+
break;
27+
case 'ssl':
28+
this.ircp_proto = tls;
29+
break;
30+
}
31+
32+
this.ircp_options = {
33+
host: config.destination.host,
34+
port: config.destination.port,
35+
//TODO TLS
36+
};
37+
}
38+
util.inherits(IRCProxy, stream.Transform);
39+
40+
IRCProxy.prototype._transform = function ircProxyTransform(client, enc, cb) {
41+
var outbound = this.ircp_proto.connect(this.ircp_options);
42+
43+
var self = this;
44+
45+
outbound.on('connect', function outboundConnected() {
46+
client
47+
.pipe(WebIRC(client, self.ircp_config))
48+
.pipe(outbound)
49+
.pipe(client);
50+
});
51+
52+
client.on('close', function clientClosed() {
53+
outbound.destroy();
54+
});
55+
56+
outbound.on('close', function outboundClosed() {
57+
client.destroy();
58+
});
59+
60+
outbound.on('error', function outboundError(err) {
61+
// TODO relay error
62+
console.error('outbound connection error', client, err);
63+
});
64+
65+
client.on('error', function clientError(err) {
66+
// TODO relay error
67+
console.error('client connection error', client, err);
68+
});
69+
70+
cb();
71+
};
72+
73+
module.exports = IRCProxy;

0 commit comments

Comments
 (0)