Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29,768 changes: 27,165 additions & 2,603 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "8.1.1-rc.9",
"version": "8.1.1-rc.10",
"description": "OTNode V8",
"main": "index.js",
"type": "module",
Expand Down
25 changes: 14 additions & 11 deletions src/commands/protocols/common/handle-protocol-message-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ class HandleProtocolMessageCommand extends Command {
async execute(command) {
const { remotePeerId, operationId, protocol, blockchain } = command.data;

this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
this.operationStartEvent,
);
this.operationIdService.emitChangeEvent(this.operationStartEvent, operationId, blockchain);

try {
const { messageType, messageData } = await this.prepareMessage(command.data);
Expand All @@ -38,11 +34,6 @@ class HandleProtocolMessageCommand extends Command {
operationId,
messageData,
);
await this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
this.operationEndEvent,
);
} catch (error) {
if (command.retries) {
this.logger.warn(error.message);
Expand Down Expand Up @@ -122,7 +113,19 @@ class HandleProtocolMessageCommand extends Command {
async handleError(errorMessage, command) {
const { operationId, blockchain, remotePeerId, protocol } = command.data;

await super.handleError(operationId, blockchain, errorMessage, this.errorType, true);
this.logger.error(`Command error (${this.errorType}): ${errorMessage}`);
if (errorMessage !== null) {
this.logger.debug(`Marking operation id ${operationId} as failed`);
await this.removeOperationIdCache(operationId);
}
this.operationIdService.emitChangeEvent(
this.errorType,
operationId,
blockchain,
errorMessage,
this.errorType,
);

await this.networkModuleManager.sendMessageResponse(
protocol,
remotePeerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ class HandleFinalityRequestCommand extends HandleProtocolMessageCommand {
if (state) {
ualWithState = `${ual}:${state}`;
}
await this.operationIdService.updateOperationIdStatus(

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FINALITY.PUBLISH_FINALITY_REMOTE_START,
operationId,
blockchain,
OPERATION_ID_STATUS.FINALITY.PUBLISH_FINALITY_REMOTE_START,
);

let response;
Expand Down Expand Up @@ -70,10 +71,10 @@ class HandleFinalityRequestCommand extends HandleProtocolMessageCommand {
OPERATION_ID_STATUS.FINALITY.PUBLISH_FINALITY_END,
OPERATION_ID_STATUS.COMPLETED,
]);
await this.operationIdService.updateOperationIdStatus(
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FINALITY.PUBLISH_FINALITY_REMOTE_END,
operationId,
blockchain,
OPERATION_ID_STATUS.FINALITY.PUBLISH_FINALITY_REMOTE_END,
);

// eslint-disable-next-line no-param-reassign
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class HandleBatchGetRequestCommand extends HandleProtocolMessageCommand {
this.logger.startTimer(
`HandleBatchGetRequestCommand [PREPARE]: ${operationId} ${uals.length}`,
);
await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
this.operationStartEvent,
operationId,
blockchain,
this.operationStartEvent,
);

// Trim uals and tokenIds to the max limit of BATCH_GET_UAL_MAX_LIMIT
Expand Down Expand Up @@ -108,10 +108,10 @@ class HandleBatchGetRequestCommand extends HandleProtocolMessageCommand {
);

if (assertions?.length) {
await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
this.operationEndEvent,
operationId,
blockchain,
this.operationEndEvent,
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ class HandleGetRequestCommand extends HandleProtocolMessageCommand {
};

if (assertion?.public?.length || assertion?.length) {
await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
this.operationEndEvent,
operationId,
blockchain,
OPERATION_ID_STATUS.GET.GET_REMOTE_END,
);
}

Expand Down
15 changes: 10 additions & 5 deletions src/commands/protocols/get/sender/get-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class GetCommand extends Command {
errorMessage,
errorType,
);
this.operationIdService.emitChangeEvent(OPERATION_ID_STATUS.GET.GET_FAILED, operationId);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.GET.GET_FAILED,
operationId,
blockchain,
);
}

/**
Expand Down Expand Up @@ -150,10 +154,11 @@ class GetCommand extends Command {
return Command.empty();
}
}
await this.operationIdService.updateOperationIdStatus(

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.GET.GET_VALIDATE_ASSET_END,
operationId,
blockchain,
OPERATION_ID_STATUS.GET.GET_VALIDATE_ASSET_END,
);

await this.operationIdService.updateOperationIdStatus(
Expand Down Expand Up @@ -258,10 +263,10 @@ class GetCommand extends Command {
}
this.logger.debug(`Could not find asset with UAL: ${ual} locally`);

await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.GET.GET_LOCAL_END,
operationId,
blockchain,
OPERATION_ID_STATUS.GET.GET_LOCAL_END,
);

await this.operationIdService.updateOperationIdStatus(
Expand Down
55 changes: 31 additions & 24 deletions src/commands/protocols/publish/publish-finalization-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ class PublishFinalizationCommand extends Command {
const { txHash, blockNumber } = event;
const { id, publishOperationId, merkleRoot, byteSize } = eventData;
const { blockchain, contractAddress } = event;
const operationId = await this.operationIdService.generateOperationId(
const operationId = this.operationIdService.generateId();
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
operationId,
blockchain,
publishOperationId,
);
Expand Down Expand Up @@ -103,10 +105,10 @@ class PublishFinalizationCommand extends Command {
}

try {
await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_STORE_ASSERTION_START,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_STORE_ASSERTION_START,
);

const totalTriples = await this.tripleStoreService.insertKnowledgeCollection(
Expand All @@ -119,10 +121,10 @@ class PublishFinalizationCommand extends Command {
await this.repositoryModuleManager.incrementInsertedTriples(totalTriples ?? 0);
this.logger.info(`Number of triples added to the database +${totalTriples}`);

await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_STORE_ASSERTION_END,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_STORE_ASSERTION_END,
);

const myPeerId = this.networkModuleManager.getPeerId().toB58String();
Expand All @@ -133,15 +135,9 @@ class PublishFinalizationCommand extends Command {
publisherPeerId,
);

await this.operationService.markOperationAsCompleted(
operationId,
blockchain,
{
completedNodes: 1,
allNodesReplicatedData: true,
},
[...this.operationService.completedStatuses],
);
for (const status of this.operationService.completedStatuses) {
this.operationIdService.emitChangeEvent(status, operationId, blockchain);
}
} else {
const networkProtocols = this.operationService.getNetworkProtocols();
const node = { id: publisherPeerId, protocol: networkProtocols[0] };
Expand All @@ -164,7 +160,8 @@ class PublishFinalizationCommand extends Command {
);
}
} catch (e) {
await this.handleError(operationId, blockchain, e.message, this.errorType, true);
this.logger.error(`Command error (${this.errorType}): ${e.message}`);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
Expand All @@ -187,12 +184,14 @@ class PublishFinalizationCommand extends Command {
) {
try {
if (merkleRoot !== cachedMerkleRoot) {
await this.handleError(
const errorMessage = `Invalid Merkle Root for Knowledge Collection: ${ual}. Received value from blockchain: ${merkleRoot}, Cached value from publish operation: ${cachedMerkleRoot}`;

this.logger.error(`Command error (${this.errorType}): ${errorMessage}`);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
blockchain,
`Invalid Merkle Root for Knowledge Collection: ${ual}. Received value from blockchain: ${merkleRoot}, Cached value from publish operation: ${cachedMerkleRoot}`,
this.errorType,
true,
);
}

Expand All @@ -201,16 +200,24 @@ class PublishFinalizationCommand extends Command {
);

if (byteSize.toString() !== calculatedAssertionSize.toString()) {
await this.handleError(
const errorMessage = `Invalid Assertion Size for Knowledge Collection: ${ual}. Received value from blockchain: ${byteSize}, Calculated value: ${calculatedAssertionSize}`;

this.logger.error(`Command error (${this.errorType}): ${errorMessage}`);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
blockchain,
`Invalid Assertion Size for Knowledge Collection: ${ual}. Received value from blockchain: ${byteSize}, Calculated value: ${calculatedAssertionSize}`,
this.errorType,
true,
);
}
} catch (e) {
await this.handleError(operationId, blockchain, e.message, this.errorType, true);
this.logger.error(`Command error (${this.errorType}): ${e.message}`);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
blockchain,
);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
async prepareMessage(commandData) {
const { blockchain, operationId, datasetRoot, remotePeerId, isOperationV0 } = commandData;

await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_START,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_START,
);

const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId);
Expand All @@ -45,20 +45,20 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
isOperationV0,
);

await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
);

if (validationResult.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.NACK) {
return validationResult;
}

await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_REMOTE_CACHE_DATASET_START,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_REMOTE_CACHE_DATASET_START,
);
if (isOperationV0) {
const { contract, tokenId } = commandData;
Expand All @@ -72,20 +72,20 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
remotePeerId,
);
}
await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_REMOTE_CACHE_DATASET_END,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_REMOTE_CACHE_DATASET_END,
);

const identityId = await this.blockchainModuleManager.getIdentityId(blockchain);

const { v, r, s, vs } = await this.signatureService.signMessage(blockchain, datasetRoot);

await this.operationIdService.updateOperationIdStatus(
await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
);

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ class PublishReplicationCommand extends Command {
return Command.empty();
}

await this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
OPERATION_ID_STATUS.FAILED,
);

try {
await this.operationIdService.updateOperationIdStatus(
operationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ class PublishController extends BaseController {
blockchain,
);

await this.operationIdService.updateOperationIdStatus(
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_INIT_START,
operationId,
blockchain,
OPERATION_ID_STATUS.PUBLISH.PUBLISH_INIT_START,
);

this.returnResponse(res, 202, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class BlockchainEventRepository {
return this.model.update(
{ processed: true },
{
where: { blockchain },
where: { blockchain, processed: false },
...options,
},
);
Expand Down
15 changes: 11 additions & 4 deletions src/service/operation-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ class OperationService {
operationId,
OPERATION_STATUS.COMPLETED,
);

for (const status of endStatuses) {
// eslint-disable-next-line no-await-in-loop
await this.operationIdService.updateOperationIdStatus(operationId, blockchain, status);
for (let i = 0; i < endStatuses.length; i += 1) {
const status = endStatuses[i];
const response = {
status,
};

this.operationIdService.emitChangeEvent(status, operationId, blockchain);
if (i === endStatuses.length - 1) {
// eslint-disable-next-line no-await-in-loop
await this.repositoryModuleManager.updateOperationIdRecord(response, operationId);
}
}
}

Expand Down
Loading
Loading