Skip to content

Commit ccd6adb

Browse files
authored
[FSSDK-11898] serialize concurrent cmab service calls (#1086)
The cmab service caches the results of a cmab prediction retrieve from the server and returns it for subsequent call. This ensures a consistent value is returned for getDecision() within the cache ttl. However, when there is no cached value, if there is concurrent calls to gertDecision() for same userId and ruleId combination, all of these will cause a call to the server and may potentially return different values. The solution is to run concurrent calls for same userId and ruleId combinations one after another. To achieve this, we put each (userId, ruleId) combination in one of the predefined bucktes by hashing the (userId, ruleId) combination and serialize all calls for that particular hash % (num_buckets).
1 parent f04de07 commit ccd6adb

File tree

4 files changed

+293
-1
lines changed

4 files changed

+293
-1
lines changed

lib/core/decision_service/cmab/cmab_service.spec.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,29 @@
1-
import { describe, it, expect, vi, Mocked, Mock, MockInstance, beforeEach, afterEach } from 'vitest';
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { describe, it, expect, vi } from 'vitest';
218

319
import { DefaultCmabService } from './cmab_service';
420
import { getMockSyncCache } from '../../../tests/mock/mock_cache';
521
import { ProjectConfig } from '../../../project_config/project_config';
622
import { OptimizelyDecideOption, UserAttributes } from '../../../shared_types';
723
import OptimizelyUserContext from '../../../optimizely_user_context';
824
import { validate as uuidValidate } from 'uuid';
25+
import { resolvablePromise } from '../../../utils/promise/resolvablePromise';
26+
import { exhaustMicrotasks } from '../../../tests/testUtils';
927

1028
const mockProjectConfig = (): ProjectConfig => ({
1129
experimentIdMap: {
@@ -418,4 +436,75 @@ describe('DefaultCmabService', () => {
418436

419437
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(2);
420438
});
439+
440+
it('should serialize concurrent calls to getDecision with the same userId and ruleId', async () => {
441+
const nCall = 10;
442+
let currentVar = 123;
443+
const fetchPromises = Array.from({ length: nCall }, () => resolvablePromise());
444+
445+
let callCount = 0;
446+
const mockCmabClient = {
447+
fetchDecision: vi.fn().mockImplementation(async () => {
448+
const variation = `${currentVar++}`;
449+
await fetchPromises[callCount++];
450+
return variation;
451+
}),
452+
};
453+
454+
const cmabService = new DefaultCmabService({
455+
cmabCache: getMockSyncCache(),
456+
cmabClient: mockCmabClient,
457+
});
458+
459+
const projectConfig = mockProjectConfig();
460+
const userContext = mockUserContext('user123', {});
461+
462+
const resultPromises = [];
463+
for (let i = 0; i < nCall; i++) {
464+
resultPromises.push(cmabService.getDecision(projectConfig, userContext, '1234', {}));
465+
}
466+
467+
await exhaustMicrotasks();
468+
469+
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1);
470+
471+
for(let i = 0; i < nCall; i++) {
472+
fetchPromises[i].resolve('');
473+
await exhaustMicrotasks();
474+
const result = await resultPromises[i];
475+
expect(result.variationId).toBe('123');
476+
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1);
477+
}
478+
});
479+
480+
it('should not serialize calls to getDecision with different userId or ruleId', async () => {
481+
let currentVar = 123;
482+
const mockCmabClient = {
483+
fetchDecision: vi.fn().mockImplementation(() => Promise.resolve(`${currentVar++}`)),
484+
};
485+
486+
const cmabService = new DefaultCmabService({
487+
cmabCache: getMockSyncCache(),
488+
cmabClient: mockCmabClient,
489+
});
490+
491+
const projectConfig = mockProjectConfig();
492+
const userContext1 = mockUserContext('user123', {});
493+
const userContext2 = mockUserContext('user456', {});
494+
495+
const resultPromises = [];
496+
resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '1234', {}));
497+
resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '5678', {}));
498+
resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '1234', {}));
499+
resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '5678', {}));
500+
501+
await exhaustMicrotasks();
502+
503+
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(4);
504+
505+
for(let i = 0; i < resultPromises.length; i++) {
506+
const result = await resultPromises[i];
507+
expect(result.variationId).toBe(`${123 + i}`);
508+
}
509+
});
421510
});

