Skip to content

[Feature] Strip ANSI codes from run logs and store them as plain text instead of bytes #2876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions frontend/src/libs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,6 @@ export const riseRouterException = (status = 404, json = 'Not Found'): never =>
throw new Response(json, { status });
};

export const base64ToArrayBuffer = (base64: string) => {
const binaryString = atob(base64);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return bytes;
};

export const isValidUrl = (urlString: string) => {
try {
return Boolean(new URL(urlString));
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/Runs/Details/Logs/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const Logs: React.FC<IProps> = ({ className, projectName, runName, jobSub

const writeDataToTerminal = (logs: ILogItem[]) => {
logs.forEach((logItem) => {
terminalInstance.current.write(logItem.message);
terminalInstance.current.write(logItem.message.replace(/(?<!\r)\n/g, '\r\n'));
});

fitAddonInstance.current.fit();
Expand Down
3 changes: 1 addition & 2 deletions frontend/src/services/project.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { API } from 'api';
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react';

import { base64ToArrayBuffer } from 'libs';
import fetchBaseQueryHeaders from 'libs/fetchBaseQueryHeaders';

// Helper function to transform backend response to frontend format
Expand Down Expand Up @@ -131,7 +130,7 @@ export const projectApi = createApi({
transformResponse: (response: { logs: ILogItem[]; next_token: string }) => {
const logs = response.logs.map((logItem) => ({
...logItem,
message: base64ToArrayBuffer(logItem.message as string),
message: logItem.message,
}));

return {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/types/log.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
declare interface ILogItem {
log_source: 'stdout' | 'stderr';
timestamp: string;
message: string | Uint8Array;
message: string;
}

declare type TRequestLogsParams = {
Expand Down
4 changes: 3 additions & 1 deletion runner/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/dstackai/dstack/runner

go 1.23
go 1.23.8

require (
github.com/alexellis/go-execute/v2 v2.2.1
Expand All @@ -10,6 +10,7 @@ require (
github.com/docker/docker v26.0.0+incompatible
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.5.0
github.com/dstackai/ansistrip v0.0.6
github.com/go-git/go-git/v5 v5.12.0
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f
github.com/gorilla/websocket v1.5.1
Expand Down Expand Up @@ -62,6 +63,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/skeema/knownhosts v1.2.2 // indirect
github.com/tidwall/btree v1.7.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/ulikunitz/xz v0.5.12 // indirect
Expand Down
4 changes: 4 additions & 0 deletions runner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dstackai/ansistrip v0.0.6 h1:6qqeDNWt8NoqfkY1CxKUvdHpJzBl89LOE3wMwptVpaI=
github.com/dstackai/ansistrip v0.0.6/go.mod h1:w3ejXI0twxDv6bPXhkOaPeYdbwz2nwcrcvFoZGqi9F0=
github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
Expand Down Expand Up @@ -171,6 +173,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
Expand Down
2 changes: 1 addition & 1 deletion runner/internal/executor/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Executor interface {
GetHistory(timestamp int64) *schemas.PullResponse
GetJobLogsHistory() []schemas.LogEvent
GetJobWsLogsHistory() []schemas.LogEvent
GetRunnerState() string
Run(ctx context.Context) error
SetCodePath(codePath string)
Expand Down
23 changes: 21 additions & 2 deletions runner/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/creack/pty"
"github.com/dstackai/ansistrip"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any benefit having this in a separate repo vs adding to dstack?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ansistrip docs should mention what it does with non-ascii, non-utf-8 sequences.

"github.com/dstackai/dstack/runner/consts"
"github.com/dstackai/dstack/runner/internal/connections"
"github.com/dstackai/dstack/runner/internal/gerrors"
Expand All @@ -28,6 +29,18 @@ import (
"github.com/prometheus/procfs"
)

// TODO: Tune these parameters for optimal experience/performance
const (
// Output is flushed when the cursor doesn't move for this duration
AnsiStripFlushInterval = 500 * time.Millisecond

// Output is flushed regardless of cursor activity after this maximum delay
AnsiStripMaxDelay = 3 * time.Second

// Maximum buffer size for ansistrip
MaxBufferSize = 32 * 1024 // 32KB
)

type ConnectionTracker interface {
GetNoConnectionsSecs() int64
Track(ticker <-chan time.Time)
Expand All @@ -54,6 +67,7 @@ type RunExecutor struct {
state string
jobStateHistory []schemas.JobStateEvent
jobLogs *appendWriter
jobWsLogs *appendWriter
runnerLogs *appendWriter
timestamp *MonotonicTimestamp

Expand Down Expand Up @@ -110,6 +124,7 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
state: WaitSubmit,
jobStateHistory: make([]schemas.JobStateEvent, 0),
jobLogs: newAppendWriter(mu, timestamp),
jobWsLogs: newAppendWriter(mu, timestamp),
runnerLogs: newAppendWriter(mu, timestamp),
timestamp: timestamp,

Expand Down Expand Up @@ -153,7 +168,9 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
}
}()

logger := io.MultiWriter(runnerLogFile, os.Stdout, ex.runnerLogs)
stripper := ansistrip.NewWriter(ex.runnerLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
defer stripper.Close()
logger := io.MultiWriter(runnerLogFile, os.Stdout, stripper)
ctx = log.WithLogger(ctx, log.NewEntry(logger, int(log.DefaultEntry.Logger.Level))) // todo loglevel
log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String())

Expand Down Expand Up @@ -455,7 +472,9 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
defer func() { _ = ptm.Close() }()
defer func() { _ = cmd.Wait() }() // release resources if copy fails

logger := io.MultiWriter(jobLogFile, ex.jobLogs)
stripper := ansistrip.NewWriter(ex.jobLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
defer stripper.Close()
logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper)
_, err = io.Copy(logger, ptm)
if err != nil && !isPtyError(err) {
return gerrors.Wrap(err)
Expand Down
113 changes: 104 additions & 9 deletions runner/internal/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -17,8 +18,6 @@ import (
"github.com/stretchr/testify/require"
)

// todo test get history

func TestExecutor_WorkingDir_Current(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)
Expand All @@ -28,7 +27,8 @@ func TestExecutor_WorkingDir_Current(t *testing.T) {

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, ex.workingDir+"\r\n", b.String())
// Normalize line endings for cross-platform compatibility.
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_WorkingDir_Nil(t *testing.T) {
Expand All @@ -39,7 +39,7 @@ func TestExecutor_WorkingDir_Nil(t *testing.T) {

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, ex.workingDir+"\r\n", b.String())
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_HomeDir(t *testing.T) {
Expand All @@ -49,7 +49,7 @@ func TestExecutor_HomeDir(t *testing.T) {

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, ex.homeDir+"\r\n", b.String())
assert.Equal(t, ex.homeDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_NonZeroExit(t *testing.T) {
Expand All @@ -61,7 +61,7 @@ func TestExecutor_NonZeroExit(t *testing.T) {
assert.Error(t, err)
assert.NotEmpty(t, ex.jobStateHistory)
exitStatus := ex.jobStateHistory[len(ex.jobStateHistory)-1].ExitStatus
assert.NotNil(t, exitStatus, ex.jobStateHistory)
assert.NotNil(t, exitStatus)
assert.Equal(t, 100, *exitStatus)
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func TestExecutor_LocalRepo(t *testing.T) {

err = ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, "bar\r\n", b.String())
assert.Equal(t, "bar\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_Recover(t *testing.T) {
Expand Down Expand Up @@ -148,8 +148,8 @@ func TestExecutor_RemoteRepo(t *testing.T) {

err = ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
assert.Equal(t, expected, b.String())
expected := fmt.Sprintf("%s\n%s\n%s\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
assert.Equal(t, expected, strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

/* Helpers */
Expand Down Expand Up @@ -236,3 +236,98 @@ func TestWriteDstackProfile(t *testing.T) {
assert.Equal(t, value, string(out))
}
}

func TestExecutor_Logs(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)
// Use printf to generate ANSI control codes.
// \033[31m = red text, \033[1;32m = bold green text, \033[0m = reset
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "printf '\\033[31mRed Hello World\\033[0m\\n' && printf '\\033[1;32mBold Green Line 2\\033[0m\\n' && printf 'Line 3\\n'")

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)

logHistory := ex.GetHistory(0).JobLogs
assert.NotEmpty(t, logHistory)

logString := combineLogMessages(logHistory)
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")

expectedOutput := "Red Hello World\nBold Green Line 2\nLine 3\n"
assert.Equal(t, expectedOutput, normalizedLogString, "Should strip ANSI codes from regular logs")

// Verify timestamps are in order
assert.Greater(t, len(logHistory), 0)
for i := 1; i < len(logHistory); i++ {
assert.GreaterOrEqual(t, logHistory[i].Timestamp, logHistory[i-1].Timestamp)
}
}

func TestExecutor_LogsWithErrors(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "echo 'Success message' && echo 'Error message' >&2 && exit 1")

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.Error(t, err)

logHistory := ex.GetHistory(0).JobLogs
assert.NotEmpty(t, logHistory)

logString := combineLogMessages(logHistory)
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")

expectedOutput := "Success message\nError message\n"
assert.Equal(t, expectedOutput, normalizedLogString)
}

func TestExecutor_LogsAnsiCodeHandling(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)

// Test a variety of ANSI escape sequences on stdout and stderr.
cmd := "printf '\\033[31mRed\\033[0m \\033[32mGreen\\033[0m\\n' && " +
"printf '\\033[1mBold\\033[0m \\033[4mUnderline\\033[0m\\n' && " +
"printf '\\033[s\\033[uPlain text\\n' >&2"

ex.jobSpec.Commands = append(ex.jobSpec.Commands, cmd)

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)

// 1. Check WebSocket logs, which should preserve ANSI codes.
wsLogHistory := ex.GetJobWsLogsHistory()
assert.NotEmpty(t, wsLogHistory)
wsLogString := combineLogMessages(wsLogHistory)
normalizedWsLogString := strings.ReplaceAll(wsLogString, "\r\n", "\n")

expectedWsOutput := "\033[31mRed\033[0m \033[32mGreen\033[0m\n" +
"\033[1mBold\033[0m \033[4mUnderline\033[0m\n" +
"\033[s\033[uPlain text\n"
assert.Equal(t, expectedWsOutput, normalizedWsLogString, "Websocket logs should preserve ANSI codes")

// 2. Check regular job logs, which should have ANSI codes stripped.
regularLogHistory := ex.GetHistory(0).JobLogs
assert.NotEmpty(t, regularLogHistory)
regularLogString := combineLogMessages(regularLogHistory)
normalizedRegularLogString := strings.ReplaceAll(regularLogString, "\r\n", "\n")

expectedRegularOutput := "Red Green\n" +
"Bold Underline\n" +
"Plain text\n"
assert.Equal(t, expectedRegularOutput, normalizedRegularLogString, "Regular logs should have ANSI codes stripped")

// Verify timestamps are ordered for both log types.
assert.Greater(t, len(wsLogHistory), 0)
for i := 1; i < len(wsLogHistory); i++ {
assert.GreaterOrEqual(t, wsLogHistory[i].Timestamp, wsLogHistory[i-1].Timestamp)
}
}

func combineLogMessages(logHistory []schemas.LogEvent) string {
var logOutput bytes.Buffer
for _, logEvent := range logHistory {
logOutput.Write(logEvent.Message)
}
return logOutput.String()
}
4 changes: 2 additions & 2 deletions runner/internal/executor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/dstackai/dstack/runner/internal/schemas"
)

func (ex *RunExecutor) GetJobLogsHistory() []schemas.LogEvent {
return ex.jobLogs.history
func (ex *RunExecutor) GetJobWsLogsHistory() []schemas.LogEvent {
return ex.jobWsLogs.history
}

func (ex *RunExecutor) GetHistory(timestamp int64) *schemas.PullResponse {
Expand Down
10 changes: 5 additions & 5 deletions runner/internal/runner/api/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ func (s *Server) streamJobLogs(conn *websocket.Conn) {

for {
s.executor.RLock()
jobLogsHistory := s.executor.GetJobLogsHistory()
jobLogsWsHistory := s.executor.GetJobWsLogsHistory()
select {
case <-s.shutdownCh:
if currentPos >= len(jobLogsHistory) {
if currentPos >= len(jobLogsWsHistory) {
s.executor.RUnlock()
close(s.wsDoneCh)
return
}
default:
if currentPos >= len(jobLogsHistory) {
if currentPos >= len(jobLogsWsHistory) {
s.executor.RUnlock()
time.Sleep(100 * time.Millisecond)
continue
}
}
for currentPos < len(jobLogsHistory) {
if err := conn.WriteMessage(websocket.BinaryMessage, jobLogsHistory[currentPos].Message); err != nil {
for currentPos < len(jobLogsWsHistory) {
if err := conn.WriteMessage(websocket.BinaryMessage, jobLogsWsHistory[currentPos].Message); err != nil {
s.executor.RUnlock()
log.Error(context.TODO(), "Failed to write message", "err", err)
return
Expand Down
Loading
Loading