diff --git a/package-lock.json b/package-lock.json index 1c2c3f653..ff0ea7053 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "8.2.2", + "version": "8.2.3", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "8.2.2", + "version": "8.2.3", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^4.0.2", diff --git a/package.json b/package.json index 8ea0c3e0a..e4813f273 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.2", + "version": "8.2.3", "description": "OTNode V8", "main": "index.js", "type": "module", diff --git a/src/commands/protocols/publish/publish-finalization-command.js b/src/commands/protocols/publish/publish-finalization-command.js index b314b5c53..78db13bd0 100644 --- a/src/commands/protocols/publish/publish-finalization-command.js +++ b/src/commands/protocols/publish/publish-finalization-command.js @@ -33,6 +33,8 @@ class PublishFinalizationCommand extends Command { const { id, publishOperationId, merkleRoot, byteSize } = eventData; const { blockchain, contractAddress } = event; const operationId = this.operationIdService.generateId(); + const ual = this.ualService.deriveUAL(blockchain, contractAddress, id); + this.operationIdService.emitChangeEvent( OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START, operationId, @@ -70,8 +72,10 @@ class PublishFinalizationCommand extends Command { cachedMerkleRoot = result.merkleRoot; assertion = result.assertion; publisherPeerId = result.remotePeerId; - } catch (error) { - this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive + } catch (_error) { + this.logger.warn( + `[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId})`, + ); this.operationIdService.emitChangeEvent( OPERATION_ID_STATUS.FAILED, operationId, @@ -81,8 +85,6 @@ class PublishFinalizationCommand extends Command { return Command.empty(); } - const ual = this.ualService.deriveUAL(blockchain, contractAddress, id); - try { await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual); } catch (e) { @@ -185,23 +187,24 @@ class PublishFinalizationCommand extends Command { async readWithRetries(publishOperationId) { let attempt = 0; + const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId); while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) { try { - const datasetPath = - this.fileService.getPendingStorageDocumentPath(publishOperationId); // eslint-disable-next-line no-await-in-loop const cachedData = await this.fileService.readFile(datasetPath, true); return cachedData; } catch (error) { attempt += 1; - // eslint-disable-next-line no-await-in-loop await new Promise((resolve) => { setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA); }); } } + this.logger.warn( + `[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`, + ); // TODO: Mark this operation as failed throw new Error('Failed to read cached publish data'); } diff --git a/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js b/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js index 16a4d9af3..ee4c478b1 100644 --- a/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js +++ b/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js @@ -177,12 +177,24 @@ class OtBlazegraph extends OtTripleStore { } async queryVoid(repository, query, timeout) { - return axios.post(this.repositories[repository].sparqlEndpoint, query, { - headers: { - 'Content-Type': 'application/sparql-update; charset=UTF-8', - 'X-BIGDATA-MAX-QUERY-MILLIS': timeout, - }, - }); + try { + return await axios.post(this.repositories[repository].sparqlEndpoint, query, { + headers: { + 'Content-Type': 'application/sparql-update; charset=UTF-8', + 'X-BIGDATA-MAX-QUERY-MILLIS': timeout, + }, + }); + } catch (error) { + const status = error?.response?.status; + const dataSnippet = + typeof error?.response?.data === 'string' ? error.response.data.slice(0, 200) : ''; + this.logger.error( + `[OtBlazegraph.queryVoid] Update failed for ${repository} (status: ${status}): ${ + error.message + }${dataSnippet ? ` | data: ${dataSnippet}` : ''}`, + ); + throw error; + } } async deleteRepository(repository) { diff --git a/src/service/publish-service.js b/src/service/publish-service.js index 6f043c1a5..8851f8403 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -69,32 +69,42 @@ class PublishService extends OperationService { // } // 2. Check if all responses have been received - if (totalResponses === numberOfFoundNodes) { - // 2.1 If minimum replication is reached, mark the operation as completed - if (completedNumber >= minAckResponses) { - await this.markOperationAsCompleted( - operationId, - blockchain, - null, - this.completedStatuses, - ); - await this.repositoryModuleManager.updateMinAcksReached(operationId, true); - this.logResponsesSummary(completedNumber, failedNumber); - } - // 2.2 Otherwise, mark as failed - else { - await this.markOperationAsFailed( - operationId, - blockchain, - 'Not replicated to enough nodes!', - this.errorType, - ); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.PUBLISH.PUBLISH_FAILED, - operationId, - ); - this.logResponsesSummary(completedNumber, failedNumber); - } + // 2.1 If minimum replication is reached, mark the operation as completed + + const record = await this.operationIdService.getOperationIdRecord(operationId); + if (record?.minAcksReached) return; + + if (completedNumber >= minAckResponses) { + this.logger.info( + `[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` + + `datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`, + ); + await this.markOperationAsCompleted( + operationId, + blockchain, + null, + this.completedStatuses, + ); + await this.repositoryModuleManager.updateMinAcksReached(operationId, true); + this.logResponsesSummary(completedNumber, failedNumber); + } + // 2.2 Otherwise, mark as failed + else if (totalResponses === numberOfFoundNodes) { + this.logger.warn( + `[PUBLISH] Failed for operationId: ${operationId}, ` + + `only ${completedNumber}/${minAckResponses} nodes responded successfully`, + ); + await this.markOperationAsFailed( + operationId, + blockchain, + 'Not replicated to enough nodes!', + this.errorType, + ); + this.operationIdService.emitChangeEvent( + OPERATION_ID_STATUS.PUBLISH.PUBLISH_FAILED, + operationId, + ); + this.logResponsesSummary(completedNumber, failedNumber); } // else { // // 3. Not all responses have arrived yet.