Skip to content

Commit 5478cfc

Browse files
committed
Fix experimental LookupResources2 to shear the tree earlier on indirect permissions
1 parent 4429644 commit 5478cfc

9 files changed

+616
-119
lines changed

internal/dispatch/graph/lookupresources2_test.go

+422-6
Large diffs are not rendered by default.

internal/dispatch/graph/lookupresources_test.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,12 @@ func TestMaxDepthLookup(t *testing.T) {
332332
require.Error(err)
333333
}
334334

335-
func joinTuples(first []*core.RelationTuple, second []*core.RelationTuple) []*core.RelationTuple {
336-
return append(first, second...)
335+
func joinTuples(first []*core.RelationTuple, others ...[]*core.RelationTuple) []*core.RelationTuple {
336+
current := first
337+
for _, second := range others {
338+
current = append(current, second...)
339+
}
340+
return current
337341
}
338342

339343
func genTuplesWithOffset(resourceName string, relation string, subjectName string, subjectID string, offset int, number int) []*core.RelationTuple {
@@ -357,11 +361,15 @@ func genSubjectTuples(resourceName string, relation string, subjectName string,
357361
}
358362

359363
func genTuplesWithCaveat(resourceName string, relation string, subjectName string, subjectID string, caveatName string, context map[string]any, offset int, number int) []*core.RelationTuple {
364+
return genTuplesWithCaveatAndSubjectRelation(resourceName, relation, subjectName, subjectID, "...", caveatName, context, offset, number)
365+
}
366+
367+
func genTuplesWithCaveatAndSubjectRelation(resourceName string, relation string, subjectName string, subjectID string, subjectRelation string, caveatName string, context map[string]any, offset int, number int) []*core.RelationTuple {
360368
tuples := make([]*core.RelationTuple, 0, number)
361369
for i := 0; i < number; i++ {
362370
tpl := &core.RelationTuple{
363371
ResourceAndRelation: ONR(resourceName, fmt.Sprintf("%s-%d", resourceName, i+offset), relation),
364-
Subject: ONR(subjectName, subjectID, "..."),
372+
Subject: ONR(subjectName, subjectID, subjectRelation),
365373
}
366374
if caveatName != "" {
367375
tpl = tuple.MustWithCaveat(tpl, caveatName, context)

internal/graph/lookupresources2.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -615,8 +615,8 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
615615
}, stream)
616616
}
617617

618-
// Otherwise, we need to dispatch and filter results by batch checking along the way.
619-
return runDispatchAndChecker(
618+
// Otherwise, we need to filter results by batch checking along the way before dispatching.
619+
return runCheckerAndDispatch(
620620
ctx,
621621
parentRequest,
622622
foundResources,

internal/graph/lr2streams.go

+114-91
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package graph
22

33
import (
44
"context"
5+
"strconv"
56
"sync"
67

78
"github.com/authzed/spicedb/internal/dispatch"
@@ -15,10 +16,10 @@ import (
1516
"github.com/authzed/spicedb/pkg/typesystem"
1617
)
1718

18-
// runDispatchAndChecker runs the dispatch and checker for a lookup resources call, and publishes
19-
// the results to the parent stream. This function is responsible for handling the dispatching
20-
// of the lookup resources call, and then checking the results to filter them.
21-
func runDispatchAndChecker(
19+
// runCheckerAndDispatch runs the checker and dispatch for a lookup resources call, and publishes
20+
// the results to the parent stream. This function is responsible for checking the
21+
// results to filter them, and then dispatching those found.
22+
func runCheckerAndDispatch(
2223
ctx context.Context,
2324
parentReq ValidatedLookupResources2Request,
2425
foundResources dispatchableResourcesSubjectMap2,
@@ -35,13 +36,19 @@ func runDispatchAndChecker(
3536
// Only allow max one dispatcher and one checker to run concurrently.
3637
concurrencyLimit = min(concurrencyLimit, 2)
3738

38-
rdc := &dispatchAndCheckRunner{
39+
currentCheckIndex, err := ci.integerSectionValue()
40+
if err != nil {
41+
return err
42+
}
43+
44+
rdc := &checkAndDispatchRunner{
3945
parentRequest: parentReq,
4046
foundResources: foundResources,
4147
ci: ci,
4248
parentStream: parentStream,
4349
newSubjectType: newSubjectType,
4450
filteredSubjectIDs: filteredSubjectIDs,
51+
currentCheckIndex: currentCheckIndex,
4552
entrypoint: entrypoint,
4653
lrDispatcher: lrDispatcher,
4754
checkDispatcher: checkDispatcher,
@@ -53,87 +60,53 @@ func runDispatchAndChecker(
5360
return rdc.runAndWait()
5461
}
5562

56-
type dispatchAndCheckRunner struct {
57-
parentRequest ValidatedLookupResources2Request
58-
foundResources dispatchableResourcesSubjectMap2
59-
ci cursorInformation
60-
parentStream dispatch.LookupResources2Stream
61-
newSubjectType *core.RelationReference
63+
type checkAndDispatchRunner struct {
64+
parentRequest ValidatedLookupResources2Request
65+
foundResources dispatchableResourcesSubjectMap2
66+
ci cursorInformation
67+
parentStream dispatch.LookupResources2Stream
68+
newSubjectType *core.RelationReference
69+
entrypoint typesystem.ReachabilityEntrypoint
70+
lrDispatcher dispatch.LookupResources2
71+
checkDispatcher dispatch.Check
72+
dispatchChunkSize uint16
73+
6274
filteredSubjectIDs []string
63-
entrypoint typesystem.ReachabilityEntrypoint
64-
lrDispatcher dispatch.LookupResources2
65-
checkDispatcher dispatch.Check
66-
dispatchChunkSize uint16
75+
currentCheckIndex int
6776

6877
taskrunner *taskrunner.TaskRunner
6978

7079
lock *sync.Mutex
7180
}
7281

73-
func (rdc *dispatchAndCheckRunner) dispatchAndCollect(ctx context.Context, cursor *v1.Cursor) ([]*v1.DispatchLookupResources2Response, error) {
74-
collectingStream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](ctx)
75-
err := rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
76-
ResourceRelation: rdc.parentRequest.ResourceRelation,
77-
SubjectRelation: rdc.newSubjectType,
78-
SubjectIds: rdc.filteredSubjectIDs,
79-
TerminalSubject: rdc.parentRequest.TerminalSubject,
80-
Metadata: &v1.ResolverMeta{
81-
AtRevision: rdc.parentRequest.Revision.String(),
82-
DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
83-
},
84-
OptionalCursor: cursor,
85-
OptionalLimit: uint32(rdc.dispatchChunkSize),
86-
}, collectingStream)
87-
return collectingStream.Results(), err
88-
}
89-
90-
func (rdc *dispatchAndCheckRunner) runDispatch(ctx context.Context, cursor *v1.Cursor) error {
91-
rdc.lock.Lock()
92-
if rdc.ci.limits.hasExhaustedLimit() {
93-
rdc.lock.Unlock()
94-
return nil
95-
}
96-
rdc.lock.Unlock()
97-
98-
collected, err := rdc.dispatchAndCollect(ctx, cursor)
99-
if err != nil {
100-
return err
101-
}
102-
103-
if len(collected) == 0 {
104-
return nil
105-
}
106-
107-
// Kick off a worker to filter the results via a check and then publish what was found.
82+
func (rdc *checkAndDispatchRunner) runAndWait() error {
83+
// Kick off a check at the current cursor, to filter a portion of the initial results set.
10884
rdc.taskrunner.Schedule(func(ctx context.Context) error {
109-
return rdc.runChecker(ctx, collected)
85+
return rdc.runChecker(ctx, rdc.currentCheckIndex)
11086
})
11187

112-
// Start another dispatch at the cursor of the last response, to run in the background
113-
// and collect more results for filtering while the checker is running.
114-
rdc.taskrunner.Schedule(func(ctx context.Context) error {
115-
return rdc.runDispatch(ctx, collected[len(collected)-1].AfterResponseCursor)
116-
})
117-
118-
return nil
88+
return rdc.taskrunner.Wait()
11989
}
12090

121-
func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*v1.DispatchLookupResources2Response) error {
91+
func (rdc *checkAndDispatchRunner) runChecker(ctx context.Context, startingIndex int) error {
12292
rdc.lock.Lock()
12393
if rdc.ci.limits.hasExhaustedLimit() {
12494
rdc.lock.Unlock()
12595
return nil
12696
}
12797
rdc.lock.Unlock()
12898

129-
checkHints := make([]*v1.CheckHint, 0, len(collected))
130-
resourceIDsToCheck := make([]string, 0, len(collected))
131-
for _, resource := range collected {
132-
resourceIDsToCheck = append(resourceIDsToCheck, resource.Resource.ResourceId)
99+
endingIndex := min(startingIndex+int(rdc.dispatchChunkSize), len(rdc.filteredSubjectIDs))
100+
resourceIDsToCheck := rdc.filteredSubjectIDs[startingIndex:endingIndex]
101+
if len(resourceIDsToCheck) == 0 {
102+
return nil
103+
}
133104

105+
checkHints := make([]*v1.CheckHint, 0, len(resourceIDsToCheck))
106+
for _, resourceID := range resourceIDsToCheck {
134107
checkHint, err := hints.HintForEntrypoint(
135108
rdc.entrypoint,
136-
resource.Resource.ResourceId,
109+
resourceID,
137110
rdc.parentRequest.TerminalSubject,
138111
&v1.ResourceCheckResult{
139112
Membership: v1.ResourceCheckResult_MEMBER,
@@ -144,9 +117,10 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
144117
checkHints = append(checkHints, checkHint)
145118
}
146119

147-
// Batch check the results to filter to those visible and then publish just the visible resources.
120+
// NOTE: we are checking the containing permission here, *not* the target relation, as
121+
// the goal is to shear for the containing permission.
148122
resultsByResourceID, checkMetadata, err := computed.ComputeBulkCheck(ctx, rdc.checkDispatcher, computed.CheckParameters{
149-
ResourceType: rdc.parentRequest.ResourceRelation,
123+
ResourceType: rdc.newSubjectType,
150124
Subject: rdc.parentRequest.TerminalSubject,
151125
CaveatContext: rdc.parentRequest.Context.AsMap(),
152126
AtRevision: rdc.parentRequest.Revision,
@@ -158,10 +132,12 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
158132
return err
159133
}
160134

161-
// Publish any resources that are visible.
162-
isFirstPublishCall := true
163-
for _, resource := range collected {
164-
result, ok := resultsByResourceID[resource.Resource.ResourceId]
135+
adjustedResources := rdc.foundResources.cloneAsMutable()
136+
137+
// Dispatch any resources that are visible.
138+
resourceIDToDispatch := make([]string, 0, len(resourceIDsToCheck))
139+
for _, resourceID := range resourceIDsToCheck {
140+
result, ok := resultsByResourceID[resourceID]
165141
if !ok {
166142
continue
167143
}
@@ -171,19 +147,9 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
171147
fallthrough
172148

173149
case v1.ResourceCheckResult_CAVEATED_MEMBER:
174-
rdc.lock.Lock()
175-
if err := publishResultToParentStream(resource, rdc.ci, rdc.foundResources, result.MissingExprFields, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
176-
rdc.lock.Unlock()
177-
return err
178-
}
179-
180-
isFirstPublishCall = false
181-
182-
if rdc.ci.limits.hasExhaustedLimit() {
183-
rdc.lock.Unlock()
184-
return nil
185-
}
186-
rdc.lock.Unlock()
150+
// Record any additional context fields that the check reported as missing.
151+
adjustedResources.withAdditionalMissingContextForDispatchedResourceID(resourceID, result.MissingExprFields)
152+
resourceIDToDispatch = append(resourceIDToDispatch, resourceID)
187153

188154
case v1.ResourceCheckResult_NOT_MEMBER:
189155
// Skip.
@@ -194,18 +160,74 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
194160
}
195161
}
196162

163+
if len(resourceIDToDispatch) > 0 {
164+
// Schedule a dispatch of those resources.
165+
rdc.taskrunner.Schedule(func(ctx context.Context) error {
166+
return rdc.runDispatch(ctx, resourceIDToDispatch, adjustedResources.asReadOnly(), checkMetadata, startingIndex)
167+
})
168+
}
169+
170+
// Start the next check chunk (if applicable).
171+
nextIndex := startingIndex + len(resourceIDsToCheck)
172+
if nextIndex < len(rdc.filteredSubjectIDs) {
173+
rdc.taskrunner.Schedule(func(ctx context.Context) error {
174+
return rdc.runChecker(ctx, nextIndex)
175+
})
176+
}
177+
197178
return nil
198179
}
199180

200-
func (rdc *dispatchAndCheckRunner) runAndWait() error {
201-
currentCursor := rdc.ci.currentCursor
181+
func (rdc *checkAndDispatchRunner) runDispatch(
182+
ctx context.Context,
183+
resourceIDsToDispatch []string,
184+
adjustedResources dispatchableResourcesSubjectMap2,
185+
checkMetadata *v1.ResponseMeta,
186+
startingIndex int,
187+
) error {
188+
rdc.lock.Lock()
189+
if rdc.ci.limits.hasExhaustedLimit() {
190+
rdc.lock.Unlock()
191+
return nil
192+
}
193+
rdc.lock.Unlock()
202194

203-
// Kick off a dispatch at the current cursor.
204-
rdc.taskrunner.Schedule(func(ctx context.Context) error {
205-
return rdc.runDispatch(ctx, currentCursor)
195+
// NOTE: Since we extracted a custom section from the cursor at the beginning of this run, we have to add
196+
// the starting index to the cursor to ensure that the next run starts from the correct place, and we have
197+
// to use the *updated* cursor below on the dispatch.
198+
updatedCi, err := rdc.ci.withOutgoingSection(strconv.Itoa(startingIndex))
199+
if err != nil {
200+
return err
201+
}
202+
responsePartialCursor := updatedCi.responsePartialCursor()
203+
204+
// Dispatch to the parent resource type and publish any results found.
205+
isFirstPublishCall := true
206+
207+
wrappedStream := dispatch.NewHandlingDispatchStream(ctx, func(result *v1.DispatchLookupResources2Response) error {
208+
if err := ctx.Err(); err != nil {
209+
return err
210+
}
211+
212+
if err := publishResultToParentStream(result, rdc.ci, responsePartialCursor, adjustedResources, nil, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
213+
return err
214+
}
215+
isFirstPublishCall = false
216+
return nil
206217
})
207218

208-
return rdc.taskrunner.Wait()
219+
return rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
220+
ResourceRelation: rdc.parentRequest.ResourceRelation,
221+
SubjectRelation: rdc.newSubjectType,
222+
SubjectIds: resourceIDsToDispatch,
223+
TerminalSubject: rdc.parentRequest.TerminalSubject,
224+
Metadata: &v1.ResolverMeta{
225+
AtRevision: rdc.parentRequest.Revision.String(),
226+
DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
227+
},
228+
OptionalCursor: updatedCi.currentCursor,
229+
OptionalLimit: rdc.ci.limits.currentLimit,
230+
}, wrappedStream)
209231
}
210232

211233
// unfilteredLookupResourcesDispatchStreamForEntrypoint creates a new dispatch stream that wraps
@@ -227,7 +249,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
227249
default:
228250
}
229251

230-
if err := publishResultToParentStream(result, ci, foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
252+
if err := publishResultToParentStream(result, ci, ci.responsePartialCursor(), foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
231253
return err
232254
}
233255
isFirstPublishCall = false
@@ -242,6 +264,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
242264
func publishResultToParentStream(
243265
result *v1.DispatchLookupResources2Response,
244266
ci cursorInformation,
267+
responseCursor *v1.Cursor,
245268
foundResources dispatchableResourcesSubjectMap2,
246269
additionalMissingContext []string,
247270
isFirstPublishCall bool,
@@ -261,7 +284,7 @@ func publishResultToParentStream(
261284

262285
// The cursor for the response is that of the parent response + the cursor from the result itself.
263286
afterResponseCursor, err := combineCursors(
264-
ci.responsePartialCursor(),
287+
responseCursor,
265288
result.AfterResponseCursor,
266289
)
267290
if err != nil {

0 commit comments

Comments
 (0)