Skip to content
This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Commit 64702be

Browse files
Read groupIds from ES
1 parent 423ec89 commit 64702be

File tree

4 files changed

+264
-10
lines changed

4 files changed

+264
-10
lines changed

npm-shrinkwrap.json

+185-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
"dependencies": {
2424
"@hapi/joi": "^15.0.2",
2525
"async-mutex": "^0.2.4",
26+
"aws-sdk": "^2.694.0",
2627
"bluebird": "^3.7.2",
2728
"config": "^3.3.1",
29+
"elasticsearch": "^16.7.1",
2830
"get-parameter-names": "^0.3.0",
2931
"gulp": "^4.0.2",
3032
"http-json-response": "^1.0.1",

src/common/helper.js

+36-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
* Contains generic helper methods
33
*/
44

5+
const elasticsearch = require('elasticsearch')
6+
const AWS = require('aws-sdk')
57
const _ = require('lodash')
68
const config = require('config')
79
const ifxnjs = require('ifxnjs')
@@ -15,9 +17,41 @@ const Pool = ifxnjs.Pool
1517
const pool = Promise.promisifyAll(new Pool())
1618
pool.setMaxPoolSize(config.get('INFORMIX.POOL_MAX_SIZE'))
1719

20+
AWS.config.region = config.get('V4_ES.AWS_REGION')
21+
// ES Client
22+
let esClient
23+
1824
// Bus API Client
1925
let busApiClient
2026

27+
/**
28+
* Get ES Client
29+
* @return {Object} Elastic Host Client Instance
30+
*/
31+
function getESClient () {
32+
const esHost = config.get('V4_ES.HOST')
33+
if (!esClient) {
34+
// AWS ES configuration is different from other providers
35+
if (/.*amazonaws.*/.test(esHost)) {
36+
esClient = elasticsearch.Client({
37+
apiVersion: config.get('V4_ES.API_VERSION'),
38+
hosts: esHost,
39+
connectionClass: require('http-aws-es'), // eslint-disable-line global-require
40+
amazonES: {
41+
region: config.get('V4_ES.AWS_REGION'),
42+
credentials: new AWS.EnvironmentCredentials('AWS')
43+
}
44+
})
45+
} else {
46+
esClient = new elasticsearch.Client({
47+
apiVersion: config.get('V4_ES.API_VERSION'),
48+
hosts: esHost
49+
})
50+
}
51+
}
52+
return esClient
53+
}
54+
2155
/**
2256
* Get Informix connection using the configured parameters
2357
* @return {Object} Informix connection
@@ -158,5 +192,6 @@ module.exports = {
158192
getRequest,
159193
putRequest,
160194
postRequest,
161-
postBusEvent
195+
postBusEvent,
196+
getESClient
162197
}

src/services/ProcessorService.js

+41-5
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ async function getLegacyTrackInformation (trackId, typeId, tags, m2mToken) {
120120
* @param {Object} payload the Kafka message payload
121121
* @param {String} m2mToken the m2m token
122122
* @param {Boolean} isCreated flag indicate the DTO is used in creating challenge
123-
* @param {Object} existingV4Challenge the existing V4 challenge
123+
* @param {Object} existingV4Challenge the existing V4 challenge from ES
124124
* @returns the DTO for saving a draft contest.(refer SaveDraftContestDTO in ap-challenge-microservice)
125125
*/
126126
async function parsePayload (payload, m2mToken, isCreated = true, existingV4Challenge) {
@@ -374,6 +374,9 @@ async function processUpdate (message) {
374374
try {
375375
// ensure challenge existed
376376
challenge = await getChallengeById(m2mToken, message.payload.legacyId)
377+
if (!challenge) {
378+
throw new Error(`Could not find challenge ${message.payload.legacyId}`)
379+
}
377380
} catch (e) {
378381
// postponne kafka event
379382
logger.info('Challenge does not exist yet. Will post the same message back to the bus API')
@@ -386,13 +389,46 @@ async function processUpdate (message) {
386389
return
387390
}
388391

389-
const saveDraftContestDTO = await parsePayload(message.payload, m2mToken, false, challenge)
390-
logger.debug('Parsed Payload', saveDraftContestDTO)
392+
let challengeV4FromEs
391393
try {
392-
if (!challenge) {
393-
throw new Error(`Could not find challenge ${message.payload.legacyId}`)
394+
// Search with constructed query
395+
const esQuery = {
396+
index: config.get('ES.ES_INDEX'),
397+
type: config.get('ES.ES_TYPE'),
398+
size: 1,
399+
from: 0,
400+
body: {
401+
query: {
402+
match_phrase: {
403+
_id: message.payload.legacyId
404+
}
405+
}
406+
}
394407
}
408+
const docs = await helper.getESClient().search(esQuery)
409+
// Extract data from hits
410+
if (docs.hits.total === 0) {
411+
throw new Error('Challenge does not exist yet on ES')
412+
}
413+
challengeV4FromEs = _.map(docs.hits.hits, item => item._source)[0]
414+
if (!challengeV4FromEs) {
415+
throw new Error(`Could not find challenge ${message.payload.legacyId} on ES`)
416+
}
417+
} catch (e) {
418+
// postponne kafka event
419+
logger.info('Challenge does not exist yet on ES. Will post the same message back to the bus API')
420+
await new Promise((resolve) => {
421+
setTimeout(async () => {
422+
await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, message.payload)
423+
resolve()
424+
}, config.RETRY_TIMEOUT)
425+
})
426+
return
427+
}
395428

429+
const saveDraftContestDTO = await parsePayload(message.payload, m2mToken, false, challengeV4FromEs)
430+
logger.debug('Parsed Payload', saveDraftContestDTO)
431+
try {
396432
await helper.putRequest(`${config.V4_CHALLENGE_API_URL}/${message.payload.legacyId}`, { param: _.omit(saveDraftContestDTO, ['groupsToBeAdded', 'groupsToBeDeleted']) }, m2mToken)
397433
await associateChallengeGroups(saveDraftContestDTO.groupsToBeAdded, saveDraftContestDTO.groupsToBeDeleted, message.payload.legacyId)
398434

0 commit comments

Comments
 (0)