Skip to content

Commit 3fe52dd

Browse files
committed
Implements durabletask ListInstanceIDs & GetInstanceHistory
Today, there is no ways of discovering the list of workflow instances that are currently running or have completed in the past without using external storage queries. The Dapr CLI [introduced list and workflow history commands](dapr/cli#1560) to get information about running and completed workflows, however these commands rely on direct queries to the underlying storage provider. By introducing this functionality into the durabletask framework itself, these commands need only talk to Daprd, removing the requirement for direct access to the storage provider as well as authentication. Daprd can make these queries itself, and use the Actor State Store component to access the underlying storage. Implements the new durabletask APIs according to dapr/proposals#93 Update components-contrib & durabletask-go to origin Use shared const for ActorTypePrefix Signed-off-by: joshvanl <[email protected]>
1 parent 8e1a73b commit 3fe52dd

File tree

32 files changed

+1073
-93
lines changed

32 files changed

+1073
-93
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/cenkalti/backoff/v4 v4.3.0
1313
github.com/cloudevents/sdk-go/v2 v2.15.2
1414
github.com/coreos/go-oidc/v3 v3.14.1
15-
github.com/dapr/components-contrib v1.16.1
15+
github.com/dapr/components-contrib v1.16.2-0.20251113171451-b78f056c8491
1616
github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547
1717
github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524
1818
github.com/diagridio/go-etcd-cron v0.10.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
515515
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
516516
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
517517
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
518-
github.com/dapr/components-contrib v1.16.1 h1:EpntHk5qUXTbB55kec97cMQbC2QXBwbBU6QMI3ljV8g=
519-
github.com/dapr/components-contrib v1.16.1/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4=
518+
github.com/dapr/components-contrib v1.16.2-0.20251113171451-b78f056c8491 h1:ms0IhiGK6Mow+e1DZgIxD3qo7xqOPjpxeFkhahBo+Tg=
519+
github.com/dapr/components-contrib v1.16.2-0.20251113171451-b78f056c8491/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4=
520520
github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547 h1:bD4JBlXDHURsgvhIB1HQ1q0k8kYfVo/iNSBi0guSoe0=
521521
github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
522522
github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524 h1:SQ7VeWGnypENpGjsL94wN2IgH+oYHx9ULpYrpFoRExQ=

pkg/runtime/runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func newDaprRuntime(ctx context.Context,
307307
Resiliency: resiliencyProvider,
308308
EventSink: runtimeConfig.workflowEventSink,
309309
EnableClusteredDeployment: globalConfig.IsFeatureEnabled(config.WorkflowsClusteredDeployment),
310+
ComponentStore: compStore,
310311
})
311312

