Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions components/execd/pkg/web/controller/codeinterpreting.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (c *CodeInterpretingController) CreateSession() {
c.RespondSuccess(model.CreateSessionResponse{SessionID: sessionID})
}

// RunInSession runs code in an existing bash session and streams output via SSE (run_in_session API).
// RunInSession runs a command in an existing bash session and streams output via SSE (run_in_session API).
func (c *CodeInterpretingController) RunInSession() {
sessionID := c.ctx.Param("sessionId")
if sessionID == "" {
Expand Down Expand Up @@ -295,11 +295,11 @@ func (c *CodeInterpretingController) RunInSession() {
return
}

timeout := time.Duration(request.TimeoutMs) * time.Millisecond
timeout := time.Duration(request.Timeout) * time.Millisecond
runReq := &runtime.ExecuteCodeRequest{
Language: runtime.Bash,
Context: sessionID,
Code: request.Code,
Code: request.Command,
Cwd: request.Cwd,
Timeout: timeout,
}
Expand Down
8 changes: 4 additions & 4 deletions components/execd/pkg/web/model/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ type CreateSessionResponse struct {
SessionID string `json:"session_id"`
}

// RunInSessionRequest is the request body for running code in an existing session.
// RunInSessionRequest is the request body for running a command in an existing session.
type RunInSessionRequest struct {
Code string `json:"code" validate:"required"`
Cwd string `json:"cwd,omitempty"`
TimeoutMs int64 `json:"timeout_ms,omitempty" validate:"omitempty,gte=0"`
Command string `json:"command" validate:"required"`
Cwd string `json:"cwd,omitempty"`
Timeout int64 `json:"timeout,omitempty" validate:"omitempty,gte=0"`
}

// Validate validates RunInSessionRequest.
Expand Down
1 change: 1 addition & 0 deletions sdks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"packageManager": "pnpm@9.15.0",
"scripts": {
"build:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... --sort run build",
"test:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... run test",
"lint:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... run lint",
"clean:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... --sort run clean",
"publish:js": "pnpm -r --filter @alibaba-group/opensandbox-code-interpreter... publish --access public --no-git-checks"
Expand Down
265 changes: 143 additions & 122 deletions sdks/sandbox/csharp/src/OpenSandbox/Adapters/CommandsAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,48 +59,15 @@ public async IAsyncEnumerable<ServerStreamEvent> RunStreamAsync(
RunCommandOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (options?.Gid.HasValue == true && options.Uid.HasValue != true)
{
throw new InvalidArgumentException("uid is required when gid is provided");
}
if (options?.Uid.HasValue == true && options.Uid.Value < 0)
{
throw new InvalidArgumentException("uid must be >= 0");
}
if (options?.Gid.HasValue == true && options.Gid.Value < 0)
{
throw new InvalidArgumentException("gid must be >= 0");
}

var url = $"{_baseUrl}/command";
ValidateRunOptions(options);
_logger.LogDebug("Running command stream (commandLength={CommandLength})", command.Length);
var requestBody = new RunCommandRequest
{
Command = command,
Cwd = options?.WorkingDirectory,
Background = options?.Background,
Timeout = options?.TimeoutSeconds.HasValue == true ? options.TimeoutSeconds.Value * 1000L : null,
Uid = options?.Uid,
Gid = options?.Gid,
Envs = options?.Envs
};

var json = JsonSerializer.Serialize(requestBody, JsonOptions);
using var request = new HttpRequestMessage(HttpMethod.Post, url)
{
Content = new StringContent(json, Encoding.UTF8, "application/json")
};

request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream"));
var spec = new StreamingRequestSpec(
Url: $"{_baseUrl}/command",
Body: BuildRunCommandRequest(command, options),
ErrorMessage: "Run command failed");

foreach (var header in _headers)
{
request.Headers.TryAddWithoutValidation(header.Key, header.Value);
}

using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

await foreach (var ev in SseParser.ParseJsonEventStreamAsync<ServerStreamEvent>(response, "Run command failed", cancellationToken).ConfigureAwait(false))
await foreach (var ev in StreamExecutionAsync(spec, cancellationToken).ConfigureAwait(false))
{
yield return ev;
}
Expand All @@ -113,36 +80,11 @@ public async Task<Execution> RunAsync(
CancellationToken cancellationToken = default)
{
_logger.LogDebug("Running command (commandLength={CommandLength})", command.Length);
var execution = new Execution();
var dispatcher = new ExecutionEventDispatcher(execution, handlers);

await foreach (var ev in RunStreamAsync(command, options, cancellationToken).ConfigureAwait(false))
{
// Keep legacy behavior: if server sends "init" with empty id, preserve previous id
if (ev.Type == ServerStreamEventTypes.Init && string.IsNullOrEmpty(ev.Text) && !string.IsNullOrEmpty(execution.Id))
{
ev.Text = execution.Id;
}

await dispatcher.DispatchAsync(ev).ConfigureAwait(false);
}

if (!(options?.Background ?? false))
{
if (execution.Error != null)
{
if (int.TryParse(execution.Error.Value, out var exitCode))
{
execution.ExitCode = exitCode;
}
}
else if (execution.Complete != null)
{
execution.ExitCode = 0;
}
}

return execution;
return await ConsumeExecutionAsync(
RunStreamAsync(command, options, cancellationToken),
handlers,
inferExitCode: !(options?.Background ?? false),
cancellationToken).ConfigureAwait(false);
}

public async Task InterruptAsync(string sessionId, CancellationToken cancellationToken = default)
Expand All @@ -156,14 +98,8 @@ public async Task<string> CreateSessionAsync(
CreateSessionOptions? options = null,
CancellationToken cancellationToken = default)
{
object? body = null;
if (!string.IsNullOrEmpty(options?.Cwd))
{
body = new { cwd = options.Cwd };
}

_logger.LogDebug("Creating bash session (cwd={Cwd})", options?.Cwd);
var response = await _client.PostAsync<CreateSessionResponse>("/session", body, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Creating bash session (workingDirectory={WorkingDirectory})", options?.WorkingDirectory);
var response = await _client.PostAsync<CreateSessionResponse>("/session", BuildCreateSessionBody(options), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrEmpty(response?.SessionId))
{
throw new SandboxApiException(
Expand All @@ -177,52 +113,33 @@ public async Task<string> CreateSessionAsync(

public async IAsyncEnumerable<ServerStreamEvent> RunInSessionStreamAsync(
string sessionId,
string code,
string command,
RunInSessionOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new InvalidArgumentException("sessionId cannot be empty");
}
if (string.IsNullOrWhiteSpace(code))
{
throw new InvalidArgumentException("code cannot be empty");
}

var path = $"/session/{Uri.EscapeDataString(sessionId)}/run";
var url = $"{_baseUrl}{path}";
var requestBody = new RunInSessionRequest
{
Code = code,
Cwd = options?.Cwd,
TimeoutMs = options?.TimeoutMs
};

var json = JsonSerializer.Serialize(requestBody, JsonOptions);
using var request = new HttpRequestMessage(HttpMethod.Post, url)
if (string.IsNullOrWhiteSpace(command))
{
Content = new StringContent(json, Encoding.UTF8, "application/json")
};

request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream"));

foreach (var header in _headers)
{
request.Headers.TryAddWithoutValidation(header.Key, header.Value);
throw new InvalidArgumentException("command cannot be empty");
}

using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var spec = new StreamingRequestSpec(
Url: $"{_baseUrl}/session/{Uri.EscapeDataString(sessionId)}/run",
Body: BuildRunInSessionRequest(command, options),
ErrorMessage: "Run in session failed");

await foreach (var ev in SseParser.ParseJsonEventStreamAsync<ServerStreamEvent>(response, "Run in session failed", cancellationToken).ConfigureAwait(false))
await foreach (var ev in StreamExecutionAsync(spec, cancellationToken).ConfigureAwait(false))
{
yield return ev;
}
}

public async Task<Execution> RunInSessionAsync(
string sessionId,
string code,
string command,
RunInSessionOptions? options = null,
ExecutionHandlers? handlers = null,
CancellationToken cancellationToken = default)
Expand All @@ -231,26 +148,17 @@ public async Task<Execution> RunInSessionAsync(
{
throw new InvalidArgumentException("sessionId cannot be empty");
}
if (string.IsNullOrWhiteSpace(code))
if (string.IsNullOrWhiteSpace(command))
{
throw new InvalidArgumentException("code cannot be empty");
}

_logger.LogDebug("Running in session: {SessionId} (codeLength={CodeLength})", sessionId, code.Length);
var execution = new Execution();
var dispatcher = new ExecutionEventDispatcher(execution, handlers);

await foreach (var ev in RunInSessionStreamAsync(sessionId, code, options, cancellationToken).ConfigureAwait(false))
{
if (ev.Type == ServerStreamEventTypes.Init && string.IsNullOrEmpty(ev.Text) && !string.IsNullOrEmpty(execution.Id))
{
ev.Text = execution.Id;
}

await dispatcher.DispatchAsync(ev).ConfigureAwait(false);
throw new InvalidArgumentException("command cannot be empty");
}

return execution;
_logger.LogDebug("Running in session: {SessionId} (commandLength={CommandLength})", sessionId, command.Length);
return await ConsumeExecutionAsync(
RunInSessionStreamAsync(sessionId, command, options, cancellationToken),
handlers,
inferExitCode: true,
cancellationToken).ConfigureAwait(false);
}

public async Task DeleteSessionAsync(string sessionId, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -312,6 +220,119 @@ public async Task<CommandLogs> GetBackgroundCommandLogsAsync(
};
}

private async IAsyncEnumerable<ServerStreamEvent> StreamExecutionAsync(
StreamingRequestSpec spec,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var json = JsonSerializer.Serialize(spec.Body, JsonOptions);
using var request = new HttpRequestMessage(HttpMethod.Post, spec.Url)
{
Content = new StringContent(json, Encoding.UTF8, "application/json")
};

request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream"));

foreach (var header in _headers)
{
request.Headers.TryAddWithoutValidation(header.Key, header.Value);
}

using var response = await _sseHttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

await foreach (var ev in SseParser.ParseJsonEventStreamAsync<ServerStreamEvent>(response, spec.ErrorMessage, cancellationToken).ConfigureAwait(false))
{
yield return ev;
}
}

private static void ValidateRunOptions(RunCommandOptions? options)
{
if (options?.Gid.HasValue == true && options.Uid.HasValue != true)
{
throw new InvalidArgumentException("uid is required when gid is provided");
}
if (options?.Uid.HasValue == true && options.Uid.Value < 0)
{
throw new InvalidArgumentException("uid must be >= 0");
}
if (options?.Gid.HasValue == true && options.Gid.Value < 0)
{
throw new InvalidArgumentException("gid must be >= 0");
}
}

private static object? BuildCreateSessionBody(CreateSessionOptions? options)
{
return !string.IsNullOrEmpty(options?.WorkingDirectory) ? new { cwd = options.WorkingDirectory } : null;
}

private static RunCommandRequest BuildRunCommandRequest(string command, RunCommandOptions? options)
{
return new RunCommandRequest
{
Command = command,
Cwd = options?.WorkingDirectory,
Background = options?.Background,
Timeout = options?.TimeoutSeconds.HasValue == true ? options.TimeoutSeconds.Value * 1000L : null,
Uid = options?.Uid,
Gid = options?.Gid,
Envs = options?.Envs
};
}

private static RunInSessionRequest BuildRunInSessionRequest(string command, RunInSessionOptions? options)
{
return new RunInSessionRequest
{
Command = command,
Cwd = options?.WorkingDirectory,
Timeout = options?.Timeout
};
}

private static int? InferForegroundExitCode(Execution execution)
{
if (execution.Error != null)
{
return int.TryParse(execution.Error.Value, out var exitCode) ? exitCode : null;
}

return execution.Complete != null ? 0 : null;
}

private static void PreserveLegacyInitId(ServerStreamEvent ev, Execution execution)
{
if (ev.Type == ServerStreamEventTypes.Init && string.IsNullOrEmpty(ev.Text) && !string.IsNullOrEmpty(execution.Id))
{
ev.Text = execution.Id;
}
}

private async Task<Execution> ConsumeExecutionAsync(
IAsyncEnumerable<ServerStreamEvent> stream,
ExecutionHandlers? handlers,
bool inferExitCode,
CancellationToken cancellationToken)
{
var execution = new Execution();
var dispatcher = new ExecutionEventDispatcher(execution, handlers);

await foreach (var ev in stream.WithCancellation(cancellationToken).ConfigureAwait(false))
{
PreserveLegacyInitId(ev, execution);
await dispatcher.DispatchAsync(ev).ConfigureAwait(false);
}

if (inferExitCode)
{
execution.ExitCode = InferForegroundExitCode(execution);
}

return execution;
}

private sealed record StreamingRequestSpec(string Url, object Body, string ErrorMessage);

private static SandboxApiException CreateApiException(HttpResponseMessage response, string content)
{
var requestId = response.Headers.TryGetValues(Constants.RequestIdHeader, out var values)
Expand Down
Loading
Loading