Commit c809dd4

grpc client and server adjustments after review and test
1 parent 312c763 commit c809dd4

23 files changed

Lines changed: 1124 additions & 1405 deletions

GRPC_ARCHITECTURE.md

Lines changed: 23 additions & 37 deletions
@@ -6,8 +6,9 @@ The cuOpt remote execution system uses gRPC for client-server communication. The
 supports arbitrarily large optimization problems (multi-GB) through a chunked array transfer
 protocol that uses only unary (request-response) RPCs — no bidirectional streaming.

-All serialization uses protocol buffers generated by `protoc` and `grpc_cpp_plugin`
-no custom serialization logic is implemented.
+All client-server serialization uses protocol buffers generated by `protoc` and
+`grpc_cpp_plugin`. The internal server-to-worker pipe uses protobuf for metadata
+headers and raw byte transfer for bulk array data (see Security Notes).

 ## Directory Layout

@@ -30,7 +31,7 @@ cpp/src/grpc/
 ├── grpc_service_impl.cpp # CuOptRemoteServiceImpl — all RPC handlers
 ├── grpc_server_types.hpp # Shared types, globals, forward declarations
 ├── grpc_field_element_size.hpp # ArrayFieldId → element byte size (codegen target)
-├── grpc_pipe_serialization.hpp # Pipe blob serialize/deserialize (varint-delimited proto)
+├── grpc_pipe_serialization.hpp # Pipe I/O: protobuf headers + raw byte arrays (request/result)
 ├── grpc_incumbent_proto.hpp # Incumbent proto build/parse (codegen target)
 ├── grpc_worker.cpp # worker_process(), incumbent callback, store_simple_result
 ├── grpc_worker_infra.cpp # Pipes, spawn, wait_for_workers, mark_worker_jobs_failed
@@ -192,34 +193,15 @@ The client handles size-based routing transparently:
 - Server cleans up job state on client disconnect during upload
 - Automatic reconnection is NOT built-in (caller should retry)

-## Completion Strategies
+## Completion Strategy

-The client supports two strategies for waiting until a job completes:
+The `solve_lp` and `solve_mip` methods poll `CheckStatus` every `poll_interval_ms`
+until the job reaches a terminal state (COMPLETED/FAILED/CANCELLED) or `timeout_seconds`
+is exceeded. During polling, MIP incumbent callbacks are invoked on the main thread.

-### Polling (default)
-
-```cpp
-config.use_wait = false; // default
-```
-
-- Main thread polls `CheckStatus` every `poll_interval_ms`
-- Detects completion when status changes to COMPLETED/FAILED/CANCELLED
-- Allows timeout detection (max_polls = timeout_seconds / poll_interval_ms)
-- Compatible with all server configurations
-
-### Wait RPC
-
-```cpp
-config.use_wait = true;
-```
-
-- Main thread makes single blocking `WaitForCompletion` call
-- More efficient (no repeated RPCs)
-- Server blocks until job completes, then returns final status
-- Result must still be fetched separately via `GetResult` or chunked download
-
-Both strategies support concurrent log streaming and incumbent callbacks — these
-run in background threads independent of the main completion check.
+The `WaitForCompletion` RPC is available as a public async API primitive for callers
+managing jobs directly, but it is not used by the convenience `solve_*` methods because
+polling provides timeout protection and enables incumbent callbacks.

 ## Client API (`grpc_client_t`)
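
Editor's sketch (not part of the commit): the polling behaviour described in the new "Completion Strategy" text could look roughly like the following. This is a minimal illustration under stated assumptions, not the client's actual code: `JobState`, `PollResult`, `poll_until_done`, and the `on_poll` hook are hypothetical names, and `check_status` stands in for a `CheckStatus` RPC call.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical status enum; the real client defines its own job status type.
enum class JobState { QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED };

enum class PollOutcome { TERMINAL, TIMED_OUT };

struct PollResult {
  PollOutcome outcome;
  JobState last_state;
  int polls;  // number of status checks performed
};

inline bool is_terminal(JobState s)
{
  return s == JobState::COMPLETED || s == JobState::FAILED || s == JobState::CANCELLED;
}

// Calls `check_status` every `poll_interval_ms` until a terminal state is seen
// or `timeout_seconds` has elapsed. `on_poll` is a placeholder for work done
// between polls on the calling thread (for example, delivering incumbents).
inline PollResult poll_until_done(const std::function<JobState()>& check_status,
                                  int poll_interval_ms,
                                  int timeout_seconds,
                                  const std::function<void()>& on_poll = [] {})
{
  using clock         = std::chrono::steady_clock;
  const auto deadline = clock::now() + std::chrono::seconds(timeout_seconds);
  int polls           = 0;
  while (true) {
    const JobState state = check_status();
    ++polls;
    if (is_terminal(state)) { return {PollOutcome::TERMINAL, state, polls}; }
    on_poll();
    if (clock::now() >= deadline) { return {PollOutcome::TIMED_OUT, state, polls}; }
    std::this_thread::sleep_for(std::chrono::milliseconds(poll_interval_ms));
  }
}
```

Checking the deadline on the client side is what gives the timeout protection the text mentions; a single blocking `WaitForCompletion` call would leave that decision to the server.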

@@ -230,7 +212,6 @@ struct grpc_client_config_t {
 std::string server_address = "localhost:8765";
 int poll_interval_ms = 1000;
 int timeout_seconds = 3600; // Max wait for job completion (1 hour)
-bool use_wait = false; // Use WaitForCompletion instead of polling
 bool stream_logs = false; // Stream solver logs from server

 // Callbacks
@@ -339,20 +320,25 @@ config.tls_client_key = read_file("client.key");

 | Configuration | Default | Notes |
 |---------------|---------|-------|
-| Server `--max-message-mb` | 256 MiB | Per-message limit |
-| Client `max_message_bytes` | 256 MiB | Should match server |
+| Server `--max-message-mb` | 256 MiB | Per-message limit (also `--max-message-bytes` for exact byte values) |
+| Server clamping | [4 MiB, ~2 GiB] | Enforced at startup to stay within protobuf's serialization limit |
+| Client `max_message_bytes` | 256 MiB | Clamped to [4 MiB, ~2 GiB] at construction |
 | Chunk size | 16 MiB | Payload per `SendArrayChunk`/`GetResultChunk` |
 | Chunked threshold | 75% of max_message_bytes | Problems above this use chunked upload (e.g. 192 MiB when max is 256 MiB) |

 Chunked transfer allows unlimited total payload size; only individual
-chunks must fit within the per-message limit.
+chunks must fit within the per-message limit. Neither client nor server
+allows "unlimited" message size — both clamp to the protobuf 2 GiB ceiling.

 ## Security Notes

-1. **No Custom Serialization**: All message parsing uses protobuf-generated code
-2. **Standard gRPC Security**: HTTP/2 framing, flow control, standard status codes
-3. **TLS Support**: Optional encryption with mutual authentication
-4. **Input Validation**: Server validates all incoming messages before processing
+1. **gRPC Layer**: All client-server message parsing uses protobuf-generated code
+2. **Internal Pipe**: The server-to-worker pipe uses protobuf for metadata headers
+and length-prefixed raw `read()`/`write()` for bulk array data. This pipe is
+internal to the server process (main → forked worker) and not exposed to clients.
+3. **Standard gRPC Security**: HTTP/2 framing, flow control, standard status codes
+4. **TLS Support**: Optional encryption with mutual authentication
+5. **Input Validation**: Server validates all incoming gRPC messages before processing

 ## Data Flow Summary
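
Editor's sketch (not part of the commit): the limits table in this file describes clamping the per-message size to [4 MiB, ~2 GiB] and switching to chunked upload above 75% of that limit. A minimal illustration of that arithmetic follows. All names are hypothetical, and the exact ceiling is an assumption: the diff only says "~2 GiB", so `INT32_MAX` is used here as a plausible stand-in.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

constexpr int64_t kMiB             = 1024LL * 1024LL;
constexpr int64_t kMinMessageBytes = 4 * kMiB;
// Assumption: "~2 GiB" is modelled as INT32_MAX; the real constant is not shown in the diff.
constexpr int64_t kMaxMessageBytes = std::numeric_limits<int32_t>::max();

// Clamp a requested per-message limit into the supported range.
inline int64_t clamp_max_message_bytes(int64_t requested)
{
  return std::clamp(requested, kMinMessageBytes, kMaxMessageBytes);
}

// 75% of the per-message limit, e.g. 192 MiB when the limit is 256 MiB.
inline int64_t chunked_threshold_bytes(int64_t max_message_bytes)
{
  return max_message_bytes / 4 * 3;
}

// Problems strictly above the threshold take the chunked upload path.
inline bool use_chunked_upload(int64_t problem_bytes, int64_t max_message_bytes)
{
  return problem_bytes > chunked_threshold_bytes(clamp_max_message_bytes(max_message_bytes));
}
```

With the default 256 MiB limit, a 192 MiB problem would still go through the unary path in this sketch, while anything larger would be chunked.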

SERVER_ARCHITECTURE.md

Lines changed: 6 additions & 4 deletions
@@ -85,7 +85,7 @@ All paths below are under `cpp/src/grpc/server/`.
 | `grpc_service_impl.cpp` | `CuOptRemoteServiceImpl`: all 14 RPC handlers (SubmitJob, CheckStatus, GetResult, chunked upload/download, StreamLogs, GetIncumbents, CancelJob, DeleteResult, WaitForCompletion, Status probe). Uses mappers and job_management to enqueue jobs and trigger pipe I/O. |
 | `grpc_server_types.hpp` | Shared structs (e.g. `JobQueueEntry`, `ResultQueueEntry`, `ServerConfig`, `JobInfo`), enums, globals (atomics, mutexes, condition variables), and forward declarations used across server .cpp files. |
 | `grpc_field_element_size.hpp` | Maps `cuopt::remote::ArrayFieldId` to element byte size; used by pipe deserialization and chunked logic. |
-| `grpc_pipe_serialization.hpp` | Serialize/deserialize result blobs (ChunkedResultHeader + array chunks) and chunked request blobs (ChunkedProblemHeader + chunks) and SubmitJobRequest for pipe transfer. |
+| `grpc_pipe_serialization.hpp` | Streaming pipe I/O: write/read individual length-prefixed protobuf messages (ChunkedProblemHeader, ChunkedResultHeader, ArrayChunk) directly to/from pipe fds. Avoids large intermediate buffers. Also serializes SubmitJobRequest for unary pipe transfer. |
 | `grpc_incumbent_proto.hpp` | Build `Incumbent` proto from (job_id, objective, assignment) and parse it back; used by worker when pushing incumbents and by main when reading from the incumbent pipe. |
 | `grpc_worker.cpp` | `worker_process(worker_index)`: loop over job queue, receive job data via pipe (unary or chunked), call solver, send result (and optionally incumbents) back. Contains `IncumbentPipeCallback` and `store_simple_result`. |
 | `grpc_worker_infra.cpp` | Pipe creation/teardown, `spawn_worker` / `spawn_workers`, `wait_for_workers`, `mark_worker_jobs_failed`, `cleanup_shared_memory`. |
@@ -97,9 +97,11 @@ All paths below are under `cpp/src/grpc/server/`.
 For large problems uploaded via chunked gRPC RPCs:

 1. Server holds chunked upload state in memory (`ChunkedUploadState`: header + array chunks per `upload_id`).
-2. When `FinishChunkedUpload` is called, the server serializes header and chunks (varint-delimited) and sends them to a worker via the job's pipe.
-3. Worker deserializes, runs the solver, and writes result (and optionally incumbents) back via pipes.
-4. Main process result-retrieval thread reads the result pipe and stores the result for `GetResult` or chunked download.
+2. When `FinishChunkedUpload` is called, the header and chunks are stored in `pending_chunked_data`. The data dispatch thread streams them directly to the worker pipe as individual length-prefixed protobuf messages — no intermediate blob is created.
+3. Worker reads the streamed messages from the pipe, reassembles arrays, runs the solver, and writes the result (and optionally incumbents) back via pipes using the same streaming format.
+4. Main process result-retrieval thread reads the streamed result messages from the pipe and stores the result for `GetResult` or chunked download.
+
+This streaming approach avoids creating a single large buffer, eliminating the 2 GiB protobuf serialization limit for pipe transfers and reducing peak memory usage. Each individual protobuf message (max 64 MiB) is serialized with standard `SerializeToArray`/`ParseFromArray`.

 No disk spooling: chunked data is kept in memory in the main process until forwarded to the worker.
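
Editor's sketch (not part of the commit): the streaming format described above sends each message as a length prefix followed by its bytes. The following is one way such framing could be written over a POSIX pipe. It is an illustration only: `write_frame`, `read_frame`, and the helpers are hypothetical names, the 8-byte host-endian prefix is an assumption (the diff does not state the prefix width), and a `std::string` stands in for bytes that the real code would produce with `SerializeToArray`.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <string>
#include <unistd.h>

// Cap taken from the text above: each message is at most 64 MiB.
constexpr uint64_t kMaxFrameBytes = 64ULL * 1024 * 1024;

// Write exactly `len` bytes, retrying on short writes and EINTR.
inline bool write_all(int fd, const void* buf, size_t len)
{
  const char* p = static_cast<const char*>(buf);
  while (len > 0) {
    const ssize_t n = ::write(fd, p, len);
    if (n < 0) {
      if (errno == EINTR) { continue; }
      return false;
    }
    p += n;
    len -= static_cast<size_t>(n);
  }
  return true;
}

// Read exactly `len` bytes; returns false on EOF or error.
inline bool read_all(int fd, void* buf, size_t len)
{
  char* p = static_cast<char*>(buf);
  while (len > 0) {
    const ssize_t n = ::read(fd, p, len);
    if (n == 0) { return false; }  // peer closed the pipe
    if (n < 0) {
      if (errno == EINTR) { continue; }
      return false;
    }
    p += n;
    len -= static_cast<size_t>(n);
  }
  return true;
}

// One frame = 8-byte length (assumed width, host byte order) + payload bytes.
inline bool write_frame(int fd, const std::string& payload)
{
  if (payload.size() > kMaxFrameBytes) { return false; }
  const uint64_t len = payload.size();
  return write_all(fd, &len, sizeof(len)) && write_all(fd, payload.data(), payload.size());
}

inline bool read_frame(int fd, std::string& payload)
{
  uint64_t len = 0;
  if (!read_all(fd, &len, sizeof(len))) { return false; }
  if (len > kMaxFrameBytes) { return false; }  // reject oversized or corrupt prefixes
  payload.resize(len);
  return len == 0 || read_all(fd, payload.data(), len);
}
```

Because each frame is bounded, the reader never needs a buffer larger than one message, which is the memory benefit the paragraph above describes. Host byte order is acceptable in this sketch only because both ends of the pipe are on the same machine.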

cpp/cuopt_grpc_server.cpp

Lines changed: 0 additions & 27 deletions
@@ -75,33 +75,6 @@ using grpc::StatusCode;
 using namespace cuopt::linear_programming;
 // Note: NOT using "using namespace cuopt::remote" to avoid JobStatus enum conflict

-// =============================================================================
-// Data Integrity - Simple Hash for Transfer Verification
-// =============================================================================
-
-/**
- * @brief Compute FNV-1a 64-bit hash for data integrity verification.
- * Same algorithm as client - allows comparison of upload/download hashes.
- */
-inline uint64_t compute_data_hash(const uint8_t* data, size_t size)
-{
-  constexpr uint64_t FNV_OFFSET_BASIS = 14695981039346656037ULL;
-  constexpr uint64_t FNV_PRIME = 1099511628211ULL;
-  uint64_t hash = FNV_OFFSET_BASIS;
-  for (size_t i = 0; i < size; ++i) {
-    hash ^= static_cast<uint64_t>(data[i]);
-    hash *= FNV_PRIME;
-  }
-  return hash;
-}
-
-inline std::string hash_to_hex(uint64_t hash)
-{
-  std::ostringstream oss;
-  oss << std::hex << std::setfill('0') << std::setw(16) << hash;
-  return oss.str();
-}
-
 // ============================================================================
 // Pipe IPC result serialization.
 // Uses standard protobuf varint-delimited format (SerializeDelimitedToCodedStream).
