diff --git a/components/execd/pkg/web/controller/codeinterpreting.go b/components/execd/pkg/web/controller/codeinterpreting.go index c95d83f17..b0facef4e 100644 --- a/components/execd/pkg/web/controller/codeinterpreting.go +++ b/components/execd/pkg/web/controller/codeinterpreting.go @@ -265,7 +265,7 @@ func (c *CodeInterpretingController) CreateSession() { c.RespondSuccess(model.CreateSessionResponse{SessionID: sessionID}) } -// RunInSession runs code in an existing bash session and streams output via SSE (run_in_session API). +// RunInSession runs a command in an existing bash session and streams output via SSE (run_in_session API). func (c *CodeInterpretingController) RunInSession() { sessionID := c.ctx.Param("sessionId") if sessionID == "" { @@ -295,11 +295,11 @@ func (c *CodeInterpretingController) RunInSession() { return } - timeout := time.Duration(request.TimeoutMs) * time.Millisecond + timeout := time.Duration(request.Timeout) * time.Millisecond runReq := &runtime.ExecuteCodeRequest{ Language: runtime.Bash, Context: sessionID, - Code: request.Code, + Code: request.Command, Cwd: request.Cwd, Timeout: timeout, } diff --git a/components/execd/pkg/web/model/session.go b/components/execd/pkg/web/model/session.go index 0b4f598b7..fed7d48c8 100644 --- a/components/execd/pkg/web/model/session.go +++ b/components/execd/pkg/web/model/session.go @@ -28,11 +28,11 @@ type CreateSessionResponse struct { SessionID string `json:"session_id"` } -// RunInSessionRequest is the request body for running code in an existing session. +// RunInSessionRequest is the request body for running a command in an existing session. type RunInSessionRequest struct { - Code string `json:"code" validate:"required"` - Cwd string `json:"cwd,omitempty"` - TimeoutMs int64 `json:"timeout_ms,omitempty" validate:"omitempty,gte=0"` + Command string `json:"command" validate:"required"` + Cwd string `json:"cwd,omitempty"` + Timeout int64 `json:"timeout,omitempty" validate:"omitempty,gte=0"` } // Validate validates RunInSessionRequest. diff --git a/sdks/package.json b/sdks/package.json index 7fc3e91b7..c90d2942e 100644 --- a/sdks/package.json +++ b/sdks/package.json @@ -4,6 +4,7 @@ "packageManager": "pnpm@9.15.0", "scripts": { "build:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... --sort run build", + "test:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... run test", "lint:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... run lint", "clean:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... --sort run clean", "publish:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... publish --access public --no-git-checks" diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs b/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs index 93f4ec532..d6a04992c 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs @@ -59,48 +59,15 @@ public async IAsyncEnumerable RunStreamAsync( RunCommandOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - if (options?.Gid.HasValue == true && options.Uid.HasValue != true) - { - throw new InvalidArgumentException("uid is required when gid is provided"); - } - if (options?.Uid.HasValue == true && options.Uid.Value < 0) - { - throw new InvalidArgumentException("uid must be >= 0"); - } - if (options?.Gid.HasValue == true && options.Gid.Value < 0) - { - throw new InvalidArgumentException("gid must be >= 0"); - } - - var url = $"{_baseUrl}/command"; + ValidateRunOptions(options); _logger.LogDebug("Running command stream (commandLength={CommandLength})", command.Length); - var requestBody = new RunCommandRequest - { - Command = command, - Cwd = options?.WorkingDirectory, - Background = options?.Background, - Timeout = options?.TimeoutSeconds.HasValue == true ? options.TimeoutSeconds.Value * 1000L : null, - Uid = options?.Uid, - Gid = options?.Gid, - Envs = options?.Envs - }; - - var json = JsonSerializer.Serialize(requestBody, JsonOptions); - using var request = new HttpRequestMessage(HttpMethod.Post, url) - { - Content = new StringContent(json, Encoding.UTF8, "application/json") - }; - request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream")); + var spec = new StreamingRequestSpec( + Url: $"{_baseUrl}/command", + Body: BuildRunCommandRequest(command, options), + ErrorMessage: "Run command failed"); - foreach (var header in _headers) - { - request.Headers.TryAddWithoutValidation(header.Key, header.Value); - } - - using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); - - await foreach (var ev in SseParser.ParseJsonEventStreamAsync(response, "Run command failed", cancellationToken).ConfigureAwait(false)) + await foreach (var ev in StreamExecutionAsync(spec, cancellationToken).ConfigureAwait(false)) { yield return ev; } @@ -113,36 +80,11 @@ public async Task RunAsync( CancellationToken cancellationToken = default) { _logger.LogDebug("Running command (commandLength={CommandLength})", command.Length); - var execution = new Execution(); - var dispatcher = new ExecutionEventDispatcher(execution, handlers); - - await foreach (var ev in RunStreamAsync(command, options, cancellationToken).ConfigureAwait(false)) - { - // Keep legacy behavior: if server sends "init" with empty id, preserve previous id - if (ev.Type == ServerStreamEventTypes.Init && string.IsNullOrEmpty(ev.Text) && !string.IsNullOrEmpty(execution.Id)) - { - ev.Text = execution.Id; - } - - await dispatcher.DispatchAsync(ev).ConfigureAwait(false); - } - - if (!(options?.Background ?? false)) - { - if (execution.Error != null) - { - if (int.TryParse(execution.Error.Value, out var exitCode)) - { - execution.ExitCode = exitCode; - } - } - else if (execution.Complete != null) - { - execution.ExitCode = 0; - } - } - - return execution; + return await ConsumeExecutionAsync( + RunStreamAsync(command, options, cancellationToken), + handlers, + inferExitCode: !(options?.Background ?? false), + cancellationToken).ConfigureAwait(false); } public async Task InterruptAsync(string sessionId, CancellationToken cancellationToken = default) @@ -156,14 +98,8 @@ public async Task CreateSessionAsync( CreateSessionOptions? options = null, CancellationToken cancellationToken = default) { - object? body = null; - if (!string.IsNullOrEmpty(options?.Cwd)) - { - body = new { cwd = options.Cwd }; - } - - _logger.LogDebug("Creating bash session (cwd={Cwd})", options?.Cwd); - var response = await _client.PostAsync("/session", body, cancellationToken).ConfigureAwait(false); + _logger.LogDebug("Creating bash session (workingDirectory={WorkingDirectory})", options?.WorkingDirectory); + var response = await _client.PostAsync("/session", BuildCreateSessionBody(options), cancellationToken).ConfigureAwait(false); if (string.IsNullOrEmpty(response?.SessionId)) { throw new SandboxApiException( @@ -177,7 +113,7 @@ public async Task CreateSessionAsync( public async IAsyncEnumerable RunInSessionStreamAsync( string sessionId, - string code, + string command, RunInSessionOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { @@ -185,36 +121,17 @@ public async IAsyncEnumerable RunInSessionStreamAsync( { throw new InvalidArgumentException("sessionId cannot be empty"); } - if (string.IsNullOrWhiteSpace(code)) - { - throw new InvalidArgumentException("code cannot be empty"); - } - - var path = $"/session/{Uri.EscapeDataString(sessionId)}/run"; - var url = $"{_baseUrl}{path}"; - var requestBody = new RunInSessionRequest - { - Code = code, - Cwd = options?.Cwd, - TimeoutMs = options?.TimeoutMs - }; - - var json = JsonSerializer.Serialize(requestBody, JsonOptions); - using var request = new HttpRequestMessage(HttpMethod.Post, url) + if (string.IsNullOrWhiteSpace(command)) { - Content = new StringContent(json, Encoding.UTF8, "application/json") - }; - - request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream")); - - foreach (var header in _headers) - { - request.Headers.TryAddWithoutValidation(header.Key, header.Value); + throw new InvalidArgumentException("command cannot be empty"); } - using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + var spec = new StreamingRequestSpec( + Url: $"{_baseUrl}/session/{Uri.EscapeDataString(sessionId)}/run", + Body: BuildRunInSessionRequest(command, options), + ErrorMessage: "Run in session failed"); - await foreach (var ev in SseParser.ParseJsonEventStreamAsync(response, "Run in session failed", cancellationToken).ConfigureAwait(false)) + await foreach (var ev in StreamExecutionAsync(spec, cancellationToken).ConfigureAwait(false)) { yield return ev; } @@ -222,7 +139,7 @@ public async IAsyncEnumerable RunInSessionStreamAsync( public async Task RunInSessionAsync( string sessionId, - string code, + string command, RunInSessionOptions? options = null, ExecutionHandlers? handlers = null, CancellationToken cancellationToken = default) @@ -231,26 +148,17 @@ public async Task RunInSessionAsync( { throw new InvalidArgumentException("sessionId cannot be empty"); } - if (string.IsNullOrWhiteSpace(code)) + if (string.IsNullOrWhiteSpace(command)) { - throw new InvalidArgumentException("code cannot be empty"); - } - - _logger.LogDebug("Running in session: {SessionId} (codeLength={CodeLength})", sessionId, code.Length); - var execution = new Execution(); - var dispatcher = new ExecutionEventDispatcher(execution, handlers); - - await foreach (var ev in RunInSessionStreamAsync(sessionId, code, options, cancellationToken).ConfigureAwait(false)) - { - if (ev.Type == ServerStreamEventTypes.Init && string.IsNullOrEmpty(ev.Text) && !string.IsNullOrEmpty(execution.Id)) - { - ev.Text = execution.Id; - } - - await dispatcher.DispatchAsync(ev).ConfigureAwait(false); + throw new InvalidArgumentException("command cannot be empty"); } - return execution; + _logger.LogDebug("Running in session: {SessionId} (commandLength={CommandLength})", sessionId, command.Length); + return await ConsumeExecutionAsync( + RunInSessionStreamAsync(sessionId, command, options, cancellationToken), + handlers, + inferExitCode: true, + cancellationToken).ConfigureAwait(false); } public async Task DeleteSessionAsync(string sessionId, CancellationToken cancellationToken = default) @@ -312,6 +220,119 @@ public async Task GetBackgroundCommandLogsAsync( }; } + private async IAsyncEnumerable StreamExecutionAsync( + StreamingRequestSpec spec, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var json = JsonSerializer.Serialize(spec.Body, JsonOptions); + using var request = new HttpRequestMessage(HttpMethod.Post, spec.Url) + { + Content = new StringContent(json, Encoding.UTF8, "application/json") + }; + + request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream")); + + foreach (var header in _headers) + { + request.Headers.TryAddWithoutValidation(header.Key, header.Value); + } + + using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + + await foreach (var ev in SseParser.ParseJsonEventStreamAsync(response, spec.ErrorMessage, cancellationToken).ConfigureAwait(false)) + { + yield return ev; + } + } + + private static void ValidateRunOptions(RunCommandOptions? options) + { + if (options?.Gid.HasValue == true && options.Uid.HasValue != true) + { + throw new InvalidArgumentException("uid is required when gid is provided"); + } + if (options?.Uid.HasValue == true && options.Uid.Value < 0) + { + throw new InvalidArgumentException("uid must be >= 0"); + } + if (options?.Gid.HasValue == true && options.Gid.Value < 0) + { + throw new InvalidArgumentException("gid must be >= 0"); + } + } + + private static object? BuildCreateSessionBody(CreateSessionOptions? options) + { + return !string.IsNullOrEmpty(options?.WorkingDirectory) ? new { cwd = options.WorkingDirectory } : null; + } + + private static RunCommandRequest BuildRunCommandRequest(string command, RunCommandOptions? options) + { + return new RunCommandRequest + { + Command = command, + Cwd = options?.WorkingDirectory, + Background = options?.Background, + Timeout = options?.TimeoutSeconds.HasValue == true ? options.TimeoutSeconds.Value * 1000L : null, + Uid = options?.Uid, + Gid = options?.Gid, + Envs = options?.Envs + }; + } + + private static RunInSessionRequest BuildRunInSessionRequest(string command, RunInSessionOptions? options) + { + return new RunInSessionRequest + { + Command = command, + Cwd = options?.WorkingDirectory, + Timeout = options?.Timeout + }; + } + + private static int? InferForegroundExitCode(Execution execution) + { + if (execution.Error != null) + { + return int.TryParse(execution.Error.Value, out var exitCode) ? exitCode : null; + } + + return execution.Complete != null ? 0 : null; + } + + private static void PreserveLegacyInitId(ServerStreamEvent ev, Execution execution) + { + if (ev.Type == ServerStreamEventTypes.Init && string.IsNullOrEmpty(ev.Text) && !string.IsNullOrEmpty(execution.Id)) + { + ev.Text = execution.Id; + } + } + + private async Task ConsumeExecutionAsync( + IAsyncEnumerable stream, + ExecutionHandlers? handlers, + bool inferExitCode, + CancellationToken cancellationToken) + { + var execution = new Execution(); + var dispatcher = new ExecutionEventDispatcher(execution, handlers); + + await foreach (var ev in stream.WithCancellation(cancellationToken).ConfigureAwait(false)) + { + PreserveLegacyInitId(ev, execution); + await dispatcher.DispatchAsync(ev).ConfigureAwait(false); + } + + if (inferExitCode) + { + execution.ExitCode = InferForegroundExitCode(execution); + } + + return execution; + } + + private sealed record StreamingRequestSpec(string Url, object Body, string ErrorMessage); + private static SandboxApiException CreateApiException(HttpResponseMessage response, string content) { var requestId = response.Headers.TryGetValues(Constants.RequestIdHeader, out var values) diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs index 8ae0d2852..a139ab4bd 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execd.cs @@ -377,7 +377,7 @@ public class CreateSessionOptions /// /// Gets or sets the optional working directory for the session. /// - public string? Cwd { get; set; } + public string? WorkingDirectory { get; set; } } /// @@ -400,12 +400,12 @@ public class RunInSessionOptions /// /// Gets or sets the optional working directory override for this run. /// - public string? Cwd { get; set; } + public string? WorkingDirectory { get; set; } /// /// Gets or sets the maximum execution time in milliseconds. /// - public long? TimeoutMs { get; set; } + public long? Timeout { get; set; } } /// @@ -413,12 +413,12 @@ public class RunInSessionOptions /// internal class RunInSessionRequest { - [JsonPropertyName("code")] - public required string Code { get; set; } + [JsonPropertyName("command")] + public required string Command { get; set; } [JsonPropertyName("cwd")] public string? Cwd { get; set; } - [JsonPropertyName("timeout_ms")] - public long? TimeoutMs { get; set; } + [JsonPropertyName("timeout")] + public long? Timeout { get; set; } } diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Services/IExecdCommands.cs b/sdks/sandbox/csharp/src/OpenSandbox/Services/IExecdCommands.cs index 0331b1088..45f4fa298 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Services/IExecdCommands.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Services/IExecdCommands.cs @@ -92,7 +92,7 @@ Task GetBackgroundCommandLogsAsync( /// /// Creates a new bash session with optional working directory. - /// The session maintains shell state (cwd, environment) across multiple calls. + /// The session maintains shell state (working directory, environment) across multiple calls. /// /// Optional options (e.g. initial working directory). /// Cancellation token. @@ -103,19 +103,19 @@ Task CreateSessionAsync( CancellationToken cancellationToken = default); /// - /// Runs shell code in an existing bash session and returns the execution result (SSE consumed internally). + /// Runs a shell command in an existing bash session and returns the execution result (SSE consumed internally). /// /// Session ID from . - /// Shell code to execute. - /// Optional cwd and timeout for this run. + /// Shell command to execute. + /// Optional working directory and timeout for this run. /// Optional event handlers for real-time processing. /// Cancellation token. /// The execution result with stdout/stderr and completion status. - /// Thrown when or is null or empty. + /// Thrown when or is null or empty. /// Thrown when the execd service request fails. Task RunInSessionAsync( string sessionId, - string code, + string command, RunInSessionOptions? options = null, ExecutionHandlers? handlers = null, CancellationToken cancellationToken = default); diff --git a/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs b/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs index 12911c793..5987faa18 100644 --- a/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs +++ b/sdks/sandbox/csharp/tests/OpenSandbox.Tests/CommandsAdapterTests.cs @@ -269,7 +269,7 @@ public async Task RunAsync_ShouldKeepExitCodeNullWhenErrorValueIsEmpty() // --- Bash session API integration tests --- [Fact] - public async Task CreateSessionAsync_ShouldReturnSessionId_WhenCwdProvided() + public async Task CreateSessionAsync_ShouldReturnSessionId_WhenWorkingDirectoryProvided() { var handler = new StubHttpMessageHandler(async (request, _) => { @@ -287,7 +287,7 @@ public async Task CreateSessionAsync_ShouldReturnSessionId_WhenCwdProvided() }); var adapter = CreateAdapter(handler); - var sessionId = await adapter.CreateSessionAsync(new CreateSessionOptions { Cwd = "/tmp" }); + var sessionId = await adapter.CreateSessionAsync(new CreateSessionOptions { WorkingDirectory = "/tmp" }); sessionId.Should().Be("sess-abc123"); handler.RequestUris.Should().Contain(uri => uri.EndsWith("/session")); @@ -332,7 +332,7 @@ await act.Should().ThrowAsync() } [Fact] - public async Task RunInSessionAsync_ShouldSendCodeAndOptions() + public async Task RunInSessionAsync_ShouldSendCommandAndOptions() { var handler = new StubHttpMessageHandler(async (request, _) => { @@ -341,9 +341,9 @@ public async Task RunInSessionAsync_ShouldSendCodeAndOptions() request.Content.Should().NotBeNull(); var body = await request.Content!.ReadAsStringAsync().ConfigureAwait(false); using var doc = JsonDocument.Parse(body); - doc.RootElement.GetProperty("code").GetString().Should().Be("pwd"); + doc.RootElement.GetProperty("command").GetString().Should().Be("pwd"); doc.RootElement.GetProperty("cwd").GetString().Should().Be("/var"); - doc.RootElement.GetProperty("timeout_ms").GetInt64().Should().Be(5000); + doc.RootElement.GetProperty("timeout").GetInt64().Should().Be(5000); var sse = "data: {\"type\":\"stdout\",\"text\":\"/var\"}\ndata: {\"type\":\"execution_complete\"}\n"; return new HttpResponseMessage(HttpStatusCode.OK) @@ -356,13 +356,41 @@ public async Task RunInSessionAsync_ShouldSendCodeAndOptions() var run = await adapter.RunInSessionAsync( "sess-1", "pwd", - new RunInSessionOptions { Cwd = "/var", TimeoutMs = 5000 }); + new RunInSessionOptions { WorkingDirectory = "/var", Timeout = 5000 }); run.Should().NotBeNull(); run.Logs.Stdout.Should().ContainSingle(m => m.Text == "/var"); + run.ExitCode.Should().Be(0); handler.RequestUris.Should().Contain(uri => uri.Contains("/session/sess-1/run")); } + [Fact] + public async Task RunInSessionAsync_ShouldInferNonZeroExitCodeFromFinalErrorState() + { + var handler = new StubHttpMessageHandler((_, _) => + { + const string sse = """ +data: {"type":"init","text":"sess-cmd-2","timestamp":1} + +data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2} + +"""; + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent(sse, Encoding.UTF8, "text/event-stream") + }); + }); + var adapter = CreateAdapter(handler); + + var execution = await adapter.RunInSessionAsync("sess-2", "exit 7"); + + execution.Id.Should().Be("sess-cmd-2"); + execution.Error.Should().NotBeNull(); + execution.Error!.Value.Should().Be("7"); + execution.Complete.Should().BeNull(); + execution.ExitCode.Should().Be(7); + } + [Fact] public async Task RunInSessionAsync_ShouldThrow_WhenSessionIdEmpty() { @@ -375,14 +403,14 @@ await act.Should().ThrowAsync() } [Fact] - public async Task RunInSessionAsync_ShouldThrow_WhenCodeEmpty() + public async Task RunInSessionAsync_ShouldThrow_WhenCommandEmpty() { var adapter = CreateAdapter(new StubHttpMessageHandler((_, _) => throw new InvalidOperationException("Should not be called"))); var act = () => adapter.RunInSessionAsync("sess-1", " "); await act.Should().ThrowAsync() - .WithMessage("*code*"); + .WithMessage("*command*"); } [Fact] diff --git a/sdks/sandbox/csharp/tests/OpenSandbox.Tests/ModelsTests.cs b/sdks/sandbox/csharp/tests/OpenSandbox.Tests/ModelsTests.cs index 98343914a..4dec22d56 100644 --- a/sdks/sandbox/csharp/tests/OpenSandbox.Tests/ModelsTests.cs +++ b/sdks/sandbox/csharp/tests/OpenSandbox.Tests/ModelsTests.cs @@ -326,6 +326,30 @@ public void RunCommandOptions_ShouldStoreProperties() options.Envs.Should().ContainKey("APP_ENV"); } + [Fact] + public void CreateSessionOptions_ShouldStoreWorkingDirectory() + { + var options = new CreateSessionOptions + { + WorkingDirectory = "/workspace" + }; + + options.WorkingDirectory.Should().Be("/workspace"); + } + + [Fact] + public void RunInSessionOptions_ShouldStoreProperties() + { + var options = new RunInSessionOptions + { + WorkingDirectory = "/workspace", + Timeout = 5000 + }; + + options.WorkingDirectory.Should().Be("/workspace"); + options.Timeout.Should().Be(5000); + } + [Fact] public void ServerStreamEvent_ShouldStoreProperties() { diff --git a/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts b/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts index 9c8c02cd5..e6a2dcb99 100644 --- a/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts +++ b/sdks/sandbox/javascript/src/adapters/commandsAdapter.ts @@ -40,8 +40,18 @@ type ApiCommandStatusOk = ExecdPaths["/command/status/{id}"]["get"]["responses"][200]["content"]["application/json"]; type ApiCommandLogsOk = ExecdPaths["/command/{id}/logs"]["get"]["responses"][200]["content"]["text/plain"]; +type ApiCreateSessionRequest = + NonNullable["content"]["application/json"]; type ApiCreateSessionOk = ExecdPaths["/session"]["post"]["responses"][200]["content"]["application/json"]; +type ApiRunInSessionRequest = + ExecdPaths["/session/{sessionId}/run"]["post"]["requestBody"]["content"]["application/json"]; + +interface StreamingExecutionSpec { + pathname: string; + body: TBody; + fallbackErrorMessage: string; +} function toRunCommandRequest(command: string, opts?: RunCommandOpts): ApiRunCommandRequest { if (opts?.gid != null && opts.uid == null) { @@ -68,6 +78,39 @@ function toRunCommandRequest(command: string, opts?: RunCommandOpts): ApiRunComm return body; } +function toRunInSessionRequest( + command: string, + opts?: { workingDirectory?: string; timeout?: number }, +): ApiRunInSessionRequest { + const body: ApiRunInSessionRequest = { + command, + }; + if (opts?.workingDirectory != null) { + body.cwd = opts.workingDirectory; + } + if (opts?.timeout != null) { + body.timeout = opts.timeout; + } + return body; +} + +function inferForegroundExitCode(execution: CommandExecution): number | null { + const errorValue = execution.error?.value?.trim(); + const parsedExitCode = + errorValue && /^-?\d+$/.test(errorValue) ? Number(errorValue) : Number.NaN; + return execution.error != null + ? (Number.isFinite(parsedExitCode) ? parsedExitCode : null) + : execution.complete + ? 0 + : null; +} + +function assertNonBlank(value: string, field: string): void { + if (!value.trim()) { + throw new Error(`${field} cannot be empty`); + } +} + function parseOptionalDate(value: unknown, field: string): Date | undefined { if (value == null) return undefined; if (value instanceof Date) return value; @@ -100,6 +143,79 @@ export class CommandsAdapter implements ExecdCommands { this.fetch = opts.fetch ?? fetch; } + private buildRunStreamSpec( + command: string, + opts?: RunCommandOpts, + ): StreamingExecutionSpec { + assertNonBlank(command, "command"); + return { + pathname: "/command", + body: toRunCommandRequest(command, opts), + fallbackErrorMessage: "Run command failed", + }; + } + + private buildRunInSessionStreamSpec( + sessionId: string, + command: string, + opts?: { workingDirectory?: string; timeout?: number }, + ): StreamingExecutionSpec { + assertNonBlank(sessionId, "sessionId"); + assertNonBlank(command, "command"); + return { + pathname: `/session/${encodeURIComponent(sessionId)}/run`, + body: toRunInSessionRequest(command, opts), + fallbackErrorMessage: "Run in session failed", + }; + } + + private async *streamExecution( + spec: StreamingExecutionSpec, + signal?: AbortSignal, + ): AsyncIterable { + const url = joinUrl(this.opts.baseUrl, spec.pathname); + const res = await this.fetch(url, { + method: "POST", + headers: { + accept: "text/event-stream", + "content-type": "application/json", + ...(this.opts.headers ?? {}), + }, + body: JSON.stringify(spec.body), + signal, + }); + + for await (const ev of parseJsonEventStream(res, { + fallbackErrorMessage: spec.fallbackErrorMessage, + })) { + yield ev; + } + } + + private async consumeExecutionStream( + stream: AsyncIterable, + handlers?: ExecutionHandlers, + inferExitCode = false, + ): Promise { + const execution: CommandExecution = { + logs: { stdout: [], stderr: [] }, + result: [], + }; + const dispatcher = new ExecutionEventDispatcher(execution, handlers); + for await (const ev of stream) { + if (ev.type === "init" && (ev.text ?? "") === "" && execution.id) { + (ev as { text?: string }).text = execution.id; + } + await dispatcher.dispatch(ev as any); + } + + if (inferExitCode) { + execution.exitCode = inferForegroundExitCode(execution); + } + + return execution; + } + async interrupt(sessionId: string): Promise { const { error, response } = await this.client.DELETE("/command", { params: { query: { id: sessionId } }, @@ -150,21 +266,10 @@ export class CommandsAdapter implements ExecdCommands { opts?: RunCommandOpts, signal?: AbortSignal, ): AsyncIterable { - const url = joinUrl(this.opts.baseUrl, "/command"); - const body = JSON.stringify(toRunCommandRequest(command, opts)); - - const res = await this.fetch(url, { - method: "POST", - headers: { - "accept": "text/event-stream", - "content-type": "application/json", - ...(this.opts.headers ?? {}), - }, - body, + for await (const ev of this.streamExecution( + this.buildRunStreamSpec(command, opts), signal, - }); - - for await (const ev of parseJsonEventStream(res, { fallbackErrorMessage: "Run command failed" })) { + )) { yield ev; } } @@ -175,36 +280,16 @@ export class CommandsAdapter implements ExecdCommands { handlers?: ExecutionHandlers, signal?: AbortSignal, ): Promise { - const execution: CommandExecution = { - logs: { stdout: [], stderr: [] }, - result: [], - }; - const dispatcher = new ExecutionEventDispatcher(execution, handlers); - for await (const ev of this.runStream(command, opts, signal)) { - // Keep legacy behavior: if server sends "init" with empty id, preserve previous id. - if (ev.type === "init" && (ev.text ?? "") === "" && execution.id) { - (ev as any).text = execution.id; - } - await dispatcher.dispatch(ev as any); - } - - if (!opts?.background) { - const errorValue = execution.error?.value?.trim(); - const parsedExitCode = - errorValue && /^-?\d+$/.test(errorValue) ? Number(errorValue) : Number.NaN; - execution.exitCode = - execution.error != null - ? (Number.isFinite(parsedExitCode) ? parsedExitCode : null) - : execution.complete - ? 0 - : null; - } - - return execution; + return this.consumeExecutionStream( + this.runStream(command, opts, signal), + handlers, + !opts?.background, + ); } - async createSession(options?: { cwd?: string }): Promise { - const body = options?.cwd != null ? { cwd: options.cwd } : {}; + async createSession(options?: { workingDirectory?: string }): Promise { + const body: ApiCreateSessionRequest = + options?.workingDirectory != null ? { cwd: options.workingDirectory } : {}; const { data, error, response } = await this.client.POST("/session", { body, }); @@ -218,62 +303,30 @@ export class CommandsAdapter implements ExecdCommands { async *runInSessionStream( sessionId: string, - code: string, - opts?: { cwd?: string; timeoutMs?: number }, + command: string, + opts?: { workingDirectory?: string; timeout?: number }, signal?: AbortSignal, ): AsyncIterable { - const url = joinUrl( - this.opts.baseUrl, - `/session/${encodeURIComponent(sessionId)}/run`, - ); - const body: { code: string; cwd?: string; timeout_ms?: number } = { - code, - }; - if (opts?.cwd != null) body.cwd = opts.cwd; - if (opts?.timeoutMs != null) body.timeout_ms = opts.timeoutMs; - - const res = await this.fetch(url, { - method: "POST", - headers: { - accept: "text/event-stream", - "content-type": "application/json", - ...(this.opts.headers ?? {}), - }, - body: JSON.stringify(body), + for await (const ev of this.streamExecution( + this.buildRunInSessionStreamSpec(sessionId, command, opts), signal, - }); - - for await (const ev of parseJsonEventStream(res, { - fallbackErrorMessage: "Run in session failed", - })) { + )) { yield ev; } } async runInSession( sessionId: string, - code: string, - options?: { cwd?: string; timeoutMs?: number }, + command: string, + options?: { workingDirectory?: string; timeout?: number }, handlers?: ExecutionHandlers, signal?: AbortSignal, ): Promise { - const execution: CommandExecution = { - logs: { stdout: [], stderr: [] }, - result: [], - }; - const dispatcher = new ExecutionEventDispatcher(execution, handlers); - for await (const ev of this.runInSessionStream( - sessionId, - code, - options, - signal, - )) { - if (ev.type === "init" && (ev.text ?? "") === "" && execution.id) { - (ev as any).text = execution.id; - } - await dispatcher.dispatch(ev as any); - } - return execution; + return this.consumeExecutionStream( + this.runInSessionStream(sessionId, command, options, signal), + handlers, + true, + ); } async deleteSession(sessionId: string): Promise { diff --git a/sdks/sandbox/javascript/src/api/execd.ts b/sdks/sandbox/javascript/src/api/execd.ts index 03d361242..03fb65517 100644 --- a/sdks/sandbox/javascript/src/api/execd.ts +++ b/sdks/sandbox/javascript/src/api/execd.ts @@ -182,8 +182,8 @@ export interface paths { get?: never; put?: never; /** - * Run code in bash session (run_in_session) - * @description Executes code in an existing bash session and streams the output in real-time via SSE + * Run command in bash session (run_in_session) + * @description Executes a shell command in an existing bash session and streams the output in real-time via SSE * (Server-Sent Events). The session must have been created by create_session. Supports * optional working directory override and timeout (milliseconds). */ @@ -560,13 +560,13 @@ export interface components { */ session_id: string; }; - /** @description Request to run code in an existing bash session */ + /** @description Request to run a command in an existing bash session */ RunInSessionRequest: { /** - * @description Shell code to execute in the session + * @description Shell command to execute in the session * @example echo "Hello" */ - code: string; + command: string; /** * @description Working directory override for this run (optional) * @example /workspace @@ -577,7 +577,7 @@ export interface components { * @description Maximum execution time in milliseconds (optional; server may not enforce if omitted) * @example 30000 */ - timeout_ms?: number; + timeout?: number; }; /** @description Request to create a code execution context */ CodeContextRequest: { diff --git a/sdks/sandbox/javascript/src/services/execdCommands.ts b/sdks/sandbox/javascript/src/services/execdCommands.ts index b6412e6d5..667abcc5c 100644 --- a/sdks/sandbox/javascript/src/services/execdCommands.ts +++ b/sdks/sandbox/javascript/src/services/execdCommands.ts @@ -53,16 +53,16 @@ export interface ExecdCommands { * Create a bash session with optional working directory. * Returns session ID for use with runInSession and deleteSession. */ - createSession(options?: { cwd?: string }): Promise; + createSession(options?: { workingDirectory?: string }): Promise; /** - * Run shell code in an existing bash session (SSE stream, same event shape as run). - * Optional cwd and timeoutMs apply to this run only; session state (e.g. env) persists. + * Run a shell command in an existing bash session (SSE stream, same event shape as run). + * Optional workingDirectory and timeout apply to this run only; session state (e.g. env) persists. */ runInSession( sessionId: string, - code: string, - options?: { cwd?: string; timeoutMs?: number }, + command: string, + options?: { workingDirectory?: string; timeout?: number }, handlers?: ExecutionHandlers, signal?: AbortSignal, ): Promise; @@ -71,4 +71,4 @@ export interface ExecdCommands { * Delete a bash session by ID. Frees resources; session ID must have been returned by createSession. */ deleteSession(sessionId: string): Promise; -} \ No newline at end of file +} diff --git a/sdks/sandbox/javascript/tests/commands.run.test.mjs b/sdks/sandbox/javascript/tests/commands.run.test.mjs index 469ab9b95..1a0245cc5 100644 --- a/sdks/sandbox/javascript/tests/commands.run.test.mjs +++ b/sdks/sandbox/javascript/tests/commands.run.test.mjs @@ -72,3 +72,60 @@ test("CommandsAdapter.run keeps exitCode null when error value is empty", async assert.equal(execution.complete?.executionTimeMs, 4); assert.equal(execution.exitCode, null); }); + +test("CommandsAdapter.runInSession sends command and timeout fields", async () => { + let requestBody; + const fetchImpl = async (url, init) => { + requestBody = JSON.parse(init.body); + assert.equal(url, "http://127.0.0.1:8080/session/sess-1/run"); + return new Response( + [ + 'data: {"type":"stdout","text":"ok","timestamp":1}', + 'data: {"type":"execution_complete","timestamp":2,"execution_time":3}', + "", + ].join("\n"), + { + status: 200, + headers: { "content-type": "text/event-stream" }, + }, + ); + }; + + const adapter = new CommandsAdapter( + {}, + { + baseUrl: "http://127.0.0.1:8080", + fetch: fetchImpl, + }, + ); + + const execution = await adapter.runInSession("sess-1", "pwd", { + workingDirectory: "/var", + timeout: 5000, + }); + + assert.deepEqual(requestBody, { + command: "pwd", + cwd: "/var", + timeout: 5000, + }); + assert.equal(execution.logs.stdout[0].text, "ok"); + assert.equal(execution.exitCode, 0); +}); + +test("CommandsAdapter.runInSession infers non-zero exitCode from final error state", async () => { + const adapter = createAdapter( + [ + 'data: {"type":"init","text":"sess-cmd-2","timestamp":1}', + 'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}', + "", + ].join("\n"), + ); + + const execution = await adapter.runInSession("sess-2", "exit 7"); + + assert.equal(execution.id, "sess-cmd-2"); + assert.equal(execution.error?.value, "7"); + assert.equal(execution.complete, undefined); + assert.equal(execution.exitCode, 7); +}); diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/RunInSessionRequest.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/RunInSessionRequest.kt index 4c276cc6c..15b270bec 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/RunInSessionRequest.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/RunInSessionRequest.kt @@ -17,17 +17,17 @@ package com.alibaba.opensandbox.sandbox.domain.models.execd.executions /** - * Request to run code in an existing bash session. + * Request to run a command in an existing bash session. * - * @property code Shell code to execute - * @property cwd Optional working directory override for this run - * @property timeoutMs Optional max execution time in milliseconds + * @property command Shell command to execute + * @property workingDirectory Optional working directory override for this run + * @property timeout Optional max execution time in milliseconds * @property handlers Optional execution handlers for streaming events */ class RunInSessionRequest private constructor( - val code: String, - val cwd: String?, - val timeoutMs: Long?, + val command: String, + val workingDirectory: String?, + val timeout: Long?, val handlers: ExecutionHandlers?, ) { companion object { @@ -36,24 +36,24 @@ class RunInSessionRequest private constructor( } class Builder { - private var code: String? = null - private var cwd: String? = null - private var timeoutMs: Long? = null + private var command: String? = null + private var workingDirectory: String? = null + private var timeout: Long? = null private var handlers: ExecutionHandlers? = null - fun code(code: String): Builder { - require(code.isNotBlank()) { "Code cannot be blank" } - this.code = code + fun command(command: String): Builder { + require(command.isNotBlank()) { "Command cannot be blank" } + this.command = command return this } - fun cwd(cwd: String?): Builder { - this.cwd = cwd + fun workingDirectory(workingDirectory: String?): Builder { + this.workingDirectory = workingDirectory return this } - fun timeoutMs(timeoutMs: Long?): Builder { - this.timeoutMs = timeoutMs + fun timeout(timeout: Long?): Builder { + this.timeout = timeout return this } @@ -63,11 +63,11 @@ class RunInSessionRequest private constructor( } fun build(): RunInSessionRequest { - val codeValue = code ?: throw IllegalArgumentException("Code must be specified") + val commandValue = command ?: throw IllegalArgumentException("Command must be specified") return RunInSessionRequest( - code = codeValue, - cwd = cwd, - timeoutMs = timeoutMs, + command = commandValue, + workingDirectory = workingDirectory, + timeout = timeout, handlers = handlers, ) } diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/BashSession.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/BashSession.kt deleted file mode 100644 index 3ce9717ba..000000000 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/BashSession.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2025 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.opensandbox.sandbox.domain.services - -import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.Execution -import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunInSessionRequest - -/** - * Bash session service for sandbox environments. - * - * Provides create_session, run_in_session, and delete_session to manage - * stateful bash sessions with persistent shell state across multiple runs. - * - * This interface is internal. It is not the recommended runtime entry point. - * Use [Commands.createSession], [Commands.runInSession], and [Commands.deleteSession] - * via `sandbox.commands()` instead. - */ -internal interface BashSession { - /** - * Creates a new bash session. - * - * @param cwd Optional working directory for the session - * @return Session ID for use with runInSession and deleteSession - */ - fun createSession(cwd: String? = null): String - - /** - * Runs shell code in an existing bash session and streams output via SSE. - * - * @param sessionId Session ID from createSession - * @param request Code to execute and optional cwd/timeout/handlers - * @return Execution result with stdout/stderr and completion status - */ - fun runInSession( - sessionId: String, - request: RunInSessionRequest, - ): Execution - - /** - * Deletes a bash session and releases resources. - * - * @param sessionId Session ID to delete - */ - fun deleteSession(sessionId: String) -} diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/Commands.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/Commands.kt index 9ebd8589b..ca695f35a 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/Commands.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/services/Commands.kt @@ -85,19 +85,19 @@ interface Commands { /** * Creates a new bash session with optional working directory. * - * The session maintains shell state (e.g. cwd, environment) across multiple + * The session maintains shell state (e.g. working directory, environment) across multiple * [runInSession] calls. Use [deleteSession] when done to release resources. * - * @param cwd Optional working directory for the session + * @param workingDirectory Optional working directory for the session * @return Session ID for use with [runInSession] and [deleteSession] */ - fun createSession(cwd: String? = null): String + fun createSession(workingDirectory: String? = null): String /** - * Runs shell code in an existing bash session and streams output via SSE. + * Runs a shell command in an existing bash session and streams output via SSE. * * @param sessionId Session ID from [createSession] - * @param request Code to execute and optional cwd/timeout/handlers + * @param request Code to execute and optional workingDirectory/timeout/handlers * @return Execution result with stdout/stderr and completion status */ fun runInSession( @@ -106,20 +106,20 @@ interface Commands { ): Execution /** - * Convenience overload for running code in a session with minimal options. + * Convenience overload for running a command in a session with minimal options. */ fun runInSession( sessionId: String, - code: String, - cwd: String? = null, - timeoutMs: Long? = null, + command: String, + workingDirectory: String? = null, + timeout: Long? = null, ): Execution { return runInSession( sessionId, RunInSessionRequest.builder() - .code(code) - .cwd(cwd) - .timeoutMs(timeoutMs) + .command(command) + .workingDirectory(workingDirectory) + .timeout(timeout) .build(), ) } diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt index d5484e8b5..139766cff 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapter.kt @@ -32,6 +32,7 @@ import com.alibaba.opensandbox.sandbox.domain.exceptions.SandboxError.Companion. import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.CommandLogs import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.CommandStatus import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.Execution +import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.ExecutionHandlers import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunCommandRequest import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunInSessionRequest import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.SandboxEndpoint @@ -43,40 +44,44 @@ import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.jsonPar import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.parseSandboxError import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.toSandboxException import okhttp3.Headers.Companion.toHeaders +import okhttp3.HttpUrl.Companion.toHttpUrlOrNull import okhttp3.MediaType.Companion.toMediaType import okhttp3.Request import okhttp3.RequestBody.Companion.toRequestBody +import okhttp3.Response import org.slf4j.LoggerFactory +import com.alibaba.opensandbox.sandbox.api.models.execd.CreateSessionRequest as CreateSessionRequestApi +import com.alibaba.opensandbox.sandbox.api.models.execd.RunInSessionRequest as RunInSessionRequestApi /** - * Implementation of [Commands] that adapts OpenAPI-generated [CommandApi] and - * delegates bash session operations to [SessionAdapter]. - * - * This adapter handles command execution within sandboxes, providing both - * synchronous and streaming execution modes with proper session management. + * Implementation of [Commands] that adapts OpenAPI-generated APIs and handles + * streaming command execution for sandboxes. */ internal class CommandsAdapter( private val httpClientProvider: HttpClientProvider, private val execdEndpoint: SandboxEndpoint, - private val sessionAdapter: SessionAdapter, ) : Commands { companion object { private const val RUN_COMMAND_PATH = "/command" + private const val SESSION_PATH_SEGMENT = "session" } private val logger = LoggerFactory.getLogger(CommandsAdapter::class.java) - private val api = - CommandApi( - "${httpClientProvider.config.protocol}://${execdEndpoint.endpoint}", - httpClientProvider.httpClient.newBuilder() - .addInterceptor { chain -> - val requestBuilder = chain.request().newBuilder() - execdEndpoint.headers.forEach { (key, value) -> - requestBuilder.header(key, value) - } - chain.proceed(requestBuilder.build()) + private val execdBaseUrl = "${httpClientProvider.config.protocol}://${execdEndpoint.endpoint}" + private val execdApiClient = + httpClientProvider.httpClient.newBuilder() + .addInterceptor { chain -> + val requestBuilder = chain.request().newBuilder() + execdEndpoint.headers.forEach { (key, value) -> + requestBuilder.header(key, value) } - .build(), + chain.proceed(requestBuilder.build()) + } + .build() + private val commandApi = + CommandApi( + execdBaseUrl, + execdApiClient, ) override fun run(request: RunCommandRequest): Execution { @@ -86,51 +91,21 @@ internal class CommandsAdapter( try { val httpRequest = Request.Builder() - .url("${httpClientProvider.config.protocol}://${execdEndpoint.endpoint}$RUN_COMMAND_PATH") + .url("$execdBaseUrl$RUN_COMMAND_PATH") .post( jsonParser.encodeToString(request.toApiRunCommandRequest()).toRequestBody("application/json".toMediaType()), ) .headers(execdEndpoint.headers.toHeaders()) .build() - val execution = Execution() - - httpClientProvider.sseClient.newCall(httpRequest).execute().use { response -> - if (!response.isSuccessful) { - val errorBodyString = response.body?.string() - val sandboxError = parseSandboxError(errorBodyString) - val message = "Failed to run commands. Status code: ${response.code}, Body: $errorBodyString" - throw SandboxApiException( - message = message, - statusCode = response.code, - error = sandboxError ?: SandboxError(UNEXPECTED_RESPONSE), - requestId = response.header("X-Request-ID"), - ) - } - - response.body?.byteStream()?.bufferedReader(Charsets.UTF_8)?.use { reader -> - val dispatcher = ExecutionEventDispatcher(execution, request.handlers) - reader.lineSequence() - .filter(String::isNotBlank) - .forEach { line -> - try { - val eventNode = jsonParser.decodeFromString(line) - dispatcher.dispatch(eventNode) - } catch (e: Exception) { - logger.error("Failed to parse SSE line: {}", line, e) - } - } - } - } - if (!request.background) { - execution.exitCode = - if (execution.error != null) { - execution.error?.value?.toIntOrNull() - } else { - if (execution.complete != null) 0 else null - } - } - return execution + return executeStreamingRequest( + httpRequest = httpRequest, + handlers = request.handlers, + inferExitCode = !request.background, + failureMessage = { statusCode, errorBody -> + "Failed to run commands. Status code: $statusCode, Body: $errorBody" + }, + ) } catch (e: Exception) { logger.error("Failed to run command (length: {})", request.command.length, e) throw e.toSandboxException() @@ -139,7 +114,7 @@ internal class CommandsAdapter( override fun interrupt(executionId: String) { try { - api.interruptCommand(executionId) + commandApi.interruptCommand(executionId) } catch (e: Exception) { logger.error("Failed to interrupt command", e) throw e.toSandboxException() @@ -148,7 +123,7 @@ internal class CommandsAdapter( override fun getCommandStatus(executionId: String): CommandStatus { return try { - val status = api.getCommandStatus(executionId) + val status = commandApi.getCommandStatus(executionId) status.toCommandStatus() } catch (e: Exception) { logger.error("Failed to get command status", e) @@ -161,7 +136,7 @@ internal class CommandsAdapter( cursor: Long?, ): CommandLogs { return try { - val localVarResponse = api.getBackgroundCommandLogsWithHttpInfo(executionId, cursor) + val localVarResponse = commandApi.getBackgroundCommandLogsWithHttpInfo(executionId, cursor) val content = when (localVarResponse.responseType) { ResponseType.Success -> (localVarResponse as Success<*>).data as String @@ -196,18 +171,158 @@ internal class CommandsAdapter( } } - override fun createSession(cwd: String?): String { - return sessionAdapter.createSession(cwd) + override fun createSession(workingDirectory: String?): String { + if (workingDirectory != null && workingDirectory.isBlank()) { + throw InvalidArgumentException("workingDirectory cannot be blank when provided") + } + return try { + val apiRequest = workingDirectory?.let { CreateSessionRequestApi(cwd = it) } + commandApi.createSession(apiRequest).sessionId + } catch (e: Exception) { + logger.error("Failed to create session", e) + throw e.toSandboxException() + } } override fun runInSession( sessionId: String, request: RunInSessionRequest, ): Execution { - return sessionAdapter.runInSession(sessionId, request) + if (sessionId.isBlank()) { + throw InvalidArgumentException("session_id cannot be empty") + } + try { + val apiRequest = + RunInSessionRequestApi( + command = request.command, + cwd = request.workingDirectory, + timeout = request.timeout, + ) + val runUrl = + execdBaseUrl + .toHttpUrlOrNull()!! + .newBuilder() + .addPathSegment(SESSION_PATH_SEGMENT) + .addPathSegment(sessionId) + .addPathSegment("run") + .build() + .toString() + val httpRequest = + Request.Builder() + .url(runUrl) + .post( + jsonParser.encodeToString(apiRequest).toRequestBody("application/json".toMediaType()), + ) + .headers(execdEndpoint.headers.toHeaders()) + .build() + + return executeStreamingRequest( + httpRequest = httpRequest, + handlers = request.handlers, + inferExitCode = true, + failureMessage = { statusCode, errorBody -> + "run_in_session failed. Status: $statusCode, Body: $errorBody" + }, + ) + } catch (e: Exception) { + logger.error("Failed to run in session", e) + throw e.toSandboxException() + } } override fun deleteSession(sessionId: String) { - sessionAdapter.deleteSession(sessionId) + if (sessionId.isBlank()) { + throw InvalidArgumentException("session_id cannot be empty") + } + try { + commandApi.deleteSession(sessionId) + } catch (e: Exception) { + logger.error("Failed to delete session", e) + throw e.toSandboxException() + } + } + + private fun executeStreamingRequest( + httpRequest: Request, + handlers: ExecutionHandlers?, + inferExitCode: Boolean, + failureMessage: (Int, String?) -> String, + ): Execution { + val execution = Execution() + + httpClientProvider.sseClient.newCall(httpRequest).execute().use { response -> + ensureSuccessfulStreamingResponse(response, failureMessage) + + response.body?.byteStream()?.bufferedReader(Charsets.UTF_8)?.use { reader -> + val dispatcher = ExecutionEventDispatcher(execution, handlers) + reader.lineSequence().forEach { line -> + decodeEventLine(line)?.let { eventNode -> + try { + dispatcher.dispatch(eventNode) + } catch (e: Exception) { + logger.error("Failed to dispatch SSE event: {}", eventNode, e) + } + } + } + } + } + + if (inferExitCode) { + execution.exitCode = inferForegroundExitCode(execution) + } + return execution + } + + private fun ensureSuccessfulStreamingResponse( + response: Response, + failureMessage: (Int, String?) -> String, + ) { + if (response.isSuccessful) { + return + } + + val errorBodyString = response.body?.string() + val sandboxError = parseSandboxError(errorBodyString) + throw SandboxApiException( + message = failureMessage(response.code, errorBodyString), + statusCode = response.code, + error = sandboxError ?: SandboxError(UNEXPECTED_RESPONSE), + requestId = response.header("X-Request-ID"), + ) + } + + private fun decodeEventLine(line: String): EventNode? { + if (line.isBlank()) { + return null + } + + val payload = + when { + line.startsWith(":") -> return null + line.startsWith("event:") -> return null + line.startsWith("id:") -> return null + line.startsWith("retry:") -> return null + line.startsWith("data:") -> line.drop(5).trim() + else -> line + } + + if (payload.isBlank()) { + return null + } + + return try { + jsonParser.decodeFromString(payload) + } catch (e: Exception) { + logger.error("Failed to parse SSE line: {}", line, e) + null + } + } + + private fun inferForegroundExitCode(execution: Execution): Int? { + return if (execution.error != null) { + execution.error?.value?.toIntOrNull() + } else { + if (execution.complete != null) 0 else null + } } } diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/SessionAdapter.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/SessionAdapter.kt deleted file mode 100644 index 532a3c86f..000000000 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/SessionAdapter.kt +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright 2025 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.opensandbox.sandbox.infrastructure.adapters.service - -import com.alibaba.opensandbox.sandbox.HttpClientProvider -import com.alibaba.opensandbox.sandbox.api.models.execd.EventNode -import com.alibaba.opensandbox.sandbox.domain.exceptions.InvalidArgumentException -import com.alibaba.opensandbox.sandbox.domain.exceptions.SandboxApiException -import com.alibaba.opensandbox.sandbox.domain.exceptions.SandboxError -import com.alibaba.opensandbox.sandbox.domain.exceptions.SandboxError.Companion.UNEXPECTED_RESPONSE -import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.Execution -import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunInSessionRequest -import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.SandboxEndpoint -import com.alibaba.opensandbox.sandbox.domain.services.BashSession -import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.ExecutionEventDispatcher -import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.jsonParser -import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.parseSandboxError -import com.alibaba.opensandbox.sandbox.infrastructure.adapters.converter.toSandboxException -import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable -import okhttp3.Headers.Companion.toHeaders -import okhttp3.HttpUrl.Companion.toHttpUrlOrNull -import okhttp3.MediaType.Companion.toMediaType -import okhttp3.Request -import okhttp3.RequestBody.Companion.toRequestBody -import org.slf4j.LoggerFactory - -/** - * Implementation of [BashSession] that calls execd session APIs via HTTP. - */ -internal class SessionAdapter( - private val httpClientProvider: HttpClientProvider, - private val execdEndpoint: SandboxEndpoint, -) : BashSession { - companion object { - private const val SESSION_PATH = "/session" - } - - private val logger = LoggerFactory.getLogger(SessionAdapter::class.java) - - override fun createSession(cwd: String?): String { - if (cwd != null && cwd.isBlank()) { - throw InvalidArgumentException("cwd cannot be blank when provided") - } - try { - val body = - if (cwd != null) { - jsonParser.encodeToString(CreateSessionRequest(cwd = cwd)) - .toRequestBody("application/json".toMediaType()) - } else { - null - } - val requestBuilder = - Request.Builder() - .url("${httpClientProvider.config.protocol}://${execdEndpoint.endpoint}$SESSION_PATH") - .headers(execdEndpoint.headers.toHeaders()) - if (body != null) { - requestBuilder.post(body) - } else { - requestBuilder.post(okhttp3.RequestBody.create(null, ByteArray(0))) - } - val request = requestBuilder.build() - - httpClientProvider.httpClient.newCall(request).execute().use { response -> - if (!response.isSuccessful) { - val errorBodyString = response.body?.string() - val sandboxError = parseSandboxError(errorBodyString) - throw SandboxApiException( - message = "Failed to create session. Status: ${response.code}, Body: $errorBodyString", - statusCode = response.code, - error = sandboxError ?: SandboxError(UNEXPECTED_RESPONSE), - requestId = response.header("X-Request-ID"), - ) - } - val responseBody = - response.body?.string() ?: throw SandboxApiException( - message = "create_session returned empty body", - statusCode = response.code, - error = SandboxError(UNEXPECTED_RESPONSE), - requestId = response.header("X-Request-ID"), - ) - val parsed = jsonParser.decodeFromString(responseBody) - return parsed.sessionId - } - } catch (e: Exception) { - logger.error("Failed to create session", e) - throw e.toSandboxException() - } - } - - override fun runInSession( - sessionId: String, - request: RunInSessionRequest, - ): Execution { - if (sessionId.isBlank()) { - throw InvalidArgumentException("session_id cannot be empty") - } - try { - val apiRequest = - RunInSessionRequestApi( - code = request.code, - cwd = request.cwd, - timeoutMs = request.timeoutMs, - ) - val runUrl = - "${httpClientProvider.config.protocol}://${execdEndpoint.endpoint}" - .toHttpUrlOrNull()!! - .newBuilder() - .addPathSegment("session") - .addPathSegment(sessionId) - .addPathSegment("run") - .build() - .toString() - val httpRequest = - Request.Builder() - .url(runUrl) - .post( - jsonParser.encodeToString(apiRequest).toRequestBody("application/json".toMediaType()), - ) - .headers(execdEndpoint.headers.toHeaders()) - .build() - - val execution = Execution() - httpClientProvider.sseClient.newCall(httpRequest).execute().use { response -> - if (!response.isSuccessful) { - val errorBodyString = response.body?.string() - val sandboxError = parseSandboxError(errorBodyString) - throw SandboxApiException( - message = "run_in_session failed. Status: ${response.code}, Body: $errorBodyString", - statusCode = response.code, - error = sandboxError ?: SandboxError(UNEXPECTED_RESPONSE), - requestId = response.header("X-Request-ID"), - ) - } - response.body?.byteStream()?.bufferedReader(Charsets.UTF_8)?.use { reader -> - val dispatcher = ExecutionEventDispatcher(execution, request.handlers) - reader.lineSequence() - .filter(String::isNotBlank) - .forEach { line -> - try { - val data = if (line.startsWith("data:")) line.drop(5).trim() else line - val eventNode = jsonParser.decodeFromString(data) - dispatcher.dispatch(eventNode) - } catch (e: Exception) { - logger.error("Failed to parse SSE line: {}", line, e) - } - } - } - } - return execution - } catch (e: Exception) { - logger.error("Failed to run in session", e) - throw e.toSandboxException() - } - } - - override fun deleteSession(sessionId: String) { - if (sessionId.isBlank()) { - throw InvalidArgumentException("session_id cannot be empty") - } - try { - val deleteUrl = - "${httpClientProvider.config.protocol}://${execdEndpoint.endpoint}" - .toHttpUrlOrNull()!! - .newBuilder() - .addPathSegment("session") - .addPathSegment(sessionId) - .build() - .toString() - val request = - Request.Builder() - .url(deleteUrl) - .delete() - .headers(execdEndpoint.headers.toHeaders()) - .build() - - httpClientProvider.httpClient.newCall(request).execute().use { response -> - if (!response.isSuccessful) { - val errorBodyString = response.body?.string() - val sandboxError = parseSandboxError(errorBodyString) - throw SandboxApiException( - message = "delete_session failed. Status: ${response.code}, Body: $errorBodyString", - statusCode = response.code, - error = sandboxError ?: SandboxError(UNEXPECTED_RESPONSE), - requestId = response.header("X-Request-ID"), - ) - } - } - } catch (e: Exception) { - logger.error("Failed to delete session", e) - throw e.toSandboxException() - } - } -} - -@Serializable -private data class CreateSessionRequest( - val cwd: String? = null, -) - -@Serializable -private data class CreateSessionResponse( - @SerialName("session_id") val sessionId: String, -) - -@Serializable -private data class RunInSessionRequestApi( - val code: String, - val cwd: String? = null, - @SerialName("timeout_ms") val timeoutMs: Long? = null, -) diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/factory/AdapterFactory.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/factory/AdapterFactory.kt index be352c618..bdec1603f 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/factory/AdapterFactory.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/factory/AdapterFactory.kt @@ -30,7 +30,6 @@ import com.alibaba.opensandbox.sandbox.infrastructure.adapters.service.Filesyste import com.alibaba.opensandbox.sandbox.infrastructure.adapters.service.HealthAdapter import com.alibaba.opensandbox.sandbox.infrastructure.adapters.service.MetricsAdapter import com.alibaba.opensandbox.sandbox.infrastructure.adapters.service.SandboxesAdapter -import com.alibaba.opensandbox.sandbox.infrastructure.adapters.service.SessionAdapter /** * Factory responsible for creating adapter instances. @@ -50,8 +49,7 @@ internal class AdapterFactory( } fun createCommands(endpoint: SandboxEndpoint): Commands { - val sessionAdapter = SessionAdapter(httpClientProvider, endpoint) - return CommandsAdapter(httpClientProvider, endpoint, sessionAdapter) + return CommandsAdapter(httpClientProvider, endpoint) } fun createEgress(endpoint: SandboxEndpoint): Egress { diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt index 9924d21c5..747cb9264 100644 --- a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/service/CommandsAdapterTest.kt @@ -18,9 +18,11 @@ package com.alibaba.opensandbox.sandbox.infrastructure.adapters.service import com.alibaba.opensandbox.sandbox.HttpClientProvider import com.alibaba.opensandbox.sandbox.config.ConnectionConfig +import com.alibaba.opensandbox.sandbox.domain.exceptions.InvalidArgumentException import com.alibaba.opensandbox.sandbox.domain.exceptions.SandboxApiException import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.ExecutionHandlers import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunCommandRequest +import com.alibaba.opensandbox.sandbox.domain.models.execd.executions.RunInSessionRequest import com.alibaba.opensandbox.sandbox.domain.models.sandboxes.SandboxEndpoint import kotlinx.serialization.json.Json import kotlinx.serialization.json.booleanOrNull @@ -62,8 +64,7 @@ class CommandsAdapterTest { .build() httpClientProvider = HttpClientProvider(config) - val sessionAdapter = SessionAdapter(httpClientProvider, endpoint) - commandsAdapter = CommandsAdapter(httpClientProvider, endpoint, sessionAdapter) + commandsAdapter = CommandsAdapter(httpClientProvider, endpoint) } @AfterEach @@ -135,7 +136,8 @@ class CommandsAdapterTest { fun `run should infer non-zero exit code from command error event`() { val initEvent = """{"type":"init","text":"cmd-123","timestamp":1672531200000}""" val errorEvent = - """{"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":1672531201000}""" + """{"type":"error","error":{"ename":"CommandExecError",""" + + """"evalue":"7","traceback":["exit status 7"]},"timestamp":1672531201000}""" mockWebServer.enqueue( MockResponse() @@ -162,7 +164,8 @@ class CommandsAdapterTest { val initEvent = """{"type":"init","text":"cmd-123","timestamp":1672531200000}""" val completeEvent = """{"type":"execution_complete","execution_time":100,"timestamp":1672531201000}""" val errorEvent = - """{"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":1672531202000}""" + """{"type":"error","error":{"ename":"CommandExecError","evalue":"7",""" + + """"traceback":["exit status 7"]},"timestamp":1672531202000}""" mockWebServer.enqueue( MockResponse() @@ -187,7 +190,8 @@ class CommandsAdapterTest { val initEvent = """{"type":"init","text":"cmd-123","timestamp":1672531200000}""" val completeEvent = """{"type":"execution_complete","execution_time":100,"timestamp":1672531201000}""" val errorEvent = - """{"type":"error","error":{"ename":"CommandExecError","evalue":"","traceback":["failed"]},"timestamp":1672531202000}""" + """{"type":"error","error":{"ename":"CommandExecError",""" + + """"evalue":"","traceback":["failed"]},"timestamp":1672531202000}""" mockWebServer.enqueue( MockResponse() @@ -232,4 +236,125 @@ class CommandsAdapterTest { assertEquals(500, ex.statusCode) assertEquals("req-kotlin-123", ex.requestId) } + + @Test + fun `createSession should use generated api and return session id`() { + mockWebServer.enqueue( + MockResponse() + .setResponseCode(200) + .setBody("""{"session_id":"sess-123"}"""), + ) + + val sessionId = commandsAdapter.createSession("/workspace") + + assertEquals("sess-123", sessionId) + val recordedRequest = mockWebServer.takeRequest() + assertEquals("/session", recordedRequest.path) + assertEquals("POST", recordedRequest.method) + val requestBodyJson = Json.parseToJsonElement(recordedRequest.body.readUtf8()).jsonObject + assertEquals("/workspace", requestBodyJson["cwd"]?.jsonPrimitive?.content) + } + + @Test + fun `runInSession should stream events and send session request payload`() { + val stdoutEvent = """event: stdout +data: {"type":"stdout","text":"Hello","timestamp":1672531200000}""" + val completeEvent = """event: execution_complete +data: {"type":"execution_complete","execution_time":100,"timestamp":1672531201000}""" + mockWebServer.enqueue( + MockResponse() + .setResponseCode(200) + .setBody("$stdoutEvent\n\n$completeEvent\n\n"), + ) + + val receivedOutput = StringBuilder() + val latch = CountDownLatch(1) + var executionTime = -1L + val handlers = + ExecutionHandlers.builder() + .onStdout { msg -> receivedOutput.append(msg.text) } + .onExecutionComplete { complete -> + executionTime = complete.executionTimeInMillis + latch.countDown() + } + .build() + + val execution = + commandsAdapter.runInSession( + "sess-123", + RunInSessionRequest.builder() + .command("echo Hello") + .workingDirectory("/workspace") + .timeout(5000) + .handlers(handlers) + .build(), + ) + + assertTrue(latch.await(2, TimeUnit.SECONDS), "Timed out waiting for session completion event") + assertEquals("Hello", receivedOutput.toString()) + assertEquals(100L, executionTime) + assertEquals(0, execution.exitCode) + assertEquals(100L, execution.complete?.executionTimeInMillis) + val recordedRequest = mockWebServer.takeRequest() + assertEquals("/session/sess-123/run", recordedRequest.path) + assertEquals("POST", recordedRequest.method) + val requestBodyJson = Json.parseToJsonElement(recordedRequest.body.readUtf8()).jsonObject + assertEquals("echo Hello", requestBodyJson["command"]?.jsonPrimitive?.content) + assertEquals("/workspace", requestBodyJson["cwd"]?.jsonPrimitive?.content) + assertEquals(5000L, requestBodyJson["timeout"]?.jsonPrimitive?.content?.toLong()) + } + + @Test + fun `runInSession should infer non-zero exit code from command error event`() { + val initEvent = """data: {"type":"init","text":"cmd-123","timestamp":1672531200000}""" + val errorEvent = + """data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7",""" + + """"traceback":["exit status 7"]},"timestamp":1672531201000}""" + + mockWebServer.enqueue( + MockResponse() + .setResponseCode(200) + .setBody("$initEvent\n\n$errorEvent\n\n"), + ) + + val execution = + commandsAdapter.runInSession( + "sess-123", + RunInSessionRequest.builder() + .command("exit 7") + .build(), + ) + + assertEquals("cmd-123", execution.id) + assertEquals(7, execution.exitCode) + assertEquals("CommandExecError", execution.error?.name) + assertEquals("7", execution.error?.value) + assertEquals(null, execution.complete) + } + + @Test + fun `deleteSession should use generated api`() { + mockWebServer.enqueue( + MockResponse() + .setResponseCode(204), + ) + + commandsAdapter.deleteSession("sess-123") + + val recordedRequest = mockWebServer.takeRequest() + assertEquals("/session/sess-123", recordedRequest.path) + assertEquals("DELETE", recordedRequest.method) + } + + @Test + fun `createSession should reject blank workingDirectory`() { + val ex = assertThrows(InvalidArgumentException::class.java) { commandsAdapter.createSession(" ") } + assertEquals("workingDirectory cannot be blank when provided", ex.message) + } + + @Test + fun `deleteSession should reject blank session id`() { + val ex = assertThrows(InvalidArgumentException::class.java) { commandsAdapter.deleteSession(" ") } + assertEquals("session_id cannot be empty", ex.message) + } } diff --git a/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py b/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py index 1db00be17..51d3ae6a7 100644 --- a/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py +++ b/sdks/sandbox/python/src/opensandbox/adapters/command_adapter.py @@ -26,9 +26,7 @@ import httpx -from opensandbox.adapters.converter.command_model_converter import ( - to_command_status, -) +from opensandbox.adapters.converter.command_model_converter import to_command_status from opensandbox.adapters.converter.event_node import EventNode from opensandbox.adapters.converter.exception_converter import ( ExceptionConverter, @@ -69,6 +67,46 @@ def _infer_foreground_exit_code(execution: Execution) -> int | None: return None +def _build_run_command_request_body(command: str, opts: RunCommandOpts): + return ExecutionConverter.to_api_run_command_request(command, opts) + + +def _build_run_in_session_request_body( + command: str, + working_directory: str | None, + timeout: int | None, +): + from opensandbox.api.execd.models.run_in_session_request import ( + RunInSessionRequest, + ) + from opensandbox.api.execd.types import UNSET + + return RunInSessionRequest( + command=command, + cwd=working_directory if working_directory else UNSET, + timeout=timeout if timeout is not None else UNSET, + ) + + +def _decode_sse_event_line(line: str) -> EventNode | None: + if not line.strip(): + return None + + if line.startswith((":", "event:", "id:", "retry:")): + return None + + data = line[5:].strip() if line.startswith("data:") else line + if not data: + return None + + try: + event_dict = json.loads(data) + return EventNode(**event_dict) + except Exception as e: + logger.error("Failed to parse SSE line: %s", line, exc_info=e) + return None + + class CommandsAdapter(Commands): """ Implementation of Commands that adapts openapi-python-client generated CommandApi. @@ -158,6 +196,51 @@ async def _get_sse_client(self) -> httpx.AsyncClient: """Return SSE client (read timeout disabled) for execd streaming.""" return self._sse_client + async def _execute_streaming_request( + self, + *, + url: str, + json_body: dict, + handlers: ExecutionHandlers | None, + infer_exit_code: bool, + failure_message: str, + ) -> Execution: + execution = Execution( + id=None, + execution_count=None, + result=[], + error=None, + ) + client = await self._get_sse_client() + + async with client.stream("POST", url, json=json_body) as response: + if response.status_code != 200: + await response.aread() + error_body = response.text + logger.error( + "%s. Status: %s, Body: %s", + failure_message, + response.status_code, + error_body, + ) + raise SandboxApiException( + message=f"{failure_message}. Status code: {response.status_code}", + status_code=response.status_code, + request_id=extract_request_id(response.headers), + ) + + dispatcher = ExecutionEventDispatcher(execution, handlers) + async for line in response.aiter_lines(): + event_node = _decode_sse_event_line(line) + if event_node is None: + continue + await dispatcher.dispatch(event_node) + + if infer_exit_code: + execution.exit_code = _infer_foreground_exit_code(execution) + + return execution + async def run( self, command: str, @@ -174,60 +257,17 @@ async def run( raise InvalidArgumentException("Command cannot be empty") try: - # Convert domain model to API model opts = opts or RunCommandOpts() - json_body = ExecutionConverter.to_api_run_command_json(command, opts) - - # Prepare URL + json_body = _build_run_command_request_body(command, opts).to_dict() url = self._get_execd_url(self.RUN_COMMAND_PATH) - - execution = Execution( - id=None, - execution_count=None, - result=[], - error=None, + return await self._execute_streaming_request( + url=url, + json_body=json_body, + handlers=handlers, + infer_exit_code=not opts.background, + failure_message="Failed to run command", ) - # Use SSE client for streaming responses (read timeout disabled) - client = await self._get_sse_client() - - # Use streaming request for SSE - async with client.stream("POST", url, json=json_body) as response: - if response.status_code != 200: - await response.aread() - error_body = response.text - logger.error( - f"Failed to run command. Status: {response.status_code}, Body: {error_body}" - ) - raise SandboxApiException( - message=f"Failed to run command. Status code: {response.status_code}", - status_code=response.status_code, - request_id=extract_request_id(response.headers), - ) - - dispatcher = ExecutionEventDispatcher(execution, handlers) - - async for line in response.aiter_lines(): - if not line.strip(): - continue - - # Handle potential SSE format "data: ..." - data = line - if data.startswith("data:"): - data = data[5:].strip() - - try: - event_dict = json.loads(data) - event_node = EventNode(**event_dict) - await dispatcher.dispatch(event_node) - except Exception as e: - logger.error(f"Failed to parse SSE line: {line}", exc_info=e) - - if not opts.background: - execution.exit_code = _infer_foreground_exit_code(execution) - - return execution - except Exception as e: logger.error( "Failed to run command (length: %s)", @@ -304,8 +344,8 @@ async def get_background_command_logs( logger.error("Failed to get command logs", exc_info=e) raise ExceptionConverter.to_sandbox_exception(e) from e - async def create_session(self, *, cwd: str | None = None) -> str: - from opensandbox.api.execd.api.code_interpreting.create_session import ( + async def create_session(self, *, working_directory: str | None = None) -> str: + from opensandbox.api.execd.api.command.create_session import ( asyncio as create_session_asyncio, ) from opensandbox.api.execd.models.create_session_request import ( @@ -316,7 +356,11 @@ async def create_session(self, *, cwd: str | None = None) -> str: ) from opensandbox.api.execd.types import UNSET - body = CreateSessionRequest(cwd=cwd) if cwd else UNSET + body = ( + CreateSessionRequest(cwd=working_directory) + if working_directory + else UNSET + ) try: client = await self._get_client() parsed = await create_session_asyncio(client=client, body=body) @@ -338,75 +382,38 @@ async def create_session(self, *, cwd: str | None = None) -> str: async def run_in_session( self, session_id: str, - code: str, + command: str, *, - cwd: str | None = None, - timeout_ms: int | None = None, + working_directory: str | None = None, + timeout: int | None = None, handlers: ExecutionHandlers | None = None, ) -> Execution: if not (session_id and session_id.strip()): raise InvalidArgumentException("session_id cannot be empty") - if not (code and code.strip()): - raise InvalidArgumentException("code cannot be empty") + if not (command and command.strip()): + raise InvalidArgumentException("command cannot be empty") - from opensandbox.api.execd.models.run_in_session_request import ( - RunInSessionRequest, - ) - from opensandbox.api.execd.types import UNSET - - body = RunInSessionRequest( - code=code, - cwd=cwd if cwd else UNSET, - timeout_ms=timeout_ms if timeout_ms is not None else UNSET, + body = _build_run_in_session_request_body( + command, working_directory, timeout ) url = self._get_execd_url( self.RUN_IN_SESSION_PATH.format(session_id=session_id) ) - execution = Execution( - id=None, - execution_count=None, - result=[], - error=None, - ) try: - client = await self._get_sse_client() - async with client.stream("POST", url, json=body.to_dict()) as response: - if response.status_code != 200: - await response.aread() - error_body = response.text - logger.error( - "run_in_session failed. Status: %s, Body: %s", - response.status_code, - error_body, - ) - raise SandboxApiException( - message=f"run_in_session failed. Status: {response.status_code}", - status_code=response.status_code, - request_id=extract_request_id(response.headers), - ) - dispatcher = ExecutionEventDispatcher(execution, handlers) - async for line in response.aiter_lines(): - if not line.strip(): - continue - data = line - if data.startswith("data:"): - data = data[5:].strip() - try: - event_dict = json.loads(data) - event_node = EventNode(**event_dict) - await dispatcher.dispatch(event_node) - except Exception as e: - logger.error( - "Failed to parse SSE line: %s", line, exc_info=e - ) - return execution + return await self._execute_streaming_request( + url=url, + json_body=body.to_dict(), + handlers=handlers, + infer_exit_code=True, + failure_message="run_in_session failed", + ) except Exception as e: raise ExceptionConverter.to_sandbox_exception(e) from e async def delete_session(self, session_id: str) -> None: if not (session_id and session_id.strip()): raise InvalidArgumentException("session_id cannot be empty") - from opensandbox.api.execd.api.code_interpreting.delete_session import ( + from opensandbox.api.execd.api.command.delete_session import ( asyncio as delete_session_asyncio, ) diff --git a/sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/create_session.py b/sdks/sandbox/python/src/opensandbox/api/execd/api/command/create_session.py similarity index 100% rename from sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/create_session.py rename to sdks/sandbox/python/src/opensandbox/api/execd/api/command/create_session.py diff --git a/sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/delete_session.py b/sdks/sandbox/python/src/opensandbox/api/execd/api/command/delete_session.py similarity index 100% rename from sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/delete_session.py rename to sdks/sandbox/python/src/opensandbox/api/execd/api/command/delete_session.py diff --git a/sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/run_in_session.py b/sdks/sandbox/python/src/opensandbox/api/execd/api/command/run_in_session.py similarity index 85% rename from sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/run_in_session.py rename to sdks/sandbox/python/src/opensandbox/api/execd/api/command/run_in_session.py index e4bc3471f..05f1c80a1 100644 --- a/sdks/sandbox/python/src/opensandbox/api/execd/api/code_interpreting/run_in_session.py +++ b/sdks/sandbox/python/src/opensandbox/api/execd/api/command/run_in_session.py @@ -91,15 +91,15 @@ def sync_detailed( client: AuthenticatedClient | Client, body: RunInSessionRequest, ) -> Response[ErrorResponse | ServerStreamEvent]: - """Run code in bash session (run_in_session) + """Run command in bash session (run_in_session) - Executes code in an existing bash session and streams the output in real-time via SSE + Executes a shell command in an existing bash session and streams the output in real-time via SSE (Server-Sent Events). The session must have been created by create_session. Supports optional working directory override and timeout (milliseconds). Args: session_id (str): - body (RunInSessionRequest): Request to run code in an existing bash session + body (RunInSessionRequest): Request to run a command in an existing bash session Raises: errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. @@ -127,15 +127,15 @@ def sync( client: AuthenticatedClient | Client, body: RunInSessionRequest, ) -> ErrorResponse | ServerStreamEvent | None: - """Run code in bash session (run_in_session) + """Run command in bash session (run_in_session) - Executes code in an existing bash session and streams the output in real-time via SSE + Executes a shell command in an existing bash session and streams the output in real-time via SSE (Server-Sent Events). The session must have been created by create_session. Supports optional working directory override and timeout (milliseconds). Args: session_id (str): - body (RunInSessionRequest): Request to run code in an existing bash session + body (RunInSessionRequest): Request to run a command in an existing bash session Raises: errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. @@ -158,15 +158,15 @@ async def asyncio_detailed( client: AuthenticatedClient | Client, body: RunInSessionRequest, ) -> Response[ErrorResponse | ServerStreamEvent]: - """Run code in bash session (run_in_session) + """Run command in bash session (run_in_session) - Executes code in an existing bash session and streams the output in real-time via SSE + Executes a shell command in an existing bash session and streams the output in real-time via SSE (Server-Sent Events). The session must have been created by create_session. Supports optional working directory override and timeout (milliseconds). Args: session_id (str): - body (RunInSessionRequest): Request to run code in an existing bash session + body (RunInSessionRequest): Request to run a command in an existing bash session Raises: errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. @@ -192,15 +192,15 @@ async def asyncio( client: AuthenticatedClient | Client, body: RunInSessionRequest, ) -> ErrorResponse | ServerStreamEvent | None: - """Run code in bash session (run_in_session) + """Run command in bash session (run_in_session) - Executes code in an existing bash session and streams the output in real-time via SSE + Executes a shell command in an existing bash session and streams the output in real-time via SSE (Server-Sent Events). The session must have been created by create_session. Supports optional working directory override and timeout (milliseconds). Args: session_id (str): - body (RunInSessionRequest): Request to run code in an existing bash session + body (RunInSessionRequest): Request to run a command in an existing bash session Raises: errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. diff --git a/sdks/sandbox/python/src/opensandbox/api/execd/models/run_in_session_request.py b/sdks/sandbox/python/src/opensandbox/api/execd/models/run_in_session_request.py index 0ceae7313..df5d20264 100644 --- a/sdks/sandbox/python/src/opensandbox/api/execd/models/run_in_session_request.py +++ b/sdks/sandbox/python/src/opensandbox/api/execd/models/run_in_session_request.py @@ -29,54 +29,54 @@ @_attrs_define class RunInSessionRequest: - """Request to run code in an existing bash session + """Request to run a command in an existing bash session Attributes: - code (str): Shell code to execute in the session Example: echo "Hello". + command (str): Shell command to execute in the session Example: echo "Hello". cwd (str | Unset): Working directory override for this run (optional) Example: /workspace. - timeout_ms (int | Unset): Maximum execution time in milliseconds (optional; server may not enforce if omitted) + timeout (int | Unset): Maximum execution time in milliseconds (optional; server may not enforce if omitted) Example: 30000. """ - code: str + command: str cwd: str | Unset = UNSET - timeout_ms: int | Unset = UNSET + timeout: int | Unset = UNSET additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) def to_dict(self) -> dict[str, Any]: - code = self.code + command = self.command cwd = self.cwd - timeout_ms = self.timeout_ms + timeout = self.timeout field_dict: dict[str, Any] = {} field_dict.update(self.additional_properties) field_dict.update( { - "code": code, + "command": command, } ) if cwd is not UNSET: field_dict["cwd"] = cwd - if timeout_ms is not UNSET: - field_dict["timeout_ms"] = timeout_ms + if timeout is not UNSET: + field_dict["timeout"] = timeout return field_dict @classmethod def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: d = dict(src_dict) - code = d.pop("code") + command = d.pop("command") cwd = d.pop("cwd", UNSET) - timeout_ms = d.pop("timeout_ms", UNSET) + timeout = d.pop("timeout", UNSET) run_in_session_request = cls( - code=code, + command=command, cwd=cwd, - timeout_ms=timeout_ms, + timeout=timeout, ) run_in_session_request.additional_properties = d diff --git a/sdks/sandbox/python/src/opensandbox/services/command.py b/sdks/sandbox/python/src/opensandbox/services/command.py index 91d655a27..e8d90c5f6 100644 --- a/sdks/sandbox/python/src/opensandbox/services/command.py +++ b/sdks/sandbox/python/src/opensandbox/services/command.py @@ -113,12 +113,12 @@ async def get_background_command_logs( """ ... - async def create_session(self, *, cwd: str | None = None) -> str: + async def create_session(self, *, working_directory: str | None = None) -> str: """ Create a bash session. Returns session_id for run_in_session and delete_session. Args: - cwd: Optional working directory for the session. + working_directory: Optional working directory for the session. Returns: Session ID string. @@ -131,20 +131,20 @@ async def create_session(self, *, cwd: str | None = None) -> str: async def run_in_session( self, session_id: str, - code: str, + command: str, *, - cwd: str | None = None, - timeout_ms: int | None = None, + working_directory: str | None = None, + timeout: int | None = None, handlers: ExecutionHandlers | None = None, ) -> Execution: """ - Run shell code in an existing bash session (streams output via SSE). + Run a shell command in an existing bash session (streams output via SSE). Args: session_id: Session ID from create_session. - code: Shell code to execute. - cwd: Optional working directory override for this run. - timeout_ms: Optional max execution time in milliseconds. + command: Shell command to execute. + working_directory: Optional working directory override for this run. + timeout: Optional max execution time in milliseconds for this session run. handlers: Optional async handlers for streaming events. Returns: diff --git a/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py b/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py index 802b62be0..c1c08de7b 100644 --- a/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py +++ b/sdks/sandbox/python/src/opensandbox/sync/adapters/command_adapter.py @@ -62,6 +62,46 @@ def _infer_foreground_exit_code(execution: Execution) -> int | None: return None +def _build_run_command_request_body(command: str, opts: RunCommandOpts): + return ExecutionConverter.to_api_run_command_request(command, opts) + + +def _build_run_in_session_request_body( + command: str, + working_directory: str | None, + timeout: int | None, +): + from opensandbox.api.execd.models.run_in_session_request import ( + RunInSessionRequest, + ) + from opensandbox.api.execd.types import UNSET + + return RunInSessionRequest( + command=command, + cwd=working_directory if working_directory else UNSET, + timeout=timeout if timeout is not None else UNSET, + ) + + +def _decode_sse_event_line(line: str) -> EventNode | None: + if not line or not line.strip(): + return None + + if line.startswith((":", "event:", "id:", "retry:")): + return None + + data = line[5:].strip() if line.startswith("data:") else line + if not data: + return None + + try: + event_dict = json.loads(data) + return EventNode(**event_dict) + except Exception as e: + logger.error("Failed to parse SSE line: %s", line, exc_info=e) + return None + + class CommandsAdapterSync(CommandsSync): """ Synchronous implementation of :class:`~opensandbox.sync.services.command.CommandsSync`. @@ -128,6 +168,38 @@ def _get_execd_url(self, path: str) -> str: """Build URL for execd endpoint.""" return f"{self.connection_config.protocol}://{self.execd_endpoint.endpoint}{path}" + def _execute_streaming_request( + self, + *, + url: str, + json_body: dict, + handlers: ExecutionHandlersSync | None, + infer_exit_code: bool, + failure_message: str, + ) -> Execution: + execution = Execution(id=None, execution_count=None, result=[], error=None) + dispatcher = ExecutionEventDispatcherSync(execution, handlers) + + with self._sse_client.stream("POST", url, json=json_body) as response: + if response.status_code != 200: + response.read() + raise SandboxApiException( + message=f"{failure_message}. Status code: {response.status_code}", + status_code=response.status_code, + request_id=extract_request_id(response.headers), + ) + + for line in response.iter_lines(): + event_node = _decode_sse_event_line(line) + if event_node is None: + continue + dispatcher.dispatch(event_node) + + if infer_exit_code: + execution.exit_code = _infer_foreground_exit_code(execution) + + return execution + def run( self, command: str, @@ -140,38 +212,15 @@ def run( try: opts = opts or RunCommandOpts() - json_body = ExecutionConverter.to_api_run_command_json(command, opts) + json_body = _build_run_command_request_body(command, opts).to_dict() url = self._get_execd_url(self.RUN_COMMAND_PATH) - - execution = Execution(id=None, execution_count=None, result=[], error=None) - dispatcher = ExecutionEventDispatcherSync(execution, handlers) - - with self._sse_client.stream("POST", url, json=json_body) as response: - if response.status_code != 200: - response.read() - raise SandboxApiException( - message=f"Failed to run command. Status code: {response.status_code}", - status_code=response.status_code, - request_id=extract_request_id(response.headers), - ) - - for line in response.iter_lines(): - if not line or not line.strip(): - continue - data = line - if data.startswith("data:"): - data = data[5:].strip() - try: - event_dict = json.loads(data) - event_node = EventNode(**event_dict) - dispatcher.dispatch(event_node) - except Exception as e: - logger.error("Failed to parse SSE line: %s", line, exc_info=e) - - if not opts.background: - execution.exit_code = _infer_foreground_exit_code(execution) - - return execution + return self._execute_streaming_request( + url=url, + json_body=json_body, + handlers=handlers, + infer_exit_code=not opts.background, + failure_message="Failed to run command", + ) except Exception as e: logger.error("Failed to run command (length: %s)", len(command), exc_info=e) @@ -242,8 +291,8 @@ def get_background_command_logs( logger.error("Failed to get command logs", exc_info=e) raise ExceptionConverter.to_sandbox_exception(e) from e - def create_session(self, *, cwd: str | None = None) -> str: - from opensandbox.api.execd.api.code_interpreting.create_session import ( + def create_session(self, *, working_directory: str | None = None) -> str: + from opensandbox.api.execd.api.command.create_session import ( sync as create_session_sync, ) from opensandbox.api.execd.models.create_session_request import ( @@ -254,7 +303,11 @@ def create_session(self, *, cwd: str | None = None) -> str: ) from opensandbox.api.execd.types import UNSET - body = CreateSessionRequest(cwd=cwd) if cwd else UNSET + body = ( + CreateSessionRequest(cwd=working_directory) + if working_directory + else UNSET + ) try: parsed = create_session_sync(client=self._client, body=body) if parsed is None: @@ -275,70 +328,38 @@ def create_session(self, *, cwd: str | None = None) -> str: def run_in_session( self, session_id: str, - code: str, + command: str, *, - cwd: str | None = None, - timeout_ms: int | None = None, + working_directory: str | None = None, + timeout: int | None = None, handlers: ExecutionHandlersSync | None = None, ) -> Execution: if not (session_id and session_id.strip()): raise InvalidArgumentException("session_id cannot be empty") - if not (code and code.strip()): - raise InvalidArgumentException("code cannot be empty") + if not (command and command.strip()): + raise InvalidArgumentException("command cannot be empty") - from opensandbox.api.execd.models.run_in_session_request import ( - RunInSessionRequest, - ) - from opensandbox.api.execd.types import UNSET - - body = RunInSessionRequest( - code=code, - cwd=cwd if cwd else UNSET, - timeout_ms=timeout_ms if timeout_ms is not None else UNSET, + body = _build_run_in_session_request_body( + command, working_directory, timeout ) url = self._get_execd_url( self.RUN_IN_SESSION_PATH.format(session_id=session_id) ) - execution = Execution( - id=None, - execution_count=None, - result=[], - error=None, - ) try: - with self._sse_client.stream( - "POST", url, json=body.to_dict() - ) as response: - if response.status_code != 200: - response.read() - raise SandboxApiException( - message=f"run_in_session failed. Status: {response.status_code}", - status_code=response.status_code, - request_id=extract_request_id(response.headers), - ) - dispatcher = ExecutionEventDispatcherSync(execution, handlers) - for line in response.iter_lines(): - if not line or not line.strip(): - continue - data = line - if data.startswith("data:"): - data = data[5:].strip() - try: - event_dict = json.loads(data) - event_node = EventNode(**event_dict) - dispatcher.dispatch(event_node) - except Exception as e: - logger.error( - "Failed to parse SSE line: %s", line, exc_info=e - ) - return execution + return self._execute_streaming_request( + url=url, + json_body=body.to_dict(), + handlers=handlers, + infer_exit_code=True, + failure_message="run_in_session failed", + ) except Exception as e: raise ExceptionConverter.to_sandbox_exception(e) from e def delete_session(self, session_id: str) -> None: if not (session_id and session_id.strip()): raise InvalidArgumentException("session_id cannot be empty") - from opensandbox.api.execd.api.code_interpreting.delete_session import ( + from opensandbox.api.execd.api.command.delete_session import ( sync as delete_session_sync, ) diff --git a/sdks/sandbox/python/src/opensandbox/sync/services/command.py b/sdks/sandbox/python/src/opensandbox/sync/services/command.py index 846434ca1..5cfe6b21b 100644 --- a/sdks/sandbox/python/src/opensandbox/sync/services/command.py +++ b/sdks/sandbox/python/src/opensandbox/sync/services/command.py @@ -117,20 +117,28 @@ def get_background_command_logs( """ ... - def create_session(self, *, cwd: str | None = None) -> str: + def create_session(self, *, working_directory: str | None = None) -> str: """Create a bash session. Returns session_id for run_in_session and delete_session.""" ... def run_in_session( self, session_id: str, - code: str, + command: str, *, - cwd: str | None = None, - timeout_ms: int | None = None, + working_directory: str | None = None, + timeout: int | None = None, handlers: ExecutionHandlersSync | None = None, ) -> Execution: - """Run shell code in an existing bash session (streams output via SSE).""" + """Run a shell command in an existing bash session (streams output via SSE). + + Args: + session_id: Session ID from ``create_session``. + command: Shell command to execute. + working_directory: Optional working directory override for this run. + timeout: Optional max execution time in milliseconds for this session run. + handlers: Optional sync handlers for streaming events. + """ ... def delete_session(self, session_id: str) -> None: diff --git a/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py b/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py index dae1ed941..5a138e003 100644 --- a/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py +++ b/sdks/sandbox/python/tests/test_command_service_adapter_streaming.py @@ -51,6 +51,32 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: request=request, ) + if request.url.path == "/session/sess-1/run" and payload.get("command") == "pwd": + sse = ( + b'event: stdout\n' + b'data: {"type":"stdout","text":"/var","timestamp":1}\n\n' + b'event: execution_complete\n' + b'data: {"type":"execution_complete","timestamp":2,"execution_time":3}\n\n' + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + content=sse, + request=request, + ) + + if request.url.path == "/session/sess-2/run" and payload.get("command") == "exit 7": + sse = ( + b'data: {"type":"init","text":"sess-exec-2","timestamp":1}\n\n' + b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n' + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + content=sse, + request=request, + ) + return httpx.Response(500, content=b"boom", request=request) @@ -119,3 +145,47 @@ async def test_run_command_non_200_raises_api_exception() -> None: with pytest.raises(SandboxApiException): await adapter.run("other") + + +@pytest.mark.asyncio +async def test_run_in_session_streaming_uses_generated_fields_and_exit_code() -> None: + transport = _SseTransport() + cfg = ConnectionConfig(protocol="http", transport=transport) + endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) + adapter = CommandsAdapter(cfg, endpoint) + + execution = await adapter.run_in_session( + "sess-1", + "pwd", + working_directory="/var", + timeout=5000, + ) + + assert execution.logs.stdout[0].text == "/var" + assert execution.complete is not None + assert execution.complete.execution_time_in_millis == 3 + assert execution.exit_code == 0 + + assert transport.last_request is not None + assert transport.last_request.url.path == "/session/sess-1/run" + request_body = json.loads(transport.last_request.content.decode("utf-8")) + assert request_body == { + "command": "pwd", + "cwd": "/var", + "timeout": 5000, + } + + +@pytest.mark.asyncio +async def test_run_in_session_non_zero_exit_updates_exit_code() -> None: + cfg = ConnectionConfig(protocol="http", transport=_SseTransport()) + endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) + adapter = CommandsAdapter(cfg, endpoint) + + execution = await adapter.run_in_session("sess-2", "exit 7") + + assert execution.id == "sess-exec-2" + assert execution.error is not None + assert execution.error.value == "7" + assert execution.complete is None + assert execution.exit_code == 7 diff --git a/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py b/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py index ad6f13ef4..860d36d1d 100644 --- a/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py +++ b/sdks/sandbox/python/tests/test_sync_command_service_adapter_streaming.py @@ -42,6 +42,32 @@ def handle_request(self, request: httpx.Request) -> httpx.Response: request=request, ) + if request.url.path == "/session/sess-1/run" and payload.get("command") == "pwd": + sse = ( + b'event: stdout\n' + b'data: {"type":"stdout","text":"/var","timestamp":1}\n\n' + b'event: execution_complete\n' + b'data: {"type":"execution_complete","timestamp":2,"execution_time":3}\n\n' + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + content=sse, + request=request, + ) + + if request.url.path == "/session/sess-2/run" and payload.get("command") == "exit 7": + sse = ( + b'data: {"type":"init","text":"sess-exec-2","timestamp":1}\n\n' + b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n' + ) + return httpx.Response( + 200, + headers={"Content-Type": "text/event-stream"}, + content=sse, + request=request, + ) + sse = ( b'data: {"type":"init","text":"exec-2","timestamp":1}\n\n' b'data: {"type":"error","error":{"ename":"CommandExecError","evalue":"7","traceback":["exit status 7"]},"timestamp":2}\n\n' @@ -78,3 +104,36 @@ def test_sync_run_command_streaming_non_zero_exit_updates_exit_code() -> None: assert execution.error.value == "7" assert execution.complete is None assert execution.exit_code == 7 + + +def test_sync_run_in_session_streaming_uses_generated_fields_and_exit_code() -> None: + transport = _SseTransport() + cfg = ConnectionConfigSync(protocol="http", transport=transport) + endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) + adapter = CommandsAdapterSync(cfg, endpoint) + + execution = adapter.run_in_session( + "sess-1", + "pwd", + working_directory="/var", + timeout=5000, + ) + + assert execution.logs.stdout[0].text == "/var" + assert execution.complete is not None + assert execution.complete.execution_time_in_millis == 3 + assert execution.exit_code == 0 + + +def test_sync_run_in_session_non_zero_exit_updates_exit_code() -> None: + cfg = ConnectionConfigSync(protocol="http", transport=_SseTransport()) + endpoint = SandboxEndpoint(endpoint="localhost:44772", port=44772) + adapter = CommandsAdapterSync(cfg, endpoint) + + execution = adapter.run_in_session("sess-2", "exit 7") + + assert execution.id == "sess-exec-2" + assert execution.error is not None + assert execution.error.value == "7" + assert execution.complete is None + assert execution.exit_code == 7 diff --git a/specs/execd-api.yaml b/specs/execd-api.yaml index c5f658b79..2a46a484b 100644 --- a/specs/execd-api.yaml +++ b/specs/execd-api.yaml @@ -291,7 +291,7 @@ paths: code executions. Request body is optional; an empty body uses default options (no cwd override). operationId: createSession tags: - - CodeInterpreting + - Command requestBody: required: false content: @@ -320,14 +320,14 @@ paths: /session/{sessionId}/run: post: - summary: Run code in bash session (run_in_session) + summary: Run command in bash session (run_in_session) description: | - Executes code in an existing bash session and streams the output in real-time via SSE + Executes a shell command in an existing bash session and streams the output in real-time via SSE (Server-Sent Events). The session must have been created by create_session. Supports optional working directory override and timeout (milliseconds). operationId: runInSession tags: - - CodeInterpreting + - Command parameters: - name: sessionId in: path @@ -344,15 +344,15 @@ paths: $ref: "#/components/schemas/RunInSessionRequest" examples: simple: - summary: Run shell code + summary: Run shell command value: - code: echo "Hello from session" + command: echo "Hello from session" with_options: summary: With cwd and timeout value: - code: ls -la + command: ls -la cwd: /workspace - timeout_ms: 30000 + timeout: 30000 responses: "200": description: Stream of execution events @@ -373,7 +373,7 @@ paths: and releases resources. The session ID must have been returned by create_session. operationId: deleteSession tags: - - CodeInterpreting + - Command parameters: - name: sessionId in: path @@ -1016,19 +1016,19 @@ components: RunInSessionRequest: type: object - description: Request to run code in an existing bash session + description: Request to run a command in an existing bash session required: - - code + - command properties: - code: + command: type: string - description: Shell code to execute in the session + description: Shell command to execute in the session example: echo "Hello" cwd: type: string description: Working directory override for this run (optional) example: /workspace - timeout_ms: + timeout: type: integer format: int64 minimum: 0 diff --git a/tests/csharp/OpenSandbox.E2ETests/SandboxE2ETests.cs b/tests/csharp/OpenSandbox.E2ETests/SandboxE2ETests.cs index e6c2a261f..7fa904852 100644 --- a/tests/csharp/OpenSandbox.E2ETests/SandboxE2ETests.cs +++ b/tests/csharp/OpenSandbox.E2ETests/SandboxE2ETests.cs @@ -368,7 +368,16 @@ public async Task Sandbox_Create_With_HostVolumeMount_ReadOnly() Assert.Equal("opensandbox-e2e-marker", marker.Logs.Stdout[0].Text); var write = await roSandbox.Commands.RunAsync($"touch {containerMountPath}/should-fail.txt"); - Assert.NotNull(write.Error); + var stat = await roSandbox.Commands.RunAsync( + $"test ! -e {containerMountPath}/should-fail.txt && echo OK"); + var writeWasRejected = write.Error is not null || write.Logs.Stderr.Count > 0; + var fileWasNotCreated = + stat.Error is null && + stat.Logs.Stdout.Count == 1 && + stat.Logs.Stdout[0].Text == "OK"; + Assert.True( + writeWasRejected || fileWasNotCreated, + "Write on read-only host volume should fail or leave no created file."); } finally { @@ -468,7 +477,16 @@ public async Task Sandbox_Create_With_PvcVolumeMount_ReadOnly() Assert.Equal("pvc-marker-data", marker.Logs.Stdout[0].Text); var write = await roSandbox.Commands.RunAsync($"touch {containerMountPath}/should-fail.txt"); - Assert.NotNull(write.Error); + var stat = await roSandbox.Commands.RunAsync( + $"test ! -e {containerMountPath}/should-fail.txt && echo OK"); + var writeWasRejected = write.Error is not null || write.Logs.Stderr.Count > 0; + var fileWasNotCreated = + stat.Error is null && + stat.Logs.Stdout.Count == 1 && + stat.Logs.Stdout[0].Text == "OK"; + Assert.True( + writeWasRejected || fileWasNotCreated, + "Write on read-only PVC volume should fail or leave no created file."); } finally { @@ -545,7 +563,7 @@ public async Task Sandbox_Create_With_PvcVolumeMount_SubPath() } [Fact(Timeout = 2 * 60 * 1000)] - public async Task Command_Execution_Success_Cwd_Background_Failure() + public async Task Command_Execution_Success_WorkingDirectory_Background_Failure() { var sandbox = _fixture.Sandbox; @@ -689,31 +707,34 @@ public async Task Command_Env_Injection() } [Fact(Timeout = 2 * 60 * 1000)] - public async Task Bash_Session_API_Cwd_And_Env_Persistence() + public async Task Bash_Session_API_WorkingDirectory_And_Env_Persistence() { var sandbox = _fixture.Sandbox; - var sid = await sandbox.Commands.CreateSessionAsync(new CreateSessionOptions { Cwd = "/tmp" }); + var sid = await sandbox.Commands.CreateSessionAsync(new CreateSessionOptions { WorkingDirectory = "/tmp" }); Assert.False(string.IsNullOrWhiteSpace(sid)); var run = await sandbox.Commands.RunInSessionAsync(sid, "pwd"); Assert.Null(run.Error); + Assert.Equal(0, run.ExitCode); var stdout = string.Join("", run.Logs.Stdout.Select(m => m.Text)).Trim(); Assert.Equal("/tmp", stdout); run = await sandbox.Commands.RunInSessionAsync( sid, "pwd", - options: new RunInSessionOptions { Cwd = "/var" }); + options: new RunInSessionOptions { WorkingDirectory = "/var" }); Assert.Null(run.Error); + Assert.Equal(0, run.ExitCode); stdout = string.Join("", run.Logs.Stdout.Select(m => m.Text)).Trim(); Assert.Equal("/var", stdout); run = await sandbox.Commands.RunInSessionAsync( sid, "pwd", - options: new RunInSessionOptions { Cwd = "/tmp" }); + options: new RunInSessionOptions { WorkingDirectory = "/tmp" }); Assert.Null(run.Error); + Assert.Equal(0, run.ExitCode); stdout = string.Join("", run.Logs.Stdout.Select(m => m.Text)).Trim(); Assert.Equal("/tmp", stdout); @@ -722,13 +743,22 @@ public async Task Bash_Session_API_Cwd_And_Env_Persistence() run = await sandbox.Commands.RunInSessionAsync(sid, "echo $E2E_SESSION_ENV"); Assert.Null(run.Error); + Assert.Equal(0, run.ExitCode); stdout = string.Join("", run.Logs.Stdout.Select(m => m.Text)).Trim(); Assert.Equal("session-env-ok", stdout); - var sid2 = await sandbox.Commands.CreateSessionAsync(new CreateSessionOptions { Cwd = "/var" }); + run = await sandbox.Commands.RunInSessionAsync(sid, "sh -c 'echo session-fail >&2; exit 7'"); + Assert.NotNull(run.Error); + Assert.Equal("CommandExecError", run.Error!.Name); + Assert.Equal("7", run.Error.Value); + Assert.Equal(7, run.ExitCode); + Assert.Null(run.Complete); + + var sid2 = await sandbox.Commands.CreateSessionAsync(new CreateSessionOptions { WorkingDirectory = "/var" }); Assert.False(string.IsNullOrWhiteSpace(sid2)); run = await sandbox.Commands.RunInSessionAsync(sid2, "pwd"); Assert.Null(run.Error); + Assert.Equal(0, run.ExitCode); stdout = string.Join("", run.Logs.Stdout.Select(m => m.Text)).Trim(); Assert.Equal("/var", stdout); @@ -855,6 +885,22 @@ await sandbox.Files.ReplaceContentsAsync(new[] var verify = await sandbox.Commands.RunAsync( $"test ! -d {testDir1} && test ! -d {testDir2} && echo OK", options: new RunCommandOptions { WorkingDirectory = "/tmp" }); + for (var attempt = 0; attempt < 3; attempt++) + { + var verified = + verify.Error is null && + verify.Logs.Stdout.Count == 1 && + verify.Logs.Stdout[0].Text == "OK"; + if (verified) + { + break; + } + + await Task.Delay(1000); + verify = await sandbox.Commands.RunAsync( + $"test ! -d {testDir1} && test ! -d {testDir2} && echo OK", + options: new RunCommandOptions { WorkingDirectory = "/tmp" }); + } Assert.Null(verify.Error); Assert.Single(verify.Logs.Stdout); Assert.Equal("OK", verify.Logs.Stdout[0].Text); diff --git a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java index 2460037e4..87a06fcd0 100644 --- a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java +++ b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxE2ETest.java @@ -864,7 +864,7 @@ void testSandboxCreateWithPvcVolumeMountSubPath() { @Test @Order(3) - @DisplayName("Command execution: success, cwd, background, failure") + @DisplayName("Command execution: success, working directory, background, failure") @Timeout(value = 2, unit = TimeUnit.MINUTES) void testBasicCommandExecution() { assertNotNull(sandbox); @@ -1051,9 +1051,9 @@ void testRunCommandWithEnvInjection() { @Test @Order(4) - @DisplayName("Bash session API: cwd and env persistence") + @DisplayName("Bash session API: working directory and env persistence") @Timeout(value = 2, unit = TimeUnit.MINUTES) - void testBashSessionApiCwdAndEnvPersistence() { + void testBashSessionApiWorkingDirectoryAndEnvPersistence() { assertNotNull(sandbox); String sid = sandbox.commands().createSession("/tmp"); @@ -1061,12 +1061,10 @@ void testBashSessionApiCwdAndEnvPersistence() { assertFalse(sid.isBlank()); Execution run = - sandbox - .commands() - .runInSession( - sid, - RunInSessionRequest.builder().code("pwd").build()); + sandbox.commands() + .runInSession(sid, RunInSessionRequest.builder().command("pwd").build()); assertNull(run.getError()); + assertEquals(0, run.getExitCode()); String stdout = run.getLogs().getStdout().stream() .map(OutputMessage::getText) @@ -1075,15 +1073,15 @@ void testBashSessionApiCwdAndEnvPersistence() { assertEquals("/tmp", stdout); run = - sandbox - .commands() + sandbox.commands() .runInSession( sid, RunInSessionRequest.builder() - .code("pwd") - .cwd("/var") + .command("pwd") + .workingDirectory("/var") .build()); assertNull(run.getError()); + assertEquals(0, run.getExitCode()); stdout = run.getLogs().getStdout().stream() .map(OutputMessage::getText) @@ -1092,15 +1090,15 @@ void testBashSessionApiCwdAndEnvPersistence() { assertEquals("/var", stdout); run = - sandbox - .commands() + sandbox.commands() .runInSession( sid, RunInSessionRequest.builder() - .code("pwd") - .cwd("/tmp") + .command("pwd") + .workingDirectory("/tmp") .build()); assertNull(run.getError()); + assertEquals(0, run.getExitCode()); stdout = run.getLogs().getStdout().stream() .map(OutputMessage::getText) @@ -1109,24 +1107,23 @@ void testBashSessionApiCwdAndEnvPersistence() { assertEquals("/tmp", stdout); run = - sandbox - .commands() + sandbox.commands() .runInSession( sid, RunInSessionRequest.builder() - .code("export E2E_SESSION_ENV=session-env-ok") + .command("export E2E_SESSION_ENV=session-env-ok") .build()); assertNull(run.getError()); run = - sandbox - .commands() + sandbox.commands() .runInSession( sid, RunInSessionRequest.builder() - .code("echo $E2E_SESSION_ENV") + .command("echo $E2E_SESSION_ENV") .build()); assertNull(run.getError()); + assertEquals(0, run.getExitCode()); stdout = run.getLogs().getStdout().stream() .map(OutputMessage::getText) @@ -1134,15 +1131,26 @@ void testBashSessionApiCwdAndEnvPersistence() { .trim(); assertEquals("session-env-ok", stdout); + run = + sandbox.commands() + .runInSession( + sid, + RunInSessionRequest.builder() + .command("sh -c 'echo session-fail >&2; exit 7'") + .build()); + assertNotNull(run.getError()); + assertEquals("CommandExecError", run.getError().getName()); + assertEquals("7", run.getError().getValue()); + assertEquals(Integer.valueOf(7), run.getExitCode()); + assertNull(run.getComplete()); + String sid2 = sandbox.commands().createSession("/var"); assertNotNull(sid2); run = - sandbox - .commands() - .runInSession( - sid2, - RunInSessionRequest.builder().code("pwd").build()); + sandbox.commands() + .runInSession(sid2, RunInSessionRequest.builder().command("pwd").build()); assertNull(run.getError()); + assertEquals(0, run.getExitCode()); stdout = run.getLogs().getStdout().stream() .map(OutputMessage::getText) @@ -1201,7 +1209,7 @@ void testCommandStatusAndLogs() throws Exception { @Order(5) @DisplayName("Filesystem operations: CRUD + replace/move/delete + mtime checks") @Timeout(value = 2, unit = TimeUnit.MINUTES) - void testBasicFilesystemOperations() { + void testBasicFilesystemOperations() throws Exception { assertNotNull(sandbox); String testDir1 = "/tmp/fs_test1_" + System.currentTimeMillis(); String testDir2 = "/tmp/fs_test2_" + System.currentTimeMillis(); @@ -1429,6 +1437,28 @@ void testBasicFilesystemOperations() { + " && echo OK") .workingDirectory("/tmp") .build()); + for (int attempt = 0; attempt < 3; attempt++) { + boolean verified = + verify.getError() == null + && verify.getLogs().getStdout().size() == 1 + && "OK".equals(verify.getLogs().getStdout().get(0).getText()); + if (verified) { + break; + } + Thread.sleep(1000); + verify = + sandbox.commands() + .run( + RunCommandRequest.builder() + .command( + "test ! -d " + + testDir1 + + " && test ! -d " + + testDir2 + + " && echo OK") + .workingDirectory("/tmp") + .build()); + } assertNull(verify.getError()); assertEquals(1, verify.getLogs().getStdout().size()); assertEquals("OK", verify.getLogs().getStdout().get(0).getText()); diff --git a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java index 9586cc60a..6f1390810 100644 --- a/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java +++ b/tests/java/src/test/java/com/alibaba/opensandbox/e2e/SandboxPoolPseudoDistributedE2ETest.java @@ -121,11 +121,13 @@ void testCrossNodeAcquireAndResizePropagation() throws Exception { borrowed.add(sandbox); assertTrue(sandbox.isHealthy(), "cross-node acquire should return healthy sandbox"); Execution result = - sandbox.commands().run(RunCommandRequest.builder().command("echo dist-acquire-ok").build()); + sandbox.commands() + .run(RunCommandRequest.builder().command("echo dist-acquire-ok").build()); assertNotNull(result); assertNull(result.getError()); - // Resize from one node should propagate through shared store and be honored by leader reconcile. + // Resize from one node should propagate through shared store and be honored by leader + // reconcile. poolB.resize(0); int released = poolA.releaseAllIdle(); assertTrue(released >= 0, "releaseAllIdle should be non-negative"); @@ -215,14 +217,16 @@ void testOnlyOneOwnerWritesIdleInSteadyState() throws Exception { Thread.sleep(Duration.ofSeconds(3).toMillis()); Map putCounts = store.putCountsByOwner(poolName); - assertEquals(1, putCounts.size(), "idle writes in steady-state should come from one owner only"); + assertEquals( + 1, putCounts.size(), "idle writes in steady-state should come from one owner only"); assertTrue( putCounts.containsKey(store.currentOwner(poolName)), "steady-state writer should match current primary owner"); } @Test - @DisplayName("renew failure window drops extra create and orphan cleanup keeps remote count bounded") + @DisplayName( + "renew failure window drops extra create and orphan cleanup keeps remote count bounded") @Timeout(value = 6, unit = java.util.concurrent.TimeUnit.MINUTES) void testRenewFailureWindowAndOrphanCleanupBoundedResources() throws Exception { tag = "e2e-pool-renew-window-" + UUID.randomUUID().toString().substring(0, 8); @@ -327,11 +331,14 @@ void testNonPrimaryAcquireDuringLeadershipChanges() throws Exception { SandboxPool follower = currentOwner.equals(ownerA) ? poolB : poolA; String expectedNextOwner = currentOwner.equals(ownerA) ? ownerB : ownerA; - Sandbox sandbox = - follower.acquire(Duration.ofMinutes(3), AcquirePolicy.DIRECT_CREATE); + Sandbox sandbox = follower.acquire(Duration.ofMinutes(3), AcquirePolicy.DIRECT_CREATE); borrowed.add(sandbox); Execution execution = - sandbox.commands().run(RunCommandRequest.builder().command("echo follower-acquire-ok").build()); + sandbox.commands() + .run( + RunCommandRequest.builder() + .command("echo follower-acquire-ok") + .build()); assertNotNull(execution); assertNull(execution.getError()); @@ -387,8 +394,7 @@ void testNodeRestartRejoinsWithoutIdlePollution() throws Exception { Duration.ofSeconds(1), () -> poolA.snapshot().getIdleCount() <= 1); assertTrue( - countTaggedSandboxes(tag) <= 3, - "restart should not cause runaway idle pollution"); + countTaggedSandboxes(tag) <= 3, "restart should not cause runaway idle pollution"); } private SandboxPool createPool( @@ -488,20 +494,20 @@ private static void killAndCloseQuietly(Sandbox sandbox) { } /** - * A thread-safe in-process store that mimics distributed semantics: - * - shared idle membership by poolName - * - owner-based primary lock with TTL - * - shared maxIdle propagation + * A thread-safe in-process store that mimics distributed semantics: - shared idle membership by + * poolName - owner-based primary lock with TTL - shared maxIdle propagation */ static class PseudoDistributedPoolStateStore implements PoolStateStore { private static final Duration IDLE_TTL = Duration.ofHours(24); - private final Map> idleByPool = new LinkedHashMap<>(); + private final Map> idleByPool = + new LinkedHashMap<>(); private final Map locks = new LinkedHashMap<>(); private final Map maxIdleByPool = new LinkedHashMap<>(); private final Map> putCountByOwnerByPool = new HashMap<>(); private final Map> renewCountByOwnerByPool = new HashMap<>(); - private final Map> failRenewAfterPutByOwnerByPool = new HashMap<>(); + private final Map> failRenewAfterPutByOwnerByPool = + new HashMap<>(); @Override public synchronized String tryTakeIdle(String poolName) { @@ -555,7 +561,8 @@ public synchronized boolean tryAcquirePrimaryLock( } @Override - public synchronized boolean renewPrimaryLock(String poolName, String ownerId, Duration ttl) { + public synchronized boolean renewPrimaryLock( + String poolName, String ownerId, Duration ttl) { Instant now = Instant.now(); LockEntry lock = locks.get(poolName); if (lock == null || !lock.ownerId.equals(ownerId) || !lock.expiresAt.isAfter(now)) { @@ -643,9 +650,7 @@ private boolean shouldFailRenewByPutThreshold(String poolName, String ownerId) { return false; } int putCount = - putCountByOwnerByPool - .getOrDefault(poolName, Map.of()) - .getOrDefault(ownerId, 0); + putCountByOwnerByPool.getOrDefault(poolName, Map.of()).getOrDefault(ownerId, 0); return putCount >= threshold; } diff --git a/tests/javascript/tests/test_sandbox_e2e.test.ts b/tests/javascript/tests/test_sandbox_e2e.test.ts index e3cb36277..a791c2773 100644 --- a/tests/javascript/tests/test_sandbox_e2e.test.ts +++ b/tests/javascript/tests/test_sandbox_e2e.test.ts @@ -554,7 +554,7 @@ test("01g sandbox manager: list + get", async () => { expect(info.metadata?.tag).toBe("e2e-test"); }); -test("02 command execution: success, cwd, background, failure", async () => { +test("02 command execution: success, working directory, background, failure", async () => { if (!sandbox) throw new Error("sandbox not created"); const stdoutMessages: OutputMessage[] = []; @@ -695,23 +695,26 @@ test("02b command env injection", async () => { expect(injectedOutput).toBe(envValue); }); -test("02c bash session API: cwd and env persistence", async () => { +test("02c bash session API: working directory and env persistence", async () => { if (!sandbox) throw new Error("sandbox not created"); - const sid = await sandbox.commands.createSession({ cwd: "/tmp" }); + const sid = await sandbox.commands.createSession({ workingDirectory: "/tmp" }); expect(typeof sid).toBe("string"); expect(sid.length).toBeGreaterThan(0); let run = await sandbox.commands.runInSession(sid, "pwd"); expect(run.error).toBeUndefined(); + expect(run.exitCode).toBe(0); expect(run.logs.stdout.map((m) => m.text).join("").trim()).toBe("/tmp"); - run = await sandbox.commands.runInSession(sid, "pwd", { cwd: "/var" }); + run = await sandbox.commands.runInSession(sid, "pwd", { workingDirectory: "/var" }); expect(run.error).toBeUndefined(); + expect(run.exitCode).toBe(0); expect(run.logs.stdout.map((m) => m.text).join("").trim()).toBe("/var"); - run = await sandbox.commands.runInSession(sid, "pwd", { cwd: "/tmp" }); + run = await sandbox.commands.runInSession(sid, "pwd", { workingDirectory: "/tmp" }); expect(run.error).toBeUndefined(); + expect(run.exitCode).toBe(0); expect(run.logs.stdout.map((m) => m.text).join("").trim()).toBe("/tmp"); run = await sandbox.commands.runInSession( @@ -722,14 +725,25 @@ test("02c bash session API: cwd and env persistence", async () => { run = await sandbox.commands.runInSession(sid, "echo $E2E_SESSION_ENV"); expect(run.error).toBeUndefined(); + expect(run.exitCode).toBe(0); expect(run.logs.stdout.map((m) => m.text).join("").trim()).toBe( "session-env-ok" ); - const sid2 = await sandbox.commands.createSession({ cwd: "/var" }); + run = await sandbox.commands.runInSession( + sid, + "sh -c 'echo session-fail >&2; exit 7'", + ); + expect(run.error?.name).toBe("CommandExecError"); + expect(run.error?.value).toBe("7"); + expect(run.exitCode).toBe(7); + expect(run.complete).toBeUndefined(); + + const sid2 = await sandbox.commands.createSession({ workingDirectory: "/var" }); expect(typeof sid2).toBe("string"); run = await sandbox.commands.runInSession(sid2, "pwd"); expect(run.error).toBeUndefined(); + expect(run.exitCode).toBe(0); expect(run.logs.stdout.map((m) => m.text).join("").trim()).toBe("/var"); await sandbox.commands.deleteSession(sid); diff --git a/tests/python/tests/test_sandbox_e2e.py b/tests/python/tests/test_sandbox_e2e.py index 3b01b44ae..391060748 100644 --- a/tests/python/tests/test_sandbox_e2e.py +++ b/tests/python/tests/test_sandbox_e2e.py @@ -1013,63 +1013,79 @@ async def on_init(init: ExecutionInit): @pytest.mark.timeout(120) @pytest.mark.order(2) async def test_02c_bash_session_api(self): - """Test create_session / run_in_session / delete_session: verify cwd is passed and applied. - Current execd session API supports cwd only (no env); this test asserts cwd for both - create_session and run_in_session. + """Test create_session / run_in_session / delete_session. + + Verifies working directory passing, session env persistence, and run_in_session exit_code behavior. """ await self._ensure_sandbox_created() sandbox = TestSandboxE2E.sandbox logger.info("=" * 80) - logger.info("TEST 2c: Bash session API — verify cwd is passed and applied") + logger.info("TEST 2c: Bash session API — verify working directory is passed and applied") logger.info("=" * 80) - logger.info("Step 1: Create session with cwd=/tmp and verify session starts in that directory") - sid = await sandbox.commands.create_session(cwd="/tmp") + logger.info("Step 1: Create session with working_directory=/tmp and verify session starts in that directory") + sid = await sandbox.commands.create_session(working_directory="/tmp") assert sid is not None and isinstance(sid, str) and len(sid) > 0 out_pwd = await sandbox.commands.run_in_session(sid, "pwd") assert out_pwd.error is None, f"pwd failed: {out_pwd.error}" + assert out_pwd.exit_code == 0 pwd_line = "".join(m.text for m in out_pwd.logs.stdout).strip() - assert pwd_line == "/tmp", f"create_session(cwd=/tmp) should run in /tmp, got: {pwd_line!r}" - logger.info("✓ create_session(cwd=/tmp) applied: pwd => %s", pwd_line) + assert pwd_line == "/tmp", f"create_session(working_directory=/tmp) should run in /tmp, got: {pwd_line!r}" + logger.info("✓ create_session(working_directory=/tmp) applied: pwd => %s", pwd_line) - logger.info("Step 2: run_in_session with cwd override — run in /var and verify") - out_var = await sandbox.commands.run_in_session(sid, "pwd", cwd="/var") + logger.info("Step 2: run_in_session with working_directory override — run in /var and verify") + out_var = await sandbox.commands.run_in_session(sid, "pwd", working_directory="/var") assert out_var.error is None + assert out_var.exit_code == 0 var_line = "".join(m.text for m in out_var.logs.stdout).strip() - assert var_line == "/var", f"run_in_session(..., cwd=/var) should run in /var, got: {var_line!r}" - logger.info("✓ run_in_session(..., cwd=/var) applied: pwd => %s", var_line) + assert var_line == "/var", f"run_in_session(..., working_directory=/var) should run in /var, got: {var_line!r}" + logger.info("✓ run_in_session(..., working_directory=/var) applied: pwd => %s", var_line) - logger.info("Step 3: run_in_session with cwd=/tmp — verify override per run") - out_tmp = await sandbox.commands.run_in_session(sid, "pwd", cwd="/tmp") + logger.info("Step 3: run_in_session with working_directory=/tmp — verify override per run") + out_tmp = await sandbox.commands.run_in_session(sid, "pwd", working_directory="/tmp") assert out_tmp.error is None + assert out_tmp.exit_code == 0 tmp_line = "".join(m.text for m in out_tmp.logs.stdout).strip() - assert tmp_line == "/tmp", f"run_in_session(..., cwd=/tmp) should run in /tmp, got: {tmp_line!r}" - logger.info("✓ run_in_session(..., cwd=/tmp) applied: pwd => %s", tmp_line) + assert tmp_line == "/tmp", f"run_in_session(..., working_directory=/tmp) should run in /tmp, got: {tmp_line!r}" + logger.info("✓ run_in_session(..., working_directory=/tmp) applied: pwd => %s", tmp_line) logger.info("Step 3b: Export env in one run, read in next run — verify session state (env) persists") await sandbox.commands.run_in_session(sid, "export E2E_SESSION_ENV=session-env-ok") out_env = await sandbox.commands.run_in_session(sid, "echo $E2E_SESSION_ENV") assert out_env.error is None + assert out_env.exit_code == 0 env_line = "".join(m.text for m in out_env.logs.stdout).strip() assert env_line == "session-env-ok", f"env set in previous run should be visible, got: {env_line!r}" logger.info("✓ session env persists across run_in_session: echo $E2E_SESSION_ENV => %s", env_line) - logger.info("Step 4: New session with cwd=/var — verify create_session cwd again") - sid2 = await sandbox.commands.create_session(cwd="/var") + logger.info("Step 3c: Failing subprocess in session should propagate non-zero exit_code") + fail = await sandbox.commands.run_in_session( + sid, "sh -c 'echo session-fail >&2; exit 7'" + ) + assert fail.error is not None + assert fail.error.name == "CommandExecError" + assert fail.error.value == "7" + assert fail.exit_code == 7 + assert fail.complete is None + logger.info("✓ run_in_session failure propagated exit_code=7") + + logger.info("Step 4: New session with working_directory=/var — verify create_session working directory again") + sid2 = await sandbox.commands.create_session(working_directory="/var") assert sid2 is not None out_var2 = await sandbox.commands.run_in_session(sid2, "pwd") assert out_var2.error is None + assert out_var2.exit_code == 0 var2_line = "".join(m.text for m in out_var2.logs.stdout).strip() - assert var2_line == "/var", f"create_session(cwd=/var) should run in /var, got: {var2_line!r}" - logger.info("✓ create_session(cwd=/var) applied: pwd => %s", var2_line) + assert var2_line == "/var", f"create_session(working_directory=/var) should run in /var, got: {var2_line!r}" + logger.info("✓ create_session(working_directory=/var) applied: pwd => %s", var2_line) logger.info("Step 5: Delete both sessions") await sandbox.commands.delete_session(sid) await sandbox.commands.delete_session(sid2) logger.info("✓ Sessions deleted") - logger.info("TEST 2c PASSED: cwd passing verified for create_session and run_in_session") + logger.info("TEST 2c PASSED: working directory passing verified for create_session and run_in_session") @pytest.mark.timeout(120) @pytest.mark.order(3) @@ -1349,6 +1365,19 @@ async def test_03_basic_filesystem_operations(self): f"test ! -d {test_dir1} && test ! -d {test_dir2} && echo OK", opts=RunCommandOpts(working_directory="/tmp"), ) + for _ in range(3): + verified = ( + verify_dirs_deleted.error is None + and len(verify_dirs_deleted.logs.stdout) == 1 + and verify_dirs_deleted.logs.stdout[0].text == "OK" + ) + if verified: + break + await asyncio.sleep(1) + verify_dirs_deleted = await sandbox.commands.run( + f"test ! -d {test_dir1} && test ! -d {test_dir2} && echo OK", + opts=RunCommandOpts(working_directory="/tmp"), + ) assert verify_dirs_deleted.error is None assert len(verify_dirs_deleted.logs.stdout) == 1 assert verify_dirs_deleted.logs.stdout[0].text == "OK" diff --git a/tests/python/tests/test_sandbox_e2e_sync.py b/tests/python/tests/test_sandbox_e2e_sync.py index 34b982853..7a3cc58b3 100644 --- a/tests/python/tests/test_sandbox_e2e_sync.py +++ b/tests/python/tests/test_sandbox_e2e_sync.py @@ -857,64 +857,78 @@ def on_init(init): @pytest.mark.timeout(120) @pytest.mark.order(2) def test_02c_bash_session_api(self) -> None: - """Test create_session / run_in_session / delete_session: verify cwd is passed and applied (sync). - Current execd session API supports cwd only (no env); this test asserts cwd for both - create_session and run_in_session. + """Test create_session / run_in_session / delete_session (sync). + + Verifies working directory passing, session env persistence, and run_in_session exit_code behavior. """ TestSandboxE2ESync._ensure_sandbox_created() sandbox = TestSandboxE2ESync.sandbox assert sandbox is not None logger.info("=" * 80) - logger.info("TEST 2c: Bash session API (sync) — verify cwd is passed and applied") + logger.info("TEST 2c: Bash session API (sync) — verify working directory is passed and applied") logger.info("=" * 80) - logger.info("Step 1: Create session with cwd=/tmp and verify session starts in that directory") - sid = sandbox.commands.create_session(cwd="/tmp") + logger.info("Step 1: Create session with working_directory=/tmp and verify session starts in that directory") + sid = sandbox.commands.create_session(working_directory="/tmp") assert sid is not None and isinstance(sid, str) and len(sid) > 0 out_pwd = sandbox.commands.run_in_session(sid, "pwd") assert out_pwd.error is None, f"pwd failed: {out_pwd.error}" + assert out_pwd.exit_code == 0 pwd_line = "".join(m.text for m in out_pwd.logs.stdout).strip() - assert pwd_line == "/tmp", f"create_session(cwd=/tmp) should run in /tmp, got: {pwd_line!r}" - logger.info("✓ create_session(cwd=/tmp) applied: pwd => %s", pwd_line) + assert pwd_line == "/tmp", f"create_session(working_directory=/tmp) should run in /tmp, got: {pwd_line!r}" + logger.info("✓ create_session(working_directory=/tmp) applied: pwd => %s", pwd_line) - logger.info("Step 2: run_in_session with cwd override — run in /var and verify") - out_var = sandbox.commands.run_in_session(sid, "pwd", cwd="/var") + logger.info("Step 2: run_in_session with working_directory override — run in /var and verify") + out_var = sandbox.commands.run_in_session(sid, "pwd", working_directory="/var") assert out_var.error is None + assert out_var.exit_code == 0 var_line = "".join(m.text for m in out_var.logs.stdout).strip() - assert var_line == "/var", f"run_in_session(..., cwd=/var) should run in /var, got: {var_line!r}" - logger.info("✓ run_in_session(..., cwd=/var) applied: pwd => %s", var_line) + assert var_line == "/var", f"run_in_session(..., working_directory=/var) should run in /var, got: {var_line!r}" + logger.info("✓ run_in_session(..., working_directory=/var) applied: pwd => %s", var_line) - logger.info("Step 3: run_in_session with cwd=/tmp — verify override per run") - out_tmp = sandbox.commands.run_in_session(sid, "pwd", cwd="/tmp") + logger.info("Step 3: run_in_session with working_directory=/tmp — verify override per run") + out_tmp = sandbox.commands.run_in_session(sid, "pwd", working_directory="/tmp") assert out_tmp.error is None + assert out_tmp.exit_code == 0 tmp_line = "".join(m.text for m in out_tmp.logs.stdout).strip() - assert tmp_line == "/tmp", f"run_in_session(..., cwd=/tmp) should run in /tmp, got: {tmp_line!r}" - logger.info("✓ run_in_session(..., cwd=/tmp) applied: pwd => %s", tmp_line) + assert tmp_line == "/tmp", f"run_in_session(..., working_directory=/tmp) should run in /tmp, got: {tmp_line!r}" + logger.info("✓ run_in_session(..., working_directory=/tmp) applied: pwd => %s", tmp_line) logger.info("Step 3b: Export env in one run, read in next run — verify session state (env) persists") sandbox.commands.run_in_session(sid, "export E2E_SESSION_ENV=session-env-ok") out_env = sandbox.commands.run_in_session(sid, "echo $E2E_SESSION_ENV") assert out_env.error is None + assert out_env.exit_code == 0 env_line = "".join(m.text for m in out_env.logs.stdout).strip() assert env_line == "session-env-ok", f"env set in previous run should be visible, got: {env_line!r}" logger.info("✓ session env persists across run_in_session: echo $E2E_SESSION_ENV => %s", env_line) - logger.info("Step 4: New session with cwd=/var — verify create_session cwd again") - sid2 = sandbox.commands.create_session(cwd="/var") + logger.info("Step 3c: Failing subprocess in session should propagate non-zero exit_code") + fail = sandbox.commands.run_in_session(sid, "sh -c 'echo session-fail >&2; exit 7'") + assert fail.error is not None + assert fail.error.name == "CommandExecError" + assert fail.error.value == "7" + assert fail.exit_code == 7 + assert fail.complete is None + logger.info("✓ run_in_session failure propagated exit_code=7") + + logger.info("Step 4: New session with working_directory=/var — verify create_session working directory again") + sid2 = sandbox.commands.create_session(working_directory="/var") assert sid2 is not None out_var2 = sandbox.commands.run_in_session(sid2, "pwd") assert out_var2.error is None + assert out_var2.exit_code == 0 var2_line = "".join(m.text for m in out_var2.logs.stdout).strip() - assert var2_line == "/var", f"create_session(cwd=/var) should run in /var, got: {var2_line!r}" - logger.info("✓ create_session(cwd=/var) applied: pwd => %s", var2_line) + assert var2_line == "/var", f"create_session(working_directory=/var) should run in /var, got: {var2_line!r}" + logger.info("✓ create_session(working_directory=/var) applied: pwd => %s", var2_line) logger.info("Step 5: Delete both sessions") sandbox.commands.delete_session(sid) sandbox.commands.delete_session(sid2) logger.info("✓ Sessions deleted") - logger.info("TEST 2c PASSED: cwd passing verified for create_session and run_in_session (sync)") + logger.info("TEST 2c PASSED: working directory passing verified for create_session and run_in_session (sync)") @pytest.mark.timeout(120) @pytest.mark.order(3) @@ -1165,6 +1179,19 @@ def test_03_basic_filesystem_operations(self) -> None: f"test ! -d {test_dir1} && test ! -d {test_dir2} && echo OK", opts=RunCommandOpts(working_directory="/tmp"), ) + for _ in range(3): + verified = ( + verify_dirs_deleted.error is None + and len(verify_dirs_deleted.logs.stdout) == 1 + and verify_dirs_deleted.logs.stdout[0].text == "OK" + ) + if verified: + break + time.sleep(1) + verify_dirs_deleted = sandbox.commands.run( + f"test ! -d {test_dir1} && test ! -d {test_dir2} && echo OK", + opts=RunCommandOpts(working_directory="/tmp"), + ) assert verify_dirs_deleted.error is None assert len(verify_dirs_deleted.logs.stdout) == 1 assert verify_dirs_deleted.logs.stdout[0].text == "OK" diff --git a/tests/python/tests/test_sandbox_manager_e2e_sync.py b/tests/python/tests/test_sandbox_manager_e2e_sync.py index b5a5336c6..c8c438149 100644 --- a/tests/python/tests/test_sandbox_manager_e2e_sync.py +++ b/tests/python/tests/test_sandbox_manager_e2e_sync.py @@ -23,12 +23,11 @@ We create 3 dedicated sandboxes per run to keep assertions deterministic. """ +import logging import time from datetime import timedelta from uuid import uuid4 -import logging - import pytest from opensandbox import SandboxManagerSync, SandboxSync from opensandbox.exceptions import SandboxApiException