Skip to content

Commit d11c537

Browse files
committed
[heft] Factor out WatchLoop
1 parent ae84107 commit d11c537

39 files changed

+1402
-351
lines changed

apps/heft/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"dependencies": {
3737
"@rushstack/heft-config-file": "workspace:*",
3838
"@rushstack/node-core-library": "workspace:*",
39+
"@rushstack/operation-graph": "workspace:*",
3940
"@rushstack/rig-package": "workspace:*",
4041
"@rushstack/ts-command-line": "workspace:*",
4142
"@types/tapable": "1.0.6",

apps/heft/src/cli/HeftActionRunner.ts

Lines changed: 79 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ import {
1313
type ITerminal,
1414
type IPackageJson
1515
} from '@rushstack/node-core-library';
16+
import {
17+
type IOperationExecutionOptions,
18+
type IWatchLoopState,
19+
Operation,
20+
OperationExecutionManager,
21+
OperationStatus,
22+
WatchLoop
23+
} from '@rushstack/operation-graph';
1624
import type {
1725
CommandLineFlagParameter,
1826
CommandLineParameterProvider,
@@ -24,11 +32,6 @@ import type { HeftConfiguration } from '../configuration/HeftConfiguration';
2432
import type { LoggingManager } from '../pluginFramework/logging/LoggingManager';
2533
import type { MetricsCollector } from '../metrics/MetricsCollector';
2634
import { HeftParameterManager } from '../pluginFramework/HeftParameterManager';
27-
import {
28-
OperationExecutionManager,
29-
type IOperationExecutionOptions
30-
} from '../operations/OperationExecutionManager';
31-
import { Operation } from '../operations/Operation';
3235
import { TaskOperationRunner } from '../operations/runners/TaskOperationRunner';
3336
import { PhaseOperationRunner } from '../operations/runners/PhaseOperationRunner';
3437
import type { HeftPhase } from '../pluginFramework/HeftPhase';
@@ -41,10 +44,8 @@ import type {
4144
} from '../pluginFramework/HeftLifecycleSession';
4245
import type { HeftLifecycle } from '../pluginFramework/HeftLifecycle';
4346
import type { HeftTask } from '../pluginFramework/HeftTask';
44-
import { CancellationToken, CancellationTokenSource } from '../pluginFramework/CancellationToken';
4547
import { deleteFilesAsync, type IDeleteOperation } from '../plugins/DeleteFilesPlugin';
4648
import { Constants } from '../utilities/Constants';
47-
import { OperationStatus } from '../operations/OperationStatus';
4849

4950
export interface IHeftActionRunnerOptions extends IHeftActionOptions {
5051
action: IHeftAction;
@@ -81,15 +82,18 @@ export async function runWithLoggingAsync(
8182
loggingManager: LoggingManager,
8283
terminal: ITerminal,
8384
metricsCollector: MetricsCollector,
84-
cancellationToken: CancellationToken
85-
): Promise<void> {
85+
abortSignal: AbortSignal,
86+
throwOnFailure?: boolean
87+
): Promise<OperationStatus> {
8688
const startTime: number = performance.now();
8789
loggingManager.resetScopedLoggerErrorsAndWarnings();
8890

91+
let result: OperationStatus = OperationStatus.Failure;
92+
8993
// Execute the action operations
9094
let encounteredError: boolean = false;
9195
try {
92-
const result: OperationStatus = await fn();
96+
result = await fn();
9397
if (result === OperationStatus.Failure) {
9498
encounteredError = true;
9599
}
@@ -100,8 +104,8 @@ export async function runWithLoggingAsync(
100104
const warningStrings: string[] = loggingManager.getWarningStrings();
101105
const errorStrings: string[] = loggingManager.getErrorStrings();
102106

103-
const wasCancelled: boolean = cancellationToken.isCancelled;
104-
const encounteredWarnings: boolean = warningStrings.length > 0 || wasCancelled;
107+
const wasAborted: boolean = abortSignal.aborted;
108+
const encounteredWarnings: boolean = warningStrings.length > 0 || wasAborted;
105109
encounteredError = encounteredError || errorStrings.length > 0;
106110

107111
await metricsCollector.recordAsync(
@@ -112,7 +116,7 @@ export async function runWithLoggingAsync(
112116
action.getParameterStringMap()
113117
);
114118

115-
const finishedLoggingWord: string = encounteredError ? 'Failed' : wasCancelled ? 'Cancelled' : 'Finished';
119+
const finishedLoggingWord: string = encounteredError ? 'Failed' : wasAborted ? 'Aborted' : 'Finished';
116120
const duration: number = performance.now() - startTime;
117121
const durationSeconds: number = Math.round(duration) / 1000;
118122
const finishedLoggingLine: string = `-------------------- ${finishedLoggingWord} (${durationSeconds}s) --------------------`;
@@ -143,9 +147,11 @@ export async function runWithLoggingAsync(
143147
}
144148
}
145149

146-
if (encounteredError) {
150+
if (encounteredError && throwOnFailure) {
147151
throw new AlreadyReportedError();
148152
}
153+
154+
return result;
149155
}
150156

151157
export class HeftActionRunner {
@@ -260,41 +266,30 @@ export class HeftActionRunner {
260266
// executed, and the session should be populated with the executing parameters.
261267
this._internalHeftSession.parameterManager = this.parameterManager;
262268

263-
// Set up the ability to terminate the build via Ctrl+C and have it exit gracefully if pressed once,
264-
// less gracefully if pressed a second time.
265-
const cliCancellationTokenSource: CancellationTokenSource = new CancellationTokenSource();
266-
const cliCancellationToken: CancellationToken = cliCancellationTokenSource.token;
267-
const cli: ReadlineInterface = createInterface(process.stdin, undefined, undefined, true);
268-
let forceTerminate: boolean = false;
269-
cli.on('SIGINT', () => {
270-
cli.close();
271-
272-
if (forceTerminate) {
273-
terminal.writeErrorLine(`Forcibly terminating.`);
274-
process.exit(1);
275-
} else {
276-
terminal.writeLine(
277-
Colors.yellow(Colors.bold(`Canceling build... Press Ctrl+C again to forcibly terminate.`))
278-
);
279-
}
280-
281-
forceTerminate = true;
282-
cliCancellationTokenSource.cancel();
283-
});
284-
285269
initializeHeft(this._heftConfiguration, terminal, this.parameterManager.defaultParameters.verbose);
286270

287271
const operations: ReadonlySet<Operation> = this._generateOperations();
288272

289273
const executionManager: OperationExecutionManager = new OperationExecutionManager(operations);
290274

275+
const cliAbortSignal: AbortSignal = this._createCliAbortSignal();
276+
291277
try {
292278
await _startLifecycleAsync(this._internalHeftSession);
293279

294280
if (this._action.watch) {
295-
await this._executeWatchAsync(executionManager, cliCancellationToken);
281+
const watchLoop: WatchLoop = this._createWatchLoop(executionManager);
282+
283+
if (process.send) {
284+
await watchLoop.runIPCAsync();
285+
} else {
286+
await watchLoop.runUntilAbortedAsync(cliAbortSignal, () => {
287+
terminal.writeLine(Colors.bold('Waiting for changes. Press CTRL + C to exit...'));
288+
terminal.writeLine('');
289+
});
290+
}
296291
} else {
297-
await this._executeOnceAsync(executionManager, cliCancellationToken);
292+
await this._executeOnceAsync(executionManager, cliAbortSignal);
298293
}
299294
} finally {
300295
// Invoke this here both to ensure it always runs and that it does so after recordMetrics
@@ -305,96 +300,67 @@ export class HeftActionRunner {
305300
}
306301
}
307302

308-
private async _executeWatchAsync(
309-
executionManager: OperationExecutionManager,
310-
cliCancellationToken: CancellationToken
311-
): Promise<void> {
312-
let runRequested: boolean = true;
313-
let isRunning: boolean = true;
314-
let cancellationTokenSource: CancellationTokenSource = new CancellationTokenSource();
315-
316-
const { _terminal: terminal } = this;
317-
318-
let resolveRequestRun!: (requestor?: string) => void;
319-
function createRequestRunPromise(): Promise<void> {
320-
return new Promise<string | undefined>(
321-
(resolve: (requestor?: string) => void, reject: (err: Error) => void) => {
322-
resolveRequestRun = resolve;
323-
}
324-
).then((requestor: string | undefined) => {
325-
terminal.writeLine(Colors.bold(`New run requested by ${requestor || 'unknown task'}`));
326-
runRequested = true;
327-
if (isRunning) {
328-
terminal.writeLine(Colors.bold(`Cancelling incremental build...`));
329-
// If there's a source file change, we need to cancel the incremental build and wait for the
330-
// execution to finish before we begin execution again.
331-
cancellationTokenSource.cancel();
332-
}
333-
});
334-
}
335-
let requestRunPromise: Promise<void> = createRequestRunPromise();
336-
337-
function cancelExecution(): void {
338-
cancellationTokenSource.cancel();
339-
}
340-
341-
function requestRun(requestor?: string): void {
342-
// The wrapper here allows operation runners to hang onto a single instance, despite the underlying
343-
// promise changing.
344-
resolveRequestRun(requestor);
345-
}
303+
private _createCliAbortSignal(): AbortSignal {
304+
// Set up the ability to terminate the build via Ctrl+C and have it exit gracefully if pressed once,
305+
// less gracefully if pressed a second time.
306+
const cliAbortController: AbortController = new AbortController();
307+
const cliAbortSignal: AbortSignal = cliAbortController.signal;
308+
const cli: ReadlineInterface = createInterface(process.stdin, undefined, undefined, true);
309+
const terminal: ITerminal = this._terminal;
310+
let forceTerminate: boolean = false;
311+
cli.on('SIGINT', () => {
312+
cli.close();
346313

347-
// eslint-disable-next-line no-constant-condition
348-
while (!cliCancellationToken.isCancelled) {
349-
if (cancellationTokenSource.isCancelled) {
350-
cancellationTokenSource = new CancellationTokenSource();
351-
cliCancellationToken.onCancelledPromise.finally(cancelExecution);
314+
if (forceTerminate) {
315+
terminal.writeErrorLine(`Forcibly terminating.`);
316+
process.exit(1);
317+
} else {
318+
terminal.writeLine(
319+
Colors.yellow(Colors.bold(`Canceling build... Press Ctrl+C again to forcibly terminate.`))
320+
);
352321
}
353322

354-
// Create the cancellation token which is passed to the incremental build.
355-
const cancellationToken: CancellationToken = cancellationTokenSource.token;
356-
357-
// Write an empty line to the terminal for separation between iterations. We've already iterated
358-
// at this point, so log out that we're about to start a new run.
359-
terminal.writeLine('');
360-
terminal.writeLine(Colors.bold('Starting incremental build...'));
361-
362-
// Start the incremental build and wait for a source file to change
363-
runRequested = false;
364-
isRunning = true;
323+
forceTerminate = true;
324+
cliAbortController.abort();
325+
});
365326

366-
try {
367-
await this._executeOnceAsync(executionManager, cancellationToken, requestRun);
368-
} catch (err) {
369-
if (!(err instanceof AlreadyReportedError)) {
370-
throw err;
371-
}
372-
} finally {
373-
isRunning = false;
374-
}
327+
return cliAbortSignal;
328+
}
375329

376-
if (!runRequested) {
377-
terminal.writeLine(Colors.bold('Waiting for changes. Press CTRL + C to exit...'));
330+
private _createWatchLoop(executionManager: OperationExecutionManager): WatchLoop {
331+
const { _terminal: terminal } = this;
332+
const watchLoop: WatchLoop = new WatchLoop({
333+
onBeforeExecute: () => {
334+
// Write an empty line to the terminal for separation between iterations. We've already iterated
335+
// at this point, so log out that we're about to start a new run.
378336
terminal.writeLine('');
379-
await Promise.race([requestRunPromise, cliCancellationToken.onCancelledPromise]);
337+
terminal.writeLine(Colors.bold('Starting incremental build...'));
338+
},
339+
executeAsync: (state: IWatchLoopState): Promise<OperationStatus> => {
340+
return this._executeOnceAsync(executionManager, state.abortSignal, state.requestRun);
341+
},
342+
onRequestRun: (requestor?: string) => {
343+
terminal.writeLine(Colors.bold(`New run requested by ${requestor || 'unknown task'}`));
344+
},
345+
onAbort: () => {
346+
terminal.writeLine(Colors.bold(`Cancelling incremental build...`));
380347
}
381-
382-
requestRunPromise = createRequestRunPromise();
383-
}
348+
});
349+
return watchLoop;
384350
}
385351

386352
private async _executeOnceAsync(
387353
executionManager: OperationExecutionManager,
388-
cancellationToken: CancellationToken,
354+
abortSignal: AbortSignal,
389355
requestRun?: (requestor?: string) => void
390-
): Promise<void> {
356+
): Promise<OperationStatus> {
391357
// Execute the action operations
392-
await runWithLoggingAsync(
358+
return await runWithLoggingAsync(
393359
() => {
394360
const operationExecutionManagerOptions: IOperationExecutionOptions = {
395361
terminal: this._terminal,
396362
parallelism: this._parallelism,
397-
cancellationToken,
363+
abortSignal,
398364
requestRun
399365
};
400366

@@ -404,7 +370,8 @@ export class HeftActionRunner {
404370
this._loggingManager,
405371
this._terminal,
406372
this._metricsCollector,
407-
cancellationToken
373+
abortSignal,
374+
!requestRun
408375
);
409376
}
410377

apps/heft/src/cli/actions/CleanAction.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
type CommandLineStringListParameter
88
} from '@rushstack/ts-command-line';
99
import type { ITerminal } from '@rushstack/node-core-library';
10+
import { OperationStatus } from '@rushstack/operation-graph';
1011

1112
import type { IHeftAction, IHeftActionOptions } from './IHeftAction';
1213
import type { HeftPhase } from '../../pluginFramework/HeftPhase';
@@ -18,8 +19,6 @@ import { Constants } from '../../utilities/Constants';
1819
import { definePhaseScopingParameters, expandPhases } from './RunAction';
1920
import { deleteFilesAsync, type IDeleteOperation } from '../../plugins/DeleteFilesPlugin';
2021
import { initializeHeft, runWithLoggingAsync } from '../HeftActionRunner';
21-
import { CancellationToken } from '../../pluginFramework/CancellationToken';
22-
import { OperationStatus } from '../../operations/OperationStatus';
2322

2423
export class CleanAction extends CommandLineAction implements IHeftAction {
2524
public readonly watch: boolean = false;
@@ -79,7 +78,7 @@ export class CleanAction extends CommandLineAction implements IHeftAction {
7978

8079
protected async onExecute(): Promise<void> {
8180
const { heftConfiguration } = this._internalHeftSession;
82-
const cancellationToken: CancellationToken = new CancellationToken();
81+
const abortSignal: AbortSignal = new AbortSignal();
8382

8483
initializeHeft(heftConfiguration, this._terminal, this._verboseFlag.value);
8584
await runWithLoggingAsync(
@@ -88,7 +87,7 @@ export class CleanAction extends CommandLineAction implements IHeftAction {
8887
this._internalHeftSession.loggingManager,
8988
this._terminal,
9089
this._metricsCollector,
91-
cancellationToken
90+
abortSignal
9291
);
9392
}
9493

apps/heft/src/index.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,7 @@ export type { IRigPackageResolver } from './configuration/RigPackageResolver';
1818

1919
export type { IHeftPlugin, IHeftTaskPlugin, IHeftLifecyclePlugin } from './pluginFramework/IHeftPlugin';
2020

21-
export {
22-
CancellationTokenSource,
23-
CancellationToken,
24-
type ICancellationTokenSourceOptions,
25-
type ICancellationTokenOptions as _ICancellationTokenOptions
26-
} from './pluginFramework/CancellationToken';
21+
export { CancellationToken } from './pluginFramework/CancellationToken';
2722

2823
export type { IHeftParameters, IHeftDefaultParameters } from './pluginFramework/HeftParameterManager';
2924

apps/heft/src/operations/runners/PhaseOperationRunner.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
22
// See LICENSE in the project root for license information.
33

4-
import { performance } from 'perf_hooks';
4+
import {
5+
type IOperationRunner,
6+
type IOperationRunnerContext,
7+
OperationStatus
8+
} from '@rushstack/operation-graph';
59

6-
import { OperationStatus } from '../OperationStatus';
710
import { deleteFilesAsync, type IDeleteOperation } from '../../plugins/DeleteFilesPlugin';
8-
import type { IOperationRunner, IOperationRunnerContext } from '../IOperationRunner';
911
import type { HeftPhase } from '../../pluginFramework/HeftPhase';
1012
import type { HeftPhaseSession } from '../../pluginFramework/HeftPhaseSession';
1113
import type { HeftTaskSession } from '../../pluginFramework/HeftTaskSession';

0 commit comments

Comments
 (0)