lib/core/decision_service/cmab/cmab_service.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { CmabClient } from "./cmab_client";
2323
import { v4 as uuidV4 } from 'uuid';
2424
import murmurhash from "murmurhash";
2525
import { DecideOptionsMap } from "..";
26+
import { SerialRunner } from "../../../utils/executor/serial_runner";
2627

2728
export type CmabDecision = {
2829
variationId: string,
@@ -57,22 +58,45 @@ export type CmabServiceOptions = {
5758
cmabClient: CmabClient;
5859
}
5960

61+
const SERIALIZER_BUCKETS = 1000;
62+
6063
export class DefaultCmabService implements CmabService {
6164
private cmabCache: CacheWithRemove<CmabCacheValue>;
6265
private cmabClient: CmabClient;
6366
private logger?: LoggerFacade;
67+
private serializers: SerialRunner[] = Array.from(
68+
{ length: SERIALIZER_BUCKETS }, () => new SerialRunner()
69+
);
6470

6571
constructor(options: CmabServiceOptions) {
6672
this.cmabCache = options.cmabCache;
6773
this.cmabClient = options.cmabClient;
6874
this.logger = options.logger;
6975
}
7076

77+
private getSerializerIndex(userId: string, experimentId: string): number {
78+
const key = this.getCacheKey(userId, experimentId);
79+
const hash = murmurhash.v3(key);
80+
return Math.abs(hash) % SERIALIZER_BUCKETS;
81+
}
82+
7183
async getDecision(
7284
projectConfig: ProjectConfig,
7385
userContext: IOptimizelyUserContext,
7486
ruleId: string,
7587
options: DecideOptionsMap,
88+
): Promise<CmabDecision> {
89+
const serializerIndex = this.getSerializerIndex(userContext.getUserId(), ruleId);
90+
return this.serializers[serializerIndex].run(() =>
91+
this.getDecisionInternal(projectConfig, userContext, ruleId, options)
92+
);
93+
}
94+
95+
private async getDecisionInternal(
96+
projectConfig: ProjectConfig,
97+
userContext: IOptimizelyUserContext,
98+
ruleId: string,
99+
options: DecideOptionsMap,
76100
): Promise<CmabDecision> {
77101
const filteredAttributes = this.filterAttributes(projectConfig, userContext, ruleId);
78102

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import { describe, it, expect, beforeEach } from 'vitest';
17+
18+
import { SerialRunner } from './serial_runner';
19+
import { resolvablePromise } from '../promise/resolvablePromise';
20+
import { exhaustMicrotasks } from '../../tests/testUtils';
21+
22+
describe('SerialRunner', () => {
23+
let serialRunner: SerialRunner;
24+
25+
beforeEach(() => {
26+
serialRunner = new SerialRunner();
27+
});
28+
29+
it('should return result from a single async function', async () => {
30+
const fn = () => Promise.resolve('result');
31+
32+
const result = await serialRunner.run(fn);
33+
34+
expect(result).toBe('result');
35+
});
36+
37+
it('should reject with same error when the passed function rejects', async () => {
38+
const error = new Error('test error');
39+
const fn = () => Promise.reject(error);
40+
41+
await expect(serialRunner.run(fn)).rejects.toThrow(error);
42+
});
43+
44+
it('should execute multiple async functions in order', async () => {
45+
const executionOrder: number[] = [];
46+
const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()];
47+
48+
const createTask = (id: number) => async () => {
49+
executionOrder.push(id);
50+
await promises[id];
51+
return id;
52+
};
53+
54+
const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))];
55+
56+
// only first task should have started
57+
await exhaustMicrotasks();
58+
expect(executionOrder).toEqual([0]);
59+
60+
// Resolve first task - second should start
61+
promises[0].resolve('');
62+
await exhaustMicrotasks();
63+
expect(executionOrder).toEqual([0, 1]);
64+
65+
// Resolve second task - third should start
66+
promises[1].resolve('');
67+
await exhaustMicrotasks();
68+
expect(executionOrder).toEqual([0, 1, 2]);
69+
70+
// Resolve third task - all done
71+
promises[2].resolve('');
72+
73+
// Verify all results are correct
74+
expect(await results[0]).toBe(0);
75+
expect(await results[1]).toBe(1);
76+
expect(await results[2]).toBe(2);
77+
});
78+
79+
it('should continue execution even if one function throws an error', async () => {
80+
const executionOrder: number[] = [];
81+
const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()];
82+
83+
const createTask = (id: number) => async () => {
84+
executionOrder.push(id);
85+
await promises[id];
86+
return id;
87+
};
88+
89+
const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))];
90+
91+
// only first task should have started
92+
await exhaustMicrotasks();
93+
expect(executionOrder).toEqual([0]);
94+
95+
// reject first task - second should still start
96+
promises[0].reject(new Error('first error'));
97+
await exhaustMicrotasks();
98+
expect(executionOrder).toEqual([0, 1]);
99+
100+
// reject second task - third should still start
101+
promises[1].reject(new Error('second error'));
102+
await exhaustMicrotasks();
103+
expect(executionOrder).toEqual([0, 1, 2]);
104+
105+
// Resolve third task - all done
106+
promises[2].resolve('');
107+
108+
// Verify results - first and third succeed, second fails
109+
await expect(results[0]).rejects.toThrow('first error');
110+
await expect(results[1]).rejects.toThrow('second error');
111+
await expect(results[2]).resolves.toBe(2);
112+
});
113+
114+
it('should handle functions that return different types', async () => {
115+
const numberFn = () => Promise.resolve(42);
116+
const stringFn = () => Promise.resolve('hello');
117+
const objectFn = () => Promise.resolve({ key: 'value' });
118+
const arrayFn = () => Promise.resolve([1, 2, 3]);
119+
const booleanFn = () => Promise.resolve(true);
120+
const nullFn = () => Promise.resolve(null);
121+
const undefinedFn = () => Promise.resolve(undefined);
122+
123+
const results = await Promise.all([
124+
serialRunner.run(numberFn),
125+
serialRunner.run(stringFn),
126+
serialRunner.run(objectFn),
127+
serialRunner.run(arrayFn),
128+
serialRunner.run(booleanFn),
129+
serialRunner.run(nullFn),
130+
serialRunner.run(undefinedFn),
131+
]);
132+
133+
expect(results).toEqual([42, 'hello', { key: 'value' }, [1, 2, 3], true, null, undefined]);
134+
});
135+
136+
it('should handle empty function that returns undefined', async () => {
137+
const emptyFn = () => Promise.resolve(undefined);
138+
139+
const result = await serialRunner.run(emptyFn);
140+
141+
expect(result).toBeUndefined();
142+
});
143+
});
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { AsyncProducer } from "../type";
18+
19+
class SerialRunner {
20+
private waitPromise: Promise<unknown> = Promise.resolve();
21+
22+
// each call to serialize adds a new function to the end of the promise chain
23+
// the function is called when the previous promise resolves
24+
// if the function throws, the error is caught and ignored to allow the chain to continue
25+
// the result of the function is returned as a promise
26+
// if multiple calls to serialize are made, they will be executed in order
27+
// even if some of them throw errors
28+
29+
run<T>(fn: AsyncProducer<T>): Promise<T> {
30+
const resultPromise = this.waitPromise.then(fn);
31+
this.waitPromise = resultPromise.catch(() => {});
32+
return resultPromise;
33+
}
34+
}
35+
36+
export { SerialRunner };

0 commit comments

Comments
 (0)