Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@ fleetnode-ui-test-up:
-H 'Content-Type: application/json' \
-d '{}' \
http://localhost:4000/onboarding.v1.OnboardingService/GetFleetInitStatus >/dev/null
"${COMPOSE[@]}" build fleetnode-ui-test
"${COMPOSE[@]}" run --rm \
-e FLEET_ADMIN_USERNAME \
-e FLEET_ADMIN_PASSWORD \
--entrypoint /app/fleetnode-ui-test fleetnode-ui-test \
--api-url=http://fleet-api:4000 \
--node-server-url=http://fleet-api:4000 \
--state-dir=/state
"${COMPOSE[@]}" up -d --build fleetnode-ui-test
"${COMPOSE[@]}" up -d fleetnode-ui-test
cd ../client
GIT_VERSION=$(git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_DATE=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
Expand Down
5 changes: 5 additions & 0 deletions proto/fleetnodegateway/v1/fleetnodegateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ message MinerCommand {
GetMiningPoolsAction get_mining_pools = 11;
GetErrorsAction get_errors = 12;
UpdateMinerPasswordAction update_miner_password = 13;
DownloadLogsAction download_logs = 14;
}
}

Expand All @@ -340,6 +341,10 @@ message UncurtailAction {}
message GetMiningPoolsAction {}
message GetErrorsAction {}

// DownloadLogsAction is the payload of the MinerCommand download_logs action.
message DownloadLogsAction {
// Must be non-empty and at most 128 characters long (enforced by buf.validate).
// NOTE(review): presumably identifies the log batch this download belongs to — confirm with the sender.
string batch_log_uuid = 1 [(buf.validate.field).string = {min_len: 1, max_len: 128}];
}

