From a85fd1edf7034b528bcdb6efbd6d3643dbd08d6d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 01:41:46 +0000 Subject: [PATCH] feat(apply): add --concurrency limit for deploy/destroy Bound how many resources run a provider lifecycle operation at once. Defaults to unbounded; defers to Effect's concurrency model via a shared semaphore that gates only the actual provider call, never the dependency wait, so a tight limit can never deadlock the DAG. - `apply(plan, { concurrency })` accepts `"unbounded" | number` (default `"unbounded"`). The limit is enforced at `instrumentLifecycle`, the single dispatch chokepoint, so it covers create/update/replace/precreate/delete across executePlan, converge, and collectGarbage with one shared permit. - `--concurrency ` flag on `deploy` and `destroy` (e.g. `--concurrency 64`). - Test harness `deploy`/`destroy` accept `ApplyOptions` so the limit is testable. - apply.test.ts: unbounded runs all at once; a limit caps the high-water mark; concurrency 1 serializes; a dependency chain doesn't deadlock under a tight limit; destroy honors the limit. --- packages/alchemy/src/Apply.ts | 66 +++++++- packages/alchemy/src/Cli/commands/_shared.ts | 16 ++ packages/alchemy/src/Cli/commands/deploy.ts | 10 +- packages/alchemy/src/Test/Core.ts | 22 ++- packages/alchemy/test/apply.test.ts | 156 +++++++++++++++++++ 5 files changed, 260 insertions(+), 10 deletions(-) diff --git a/packages/alchemy/src/Apply.ts b/packages/alchemy/src/Apply.ts index dd1580916..a97f56811 100644 --- a/packages/alchemy/src/Apply.ts +++ b/packages/alchemy/src/Apply.ts @@ -2,6 +2,7 @@ import * as Cause from "effect/Cause"; import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; +import * as Semaphore from "effect/Semaphore"; import type { Simplify } from "effect/Types"; import { Artifacts, @@ -78,6 +79,46 @@ interface ResourceTracker { instanceId: string; } +export interface ApplyOptions { + /** + * Upper bound on the number of resources that may be running a provider + * lifecycle operation (precreate / create / update / replace / delete) at + * the same time. + * + * - `"unbounded"` (the default) places no limit — every ready resource is + * dispatched concurrently, exactly as before. + * - A positive integer (e.g. `64`) caps in-flight lifecycle operations at + * that many; additional resources queue until a slot frees up. + * + * Only the actual provider call is gated, never the dependency wait that + * precedes it, so bounding concurrency can never deadlock the DAG: a + * resource blocked on its upstreams holds no permit. + */ + readonly concurrency?: "unbounded" | number; +} + +/** + * A combinator that gates an effect on a shared concurrency permit. For + * `"unbounded"` it is the identity function; for a numeric limit it wraps the + * effect in a single permit taken from a shared {@link Semaphore}, so no more + * than `concurrency` gated effects run at once across the whole apply. + */ +export type LifecycleLimit = ( + effect: Effect.Effect, +) => Effect.Effect; + +const makeLifecycleLimit = ( + concurrency: "unbounded" | number, +): LifecycleLimit => { + if (concurrency === "unbounded") { + return (effect) => effect; + } + // One semaphore shared across every phase (executePlan, converge, + // collectGarbage) so the limit is global to the apply, not per-phase. + const semaphore = Semaphore.makeUnsafe(concurrency); + return (effect) => Semaphore.withPermits(semaphore, 1)(effect); +}; + const provideLifecycleScope = (fqn: string, instanceId: string) => ( @@ -105,6 +146,7 @@ const provideLifecycleScope = */ const instrumentLifecycle = ( + limit: LifecycleLimit, op: ResourceOp, fqn: string, resourceType: string, @@ -116,6 +158,10 @@ const instrumentLifecycle = ): Effect.Effect> => effect.pipe( provideLifecycleScope(fqn, instanceId), + // Hold a concurrency permit only around the scoped provider call. The + // metric/span below stay outside the permit so queue time isn't counted + // as operation duration. + limit, recordResourceOp(resourceType, op), Effect.withSpan(`provider.${op}`, { attributes: { @@ -130,6 +176,7 @@ const instrumentLifecycle = export const apply =

( plan: P, + options?: ApplyOptions, ): Effect.Effect< Input.Resolve, Output.InvalidReferenceError | Output.MissingSourceError | StateStoreError, @@ -143,6 +190,9 @@ export const apply =

( const stage = yield* Stage; const stackName = stack.name; + // Shared across all three phases so the cap is global to the apply. + const limit = makeLifecycleLimit(options?.concurrency ?? "unbounded"); + const tracker: Record = {}; const terminalStatuses = new Map< string, @@ -161,13 +211,14 @@ export const apply =

( state, stackName, stage, + limit, ); // TODO(sam): support roll back to previous state if errors occur during expansion // -> RISK: some UPDATEs may not be reversible (i.e. trigger replacements) // TODO(sam): should pivot be done separately? E.g shift traffic? - yield* collectGarbage(plan, session); + yield* collectGarbage(plan, session, limit); yield* converge( plan, @@ -177,6 +228,7 @@ export const apply =

( state, stackName, stage, + limit, ); yield* Effect.forEach( @@ -242,6 +294,7 @@ const executePlan = Effect.fnUntraced(function* ( }, stackName: string, stage: string, + limit: LifecycleLimit, ) { // Resources and tasks share the same FQN namespace and DAG, so the // scheduler tracks them together. Each entry gets a single Deferred that @@ -344,6 +397,7 @@ const executePlan = Effect.fnUntraced(function* ( waitForDeps, failures, plan.cycleMembers.has(fqn), + limit, ), ), { concurrency: "unbounded" }, @@ -393,6 +447,7 @@ const executeNode = ( waitForDeps: (fqns: string[]) => Effect.Effect, failures: LifecycleFailure[], inCycle: boolean, + limit: LifecycleLimit, ): Effect.Effect => Effect.gen(function* () { const logicalId = node.resource.LogicalId; @@ -583,6 +638,7 @@ const executeNode = ( }) .pipe( instrumentLifecycle( + limit, "precreate", fqn, node.resource.Type, @@ -644,6 +700,7 @@ const executeNode = ( }) .pipe( instrumentLifecycle( + limit, "create", fqn, node.resource.Type, @@ -766,6 +823,7 @@ const executeNode = ( }) .pipe( instrumentLifecycle( + limit, "update", fqn, node.resource.Type, @@ -880,6 +938,7 @@ const executeNode = ( }) .pipe( instrumentLifecycle( + limit, "precreate", fqn, node.resource.Type, @@ -942,6 +1001,7 @@ const executeNode = ( }) .pipe( instrumentLifecycle( + limit, "create", fqn, node.resource.Type, @@ -1222,6 +1282,7 @@ const converge = Effect.fnUntraced(function* ( }, stackName: string, stage: string, + limit: LifecycleLimit, ) { for (;;) { let anyUpdated = false; @@ -1276,6 +1337,7 @@ const converge = Effect.fnUntraced(function* ( }) .pipe( instrumentLifecycle( + limit, "update", fqn, node.resource.Type, @@ -1396,6 +1458,7 @@ const converge = Effect.fnUntraced(function* ( const collectGarbage = Effect.fnUntraced(function* ( plan: Plan, session: PlanStatusSession, + limit: LifecycleLimit, ) { const state = yield* State; const stack = yield* Stack; @@ -1556,6 +1619,7 @@ const collectGarbage = Effect.fnUntraced(function* ( }) .pipe( instrumentLifecycle( + limit, "delete", fqn, resourceType, diff --git a/packages/alchemy/src/Cli/commands/_shared.ts b/packages/alchemy/src/Cli/commands/_shared.ts index 458179ed4..f1641bfc6 100644 --- a/packages/alchemy/src/Cli/commands/_shared.ts +++ b/packages/alchemy/src/Cli/commands/_shared.ts @@ -136,6 +136,22 @@ export const force = Flag.boolean("force").pipe( Flag.withDefault(false), ); +/** + * `--concurrency ` caps how many resources run a provider lifecycle + * operation at the same time (e.g. `--concurrency 64`). Defers to Effect's + * own concurrency model: omitting the flag leaves the apply unbounded, exactly + * like `{ concurrency: "unbounded" }`. + */ +export const concurrency = Flag.integer("concurrency").pipe( + Flag.withSchema(S.Int.check(S.isGreaterThanOrEqualTo(1))), + Flag.withDescription( + "Max number of resources applying in parallel (e.g. 64). " + + "Defaults to unbounded (no limit).", + ), + Flag.optional, + Flag.map(Option.getOrUndefined), +); + export const script = Argument.file("main", { mustExist: true, }).pipe( diff --git a/packages/alchemy/src/Cli/commands/deploy.ts b/packages/alchemy/src/Cli/commands/deploy.ts index 6236122ac..b36a90813 100644 --- a/packages/alchemy/src/Cli/commands/deploy.ts +++ b/packages/alchemy/src/Cli/commands/deploy.ts @@ -21,6 +21,7 @@ import { loadConfigProvider } from "../../Util/ConfigProvider.ts"; import { fileLogger } from "../../Util/FileLogger.ts"; import { + concurrency, dryRun as dryRunFlag, envFile, force, @@ -43,6 +44,7 @@ export const ExecStackOptions = Schema.Struct({ destroy: Schema.optional(Schema.Boolean), dev: Schema.optional(Schema.Boolean), adopt: Schema.optional(Schema.Boolean), + concurrency: Schema.optional(Schema.Number), }); export type ExecStackOptions = typeof ExecStackOptions.Type; export type ExecStackOptionsEncoded = typeof ExecStackOptions.Encoded; @@ -56,6 +58,7 @@ const stackSpanAttrs = (args: ExecStackOptions) => ({ "alchemy.destroy": !!args.destroy, "alchemy.dev": !!args.dev, "alchemy.adopt": !!args.adopt, + "alchemy.concurrency": args.concurrency ?? "unbounded", }); const adopt = Flag.boolean("adopt").pipe( @@ -77,6 +80,7 @@ export const execStack = Effect.fn(function* ({ destroy = false, dev = false, adopt = false, + concurrency, }: ExecStackOptions) { const stackEffect = yield* importStack(main); @@ -147,7 +151,9 @@ export const execStack = Effect.fn(function* ({ return; } } - const outputs = yield* apply(updatePlan); + const outputs = yield* apply(updatePlan, { + concurrency: concurrency ?? "unbounded", + }); if (outputs !== undefined) { yield* Console.log(outputs); @@ -172,6 +178,7 @@ export const deployCommand = Command.make( yes, profile, adopt, + concurrency, }, instrumentCommand("deploy", stackSpanAttrs)(execStack), ); @@ -185,6 +192,7 @@ export const destroyCommand = Command.make( stage, yes, profile, + concurrency, }, instrumentCommand( "destroy", diff --git a/packages/alchemy/src/Test/Core.ts b/packages/alchemy/src/Test/Core.ts index 121056e99..3a80d01a1 100644 --- a/packages/alchemy/src/Test/Core.ts +++ b/packages/alchemy/src/Test/Core.ts @@ -7,7 +7,7 @@ import * as FetchHttpClient from "effect/unstable/http/FetchHttpClient"; import { AdoptPolicy } from "../AdoptPolicy.ts"; import { AlchemyContext, AlchemyContextLive } from "../AlchemyContext.ts"; -import { apply } from "../Apply.ts"; +import { apply, type ApplyOptions } from "../Apply.ts"; import { provideFreshArtifactStore } from "../Artifacts.ts"; import { AuthProviders } from "../Auth/AuthProvider.ts"; import { CredentialsStoreLive } from "../Auth/Credentials.ts"; @@ -206,8 +206,9 @@ export interface ScratchStack { readonly state: Layer.Layer; deploy( effect: Effect.Effect, + options?: ApplyOptions, ): Effect.Effect, any, Exclude>; - destroy(): Effect.Effect; + destroy(options?: ApplyOptions): Effect.Effect; } const sanitizeStackName = (name: string) => @@ -233,7 +234,10 @@ export const scratchStack = ( State.InMemoryService(inMemory), ); - const buildAndApply = (effect: Effect.Effect) => + const buildAndApply = ( + effect: Effect.Effect, + applyOptions?: ApplyOptions, + ) => (effect as Effect.Effect).pipe( makeStack({ name: stackName, @@ -242,7 +246,7 @@ export const scratchStack = ( } as any) as any, Effect.flatMap((compiled: any) => Plan.make(compiled).pipe( - Effect.flatMap(apply), + Effect.flatMap((plan) => apply(plan, applyOptions)), Effect.provide(compiled.services), ), ), @@ -253,9 +257,11 @@ export const scratchStack = ( return { name: stackName, state: stateLayer, - deploy: ((effect: Effect.Effect) => - buildAndApply(effect)) as ScratchStack["deploy"], - destroy: () => + deploy: (( + effect: Effect.Effect, + applyOptions?: ApplyOptions, + ) => buildAndApply(effect, applyOptions)) as ScratchStack["deploy"], + destroy: (applyOptions?: ApplyOptions) => Plan.make({ name: stackName, stage, @@ -264,7 +270,7 @@ export const scratchStack = ( actions: {}, output: {}, }).pipe( - Effect.flatMap(apply), + Effect.flatMap((plan) => apply(plan, applyOptions)), Effect.asVoid, Effect.provide(stateLayer), Effect.provide(options.providers as Layer.Layer), diff --git a/packages/alchemy/test/apply.test.ts b/packages/alchemy/test/apply.test.ts index 74ffeed0c..10468699e 100644 --- a/packages/alchemy/test/apply.test.ts +++ b/packages/alchemy/test/apply.test.ts @@ -4411,3 +4411,159 @@ describe("Duration round-trip through state", () => { }), ); }); + +describe("concurrency limit", () => { + // Probe that records the high-water mark of resources running their + // provider lifecycle (reconcile) at the same time. The `create` hook is + // invoked from inside `reconcile`, which is exactly the operation gated by + // the apply's concurrency semaphore, so `max` reflects in-flight permits. + const makeConcurrencyProbe = (holdMillis = 50) => { + const state = { current: 0, max: 0 }; + const probe = Layer.succeed(TestResourceHooks, { + create: () => + Effect.gen(function* () { + yield* Effect.sync(() => { + state.current += 1; + state.max = Math.max(state.max, state.current); + }); + // Hold the permit long enough that every concurrently-admitted + // reconcile overlaps before any of them releases. + yield* Effect.sleep(Duration.millis(holdMillis)); + yield* Effect.sync(() => { + state.current -= 1; + }); + }), + }); + return { state, probe }; + }; + + // N resources with no dependencies between them — every node is "ready" + // immediately, so without a limit they all reconcile at once. + const independentResources = (count: number) => + Effect.gen(function* () { + for (let i = 0; i < count; i++) { + yield* TestResource(`R${i}`, { string: `v${i}` }); + } + }); + + const expectCreated = Effect.fn(function* (id: string, string: string) { + const s = (yield* getState(id)) as unknown as { + status: string; + attr: { string: string }; + }; + expect(s?.status).toEqual("created"); + expect(s?.attr?.string).toEqual(string); + }); + + test.provider( + "defaults to unbounded — every ready resource applies at once", + (stack) => + Effect.gen(function* () { + const count = 8; + const { state, probe } = makeConcurrencyProbe(); + + yield* stack + .deploy(independentResources(count)) + .pipe(Effect.provide(probe)); + + // No limit supplied → all `count` reconciles run concurrently. + expect(state.max).toBe(count); + for (let i = 0; i < count; i++) { + yield* expectCreated(`R${i}`, `v${i}`); + } + }), + { timeout: 20_000 }, + ); + + test.provider( + "caps the number of resources applying at the same time", + (stack) => + Effect.gen(function* () { + const count = 12; + const limit = 4; + const { state, probe } = makeConcurrencyProbe(); + + yield* stack + .deploy(independentResources(count), { concurrency: limit }) + .pipe(Effect.provide(probe)); + + // Never more than `limit` reconciles in flight, yet the cap is + // actually reached (count > limit and they all overlap). + expect(state.max).toBeLessThanOrEqual(limit); + expect(state.max).toBe(limit); + + // …and every resource still converges. + for (let i = 0; i < count; i++) { + yield* expectCreated(`R${i}`, `v${i}`); + } + }), + { timeout: 20_000 }, + ); + + test.provider( + "concurrency of 1 fully serializes lifecycle operations", + (stack) => + Effect.gen(function* () { + const count = 5; + const { state, probe } = makeConcurrencyProbe(10); + + yield* stack + .deploy(independentResources(count), { concurrency: 1 }) + .pipe(Effect.provide(probe)); + + expect(state.max).toBe(1); + for (let i = 0; i < count; i++) { + expect((yield* getState(`R${i}`))?.status).toEqual("created"); + } + }), + { timeout: 20_000 }, + ); + + // Regression guard for the permit-after-wait design: a bounded apply must + // only hold a permit around the actual provider call, never around the + // dependency wait. A chain longer than the limit would deadlock if a + // resource held its permit while blocked on an upstream that can't acquire + // one. With `concurrency: 1` and a 5-deep chain this would hang forever if + // the gate were placed around the wait. + const chain = (count: number) => + Effect.gen(function* () { + let prev = yield* TestResource("C0", { string: "base" }); + for (let i = 1; i < count; i++) { + prev = yield* TestResource(`C${i}`, { string: prev.string }); + } + return prev.string; + }); + + test.provider( + "a dependency chain does not deadlock under a tight limit", + (stack) => + Effect.gen(function* () { + const out = yield* stack.deploy(chain(5), { concurrency: 1 }); + + // The value must propagate the whole way down the chain. + expect(out).toEqual("base"); + for (let i = 0; i < 5; i++) { + expect((yield* getState(`C${i}`))?.status).toEqual("created"); + } + }), + { timeout: 20_000 }, + ); + + test.provider("destroy honors the concurrency limit", (stack) => + Effect.gen(function* () { + const count = 6; + yield* stack.deploy(independentResources(count)); + for (let i = 0; i < count; i++) { + expect((yield* getState(`R${i}`))?.status).toEqual("created"); + } + + // A bounded destroy must still tear everything down. + yield* stack.destroy({ concurrency: 2 }); + + for (let i = 0; i < count; i++) { + expect(yield* getState(`R${i}`)).toBeUndefined(); + } + expect(yield* listState()).toEqual([]); + }), + ); +});