Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ A node Thrift client utilising a pool of service connections and improved error
- builtin retry support using upstream Thrift code
- faster detection and pruning of dead connections
- async/await compatibility and other niceties
- enabled oneway function
- enable trace log with debug mod

## Example usage

Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
},
"homepage": "https://github.com/brigade/pooled-thrift-client#readme",
"dependencies": {
"generic-pool": "^3.4.0",
"thrift": "^0.11.0"
"debug": "^4.1.1",
"generic-pool": "^3.7.1",
"thrift": "^0.12.0"
},
"devDependencies": {
"eslint": "^4.13.1",
Expand Down
119 changes: 71 additions & 48 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const GenericPool = require('generic-pool');
const Thrift = require('thrift');
const debug = require("debug")("thrift-pool");
const GenericPool = require("generic-pool");
const Thrift = require("thrift");

const {
AcquisitionTimeoutError,
ConnectionTimeoutError,
ConnectionClosedError,
} = require('./errors');
} = require("./errors");

/* Callback handlers */

Expand All @@ -16,20 +18,23 @@ const {
*
* @return {Object} Three callbacks: onTimeout, onClose, onError
*/
const attachCallbacks = (connection, reject) => {
const attachCallbacks = (connection, reject, role) => {
const onTimeout = () => {
connection.alive = false;
if (role) debug(`connection timeout for thrift[${role}]`);
reject(new ConnectionTimeoutError());
};
const onClose = () => {
connection.alive = false;
if (role) debug(`connection closed for thrift[${role}]`);
reject(new ConnectionClosedError());
};
const onError = (error) => {
connection.alive = false;
if (role) debug(`connection error for thrift[${role}]`);
reject(error);
};
connection.on('timeout', onTimeout).on('close', onClose).on('error', onError);
connection.on("timeout", onTimeout).on("close", onClose).on("error", onError);

return { onTimeout, onClose, onError };
};
Expand All @@ -42,9 +47,9 @@ const attachCallbacks = (connection, reject) => {
*/
const detachCallbacks = (connection, { onTimeout, onClose, onError }) => {
connection
.removeListener('timeout', onTimeout)
.removeListener('close', onClose)
.removeListener('error', onError);
.removeListener("timeout", onTimeout)
.removeListener("close", onClose)
.removeListener("error", onError);
};

/* Constructors */
Expand All @@ -65,16 +70,20 @@ const createThriftConnection = (thriftOptions) => {
connection = Thrift.createConnection(host, port, thriftOptions);
connection.alive = false; // add a property for validation purposes
callbacks = attachCallbacks(connection, reject);
connection.on('connect', resolve);
}).then(() => {
detachCallbacks(connection, callbacks);
connection.connection.setKeepAlive(true); // socket manipulation
connection.alive = true; // state tracking
return connection;
}).catch((error) => {
detachCallbacks(connection, callbacks);
throw error;
});
connection.on("connect", resolve);
})
.then(() => {
detachCallbacks(connection, callbacks);
connection.connection.setKeepAlive(true); // socket manipulation
connection.alive = true; // state tracking
debug(`connection connected for thrift[${thriftOptions.role}]`);
return connection;
})
.catch((error) => {
debug(`connection error for thrift[${thriftOptions.role}]`, error);
detachCallbacks(connection, callbacks);
throw error;
});
};

/**
Expand All @@ -88,42 +97,45 @@ const createThriftConnection = (thriftOptions) => {
* the RPC response or an error
*/
const pooledRpc = (TService, rpc, pool) => (...args) => {
return pool.acquire()
.catch(e => Promise.reject(new AcquisitionTimeoutError(e.message)))
return pool
.acquire()
.catch((e) => Promise.reject(new AcquisitionTimeoutError(e.message)))
.then((connection) => {
let callbacks;
return new Promise((resolve, reject) => {
callbacks = attachCallbacks(connection, reject);
const client = Thrift.createClient(TService, connection);
resolve(client[rpc](...args));
}).then((response) => {
detachCallbacks(connection, callbacks);
pool.release(connection);
return response;
}).catch((error) => {
detachCallbacks(connection, callbacks);
pool.release(connection);
throw error;
});
})
.then((response) => {
detachCallbacks(connection, callbacks);
pool.release(connection);
return response;
})
.catch((error) => {
detachCallbacks(connection, callbacks);
pool.release(connection);
throw error;
});
});
};

/* Default options */

const DEFAULT_POOL_OPTIONS = {
max: 1,
min: 0,
max: 3,
min: 1,
idleTimeoutMillis: 30000,
acquireTimeoutMillis: 10000,
testOnBorrow: true,
testOnReturn: true,
};

const DEFAULT_THRIFT_OPTIONS = {
transport: Thrift.TFramedTransport,
protocol: Thrift.TBinaryProtocol,
connect_timeout: 1000,
max_attempts: 3,
role: "unknown",
};

/* Entrypoint */
Expand Down Expand Up @@ -161,25 +173,36 @@ const DEFAULT_THRIFT_OPTIONS = {
*/
module.exports = (TService, thriftOptions, clientOptions) => {
if (!thriftOptions.host || !thriftOptions.port) {
throw new Error('PooledThriftClient: both host and port must be specified');
throw new Error("PooledThriftClient: both host and port must be specified");
}

thriftOptions = Object.assign({}, DEFAULT_THRIFT_OPTIONS, thriftOptions);
const poolOptions = Object.assign({}, DEFAULT_POOL_OPTIONS, clientOptions.poolOptions);

const pool = GenericPool.createPool({
create: () => createThriftConnection(thriftOptions),
destroy: connection => new Promise(resolve => resolve(connection.end())),
validate: connection => new Promise((resolve) => {
resolve(connection.alive && connection.connected);
}),
}, poolOptions);
const poolOptions = Object.assign(
{},
DEFAULT_POOL_OPTIONS,
clientOptions.poolOptions
);

const pool = GenericPool.createPool(
{
create: () => createThriftConnection(thriftOptions),
destroy: (connection) =>
new Promise((resolve) => resolve(connection.end())),
validate: (connection) =>
new Promise((resolve) => {
resolve(connection.alive && connection.connected);
}),
},
poolOptions
);

const clientClass = TService.Client.prototype;
return Object.keys(clientClass).filter((k) => {
return clientClass.hasOwnProperty(`send_${k}`) && clientClass.hasOwnProperty(`recv_${k}`);
}).reduce((thriftClient, rpc) => {
thriftClient[rpc] = pooledRpc(TService, rpc, pool);
return thriftClient;
}, {});
return Object.keys(clientClass)
.filter((k) => {
return clientClass.hasOwnProperty(`send_${k}`);
})
.reduce((thriftClient, rpc) => {
thriftClient[rpc] = pooledRpc(TService, rpc, pool);
return thriftClient;
}, {});
};