3
3
*/
4
4
5
5
require ( './bootstrap' )
6
+
7
+ const AWSXRay = require ( 'aws-xray-sdk' )
8
+ const ns = AWSXRay . getNamespace ( ) ;
9
+
6
10
const _ = require ( 'lodash' )
7
11
const config = require ( 'config' )
8
12
const Kafka = require ( 'no-kafka' )
@@ -11,8 +15,6 @@ const logger = require('./common/logger')
11
15
const helper = require ( './common/helper' )
12
16
const ProcessorService = require ( './services/ProcessorService' )
13
17
14
- const AWSXRay = require ( 'aws-xray-sdk' )
15
- const segment = new AWSXRay . Segment ( 'legacy-challenge-processor' ) ;
16
18
17
19
// Start kafka consumer
18
20
logger . info ( 'Starting kafka consumer' )
@@ -26,6 +28,9 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
26
28
* this function will be invoked
27
29
*/
28
30
const dataHandler = ( messageSet , topic , partition ) => Promise . each ( messageSet , async ( m ) => {
31
+ const segment = new AWSXRay . Segment ( 'legacy-challenge-processor' ) ;
32
+ AWSXRay . setSegment ( segment ) ;
33
+
29
34
const message = m . message . value . toString ( 'utf8' )
30
35
logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${ m . offset } ; Message: ${ message } .` )
31
36
@@ -57,67 +62,57 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
57
62
return
58
63
}
59
64
60
- const ns = AWSXRay . getNamespace ( ) ;
61
- console . log ( 'Running within context' , ns ) ;
62
- ns . run ( async ( ) => {
63
- console . log ( 'creating segment' ) ;
64
- console . log ( 'created segment' ) ;
65
- AWSXRay . setSegment ( segment ) ;
66
- console . log ( 'set segment' ) ;
67
-
68
- const { traceInformation : {
69
- traceId,
70
- parentSegmentId,
71
- } = {
72
- traceId : null ,
73
- parentSegmentId : null
74
- } } = messageJSON . payload ;
75
-
76
- console . log ( 'tracing information' , traceId , parentSegmentId ) ;
77
-
78
- if ( traceId ) {
79
- segment . trace_id = traceId ;
80
- segment . id = parentSegmentId ;
81
- }
65
+ const { traceInformation : {
66
+ traceId,
67
+ parentSegmentId,
68
+ } = {
69
+ traceId : null ,
70
+ parentSegmentId : null
71
+ } } = messageJSON . payload ;
82
72
73
+ console . log ( 'tracing information' , traceId , parentSegmentId ) ;
83
74
84
- // do not trust the message payload
85
- // the message.payload will be replaced with the data from the API
86
- try {
87
- console . log ( 'Fetch challenge details' ) ;
88
- const challengeUuid = _ . get ( messageJSON , 'payload.id' )
89
- if ( _ . isEmpty ( challengeUuid ) ) {
90
- segment . close ( ) ;
91
- segment . addError ( new Error ( err ) ) ;
92
- throw new Error ( 'Invalid payload' )
93
- }
94
- const m2mToken = await helper . getM2MToken ( )
95
- const v5Challenge = await helper . getRequest ( `${ config . V5_CHALLENGE_API_URL } /${ challengeUuid } ` , m2mToken )
96
- // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
97
- messageJSON . payload = { billingAccountId : messageJSON . payload . billingAccountId , ...v5Challenge . body }
98
- } catch ( err ) {
99
- segment . addError ( new Error ( err ) ) ;
100
- logger . debug ( 'Failed to fetch challenge information' )
101
- logger . logFullError ( err )
102
- }
75
+ if ( traceId ) {
76
+ segment . trace_id = traceId ;
77
+ segment . id = parentSegmentId ;
78
+ }
103
79
104
- try {
105
- console . log ( 'Process challenge' )
106
- await ProcessorService . processMessage ( messageJSON )
107
80
108
- // logger.debug('Successfully processed message')
109
- } catch ( err ) {
81
+ // do not trust the message payload
82
+ // the message.payload will be replaced with the data from the API
83
+ try {
84
+ console . log ( 'Fetch challenge details' ) ;
85
+ const challengeUuid = _ . get ( messageJSON , 'payload.id' )
86
+ if ( _ . isEmpty ( challengeUuid ) ) {
87
+ segment . close ( ) ;
110
88
segment . addError ( new Error ( err ) ) ;
111
- logger . error ( `Error processing message ${ JSON . stringify ( messageJSON ) } ` )
112
- logger . logFullError ( err )
113
- } finally {
114
- // Commit offset regardless of error
115
- await consumer . commitOffset ( { topic, partition, offset : m . offset } )
89
+ throw new Error ( 'Invalid payload' )
116
90
}
91
+ const m2mToken = await helper . getM2MToken ( )
92
+ const v5Challenge = await helper . getRequest ( `${ config . V5_CHALLENGE_API_URL } /${ challengeUuid } ` , m2mToken )
93
+ // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
94
+ messageJSON . payload = { billingAccountId : messageJSON . payload . billingAccountId , ...v5Challenge . body }
95
+ } catch ( err ) {
96
+ segment . addError ( new Error ( err ) ) ;
97
+ logger . debug ( 'Failed to fetch challenge information' )
98
+ logger . logFullError ( err )
99
+ }
117
100
118
- segment . close ( ) ;
119
- } )
101
+ try {
102
+ console . log ( 'Process challenge' )
103
+ await ProcessorService . processMessage ( messageJSON )
104
+
105
+ // logger.debug('Successfully processed message')
106
+ } catch ( err ) {
107
+ segment . addError ( new Error ( err ) ) ;
108
+ logger . error ( `Error processing message ${ JSON . stringify ( messageJSON ) } ` )
109
+ logger . logFullError ( err )
110
+ } finally {
111
+ // Commit offset regardless of error
112
+ await consumer . commitOffset ( { topic, partition, offset : m . offset } )
113
+ }
120
114
115
+ segment . close ( ) ;
121
116
} )
122
117
123
118
// check if there is kafka connection alive
@@ -135,20 +130,24 @@ const check = () => {
135
130
136
131
const topics = [ config . CREATE_CHALLENGE_TOPIC , config . UPDATE_CHALLENGE_TOPIC ]
137
132
138
- consumer
139
- . init ( [ {
140
- subscriptions : topics ,
141
- handler : dataHandler
142
- } ] )
143
- // consume configured topics
144
- . then ( ( ) => {
145
- logger . info ( 'Initialized.......' )
146
- healthcheck . init ( [ check ] )
147
- logger . info ( 'Adding topics successfully.......' )
148
- logger . info ( topics )
149
- logger . info ( 'Kick Start.......' )
133
+ ( ( ) => {
134
+ ns . run ( ( ) => {
135
+ consumer
136
+ . init ( [ {
137
+ subscriptions : topics ,
138
+ handler : dataHandler
139
+ } ] )
140
+ // consume configured topics
141
+ . then ( ( ) => {
142
+ logger . info ( 'Initialized.......' )
143
+ healthcheck . init ( [ check ] )
144
+ logger . info ( 'Adding topics successfully.......' )
145
+ logger . info ( topics )
146
+ logger . info ( 'Kick Start.......' )
147
+ } )
148
+ . catch ( ( err ) => logger . error ( err ) )
150
149
} )
151
- . catch ( ( err ) => logger . error ( err ) )
150
+ } ) ( ) ;
152
151
153
152
if ( process . env . NODE_ENV === 'test' ) {
154
153
module . exports = consumer
0 commit comments