From c1a8210c25d83c829cd2b8df8aa5d3bda23e5441 Mon Sep 17 00:00:00 2001 From: OTLegend Date: Mon, 15 Dec 2025 08:47:12 +0100 Subject: [PATCH 1/6] [fix] don't remove cache on complete, instead use scheduled cleaners the cache cleaner now runs every 1h but frees only from memory, for disk it waits for 24h as before --- src/commands/cleaners/operation-id-cleaner-command.js | 5 +++-- src/constants/constants.js | 5 +++++ src/service/operation-service.js | 11 ++++++----- src/service/publish-service.js | 4 +++- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/commands/cleaners/operation-id-cleaner-command.js b/src/commands/cleaners/operation-id-cleaner-command.js index a604c6430..de6a19497 100644 --- a/src/commands/cleaners/operation-id-cleaner-command.js +++ b/src/commands/cleaners/operation-id-cleaner-command.js @@ -3,6 +3,7 @@ import { BYTES_IN_KILOBYTE, OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, OPERATION_ID_STATUS, COMMAND_PRIORITY, } from '../../constants/constants.js'; @@ -30,7 +31,7 @@ class OperationIdCleanerCommand extends Command { OPERATION_ID_STATUS.FAILED, ]); let removed = await this.operationIdService.removeExpiredOperationIdMemoryCache( - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, ); if (removed) { this.logger.debug( @@ -68,7 +69,7 @@ class OperationIdCleanerCommand extends Command { default(map) { const command = { name: 'operationIdCleanerCommand', - period: OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + period: OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, data: {}, transactional: false, priority: COMMAND_PRIORITY.LOWEST, diff --git a/src/constants/constants.js b/src/constants/constants.js index e96c96b64..560bd0873 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -722,6 +722,11 @@ export const EXPECTED_TRANSACTION_ERRORS = { * operation id command cleanup interval time 24h */ export const OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; +/** + * @constant {number} OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS - + * operation id memory cleanup interval time 1h + */ +export const OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS = 60 * 60 * 1000; /** * @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time * finalized commands command cleanup interval time 24h diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 64d7a0b31..b706312d4 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -61,6 +61,12 @@ class OperationService { async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) { this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`); + await this.repositoryModuleManager.updateOperationStatus( + this.operationName, + operationId, + OPERATION_STATUS.COMPLETED, + ); + if (responseData === null) { await this.operationIdService.removeOperationIdCache(operationId); } else { @@ -68,11 +74,6 @@ class OperationService { await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData); } - await this.repositoryModuleManager.updateOperationStatus( - this.operationName, - operationId, - OPERATION_STATUS.COMPLETED, - ); for (let i = 0; i < endStatuses.length; i += 1) { const status = endStatuses[i]; const response = { diff --git a/src/service/publish-service.js b/src/service/publish-service.js index 8851f8403..e4910a33b 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -79,10 +79,12 @@ class PublishService extends OperationService { `[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` + `datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`, ); + const cachedData = + (await this.operationIdService.getCachedOperationIdData(operationId)) || null; await this.markOperationAsCompleted( operationId, blockchain, - null, + cachedData, this.completedStatuses, ); await this.repositoryModuleManager.updateMinAcksReached(operationId, true); From 3609d941f8347c07add20de71797c1948f4078bd Mon Sep 17 00:00:00 2001 From: OTLegend Date: Mon, 15 Dec 2025 09:05:43 +0100 Subject: [PATCH 2/6] [feature] Add metrics for in memory and disk cache size before cleanup --- .../cleaners/operation-id-cleaner-command.js | 9 +++++++ src/service/operation-id-service.js | 24 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/src/commands/cleaners/operation-id-cleaner-command.js b/src/commands/cleaners/operation-id-cleaner-command.js index de6a19497..8e9935107 100644 --- a/src/commands/cleaners/operation-id-cleaner-command.js +++ b/src/commands/cleaners/operation-id-cleaner-command.js @@ -24,6 +24,15 @@ class OperationIdCleanerCommand extends Command { * @param command */ async execute() { + const memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes(); + const fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes(); + const bytesInMegabyte = 1024 * 1024; + this.logger.debug( + `Operation cache footprint before cleanup: memory=${( + memoryBytes / bytesInMegabyte + ).toFixed(2)}MB, files=${(fileBytes / bytesInMegabyte).toFixed(2)}MB`, + ); + this.logger.debug('Starting command for removal of expired cache files'); const timeToBeDeleted = Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS; await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [ diff --git a/src/service/operation-id-service.js b/src/service/operation-id-service.js index b959699e2..57e509a02 100644 --- a/src/service/operation-id-service.js +++ b/src/service/operation-id-service.js @@ -150,6 +150,30 @@ class OperationIdService { delete this.memoryCachedHandlersData[operationId]; } + getOperationIdMemoryCacheSizeBytes() { + let total = 0; + for (const operationId in this.memoryCachedHandlersData) { + const { data } = this.memoryCachedHandlersData[operationId]; + total += Buffer.from(JSON.stringify(data)).byteLength; + } + return total; + } + + async getOperationIdFileCacheSizeBytes() { + const cacheFolderPath = this.fileService.getOperationIdCachePath(); + const cacheFolderExists = await this.fileService.pathExists(cacheFolderPath); + if (!cacheFolderExists) return 0; + + const fileList = await this.fileService.readDirectory(cacheFolderPath); + let total = 0; + for (const fileName of fileList) { + // eslint-disable-next-line no-await-in-loop + const stats = await this.fileService.stat(path.join(cacheFolderPath, fileName)); + total += stats.size; + } + return total; + } + async removeExpiredOperationIdMemoryCache(expiredTimeout) { const now = Date.now(); let deleted = 0; From 7f9d514e2dc979642abcecd624103ec77b09e5b4 Mon Sep 17 00:00:00 2001 From: OTLegend Date: Mon, 15 Dec 2025 09:19:27 +0100 Subject: [PATCH 3/6] Add test coverage for operation cache cleanup --- .../cleaners/operation-id-cleaner-command.js | 14 ++- src/service/operation-id-service.js | 17 +-- src/service/operation-service.js | 13 ++- src/service/publish-service.js | 1 + .../operation-id-cleaner-command.test.js | 100 ++++++++++++++++++ .../operation-id-service-cache.test.js | 64 +++++++++++ 6 files changed, 198 insertions(+), 11 deletions(-) create mode 100644 test/unit/commands/operation-id-cleaner-command.test.js create mode 100644 test/unit/service/operation-id-service-cache.test.js diff --git a/src/commands/cleaners/operation-id-cleaner-command.js b/src/commands/cleaners/operation-id-cleaner-command.js index 8e9935107..9935321bf 100644 --- a/src/commands/cleaners/operation-id-cleaner-command.js +++ b/src/commands/cleaners/operation-id-cleaner-command.js @@ -24,8 +24,18 @@ class OperationIdCleanerCommand extends Command { * @param command */ async execute() { - const memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes(); - const fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes(); + let memoryBytes = 0; + let fileBytes = 0; + try { + memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes(); + } catch (error) { + this.logger.warn(`Unable to read memory cache footprint: ${error.message}`); + } + try { + fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes(); + } catch (error) { + this.logger.warn(`Unable to read file cache footprint: ${error.message}`); + } const bytesInMegabyte = 1024 * 1024; this.logger.debug( `Operation cache footprint before cleanup: memory=${( diff --git a/src/service/operation-id-service.js b/src/service/operation-id-service.js index 57e509a02..1f91780c6 100644 --- a/src/service/operation-id-service.js +++ b/src/service/operation-id-service.js @@ -165,13 +165,16 @@ class OperationIdService { if (!cacheFolderExists) return 0; const fileList = await this.fileService.readDirectory(cacheFolderPath); - let total = 0; - for (const fileName of fileList) { - // eslint-disable-next-line no-await-in-loop - const stats = await this.fileService.stat(path.join(cacheFolderPath, fileName)); - total += stats.size; - } - return total; + const sizeResults = await Promise.allSettled( + fileList.map((fileName) => + this.fileService + .stat(path.join(cacheFolderPath, fileName)) + .then((stats) => stats.size), + ), + ); + return sizeResults + .filter((res) => res.status === 'fulfilled') + .reduce((acc, res) => acc + res.value, 0); } async removeExpiredOperationIdMemoryCache(expiredTimeout) { diff --git a/src/service/operation-service.js b/src/service/operation-service.js index b706312d4..10d84b2f4 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -58,7 +58,14 @@ class OperationService { return operationIdStatuses; } - async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) { + async markOperationAsCompleted( + operationId, + blockchain, + responseData, + endStatuses, + options = {}, + ) { + const { reuseExistingCache = false } = options; this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`); await this.repositoryModuleManager.updateOperationStatus( @@ -71,7 +78,9 @@ class OperationService { await this.operationIdService.removeOperationIdCache(operationId); } else { await this.operationIdService.cacheOperationIdDataToMemory(operationId, responseData); - await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData); + if (!reuseExistingCache) { + await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData); + } } for (let i = 0; i < endStatuses.length; i += 1) { diff --git a/src/service/publish-service.js b/src/service/publish-service.js index e4910a33b..1e6046240 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -86,6 +86,7 @@ class PublishService extends OperationService { blockchain, cachedData, this.completedStatuses, + { reuseExistingCache: true }, ); await this.repositoryModuleManager.updateMinAcksReached(operationId, true); this.logResponsesSummary(completedNumber, failedNumber); diff --git a/test/unit/commands/operation-id-cleaner-command.test.js b/test/unit/commands/operation-id-cleaner-command.test.js new file mode 100644 index 000000000..c62734034 --- /dev/null +++ b/test/unit/commands/operation-id-cleaner-command.test.js @@ -0,0 +1,100 @@ +import { describe, it, beforeEach, afterEach } from 'mocha'; +import { expect } from 'chai'; +import sinon from 'sinon'; + +import OperationIdCleanerCommand from '../../../src/commands/cleaners/operation-id-cleaner-command.js'; +import { + OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, + OPERATION_ID_STATUS, +} from '../../../src/constants/constants.js'; + +describe('OperationIdCleanerCommand', () => { + let clock; + let operationIdService; + let repositoryModuleManager; + let logger; + let command; + + beforeEach(() => { + clock = sinon.useFakeTimers(new Date('2023-01-01T00:00:00Z').getTime()); + + operationIdService = { + getOperationIdMemoryCacheSizeBytes: sinon.stub().returns(1024), + getOperationIdFileCacheSizeBytes: sinon.stub().resolves(2048), + removeExpiredOperationIdMemoryCache: sinon.stub().resolves(512), + removeExpiredOperationIdFileCache: sinon.stub().resolves(3), + }; + + repositoryModuleManager = { + removeOperationIdRecord: sinon.stub().resolves(), + }; + + logger = { + debug: sinon.spy(), + info: sinon.spy(), + warn: sinon.spy(), + error: sinon.spy(), + }; + + command = new OperationIdCleanerCommand({ + logger, + repositoryModuleManager, + operationIdService, + fileService: {}, + }); + }); + + afterEach(() => { + clock.restore(); + }); + + it('cleans memory with 1h TTL and files with 24h TTL while reporting footprint', async () => { + await command.execute(); + + expect(operationIdService.getOperationIdMemoryCacheSizeBytes.calledOnce).to.be.true; + expect(operationIdService.getOperationIdFileCacheSizeBytes.calledOnce).to.be.true; + + expect( + repositoryModuleManager.removeOperationIdRecord.calledWith( + Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + [OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED], + ), + ).to.be.true; + + expect( + operationIdService.removeExpiredOperationIdMemoryCache.calledWith( + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, + ), + ).to.be.true; + + expect( + operationIdService.removeExpiredOperationIdFileCache.calledWith( + OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, + ), + ).to.be.true; + + expect(logger.debug.called).to.be.true; + }); + + it('handles missing memory cache gracefully', async () => { + operationIdService.getOperationIdMemoryCacheSizeBytes.throws(new Error('no memory cache')); + await command.execute(); + + expect( + repositoryModuleManager.removeOperationIdRecord.calledWith( + Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + [OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED], + ), + ).to.be.true; + + expect( + operationIdService.removeExpiredOperationIdFileCache.calledWith( + OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, + ), + ).to.be.true; + }); +}); diff --git a/test/unit/service/operation-id-service-cache.test.js b/test/unit/service/operation-id-service-cache.test.js new file mode 100644 index 000000000..43b40c219 --- /dev/null +++ b/test/unit/service/operation-id-service-cache.test.js @@ -0,0 +1,64 @@ +import { describe, it, beforeEach, afterEach } from 'mocha'; +import { expect } from 'chai'; +import fs from 'fs/promises'; +import path from 'path'; +import os from 'os'; +import OperationIdService from '../../../src/service/operation-id-service.js'; + +describe('OperationIdService file cache cleanup', () => { + let tmpDir; + let service; + + beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'opid-cache-')); + const now = Date.now(); + + // Older than TTL (2 hours) + const oldFile = path.join(tmpDir, 'old.json'); + await fs.writeFile(oldFile, '{}'); + await fs.utimes( + oldFile, + new Date(now - 2 * 60 * 60 * 1000), + new Date(now - 2 * 60 * 60 * 1000), + ); + + // Newer than TTL (10 minutes) + const newFile = path.join(tmpDir, 'new.json'); + await fs.writeFile(newFile, '{}'); + await fs.utimes(newFile, new Date(now - 10 * 60 * 1000), new Date(now - 10 * 60 * 1000)); + + const fileService = { + getOperationIdCachePath: () => tmpDir, + async pathExists(p) { + try { + await fs.stat(p); + return true; + } catch { + return false; + } + }, + readDirectory: (p) => fs.readdir(p), + stat: (p) => fs.stat(p), + removeFile: (p) => fs.rm(p, { force: true }), + }; + + service = new OperationIdService({ + logger: { debug: () => {}, warn: () => {}, error: () => {} }, + fileService, + repositoryModuleManager: {}, + eventEmitter: { emit: () => {} }, + }); + }); + + afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); + }); + + it('removes only files older than TTL', async () => { + const deleted = await service.removeExpiredOperationIdFileCache(60 * 60 * 1000, 10); + const remainingFiles = await fs.readdir(tmpDir); + + expect(deleted).to.equal(1); + expect(remainingFiles).to.deep.equal(['new.json']); + }); +}); From 7850caf8236baa09d1de8e87ff64b17825cd67f4 Mon Sep 17 00:00:00 2001 From: OTLegend Date: Mon, 15 Dec 2025 12:32:21 +0100 Subject: [PATCH 4/6] increment version for release --- package.json | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index e4813f273..66555d117 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "description": "OTNode V8", "main": "index.js", "type": "module", @@ -31,10 +31,7 @@ "type": "git", "url": "git+https://github.com/OriginTrail/ot-node.git" }, - "keywords": [ - "ot-node", - "v8" - ], + "keywords": ["ot-node", "v8"], "author": "TraceLabs", "license": "ISC", "bugs": { From 5e01ed3671b56feb266ca80566aaf1746430e9c8 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 15 Dec 2025 12:47:00 +0100 Subject: [PATCH 5/6] revert to old version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 66555d117..1e09f2234 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.4", + "version": "8.2.3", "description": "OTNode V8", "main": "index.js", "type": "module", From eef936b577c4994938ce243fe9db8f93796c236b Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 15 Dec 2025 12:47:04 +0100 Subject: [PATCH 6/6] 8.2.4 --- package-lock.json | 4 ++-- package.json | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index ff0ea7053..8248b91a7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^4.0.2", diff --git a/package.json b/package.json index 1e09f2234..7cc8b8bf1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "description": "OTNode V8", "main": "index.js", "type": "module", @@ -31,7 +31,10 @@ "type": "git", "url": "git+https://github.com/OriginTrail/ot-node.git" }, - "keywords": ["ot-node", "v8"], + "keywords": [ + "ot-node", + "v8" + ], "author": "TraceLabs", "license": "ISC", "bugs": {