Skip to content

Commit caa0440

Browse files
authored
Handful of PRs in one (#461)
* move node info to system only * stop workloads fixed * missing envelope wrappers * adds namespace verification to stop workloads * adds namespace to monitoring * multi clones on workload * workload replicas * adds workload ls command - aka namespace ping * adds a dev mode with warning * run state to persist in internal nats Signed-off-by: Jordan Rash <[email protected]>
1 parent 449c72f commit caa0440

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2207
-975
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ TESTCREDS/
4747
test/testdata/direct_start/direct_start
4848
test/testdata/counter
4949
test/testdata/function/function
50+
test/testdata/forever/forever

Taskfile.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ tasks:
5454
--schema-output=io.nats.nex.v2.stop_workload_response=gen/stop_workload_response.go
5555
--schema-package=io.nats.nex.v2.stop_workload_request=github.com/synadia-io/nex/api/nodecontrol/gen
5656
--schema-output=io.nats.nex.v2.stop_workload_request=gen/stop_workload_request.go
57+
--schema-package=io.nats.nex.v2.node_info_request=github.com/synadia-io/nex/api/nodecontrol/gen
58+
--schema-output=io.nats.nex.v2.node_info_request=gen/node_info_request.go
5759
--schema-package=io.nats.nex.v2.node_info_response=github.com/synadia-io/nex/api/nodecontrol/gen
5860
--schema-output=io.nats.nex.v2.node_info_response=gen/node_info_response.go
5961
--schema-package=io.nats.nex.v2.lameduck_response=github.com/synadia-io/nex/api/nodecontrol/gen

api/nodecontrol/client.go

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
var (
15-
DefaultRequestTimeout = 10 * time.Second
15+
DefaultRequestTimeout = 5 * time.Second
1616
)
1717

1818
type ControlAPIClient struct {
@@ -31,7 +31,7 @@ func (c *ControlAPIClient) Auction(namespace string, tags map[string]string) ([]
3131
resp := []*nodegen.AuctionResponseJson{}
3232
auctionRespInbox := nats.NewInbox()
3333

34-
_, err := c.nc.Subscribe(auctionRespInbox, func(m *nats.Msg) {
34+
s, err := c.nc.Subscribe(auctionRespInbox, func(m *nats.Msg) {
3535
envelope := new(models.Envelope[nodegen.AuctionResponseJson])
3636
err := json.Unmarshal(m.Data, envelope)
3737
if err != nil {
@@ -43,19 +43,25 @@ func (c *ControlAPIClient) Auction(namespace string, tags map[string]string) ([]
4343
if err != nil {
4444
return nil, err
4545
}
46+
defer func() {
47+
err = s.Drain()
48+
if err != nil {
49+
c.logger.Error("failed to drain subscription", slog.Any("err", err))
50+
}
51+
}()
4652

4753
req := nodegen.AuctionRequestJson{
4854
AuctionId: nuid.New().Next(),
4955
Tags: nodegen.AuctionRequestJsonTags{
5056
Tags: tags,
5157
},
5258
}
53-
req_b, err := json.Marshal(req)
59+
reqB, err := json.Marshal(req)
5460
if err != nil {
5561
return nil, err
5662
}
5763

58-
err = c.nc.PublishRequest(models.AuctionRequestSubject(namespace), auctionRespInbox, req_b)
64+
err = c.nc.PublishRequest(models.AuctionRequestSubject(namespace), auctionRespInbox, reqB)
5965
if err != nil {
6066
return nil, err
6167
}
@@ -68,7 +74,7 @@ func (c *ControlAPIClient) Ping() ([]*nodegen.NodePingResponseJson, error) {
6874
resp := []*nodegen.NodePingResponseJson{}
6975
pingRespInbox := nats.NewInbox()
7076

71-
_, err := c.nc.Subscribe(pingRespInbox, func(m *nats.Msg) {
77+
s, err := c.nc.Subscribe(pingRespInbox, func(m *nats.Msg) {
7278
envelope := new(models.Envelope[nodegen.NodePingResponseJson])
7379
err := json.Unmarshal(m.Data, envelope)
7480
if err != nil {
@@ -80,6 +86,12 @@ func (c *ControlAPIClient) Ping() ([]*nodegen.NodePingResponseJson, error) {
8086
if err != nil {
8187
return nil, err
8288
}
89+
defer func() {
90+
err = s.Drain()
91+
if err != nil {
92+
c.logger.Error("failed to drain subscription", slog.Any("err", err))
93+
}
94+
}()
8395

8496
err = c.nc.PublishRequest(models.PingSubject(), pingRespInbox, nil)
8597
if err != nil {
@@ -104,8 +116,8 @@ func (c *ControlAPIClient) DirectPing(nodeId string) (*nodegen.NodePingResponseJ
104116
return resp, nil
105117
}
106118

107-
func (c *ControlAPIClient) FindWorkload(inType, namespace, workloadId string) (*nodegen.WorkloadPingResponseJson, error) {
108-
msg, err := c.nc.Request(models.WorkloadPingRequestSubject(inType, namespace, workloadId), nil, DefaultRequestTimeout)
119+
func (c *ControlAPIClient) FindWorkload(namespace, workloadId string) (*nodegen.WorkloadPingResponseJson, error) {
120+
msg, err := c.nc.Request(models.WorkloadPingRequestSubject(namespace, workloadId), nil, DefaultRequestTimeout)
109121
if err != nil {
110122
return nil, err
111123
}
@@ -119,12 +131,44 @@ func (c *ControlAPIClient) FindWorkload(inType, namespace, workloadId string) (*
119131
return &envelope.Data, nil
120132
}
121133

134+
func (c *ControlAPIClient) ListWorkloads(namespace string) ([]nodegen.WorkloadSummary, error) {
135+
workloadsInbox := nats.NewInbox()
136+
137+
var ret []nodegen.WorkloadSummary
138+
s, err := c.nc.Subscribe(workloadsInbox, func(m *nats.Msg) {
139+
envelope := new(models.Envelope[[]nodegen.WorkloadSummary])
140+
err := json.Unmarshal(m.Data, envelope)
141+
if err != nil {
142+
c.logger.Error("failed to unmarshal workloads response", slog.Any("err", err), slog.String("data", string(m.Data)))
143+
return
144+
}
145+
ret = append(ret, envelope.Data...)
146+
})
147+
if err != nil {
148+
return nil, err
149+
}
150+
defer func() {
151+
err = s.Drain()
152+
if err != nil {
153+
c.logger.Error("failed to drain subscription", slog.Any("err", err))
154+
}
155+
}()
156+
157+
err = c.nc.PublishRequest(models.NamespacePingRequestSubject(namespace), workloadsInbox, nil)
158+
if err != nil {
159+
return nil, err
160+
}
161+
162+
time.Sleep(5 * time.Second)
163+
return ret, nil
164+
}
165+
122166
func (c *ControlAPIClient) AuctionDeployWorkload(namespace, bidderId string, req nodegen.StartWorkloadRequestJson) (*nodegen.StartWorkloadResponseJson, error) {
123-
req_b, err := json.Marshal(req)
167+
reqB, err := json.Marshal(req)
124168
if err != nil {
125169
return nil, err
126170
}
127-
msg, err := c.nc.Request(models.AuctionDeployRequestSubject(namespace, bidderId), req_b, DefaultRequestTimeout)
171+
msg, err := c.nc.Request(models.AuctionDeployRequestSubject(namespace, bidderId), reqB, 30*time.Second)
128172
if err != nil {
129173
return nil, err
130174
}
@@ -139,12 +183,12 @@ func (c *ControlAPIClient) AuctionDeployWorkload(namespace, bidderId string, req
139183
}
140184

141185
func (c *ControlAPIClient) DeployWorkload(namespace, nodeId string, req nodegen.StartWorkloadRequestJson) (*nodegen.StartWorkloadResponseJson, error) {
142-
req_b, err := json.Marshal(req)
186+
reqB, err := json.Marshal(req)
143187
if err != nil {
144188
return nil, err
145189
}
146190

147-
msg, err := c.nc.Request(models.DirectDeploySubject(nodeId), req_b, DefaultRequestTimeout)
191+
msg, err := c.nc.Request(models.DirectDeploySubject(nodeId), reqB, 30*time.Second)
148192
if err != nil {
149193
return nil, err
150194
}
@@ -158,13 +202,8 @@ func (c *ControlAPIClient) DeployWorkload(namespace, nodeId string, req nodegen.
158202
return &envelope.Data, nil
159203
}
160204

161-
func (c *ControlAPIClient) UndeployWorkload(namespace, nodeId, workloadId string, req nodegen.StopWorkloadRequestJson) (*nodegen.StopWorkloadResponseJson, error) {
162-
req_b, err := json.Marshal(req)
163-
if err != nil {
164-
return nil, err
165-
}
166-
167-
msg, err := c.nc.Request(models.UndeployRequestSubject(namespace, nodeId), req_b, DefaultRequestTimeout)
205+
func (c *ControlAPIClient) UndeployWorkload(namespace, workloadId string) (*nodegen.StopWorkloadResponseJson, error) {
206+
msg, err := c.nc.Request(models.UndeployRequestSubject(namespace, workloadId), nil, DefaultRequestTimeout)
168207
if err != nil {
169208
return nil, err
170209
}
@@ -178,8 +217,13 @@ func (c *ControlAPIClient) UndeployWorkload(namespace, nodeId, workloadId string
178217
return &envelope.Data, nil
179218
}
180219

181-
func (c *ControlAPIClient) GetInfo(nodeId, namespace string) (*nodegen.NodeInfoResponseJson, error) {
182-
msg, err := c.nc.Request(models.InfoRequestSubject(namespace, nodeId), nil, DefaultRequestTimeout)
220+
func (c *ControlAPIClient) GetInfo(nodeId string, req nodegen.NodeInfoRequestJson) (*nodegen.NodeInfoResponseJson, error) {
221+
reqB, err := json.Marshal(req)
222+
if err != nil {
223+
return nil, err
224+
}
225+
226+
msg, err := c.nc.Request(models.InfoSubject(nodeId), reqB, DefaultRequestTimeout)
183227
if err != nil {
184228
return nil, err
185229
}
@@ -198,12 +242,12 @@ func (c *ControlAPIClient) SetLameDuck(nodeId string, delay time.Duration) (*nod
198242
Delay: delay.String(),
199243
}
200244

201-
req_b, err := json.Marshal(req)
245+
reqB, err := json.Marshal(req)
202246
if err != nil {
203247
return nil, err
204248
}
205249

206-
msg, err := c.nc.Request(models.LameduckSubject(nodeId), req_b, DefaultRequestTimeout)
250+
msg, err := c.nc.Request(models.LameduckSubject(nodeId), reqB, DefaultRequestTimeout)
207251
if err != nil {
208252
return nil, err
209253
}
@@ -217,9 +261,9 @@ func (c *ControlAPIClient) SetLameDuck(nodeId string, delay time.Duration) (*nod
217261
return &envelope.Data, nil
218262
}
219263

220-
func (c *ControlAPIClient) MonitorLogs(workloadId, level string) (chan []byte, error) {
264+
func (c *ControlAPIClient) MonitorLogs(namespace, workloadId, level string) (chan []byte, error) {
221265
subject := models.LOGS_SUBJECT
222-
f_subject, err := subject.Filter(workloadId, level)
266+
f_subject, err := subject.Filter(namespace, workloadId, level)
223267
if err != nil {
224268
return nil, err
225269
}
@@ -235,9 +279,9 @@ func (c *ControlAPIClient) MonitorLogs(workloadId, level string) (chan []byte, e
235279
return ret, nil
236280
}
237281

238-
func (c *ControlAPIClient) MonitorEvents(workloadId, eventType string) (chan *json.RawMessage, error) {
282+
func (c *ControlAPIClient) MonitorEvents(namespace, workloadId, eventType string) (chan *json.RawMessage, error) {
239283
subject := models.EVENTS_SUBJECT
240-
f_subject, err := subject.Filter(workloadId, eventType)
284+
f_subject, err := subject.Filter(namespace, workloadId, eventType)
241285
if err != nil {
242286
return nil, err
243287
}
@@ -264,12 +308,12 @@ func (c *ControlAPIClient) CopyWorkload(workloadId, namespace string, targetXkey
264308
NewTargetXkey: targetXkey,
265309
}
266310

267-
req_b, err := json.Marshal(req)
311+
reqB, err := json.Marshal(req)
268312
if err != nil {
269313
return nil, err
270314
}
271315

272-
msg, err := c.nc.Request(models.CloneWorkloadRequestSubject(namespace, workloadId), req_b, DefaultRequestTimeout)
316+
msg, err := c.nc.Request(models.CloneWorkloadRequestSubject(namespace, workloadId), reqB, DefaultRequestTimeout)
273317
if err != nil {
274318
return nil, err
275319
}

api/nodecontrol/gen/node_info_request.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/nodecontrol/gen/stop_workload_request.go

Lines changed: 4 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"$schema": "http://json-schema.org/draft-07/schema#",
3+
"$id": "io.nats.nex.v2.node_info_request",
4+
"title": "InfoRequest",
5+
"type": "object",
6+
"properties": {
7+
"namespace": {
8+
"type": "string",
9+
"description": "Namespace of the node"
10+
}
11+
},
12+
"required": [
13+
"namespace"
14+
]
15+
}

api/nodecontrol/stop-workload-request.json

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,16 @@
44
"title": "StopWorkloadRequest",
55
"type": "object",
66
"properties": {
7-
"node_id": {
8-
"type": "string"
9-
},
10-
"workload_type": {
7+
"namespace": {
118
"type": "string"
129
},
1310
"workload_id": {
1411
"type": "string"
15-
},
16-
"workload_jwt": {
17-
"type": "string"
1812
}
1913
},
2014
"required": [
21-
"node_id",
22-
"workload_type",
23-
"workload_id",
24-
"workload_jwt"
15+
"namespace",
16+
"workload_id"
2517
],
2618
"definitions": {},
2719
"additionalProperties": false

cmd/nex/cli_unix.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Globals struct {
1818
Check bool `help:"Print the current values of all options without running a command"`
1919
DisableUpgradeCheck bool `env:"NEX_DISABLE_UPGRADE_CHECK" name:"disable-upgrade-check" help:"Disable the upgrade check"`
2020
AutoUpgrade bool `env:"NEX_AUTO_UPGRADE" name:"auto-upgrade" help:"Automatically upgrade the nex CLI when a new version is available"`
21+
DevMode bool `name:"dev" default:"false" help:"Enable development mode"`
2122
}
2223

2324
type NexCLI struct {

cmd/nex/cli_windows.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Globals struct {
1212
Version kong.VersionFlag `help:"Print version information"`
1313
Namespace string `env:"NEX_NAMESPACE" default:"system" help:"Specifies namespace when running nex commands"`
1414
Check bool `help:"Print the current values of all options without running a command"`
15+
DevMode bool `name:"dev" default:"false" help:"Enable development mode"`
1516
}
1617

1718
type NexCLI struct {

0 commit comments

Comments
 (0)