@@ -2,6 +2,7 @@ package graph
2
2
3
3
import (
4
4
"context"
5
+ "strconv"
5
6
"sync"
6
7
7
8
"github.com/authzed/spicedb/internal/dispatch"
@@ -15,10 +16,10 @@ import (
15
16
"github.com/authzed/spicedb/pkg/typesystem"
16
17
)
17
18
18
- // runDispatchAndChecker runs the dispatch and checker for a lookup resources call, and publishes
19
- // the results to the parent stream. This function is responsible for handling the dispatching
20
- // of the lookup resources call , and then checking the results to filter them .
21
- func runDispatchAndChecker (
19
+ // runCheckerAndDispatch runs the dispatch and checker for a lookup resources call, and publishes
20
+ // the results to the parent stream. This function is responsible for handling checking the
21
+ // results to filter them , and then dispatching those found .
22
+ func runCheckerAndDispatch (
22
23
ctx context.Context ,
23
24
parentReq ValidatedLookupResources2Request ,
24
25
foundResources dispatchableResourcesSubjectMap2 ,
@@ -35,13 +36,19 @@ func runDispatchAndChecker(
35
36
// Only allow max one dispatcher and one checker to run concurrently.
36
37
concurrencyLimit = min (concurrencyLimit , 2 )
37
38
38
- rdc := & dispatchAndCheckRunner {
39
+ currentCheckIndex , err := ci .integerSectionValue ()
40
+ if err != nil {
41
+ return err
42
+ }
43
+
44
+ rdc := & checkAndDispatchRunner {
39
45
parentRequest : parentReq ,
40
46
foundResources : foundResources ,
41
47
ci : ci ,
42
48
parentStream : parentStream ,
43
49
newSubjectType : newSubjectType ,
44
50
filteredSubjectIDs : filteredSubjectIDs ,
51
+ currentCheckIndex : currentCheckIndex ,
45
52
entrypoint : entrypoint ,
46
53
lrDispatcher : lrDispatcher ,
47
54
checkDispatcher : checkDispatcher ,
@@ -53,87 +60,53 @@ func runDispatchAndChecker(
53
60
return rdc .runAndWait ()
54
61
}
55
62
56
- type dispatchAndCheckRunner struct {
57
- parentRequest ValidatedLookupResources2Request
58
- foundResources dispatchableResourcesSubjectMap2
59
- ci cursorInformation
60
- parentStream dispatch.LookupResources2Stream
61
- newSubjectType * core.RelationReference
63
+ type checkAndDispatchRunner struct {
64
+ parentRequest ValidatedLookupResources2Request
65
+ foundResources dispatchableResourcesSubjectMap2
66
+ ci cursorInformation
67
+ parentStream dispatch.LookupResources2Stream
68
+ newSubjectType * core.RelationReference
69
+ entrypoint typesystem.ReachabilityEntrypoint
70
+ lrDispatcher dispatch.LookupResources2
71
+ checkDispatcher dispatch.Check
72
+ dispatchChunkSize uint16
73
+
62
74
filteredSubjectIDs []string
63
- entrypoint typesystem.ReachabilityEntrypoint
64
- lrDispatcher dispatch.LookupResources2
65
- checkDispatcher dispatch.Check
66
- dispatchChunkSize uint16
75
+ currentCheckIndex int
67
76
68
77
taskrunner * taskrunner.TaskRunner
69
78
70
79
lock * sync.Mutex
71
80
}
72
81
73
- func (rdc * dispatchAndCheckRunner ) dispatchAndCollect (ctx context.Context , cursor * v1.Cursor ) ([]* v1.DispatchLookupResources2Response , error ) {
74
- collectingStream := dispatch.NewCollectingDispatchStream [* v1.DispatchLookupResources2Response ](ctx )
75
- err := rdc .lrDispatcher .DispatchLookupResources2 (& v1.DispatchLookupResources2Request {
76
- ResourceRelation : rdc .parentRequest .ResourceRelation ,
77
- SubjectRelation : rdc .newSubjectType ,
78
- SubjectIds : rdc .filteredSubjectIDs ,
79
- TerminalSubject : rdc .parentRequest .TerminalSubject ,
80
- Metadata : & v1.ResolverMeta {
81
- AtRevision : rdc .parentRequest .Revision .String (),
82
- DepthRemaining : rdc .parentRequest .Metadata .DepthRemaining - 1 ,
83
- },
84
- OptionalCursor : cursor ,
85
- OptionalLimit : uint32 (rdc .dispatchChunkSize ),
86
- }, collectingStream )
87
- return collectingStream .Results (), err
88
- }
89
-
90
- func (rdc * dispatchAndCheckRunner ) runDispatch (ctx context.Context , cursor * v1.Cursor ) error {
91
- rdc .lock .Lock ()
92
- if rdc .ci .limits .hasExhaustedLimit () {
93
- rdc .lock .Unlock ()
94
- return nil
95
- }
96
- rdc .lock .Unlock ()
97
-
98
- collected , err := rdc .dispatchAndCollect (ctx , cursor )
99
- if err != nil {
100
- return err
101
- }
102
-
103
- if len (collected ) == 0 {
104
- return nil
105
- }
106
-
107
- // Kick off a worker to filter the results via a check and then publish what was found.
82
+ func (rdc * checkAndDispatchRunner ) runAndWait () error {
83
+ // Kick off a check at the current cursor, to filter a portion of the initial results set.
108
84
rdc .taskrunner .Schedule (func (ctx context.Context ) error {
109
- return rdc .runChecker (ctx , collected )
85
+ return rdc .runChecker (ctx , rdc . currentCheckIndex )
110
86
})
111
87
112
- // Start another dispatch at the cursor of the last response, to run in the background
113
- // and collect more results for filtering while the checker is running.
114
- rdc .taskrunner .Schedule (func (ctx context.Context ) error {
115
- return rdc .runDispatch (ctx , collected [len (collected )- 1 ].AfterResponseCursor )
116
- })
117
-
118
- return nil
88
+ return rdc .taskrunner .Wait ()
119
89
}
120
90
121
- func (rdc * dispatchAndCheckRunner ) runChecker (ctx context.Context , collected [] * v1. DispatchLookupResources2Response ) error {
91
+ func (rdc * checkAndDispatchRunner ) runChecker (ctx context.Context , startingIndex int ) error {
122
92
rdc .lock .Lock ()
123
93
if rdc .ci .limits .hasExhaustedLimit () {
124
94
rdc .lock .Unlock ()
125
95
return nil
126
96
}
127
97
rdc .lock .Unlock ()
128
98
129
- checkHints := make ([]* v1.CheckHint , 0 , len (collected ))
130
- resourceIDsToCheck := make ([]string , 0 , len (collected ))
131
- for _ , resource := range collected {
132
- resourceIDsToCheck = append (resourceIDsToCheck , resource .Resource .ResourceId )
99
+ endingIndex := min (startingIndex + int (rdc .dispatchChunkSize ), len (rdc .filteredSubjectIDs ))
100
+ resourceIDsToCheck := rdc .filteredSubjectIDs [startingIndex :endingIndex ]
101
+ if len (resourceIDsToCheck ) == 0 {
102
+ return nil
103
+ }
133
104
105
+ checkHints := make ([]* v1.CheckHint , 0 , len (resourceIDsToCheck ))
106
+ for _ , resourceID := range resourceIDsToCheck {
134
107
checkHint , err := hints .HintForEntrypoint (
135
108
rdc .entrypoint ,
136
- resource . Resource . ResourceId ,
109
+ resourceID ,
137
110
rdc .parentRequest .TerminalSubject ,
138
111
& v1.ResourceCheckResult {
139
112
Membership : v1 .ResourceCheckResult_MEMBER ,
@@ -144,9 +117,10 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
144
117
checkHints = append (checkHints , checkHint )
145
118
}
146
119
147
- // Batch check the results to filter to those visible and then publish just the visible resources.
120
+ // NOTE: we are checking the containing permission here, *not* the target relation, as
121
+ // the goal is to shear for the containing permission.
148
122
resultsByResourceID , checkMetadata , err := computed .ComputeBulkCheck (ctx , rdc .checkDispatcher , computed.CheckParameters {
149
- ResourceType : rdc .parentRequest . ResourceRelation ,
123
+ ResourceType : rdc .newSubjectType ,
150
124
Subject : rdc .parentRequest .TerminalSubject ,
151
125
CaveatContext : rdc .parentRequest .Context .AsMap (),
152
126
AtRevision : rdc .parentRequest .Revision ,
@@ -158,10 +132,12 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
158
132
return err
159
133
}
160
134
161
- // Publish any resources that are visible.
162
- isFirstPublishCall := true
163
- for _ , resource := range collected {
164
- result , ok := resultsByResourceID [resource .Resource .ResourceId ]
135
+ adjustedResources := rdc .foundResources .cloneAsMutable ()
136
+
137
+ // Dispatch any resources that are visible.
138
+ resourceIDToDispatch := make ([]string , 0 , len (resourceIDsToCheck ))
139
+ for _ , resourceID := range resourceIDsToCheck {
140
+ result , ok := resultsByResourceID [resourceID ]
165
141
if ! ok {
166
142
continue
167
143
}
@@ -171,19 +147,9 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
171
147
fallthrough
172
148
173
149
case v1 .ResourceCheckResult_CAVEATED_MEMBER :
174
- rdc .lock .Lock ()
175
- if err := publishResultToParentStream (resource , rdc .ci , rdc .foundResources , result .MissingExprFields , isFirstPublishCall , checkMetadata , rdc .parentStream ); err != nil {
176
- rdc .lock .Unlock ()
177
- return err
178
- }
179
-
180
- isFirstPublishCall = false
181
-
182
- if rdc .ci .limits .hasExhaustedLimit () {
183
- rdc .lock .Unlock ()
184
- return nil
185
- }
186
- rdc .lock .Unlock ()
150
+ // Record any additional caveats missing from the check.
151
+ adjustedResources .withAdditionalMissingContextForDispatchedResourceID (resourceID , result .MissingExprFields )
152
+ resourceIDToDispatch = append (resourceIDToDispatch , resourceID )
187
153
188
154
case v1 .ResourceCheckResult_NOT_MEMBER :
189
155
// Skip.
@@ -194,18 +160,74 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
194
160
}
195
161
}
196
162
163
+ if len (resourceIDToDispatch ) > 0 {
164
+ // Schedule a dispatch of those resources.
165
+ rdc .taskrunner .Schedule (func (ctx context.Context ) error {
166
+ return rdc .runDispatch (ctx , resourceIDToDispatch , adjustedResources .asReadOnly (), checkMetadata , startingIndex )
167
+ })
168
+ }
169
+
170
+ // Start the next check chunk (if applicable).
171
+ nextIndex := startingIndex + len (resourceIDsToCheck )
172
+ if nextIndex < len (rdc .filteredSubjectIDs ) {
173
+ rdc .taskrunner .Schedule (func (ctx context.Context ) error {
174
+ return rdc .runChecker (ctx , nextIndex )
175
+ })
176
+ }
177
+
197
178
return nil
198
179
}
199
180
200
- func (rdc * dispatchAndCheckRunner ) runAndWait () error {
201
- currentCursor := rdc .ci .currentCursor
181
+ func (rdc * checkAndDispatchRunner ) runDispatch (
182
+ ctx context.Context ,
183
+ resourceIDsToDispatch []string ,
184
+ adjustedResources dispatchableResourcesSubjectMap2 ,
185
+ checkMetadata * v1.ResponseMeta ,
186
+ startingIndex int ,
187
+ ) error {
188
+ rdc .lock .Lock ()
189
+ if rdc .ci .limits .hasExhaustedLimit () {
190
+ rdc .lock .Unlock ()
191
+ return nil
192
+ }
193
+ rdc .lock .Unlock ()
202
194
203
- // Kick off a dispatch at the current cursor.
204
- rdc .taskrunner .Schedule (func (ctx context.Context ) error {
205
- return rdc .runDispatch (ctx , currentCursor )
195
+ // NOTE: Since we extracted a custom section from the cursor at the beginning of this run, we have to add
196
+ // the starting index to the cursor to ensure that the next run starts from the correct place, and we have
197
+ // to use the *updated* cursor below on the dispatch.
198
+ updatedCi , err := rdc .ci .withOutgoingSection (strconv .Itoa (startingIndex ))
199
+ if err != nil {
200
+ return err
201
+ }
202
+ responsePartialCursor := updatedCi .responsePartialCursor ()
203
+
204
+ // Dispatch to the parent resource type and publish any results found.
205
+ isFirstPublishCall := true
206
+
207
+ wrappedStream := dispatch .NewHandlingDispatchStream (ctx , func (result * v1.DispatchLookupResources2Response ) error {
208
+ if err := ctx .Err (); err != nil {
209
+ return err
210
+ }
211
+
212
+ if err := publishResultToParentStream (result , rdc .ci , responsePartialCursor , adjustedResources , nil , isFirstPublishCall , checkMetadata , rdc .parentStream ); err != nil {
213
+ return err
214
+ }
215
+ isFirstPublishCall = false
216
+ return nil
206
217
})
207
218
208
- return rdc .taskrunner .Wait ()
219
+ return rdc .lrDispatcher .DispatchLookupResources2 (& v1.DispatchLookupResources2Request {
220
+ ResourceRelation : rdc .parentRequest .ResourceRelation ,
221
+ SubjectRelation : rdc .newSubjectType ,
222
+ SubjectIds : resourceIDsToDispatch ,
223
+ TerminalSubject : rdc .parentRequest .TerminalSubject ,
224
+ Metadata : & v1.ResolverMeta {
225
+ AtRevision : rdc .parentRequest .Revision .String (),
226
+ DepthRemaining : rdc .parentRequest .Metadata .DepthRemaining - 1 ,
227
+ },
228
+ OptionalCursor : updatedCi .currentCursor ,
229
+ OptionalLimit : rdc .ci .limits .currentLimit ,
230
+ }, wrappedStream )
209
231
}
210
232
211
233
// unfilteredLookupResourcesDispatchStreamForEntrypoint creates a new dispatch stream that wraps
@@ -227,7 +249,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
227
249
default :
228
250
}
229
251
230
- if err := publishResultToParentStream (result , ci , foundResources , nil , isFirstPublishCall , emptyMetadata , parentStream ); err != nil {
252
+ if err := publishResultToParentStream (result , ci , ci . responsePartialCursor (), foundResources , nil , isFirstPublishCall , emptyMetadata , parentStream ); err != nil {
231
253
return err
232
254
}
233
255
isFirstPublishCall = false
@@ -242,6 +264,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
242
264
func publishResultToParentStream (
243
265
result * v1.DispatchLookupResources2Response ,
244
266
ci cursorInformation ,
267
+ responseCursor * v1.Cursor ,
245
268
foundResources dispatchableResourcesSubjectMap2 ,
246
269
additionalMissingContext []string ,
247
270
isFirstPublishCall bool ,
@@ -261,7 +284,7 @@ func publishResultToParentStream(
261
284
262
285
// The cursor for the response is that of the parent response + the cursor from the result itself.
263
286
afterResponseCursor , err := combineCursors (
264
- ci . responsePartialCursor () ,
287
+ responseCursor ,
265
288
result .AfterResponseCursor ,
266
289
)
267
290
if err != nil {
0 commit comments