diff --git a/index.js b/index.js index 55e7517..38ae805 100644 --- a/index.js +++ b/index.js @@ -5,6 +5,7 @@ const path = require('path'); const mime = require('mime'); const _ = require('lodash'); const concat = require('concat-stream'); +const buildProgressStream = require('./standalone/build-progress-stream'); @@ -128,20 +129,20 @@ module.exports = function SkipperGridFS(globalOptions) { adapter.receive = (opts) => { const receiver__ = WritableStream({ objectMode: true }); - receiver__.once('done', (client, done) => { - if (client) client.close(); - if (done) done(); - }); + // if onProgress handler was provided, bind an event automatically: + if (_.isFunction(options.onProgress)) { + receiver__.on('progress', options.onProgress); + } - receiver__.once('error', (error, client, done) => { - if (client) client.close(); - if (done) done(error); - }); + // Track the progress of all file uploads that pass through this receiver + // through one or more attached Upstream(s). + receiver__._files = []; - receiver__.write = (__newFile, encoding, done) => { + receiver__._write = (__newFile, encoding, done) => { client(options.uri, options.mongoOptions, (error, client) => { if (error) { - receiver__.emit('error', error, client, done); + if (client) client.close(); + if (done) done(error); } const fd = __newFile.fd; @@ -155,21 +156,37 @@ module.exports = function SkipperGridFS(globalOptions) { }, contentType: mime.getType(fd) }); - - __newFile.once('close', () => { - receiver__.emit('done', client, done); - }); - __newFile.once('error', (error) => { - receiver__.emit('error', error, client, done); - }); + outs__.once('finish', () => { - receiver__.emit('done', client, done); + receiver__.emit('writefile', __newFile); + if (client) client.close(); + done(); + }); + + outs__.once('E_EXCEEDS_UPLOAD_LIMIT', (err) => { + //Aborts GridFS Upload Stream cleaning all invalid chunks (garbage collector) + outs__.abort(() => { + if (client) client.close(); + return done(err); + }); }); - outs__.once('error', (error) => { - receiver__.emit('error', error, client, done); + + outs__.once('error', (err) => { + //Aborts GridFS Upload Stream cleaning all invalid chunks (garbage collector) + outs__.abort(() => { + if (client) client.close(); + return done(err); + }); }); - __newFile.pipe(outs__); + // Create another stream that simply keeps track of the progress of the file stream and emits `progress` events + // on the receiver. + const __progress__ = buildProgressStream(options, __newFile, receiver__, outs__, client, done); + + + __newFile + .pipe(__progress__) + .pipe(outs__); }); } return receiver__; diff --git a/package.json b/package.json index 93b5ad7..ee97a0c 100644 --- a/package.json +++ b/package.json @@ -24,12 +24,12 @@ }, "homepage": "https://github.com/willhuang85/skipper-gridfs", "devDependencies": { - "skipper-adapter-tests": "github:willhuang85/skipper-adapter-tests#master" + "skipper-adapter-tests": "^2.0.0" }, "dependencies": { "concat-stream": "^1.6.2", "lodash": "^4.17.11", "mime": "^2.3.1", - "mongodb": "^3.1.8" + "mongodb": "^3.6.5" } -} \ No newline at end of file +} diff --git a/standalone/build-progress-stream.js b/standalone/build-progress-stream.js new file mode 100644 index 0000000..f305f51 --- /dev/null +++ b/standalone/build-progress-stream.js @@ -0,0 +1,139 @@ +/** +* Module dependencies +*/ + +var _ = require('@sailshq/lodash'); +var TransformStream = require('stream').Transform; + + + +/** +* [exports description] +* @param {[type]} options [description] +* @param {[type]} __newFile [description] +* @param {[type]} receiver__ [description] +* @param {[type]} outs__ [description] +* @return {[type]} [description] +*/ +module.exports = function buildProgressStream (options, __newFile, receiver__, outs__) { + options = options || {}; + var log = options.log || function noOpLog(){}; + + // Generate a progress stream and unique id for this file + // then pipe the bytes down to the outs___ stream + // We will pipe the incoming file stream to this, which will + var localID = _.uniqueId(); + var guessedTotal = 0; + var writtenSoFar = 0; + var __progress__ = new TransformStream(); + __progress__._transform = function(chunk, enctype, next) { + + // Update the guessedTotal to make % estimate + // more accurate: + guessedTotal += chunk.length; + writtenSoFar += chunk.length; + + // Do the actual "writing", which in our case will pipe + // the bytes to the outs___ stream that writes to GridFS + this.push(chunk); + + // Emit an event that will calculate our total upload + // progress and determine whether we're within quota + this.emit('progress', { + id: localID, + fd: __newFile.fd, + skipperFd: (__newFile.skipperFd || (_.isString(__newFile.fd) ? __newFile.fd : undefined)), + name: __newFile.name, + written: writtenSoFar, + total: guessedTotal, + percent: (writtenSoFar / guessedTotal) * 100 | 0 + }); + next(); + }; + + // This event is fired when a single file stream emits a progress event. + // Each time we receive a file, we must recalculate the TOTAL progress + // for the aggregate file upload. + // + // events emitted look like: + /* + { + percentage: 9.05, + transferred: 949624, + length: 10485760, + remaining: 9536136, + eta: 10, + runtime: 0, + delta: 295396, + speed: 949624 + } + */ + __progress__.on('progress', function singleFileProgress(milestone) { + // Lookup or create new object to track file progress + var currentFileProgress = _.find(receiver__._files, { + id: localID + }); + if (currentFileProgress) { + currentFileProgress.written = milestone.written; + currentFileProgress.total = milestone.total; + currentFileProgress.percent = milestone.percent; + currentFileProgress.stream = __newFile; + } else { + currentFileProgress = { + id: localID, + fd: __newFile.fd, + skipperFd: (__newFile.skipperFd || (_.isString(__newFile.fd) ? __newFile.fd : undefined)), + name: __newFile.filename, + written: milestone.written, + total: milestone.total, + percent: milestone.percent, + stream: __newFile + }; + receiver__._files.push(currentFileProgress); + } + //////////////////////////////////////////////////////////////// + + + // Recalculate `totalBytesWritten` so far for this receiver instance + // (across ALL OF ITS FILES) + // using the sum of all bytes written to each file in `receiver__._files` + var totalBytesWritten = _.reduce(receiver__._files, function(memo, status) { + memo += status.written; + return memo; + }, 0); + + log(currentFileProgress.percent, '::', currentFileProgress.written, '/', currentFileProgress.total, ' (file #' + currentFileProgress.id + ' :: ' + /*'update#'+counter*/ '' + ')'); //receiver__._files.length+' files)'); + + // Emit an event on the receiver. Someone using Skipper may listen for this to show + // a progress bar, for example. + receiver__.emit('progress', currentFileProgress); + + // and then enforce its `maxBytes`. + if (options.maxBytes && totalBytesWritten >= options.maxBytes) { + + var err = new Error(); + err.code = 'E_EXCEEDS_UPLOAD_LIMIT'; + err.name = 'Upload Error'; + err.maxBytes = options.maxBytes; + err.written = totalBytesWritten; + err.message = 'Upload limit of ' + err.maxBytes + ' bytes exceeded (' + err.written + ' bytes written)'; + + // Stop listening for progress events + __progress__.removeAllListeners('progress'); + // Unpipe the progress stream, which feeds the GridFS stream, so we don't keep dumping to GridFS + process.nextTick(function() { + __progress__.unpipe(); + }); + + // In skipper-disk, this is the point where Garbage Collecting is done. + // In skipper-gridfs, the job is done inside the outs__ error listeners using GridFSBucketWriteStream.abort() (see index.js) + + outs__.emit('E_EXCEEDS_UPLOAD_LIMIT',err); + + + return; + } + }); + + return __progress__; +}; \ No newline at end of file