Skip to content

Commit ad0bd0c

Browse files
committed
Swap Q to bluebird for promises, allow skipping indexing by resolving with 'undefined' and fix error
1 parent 1ccd0c1 commit ad0bd0c

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

bin/elasticsearch-reindex.js

+22-7
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ cli
1919
.option('-b, --bulk [value]', 'bulk size for a thread', 100)
2020
.option('-q, --query_size [value]', 'query size for scroll', 100)
2121
.option('-s, --scroll [value]', 'default 1m', '1m')
22+
.option('-i, --sniff_cluster [value]', 'sniff the rest of the cluster upon initial connection and connection errors', true)
2223
.option('-o, --request_timeout [value]', 'default 60000', 60000)
2324
.option('-l, --log_path [value]', 'default ./reindex.log', './reindex.log')
2425
.option('-r, --trace', 'default false', false)
2526
.option('-n, --max_docs [value]', 'default -1 unlimited', -1)
26-
.option('-v, --api_ver [value]', 'default 1.5', '1.5')
27+
.option('--from_ver [value]', 'default 1.5', '1.5')
28+
.option('--to_ver [value]', 'default 1.5', '1.5')
2729
.option('-p, --parent [value]', 'if set, uses this field as parent field', '')
2830
.option('-m, --promise [value]', 'if set indexes expecting promises, default: false', false)
2931
.option('-z, --compress [value]', 'if set, requests compression of data in transit', false)
@@ -32,6 +34,16 @@ cli
3234
.option('-e, --region [value]', 'AWS region', false)
3335
.parse(process.argv);
3436

37+
for (var key in cli) {
38+
if (cli.hasOwnProperty(key)) {
39+
if (cli[key] === 'false') {
40+
cli[key] = false;
41+
} else if (cli[key] === 'true') {
42+
cli[key] = true;
43+
}
44+
}
45+
}
46+
3547
var logger = bunyan.createLogger({
3648
src: true,
3749
name: "elasticsearch-reindex",
@@ -147,7 +159,7 @@ if (cluster.isMaster) {
147159
shard_name = cluster.worker.id;
148160
}
149161

150-
function createClient(uri) {
162+
function createClient(uri, apiVersion) {
151163
if (!/\w+:\/\//.test(uri)) {
152164
uri = 'http://' + uri;
153165
}
@@ -162,10 +174,10 @@ if (cluster.isMaster) {
162174

163175
var config = {
164176
requestTimeout: cli.request_timeout,
165-
apiVersion: cli.api_ver,
177+
apiVersion: apiVersion,
166178
suggestCompression: cli.compress,
167-
sniffOnStart: true,
168-
sniffOnConnectionFault: true
179+
sniffOnStart: cli.sniff_cluster,
180+
sniffOnConnectionFault: cli.sniff_cluster
169181
};
170182

171183
if (cli.access_key && cli.secret_key && cli.region && /\.amazonaws\./.test(uri)) {
@@ -187,8 +199,8 @@ if (cluster.isMaster) {
187199
throw new Error('"from" and "to" parameters are required');
188200
}
189201

190-
var from = createClient(cli.from);
191-
to = createClient(cli.to),
202+
var from = createClient(cli.from, cli.from_ver);
203+
to = createClient(cli.to, cli.to_ver),
192204
processed_total = 0,
193205
processed_failed = 0;
194206

@@ -226,6 +238,9 @@ if (cluster.isMaster) {
226238

227239
from.client.search(scan_options, function scroll_fetch(err, res) {
228240
if (err) {
241+
if (err.message instanceof Error) {
242+
err = err.message;
243+
}
229244
logger.fatal(err);
230245
if (err.message.indexOf('parse') > -1) {
231246
throw new Error("Scroll body parsing error, query_size param is possibly too high.");

lib/indexer.js

+8-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ var async = require('async'),
44
_ = require('underscore'),
55
EventEmitter = require('events').EventEmitter,
66
inherits = require('util').inherits,
7-
Q = require('q');
7+
Promise = require('bluebird');
88

99
function Indexer () {
1010
EventEmitter.call(this);
@@ -90,16 +90,20 @@ Indexer.prototype.indexPromise = function(docs, options, cb) {
9090
var sent = false;
9191

9292
// map each indexer to return a promise
93-
return Q.all(chunk.map(function(item) {
94-
return Q.when(options.indexer(item, options, options.client));
93+
return Promise.all(chunk.map(function(item) {
94+
return options.indexer(item, options, options.client);
9595
})).catch(function (err) {
9696
self.emit('error', err);
9797
throw cb(sent = err);
9898
})
9999

100100
// Once all of the data resolves index it
101101
.then(function (bulk_data) {
102-
bulk_data = _.flatten(bulk_data);
102+
bulk_data = _.flatten(bulk_data).filter(function(x) { return x; });
103+
104+
if (!bulk_data.length) {
105+
return {};
106+
}
103107

104108
return options.client.bulk({
105109
body: bulk_data

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626
"homepage": "https://github.com/garbin/elasticsearch-reindex",
2727
"dependencies": {
2828
"async": "^1.5.0",
29+
"bluebird": "^3.3.4",
2930
"bunyan": "^1.2.0",
3031
"commander": "^2.4.0",
31-
"elasticsearch": "^9.0.2",
32+
"elasticsearch": "^11.0.0",
3233
"event-emitter": "^0.3.1",
3334
"http-aws-es": "^1.1.3",
3435
"moment": "^2.8.3",
3536
"pace": "0.0.4",
3637
"progress": "^1.1.8",
37-
"q": "^1.4.1",
3838
"underscore": "^1.7.0"
3939
}
4040
}

0 commit comments

Comments
 (0)