Skip to content

Commit b5f432d

Browse files
committed
add feature for easy use of sharded reindexing
1 parent e049860 commit b5f432d

File tree

3 files changed

+43
-23
lines changed

3 files changed

+43
-23
lines changed

bin/elasticsearch-reindex.js

+39-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ var cli = require('commander'),
44
elasticsearch = require('elasticsearch')
55
async = require('async'),
66
cluster = require('cluster'),
7+
moment = require('moment'),
78
_ = require('underscore'),
89
bunyan = require('bunyan'),
910
ProgressBar = require('progress'),
@@ -13,7 +14,7 @@ var cli = require('commander'),
1314

1415

1516
cli
16-
.version('1.1.1')
17+
.version('1.1.2')
1718
.option('-f, --from [value]', 'source index, eg. http://192.168.1.100:9200/old_index/old_type')
1819
.option('-t, --to [value]', 'to index, eg. http://192.168.1.100:9200/new_index/new_type')
1920
.option('-c, --concurrency [value]', 'concurrency for reindex', require('os').cpus().length)
@@ -37,7 +38,41 @@ var custom_indexer = cli.args[0] ? require(fs.realpathSync(cli.args[0])) : null;
3738

3839
if (cluster.isMaster) {
3940
if (custom_indexer.sharded) {
40-
custom_indexer.sharded.ranges.forEach(function(shard) {
41+
var ranges = [];
42+
if (custom_indexer.sharded.ranges) {
43+
ranges = custom_indexer.sharded.ranges;
44+
} else {
45+
var now = moment();
46+
var start = moment(custom_indexer.sharded.start);
47+
var end = custom_indexer.sharded.end ? moment(custom_indexer.sharded.end) : now;
48+
var current = start;
49+
var interval_days = 1;
50+
switch(custom_indexer.sharded.interval) {
51+
case 'month':
52+
interval_days = 30;
53+
break;
54+
case 'week':
55+
interval_days = 7;
56+
break;
57+
default:
58+
interval_days = parseInt(custom_indexer.sharded.interval);
59+
}
60+
while(current < end){
61+
var current_end = current.clone().add(interval_days, 'days');
62+
if (current_end > end) {
63+
current_end = end;
64+
}
65+
ranges.push({
66+
name: current.format('YYMMDD') + '-' + current_end.format('YYMMDD'),
67+
range: {
68+
gte: current.format('YYYY-MM-DD'),
69+
lt: current_end.format('YYYY-MM-DD')
70+
}
71+
});
72+
current = current_end;
73+
}
74+
}
75+
ranges.forEach(function(shard) {
4176
var worker_arg = {range:{}, name: shard.name};
4277
worker_arg.range[custom_indexer.sharded.field] = shard.range;
4378
cluster.fork({worker_arg:JSON.stringify(worker_arg)});
@@ -96,6 +131,7 @@ if (cluster.isMaster) {
96131
});
97132

98133
reindexer.on('batch-complete', function(num_of_success) {
134+
console.log("\n");
99135
bar.tick(num_of_success);
100136
});
101137

@@ -129,7 +165,7 @@ if (cluster.isMaster) {
129165
scroll : cli.scroll
130166
}, scroll_fetch);
131167
} else {
132-
console.log("\n Total " + processed_total + " documents have been reindexed!");
168+
console.log("\n " + shard_name + " Total " + processed_total + " documents have been reindexed!");
133169
process.exit();
134170
}
135171
});

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "elasticsearch-reindex",
3-
"version": "1.1.1",
3+
"version": "1.1.2",
44
"description": "Elasticsearch reindex tool",
55
"main": "index.js",
66
"scripts": {

sample/indexer.js

+3-19
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,9 @@
11
var moment = require('moment');
22

33
module.exports = {
4-
sharded: {
5-
field: 'idate',
6-
ranges:[
7-
{
8-
name: '7_1',
9-
range: {
10-
lt: '2014-07-15'
11-
}
12-
},
13-
{
14-
name: '7_2',
15-
range: {
16-
gte: '2014-07-15',
17-
lt: '2014-08-01'
18-
}
19-
}
20-
]
21-
},
22-
query: { match_all:{} },
4+
// interval: 'month' (30 days), 'week' (7 days), or a number of days (e.g. '5')
5+
sharded: { field: 'idate', interval:'5', start:'2014-09-01', end:'2014-10-01'},
6+
// query: { match_all:{} },
237
index: function(item, options) {
248
return [
259
{index:{_index: 'listening_' + moment(item._source.cdate).format('YYYYMM'), _type:options.type || item._type, _id: item._id}},

0 commit comments

Comments
 (0)