Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion packages/alchemy/src/Apply.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as Cause from "effect/Cause";
import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";
import * as Semaphore from "effect/Semaphore";
import type { Simplify } from "effect/Types";
import {
Artifacts,
Expand Down Expand Up @@ -78,6 +79,46 @@ interface ResourceTracker {
instanceId: string;
}

export interface ApplyOptions {
/**
* Upper bound on the number of resources that may be running a provider
* lifecycle operation (precreate / create / update / replace / delete) at
* the same time.
*
* - `"unbounded"` (the default) places no limit — every ready resource is
* dispatched concurrently, exactly as before.
* - A positive integer (e.g. `64`) caps in-flight lifecycle operations at
* that many; additional resources queue until a slot frees up.
*
* Only the actual provider call is gated, never the dependency wait that
* precedes it, so bounding concurrency can never deadlock the DAG: a
* resource blocked on its upstreams holds no permit.
*/
readonly concurrency?: "unbounded" | number;
}

/**
* A combinator that gates an effect on a shared concurrency permit. For
* `"unbounded"` it is the identity function; for a numeric limit it wraps the
* effect in a single permit taken from a shared {@link Semaphore}, so no more
* than `concurrency` gated effects run at once across the whole apply.
*/
export type LifecycleLimit = <A, E, R>(
effect: Effect.Effect<A, E, R>,
) => Effect.Effect<A, E, R>;

const makeLifecycleLimit = (
concurrency: "unbounded" | number,
): LifecycleLimit => {
if (concurrency === "unbounded") {
return (effect) => effect;
}
// One semaphore shared across every phase (executePlan, converge,
// collectGarbage) so the limit is global to the apply, not per-phase.
const semaphore = Semaphore.makeUnsafe(concurrency);
return (effect) => Semaphore.withPermits(semaphore, 1)(effect);
};

const provideLifecycleScope =
(fqn: string, instanceId: string) =>
<A, E, R>(
Expand Down Expand Up @@ -105,6 +146,7 @@ const provideLifecycleScope =
*/
const instrumentLifecycle =
(
limit: LifecycleLimit,
op: ResourceOp,
fqn: string,
resourceType: string,
Expand All @@ -116,6 +158,10 @@ const instrumentLifecycle =
): Effect.Effect<A, E, Exclude<R, InstanceId | Artifacts>> =>
effect.pipe(
provideLifecycleScope(fqn, instanceId),
// Hold a concurrency permit only around the scoped provider call. The
// metric/span below stay outside the permit so queue time isn't counted
// as operation duration.
limit,
recordResourceOp(resourceType, op),
Effect.withSpan(`provider.${op}`, {
attributes: {
Expand All @@ -130,6 +176,7 @@ const instrumentLifecycle =

export const apply = <P extends Plan>(
plan: P,
options?: ApplyOptions,
): Effect.Effect<
Input.Resolve<P["output"]>,
Output.InvalidReferenceError | Output.MissingSourceError | StateStoreError,
Expand All @@ -143,6 +190,9 @@ export const apply = <P extends Plan>(
const stage = yield* Stage;
const stackName = stack.name;

// Shared across all three phases so the cap is global to the apply.
const limit = makeLifecycleLimit(options?.concurrency ?? "unbounded");

const tracker: Record<string, ResourceTracker> = {};
const terminalStatuses = new Map<
string,
Expand All @@ -161,13 +211,14 @@ export const apply = <P extends Plan>(
state,
stackName,
stage,
limit,
);

// TODO(sam): support roll back to previous state if errors occur during expansion
// -> RISK: some UPDATEs may not be reversible (i.e. trigger replacements)
// TODO(sam): should pivot be done separately? E.g shift traffic?

yield* collectGarbage(plan, session);
yield* collectGarbage(plan, session, limit);

yield* converge(
plan,
Expand All @@ -177,6 +228,7 @@ export const apply = <P extends Plan>(
state,
stackName,
stage,
limit,
);

yield* Effect.forEach(
Expand Down Expand Up @@ -242,6 +294,7 @@ const executePlan = Effect.fnUntraced(function* (
},
stackName: string,
stage: string,
limit: LifecycleLimit,
) {
// Resources and tasks share the same FQN namespace and DAG, so the
// scheduler tracks them together. Each entry gets a single Deferred that
Expand Down Expand Up @@ -344,6 +397,7 @@ const executePlan = Effect.fnUntraced(function* (
waitForDeps,
failures,
plan.cycleMembers.has(fqn),
limit,
),
),
{ concurrency: "unbounded" },
Expand Down Expand Up @@ -393,6 +447,7 @@ const executeNode = (
waitForDeps: (fqns: string[]) => Effect.Effect<void[], never, never>,
failures: LifecycleFailure[],
inCycle: boolean,
limit: LifecycleLimit,
): Effect.Effect<void, never, never> =>
Effect.gen(function* () {
const logicalId = node.resource.LogicalId;
Expand Down Expand Up @@ -583,6 +638,7 @@ const executeNode = (
})
.pipe(
instrumentLifecycle(
limit,
"precreate",
fqn,
node.resource.Type,
Expand Down Expand Up @@ -644,6 +700,7 @@ const executeNode = (
})
.pipe(
instrumentLifecycle(
limit,
"create",
fqn,
node.resource.Type,
Expand Down Expand Up @@ -766,6 +823,7 @@ const executeNode = (
})
.pipe(
instrumentLifecycle(
limit,
"update",
fqn,
node.resource.Type,
Expand Down Expand Up @@ -880,6 +938,7 @@ const executeNode = (
})
.pipe(
instrumentLifecycle(
limit,
"precreate",
fqn,
node.resource.Type,
Expand Down Expand Up @@ -942,6 +1001,7 @@ const executeNode = (
})
.pipe(
instrumentLifecycle(
limit,
"create",
fqn,
node.resource.Type,
Expand Down Expand Up @@ -1222,6 +1282,7 @@ const converge = Effect.fnUntraced(function* (
},
stackName: string,
stage: string,
limit: LifecycleLimit,
) {
for (;;) {
let anyUpdated = false;
Expand Down Expand Up @@ -1276,6 +1337,7 @@ const converge = Effect.fnUntraced(function* (
})
.pipe(
instrumentLifecycle(
limit,
"update",
fqn,
node.resource.Type,
Expand Down Expand Up @@ -1396,6 +1458,7 @@ const converge = Effect.fnUntraced(function* (
const collectGarbage = Effect.fnUntraced(function* (
plan: Plan,
session: PlanStatusSession,
limit: LifecycleLimit,
) {
const state = yield* State;
const stack = yield* Stack;
Expand Down Expand Up @@ -1556,6 +1619,7 @@ const collectGarbage = Effect.fnUntraced(function* (
})
.pipe(
instrumentLifecycle(
limit,
"delete",
fqn,
resourceType,
Expand Down
16 changes: 16 additions & 0 deletions packages/alchemy/src/Cli/commands/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ export const force = Flag.boolean("force").pipe(
Flag.withDefault(false),
);

/**
* `--concurrency <n>` caps how many resources run a provider lifecycle
* operation at the same time (e.g. `--concurrency 64`). Defers to Effect's
* own concurrency model: omitting the flag leaves the apply unbounded, exactly
* like `{ concurrency: "unbounded" }`.
*/
export const concurrency = Flag.integer("concurrency").pipe(
Flag.withSchema(S.Int.check(S.isGreaterThanOrEqualTo(1))),
Flag.withDescription(
"Max number of resources applying in parallel (e.g. 64). " +
"Defaults to unbounded (no limit).",
),
Flag.optional,
Flag.map(Option.getOrUndefined),
);

export const script = Argument.file("main", {
mustExist: true,
}).pipe(
Expand Down
10 changes: 9 additions & 1 deletion packages/alchemy/src/Cli/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { loadConfigProvider } from "../../Util/ConfigProvider.ts";
import { fileLogger } from "../../Util/FileLogger.ts";

import {
concurrency,
dryRun as dryRunFlag,
envFile,
force,
Expand All @@ -43,6 +44,7 @@ export const ExecStackOptions = Schema.Struct({
destroy: Schema.optional(Schema.Boolean),
dev: Schema.optional(Schema.Boolean),
adopt: Schema.optional(Schema.Boolean),
concurrency: Schema.optional(Schema.Number),
});
export type ExecStackOptions = typeof ExecStackOptions.Type;
export type ExecStackOptionsEncoded = typeof ExecStackOptions.Encoded;
Expand All @@ -56,6 +58,7 @@ const stackSpanAttrs = (args: ExecStackOptions) => ({
"alchemy.destroy": !!args.destroy,
"alchemy.dev": !!args.dev,
"alchemy.adopt": !!args.adopt,
"alchemy.concurrency": args.concurrency ?? "unbounded",
});

const adopt = Flag.boolean("adopt").pipe(
Expand All @@ -77,6 +80,7 @@ export const execStack = Effect.fn(function* ({
destroy = false,
dev = false,
adopt = false,
concurrency,
}: ExecStackOptions) {
const stackEffect = yield* importStack(main);

Expand Down Expand Up @@ -147,7 +151,9 @@ export const execStack = Effect.fn(function* ({
return;
}
}
const outputs = yield* apply(updatePlan);
const outputs = yield* apply(updatePlan, {
concurrency: concurrency ?? "unbounded",
});

if (outputs !== undefined) {
yield* Console.log(outputs);
Expand All @@ -172,6 +178,7 @@ export const deployCommand = Command.make(
yes,
profile,
adopt,
concurrency,
},
instrumentCommand("deploy", stackSpanAttrs)(execStack),
);
Expand All @@ -185,6 +192,7 @@ export const destroyCommand = Command.make(
stage,
yes,
profile,
concurrency,
},
instrumentCommand(
"destroy",
Expand Down
22 changes: 14 additions & 8 deletions packages/alchemy/src/Test/Core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as FetchHttpClient from "effect/unstable/http/FetchHttpClient";

import { AdoptPolicy } from "../AdoptPolicy.ts";
import { AlchemyContext, AlchemyContextLive } from "../AlchemyContext.ts";
import { apply } from "../Apply.ts";
import { apply, type ApplyOptions } from "../Apply.ts";
import { provideFreshArtifactStore } from "../Artifacts.ts";
import { AuthProviders } from "../Auth/AuthProvider.ts";
import { CredentialsStoreLive } from "../Auth/Credentials.ts";
Expand Down Expand Up @@ -206,8 +206,9 @@ export interface ScratchStack<ROut = any> {
readonly state: Layer.Layer<State.State, never, never>;
deploy<A, E, R>(
effect: Effect.Effect<A, E, R>,
options?: ApplyOptions,
): Effect.Effect<Input.Resolve<A>, any, Exclude<R, ROut | StackServices>>;
destroy(): Effect.Effect<void, any, never>;
destroy(options?: ApplyOptions): Effect.Effect<void, any, never>;
}

const sanitizeStackName = (name: string) =>
Expand All @@ -233,7 +234,10 @@ export const scratchStack = <ROut>(
State.InMemoryService(inMemory),
);

const buildAndApply = (effect: Effect.Effect<any, any, any>) =>
const buildAndApply = (
effect: Effect.Effect<any, any, any>,
applyOptions?: ApplyOptions,
) =>
(effect as Effect.Effect<any, any, never>).pipe(
makeStack({
name: stackName,
Expand All @@ -242,7 +246,7 @@ export const scratchStack = <ROut>(
} as any) as any,
Effect.flatMap((compiled: any) =>
Plan.make(compiled).pipe(
Effect.flatMap(apply),
Effect.flatMap((plan) => apply(plan, applyOptions)),
Effect.provide(compiled.services),
),
),
Expand All @@ -253,9 +257,11 @@ export const scratchStack = <ROut>(
return {
name: stackName,
state: stateLayer,
deploy: ((effect: Effect.Effect<any, any, any>) =>
buildAndApply(effect)) as ScratchStack<ROut>["deploy"],
destroy: () =>
deploy: ((
effect: Effect.Effect<any, any, any>,
applyOptions?: ApplyOptions,
) => buildAndApply(effect, applyOptions)) as ScratchStack<ROut>["deploy"],
destroy: (applyOptions?: ApplyOptions) =>
Plan.make({
name: stackName,
stage,
Expand All @@ -264,7 +270,7 @@ export const scratchStack = <ROut>(
actions: {},
output: {},
}).pipe(
Effect.flatMap(apply),
Effect.flatMap((plan) => apply(plan, applyOptions)),
Effect.asVoid,
Effect.provide(stateLayer),
Effect.provide(options.providers as Layer.Layer<any, never, any>),
Expand Down
Loading
Loading