forked from HISP-Uganda/dhis2-wal
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
61 lines (58 loc) · 1.51 KB
/
index.js
File metadata and controls
61 lines (58 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import * as pgwire from "pgwire";
import * as common from "./common.js";
import * as dotenv from "dotenv";
dotenv.config();
const args = process.argv.slice(2);
const conn = await pgwire.pgconnect(
{
user: process.env.PG_USER,
password: process.env.PG_PASSWORD,
hostname: process.env.PG_HOST,
port: process.env.PG_PORT,
database: process.env.PG_DATABASE,
},
{
replication: "database",
}
);
let organisationUnits = {};
try {
// const [stopLsn] = await conn.query(`select pg_current_wal_lsn()`);
// if (args[0] !== "organisationunit") {
// const data = await common.api.post(`wal/bulk?index=${channel}`, data);
// }
const replicationStream = conn.logicalReplication({
slot: `${args[0]}_slot`,
startLsn: "0/0",
options: {
proto_version: 1,
publication_names: `${args[0]}_pub`,
// messages: "true",
},
});
for await (const chunk of replicationStream.pgoutputDecode()) {
const { messages, lastLsn } = chunk;
const data = messages
.filter(({ tag }) => ["update", "insert"].indexOf(tag) !== -1)
.map(({ after }) => {
const { geometry, ...others } = after;
return others;
});
try {
if (data.length > 0) {
await common.api.post(`wal/bulk?index=${args[0]}`, {
data,
index: args[0],
});
}
// replicationStream.ack(lastLsn);
} catch (error) {
console.log(error.message);
}
// if (lastLsn >= stopLsn) {
// break;
// }
}
} finally {
conn.end();
}