Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d47f6c9
feat(fleetnode): add command artifact transfer foundation
ankitgoswami Jun 25, 2026
386f8fb
fix(review): harden command artifact transfers
ankitgoswami Jun 25, 2026
a214d75
refactor(review): simplify command artifact transfer code
ankitgoswami Jun 25, 2026
d4fb0ca
chore: regenerate fleetnode gateway client proto
ankitgoswami Jun 25, 2026
baea2b9
fix: bound command artifact lifecycle
ankitgoswami Jun 25, 2026
77c9312
fix: bound command artifact stream resources
ankitgoswami Jun 25, 2026
bdfe386
fix: tighten command artifact retries and metadata
ankitgoswami Jun 25, 2026
4448af9
refactor: simplify command artifact plumbing
ankitgoswami Jun 25, 2026
b97c8da
Address PR review feedback (#596)
ankitgoswami Jun 25, 2026
dadc89e
Address artifact upload review feedback (#596)
ankitgoswami Jun 25, 2026
eec4cf9
Address artifact transfer slot feedback (#596)
ankitgoswami Jun 25, 2026
6237ea5
Address artifact review follow-ups (#596)
ankitgoswami Jun 25, 2026
697275a
Make completed artifact uploads retryable (#596)
ankitgoswami Jun 25, 2026
47ab949
Bound artifact chunks and download retries (#596)
ankitgoswami Jun 25, 2026
c836a67
Drain duplicate artifact upload retries (#596)
ankitgoswami Jun 26, 2026
c3e74c9
Fix command artifact transfer teardown handling (#596)
ankitgoswami Jun 26, 2026
813cef5
Simplify command artifact plumbing (#596)
ankitgoswami Jun 26, 2026
cf37c2d
Address artifact transfer feedback (#596)
ankitgoswami Jun 26, 2026
ad4ff24
Cap command artifact upload messages (#596)
ankitgoswami Jun 26, 2026
6e72e9e
Simplify command artifact tests (#596)
ankitgoswami Jun 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions proto/fleetnodegateway/v1/fleetnodegateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ service FleetNodeGatewayService {
rpc UploadTelemetry(stream UploadTelemetryRequest) returns (UploadTelemetryResponse);
rpc UploadEvents(stream UploadEventsRequest) returns (UploadEventsResponse);
rpc UploadHeartbeat(UploadHeartbeatRequest) returns (UploadHeartbeatResponse);
rpc UploadCommandArtifact(stream UploadCommandArtifactRequest) returns (UploadCommandArtifactResponse);
rpc DownloadCommandArtifact(DownloadCommandArtifactRequest) returns (stream DownloadCommandArtifactResponse);
rpc ReportDiscoveredDevices(ReportDiscoveredDevicesRequest) returns (ReportDiscoveredDevicesResponse);
rpc ReportPairedDevices(ReportPairedDevicesRequest) returns (ReportPairedDevicesResponse);
rpc ControlStream(stream ControlStreamRequest) returns (stream ControlStreamResponse);
Expand Down Expand Up @@ -95,6 +97,68 @@ message UploadHeartbeatResponse {
google.protobuf.Timestamp received_at = 1;
}

// CommandArtifactPurpose describes the logical command feature that owns an
// artifact transfer. The command_id plus purpose bind bytes to an in-flight
// server-issued command; nodes do not get a generic file API.
enum CommandArtifactPurpose {
COMMAND_ARTIFACT_PURPOSE_UNSPECIFIED = 0;
COMMAND_ARTIFACT_PURPOSE_MINER_LOGS = 1;
COMMAND_ARTIFACT_PURPOSE_FIRMWARE_PAYLOAD = 2;
}

message CommandArtifactRef {
string artifact_id = 1 [(buf.validate.field).string = {min_len: 1, max_len: 128}];
CommandArtifactPurpose purpose = 2 [(buf.validate.field).enum = {defined_only: true, not_in: [0]}];
string filename = 3 [(buf.validate.field).string = {min_len: 1, max_len: 255}];
int64 size_bytes = 4 [(buf.validate.field).int64.gt = 0];
string sha256 = 5 [(buf.validate.field).string = {len: 64, pattern: "^[a-f0-9]{64}$"}];
}

message CommandArtifactUploadHeader {
string command_id = 1 [(buf.validate.field).string = {min_len: 1, max_len: 128}];
CommandArtifactPurpose purpose = 2 [(buf.validate.field).enum = {defined_only: true, not_in: [0]}];
string filename = 3 [(buf.validate.field).string = {min_len: 1, max_len: 255}];
int64 size_bytes = 4 [(buf.validate.field).int64.gt = 0];
string sha256 = 5 [(buf.validate.field).string = {len: 64, pattern: "^[a-f0-9]{64}$"}];
string device_identifier = 6 [(buf.validate.field).string.max_len = 255];
}

message CommandArtifactChunk {
bytes data = 1 [(buf.validate.field).bytes = {min_len: 1, max_len: 1048576}];
}

message UploadCommandArtifactRequest {
oneof part {
option (buf.validate.oneof).required = true;

CommandArtifactUploadHeader header = 1;
CommandArtifactChunk chunk = 2;
}
}

message UploadCommandArtifactResponse {
CommandArtifactRef artifact = 1 [(buf.validate.field).required = true];
}

message DownloadCommandArtifactRequest {
string command_id = 1 [(buf.validate.field).string = {min_len: 1, max_len: 128}];
CommandArtifactRef artifact = 2 [(buf.validate.field).required = true];
string device_identifier = 3 [(buf.validate.field).string.max_len = 255];
}

message CommandArtifactDownloadHeader {
CommandArtifactRef artifact = 1 [(buf.validate.field).required = true];
}

message DownloadCommandArtifactResponse {
oneof part {
option (buf.validate.oneof).required = true;

CommandArtifactDownloadHeader header = 1;
CommandArtifactChunk chunk = 2;
}
}

message ReportDiscoveredDevicesRequest {
repeated DiscoveredDeviceReport devices = 1
[(buf.validate.field).repeated.max_items = 1024];
Expand Down
1 change: 1 addition & 0 deletions server/cmd/fleetd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
type HTTPConfig struct {
Address string `help:"Address to listen on" default:"127.0.0.1:8080" env:"LISTEN_ADDRESS"`
ReadHeaderTimeout time.Duration `help:"Read header timeout" default:"3s" env:"READ_HEADER_TIMEOUT"`
WriteByteTimeout time.Duration `help:"HTTP/2 timeout for stalled response writes." default:"30s" env:"WRITE_BYTE_TIMEOUT"`
SuppressCors bool `help:"Suppress CORS" default:"false" env:"SUPPRESS_CORS"`
PprofAddr string `help:"Address to listen for pprof debug server, e.g. 127.0.0.1:6060 (empty disables it; use a non-loopback address only if you intentionally want remote access)" default:"" env:"PPROF_ADDR"`
}
Expand Down
40 changes: 38 additions & 2 deletions server/cmd/fleetd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,32 @@ func start(config *Config) error {
if err != nil {
return err
}
commandArtifactCleanupCtx, commandArtifactCleanupCancel := context.WithCancel(context.Background())
runCommandArtifactSweep := func() {
deleted, sweepErr := filesService.SweepExpiredCommandArtifacts(time.Now().UTC(), filesService.CommandArtifactRetentionTTL())
if sweepErr != nil {
slog.Error("failed to sweep expired command artifacts", "error", sweepErr)
return
}
if deleted > 0 {
slog.Debug("swept expired command artifacts", "count", deleted)
}
}
go func() {
ticker := time.NewTicker(filesService.CommandArtifactCleanupInterval())
defer ticker.Stop()
runCommandArtifactSweep()

for {
select {
case <-ticker.C:
runCommandArtifactSweep()
case <-commandArtifactCleanupCtx.Done():
return
}
}
}()
defer commandArtifactCleanupCancel()
minerService := miner.NewMinerService(conn, userStore, encryptSvc, filesService, pluginManager).
WithCommandSender(fleetNodeControlRegistry)

Expand Down Expand Up @@ -613,7 +639,11 @@ func start(config *Config) error {
mux.Handle(curtailmentv1connect.NewCurtailmentServiceHandler(curtailmentHandler.NewHandlerWithAutomation(curtailmentSvc, curtailmentResponseProfileSvc, curtailmentAutomationSvc, mqttSettingsSvc), li))
mux.Handle(sitesv1connect.NewSiteServiceHandler(sitesHandler.NewHandler(sitesSvc), li))
mux.Handle(buildingsv1connect.NewBuildingServiceHandler(buildingsHandler.NewHandler(buildingsSvc), li))
mux.Handle(fleetnodegatewayv1connect.NewFleetNodeGatewayServiceHandler(gateway.NewHandler(fleetNodeEnrollmentSvc, fleetNodeAuthSvc, fleetNodePairingSvc, fleetNodeControlRegistry), li))
mux.Handle(fleetnodegatewayv1connect.NewFleetNodeGatewayServiceHandler(
gateway.NewHandler(fleetNodeEnrollmentSvc, fleetNodeAuthSvc, fleetNodePairingSvc, fleetNodeControlRegistry, filesService),
li,
gateway.CommandArtifactUploadReadLimitOption(),
))
mux.Handle(fleetnodeadminv1connect.NewFleetNodeAdminServiceHandler(admin.NewHandler(fleetNodeEnrollmentSvc, fleetNodePairingSvc, fleetNodeDiscoverySvc), li))
mux.Handle(collectionv1connect.NewDeviceCollectionServiceHandler(collectionHandler.NewHandler(collectionSvc), li))
mux.Handle(device_setv1connect.NewDeviceSetServiceHandler(devicesetHandler.NewHandler(collectionSvc), li))
Expand Down Expand Up @@ -663,7 +693,7 @@ func start(config *Config) error {
handler = m.Wrap(handler)
}

handler = h2c.NewHandler(handler, &http2.Server{})
handler = h2c.NewHandler(handler, newHTTP2Server(config.HTTP))
httpServer := http.Server{
Addr: config.HTTP.Address,
Handler: handler,
Expand All @@ -675,3 +705,9 @@ func start(config *Config) error {
}
return nil
}

func newHTTP2Server(config HTTPConfig) *http2.Server {
return &http2.Server{
WriteByteTimeout: config.WriteByteTimeout,
}
}
70 changes: 70 additions & 0 deletions server/cmd/fleetd/main_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package main

import (
"bytes"
"errors"
"net"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"

"github.com/alecthomas/kong"
kongyaml "github.com/alecthomas/kong-yaml"
"github.com/stretchr/testify/require"
Expand All @@ -26,6 +33,7 @@ encrypt:
service-master-key: "test-master-key"
http:
address: "0.0.0.0:9090"
write-byte-timeout: "45s"
suppress-cors: true
logging:
json: true
Expand All @@ -42,6 +50,7 @@ logging:
_, err = parser.Parse(nil)
require.NoError(t, err)
require.Equal(t, "0.0.0.0:9090", config.HTTP.Address)
require.Equal(t, 45*time.Second, config.HTTP.WriteByteTimeout)
require.True(t, config.HTTP.SuppressCors)
require.Equal(t, "db.internal:5432", config.DB.Address)
require.True(t, config.Log.JSON)
Expand Down Expand Up @@ -77,13 +86,74 @@ logging:

_, err = parser.Parse([]string{
"--http-address=127.0.0.1:8081",
"--http-write-byte-timeout=1m",
"--logging-json=false",
})
require.NoError(t, err)
require.Equal(t, "127.0.0.1:8081", config.HTTP.Address)
require.Equal(t, time.Minute, config.HTTP.WriteByteTimeout)
require.False(t, config.Log.JSON)
}

func TestHTTP2WriteByteTimeoutStopsNonReadingClient(t *testing.T) {
t.Parallel()

errMissingFlusher := errors.New("response writer does not implement http.Flusher")
handlerDone := make(chan error, 1)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
chunk := bytes.Repeat([]byte("x"), 1024)
flusher, ok := w.(http.Flusher)
if !ok {
handlerDone <- errMissingFlusher
return
}
for {
if _, err := w.Write(chunk); err != nil {
handlerDone <- err
return
}
flusher.Flush()
if err := r.Context().Err(); err != nil {
handlerDone <- err
return
}
}
})

serverConn, clientConn := net.Pipe()
defer clientConn.Close()
go newHTTP2Server(HTTPConfig{WriteByteTimeout: 50 * time.Millisecond}).ServeConn(serverConn, &http2.ServeConnOpts{
Handler: handler,
})

framer := http2.NewFramer(clientConn, clientConn)
_, err := clientConn.Write([]byte(http2.ClientPreface))
require.NoError(t, err)
require.NoError(t, framer.WriteSettings())
var headers bytes.Buffer
encoder := hpack.NewEncoder(&headers)
require.NoError(t, encoder.WriteField(hpack.HeaderField{Name: ":method", Value: http.MethodGet}))
require.NoError(t, encoder.WriteField(hpack.HeaderField{Name: ":scheme", Value: "http"}))
require.NoError(t, encoder.WriteField(hpack.HeaderField{Name: ":authority", Value: "fleetd.test"}))
require.NoError(t, encoder.WriteField(hpack.HeaderField{Name: ":path", Value: "/"}))
require.NoError(t, framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: 1,
BlockFragment: headers.Bytes(),
EndHeaders: true,
EndStream: true,
}))
_, err = framer.ReadFrame()
require.NoError(t, err)

select {
case err := <-handlerDone:
require.Error(t, err)
require.NotErrorIs(t, err, errMissingFlusher)
case <-time.After(3 * time.Second):
t.Fatal("handler did not unblock after client stopped reading response body")
}
}

func writeFleetdConfigFile(t *testing.T, contents string) string {
t.Helper()

Expand Down
2 changes: 2 additions & 0 deletions server/cmd/fleetnode/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type gatewayClient interface {
UploadHeartbeat(ctx context.Context, req *connect.Request[pb.UploadHeartbeatRequest]) (*connect.Response[pb.UploadHeartbeatResponse], error)
ReportDiscoveredDevices(ctx context.Context, req *connect.Request[pb.ReportDiscoveredDevicesRequest]) (*connect.Response[pb.ReportDiscoveredDevicesResponse], error)
ReportPairedDevices(ctx context.Context, req *connect.Request[pb.ReportPairedDevicesRequest]) (*connect.Response[pb.ReportPairedDevicesResponse], error)
UploadCommandArtifact(ctx context.Context) *connect.ClientStreamForClient[pb.UploadCommandArtifactRequest, pb.UploadCommandArtifactResponse]
DownloadCommandArtifact(ctx context.Context, req *connect.Request[pb.DownloadCommandArtifactRequest]) (*connect.ServerStreamForClient[pb.DownloadCommandArtifactResponse], error)
ControlStream(ctx context.Context) *connect.BidiStreamForClient[pb.ControlStreamRequest, pb.ControlStreamResponse]
}

Expand Down
8 changes: 8 additions & 0 deletions server/cmd/fleetnode/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func (s *stubGatewayClient) ReportPairedDevices(_ context.Context, _ *connect.Re
return connect.NewResponse(&pb.ReportPairedDevicesResponse{}), nil
}

func (s *stubGatewayClient) UploadCommandArtifact(_ context.Context) *connect.ClientStreamForClient[pb.UploadCommandArtifactRequest, pb.UploadCommandArtifactResponse] {
return nil
}

func (s *stubGatewayClient) DownloadCommandArtifact(_ context.Context, _ *connect.Request[pb.DownloadCommandArtifactRequest]) (*connect.ServerStreamForClient[pb.DownloadCommandArtifactResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("not implemented"))
}

func (s *stubGatewayClient) ControlStream(_ context.Context) *connect.BidiStreamForClient[pb.ControlStreamRequest, pb.ControlStreamResponse] {
return nil
}
Expand Down
Loading
Loading