diff --git a/.env.example b/.env.example index 5869f05..84cd6ef 100644 --- a/.env.example +++ b/.env.example @@ -32,3 +32,10 @@ MESH_MAX_CONNECTION_TIME_SECONDS=1500 # MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS=15 # MESH_MEMBER_HEARTBEAT_TTL_SECONDS=60 # MESH_MAX_CONNECTION_TIME_SECONDS=300 + +# Event Polling Settings +# Event TTL in seconds (events are auto-deleted after this time) +MESH_EVENT_TTL_SECONDS=10 + +# Polling interval for clients that cannot use WebSocket (in seconds) +MESH_POLLING_INTERVAL_SECONDS=2 diff --git a/README.md b/README.md index 86cf888..65d3ba9 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,23 @@ Mesh v2 is a cloud-based backend system that enables real-time data sharing and - **Event Rate**: 2 events/sec/group - **Total Write Load**: 170 TPS +## Event Communication Protocols + +Mesh v2 supports two protocols for event communication: + +### 1. WebSocket Protocol (Primary) +- **Mechanism**: AppSync Subscriptions (WebSocket). +- **Features**: Real-time, low latency, low cost. +- **Requirement**: Network environment must allow `wss://` protocol. + +### 2. Polling Protocol (Fallback) +- **Mechanism**: `recordEventsByNode` mutation (save to DynamoDB) + `getEventsSince` query (polling). +- **Features**: HTTPS only, works behind strict firewalls/filters. +- **Latency**: Up to 2 seconds (default polling interval). +- **TTL**: Events are automatically deleted after 10 seconds to minimize storage costs. + +Clients automatically detect WebSocket availability during group creation and switch to the Polling protocol if necessary. + ## Event Batching Mesh v2 supports batch event sending to optimize AWS AppSync Subscription costs and preserve event timing. diff --git a/docs/api-reference.md b/docs/api-reference.md index 1b93510..b2ab2ab 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -27,6 +27,8 @@ type Group { createdAt: AWSDateTime! expiresAt: AWSDateTime! # グループの有効期限 heartbeatIntervalSeconds: Int + useWebSocket: Boolean! # NEW: WebSocket 使用フラグ + pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ) } ``` @@ -38,7 +40,10 @@ type Node { name: String! groupId: ID domain: String # 所属しているdomain + expiresAt: AWSDateTime heartbeatIntervalSeconds: Int + useWebSocket: Boolean # NEW: グループの設定を継承 + pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ) } ``` @@ -57,8 +62,11 @@ type SensorData { type Event { name: String! firedByNodeId: ID! - payload: AWSJSON + groupId: ID! + domain: String! + payload: String timestamp: AWSDateTime! + cursor: String # NEW: ポーリング用のカーソル(SK) } ``` @@ -177,6 +185,29 @@ query ListNodesInGroup($groupId: ID!, $domain: String!) { **用途**: グループメンバーの一覧取得。 +### getEventsSince + +前回取得日時以降のイベントを取得します(ポーリング用)。 + +```graphql +query GetEventsSince($groupId: ID!, $domain: String!, $since: String!) { + getEventsSince(groupId: $groupId, domain: $domain, since: $since) { + name + firedByNodeId + groupId + domain + payload + timestamp + cursor + } +} +``` + +**パラメータ**: +- `since: String!` - 前回の `nextSince` または最後に取得したイベントの `cursor` を指定します。 + +**戻り値**: イベントの配列。最大 100 件まで取得されます。 + ## Mutations ### createDomain @@ -293,6 +324,35 @@ mutation FireEventsByNode( } ``` +### recordEventsByNode + +ノードが複数のイベントを一度に送信し、DynamoDB に保存します(ポーリング用)。 + +```graphql +mutation RecordEventsByNode( + $nodeId: ID! + $groupId: ID! + $domain: String! + $events: [EventInput!]! +) { + recordEventsByNode( + nodeId: $nodeId + groupId: $groupId + domain: $domain + events: $events + ) { + groupId + domain + recordedCount + nextSince + } +} +``` + +**用途**: WebSocket が使用できない環境でのイベント送信に使用。この mutation は `onMessageInGroup` subscription を**トリガーしません**。 + +--- + ### leaveGroup ノードがグループから退出します。 diff --git a/docs/architecture.md b/docs/architecture.md index c66f96d..50c7185 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -170,6 +170,8 @@ sequenceDiagram participant DynamoDB participant Subscription + Note over Node1,Subscription: WebSocket プロトコル使用時 + Node2->>AppSync: subscribe: onMessageInGroup(groupId, domain) AppSync->>Subscription: WebSocket接続確立 @@ -177,8 +179,7 @@ sequenceDiagram AppSync->>Resolver: JS Resolver Resolver->>DynamoDB: Query: checkGroupExists alt グループ存在 - Resolver->>DynamoDB: BatchWriteItem: イベント保存(最大25件) - DynamoDB-->>Resolver: Success + Resolver->>DynamoDB: (None DataSource): Skip persistence Resolver-->>AppSync: MeshMessage AppSync->>Subscription: Publish: onMessageInGroup (batchEvent) Subscription-->>Node2: MeshMessage(リアルタイム配信) @@ -189,6 +190,36 @@ sequenceDiagram end ``` +### イベント通信フロー(ポーリング) + +```mermaid +sequenceDiagram + participant Node1 as Node 1 (送信) + participant Node2 as Node 2 (受信) + participant AppSync + participant Resolver + participant DynamoDB + + Note over Node1,DynamoDB: ポーリング プロトコル使用時 + + Node1->>AppSync: recordEventsByNode(nodeId, groupId, domain, events[]) + AppSync->>Resolver: JS Resolver (Pipeline) + Resolver->>DynamoDB: checkGroupExists + Resolver->>DynamoDB: BatchWriteItem: イベント保存 (TTL 10秒) + DynamoDB-->>Resolver: Success + Resolver-->>AppSync: RecordEventsPayload (nextSince) + AppSync-->>Node1: Response + + loop 2秒間隔 + Node2->>AppSync: getEventsSince(groupId, domain, since) + AppSync->>Resolver: JS Resolver (Unit) + Resolver->>DynamoDB: Query: pk=GROUP#id@domain AND sk > EVENT#since + DynamoDB-->>Resolver: Items (Event[]) + Resolver-->>AppSync: Event[] + AppSync-->>Node2: Response + end +``` + ### ハートビート更新フロー ```mermaid @@ -299,7 +330,9 @@ Mesh v2 は Single Table Design を採用し、1つのテーブルにすべて "hostId": "node-001", "createdAt": "2026-01-01T00:00:00Z", "expiresAt": 1704067200, - "ttl": 1704067200 + "ttl": 1704067200, + "useWebSocket": true, + "pollingIntervalSeconds": 2 } ``` @@ -376,7 +409,8 @@ Mesh v2 は Single Table Design を採用し、1つのテーブルにすべて "groupId": "abc123", "domain": "192.168.1.1", "payload": "{\"button\":\"A\"}", - "timestamp": "2026-01-01T12:00:00.123Z" + "timestamp": "2026-01-01T12:00:00.123Z", + "ttl": 1704067210 } ``` diff --git a/graphql/schema.graphql b/graphql/schema.graphql index 21cc0a2..4d2eaec 100644 --- a/graphql/schema.graphql +++ b/graphql/schema.graphql @@ -18,9 +18,10 @@ type Group { fullId: String! # {group_id}@{domain}の完全なID name: String! hostId: ID! - createdAt: AWSDateTime! expiresAt: AWSDateTime! heartbeatIntervalSeconds: Int + useWebSocket: Boolean! # NEW: WebSocket 使用フラグ + pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ) } type Node { @@ -30,6 +31,8 @@ type Node { domain: String # 所属しているdomain expiresAt: AWSDateTime heartbeatIntervalSeconds: Int + useWebSocket: Boolean # NEW: グループの設定を継承 + pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ) } # メッセージの包括的な型 @@ -58,6 +61,7 @@ type Event { domain: String! payload: String timestamp: AWSDateTime! + cursor: String # NEW: ポーリング用のカーソル(SK) } # イベントのバッチ送信用の入力型 @@ -104,6 +108,14 @@ type LeaveGroupPayload { message: String! } +# NEW: recordEventsByNode のレスポンス型 +type RecordEventsPayload { + groupId: ID! + domain: String! + recordedCount: Int! # 記録したイベント数 + nextSince: String! # NEW: 次回の getEventsSince の since に指定する値 +} + # --- クエリ (Queries) --- type Query { @@ -118,6 +130,14 @@ type Query { # グループ内の全Node取得 listNodesInGroup(groupId: ID!, domain: String!): [Node!]! + + # NEW: 前回取得日時以降のイベントを取得(ポーリング用) + # since: 前回の nextSince または Event.cursor を指定 + getEventsSince( + groupId: ID! + domain: String! + since: String! + ): [Event!]! } # --- ミューテーション (Mutations) --- @@ -133,6 +153,7 @@ type Mutation { hostId: ID! domain: String! maxConnectionTimeSeconds: Int # オプション: 1以上、環境変数の値以下 + useWebSocket: Boolean! # NEW: WebSocket 使用フラグ ): Group! joinGroup( @@ -182,6 +203,14 @@ type Mutation { nodeId: ID! events: [EventInput!]! ): MeshMessage + + # NEW: ポーリング用のイベント記録 mutation + recordEventsByNode( + groupId: ID! + domain: String! + nodeId: ID! + events: [EventInput!]! + ): RecordEventsPayload! } # --- サブスクリプション (Subscriptions) --- diff --git a/js/functions/checkExistingGroup.js b/js/functions/checkExistingGroup.js index 10a1c16..db81532 100644 --- a/js/functions/checkExistingGroup.js +++ b/js/functions/checkExistingGroup.js @@ -52,7 +52,9 @@ export function response(ctx) { fullId: existingGroups[0].fullId, name: existingGroups[0].name, hostId: existingGroups[0].hostId, - createdAt: existingGroups[0].createdAt + expiresAt: existingGroups[0].expiresAt, + useWebSocket: existingGroups[0].useWebSocket !== false, // デフォルト true + pollingIntervalSeconds: existingGroups[0].pollingIntervalSeconds }; } diff --git a/js/functions/createGroupIfNotExists.js b/js/functions/createGroupIfNotExists.js index 4a011ff..02b8387 100644 --- a/js/functions/createGroupIfNotExists.js +++ b/js/functions/createGroupIfNotExists.js @@ -16,7 +16,7 @@ export function request(ctx) { } // 新規グループ作成 - const { name, hostId, domain, maxConnectionTimeSeconds } = ctx.args; + const { name, hostId, domain, maxConnectionTimeSeconds, useWebSocket } = ctx.args; // Domain文字列のバリデーション(最大256文字) if (domain.length > 256) { @@ -40,6 +40,9 @@ export function request(ctx) { actualMaxSeconds = maxConnectionTimeSeconds; } + // Polling interval decision + const pollingInterval = useWebSocket ? null : +(ctx.env.MESH_POLLING_INTERVAL_SECONDS || '2'); + // グループID生成 const groupId = util.autoId(); const fullId = `${groupId}@${domain}`; @@ -65,6 +68,8 @@ export function request(ctx) { expiresAt: expiresAt, heartbeatAt: nowEpoch, ttl: ttl, + useWebSocket: useWebSocket, + pollingIntervalSeconds: pollingInterval, // GSI用(groupId -> domain の逆引き検索) gsi_pk: `GROUP#${groupId}`, gsi_sk: `DOMAIN#${domain}` @@ -91,7 +96,9 @@ export function response(ctx) { hostId: ctx.stash.existingGroup.hostId, createdAt: ctx.stash.existingGroup.createdAt, expiresAt: ctx.stash.existingGroup.expiresAt, - heartbeatIntervalSeconds: heartbeatIntervalSeconds + heartbeatIntervalSeconds: heartbeatIntervalSeconds, + useWebSocket: ctx.stash.existingGroup.useWebSocket !== false, // デフォルト true + pollingIntervalSeconds: ctx.stash.existingGroup.pollingIntervalSeconds }; } @@ -104,6 +111,8 @@ export function response(ctx) { hostId: ctx.result.hostId, createdAt: ctx.result.createdAt, expiresAt: ctx.result.expiresAt, - heartbeatIntervalSeconds: heartbeatIntervalSeconds + heartbeatIntervalSeconds: heartbeatIntervalSeconds, + useWebSocket: ctx.result.useWebSocket, + pollingIntervalSeconds: ctx.result.pollingIntervalSeconds }; } diff --git a/js/functions/joinGroupFunction.js b/js/functions/joinGroupFunction.js index 72871e1..bc7a5f3 100644 --- a/js/functions/joinGroupFunction.js +++ b/js/functions/joinGroupFunction.js @@ -72,6 +72,8 @@ export function response(ctx) { groupId: groupId, domain: domain, expiresAt: group ? group.expiresAt : null, - heartbeatIntervalSeconds: +(ctx.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120') + heartbeatIntervalSeconds: +(ctx.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120'), + useWebSocket: group ? group.useWebSocket !== false : true, + pollingIntervalSeconds: group ? group.pollingIntervalSeconds : null }; } diff --git a/js/resolvers/Query.getEventsSince.js b/js/resolvers/Query.getEventsSince.js new file mode 100644 index 0000000..e45b9ad --- /dev/null +++ b/js/resolvers/Query.getEventsSince.js @@ -0,0 +1,38 @@ +// Query.getEventsSince Resolver +// 前回取得日時以降のイベントを取得する(ポーリング用) + +import { util } from '@aws-appsync/utils'; + +export function request(ctx) { + const { groupId, domain, since } = ctx.arguments; + const sk = since.startsWith('EVENT#') ? since : `EVENT#${since}`; + + return { + operation: 'Query', + query: { + expression: 'pk = :pk AND sk > :sk', + expressionValues: util.dynamodb.toMapValues({ + ':pk': `GROUP#${groupId}@${domain}`, + ':sk': sk + }) + }, + limit: 100, + scanIndexForward: true // timestamp で昇順にソート + }; +} + +export function response(ctx) { + if (ctx.error) { + util.error(ctx.error.message, ctx.error.type); + } + + return ctx.result.items.map(item => ({ + name: item.eventName, + firedByNodeId: item.firedByNodeId, + groupId: item.groupId, + domain: item.domain, + payload: item.payload, + timestamp: item.timestamp, + cursor: item.sk + })); +} diff --git a/lambda/domain/group.rb b/lambda/domain/group.rb index 765adfa..0be996a 100644 --- a/lambda/domain/group.rb +++ b/lambda/domain/group.rb @@ -1,14 +1,16 @@ # Group Domain Model # ドメインモデル - ビジネスルールとバリデーションを持つ class Group - attr_reader :id, :name, :host_id, :domain, :created_at + attr_reader :id, :name, :host_id, :domain, :created_at, :use_websocket, :polling_interval_seconds - def initialize(id:, name:, host_id:, domain:, created_at:) + def initialize(id:, name:, host_id:, domain:, created_at:, use_websocket: true, polling_interval_seconds: nil) @id = id @name = name @host_id = host_id @domain = domain @created_at = created_at + @use_websocket = use_websocket + @polling_interval_seconds = polling_interval_seconds validate! end diff --git a/lambda/handlers/appsync_handler.rb b/lambda/handlers/appsync_handler.rb index 862b34f..1602b82 100644 --- a/lambda/handlers/appsync_handler.rb +++ b/lambda/handlers/appsync_handler.rb @@ -4,6 +4,7 @@ require_relative "../use_cases/dissolve_group" require_relative "../use_cases/leave_group" require_relative "../use_cases/create_domain" +require_relative "../use_cases/record_events" require_relative "../repositories/dynamodb_repository" require "aws-sdk-dynamodb" require "json" @@ -24,6 +25,8 @@ def lambda_handler(event:, context:) handle_dissolve_group(arguments) when "leaveGroup" handle_leave_group(arguments) + when "recordEventsByNode" + handle_record_events(arguments) else raise StandardError, "Unknown field: #{field_name}" end @@ -31,6 +34,33 @@ def lambda_handler(event:, context:) # AppSyncが自動的にGraphQLエラーに変換する end +def handle_record_events(arguments) + # DynamoDBクライアントとリポジトリの初期化 + dynamodb = Aws::DynamoDB::Client.new(region: ENV["AWS_REGION"] || "ap-northeast-1") + repository = DynamoDBRepository.new(dynamodb) + + # ユースケースの実行 + use_case = RecordEventsUseCase.new(repository) + result = use_case.execute( + group_id: arguments["groupId"], + domain: arguments["domain"], + node_id: arguments["nodeId"], + events: arguments["events"], + ttl_seconds: (ENV["MESH_EVENT_TTL_SECONDS"] || "10").to_i + ) + + # エラーハンドリング + raise StandardError, result[:error] unless result[:success] + + # AppSync形式にフォーマット + { + groupId: result[:groupId], + domain: result[:domain], + recordedCount: result[:recordedCount], + nextSince: result[:nextSince] + } +end + def handle_create_group(arguments) # DynamoDBクライアントとリポジトリの初期化 dynamodb = Aws::DynamoDB::Client.new(region: ENV["AWS_REGION"] || "ap-northeast-1") @@ -41,7 +71,8 @@ def handle_create_group(arguments) group = use_case.execute( name: arguments["name"], host_id: arguments["hostId"], - domain: arguments["domain"] + domain: arguments["domain"], + use_websocket: arguments.key?("useWebSocket") ? arguments["useWebSocket"] : true ) # AppSync形式にフォーマット @@ -110,7 +141,9 @@ def format_group_response(group) fullId: group.full_id, name: group.name, hostId: group.host_id, - createdAt: group.created_at + createdAt: group.created_at, + useWebSocket: group.use_websocket, + pollingIntervalSeconds: group.polling_interval_seconds } end diff --git a/lambda/repositories/dynamodb_repository.rb b/lambda/repositories/dynamodb_repository.rb index 155fc84..f6aa022 100644 --- a/lambda/repositories/dynamodb_repository.rb +++ b/lambda/repositories/dynamodb_repository.rb @@ -1,4 +1,5 @@ require "aws-sdk-dynamodb" +require "securerandom" require_relative "../domain/group" # DynamoDB Repository @@ -53,6 +54,8 @@ def save_group(group) "name" => group.name, "hostId" => group.host_id, "createdAt" => group.created_at, + "useWebSocket" => group.use_websocket, + "pollingIntervalSeconds" => group.polling_interval_seconds, "gsi_pk" => "GROUP##{group.id}", "gsi_sk" => "DOMAIN##{group.domain}" } @@ -186,6 +189,49 @@ def delete_peer_data(group_id, domain, peer_id) false end + # イベントをバッチ保存 + def record_events(group_id, domain, node_id, events, ttl_seconds) + return {success: false, error: "DynamoDB client not initialized"} unless @dynamodb + + server_timestamp = Time.now.iso8601 + ttl = Time.now.to_i + ttl_seconds + last_sk = nil + + # DynamoDB BatchWriteItem 操作 + # 最大25アイテムまで + events.each_slice(25) do |slice| + put_requests = slice.map do |event| + sk = "EVENT##{server_timestamp}##{SecureRandom.uuid}" + last_sk = sk + { + put_request: { + item: { + "pk" => "GROUP##{group_id}@#{domain}", + "sk" => sk, + "eventName" => event["eventName"], + "firedByNodeId" => node_id, + "groupId" => group_id, + "domain" => domain, + "payload" => event["payload"], + "timestamp" => server_timestamp, + "ttl" => ttl + } + } + } + end + + @dynamodb.batch_write_item( + request_items: { + @table_name => put_requests + } + ) + end + {success: true, recordedCount: events.length, last_sk: last_sk} + rescue Aws::DynamoDB::Errors::ServiceError => e + puts "DynamoDB Error: #{e.message}" + {success: false, error: e.message} + end + private def item_to_group(item) @@ -194,7 +240,9 @@ def item_to_group(item) name: item["name"], host_id: item["hostId"], domain: item["domain"], - created_at: item["createdAt"] + created_at: item["createdAt"], + use_websocket: item.key?("useWebSocket") ? item["useWebSocket"] : true, + polling_interval_seconds: item["pollingIntervalSeconds"] ) end end diff --git a/lambda/use_cases/create_group.rb b/lambda/use_cases/create_group.rb index 00f65b5..9da855a 100644 --- a/lambda/use_cases/create_group.rb +++ b/lambda/use_cases/create_group.rb @@ -9,18 +9,22 @@ def initialize(repository) @repository = repository end - def execute(name:, host_id:, domain:) + def execute(name:, host_id:, domain:, use_websocket: true) # ビジネスロジック: 既存グループチェック(冪等性の実装) existing_group = @repository.find_group_by_host_and_domain(host_id, domain) return existing_group if existing_group + polling_interval = use_websocket ? nil : (ENV["MESH_POLLING_INTERVAL_SECONDS"] || 2).to_i + # 新規グループ作成 group = Group.new( id: generate_id, name: name, host_id: host_id, domain: domain, - created_at: Time.now.utc.iso8601 + created_at: Time.now.utc.iso8601, + use_websocket: use_websocket, + polling_interval_seconds: polling_interval ) @repository.save_group(group) diff --git a/lambda/use_cases/record_events.rb b/lambda/use_cases/record_events.rb new file mode 100644 index 0000000..c8457b8 --- /dev/null +++ b/lambda/use_cases/record_events.rb @@ -0,0 +1,29 @@ +require "time" +require "securerandom" + +class RecordEventsUseCase + def initialize(repository) + @repository = repository + end + + def execute(group_id:, domain:, node_id:, events:, ttl_seconds: 10) + # 1. グループの存在確認 + group = @repository.find_group(group_id, domain) + return {success: false, error: "Group not found"} unless group + + # 2. イベントの保存 + result = @repository.record_events(group_id, domain, node_id, events, ttl_seconds) + + if result[:success] + { + success: true, + groupId: group_id, + domain: domain, + recordedCount: result[:recordedCount], + nextSince: result[:last_sk] || "EVENT##{Time.now.iso8601}" + } + else + {success: false, error: result[:error] || "Failed to record events"} + end + end +end diff --git a/lib/mesh-v2-stack.ts b/lib/mesh-v2-stack.ts index 2d11396..aa2645d 100644 --- a/lib/mesh-v2-stack.ts +++ b/lib/mesh-v2-stack.ts @@ -124,7 +124,10 @@ export class MeshV2Stack extends cdk.Stack { MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS: process.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120', MESH_MEMBER_HEARTBEAT_TTL_SECONDS: process.env.MESH_MEMBER_HEARTBEAT_TTL_SECONDS || '600', MESH_MAX_CONNECTION_TIME_SECONDS: process.env.MESH_MAX_CONNECTION_TIME_SECONDS || defaultMaxConnTimeSeconds, + MESH_EVENT_TTL_SECONDS: process.env.MESH_EVENT_TTL_SECONDS || '10', + MESH_POLLING_INTERVAL_SECONDS: process.env.MESH_POLLING_INTERVAL_SECONDS || '2', }, + xrayEnabled: true, logConfig: { fieldLogLevel: appsync.FieldLogLevel.ALL, @@ -209,6 +212,40 @@ export class MeshV2Stack extends cdk.Stack { this.table ); + // None Data Source for event pass-through + const noneDataSource = this.api.addNoneDataSource('NoneDataSource'); + + // Lambda function for complex operations (Dissolve, Leave, RecordEvents, etc.) + const meshV2Lambda = new lambda.Function(this, 'MeshV2LambdaFunction', { + functionName: `MeshV2-GraphQL${stageSuffix}`, + runtime: lambda.Runtime.RUBY_3_4, + handler: 'handlers/appsync_handler.lambda_handler', + code: lambda.Code.fromAsset(path.join(__dirname, '../lambda')), + environment: { + LANG: 'en_US.UTF-8', + LC_ALL: 'en_US.UTF-8', + DYNAMODB_TABLE_NAME: this.table.tableName, + MESH_SECRET_KEY: process.env.MESH_SECRET_KEY || 'default-secret-key', + MESH_HOST_HEARTBEAT_INTERVAL_SECONDS: process.env.MESH_HOST_HEARTBEAT_INTERVAL_SECONDS || '60', + MESH_HOST_HEARTBEAT_TTL_SECONDS: process.env.MESH_HOST_HEARTBEAT_TTL_SECONDS || '150', + MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS: process.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120', + MESH_MEMBER_HEARTBEAT_TTL_SECONDS: process.env.MESH_MEMBER_HEARTBEAT_TTL_SECONDS || '600', + MESH_MAX_CONNECTION_TIME_SECONDS: process.env.MESH_MAX_CONNECTION_TIME_SECONDS || defaultMaxConnTimeSeconds, + MESH_EVENT_TTL_SECONDS: process.env.MESH_EVENT_TTL_SECONDS || '10', + MESH_POLLING_INTERVAL_SECONDS: process.env.MESH_POLLING_INTERVAL_SECONDS || '2', + }, + timeout: cdk.Duration.seconds(30), + }); + + // Grant Lambda permissions to DynamoDB + this.table.grantReadWriteData(meshV2Lambda); + + // Lambda Data Source + const meshV2DataSource = this.api.addLambdaDataSource( + 'MeshV2LambdaDataSource', + meshV2Lambda + ); + // Function: checkGroupExists (共通のグループ存在確認) const checkGroupExistsFunction = new appsync.AppsyncFunction(this, 'CheckGroupExistsFunction', { name: 'checkGroupExists', @@ -424,9 +461,6 @@ export class MeshV2Stack extends cdk.Stack { `) }); - // None Data Source for event pass-through - const noneDataSource = this.api.addNoneDataSource('NoneDataSource'); - // Function: fireEventsByNode (main logic for batch) const fireEventsByNodeFunction = new appsync.AppsyncFunction(this, 'FireEventsByNodeFunction', { name: 'fireEventsByNode', @@ -454,51 +488,34 @@ export class MeshV2Stack extends cdk.Stack { `) }); - // Resolvers for Phase 2-4: dissolveGroup with Lambda - - // Lambda function for dissolveGroup - const dissolveGroupLambda = new lambda.Function(this, 'DissolveGroupFunction', { - functionName: `MeshV2-DissolveGroup${stageSuffix}`, - runtime: lambda.Runtime.RUBY_3_4, - handler: 'handlers/appsync_handler.lambda_handler', - code: lambda.Code.fromAsset(path.join(__dirname, '../lambda')), - environment: { - LANG: 'en_US.UTF-8', - LC_ALL: 'en_US.UTF-8', - DYNAMODB_TABLE_NAME: this.table.tableName, - MESH_SECRET_KEY: process.env.MESH_SECRET_KEY || 'default-secret-key', - MESH_HOST_HEARTBEAT_INTERVAL_SECONDS: process.env.MESH_HOST_HEARTBEAT_INTERVAL_SECONDS || '60', - MESH_HOST_HEARTBEAT_TTL_SECONDS: process.env.MESH_HOST_HEARTBEAT_TTL_SECONDS || '150', - MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS: process.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120', - MESH_MEMBER_HEARTBEAT_TTL_SECONDS: process.env.MESH_MEMBER_HEARTBEAT_TTL_SECONDS || '600', - MESH_MAX_CONNECTION_TIME_SECONDS: process.env.MESH_MAX_CONNECTION_TIME_SECONDS || defaultMaxConnTimeSeconds, - }, - timeout: cdk.Duration.seconds(30), + // Mutation: recordEventsByNode (Lambda resolver due to BatchPutItem restrictions in JS) + meshV2DataSource.createResolver('RecordEventsByNodeResolver', { + typeName: 'Mutation', + fieldName: 'recordEventsByNode', }); - // Grant Lambda permissions to DynamoDB - this.table.grantReadWriteData(dissolveGroupLambda); - - // Lambda Data Source - const dissolveGroupDataSource = this.api.addLambdaDataSource( - 'DissolveGroupDataSource', - dissolveGroupLambda - ); + // Query: getEventsSince + dynamoDbDataSource.createResolver('GetEventsSinceResolver', { + typeName: 'Query', + fieldName: 'getEventsSince', + runtime: appsync.FunctionRuntime.JS_1_0_0, + code: appsync.Code.fromAsset(path.join(__dirname, '../js/resolvers/Query.getEventsSince.js')) + }); // Mutation: dissolveGroup (Lambda resolver) - dissolveGroupDataSource.createResolver('DissolveGroupResolver', { + meshV2DataSource.createResolver('DissolveGroupResolver', { typeName: 'Mutation', fieldName: 'dissolveGroup', }); // Mutation: createDomain (Lambda resolver) - dissolveGroupDataSource.createResolver('CreateDomainResolver', { + meshV2DataSource.createResolver('CreateDomainResolver', { typeName: 'Mutation', fieldName: 'createDomain', }); // Mutation: leaveGroup (Lambda resolver) - dissolveGroupDataSource.createResolver('LeaveGroupResolver', { + meshV2DataSource.createResolver('LeaveGroupResolver', { typeName: 'Mutation', fieldName: 'leaveGroup', }); diff --git a/spec/fixtures/mutations/create_group.graphql b/spec/fixtures/mutations/create_group.graphql index 4a5f889..4cf8d94 100644 --- a/spec/fixtures/mutations/create_group.graphql +++ b/spec/fixtures/mutations/create_group.graphql @@ -3,12 +3,14 @@ mutation CreateGroup( $hostId: ID! $domain: String! $maxConnectionTimeSeconds: Int + $useWebSocket: Boolean! ) { createGroup( name: $name hostId: $hostId domain: $domain maxConnectionTimeSeconds: $maxConnectionTimeSeconds + useWebSocket: $useWebSocket ) { id domain @@ -17,5 +19,7 @@ mutation CreateGroup( hostId createdAt expiresAt + useWebSocket + pollingIntervalSeconds } -} +} \ No newline at end of file diff --git a/spec/requests/group_management_spec.rb b/spec/requests/group_management_spec.rb index 7113502..6d1f7fc 100644 --- a/spec/requests/group_management_spec.rb +++ b/spec/requests/group_management_spec.rb @@ -10,7 +10,8 @@ variables = { name: "Test Group", hostId: "host-#{Time.now.to_i}-001", - domain: "test.example.com" + domain: "test.example.com", + useWebSocket: true } response = execute_graphql(query, variables) @@ -32,7 +33,8 @@ variables = { name: "Test Group", hostId: host_id, - domain: "test.example.com" + domain: "test.example.com", + useWebSocket: true } # 1回目 @@ -57,7 +59,8 @@ variables1 = { name: "Group A", hostId: "host-#{timestamp}-003", - domain: "test.example.com" + domain: "test.example.com", + useWebSocket: true } response1 = execute_graphql(query, variables1) expect(response1["errors"]).to be_nil @@ -67,7 +70,8 @@ variables2 = { name: "Group B", hostId: "host-#{timestamp}-004", - domain: "test.example.com" + domain: "test.example.com", + useWebSocket: true } response2 = execute_graphql(query, variables2) expect(response2["errors"]).to be_nil diff --git a/spec/requests/leave_group_spec.rb b/spec/requests/leave_group_spec.rb index 66df920..b272523 100644 --- a/spec/requests/leave_group_spec.rb +++ b/spec/requests/leave_group_spec.rb @@ -17,7 +17,8 @@ create_res = execute_graphql(create_query, { name: group_name, hostId: host_id, - domain: domain + domain: domain, + useWebSocket: true }) group_id = create_res["data"]["createGroup"]["id"] diff --git a/spec/requests/renew_heartbeat_expires_at_spec.rb b/spec/requests/renew_heartbeat_expires_at_spec.rb index 99adc8e..f05fdef 100644 --- a/spec/requests/renew_heartbeat_expires_at_spec.rb +++ b/spec/requests/renew_heartbeat_expires_at_spec.rb @@ -8,8 +8,8 @@ it "returns the same expiresAt during renewHeartbeat as createGroup" do # 1. Create a group create_mutation = <<~GRAPHQL - mutation CreateGroup($name: String!, $hostId: ID!, $domain: String!) { - createGroup(name: $name, hostId: $hostId, domain: $domain) { + mutation CreateGroup($name: String!, $hostId: ID!, $domain: String!, $useWebSocket: Boolean!) { + createGroup(name: $name, hostId: $hostId, domain: $domain, useWebSocket: $useWebSocket) { id expiresAt } @@ -19,7 +19,8 @@ create_vars = { name: group_name, hostId: host_id, - domain: domain + domain: domain, + useWebSocket: true } create_response = execute_graphql(create_mutation, create_vars) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index bd7ead3..8fb50c4 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -55,12 +55,13 @@ def execute_graphql(query, variables = {}, suppress_errors: false) end # テスト用グループ作成ヘルパー -def create_test_group(name, host_id, domain, max_connection_time_seconds: nil) +def create_test_group(name, host_id, domain, max_connection_time_seconds: nil, use_websocket: true) query = File.read(File.join(__dir__, "fixtures/mutations/create_group.graphql")) variables = { name: name, hostId: host_id, - domain: domain + domain: domain, + useWebSocket: use_websocket } variables[:maxConnectionTimeSeconds] = max_connection_time_seconds if max_connection_time_seconds diff --git a/spec/unit/domain/group_spec.rb b/spec/unit/domain/group_spec.rb index 2b757d7..b0842c7 100644 --- a/spec/unit/domain/group_spec.rb +++ b/spec/unit/domain/group_spec.rb @@ -9,7 +9,9 @@ name: "Test Group", host_id: "host-001", domain: "example.com", - created_at: "2025-01-01T00:00:00Z" + created_at: "2025-01-01T00:00:00Z", + use_websocket: false, + polling_interval_seconds: 2 ) expect(group.id).to eq("group-001") @@ -17,6 +19,8 @@ expect(group.host_id).to eq("host-001") expect(group.domain).to eq("example.com") expect(group.created_at).to eq("2025-01-01T00:00:00Z") + expect(group.use_websocket).to eq(false) + expect(group.polling_interval_seconds).to eq(2) end it "idがnilの場合はエラーを発生させる" do diff --git a/spec/unit/use_cases/create_group_spec.rb b/spec/unit/use_cases/create_group_spec.rb index bcc742e..c1bf9a7 100644 --- a/spec/unit/use_cases/create_group_spec.rb +++ b/spec/unit/use_cases/create_group_spec.rb @@ -20,6 +20,8 @@ expect(group.domain).to eq("example.com") expect(group.id).to be_present expect(group.created_at).to match_iso8601 + expect(group.use_websocket).to eq(true) + expect(group.polling_interval_seconds).to be_nil end result = use_case.execute( @@ -34,6 +36,24 @@ expect(result.domain).to eq("example.com") end + it "use_websocket: false の場合、ポーリング間隔が設定される" do + allow(repository).to receive(:find_group_by_host_and_domain).and_return(nil) + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with("MESH_POLLING_INTERVAL_SECONDS").and_return("5") + + expect(repository).to receive(:save_group) do |group| + expect(group.use_websocket).to eq(false) + expect(group.polling_interval_seconds).to eq(5) + end + + use_case.execute( + name: "Polling Group", + host_id: "host-001", + domain: "example.com", + use_websocket: false + ) + end + it "グループIDが自動生成される" do allow(repository).to receive(:find_group_by_host_and_domain) .and_return(nil) diff --git a/test/mesh-v2.test.ts b/test/mesh-v2.test.ts index 8d72ac4..68a4cac 100644 --- a/test/mesh-v2.test.ts +++ b/test/mesh-v2.test.ts @@ -19,6 +19,60 @@ describe('MeshV2Stack', () => { }); }); + test('Polling Resolvers and Environment Variables', () => { + const app = new cdk.App(); + const stack = new MeshV2.MeshV2Stack(app, 'MyTestStack', { + env: { account: '123456789012', region: 'us-east-1' } + }); + const template = Template.fromStack(stack); + + // Environment Variables + template.hasResourceProperties('AWS::AppSync::GraphQLApi', { + EnvironmentVariables: { + MESH_EVENT_TTL_SECONDS: '10', + MESH_POLLING_INTERVAL_SECONDS: '2' + } + }); + + // Resolvers + // 1. listGroupsByDomain (Unit) + // 2. listGroupStatuses (Unit) + // 3. findNodeMetadata (Function for getNodeStatus) -> Wait, this is Function, not Resolver + // 4. fetchNodeStatus (Function for getNodeStatus) -> Wait, this is Function, not Resolver + // 5. getNodeStatus (Pipeline Resolver) + // 6. listNodesInGroup (Unit) + // 7. createGroup (Pipeline Resolver) + // 8. joinGroup (Pipeline Resolver) + // 9. renewHeartbeat (Pipeline Resolver) + // 10. sendMemberHeartbeat (Pipeline Resolver) + // 11. reportDataByNode (Pipeline Resolver) + // 12. fireEventsByNode (Pipeline Resolver) + // 13. recordEventsByNode (Pipeline Resolver) + // 14. getEventsSince (Unit) + // 15. dissolveGroup (Lambda) + // 16. createDomain (Lambda) + // 17. leaveGroup (Lambda) + // Functions are different resources (AWS::AppSync::Function) + + // Let's count Resolvers again: + // listGroupsByDomain, listGroupStatuses, getNodeStatus, listNodesInGroup, createGroup, + // joinGroup, renewHeartbeat, sendMemberHeartbeat, reportDataByNode, fireEventsByNode, + // recordEventsByNode, getEventsSince, dissolveGroup, createDomain, leaveGroup. + // That's 15. + + template.resourceCountIs('AWS::AppSync::Resolver', 15); + + template.hasResourceProperties('AWS::AppSync::Resolver', { + FieldName: 'recordEventsByNode', + TypeName: 'Mutation' + }); + + template.hasResourceProperties('AWS::AppSync::Resolver', { + FieldName: 'getEventsSince', + TypeName: 'Query' + }); + }); + test('WAF is created when stage is prod', () => { const app = new cdk.App({ context: {