@@ -644,13 +644,13 @@ WalletService.prototype._endpointResyncAddresses = function() {
644644 if ( ! oldAddresses ) {
645645 return res . status ( 404 ) . send ( 'Not found' ) ;
646646 }
647-
647+
648648 self . _removeWallet ( walletId , function ( err ) {
649649
650650 if ( err ) {
651651 return utils . sendError ( err , res ) ;
652652 }
653-
653+
654654 self . _createWallet ( walletId , function ( ) {
655655
656656 var jobId = utils . generateJobId ( ) ;
@@ -713,48 +713,60 @@ WalletService.prototype._endpointGetTransactions = function() {
713713 walletId : walletId
714714 } ;
715715
716- var missingTxidCount = 0 ;
717- var transform = new Transform ( { objectMode : true , highWaterMark : 1000000 } ) ;
716+ var missingTxidCount = 0 ;
717+ var txStream = new Transform ( { objectMode : true , highWaterMark : 1000000 } ) ;
718718 //txids are sent in and the actual tx's are found here
719- transform . _transform = function ( chunk , enc , callback ) {
719+ txStream . _transform = function ( chunk , enc , callback ) {
720720
721721 var txid = self . _encoding . decodeWalletTransactionKey ( chunk ) . txid . toString ( 'hex' ) ;
722722
723723 if ( txid . length !== 64 || txid === '0000000000000000000000000000000000000000000000000000000000000000' ) {
724724 missingTxidCount ++ ;
725- log . error ( 'missingTxidCount : ', missingTxidCount ) ;
725+ txStream . emit ( ' error' , new Error ( 'Chunk : ' + chunk . toString ( 'hex' ) + ' did not contain a txid.' ) ) ;
726726 return callback ( ) ;
727727 }
728728
729729 self . _getTransactionFromDb ( options , txid , function ( err , tx ) {
730730
731+ err = new Error ( 'this is a test error' + txid ) ;
732+
731733 if ( err ) {
732734 log . error ( err ) ;
733- transform . unpipe ( ) ;
735+ txStream . emit ( 'error' , err ) ;
734736 return callback ( ) ;
735737 }
736738
737739 var formattedTx = utils . toJSONL ( self . _formatTransaction ( tx ) ) ;
738- transform . push ( formattedTx ) ;
740+ txStream . push ( formattedTx ) ;
739741 callback ( ) ;
740742
741743 } ) ;
742744
743745 } ;
744746
745- transform . _flush = function ( callback ) {
747+ txStream . on ( 'error' , function ( err ) {
748+ log . error ( err ) ;
749+ utils . sendError ( err , res ) ;
750+ txStream . unpipe ( ) ;
751+ } ) ;
752+
753+ txStream . _flush = function ( callback ) {
746754 self . db . resumeSync ( ) ;
747755 callback ( ) ;
748756 } ;
749757
750758 var encodingFn = self . _encoding . encodeWalletTransactionKey . bind ( self . _encoding ) ;
751- var stream = self . db . createKeyStream ( self . _getSearchParams ( encodingFn , options ) ) ;
759+ var dbStream = self . db . createKeyStream ( self . _getSearchParams ( encodingFn , options ) ) ;
752760
753- stream . on ( 'close' , function ( ) {
754- stream . unpipe ( ) ;
761+ dbStream . on ( 'close' , function ( ) {
762+ dbStream . unpipe ( ) ;
755763 } ) ;
756764
757- stream . pipe ( transform ) . pipe ( res ) ;
765+ dbStream . pipe ( txStream ) . pipe ( res ) ;
766+
767+ res . on ( 'end' , function ( ) {
768+ console . log ( 'res has ended' ) ;
769+ } ) ;
758770 } ) ;
759771 } ) ;
760772 } ;
@@ -1373,7 +1385,7 @@ WalletService.prototype._setupWriteRoutes = function(app) {
13731385 v . checkAddresses ,
13741386 s . _endpointPostAddresses ( )
13751387 ) ;
1376- app . put ( '/wallets/:walletId/addresses/resync' ,
1388+ app . put ( '/wallets/:walletId/addresses/resync' ,
13771389 s . _endpointResyncAddresses ( )
13781390 ) ;
13791391} ;
0 commit comments