Skip to content

Commit

Permalink
WIP: Modular: Re-implements streaming store as handler
Browse files Browse the repository at this point in the history
  • Loading branch information
DougReeder committed Mar 12, 2024
1 parent 0d0a798 commit cf0a0b5
Show file tree
Hide file tree
Showing 8 changed files with 1,250 additions and 81 deletions.
255 changes: 255 additions & 0 deletions lib/routes/S3Handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/* streaming storage to an S3-compatible service */

/* eslint-env node */
/* eslint-disable camelcase */
const express = require('express');
const { posix } = require('node:path');
const { HeadObjectCommand, S3Client, DeleteObjectCommand, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3');
const normalizeETag = require('../util/normalizeETag');
const ParameterError = require('../util/ParameterError');
const { dirname, basename } = require('path');
const { createHash } = require('node:crypto');
const YAML = require('yaml');
const TimeoutError = require('../util/timeoutError');
const { Upload } = require('@aws-sdk/lib-storage');
const { pipeline } = require('node:stream/promises');

const PUT_TIMEOUT = 24 * 60 * 60 * 1000;
// const AUTH_PREFIX = 'remoteStorageAuth';
// const AUTHENTICATION_LOCAL_PASSWORD = 'authenticationLocalPassword';
// const USER_METADATA = 'userMetadata';
const FILE_PREFIX = 'remoteStorageBlob';
const EMPTY_DIRECTORY = { '@context': 'http://remotestorage.io/spec/folder-description', items: {} };

/**
 * Creates an Express router that streams remoteStorage documents to/from an
 * S3-compatible service.
 *
 * The default credentials are the public, published keys of the MinIO
 * playground (play.min.io) — suitable only for testing; pass real
 * credentials in production.
 *
 * @param {string} endPoint host or URL of the S3-compatible service
 * @param {string} accessKey
 * @param {string} secretKey
 * @param {string} region
 * @returns {express.Router}
 */
module.exports = function (endPoint = 'play.min.io', accessKey = 'Q3AM3UQ867SPQQA43P2F', secretKey = 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG', region = 'us-east-1') {
  // Plain HTTP only for well-known local/test hosts.  The dots are escaped so
  // that, e.g., "127a0b0c1" no longer matches (the old pattern used bare `.`,
  // which matches any character).
  const sslEnabled = !/\blocalhost\b|\b127\.0\.0\.1\b|\b10\.0\.0\.2\b/.test(endPoint);
  if (!endPoint.startsWith('http')) {
    endPoint = (sslEnabled ? 'https://' : 'http://') + endPoint;
  }

  const s3client = new S3Client({
    forcePathStyle: true, // MinIO & friends typically need path-style addressing
    region,
    endpoint: endPoint,
    sslEnabled,
    credentials: {
      accessKeyId: accessKey,
      secretAccessKey: secretKey,
      Version: 1
    }
    // logger: getLogger(),
  });

  const router = express.Router();
/**
 * GET /:username/* — streams a document or folder listing from the user's
 * bucket.  A folder request (trailing slash) must resolve to a folder object
 * and vice versa; a mismatch is answered 409 Conflict.  If-None-Match /
 * If-Match headers are forwarded to S3, whose NotModified /
 * PreconditionFailed errors are mapped to 304 / 412.
 */
router.get('/:username/*',
  async function (req, res, next) {
    try {
      const bucketName = req.params.username.toLowerCase();
      const isDirectoryRequest = req.url.endsWith('/');
      // Folder objects are stored WITHOUT the trailing slash, so strip it.
      const s3Path = isDirectoryRequest ? posix.join(FILE_PREFIX, req.params[0]).slice(0, -1) : posix.join(FILE_PREFIX, req.params[0]);
      const getParam = { Bucket: bucketName, Key: s3Path };
      if (req.get('If-None-Match')) {
        getParam.IfNoneMatch = req.get('If-None-Match');
      } else if (req.get('If-Match')) {
        getParam.IfMatch = req.get('If-Match');
      } // else: unconditional GET

      const { Body, ETag, ContentType, ContentLength } = await s3client.send(new GetObjectCommand(getParam));
      const isDirectory = ContentType === 'application/x.remotestorage-ld+json';
      // Folders are stored with a private type; serve them as JSON-LD.
      const contentType = isDirectory ? 'application/ld+json' : ContentType;
      if (isDirectoryRequest !== isDirectory) { // was bitwise `^`; strict compare of booleans is clearer
        return res.status(409).end(); // Conflict: folder request for a document, or vice versa
      } else {
        res.status(200).set('Content-Length', ContentLength).set('Content-Type', contentType).set('ETag', normalizeETag(ETag));
        return pipeline(Body, res); // stream the object body straight to the client
      }
    } catch (err) {
      if (['NotFound', 'NoSuchKey'].includes(err.name)) {
        return res.status(404).end(); // Not Found
      } else if (err.name === 'PreconditionFailed') {
        return res.status(412).end(); // If-Match didn't match
      } else if (err.name === 'NotModified' || err.$metadata?.httpStatusCode === 304 || err.name === 304) {
        return res.status(304).end(); // If-None-Match matched
      } else {
        return next(Object.assign(err, { status: 502 })); // unexpected upstream failure
      }
    }
  }
);

/**
 * PUT /:username/* — stores a document (never a folder) in the user's bucket.
 * Rejects malformed paths (400), folder paths and writes that would clobber a
 * folder object (409), and failed If-Match / If-None-Match preconditions (412).
 * NOTE(review): unlike DELETE, this handler does not yet update the ancestor
 * folder listings — presumably WIP; confirm before release.
 */
router.put('/:username/*',
  async function (req, res, next) {
    try {
      // Reject empty paths, '//' runs, '.'/'..' segments and NUL characters.
      if (req.url.length === 0 || /\/\/|(^|\/)\.($|\/)|(^|\/)\.\.($|\/)|\0/.test(req.url)) {
        return next(Object.assign(new ParameterError('A parameter value is bad'), { status: 400 }));
      }
      if (req.url.endsWith('/')) {
        return res.status(409).end(); // folders can't be PUT directly
      }
      const bucketName = req.params.username.toLowerCase();
      const s3Path = posix.join(FILE_PREFIX, req.params[0]);

      // Look up the current ETag (if any) so conditional headers can be checked.
      let currentETag;
      try {
        const headResponse = await s3client.send(new HeadObjectCommand({ Bucket: bucketName, Key: s3Path }));
        if (headResponse.ContentType === 'application/x.remotestorage-ld+json') {
          return res.status(409).end(); // path is occupied by a folder object
        }
        currentETag = normalizeETag(headResponse.ETag);
      } catch (err) {
        if (err.$metadata?.httpStatusCode === 400 || err.name === '400') {
          return next(Object.assign(new ParameterError('A parameter value is bad', { cause: err }), { status: 400 }));
        } else if (!['NotFound', 'NoSuchKey'].includes(err.name)) {
          return next(Object.assign(err, { status: 502 }));
        } // NotFound / NoSuchKey just means this is a new document
      }

      const ifNoneMatch = req.get('If-None-Match');
      if (req.get('If-Match') && req.get('If-Match') !== currentETag) {
        return res.status(412).end();
      } else if (ifNoneMatch && (ifNoneMatch === currentETag ||
          (ifNoneMatch === '*' && currentETag !== undefined))) {
        // `If-None-Match: *` forbids overwriting ANY existing document
        // (remoteStorage spec); the old test compared '*' against the ETag
        // and so never fired.
        return res.status(412).end();
      }

      // Parse Content-Length once, with radix.  A value of 0 is valid (empty
      // document) and must NOT fall back to undefined — the old truthiness
      // test (`parseInt(...) ? ... : undefined`) dropped it.
      const parsedLength = Number.parseInt(req.get('Content-Length'), 10);
      const contentLength = Number.isFinite(parsedLength) ? parsedLength : undefined;
      const ETag = await putBlob(bucketName, s3Path, req.get('Content-Type'), contentLength, req);
      return res.status(201).set('ETag', ETag).end();
    } catch (err) {
      if (err.name === 'TimeoutError') {
        return res.status(504).end(); // upload exceeded PUT_TIMEOUT
      } else if (err.$metadata?.httpStatusCode === 400 || err.name === '400' || err.name === 'NoSuchBucket') {
        return next(Object.assign(new ParameterError('A parameter value is bad', { cause: err }), { status: 400 }));
      } else {
        return next(Object.assign(err, { status: 502 }));
      }
    }
  });

/* DELETE /:username/* — deletes a document, then walks up the folder tree,
   removing the entry from each ancestor folder object (deleting folders that
   become empty) and recomputing each surviving folder's ETag. */
router.delete('/:username/*',
async function (req, res, next) {
try {
const bucketName = req.params.username.toLowerCase();
// NOTE(review): GET/PUT derive the key from req.params[0]; this uses req.url,
// which can include a query string — confirm this difference is intended.
const s3Path = posix.join(FILE_PREFIX, req.url.slice(1 + bucketName.length));
let currentETag;
try {
const headResponse = await s3client.send(new HeadObjectCommand({ Bucket: bucketName, Key: s3Path }));
if (headResponse.ContentType === 'application/x.remotestorage-ld+json') {
return res.status(409).end(); // folders can't be DELETEd directly
}
currentETag = normalizeETag(headResponse.ETag);

// conditional-request checks against the document's current ETag
if (req.get('If-Match') && req.get('If-Match') !== currentETag) {
return res.status(412).end();
} else if (req.get('If-None-Match') && req.get('If-None-Match') === currentETag) {
return res.status(412).end();
}
/* const { DeleteMarker, VersionId } = */ await s3client.send(new DeleteObjectCommand({ Bucket: bucketName, Key: s3Path }));
} catch (err) {
if (['NotFound', 'NoSuchKey'].includes(err.name)) {
return res.status(404).end();
} else if (err.$metadata?.httpStatusCode === 400 || err.name === '400' || /\bBucket\b/.test(err.message)) {
return next(Object.assign(new ParameterError('A parameter value is bad', { cause: err }), { status: 400 }));
} else {
return next(Object.assign(err, { status: 502 }));
}
}

// updates all ancestor directories
// itemETag states: null → the item is the deleted document; '' → the item is
// a folder that was itself deleted; non-empty → recomputed folder ETag
let itemETag = null;
let itemPath = s3Path;
do {
let directory;
try {
directory = await readJson(bucketName, dirname(itemPath));
} catch (err) {
if (!['NotFound', 'NoSuchKey'].includes(err.name)) { return next(Object.assign(err, { status: 502 })); }
}
if (!(directory?.items instanceof Object)) {
// missing or malformed folder object — start from an empty listing
directory = structuredClone(EMPTY_DIRECTORY);
// TODO: scan for existing blobs
}
if (typeof itemETag === 'string') { // item is folder
if (itemETag.length > 0) {
directory.items[basename(itemPath) + '/'] = { ETag: itemETag };
} else {
delete directory.items[basename(itemPath) + '/'];
}
} else {
delete directory.items[basename(itemPath)];
}
if (Array.from(Object.keys(directory.items)).length > 0) {
// folder still has entries — rewrite its listing
// NOTE(review): dirJSON.length is a character count; Buffer.byteLength
// would be safer as Content-Length if names can be non-ASCII — confirm.
const dirJSON = JSON.stringify(directory);
await putBlob(bucketName, dirname(itemPath), 'application/x.remotestorage-ld+json', dirJSON.length, dirJSON);

if (dirname(itemPath) !== FILE_PREFIX) {
// calculates ETag for the folder
// (MD5 over the concatenation of the children's unquoted ETags)
const hash = createHash('md5');
for (const itemMeta of Object.values(directory.items)) {
hash.update(itemMeta?.ETag?.replace(/^W\/"|^"|"$/g, '') || '');
}
itemETag = '"' + hash.digest('hex') + '"';
}
} else { // that was the last blob in the folder, so delete the folder
/* const { DeleteMarker, VersionId } = */ await s3client.send(new DeleteObjectCommand({ Bucket: bucketName, Key: dirname(itemPath) }));
itemETag = '';
}

itemPath = dirname(itemPath);
} while (itemPath.length > FILE_PREFIX.length);

// echo the ETag of the document that was removed
if (currentETag) {
res.set('ETag', normalizeETag(currentETag));
}
res.status(204).end();
} catch (err) {
if (err.$metadata?.httpStatusCode === 400 || err.name === '400') {
return next(Object.assign(new ParameterError('A parameter value is bad', { cause: err }), { status: 400 }));
} else {
return next(Object.assign(err, { status: 502 }));
}
}
}
);

/**
 * Stores a blob: a single PutObject for bodies up to 500 MB, a multipart
 * upload (@aws-sdk/lib-storage) for larger or unknown sizes.  (An undefined
 * contentLength fails the `<=` comparison and takes the multipart path,
 * which can stream without knowing the total size up front.)
 *
 * @param {string} bucketName
 * @param {string} s3Path object key
 * @param {string} contentType MIME type stored with the object
 * @param {number|undefined} contentLength size in bytes, if known
 * @param {object|string} contentStream request stream or string body
 * @returns {Promise<string>} normalized ETag of the stored object
 * @throws {TimeoutError} if a single-request PUT exceeds PUT_TIMEOUT
 */
async function putBlob (bucketName, s3Path, contentType, contentLength, contentStream) {
  if (contentLength <= 500_000_000) {
    let timer;
    try {
      const putPrms = s3client.send(new PutObjectCommand(
        { Bucket: bucketName, Key: s3Path, Body: contentStream, ContentType: contentType, ContentLength: contentLength }));
      const timeoutPrms = new Promise((_resolve, reject) => {
        timer = setTimeout(reject, PUT_TIMEOUT, new TimeoutError(`PUT of ${contentLength / 1_000_000} MB to ${bucketName} ${s3Path} took more than ${Math.round(PUT_TIMEOUT / 60_000)} minutes`));
      });
      const putResponse = await Promise.race([putPrms, timeoutPrms]);
      return normalizeETag(putResponse.ETag);
    } finally {
      // The old code never cleared this: every PUT left a 24-hour timer
      // holding the event loop open (and leaking a handle per request).
      clearTimeout(timer);
    }
  } else {
    const parallelUpload = new Upload({
      client: s3client,
      params: { Bucket: bucketName, Key: s3Path, Body: contentStream, ContentType: contentType, ContentLength: contentLength }
    });

    parallelUpload.on('httpUploadProgress', (progress) => {
      console.debug(bucketName, s3Path, `part ${progress.part} ${progress.loaded} / ${progress.total} bytes`);
    });

    return normalizeETag((await parallelUpload.done()).ETag);
  }
}

/**
 * Fetches an object and parses its body as YAML.  Currently unused.
 * @param {string} bucketName
 * @param {string} s3Path object key
 * @returns {Promise<*>} the parsed YAML value
 */
async function readYaml (bucketName, s3Path) { // eslint-disable-line no-unused-vars
  const { Body } = await s3client.send(new GetObjectCommand({ Bucket: bucketName, Key: s3Path }));
  // Join ALL chunks — the old code took only element [0], silently truncating
  // any body delivered in more than one chunk.
  const string = (await Body.setEncoding('utf-8').toArray()).join('');
  return YAML.parse(string);
}

/**
 * Fetches an object and parses its body as JSON (used for folder objects).
 * @param {string} bucketName
 * @param {string} s3Path object key
 * @returns {Promise<*>} the parsed JSON value
 */
async function readJson (bucketName, s3Path) {
  const { Body } = await s3client.send(new GetObjectCommand({ Bucket: bucketName, Key: s3Path }));
  // Join ALL chunks — the old code took only element [0], silently truncating
  // any body delivered in more than one chunk.
  const string = (await Body.setEncoding('utf-8').toArray()).join('');
  return JSON.parse(string);
}

// hand the configured router back to the caller for mounting
return router;
};
101 changes: 29 additions & 72 deletions lib/streaming_stores/S3.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const {
PutObjectCommand,
CreateBucketCommand,
DeleteBucketCommand,
GetObjectCommand, PutBucketVersioningCommand, DeleteObjectsCommand, ListObjectVersionsCommand,
GetObjectCommand, PutBucketVersioningCommand, ListObjectVersionsCommand,
DeleteObjectCommand, HeadObjectCommand
} = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');
Expand All @@ -36,9 +36,9 @@ class S3 {
if (!endPoint.startsWith('http')) {
endPoint = (sslEnabled ? 'https://' : 'http://') + endPoint;
}
if (!/:\d{1,5}\/?$/.test(endPoint)) {
endPoint += ':9000';
}
// if (!/:\d{1,5}\/?$/.test(endPoint)) {
// endPoint += ':9000';
// }

this.#S3Client = new S3Client({
forcePathStyle: true,
Expand Down Expand Up @@ -107,90 +107,47 @@ class S3 {
* @returns {Promise<number>} number of files deleted
*/
async deleteUser (username) {
return new Promise((resolve, reject) => {
const DELETE_GROUP_SIZE = 100;
const objectVersions = [];
let numRequested = 0; let numResolved = 0; let isReceiveComplete = false;

const removeObjectVersions = async () => {
let group;
try {
if (objectVersions.length > 0) {
group = objectVersions.slice(0);
objectVersions.length = 0;
numRequested += group.length;
const { Errors } = await this.#S3Client.send(new DeleteObjectsCommand({ Bucket: username, Delete: { Objects: group } }));
numResolved += group.length;
if (Errors?.length > 0) {
getLogger().error('errors deleting object versions:', YAML.stringify(Errors));
}
}
} catch (err) {
if (err.name === 'NoSuchBucket') {
resolve(numResolved);
} else if (err.name === 'NotImplemented') { // OpenIO
getLogger().warning('while deleting object versions: ' + err);
for (const objectVersion of group) {
const { Errors } = await this.#S3Client.send(new DeleteObjectCommand({ Bucket: username, Key: objectVersion.Key, VersionId: objectVersion.VersionId }));
if (Errors?.length > 0) {
getLogger().error('errors deleting object version:', YAML.stringify(Errors));
}
++numResolved;
}
} else {
reject(Object.assign(new Error('while deleting object versions: ' + err), { cause: err }));
}
}
try {
if (isReceiveComplete && numResolved === numRequested) {
// will fail if any object versions remain
await this.#S3Client.send(new DeleteBucketCommand({ Bucket: username }));
resolve(numResolved);
}
} catch (err) {
if (err.name === 'NoSuchBucket') {
resolve(numResolved);
} else {
reject(new Error('while deleting bucket: ' + err));
await new Promise((resolve, reject) => {
const bucketName = username.toLowerCase();

const deleteItems = async items => {
for (const item of items) {
try {
/* const { DeleteMarker } = */ await this.#S3Client.send(new DeleteObjectCommand({ Bucket: bucketName, Key: item.Key, VersionId: item.VersionId }));
// console.log(`deleted ${item.Key} ${DeleteMarker}`);
} catch (err) {
console.warn('while deleting', bucketName, item.Key, item.VersionID);
}
}
};

const removeObjectVersionsAndBucket = async err => {
const pageObjectVersions = async (KeyMarker) => {
try {
isReceiveComplete = true;
await removeObjectVersions();
if (err) {
reject(err);
}
} catch (err2) {
reject(err || err2);
}
};
const { Versions, DeleteMarkers, IsTruncated, NextKeyMarker } = await this.#S3Client.send(new ListObjectVersionsCommand({ Bucket: bucketName, ...(KeyMarker ? { KeyMarker } : null) }));

let keyMarker = null;
const pageVersions = async () => {
try {
const { Versions, IsTruncated, NextKeyMarker } = await this.#S3Client.send(new ListObjectVersionsCommand({ Bucket: username, KeyMarker: keyMarker /*, MaxKeys: DELETE_GROUP_SIZE */ }));
keyMarker = NextKeyMarker;
objectVersions.push(...Versions);
isReceiveComplete = !IsTruncated;
if (objectVersions.length >= DELETE_GROUP_SIZE || !IsTruncated) {
await removeObjectVersions();
if (typeof Versions?.[Symbol.iterator] === 'function') {
await deleteItems(Versions);
}
if (typeof DeleteMarkers?.[Symbol.iterator] === 'function') {
await deleteItems(DeleteMarkers);
}

if (IsTruncated) {
return pageVersions();
return pageObjectVersions(NextKeyMarker).catch(reject);
} else {
await this.#S3Client.send(new DeleteBucketCommand({ Bucket: username }));
resolve();
}
} catch (err) {
if (err.name === 'NoSuchBucket') {
resolve(numResolved);
resolve();
} else {
return removeObjectVersionsAndBucket(err);
reject(err);
}
}
};

pageVersions();
pageObjectVersions(undefined).catch(reject);
});
}

Expand Down
Loading

0 comments on commit cf0a0b5

Please sign in to comment.