Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,10 @@ MESH_MAX_CONNECTION_TIME_SECONDS=1500
# MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS=15
# MESH_MEMBER_HEARTBEAT_TTL_SECONDS=60
# MESH_MAX_CONNECTION_TIME_SECONDS=300

# Event Polling Settings
# Event TTL in seconds (events are auto-deleted after this time)
MESH_EVENT_TTL_SECONDS=10

# Polling interval for clients that cannot use WebSocket (in seconds)
MESH_POLLING_INTERVAL_SECONDS=2
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ Mesh v2 is a cloud-based backend system that enables real-time data sharing and
- **Event Rate**: 2 events/sec/group
- **Total Write Load**: 170 TPS

## Event Communication Protocols

Mesh v2 supports two protocols for event communication:

### 1. WebSocket Protocol (Primary)
- **Mechanism**: AppSync Subscriptions (WebSocket).
- **Features**: Real-time, low latency, low cost.
- **Requirement**: Network environment must allow `wss://` protocol.

### 2. Polling Protocol (Fallback)
- **Mechanism**: `recordEventsByNode` mutation (save to DynamoDB) + `getEventsSince` query (polling).
- **Features**: HTTPS only, works behind strict firewalls/filters.
- **Latency**: Up to 2 seconds (default polling interval).
- **TTL**: Events are automatically deleted after 10 seconds to minimize storage costs.

Clients automatically detect WebSocket availability during group creation and switch to the Polling protocol if necessary.

## Event Batching

Mesh v2 supports batch event sending to optimize AWS AppSync Subscription costs and preserve event timing.
Expand Down
62 changes: 61 additions & 1 deletion docs/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Group {
createdAt: AWSDateTime!
expiresAt: AWSDateTime! # グループの有効期限
heartbeatIntervalSeconds: Int
useWebSocket: Boolean! # NEW: WebSocket 使用フラグ
pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ)
}
```

Expand All @@ -38,7 +40,10 @@ type Node {
name: String!
groupId: ID
domain: String # 所属しているdomain
expiresAt: AWSDateTime
heartbeatIntervalSeconds: Int
useWebSocket: Boolean # NEW: グループの設定を継承
pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ)
}
```

