diff --git a/src/extensions/scratch3_mesh_v2/gql-operations.js b/src/extensions/scratch3_mesh_v2/gql-operations.js index 19ee27b244..75cf5e264d 100644 --- a/src/extensions/scratch3_mesh_v2/gql-operations.js +++ b/src/extensions/scratch3_mesh_v2/gql-operations.js @@ -9,7 +9,6 @@ const LIST_GROUPS_BY_DOMAIN = gql` fullId name hostId - createdAt expiresAt } } @@ -22,16 +21,22 @@ const CREATE_DOMAIN = gql` `; const CREATE_GROUP = gql` - mutation CreateGroup($name: String!, $hostId: ID!, $domain: String!, $maxConnectionTimeSeconds: Int) { - createGroup(name: $name, hostId: $hostId, domain: $domain, maxConnectionTimeSeconds: $maxConnectionTimeSeconds) { + mutation CreateGroup( + $name: String!, $hostId: ID!, $domain: String!, $maxConnectionTimeSeconds: Int, $useWebSocket: Boolean! + ) { + createGroup( + name: $name, hostId: $hostId, domain: $domain, + maxConnectionTimeSeconds: $maxConnectionTimeSeconds, useWebSocket: $useWebSocket + ) { id domain fullId name hostId - createdAt expiresAt heartbeatIntervalSeconds + useWebSocket + pollingIntervalSeconds } } `; @@ -45,6 +50,8 @@ const JOIN_GROUP = gql` domain expiresAt heartbeatIntervalSeconds + useWebSocket + pollingIntervalSeconds } } `; @@ -139,6 +146,33 @@ const FIRE_EVENTS = gql` } `; +const RECORD_EVENTS = gql` + mutation RecordEventsByNode($groupId: ID!, $domain: String!, $nodeId: ID!, $events: [EventInput!]!) { + recordEventsByNode( + groupId: $groupId, domain: $domain, nodeId: $nodeId, events: $events + ) { + groupId + domain + recordedCount + nextSince + } + } +`; + +const GET_EVENTS_SINCE = gql` + query GetEventsSince($groupId: ID!, $domain: String!, $since: String!) { + getEventsSince(groupId: $groupId, domain: $domain, since: $since) { + name + firedByNodeId + groupId + domain + payload + timestamp + cursor + } + } +`; + const ON_MESSAGE_IN_GROUP = gql` subscription OnMessageInGroup($groupId: ID!, $domain: String!) { onMessageInGroup(groupId: $groupId, domain: $domain) { @@ -203,6 +237,8 @@ module.exports = { SEND_MEMBER_HEARTBEAT, REPORT_DATA, FIRE_EVENTS, + RECORD_EVENTS, + GET_EVENTS_SINCE, ON_MESSAGE_IN_GROUP, LIST_GROUP_STATUSES }; diff --git a/src/extensions/scratch3_mesh_v2/mesh-client.js b/src/extensions/scratch3_mesh_v2/mesh-client.js index 8c5f0e38f0..737ec34a69 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-client.js +++ b/src/extensions/scratch3_mesh_v2/mesh-client.js @@ -32,5 +32,6 @@ const getClient = () => client; module.exports = { createClient, getClient, - gql + gql, + GRAPHQL_ENDPOINT }; diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index fec9e7a8ad..ca20b09067 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -14,10 +14,16 @@ const { SEND_MEMBER_HEARTBEAT, REPORT_DATA, FIRE_EVENTS, + RECORD_EVENTS, + GET_EVENTS_SINCE, ON_MESSAGE_IN_GROUP, LIST_GROUP_STATUSES } = require('./gql-operations'); +const {getForcePollingFromUrl} = require('./utils'); + +const {GRAPHQL_ENDPOINT} = require('./mesh-client'); + /** * Parses an environment variable as an integer with validation. * @param {string} envVar - The environment variable value. @@ -58,11 +64,19 @@ class MeshV2Service { this.groupName = null; this.expiresAt = null; this.isHost = false; + this.forcePolling = getForcePollingFromUrl(); + this.useWebSocket = !this.forcePolling; + if (this.forcePolling) { + log.info('Mesh V2: Forced polling mode enabled via URL parameter'); + } + this.pollingIntervalSeconds = 2; + this.lastFetchTime = ''; this.subscriptions = []; this.connectionTimer = null; this.heartbeatTimer = null; this.dataSyncTimer = null; + this.pollingTimer = null; // Last data send promise to track completion of the most recent data transmission this.lastDataSendPromise = Promise.resolve(); @@ -198,6 +212,49 @@ class MeshV2Service { } } + /** + * Test if WebSocket connection is possible in the current environment. + * @returns {Promise} True if WebSocket is available. + */ + testWebSocket () { + return new Promise(resolve => { + try { + // Derived from https://xxx.appsync-api.region.amazonaws.com/graphql + // to wss://xxx.appsync-realtime-api.region.amazonaws.com/graphql + // or for custom domains, to wss://api.example.com/graphql/realtime + let wsUrl = GRAPHQL_ENDPOINT.replace('https://', 'wss://'); + if (wsUrl.includes('appsync-api')) { + wsUrl = wsUrl.replace('appsync-api', 'appsync-realtime-api'); + } else { + wsUrl = wsUrl.replace(/\/graphql$/, '/graphql/realtime'); + } + + const socket = new WebSocket(wsUrl, 'graphql-ws'); + const timeout = setTimeout(() => { + log.warn('Mesh V2: WebSocket test timed out'); + socket.close(); + resolve(false); + }, 3000); // 3 seconds timeout for test + + socket.onopen = () => { + log.info('Mesh V2: WebSocket test successful'); + clearTimeout(timeout); + socket.close(); + resolve(true); + }; + + socket.onerror = err => { + log.warn(`Mesh V2: WebSocket test failed: ${err}`); + clearTimeout(timeout); + resolve(false); + }; + } catch (error) { + log.warn(`Mesh V2: WebSocket not supported or failed to initialize: ${error}`); + resolve(false); + } + }); + } + async createDomain () { if (!this.client) throw new Error('Client not initialized'); @@ -224,13 +281,24 @@ class MeshV2Service { await this.createDomain(); } + // Test WebSocket availability + if (this.forcePolling) { + this.useWebSocket = false; + } else { + this.useWebSocket = await this.testWebSocket(); + } + log.info(`Mesh V2: WebSocket available: ${this.useWebSocket}`); + this.costTracking.mutationCount++; + this.lastFetchTime = new Date().toISOString(); + log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (before createGroup)`); const result = await this.client.mutate({ mutation: CREATE_GROUP, variables: { name: groupName, hostId: this.meshId, - domain: this.domain + domain: this.domain, + useWebSocket: this.useWebSocket } }); @@ -239,13 +307,21 @@ class MeshV2Service { this.groupName = group.name; this.domain = group.domain; // Update domain from server this.expiresAt = group.expiresAt; + this.useWebSocket = group.useWebSocket; + if (group.pollingIntervalSeconds) { + this.pollingIntervalSeconds = group.pollingIntervalSeconds; + } this.isHost = true; if (group.heartbeatIntervalSeconds) { this.hostHeartbeatInterval = group.heartbeatIntervalSeconds; } this.costTracking.connectionStartTime = Date.now(); - this.startSubscriptions(); + if (this.useWebSocket) { + this.startSubscriptions(); + } else { + this.startPolling(); + } this.startHeartbeat(); this.startEventBatchTimer(); this.startConnectionTimer(); @@ -253,7 +329,8 @@ class MeshV2Service { await this.sendAllGlobalVariables(); - log.info(`Mesh V2: Created group ${this.groupName} (${this.groupId}) in domain ${this.domain}`); + log.info(`Mesh V2: Created group ${this.groupName} (${this.groupId}) in domain ${this.domain} ` + + `(Protocol: ${this.useWebSocket ? 'WebSocket' : 'Polling'})`); return group; } catch (error) { log.error(`Mesh V2: Failed to create group: ${error}`); @@ -291,6 +368,8 @@ class MeshV2Service { try { this.costTracking.mutationCount++; + this.lastFetchTime = new Date().toISOString(); + log.info(`Mesh V2: Initialized lastFetchTime to ${this.lastFetchTime} (before joinGroup)`); const result = await this.client.mutate({ mutation: JOIN_GROUP, variables: { @@ -305,13 +384,21 @@ class MeshV2Service { this.groupName = groupName || groupId; this.domain = node.domain; // Update domain from server this.expiresAt = node.expiresAt; + this.useWebSocket = this.forcePolling ? false : node.useWebSocket; + if (node.pollingIntervalSeconds) { + this.pollingIntervalSeconds = node.pollingIntervalSeconds; + } this.isHost = false; if (node.heartbeatIntervalSeconds) { this.memberHeartbeatInterval = node.heartbeatIntervalSeconds; } this.costTracking.connectionStartTime = Date.now(); - this.startSubscriptions(); + if (this.useWebSocket) { + this.startSubscriptions(); + } else { + this.startPolling(); + } this.startHeartbeat(); // Start heartbeat for member too this.startEventBatchTimer(); this.startConnectionTimer(); @@ -320,7 +407,8 @@ class MeshV2Service { await this.sendAllGlobalVariables(); await this.fetchAllNodesData(); - log.info(`Mesh V2: Joined group ${this.groupId} in domain ${this.domain}`); + log.info(`Mesh V2: Joined group ${this.groupId} in domain ${this.domain} ` + + `(Protocol: ${this.useWebSocket ? 'WebSocket' : 'Polling'})`); return node; } catch (error) { log.error(`Mesh V2: Failed to join group: ${error}`); @@ -426,6 +514,7 @@ class MeshV2Service { this.lastBroadcastOffset = 0; this.stopSubscriptions(); + this.stopPolling(); this.stopHeartbeat(); this.stopEventBatchTimer(); this.stopConnectionTimer(); @@ -483,6 +572,83 @@ class MeshV2Service { this.subscriptions = []; } + /** + * Start polling for events when WebSocket is not available. + */ + startPolling () { + this.stopPolling(); + if (!this.groupId) return; + + log.info(`Mesh V2: Starting event polling (Interval: ${this.pollingIntervalSeconds}s)`); + + this.pollingTimer = setInterval(() => { + this.pollEvents(); + }, this.pollingIntervalSeconds * 1000); + } + + /** + * Stop event polling. + */ + stopPolling () { + if (this.pollingTimer) { + log.info('Mesh V2: Stopping event polling'); + clearInterval(this.pollingTimer); + this.pollingTimer = null; + } + this.lastFetchTime = ''; + } + + /** + * Fetch new events from the server since the last fetch time. + */ + async pollEvents () { + if (!this.groupId || !this.client || this.useWebSocket) return; + + if (!this.lastFetchTime) { + log.warn('Mesh V2: pollEvents called but lastFetchTime is empty. Falling back to current time.'); + this.lastFetchTime = new Date().toISOString(); + } + + log.debug(`Mesh V2: pollEvents for group ${this.groupId}. since=${this.lastFetchTime}`); + + try { + this.costTracking.queryCount++; + const result = await this.client.query({ + query: GET_EVENTS_SINCE, + variables: { + groupId: this.groupId, + domain: this.domain, + since: this.lastFetchTime + }, + fetchPolicy: 'network-only' + }); + + if (result.data && result.data.getEventsSince) { + const events = result.data.getEventsSince; + if (events.length > 0) { + log.info(`Mesh V2: Polled ${events.length} events`); + + // Filter out events from self and sort by timestamp to preserve order + const otherEvents = events + .filter(event => event.firedByNodeId !== this.meshId); + + if (otherEvents.length > 0) { + this._queueEventsForPlayback(otherEvents); + } + + // ALWAYS update lastFetchTime from the LAST event in the result (including our own) + // to ensure we don't fetch the same events again. + const lastEvent = events[events.length - 1]; + if (lastEvent.cursor) { + this.lastFetchTime = lastEvent.cursor; + } + } + } + } catch (error) { + log.error(`Mesh V2: Event polling failed: ${error}`); + } + } + handleDataUpdate (nodeStatus) { if (!nodeStatus || nodeStatus.nodeId === this.meshId) return; @@ -514,26 +680,37 @@ class MeshV2Service { log.info(`Mesh V2: Received ${events.length} events from ${batchEvent.firedByNodeId}`); - // タイムスタンプでソート - const sortedEvents = events.sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp)); + this._queueEventsForPlayback(events); + } + + /** + * Internal method to queue events for playback with relative timing. + * @param {Array} events - Array of events to queue. + * @private + */ + _queueEventsForPlayback (events) { + if (!events || events.length === 0) return; + + // タイムスタンプでソート(副作用を避けるためコピーを作成) + const sortedEvents = [...events].sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp)); // 最初のイベントを基準にオフセットを計算 const baseTime = new Date(sortedEvents[0].timestamp).getTime(); - // キューに追加(setTimeoutは使わない) + // キューに追加 sortedEvents.forEach(event => { const eventTime = new Date(event.timestamp).getTime(); const offsetMs = eventTime - baseTime; this.pendingBroadcasts.push({ event: event, - offsetMs: offsetMs // 元のタイミング情報を保持 + offsetMs: offsetMs }); log.info(`Mesh V2: Queued event: ${event.name} ` + `(offset: ${offsetMs}ms, original timestamp: ${event.timestamp})`); }); - // バッチ処理開始時刻を記録(最初のイベント追加時のみ) + // バッチ処理開始時刻を記録(未開始の場合のみ) if (this.batchStartTime === null && this.pendingBroadcasts.length > 0) { this.batchStartTime = Date.now(); this.lastBroadcastOffset = 0; @@ -679,16 +856,33 @@ class MeshV2Service { this.costTracking.mutationCount++; this.costTracking.fireEventsCount++; - log.info(`Mesh V2: Sending batch of ${events.length} events to group ${this.groupId}`); - await this.client.mutate({ - mutation: FIRE_EVENTS, - variables: { - groupId: this.groupId, - domain: this.domain, - nodeId: this.meshId, - events: events + log.info(`Mesh V2: Sending batch of ${events.length} events to group ${this.groupId} ` + + `(Protocol: ${this.useWebSocket ? 'WebSocket' : 'Polling'})`); + + if (this.useWebSocket) { + await this.client.mutate({ + mutation: FIRE_EVENTS, + variables: { + groupId: this.groupId, + domain: this.domain, + nodeId: this.meshId, + events: events + } + }); + } else { + const result = await this.client.mutate({ + mutation: RECORD_EVENTS, + variables: { + groupId: this.groupId, + domain: this.domain, + nodeId: this.meshId, + events: events + } + }); + if (result.data && result.data.recordEventsByNode && result.data.recordEventsByNode.nextSince) { + this.lastFetchTime = result.data.recordEventsByNode.nextSince; } - }); + } } catch (error) { log.error(`Mesh V2: Failed to fire batch events: ${error}`); const reason = this.shouldDisconnectOnError(error); @@ -737,8 +931,10 @@ class MeshV2Service { hostId: this.meshId } }); + this.expiresAt = result.data.renewHeartbeat.expiresAt; log.info(`Mesh V2: Heartbeat renewed. Expires at: ${this.expiresAt}`); + if (result.data.renewHeartbeat.heartbeatIntervalSeconds) { const newInterval = result.data.renewHeartbeat.heartbeatIntervalSeconds; if (newInterval !== this.hostHeartbeatInterval) { @@ -771,18 +967,12 @@ class MeshV2Service { nodeId: this.meshId } }); + log.info('Mesh V2: Member heartbeat sent'); if (result.data.sendMemberHeartbeat.expiresAt) { this.expiresAt = result.data.sendMemberHeartbeat.expiresAt; - this.startConnectionTimer(); - } - if (result.data.sendMemberHeartbeat.heartbeatIntervalSeconds) { - const newInterval = result.data.sendMemberHeartbeat.heartbeatIntervalSeconds; - if (newInterval !== this.memberHeartbeatInterval) { - this.memberHeartbeatInterval = newInterval; - this.startHeartbeat(); // Restart with new interval - } } + return result.data.sendMemberHeartbeat; } catch (error) { log.error(`Mesh V2: Member heartbeat failed: ${error}`); diff --git a/src/extensions/scratch3_mesh_v2/utils.js b/src/extensions/scratch3_mesh_v2/utils.js index ad3eb5bd92..194d1cdb98 100644 --- a/src/extensions/scratch3_mesh_v2/utils.js +++ b/src/extensions/scratch3_mesh_v2/utils.js @@ -53,10 +53,20 @@ const saveDomainToLocalStorage = domain => { /* istanbul ignore next */ const getDomain = () => getDomainFromUrl() || getDomainFromLocalStorage(); +/* istanbul ignore next */ +const getForcePollingFromUrl = () => { + /* istanbul ignore next */ + if (typeof window === 'undefined') return false; + const urlParams = new URLSearchParams(window.location.search); + const polling = urlParams.get('mesh_polling'); + return !!polling && polling !== '0' && polling !== 'false'; +}; + module.exports = { getDomainFromUrl, getDomainFromLocalStorage, saveDomainToLocalStorage, getDomain, - validateDomain + validateDomain, + getForcePollingFromUrl }; diff --git a/test/unit/mesh_service_v2_polling.js b/test/unit/mesh_service_v2_polling.js new file mode 100644 index 0000000000..03da178e70 --- /dev/null +++ b/test/unit/mesh_service_v2_polling.js @@ -0,0 +1,218 @@ +const test = require('tap').test; +const minilog = require('minilog'); +// Suppress debug and info logs during tests +minilog.suggest.deny('vm', 'debug'); +minilog.suggest.deny('vm', 'info'); + +const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service'); +const {GET_EVENTS_SINCE, RECORD_EVENTS} = require('../../src/extensions/scratch3_mesh_v2/gql-operations'); + +const createMockBlocks = () => ({ + runtime: { + sequencer: {}, + emit: () => {}, + on: () => {}, + off: () => {} + }, + opcodeFunctions: { + event_broadcast: () => {} + } +}); + +test('MeshV2Service Polling', t => { + t.test('pollEvents fetches and handles events', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = 'T1'; + + const events = [ + { + name: 'e1', + firedByNodeId: 'node2', + groupId: 'group1', + domain: 'domain1', + payload: 'p1', + timestamp: 'T2', + cursor: 'C2' + }, + { + name: 'e2', + firedByNodeId: 'node2', + groupId: 'group1', + domain: 'domain1', + payload: 'p2', + timestamp: 'T3', + cursor: 'C3' + } + ]; + + service.client = { + query: options => { + st.equal(options.query, GET_EVENTS_SINCE); + st.equal(options.variables.since, 'T1'); + return Promise.resolve({ + data: { + getEventsSince: events + } + }); + } + }; + + await service.pollEvents(); + + st.equal(service.pendingBroadcasts.length, 2); + st.equal(service.pendingBroadcasts[0].event.name, 'e1'); + st.equal(service.lastFetchTime, 'C3'); + + st.end(); + }); + + t.test('fireEventsBatch uses RECORD_EVENTS when useWebSocket is false', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = null; + + const events = [{eventName: 'e1', payload: 'p1', firedAt: 't1'}]; + + service.client = { + mutate: options => { + st.equal(options.mutation, RECORD_EVENTS); + return Promise.resolve({ + data: { + recordEventsByNode: { + nextSince: 'T_NEW' + } + } + }); + } + }; + + await service.fireEventsBatch(events); + + st.equal(service.lastFetchTime, 'T_NEW'); + + st.end(); + }); + + t.test('startPolling sets up interval', st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.pollingIntervalSeconds = 0.01; // 10ms + + let pollCount = 0; + service.pollEvents = () => { + pollCount++; + }; + + service.startPolling(); + st.ok(service.pollingTimer); + + setTimeout(() => { + service.stopPolling(); + st.ok(pollCount > 0); + st.equal(service.pollingTimer, null); + st.end(); + }, 50); + }); + + t.test('testWebSocket success', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + + // Mock WebSocket + global.WebSocket = class { + constructor () { + setTimeout(() => this.onopen(), 10); + } + close () {} + }; + + const result = await service.testWebSocket(); + st.equal(result, true); + + delete global.WebSocket; + st.end(); + }); + + t.test('testWebSocket failure', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + + // Mock WebSocket + global.WebSocket = class { + constructor () { + setTimeout(() => this.onerror(new Error('fail')), 10); + } + close () {} + }; + + const result = await service.testWebSocket(); + st.equal(result, false); + + delete global.WebSocket; + st.end(); + }); + + t.test('pollEvents filters out self-fired events', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = 'T1'; + + const events = [ + { + name: 'self-event', + firedByNodeId: 'node1', // self + timestamp: 'T2', + cursor: 'C2' + }, + { + name: 'other-event', + firedByNodeId: 'node2', + timestamp: 'T3', + cursor: 'C3' + } + ]; + + service.client = { + query: () => Promise.resolve({data: {getEventsSince: events}}) + }; + + await service.pollEvents(); + + st.equal(service.pendingBroadcasts.length, 1); + st.equal(service.pendingBroadcasts[0].event.name, 'other-event'); + st.equal(service.lastFetchTime, 'C3'); // cursor still updates + st.equal(service.costTracking.queryCount, 1); + + st.end(); + }); + + t.test('pollEvents falls back to current time if lastFetchTime is empty', async st => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + service.groupId = 'group1'; + service.useWebSocket = false; + service.lastFetchTime = ''; // empty + + service.client = { + query: options => { + st.ok(options.variables.since); + st.ok(new Date(options.variables.since).getTime() > 0); + return Promise.resolve({data: {getEventsSince: []}}); + } + }; + + await service.pollEvents(); + st.end(); + }); + + t.end(); +});