message NodeEncryptedPayload {
string algorithm = 1 [
(buf.validate.field).string = {min_len: 1, max_len: 64},
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/fleetnode/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (r *RunCmd) handleCommand(ctx context.Context, client gatewayClient, stream
case *pb.AgentCommand_Discover:
r.handleDiscover(ctx, client, stream, commandID, k.Discover, logger)
case *pb.AgentCommand_MinerCommand:
r.handleMinerCommand(ctx, stream, commandID, k.MinerCommand, logger)
r.handleMinerCommand(ctx, client, stream, commandID, k.MinerCommand, logger)
case *pb.AgentCommand_Pair:
r.handlePairCommand(ctx, client, stream, commandID, k.Pair, logger)
case *pb.AgentCommand_Telemetry:
Expand Down
40 changes: 40 additions & 0 deletions server/cmd/fleetnode/fake_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"context"
"crypto/ed25519"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"connectrpc.com/connect"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

pb "github.com/block/proto-fleet/server/generated/grpc/fleetnodegateway/v1"
Expand Down Expand Up @@ -40,6 +42,11 @@ type fakeFleetNodeGateway struct {
heartbeatsReceived []heartbeatRecord
expectedSessionToken string
onHeartbeat func(count int)

artifactMu sync.Mutex
commandArtifactUploads []*pb.UploadCommandArtifactRequest
commandArtifactRef *pb.CommandArtifactRef
commandArtifactErr error
}

type heartbeatRecord struct {
Expand Down Expand Up @@ -106,6 +113,31 @@ func (f *fakeFleetNodeGateway) UploadHeartbeat(_ context.Context, req *connect.R
return connect.NewResponse(&pb.UploadHeartbeatResponse{ReceivedAt: timestamppb.Now()}), nil
}

// UploadCommandArtifact is the fake's handler for the client-streaming artifact
// upload. It drains the stream, keeps a deep copy of every request it received,
// appends those copies to the recorded uploads, and then answers with the
// configured artifact reference, or with the configured error if one is set.
// Requests are recorded even when the configured error is returned.
func (f *fakeFleetNodeGateway) UploadCommandArtifact(_ context.Context, stream *connect.ClientStream[pb.UploadCommandArtifactRequest]) (*connect.Response[pb.UploadCommandArtifactResponse], error) {
	var received []*pb.UploadCommandArtifactRequest
	for stream.Receive() {
		// Clone so the recorded message does not alias the stream's current message.
		msgCopy := proto.Clone(stream.Msg())
		req, isReq := msgCopy.(*pb.UploadCommandArtifactRequest)
		if !isReq {
			return nil, fmt.Errorf("clone command artifact upload request: got %T", msgCopy)
		}
		received = append(received, req)
	}
	if recvErr := stream.Err(); recvErr != nil {
		return nil, fmt.Errorf("receive command artifact upload: %w", recvErr)
	}

	// Record the upload and read the canned reply under a single lock hold.
	ref, cannedErr := func() (*pb.CommandArtifactRef, error) {
		f.artifactMu.Lock()
		defer f.artifactMu.Unlock()
		f.commandArtifactUploads = append(f.commandArtifactUploads, received...)
		return f.commandArtifactRef, f.commandArtifactErr
	}()
	if cannedErr != nil {
		return nil, cannedErr
	}
	return connect.NewResponse(&pb.UploadCommandArtifactResponse{Artifact: ref}), nil
}

func (f *fakeFleetNodeGateway) heartbeatCount() int {
f.heartbeatMu.Lock()
defer f.heartbeatMu.Unlock()
Expand All @@ -120,6 +152,14 @@ func (f *fakeFleetNodeGateway) heartbeats() []heartbeatRecord {
return out
}

// artifactUploads returns a snapshot of the upload requests recorded so far.
// The returned slice is a fresh copy taken under artifactMu, so callers can
// range over it without holding the lock; the request pointers themselves are
// shared with the fake.
func (f *fakeFleetNodeGateway) artifactUploads() []*pb.UploadCommandArtifactRequest {
	f.artifactMu.Lock()
	defer f.artifactMu.Unlock()

	recorded := f.commandArtifactUploads
	snapshot := make([]*pb.UploadCommandArtifactRequest, len(recorded))
	for i := range recorded {
		snapshot[i] = recorded[i]
	}
	return snapshot
}

func newFakeServer(t *testing.T, fake *fakeFleetNodeGateway) *httptest.Server {
t.Helper()
mux := http.NewServeMux()
Expand Down
120 changes: 117 additions & 3 deletions server/cmd/fleetnode/minercommand.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -22,6 +24,7 @@ import (
minercommandpb "github.com/block/proto-fleet/server/generated/grpc/minercommand/v1"
"github.com/block/proto-fleet/server/internal/domain/fleetnode/commandresult"
"github.com/block/proto-fleet/server/internal/domain/fleetnode/passwordupdate"
"github.com/block/proto-fleet/server/internal/domain/miner/logformat"
minermodels "github.com/block/proto-fleet/server/internal/domain/miner/models"
"github.com/block/proto-fleet/server/internal/domain/sv2"
"github.com/block/proto-fleet/server/internal/fleetnode/bootstrap"
Expand All @@ -38,6 +41,8 @@ const (
supportedMiningPoolSlots = 3
maxSupportedMiningPoolPriority = supportedMiningPoolSlots - 1
maxGetErrorsReports = 512
minerLogsArtifactFilename = "miner-logs.csv"
commandArtifactChunkSize = 1 << 20
)

// driverGetter is the plugin-manager seam the executor needs; *plugins.Manager satisfies it.
Expand All @@ -64,7 +69,7 @@ func (nodeSecretProvider) Seal(_ sdk.SecretBundle) (*pb.EncryptedCredentials, er
return nil, cmdErr(pb.AckCode_ACK_CODE_AGENT_INCAPABLE, "fleet node has no credential sealer configured")
}

func (r *RunCmd) handleMinerCommand(ctx context.Context, stream acker, commandID string, mc *pb.MinerCommand, logger *slog.Logger) {
func (r *RunCmd) handleMinerCommand(ctx context.Context, client gatewayClient, stream acker, commandID string, mc *pb.MinerCommand, logger *slog.Logger) {
if r.driverGetter == nil || r.minerSecrets == nil {
r.sendAck(stream, commandID, pb.AckCode_ACK_CODE_AGENT_INCAPABLE, "fleet node has no plugins loaded", logger)
return
Expand Down Expand Up @@ -128,11 +133,18 @@ func (r *RunCmd) handleMinerCommand(ctx context.Context, stream acker, commandID
}
}()

caps, err := commandCapabilities(cmdCtx, driver, mc)
if err != nil {
code, msg := classifyMinerCommandError("load driver capabilities", err)
r.sendAck(stream, commandID, code, msg, logger)
return
}

var payload []byte
if passwordUpdate != nil {
payload, err = passwordUpdate.run(cmdCtx, dev, bundle, r.minerSecrets)
} else {
payload, err = runMinerAction(cmdCtx, dev, mc)
payload, err = runMinerAction(cmdCtx, client, commandID, caps, dev, mc)
}
if err != nil {
code, msg := classifyMinerActionError("execute command", mc, err)
Expand Down Expand Up @@ -218,7 +230,15 @@ func validateDialTarget(t *pb.MinerConnectionDescriptor) error {
return nil
}

func runMinerAction(ctx context.Context, dev sdk.Device, mc *pb.MinerCommand) ([]byte, error) {
// commandCapabilities returns the driver capabilities needed to execute mc.
// Only the DownloadLogs action asks the driver to describe itself; for every
// other action it returns (nil, nil) without calling the driver.
func commandCapabilities(ctx context.Context, driver sdk.Driver, mc *pb.MinerCommand) (sdk.Capabilities, error) {
	switch mc.GetAction().(type) {
	case *pb.MinerCommand_DownloadLogs:
		_, driverCaps, describeErr := driver.DescribeDriver(ctx)
		return driverCaps, describeErr
	default:
		return nil, nil
	}
}

func runMinerAction(ctx context.Context, client gatewayClient, commandID string, caps sdk.Capabilities, dev sdk.Device, mc *pb.MinerCommand) ([]byte, error) {
switch a := mc.GetAction().(type) {
case *pb.MinerCommand_Reboot:
return nil, dev.Reboot(ctx)
Expand Down Expand Up @@ -274,11 +294,105 @@ func runMinerAction(ctx context.Context, dev sdk.Device, mc *pb.MinerCommand) ([
return nil, err
}
return getErrorsResultPayload(mc.GetTarget().GetDeviceIdentifier(), deviceErrors)
case *pb.MinerCommand_DownloadLogs:
logData, moreData, err := dev.DownloadLogs(ctx, nil, a.DownloadLogs.GetBatchLogUuid())
if err != nil {
return nil, err
}
payload, err := minerLogsArtifactPayload(logData, caps[sdk.CapabilityLogLevels])
if err != nil {
return nil, err
}
if _, err := uploadMinerLogsArtifact(ctx, client, commandID, mc.GetTarget().GetDeviceIdentifier(), payload); err != nil {
return nil, err
}
if moreData {
return nil, cmdErr(pb.AckCode_ACK_CODE_PARTIAL, "uploaded partial miner log data; retry after partial log pagination is supported")
Comment thread
ankitgoswami marked this conversation as resolved.
}
return nil, nil
default:
return nil, cmdErr(pb.AckCode_ACK_CODE_BAD_REQUEST, "unrecognized miner command action")
}
}

// uploadMinerLogsArtifact streams payload to the gateway as a miner-logs
// command artifact and returns the artifact reference the gateway reports.
//
// The upload consists of one header message (command ID, purpose, filename,
// size, hex SHA-256 and device identifier) followed by the payload split into
// chunks of at most commandArtifactChunkSize bytes. An empty payload sends the
// header only. The returned reference is accepted only when its purpose, size
// and SHA-256 match what was sent.
//
// An error is returned when client is nil, the client yields a nil stream, a
// send or the final close fails, or the response is missing or inconsistent.
func uploadMinerLogsArtifact(ctx context.Context, client gatewayClient, commandID string, deviceIdentifier string, payload []byte) (*pb.CommandArtifactRef, error) {
	if client == nil {
		return nil, errors.New("gateway client unavailable for miner log upload")
	}
	sum := sha256.Sum256(payload)
	sha := hex.EncodeToString(sum[:])

	stream := client.UploadCommandArtifact(ctx)
	if stream == nil {
		return nil, errors.New("gateway client returned nil command artifact upload stream")
	}

	// send forwards one message and, on failure, closes the stream before
	// reporting. On a connect-go client stream a server-side rejection surfaces
	// from Send only as an io.EOF wrapper; the real error is delivered by
	// CloseAndReceive, so that error is preferred when present. Closing here
	// also keeps a failed upload from leaving the stream open.
	// NOTE(review): assumes the stream behind gatewayClient follows connect-go
	// ClientStreamForClient semantics — confirm against the gatewayClient seam.
	send := func(stage string, req *pb.UploadCommandArtifactRequest) error {
		sendErr := stream.Send(req)
		if sendErr == nil {
			return nil
		}
		if _, closeErr := stream.CloseAndReceive(); closeErr != nil {
			return fmt.Errorf("upload miner logs %s: %w", stage, closeErr)
		}
		return fmt.Errorf("upload miner logs %s: %w", stage, sendErr)
	}

	header := &pb.UploadCommandArtifactRequest{Part: &pb.UploadCommandArtifactRequest_Header{
		Header: &pb.CommandArtifactUploadHeader{
			CommandId:        commandID,
			Purpose:          pb.CommandArtifactPurpose_COMMAND_ARTIFACT_PURPOSE_MINER_LOGS,
			Filename:         minerLogsArtifactFilename,
			SizeBytes:        int64(len(payload)),
			Sha256:           sha,
			DeviceIdentifier: deviceIdentifier,
		},
	}}
	if err := send("header", header); err != nil {
		return nil, err
	}

	for offset := 0; offset < len(payload); offset += commandArtifactChunkSize {
		// Clamp the final chunk to the end of the payload.
		end := offset + commandArtifactChunkSize
		if end > len(payload) {
			end = len(payload)
		}
		chunk := &pb.UploadCommandArtifactRequest{Part: &pb.UploadCommandArtifactRequest_Chunk{
			Chunk: &pb.CommandArtifactChunk{Data: payload[offset:end]},
		}}
		if err := send("chunk", chunk); err != nil {
			return nil, err
		}
	}

	resp, err := stream.CloseAndReceive()
	if err != nil {
		return nil, fmt.Errorf("finish miner logs upload: %w", err)
	}
	if resp == nil || resp.Msg == nil || resp.Msg.GetArtifact() == nil {
		return nil, errors.New("miner logs upload returned no artifact")
	}

	// Verify the gateway stored exactly what was sent before handing the
	// reference back to the caller.
	ref := resp.Msg.GetArtifact()
	if ref.GetPurpose() != pb.CommandArtifactPurpose_COMMAND_ARTIFACT_PURPOSE_MINER_LOGS {
		return nil, fmt.Errorf("miner logs upload returned artifact purpose %s", ref.GetPurpose())
	}
	if ref.GetSizeBytes() != int64(len(payload)) {
		return nil, fmt.Errorf("miner logs upload returned size %d, want %d", ref.GetSizeBytes(), len(payload))
	}
	if ref.GetSha256() != sha {
		return nil, fmt.Errorf("miner logs upload returned sha256 %q, want %q", ref.GetSha256(), sha)
	}
	return ref, nil
}

func minerLogsArtifactPayload(logData string, includeType bool) ([]byte, error) {
if int64(len(logData)) > logformat.MaxArtifactBytes {
return nil, cmdErr(pb.AckCode_ACK_CODE_BAD_REQUEST, "miner log data exceeds %d byte download limit", logformat.MaxArtifactBytes)
Comment thread
ankitgoswami marked this conversation as resolved.
}
Comment thread
ankitgoswami marked this conversation as resolved.
payload := &limitedBuffer{limit: logformat.MaxArtifactBytes}
if err := logformat.WriteTextToCSV(payload, logData, includeType); err != nil {
return nil, err
}
return payload.Bytes(), nil
}

// limitedBuffer is a bytes.Buffer whose Write refuses to grow the buffer past
// limit bytes. Only Write is overridden; the other methods promoted from the
// embedded bytes.Buffer are not limit-checked.
type limitedBuffer struct {
	bytes.Buffer
	limit int64 // maximum total number of bytes Write will accept
}

// Write appends p to the buffer when the resulting length stays within limit.
// Otherwise it stores nothing and returns 0 with a BAD_REQUEST command error.
func (b *limitedBuffer) Write(p []byte) (int, error) {
	projected := int64(b.Len() + len(p))
	if projected <= b.limit {
		// bytes.Buffer.Write always returns a nil error, so it is dropped here.
		written, _ := b.Buffer.Write(p)
		return written, nil
	}
	return 0, cmdErr(pb.AckCode_ACK_CODE_BAD_REQUEST, "miner log artifact exceeds %d byte download limit", b.limit)
}

func decryptUpdateMinerPasswordSecret(privateKey []byte, target *pb.MinerConnectionDescriptor, action *pb.UpdateMinerPasswordAction) (passwordupdate.Secret, error) {
if len(privateKey) == 0 {
return passwordupdate.Secret{}, cmdErr(pb.AckCode_ACK_CODE_AGENT_INCAPABLE, "fleet node has no password update decryption key configured")
Expand Down
Loading
Loading