Expand All @@ -57,8 +62,11 @@ type SensorData {
type Event {
name: String!
firedByNodeId: ID!
payload: AWSJSON
groupId: ID!
domain: String!
payload: String
timestamp: AWSDateTime!
cursor: String # NEW: ポーリング用のカーソル(SK)
}
```

Expand Down Expand Up @@ -177,6 +185,29 @@ query ListNodesInGroup($groupId: ID!, $domain: String!) {

**用途**: グループメンバーの一覧取得。

### getEventsSince

前回取得日時以降のイベントを取得します(ポーリング用)。

```graphql
query GetEventsSince($groupId: ID!, $domain: String!, $since: String!) {
getEventsSince(groupId: $groupId, domain: $domain, since: $since) {
name
firedByNodeId
groupId
domain
payload
timestamp
cursor
}
}
```

**パラメータ**:
- `since: String!` - 前回の `nextSince` または最後に取得したイベントの `cursor` を指定します。

**戻り値**: イベントの配列。最大 100 件まで取得されます。

## Mutations

### createDomain
Expand Down Expand Up @@ -293,6 +324,35 @@ mutation FireEventsByNode(
}
```

### recordEventsByNode

ノードが複数のイベントを一度に送信し、DynamoDB に保存します(ポーリング用)。

```graphql
mutation RecordEventsByNode(
$nodeId: ID!
$groupId: ID!
$domain: String!
$events: [EventInput!]!
) {
recordEventsByNode(
nodeId: $nodeId
groupId: $groupId
domain: $domain
events: $events
) {
groupId
domain
recordedCount
nextSince
}
}
```

**用途**: WebSocket が使用できない環境でのイベント送信に使用。この mutation は `onMessageInGroup` subscription を**トリガーしません**。

---

### leaveGroup

ノードがグループから退出します。
Expand Down
42 changes: 38 additions & 4 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,16 @@ sequenceDiagram
participant DynamoDB
participant Subscription

Note over Node1,Subscription: WebSocket プロトコル使用時

Node2->>AppSync: subscribe: onMessageInGroup(groupId, domain)
AppSync->>Subscription: WebSocket接続確立

Node1->>AppSync: fireEventsByNode(nodeId, groupId, domain, events[])
AppSync->>Resolver: JS Resolver
Resolver->>DynamoDB: Query: checkGroupExists
alt グループ存在
Resolver->>DynamoDB: BatchWriteItem: イベント保存(最大25件)
DynamoDB-->>Resolver: Success
Resolver->>DynamoDB: (None DataSource): Skip persistence
Resolver-->>AppSync: MeshMessage
AppSync->>Subscription: Publish: onMessageInGroup (batchEvent)
Subscription-->>Node2: MeshMessage(リアルタイム配信)
Expand All @@ -189,6 +190,36 @@ sequenceDiagram
end
```

### イベント通信フロー(ポーリング)

```mermaid
sequenceDiagram
participant Node1 as Node 1 (送信)
participant Node2 as Node 2 (受信)
participant AppSync
participant Resolver
participant DynamoDB

Note over Node1,DynamoDB: ポーリング プロトコル使用時

Node1->>AppSync: recordEventsByNode(nodeId, groupId, domain, events[])
AppSync->>Resolver: JS Resolver (Pipeline)
Resolver->>DynamoDB: checkGroupExists
Resolver->>DynamoDB: BatchWriteItem: イベント保存 (TTL 10秒)
DynamoDB-->>Resolver: Success
Resolver-->>AppSync: RecordEventsPayload (nextSince)
AppSync-->>Node1: Response

loop 2秒間隔
Node2->>AppSync: getEventsSince(groupId, domain, since)
AppSync->>Resolver: JS Resolver (Unit)
Resolver->>DynamoDB: Query: pk=GROUP#id@domain AND sk > EVENT#since
DynamoDB-->>Resolver: Items (Event[])
Resolver-->>AppSync: Event[]
AppSync-->>Node2: Response
end
```

### ハートビート更新フロー

```mermaid
Expand Down Expand Up @@ -299,7 +330,9 @@ Mesh v2 は Single Table Design を採用し、1つのテーブルにすべて
"hostId": "node-001",
"createdAt": "2026-01-01T00:00:00Z",
"expiresAt": 1704067200,
"ttl": 1704067200
"ttl": 1704067200,
"useWebSocket": true,
"pollingIntervalSeconds": 2
}
```

Expand Down Expand Up @@ -376,7 +409,8 @@ Mesh v2 は Single Table Design を採用し、1つのテーブルにすべて
"groupId": "abc123",
"domain": "192.168.1.1",
"payload": "{\"button\":\"A\"}",
"timestamp": "2026-01-01T12:00:00.123Z"
"timestamp": "2026-01-01T12:00:00.123Z",
"ttl": 1704067210
}
```

Expand Down
31 changes: 30 additions & 1 deletion graphql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type Group {
fullId: String! # {group_id}@{domain}の完全なID
name: String!
hostId: ID!
createdAt: AWSDateTime!
expiresAt: AWSDateTime!
heartbeatIntervalSeconds: Int
useWebSocket: Boolean! # NEW: WebSocket 使用フラグ
pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ)
}

