Skip to content

Commit ec89439

Browse files
committed
Implements durabletask ListInstanceIDs & GetInstanceHistory
Today, there is no ways of discovering the list of workflow instances that are currently running or have completed in the past without using external storage queries. The Dapr CLI [introduced list and workflow history commands](dapr/cli#1560) to get information about running and completed workflows, however these commands rely on direct queries to the underlying storage provider. By introducing this functionality into the durabletask framework itself, these commands need only talk to Daprd, removing the requirement for direct access to the storage provider as well as authentication. Daprd can make these queries itself, and use the Actor State Store component to access the underlying storage. Implements the new durabletask APIs according to dapr/proposals#93 Signed-off-by: joshvanl <[email protected]>
1 parent efee8ae commit ec89439

File tree

32 files changed

+1087
-81
lines changed

32 files changed

+1087
-81
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,3 +525,7 @@ replace (
525525
//
526526
// Then, run `make modtidy-all` in this repository.
527527
// This ensures that go.mod and go.sum are up-to-date for each go.mod file.
528+
529+
replace github.com/dapr/durabletask-go => github.com/joshvanl/durabletask-go v0.0.0-20251105162850-0096c0be3cae
530+
531+
replace github.com/dapr/components-contrib => github.com/joshvanl/components-contrib v0.0.0-20251105114938-38d2fb353cc5

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,10 +515,6 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
515515
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
516516
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
517517
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
518-
github.com/dapr/components-contrib v1.16.1 h1:EpntHk5qUXTbB55kec97cMQbC2QXBwbBU6QMI3ljV8g=
519-
github.com/dapr/components-contrib v1.16.1/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4=
520-
github.com/dapr/durabletask-go v0.10.1 h1:gE88Qh4+/6zKdegHjOAOx+UQaPxmwWKWoIDivee23XY=
521-
github.com/dapr/durabletask-go v0.10.1/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
522518
github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4=
523519
github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM=
524520
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
@@ -1132,6 +1128,10 @@ github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbd
11321128
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
11331129
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
11341130
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
1131+
github.com/joshvanl/components-contrib v0.0.0-20251105114938-38d2fb353cc5 h1:IgQW5uO5mvT2jTfpAeVecJ9aTSuRPS/y9g1SU8XmPn0=
1132+
github.com/joshvanl/components-contrib v0.0.0-20251105114938-38d2fb353cc5/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4=
1133+
github.com/joshvanl/durabletask-go v0.0.0-20251105162850-0096c0be3cae h1:cWA+L6X+Kq3CzzwB3U4kQ8nmPs7e6UwhAEnTIh8hSOo=
1134+
github.com/joshvanl/durabletask-go v0.0.0-20251105162850-0096c0be3cae/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
11351135
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
11361136
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
11371137
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=

pkg/runtime/runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func newDaprRuntime(ctx context.Context,
307307
Resiliency: resiliencyProvider,
308308
EventSink: runtimeConfig.workflowEventSink,
309309
EnableClusteredDeployment: globalConfig.IsFeatureEnabled(config.WorkflowsClusteredDeployment),
310+
ComponentStore: compStore,
310311
})
311312

312313
jobsManager, err := scheduler.New(scheduler.Options{

pkg/runtime/wfengine/backends/actors/actors.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ import (
4242
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
4343
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
4444
"github.com/dapr/dapr/pkg/resiliency"
45+
"github.com/dapr/dapr/pkg/runtime/compstore"
4546
"github.com/dapr/dapr/pkg/runtime/wfengine/state"
47+
"github.com/dapr/dapr/pkg/runtime/wfengine/state/list"
4648
"github.com/dapr/dapr/pkg/runtime/wfengine/todo"
4749
"github.com/dapr/dapr/utils"
4850
"github.com/dapr/durabletask-go/api"
@@ -64,11 +66,12 @@ const (
6466
)
6567

6668
type Options struct {
67-
AppID string
68-
Namespace string
69-
Actors actors.Interface
70-
Resiliency resiliency.Provider
71-
EventSink orchestrator.EventSink
69+
AppID string
70+
Namespace string
71+
Actors actors.Interface
72+
Resiliency resiliency.Provider
73+
EventSink orchestrator.EventSink
74+
ComponentStore *compstore.ComponentStore
7275
// experimental feature
7376
// enabling this will use the cluster tasks backend for pending tasks, instead of the default local implementation
7477
// the cluster tasks backend uses actors to share the state of pending tasks
@@ -88,6 +91,7 @@ type Actors struct {
8891
resiliency resiliency.Provider
8992
actors actors.Interface
9093
eventSink orchestrator.EventSink
94+
compStore *compstore.ComponentStore
9195

9296
orchestrationWorkItemChan chan *backend.OrchestrationWorkItem
9397
activityWorkItemChan chan *backend.ActivityWorkItem
@@ -113,6 +117,7 @@ func New(opts Options) *Actors {
113117
resiliency: opts.Resiliency,
114118
pendingTasksBackend: pendingTasksBackend,
115119
enableClusteredDeployment: opts.EnableClusteredDeployment,
120+
compStore: opts.ComponentStore,
116121
orchestrationWorkItemChan: make(chan *backend.OrchestrationWorkItem, 1),
117122
activityWorkItemChan: make(chan *backend.ActivityWorkItem, 1),
118123
eventSink: opts.EventSink,
@@ -629,3 +634,39 @@ func (abe *Actors) WaitForActivityCompletion(ctx context.Context, request *proto
629634
func (abe *Actors) WaitForOrchestratorCompletion(ctx context.Context, request *protos.OrchestratorRequest) (*protos.OrchestratorResponse, error) {
630635
return abe.pendingTasksBackend.WaitForOrchestratorCompletion(ctx, request)
631636
}
637+
638+
func (abe *Actors) ListInstanceIDs(ctx context.Context, req *protos.ListInstanceIDsRequest) (*protos.ListInstanceIDsResponse, error) {
639+
resp, err := list.ListInstanceIDs(ctx, list.ListOptions{
640+
ComponentStore: abe.compStore,
641+
Namespace: abe.namespace,
642+
AppID: abe.appID,
643+
PageSize: req.PageSize,
644+
ContinuationToken: req.ContinuationToken,
645+
})
646+
if err != nil {
647+
return nil, err
648+
}
649+
650+
return &protos.ListInstanceIDsResponse{
651+
InstanceIds: resp.Keys,
652+
ContinuationToken: resp.ContinuationToken,
653+
}, nil
654+
}
655+
656+
func (abe *Actors) GetInstanceHistory(ctx context.Context, req *protos.GetInstanceHistoryRequest) (*protos.GetInstanceHistoryResponse, error) {
657+
ss, err := abe.actors.State(ctx)
658+
if err != nil {
659+
return nil, err
660+
}
661+
662+
resp, err := state.LoadWorkflowState(ctx, ss, req.GetInstanceId(), state.Options{
663+
AppID: abe.appID,
664+
WorkflowActorType: abe.workflowActorType,
665+
ActivityActorType: abe.activityActorType,
666+
})
667+
if err != nil {
668+
return nil, err
669+
}
670+
671+
return &protos.GetInstanceHistoryResponse{Events: resp.History}, nil
672+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package list
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"strings"
21+
22+
"github.com/dapr/components-contrib/state"
23+
"github.com/dapr/dapr/pkg/runtime/compstore"
24+
)
25+
26+
type ListOptions struct {
27+
ComponentStore *compstore.ComponentStore
28+
Namespace string
29+
AppID string
30+
31+
PageSize *uint32
32+
ContinuationToken *string
33+
}
34+
35+
type ListInstanceIDsResult struct {
36+
Keys []string
37+
ContinuationToken *string
38+
}
39+
40+
func ListInstanceIDs(ctx context.Context, opts ListOptions) (*ListInstanceIDsResult, error) {
41+
store, _, ok := opts.ComponentStore.GetStateStoreActor()
42+
if !ok {
43+
return nil, errors.New("no state store with actor support found")
44+
}
45+
46+
ks, ok := store.(state.KeysLiker)
47+
if !ok {
48+
return nil, fmt.Errorf("state store %T does not support listing keys", store)
49+
}
50+
51+
like := opts.AppID + "||dapr.internal." + opts.Namespace + "." + opts.AppID + ".workflow||%||metadata"
52+
53+
resp, err := ks.KeysLike(ctx, &state.KeysLikeRequest{
54+
Pattern: like,
55+
ContinueToken: opts.ContinuationToken,
56+
PageSize: opts.PageSize,
57+
})
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
keys := make([]string, 0, len(resp.Keys))
63+
for _, key := range resp.Keys {
64+
split := strings.Split(key, "||")
65+
if len(split) != 4 {
66+
return nil, fmt.Errorf("invalid key format: %s", key)
67+
}
68+
69+
keys = append(keys, split[2])
70+
}
71+
72+
return &ListInstanceIDsResult{
73+
Keys: keys,
74+
ContinuationToken: resp.ContinueToken,
75+
}, nil
76+
}

pkg/runtime/wfengine/wfengine.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/dapr/dapr/pkg/config"
3131
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
3232
"github.com/dapr/dapr/pkg/resiliency"
33+
"github.com/dapr/dapr/pkg/runtime/compstore"
3334
"github.com/dapr/dapr/pkg/runtime/processor"
3435
backendactors "github.com/dapr/dapr/pkg/runtime/wfengine/backends/actors"
3536
"github.com/dapr/durabletask-go/backend"
@@ -59,6 +60,7 @@ type Options struct {
5960
Resiliency resiliency.Provider
6061
EventSink orchestrator.EventSink
6162
EnableClusteredDeployment bool
63+
ComponentStore *compstore.ComponentStore
6264
}
6365

6466
type engine struct {
@@ -83,6 +85,7 @@ func New(opts Options) Interface {
8385
Resiliency: opts.Resiliency,
8486
EventSink: opts.EventSink,
8587
EnableClusteredDeployment: opts.EnableClusteredDeployment,
88+
ComponentStore: opts.ComponentStore,
8689
})
8790

8891
var getWorkItemsCount atomic.Int32

tests/integration/framework/process/workflow/workflow.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/dapr/dapr/tests/integration/framework/process/sqlite"
3131
"github.com/dapr/durabletask-go/client"
3232
"github.com/dapr/durabletask-go/task"
33+
"github.com/dapr/durabletask-go/workflow"
3334
)
3435

3536
type Workflow struct {
@@ -83,6 +84,11 @@ func New(t *testing.T, fopts ...Option) *Workflow {
8384
}
8485
}
8586

87+
// TODO: @joshvanl
88+
//if i > 0 {
89+
// dopts = append(dopts, daprd.WithAppID(daprds[0].AppID()))
90+
//}
91+
8692
daprds[i] = daprd.New(t, dopts...)
8793
}
8894

@@ -165,6 +171,17 @@ func (w *Workflow) BackendClient(t *testing.T, ctx context.Context) *client.Task
165171
return w.BackendClientN(t, ctx, 0)
166172
}
167173

174+
func (w *Workflow) WorkflowClient(t *testing.T, ctx context.Context) *workflow.Client {
175+
t.Helper()
176+
return workflow.NewClient(w.Dapr().GRPCConn(t, ctx))
177+
}
178+
179+
func (w *Workflow) WorkflowClientN(t *testing.T, ctx context.Context, index int) *workflow.Client {
180+
t.Helper()
181+
require.Less(t, index, len(w.daprds), "index out of range")
182+
return workflow.NewClient(w.DaprN(index).GRPCConn(t, ctx))
183+
}
184+
168185
// BackendClient returns a backend client for the specified index
169186
func (w *Workflow) BackendClientN(t *testing.T, ctx context.Context, index int) *client.TaskHubGrpcClient {
170187
t.Helper()

tests/integration/suite/daprd/hotreload/operator/actorstate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (a *actorstate) Run(t *testing.T, ctx context.Context) {
176176
require.ElementsMatch(t, []*rtv1.RegisteredComponents{
177177
{
178178
Name: "mystore", Type: "state.in-memory", Version: "v1",
179-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
179+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
180180
},
181181
}, comps)
182182
inmemStore.Spec.Metadata = []common.NameValuePair{}
@@ -188,7 +188,7 @@ func (a *actorstate) Run(t *testing.T, ctx context.Context) {
188188
require.ElementsMatch(t, []*rtv1.RegisteredComponents{
189189
{
190190
Name: "mystore", Type: "state.in-memory", Version: "v1",
191-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
191+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
192192
},
193193
}, comps)
194194
a.operatorDelete.SetComponents()

tests/integration/suite/daprd/hotreload/operator/crypto.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (c *crypto) Run(t *testing.T, ctx context.Context) {
217217
{Name: "crypto3", Type: "crypto.dapr.localstorage", Version: "v1"},
218218
{
219219
Name: "crypto2", Type: "state.in-memory", Version: "v1",
220-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
220+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
221221
},
222222
}, resp.GetRegisteredComponents())
223223
}, time.Second*10, time.Millisecond*10)
@@ -255,7 +255,7 @@ func (c *crypto) Run(t *testing.T, ctx context.Context) {
255255
assert.ElementsMatch(c, []*rtv1.RegisteredComponents{
256256
{
257257
Name: "crypto3", Type: "state.in-memory", Version: "v1",
258-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
258+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
259259
},
260260
}, resp.GetRegisteredComponents())
261261
}, time.Second*10, time.Millisecond*10)

tests/integration/suite/daprd/hotreload/operator/informer/components.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (c *components) Run(t *testing.T, ctx context.Context) {
141141
exp := []*rtv1.RegisteredComponents{
142142
{
143143
Name: "123", Type: "state.in-memory", Version: "v1",
144-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
144+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
145145
},
146146
}
147147
require.EventuallyWithT(t, func(ct *assert.CollectT) {
@@ -179,7 +179,7 @@ func (c *components) Run(t *testing.T, ctx context.Context) {
179179
exp := []*rtv1.RegisteredComponents{
180180
{
181181
Name: "123", Type: "state.sqlite", Version: "v1",
182-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
182+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "KEYS_LIKE", "ACTOR"},
183183
},
184184
}
185185
assert.ElementsMatch(ct, exp, c.daprd1.GetMetaRegisteredComponents(ct, ctx))

0 commit comments

Comments
 (0)