Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/evaluator/evaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ export class EvaluatorImpl implements Evaluator {
moduleReaders: ModuleReader[] = []
randState: bigint;

constructor(private evaluatorId: bigint, private manager: EvaluatorManager) {
constructor(public evaluatorId: bigint, private manager: EvaluatorManager) {
this.pendingRequests = new Map()
this.randState = evaluatorId
}

close(): void {
this.manager.close()
if (this.closed) {
return
}
return this.manager.closeEvaluator(this)
}

async evaluateExpression<T>(source: ModuleSource, expr: string): Promise<Any> {
Expand Down
18 changes: 17 additions & 1 deletion src/evaluator/evaluator_manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {EvaluatorImpl, Evaluator} from "./evaluator";
import {CreateEvaluator, OutgoingMessage, packMessage} from "../types/outgoing";
import {CloseEvaluator, CreateEvaluator, OutgoingMessage, packMessage} from "../types/outgoing";
import * as msgpackr from "msgpackr";
import {
codeCloseEvaluator,
codeEvaluateLog,
codeEvaluateRead,
codeEvaluateReadModule,
Expand Down Expand Up @@ -73,6 +74,7 @@ export class EvaluatorManager implements EvaluatorManagerInterface {
}> = new Map()
private evaluators: Map<bigint, EvaluatorImpl> = new Map()
private closed: boolean = false
private exited: boolean = false
private version?: string
private cmd: ChildProcessByStdio<Writable, Readable, null>;
private readonly msgpackConfig: msgpackr.Options = {int64AsType: 'bigint', useRecords: false, encodeUndefinedAsNil: true}
Expand All @@ -89,6 +91,7 @@ export class EvaluatorManager implements EvaluatorManagerInterface {

this.decode(this.cmd.stdout).catch(console.error)
this.cmd.on('close', () => {
this.exited = true
this.pendingEvaluators.forEach(({reject}) => {
reject(new Error("pkl command exited"))
})
Expand Down Expand Up @@ -120,6 +123,9 @@ export class EvaluatorManager implements EvaluatorManagerInterface {
}

async send(out: OutgoingMessage) {
if (this.exited) {
return
}
await new Promise<void>((resolve, reject) => this.cmd.stdin.write(packMessage(this.encoder, out), (error) => {
if (error) {
reject(error)
Expand All @@ -138,6 +144,16 @@ export class EvaluatorManager implements EvaluatorManagerInterface {
return ev
}

closeEvaluator(evaluator: EvaluatorImpl) {
const closeEvaluator: CloseEvaluator = {
evaluatorId: evaluator.evaluatorId,
code: codeCloseEvaluator,
}
this.send(closeEvaluator) // best effort
this.evaluators.delete(evaluator.evaluatorId)
evaluator.closed = true
}

private async decode(stdout: Readable) {
stdout.pipe(this.streamDecoder)
for await (const item of this.streamDecoder) {
Expand Down