type Node {
Expand All @@ -30,6 +31,8 @@ type Node {
domain: String # 所属しているdomain
expiresAt: AWSDateTime
heartbeatIntervalSeconds: Int
useWebSocket: Boolean # NEW: グループの設定を継承
pollingIntervalSeconds: Int # NEW: ポーリング間隔(useWebSocket=false の場合のみ)
}

# メッセージの包括的な型
Expand Down Expand Up @@ -58,6 +61,7 @@ type Event {
domain: String!
payload: String
timestamp: AWSDateTime!
cursor: String # NEW: ポーリング用のカーソル(SK)
}

# イベントのバッチ送信用の入力型
Expand Down Expand Up @@ -104,6 +108,14 @@ type LeaveGroupPayload {
message: String!
}

# NEW: recordEventsByNode のレスポンス型
type RecordEventsPayload {
groupId: ID!
domain: String!
recordedCount: Int! # 記録したイベント数
nextSince: String! # NEW: 次回の getEventsSince の since に指定する値
}

# --- クエリ (Queries) ---

type Query {
Expand All @@ -118,6 +130,14 @@ type Query {

# グループ内の全Node取得
listNodesInGroup(groupId: ID!, domain: String!): [Node!]!

# NEW: 前回取得日時以降のイベントを取得(ポーリング用)
# since: 前回の nextSince または Event.cursor を指定
getEventsSince(
groupId: ID!
domain: String!
since: String!
): [Event!]!
}

# --- ミューテーション (Mutations) ---
Expand All @@ -133,6 +153,7 @@ type Mutation {
hostId: ID!
domain: String!
maxConnectionTimeSeconds: Int # オプション: 1以上、環境変数の値以下
useWebSocket: Boolean! # NEW: WebSocket 使用フラグ
): Group!

joinGroup(
Expand Down Expand Up @@ -182,6 +203,14 @@ type Mutation {
nodeId: ID!
events: [EventInput!]!
): MeshMessage

# NEW: ポーリング用のイベント記録 mutation
recordEventsByNode(
groupId: ID!
domain: String!
nodeId: ID!
events: [EventInput!]!
): RecordEventsPayload!
}

# --- サブスクリプション (Subscriptions) ---
Expand Down
4 changes: 3 additions & 1 deletion js/functions/checkExistingGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ export function response(ctx) {
fullId: existingGroups[0].fullId,
name: existingGroups[0].name,
hostId: existingGroups[0].hostId,
createdAt: existingGroups[0].createdAt
expiresAt: existingGroups[0].expiresAt,
useWebSocket: existingGroups[0].useWebSocket !== false, // デフォルト true
pollingIntervalSeconds: existingGroups[0].pollingIntervalSeconds
};
}

Expand Down
15 changes: 12 additions & 3 deletions js/functions/createGroupIfNotExists.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export function request(ctx) {
}

// 新規グループ作成
const { name, hostId, domain, maxConnectionTimeSeconds } = ctx.args;
const { name, hostId, domain, maxConnectionTimeSeconds, useWebSocket } = ctx.args;

// Domain文字列のバリデーション(最大256文字)
if (domain.length > 256) {
Expand All @@ -40,6 +40,9 @@ export function request(ctx) {
actualMaxSeconds = maxConnectionTimeSeconds;
}

// Polling interval decision
const pollingInterval = useWebSocket ? null : +(ctx.env.MESH_POLLING_INTERVAL_SECONDS || '2');

// グループID生成
const groupId = util.autoId();
const fullId = `${groupId}@${domain}`;
Expand All @@ -65,6 +68,8 @@ export function request(ctx) {
expiresAt: expiresAt,
heartbeatAt: nowEpoch,
ttl: ttl,
useWebSocket: useWebSocket,
pollingIntervalSeconds: pollingInterval,
// GSI用(groupId -> domain の逆引き検索)
gsi_pk: `GROUP#${groupId}`,
gsi_sk: `DOMAIN#${domain}`
Expand All @@ -91,7 +96,9 @@ export function response(ctx) {
hostId: ctx.stash.existingGroup.hostId,
createdAt: ctx.stash.existingGroup.createdAt,
expiresAt: ctx.stash.existingGroup.expiresAt,
heartbeatIntervalSeconds: heartbeatIntervalSeconds
heartbeatIntervalSeconds: heartbeatIntervalSeconds,
useWebSocket: ctx.stash.existingGroup.useWebSocket !== false, // デフォルト true
pollingIntervalSeconds: ctx.stash.existingGroup.pollingIntervalSeconds
};
}

Expand All @@ -104,6 +111,8 @@ export function response(ctx) {
hostId: ctx.result.hostId,
createdAt: ctx.result.createdAt,
expiresAt: ctx.result.expiresAt,
heartbeatIntervalSeconds: heartbeatIntervalSeconds
heartbeatIntervalSeconds: heartbeatIntervalSeconds,
useWebSocket: ctx.result.useWebSocket,
pollingIntervalSeconds: ctx.result.pollingIntervalSeconds
};
}
4 changes: 3 additions & 1 deletion js/functions/joinGroupFunction.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ export function response(ctx) {
groupId: groupId,
domain: domain,
expiresAt: group ? group.expiresAt : null,
heartbeatIntervalSeconds: +(ctx.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120')
heartbeatIntervalSeconds: +(ctx.env.MESH_MEMBER_HEARTBEAT_INTERVAL_SECONDS || '120'),
useWebSocket: group ? group.useWebSocket !== false : true,
pollingIntervalSeconds: group ? group.pollingIntervalSeconds : null
};
}
Loading
Loading