312313
jobsManager, err := scheduler.New(scheduler.Options{

pkg/runtime/wfengine/backends/actors/actors.go

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ import (
4343
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
4444
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
4545
"github.com/dapr/dapr/pkg/resiliency"
46+
"github.com/dapr/dapr/pkg/runtime/compstore"
4647
"github.com/dapr/dapr/pkg/runtime/wfengine/state"
48+
"github.com/dapr/dapr/pkg/runtime/wfengine/state/list"
4749
"github.com/dapr/dapr/pkg/runtime/wfengine/todo"
4850
"github.com/dapr/dapr/utils"
4951
"github.com/dapr/durabletask-go/api"
@@ -62,15 +64,15 @@ const (
6264
WorkflowNameLabelKey = "workflow"
6365
ActivityNameLabelKey = "activity"
6466
ExecutorNameLabelKey = "executor"
65-
ActorTypePrefix = "dapr.internal."
6667
)
6768

6869
type Options struct {
69-
AppID string
70-
Namespace string
71-
Actors actors.Interface
72-
Resiliency resiliency.Provider
73-
EventSink orchestrator.EventSink
70+
AppID string
71+
Namespace string
72+
Actors actors.Interface
73+
Resiliency resiliency.Provider
74+
EventSink orchestrator.EventSink
75+
ComponentStore *compstore.ComponentStore
7476
// experimental feature
7577
// enabling this will use the cluster tasks backend for pending tasks, instead of the default local implementation
7678
// the cluster tasks backend uses actors to share the state of pending tasks
@@ -90,6 +92,7 @@ type Actors struct {
9092
resiliency resiliency.Provider
9193
actors actors.Interface
9294
eventSink orchestrator.EventSink
95+
compStore *compstore.ComponentStore
9396

9497
orchestrationWorkItemChan chan *backend.OrchestrationWorkItem
9598
activityWorkItemChan chan *backend.ActivityWorkItem
@@ -102,19 +105,20 @@ func New(opts Options) *Actors {
102105
if opts.EnableClusteredDeployment {
103106
pendingTasksBackend = NewClusterTasksBackend(ClusterTasksBackendOptions{
104107
Actors: opts.Actors,
105-
ExecutorActorType: ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + ExecutorNameLabelKey,
108+
ExecutorActorType: todo.ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + ExecutorNameLabelKey,
106109
})
107110
}
108111
return &Actors{
109112
appID: opts.AppID,
110113
namespace: opts.Namespace,
111-
workflowActorType: ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + WorkflowNameLabelKey,
112-
activityActorType: ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + ActivityNameLabelKey,
113-
executorActorType: ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + ExecutorNameLabelKey,
114+
workflowActorType: todo.ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + WorkflowNameLabelKey,
115+
activityActorType: todo.ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + ActivityNameLabelKey,
116+
executorActorType: todo.ActorTypePrefix + opts.Namespace + utils.DotDelimiter + opts.AppID + utils.DotDelimiter + ExecutorNameLabelKey,
114117
actors: opts.Actors,
115118
resiliency: opts.Resiliency,
116119
pendingTasksBackend: pendingTasksBackend,
117120
enableClusteredDeployment: opts.EnableClusteredDeployment,
121+
compStore: opts.ComponentStore,
118122
orchestrationWorkItemChan: make(chan *backend.OrchestrationWorkItem, 1),
119123
activityWorkItemChan: make(chan *backend.ActivityWorkItem, 1),
120124
eventSink: opts.EventSink,
@@ -630,6 +634,42 @@ func (abe *Actors) WaitForOrchestratorCompletion(ctx context.Context, request *p
630634
return abe.pendingTasksBackend.WaitForOrchestratorCompletion(ctx, request)
631635
}
632636

637+
func (abe *Actors) ListInstanceIDs(ctx context.Context, req *protos.ListInstanceIDsRequest) (*protos.ListInstanceIDsResponse, error) {
638+
resp, err := list.ListInstanceIDs(ctx, list.ListOptions{
639+
ComponentStore: abe.compStore,
640+
Namespace: abe.namespace,
641+
AppID: abe.appID,
642+
PageSize: req.PageSize, //nolint:protogetter
643+
ContinuationToken: req.ContinuationToken, //nolint:protogetter
644+
})
645+
if err != nil {
646+
return nil, err
647+
}
648+
649+
return &protos.ListInstanceIDsResponse{
650+
InstanceIds: resp.Keys,
651+
ContinuationToken: resp.ContinuationToken,
652+
}, nil
653+
}
654+
655+
func (abe *Actors) GetInstanceHistory(ctx context.Context, req *protos.GetInstanceHistoryRequest) (*protos.GetInstanceHistoryResponse, error) {
656+
ss, err := abe.actors.State(ctx)
657+
if err != nil {
658+
return nil, err
659+
}
660+
661+
resp, err := state.LoadWorkflowState(ctx, ss, req.GetInstanceId(), state.Options{
662+
AppID: abe.appID,
663+
WorkflowActorType: abe.workflowActorType,
664+
ActivityActorType: abe.activityActorType,
665+
})
666+
if err != nil {
667+
return nil, err
668+
}
669+
670+
return &protos.GetInstanceHistoryResponse{Events: resp.History}, nil
671+
}
672+
633673
func (abe *Actors) purgeWorkflow(ctx context.Context, id api.InstanceID) error {
634674
req := internalsv1pb.
635675
NewInternalInvokeRequest(todo.PurgeWorkflowStateMethod).
@@ -702,11 +742,3 @@ func (abe *Actors) purgeWorkflowForce(ctx context.Context, id api.InstanceID) er
702742
},
703743
)
704744
}
705-
706-
func (abe *Actors) GetInstanceHistory(ctx context.Context, req *protos.GetInstanceHistoryRequest) (*protos.GetInstanceHistoryResponse, error) {
707-
return nil, status.Error(codes.Unimplemented, "GetInstanceHistory is not implemented in the Actors backend")
708-
}
709-
710-
func (abe *Actors) ListInstanceIDs(ctx context.Context, req *protos.ListInstanceIDsRequest) (*protos.ListInstanceIDsResponse, error) {
711-
return nil, status.Error(codes.Unimplemented, "ListInstanceIDs is not implemented in the Actors backend")
712-
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package list
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"strings"
21+
22+
"github.com/dapr/components-contrib/state"
23+
"github.com/dapr/dapr/pkg/runtime/compstore"
24+
"github.com/dapr/dapr/pkg/runtime/wfengine/todo"
25+
)
26+
27+
type ListOptions struct {
28+
ComponentStore *compstore.ComponentStore
29+
Namespace string
30+
AppID string
31+
32+
PageSize *uint32
33+
ContinuationToken *string
34+
}
35+
36+
type ListInstanceIDsResult struct {
37+
Keys []string
38+
ContinuationToken *string
39+
}
40+
41+
func ListInstanceIDs(ctx context.Context, opts ListOptions) (*ListInstanceIDsResult, error) {
42+
store, _, ok := opts.ComponentStore.GetStateStoreActor()
43+
if !ok {
44+
return nil, errors.New("no state store with actor support found")
45+
}
46+
47+
ks, ok := store.(state.KeysLiker)
48+
if !ok {
49+
return nil, fmt.Errorf("state store %T does not support listing keys", store)
50+
}
51+
52+
like := opts.AppID + "||" + todo.ActorTypePrefix + opts.Namespace + "." + opts.AppID + ".workflow||%||metadata"
53+
54+
resp, err := ks.KeysLike(ctx, &state.KeysLikeRequest{
55+
Pattern: like,
56+
ContinuationToken: opts.ContinuationToken,
57+
PageSize: opts.PageSize,
58+
})
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
keys := make([]string, 0, len(resp.Keys))
64+
for _, key := range resp.Keys {
65+
split := strings.Split(key, "||")
66+
if len(split) != 4 {
67+
return nil, fmt.Errorf("invalid key format: %s", key)
68+
}
69+
70+
keys = append(keys, split[2])
71+
}
72+
73+
return &ListInstanceIDsResult{
74+
Keys: keys,
75+
ContinuationToken: resp.ContinuationToken,
76+
}, nil
77+
}

pkg/runtime/wfengine/todo/todo.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
RerunWorkflowInstance = "RerunWorkflowInstance"
3333

3434
MetadataActivityReminderDueTime = "dueTime"
35+
36+
ActorTypePrefix = "dapr.internal."
3537
)
3638

3739
var (

pkg/runtime/wfengine/wfengine.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/dapr/dapr/pkg/config"
3131
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
3232
"github.com/dapr/dapr/pkg/resiliency"
33+
"github.com/dapr/dapr/pkg/runtime/compstore"
3334
"github.com/dapr/dapr/pkg/runtime/processor"
3435
backendactors "github.com/dapr/dapr/pkg/runtime/wfengine/backends/actors"
3536
"github.com/dapr/durabletask-go/backend"
@@ -59,6 +60,7 @@ type Options struct {
5960
Resiliency resiliency.Provider
6061
EventSink orchestrator.EventSink
6162
EnableClusteredDeployment bool
63+
ComponentStore *compstore.ComponentStore
6264
}
6365

6466
type engine struct {
@@ -83,6 +85,7 @@ func New(opts Options) Interface {
8385
Resiliency: opts.Resiliency,
8486
EventSink: opts.EventSink,
8587
EnableClusteredDeployment: opts.EnableClusteredDeployment,
88+
ComponentStore: opts.ComponentStore,
8689
})
8790

8891
var getWorkItemsCount atomic.Int32

tests/integration/suite/daprd/hotreload/operator/actorstate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (a *actorstate) Run(t *testing.T, ctx context.Context) {
176176
require.ElementsMatch(t, []*rtv1.RegisteredComponents{
177177
{
178178
Name: "mystore", Type: "state.in-memory", Version: "v1",
179-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
179+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
180180
},
181181
}, comps)
182182
inmemStore.Spec.Metadata = []common.NameValuePair{}
@@ -188,7 +188,7 @@ func (a *actorstate) Run(t *testing.T, ctx context.Context) {
188188
require.ElementsMatch(t, []*rtv1.RegisteredComponents{
189189
{
190190
Name: "mystore", Type: "state.in-memory", Version: "v1",
191-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
191+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
192192
},
193193
}, comps)
194194
a.operatorDelete.SetComponents()

tests/integration/suite/daprd/hotreload/operator/crypto.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (c *crypto) Run(t *testing.T, ctx context.Context) {
217217
{Name: "crypto3", Type: "crypto.dapr.localstorage", Version: "v1"},
218218
{
219219
Name: "crypto2", Type: "state.in-memory", Version: "v1",
220-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
220+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
221221
},
222222
}, resp.GetRegisteredComponents())
223223
}, time.Second*10, time.Millisecond*10)
@@ -255,7 +255,7 @@ func (c *crypto) Run(t *testing.T, ctx context.Context) {
255255
assert.ElementsMatch(c, []*rtv1.RegisteredComponents{
256256
{
257257
Name: "crypto3", Type: "state.in-memory", Version: "v1",
258-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
258+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
259259
},
260260
}, resp.GetRegisteredComponents())
261261
}, time.Second*10, time.Millisecond*10)

tests/integration/suite/daprd/hotreload/operator/informer/components.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (c *components) Run(t *testing.T, ctx context.Context) {
141141
exp := []*rtv1.RegisteredComponents{
142142
{
143143
Name: "123", Type: "state.in-memory", Version: "v1",
144-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
144+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
145145
},
146146
}
147147
require.EventuallyWithT(t, func(ct *assert.CollectT) {
@@ -179,7 +179,7 @@ func (c *components) Run(t *testing.T, ctx context.Context) {
179179
exp := []*rtv1.RegisteredComponents{
180180
{
181181
Name: "123", Type: "state.sqlite", Version: "v1",
182-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
182+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "KEYS_LIKE", "ACTOR"},
183183
},
184184
}
185185
assert.ElementsMatch(ct, exp, c.daprd1.GetMetaRegisteredComponents(ct, ctx))

0 commit comments

Comments
 (0)