@@ -48,6 +48,10 @@ var common = require('./common');
4848var async = require ( 'async' ) ;
4949var uuid = require ( 'uuid' ) ;
5050var pg = require ( 'pg' ) ;
51+
52+ // empty import/invocation of the keepalive fix for node-postgres module
53+ require ( 'pg-ka-fix' ) ( ) ;
54+
5155var upgrade = require ( './upgrades' ) ;
5256
5357String . prototype . shortenPrefix = function ( ) {
@@ -362,8 +366,9 @@ exports.handler = function (event, context) {
362366 // add the file to the pending batch
363367 dynamoDB . updateItem ( item , function ( err , data ) {
364368 if ( err ) {
369+ var waitFor = Math . max ( Math . pow ( tryNumber , 2 ) * 10 , 200 ) ;
370+
365371 if ( err . code === provisionedThroughputExceeded ) {
366- var waitFor = Math . max ( Math . pow ( tryNumber , 2 ) * 10 , 200 ) ;
367372 console . log ( "Provisioned Throughput Exceeded on addition of " + s3info . prefix + " to pending batch " + thisBatchId + ". Trying again in " + waitFor + " ms" ) ;
368373 setTimeout ( callback , waitFor ) ;
369374 } else if ( err . code === conditionCheckFailed ) {
@@ -379,6 +384,7 @@ exports.handler = function (event, context) {
379384 }
380385 } ,
381386 TableName : configTable ,
387+ /* we need a consistent read here to ensure we get the latest batch ID */
382388 ConsistentRead : true
383389 } ;
384390 dynamoDB . getItem ( configReloadRequest , function ( err , data ) {
@@ -392,26 +398,23 @@ exports.handler = function (event, context) {
392398 callback ( err ) ;
393399 }
394400 } else {
395- /*
396- * reset the batch ID to the
397- * current marked batch
398- */
399- thisBatchId = data . Item . currentBatch . S ;
400-
401- /*
402- * we've not set proceed to
403- * true, so async will retry
404- */
405- console . log ( "Reload of Configuration Complete after attempting to write to Locked Batch " + thisBatchId + ". Attempt " + configReloads ) ;
406-
407- /*
408- * we can call into the callback
409- * immediately, as we probably
410- * just missed the pending batch
411- * processor's rotate of the
412- * configuration batch ID
413- */
414- callback ( ) ;
401+ if ( data . Item . currentBatch . S === thisBatchId ) {
402+ // we've obtained the same batch ID back from the configuration as we have now, meaning it hasn't yet rotated
403+ console . log ( "Batch " + thisBatchId + " still current after configuration reload attempt " + configReloads + ". Recycling in " + waitFor + " ms." ) ;
404+
405+ // because the batch hasn't been reloaded on the configuration, we'll backoff here for a moment to let that happen
406+ setTimeout ( callback , waitFor ) ;
407+ } else {
408+ // we've got an updated batch id, so use this in the next cycle of file add
409+ thisBatchId = data . Item . currentBatch . S ;
410+
411+ console . log ( "Obtained new Batch ID " + thisBatchId + " after configuration reload. Attempt " + configReloads ) ;
412+
413+ /*
414+ callback immediately, as we should now have a valid and open batch to use
415+ */
416+ callback ( ) ;
417+ }
415418 }
416419 } ) ;
417420 } else {
0 commit comments