Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const path = require('path');
const mime = require('mime');
const _ = require('lodash');
const concat = require('concat-stream');
const buildProgressStream = require('./standalone/build-progress-stream');



Expand Down Expand Up @@ -128,20 +129,20 @@ module.exports = function SkipperGridFS(globalOptions) {
adapter.receive = (opts) => {
const receiver__ = WritableStream({ objectMode: true });

receiver__.once('done', (client, done) => {
if (client) client.close();
if (done) done();
});
// if onProgress handler was provided, bind an event automatically:
if (_.isFunction(options.onProgress)) {
receiver__.on('progress', options.onProgress);
}

receiver__.once('error', (error, client, done) => {
if (client) client.close();
if (done) done(error);
});
// Track the progress of all file uploads that pass through this receiver
// through one or more attached Upstream(s).
receiver__._files = [];

receiver__.write = (__newFile, encoding, done) => {
receiver__._write = (__newFile, encoding, done) => {
client(options.uri, options.mongoOptions, (error, client) => {
if (error) {
receiver__.emit('error', error, client, done);
if (client) client.close();
if (done) done(error);
}

const fd = __newFile.fd;
Expand All @@ -155,21 +156,37 @@ module.exports = function SkipperGridFS(globalOptions) {
},
contentType: mime.getType(fd)
});

__newFile.once('close', () => {
receiver__.emit('done', client, done);
});
__newFile.once('error', (error) => {
receiver__.emit('error', error, client, done);
});

outs__.once('finish', () => {
receiver__.emit('done', client, done);
receiver__.emit('writefile', __newFile);
if (client) client.close();
done();
});

outs__.once('E_EXCEEDS_UPLOAD_LIMIT', (err) => {
//Aborts GridFS Upload Stream cleaning all invalid chunks (garbage collector)
outs__.abort(() => {
if (client) client.close();
return done(err);
});
});
outs__.once('error', (error) => {
receiver__.emit('error', error, client, done);

outs__.once('error', (err) => {
//Aborts GridFS Upload Stream cleaning all invalid chunks (garbage collector)
outs__.abort(() => {
if (client) client.close();
return done(err);
});
});

__newFile.pipe(outs__);
// Create another stream that simply keeps track of the progress of the file stream and emits `progress` events
// on the receiver.
const __progress__ = buildProgressStream(options, __newFile, receiver__, outs__, client, done);


__newFile
.pipe(__progress__)
.pipe(outs__);
});
}
return receiver__;
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
},
"homepage": "https://github.com/willhuang85/skipper-gridfs",
"devDependencies": {
"skipper-adapter-tests": "github:willhuang85/skipper-adapter-tests#master"
"skipper-adapter-tests": "^2.0.0"
},
"dependencies": {
"concat-stream": "^1.6.2",
"lodash": "^4.17.11",
"mime": "^2.3.1",
"mongodb": "^3.1.8"
"mongodb": "^3.6.5"
}
}
}
139 changes: 139 additions & 0 deletions standalone/build-progress-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Module dependencies
*/

var _ = require('@sailshq/lodash');
var TransformStream = require('stream').Transform;



/**
* [exports description]
* @param {[type]} options [description]
* @param {[type]} __newFile [description]
* @param {[type]} receiver__ [description]
* @param {[type]} outs__ [description]
* @return {[type]} [description]
*/
module.exports = function buildProgressStream (options, __newFile, receiver__, outs__) {
options = options || {};
var log = options.log || function noOpLog(){};

// Generate a progress stream and unique id for this file
// then pipe the bytes down to the outs___ stream
// We will pipe the incoming file stream to this, which will
var localID = _.uniqueId();
var guessedTotal = 0;
var writtenSoFar = 0;
var __progress__ = new TransformStream();
__progress__._transform = function(chunk, enctype, next) {

// Update the guessedTotal to make % estimate
// more accurate:
guessedTotal += chunk.length;
writtenSoFar += chunk.length;

// Do the actual "writing", which in our case will pipe
// the bytes to the outs___ stream that writes to GridFS
this.push(chunk);

// Emit an event that will calculate our total upload
// progress and determine whether we're within quota
this.emit('progress', {
id: localID,
fd: __newFile.fd,
skipperFd: (__newFile.skipperFd || (_.isString(__newFile.fd) ? __newFile.fd : undefined)),
name: __newFile.name,
written: writtenSoFar,
total: guessedTotal,
percent: (writtenSoFar / guessedTotal) * 100 | 0
});
next();
};

// This event is fired when a single file stream emits a progress event.
// Each time we receive a file, we must recalculate the TOTAL progress
// for the aggregate file upload.
//
// events emitted look like:
/*
{
percentage: 9.05,
transferred: 949624,
length: 10485760,
remaining: 9536136,
eta: 10,
runtime: 0,
delta: 295396,
speed: 949624
}
*/
__progress__.on('progress', function singleFileProgress(milestone) {
// Lookup or create new object to track file progress
var currentFileProgress = _.find(receiver__._files, {
id: localID
});
if (currentFileProgress) {
currentFileProgress.written = milestone.written;
currentFileProgress.total = milestone.total;
currentFileProgress.percent = milestone.percent;
currentFileProgress.stream = __newFile;
} else {
currentFileProgress = {
id: localID,
fd: __newFile.fd,
skipperFd: (__newFile.skipperFd || (_.isString(__newFile.fd) ? __newFile.fd : undefined)),
name: __newFile.filename,
written: milestone.written,
total: milestone.total,
percent: milestone.percent,
stream: __newFile
};
receiver__._files.push(currentFileProgress);
}
////////////////////////////////////////////////////////////////


// Recalculate `totalBytesWritten` so far for this receiver instance
// (across ALL OF ITS FILES)
// using the sum of all bytes written to each file in `receiver__._files`
var totalBytesWritten = _.reduce(receiver__._files, function(memo, status) {
memo += status.written;
return memo;
}, 0);

log(currentFileProgress.percent, '::', currentFileProgress.written, '/', currentFileProgress.total, ' (file #' + currentFileProgress.id + ' :: ' + /*'update#'+counter*/ '' + ')'); //receiver__._files.length+' files)');

// Emit an event on the receiver. Someone using Skipper may listen for this to show
// a progress bar, for example.
receiver__.emit('progress', currentFileProgress);

// and then enforce its `maxBytes`.
if (options.maxBytes && totalBytesWritten >= options.maxBytes) {

var err = new Error();
err.code = 'E_EXCEEDS_UPLOAD_LIMIT';
err.name = 'Upload Error';
err.maxBytes = options.maxBytes;
err.written = totalBytesWritten;
err.message = 'Upload limit of ' + err.maxBytes + ' bytes exceeded (' + err.written + ' bytes written)';

// Stop listening for progress events
__progress__.removeAllListeners('progress');
// Unpipe the progress stream, which feeds the GridFS stream, so we don't keep dumping to GridFS
process.nextTick(function() {
__progress__.unpipe();
});

// In skipper-disk, this is the point where Garbage Collecting is done.
// In skipper-gridfs, the job is done inside the outs__ error listeners using GridFSBucketWriteStream.abort() (see index.js)

outs__.emit('E_EXCEEDS_UPLOAD_LIMIT',err);


return;
}
});

return __progress__;
};