Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion server/routes/generalDownloadRoutes.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const generalDownloadStorage = require('../services/generalDownloadStorage');
const router = express.Router();
const generalDownloadRateLimit = rateLimit({
windowMs: Math.max(60_000, Number(process.env.HOMEBRAIN_GENERAL_DOWNLOAD_RATE_LIMIT_WINDOW_MS || 60_000)),
limit: Math.max(5, Number(process.env.HOMEBRAIN_GENERAL_DOWNLOAD_RATE_LIMIT_MAX || 60)),
limit: Math.max(5, Number(process.env.HOMEBRAIN_GENERAL_DOWNLOAD_RATE_LIMIT_MAX || 300)),
standardHeaders: true,
legacyHeaders: false,
message: {
Expand Down Expand Up @@ -50,6 +50,22 @@ router.get('/file', async (req, res) => {
}
});

router.get('/upload-status', async (req, res) => {
try {
const info = await generalDownloadStorage.getDownloadUploadInfo(req.query.path);
return res.status(200).json({
success: true,
...info
});
} catch (error) {
console.error('GET /api/admin/general-downloads/upload-status - Error:', error.message);
return res.status(error.status || 500).json({
success: false,
message: error.message || 'Failed to inspect download upload state'
});
}
});

router.put('/upload', async (req, res) => {
try {
const uploaded = await generalDownloadStorage.writeDownloadStream(req.query.path, req, {
Expand All @@ -69,4 +85,27 @@ router.put('/upload', async (req, res) => {
}
});

router.put('/upload-chunk', async (req, res) => {
try {
const uploaded = await generalDownloadStorage.writeDownloadChunk(req.query.path, req, {
offset: req.query.offset,
totalBytes: req.query.totalBytes,
expectedBytes: req.get('content-length'),
complete: String(req.query.complete || '').toLowerCase() === 'true'
});

return res.status(uploaded.complete ? 201 : 202).json({
success: true,
file: uploaded
});
} catch (error) {
console.error('PUT /api/admin/general-downloads/upload-chunk - Error:', error.message);
return res.status(error.status || 500).json({
success: false,
message: error.message || 'Failed to upload download chunk',
expectedOffset: error.expectedOffset
});
}
});

module.exports = router;
174 changes: 174 additions & 0 deletions server/services/generalDownloadStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,33 @@ function parseMaxBytes(value) {
return parsed;
}

function parseRequiredByteCount(value, label) {
const parsed = Number(value);
if (!Number.isSafeInteger(parsed) || parsed < 0) {
throw Object.assign(new Error(`${label} must be a non-negative integer byte count`), { status: 400 });
}
return parsed;
}

function getChunkUploadTempPath(absolutePath) {
return path.join(path.dirname(absolutePath), `.${path.basename(absolutePath)}.chunked-upload`);
}

async function hashFile(filePath) {
const hash = crypto.createHash('sha256');
let bytes = 0;

for await (const chunk of fs.createReadStream(filePath)) {
bytes += chunk.length;
hash.update(chunk);
}

return {
bytes,
sha256: hash.digest('hex')
};
}

async function writeDownloadStream(inputPath, readable, options = {}) {
const { root, relativePath, absolutePath } = resolveDownloadPath(inputPath);
const maxBytes = parseMaxBytes(options.maxBytes || process.env.GENERAL_DOWNLOADS_MAX_UPLOAD_BYTES);
Expand Down Expand Up @@ -132,6 +159,118 @@ async function writeDownloadStream(inputPath, readable, options = {}) {
};
}

async function writeDownloadChunk(inputPath, readable, options = {}) {
const { root, relativePath, absolutePath } = resolveDownloadPath(inputPath);
const maxBytes = parseMaxBytes(options.maxBytes || process.env.GENERAL_DOWNLOADS_MAX_UPLOAD_BYTES);
const offset = parseRequiredByteCount(options.offset, 'offset');
const totalBytes = parseRequiredByteCount(options.totalBytes, 'totalBytes');
const expectedBytes = parseRequiredByteCount(options.expectedBytes, 'content-length');
const completeRequested = Boolean(options.complete);

if (totalBytes > maxBytes) {
throw Object.assign(new Error(`Upload exceeds the ${maxBytes} byte limit`), { status: 413 });
}

if (offset > totalBytes || offset + expectedBytes > totalBytes) {
throw Object.assign(new Error('Chunk byte range exceeds the declared upload size'), { status: 400 });
}

await fs.promises.mkdir(path.dirname(absolutePath), { recursive: true });

const tempPath = getChunkUploadTempPath(absolutePath);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Isolate chunk uploads per session

Using a single deterministic staging path for every upload to the same target allows concurrent sessions to write into the same temp file. In writeDownloadChunk, two clients uploading the same path (or one retrying from offset=0 while another is in progress) will share tempPath, and the later request can truncate/overwrite bytes from the other session, producing a corrupted finalized file even though both chunk requests individually return success.

Useful? React with 👍 / 👎.

if (offset > 0) {
let currentSize = 0;
try {
currentSize = (await fs.promises.stat(tempPath)).size;
} catch (error) {
if (error?.code !== 'ENOENT') {
throw error;
}
}

if (currentSize !== offset) {
throw Object.assign(
new Error(`Chunk offset mismatch: expected ${currentSize}, received ${offset}`),
{ status: 409, expectedOffset: currentSize }
);
}
}

let bytes = 0;
const counter = new Transform({
transform(chunk, _encoding, callback) {
bytes += chunk.length;
if (bytes > expectedBytes) {
callback(Object.assign(new Error('Chunk exceeded its declared content-length'), { status: 400 }));
return;
}
callback(null, chunk);
}
});

try {
await pipeline(
readable,
counter,
fs.createWriteStream(tempPath, {
flags: offset === 0 ? 'w' : 'r+',
start: offset
})
);
} catch (error) {
if (offset === 0) {
await fs.promises.rm(tempPath, { force: true }).catch(() => {});
} else {
await fs.promises.truncate(tempPath, offset).catch(() => {});
}
throw error;
}

if (bytes !== expectedBytes) {
await fs.promises.truncate(tempPath, offset).catch(() => {});
throw Object.assign(new Error(`Chunk size mismatch: expected ${expectedBytes}, received ${bytes}`), { status: 400 });
}

const nextOffset = offset + bytes;
if (!completeRequested) {
return {
root,
relativePath,
absolutePath,
tempPath,
complete: false,
offset,
bytes,
nextOffset,
totalBytes
};
}

const stat = await fs.promises.stat(tempPath);
if (nextOffset !== totalBytes || stat.size !== totalBytes) {
throw Object.assign(new Error('Upload cannot be completed before all bytes are received'), { status: 400 });
}

const digest = await hashFile(tempPath);
if (digest.bytes !== totalBytes) {
throw Object.assign(new Error('Upload size changed while finalizing'), { status: 500 });
}

await fs.promises.rename(tempPath, absolutePath);

return {
root,
relativePath,
absolutePath,
complete: true,
offset,
bytes,
nextOffset,
totalBytes,
sha256: digest.sha256
};
}

async function getDownloadFileInfo(inputPath) {
const { root, relativePath, absolutePath } = resolveDownloadPath(inputPath);

Expand Down Expand Up @@ -162,12 +301,47 @@ async function getDownloadFileInfo(inputPath) {
}
}

async function getDownloadUploadInfo(inputPath) {
const file = await getDownloadFileInfo(inputPath);
const tempPath = getChunkUploadTempPath(file.absolutePath);

try {
const stat = await fs.promises.stat(tempPath);
return {
file,
staging: {
exists: true,
isFile: stat.isFile(),
size: stat.size,
updatedAt: stat.mtime.toISOString(),
absolutePath: tempPath
}
};
} catch (error) {
if (error?.code === 'ENOENT') {
return {
file,
staging: {
exists: false,
isFile: false,
size: 0,
updatedAt: null,
absolutePath: tempPath
}
};
}
throw error;
}
}

module.exports = {
DEFAULT_MAX_UPLOAD_BYTES,
ensureGeneralDownloadsRoot,
getDownloadFileInfo,
getDownloadUploadInfo,
getGeneralDownloadsRoot,
normalizeDownloadPath,
resolveDownloadPath,
writeDownloadChunk,
writeDownloadStream
};
101 changes: 89 additions & 12 deletions server/tests/generalDownloadStorage.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ const test = require('node:test');

const generalDownloadStorage = require('../services/generalDownloadStorage');

async function withTempDownloadsRoot(t) {
const originalRoot = process.env.GENERAL_DOWNLOADS_ROOT;
const tempRoot = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'homebrain-downloads-'));
t.after(async () => {
if (originalRoot === undefined) {
delete process.env.GENERAL_DOWNLOADS_ROOT;
} else {
process.env.GENERAL_DOWNLOADS_ROOT = originalRoot;
}
await fs.promises.rm(tempRoot, { recursive: true, force: true });
});

process.env.GENERAL_DOWNLOADS_ROOT = tempRoot;
return tempRoot;
}

test('normalizeDownloadPath accepts nested public download paths', () => {
assert.equal(
generalDownloadStorage.normalizeDownloadPath('public-domain/PreprocessedPublicDomainLibrary.scoreflowseed'),
Expand All @@ -22,18 +38,7 @@ test('normalizeDownloadPath rejects traversal outside the downloads root', () =>
});

test('writeDownloadStream stores files under the configured general downloads root', async (t) => {
const originalRoot = process.env.GENERAL_DOWNLOADS_ROOT;
const tempRoot = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'homebrain-downloads-'));
t.after(async () => {
if (originalRoot === undefined) {
delete process.env.GENERAL_DOWNLOADS_ROOT;
} else {
process.env.GENERAL_DOWNLOADS_ROOT = originalRoot;
}
await fs.promises.rm(tempRoot, { recursive: true, force: true });
});

process.env.GENERAL_DOWNLOADS_ROOT = tempRoot;
await withTempDownloadsRoot(t);
const result = await generalDownloadStorage.writeDownloadStream(
'public-domain/example.scoreflowseed',
Readable.from([Buffer.from('scoreflow')])
Expand All @@ -43,3 +48,75 @@ test('writeDownloadStream stores files under the configured general downloads ro
assert.equal(result.bytes, 9);
assert.equal(await fs.promises.readFile(result.absolutePath, 'utf8'), 'scoreflow');
});

test('writeDownloadChunk resumes chunks and finalizes with a SHA-256 digest', async (t) => {
await withTempDownloadsRoot(t);
const first = await generalDownloadStorage.writeDownloadChunk(
'public-domain/example.scoreflowseed',
Readable.from([Buffer.from('score')]),
{
offset: 0,
totalBytes: 9,
expectedBytes: 5
}
);

assert.equal(first.complete, false);
assert.equal(first.nextOffset, 5);

const uploadInfo = await generalDownloadStorage.getDownloadUploadInfo('public-domain/example.scoreflowseed');
assert.equal(uploadInfo.file.exists, false);
assert.equal(uploadInfo.staging.exists, true);
assert.equal(uploadInfo.staging.size, 5);

const second = await generalDownloadStorage.writeDownloadChunk(
'public-domain/example.scoreflowseed',
Readable.from([Buffer.from('flow')]),
{
offset: 5,
totalBytes: 9,
expectedBytes: 4,
complete: true
}
);

assert.equal(second.complete, true);
assert.equal(second.nextOffset, 9);
assert.equal(second.sha256, '8a64c099404d141e3cc882d08fdb6ca131bed21f665d0a28832d1e0a222e16ae');
assert.equal(await fs.promises.readFile(second.absolutePath, 'utf8'), 'scoreflow');

const finalInfo = await generalDownloadStorage.getDownloadUploadInfo('public-domain/example.scoreflowseed');
assert.equal(finalInfo.file.exists, true);
assert.equal(finalInfo.file.size, 9);
assert.equal(finalInfo.staging.exists, false);
});

test('writeDownloadChunk rejects mismatched offsets so resumable uploads stay consistent', async (t) => {
await withTempDownloadsRoot(t);
await generalDownloadStorage.writeDownloadChunk(
'public-domain/example.scoreflowseed',
Readable.from([Buffer.from('score')]),
{
offset: 0,
totalBytes: 9,
expectedBytes: 5
}
);

await assert.rejects(
() => generalDownloadStorage.writeDownloadChunk(
'public-domain/example.scoreflowseed',
Readable.from([Buffer.from('flow')]),
{
offset: 4,
totalBytes: 9,
expectedBytes: 4
}
),
(error) => {
assert.equal(error.status, 409);
assert.equal(error.expectedOffset, 5);
return true;
}
);
});
Loading