diff --git a/backend/Makefile b/backend/Makefile index 6fbd38a4..45ac60e6 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -19,6 +19,15 @@ image: --output ${OUTPUT} \ . +# make ingress PLATFORM= TAG= OUTPUT_INGRESS= +ingress: + docker buildx build \ + -f build/Dockerfile.ingress \ + --platform ${PLATFORM} \ + --tag ${REGISTRY}/monkeycode-ai-ingress:${TAG} \ + --output ${OUTPUT} \ + . + swag: swag fmt && swag init -ot json --pd -g cmd/server/main.go @@ -32,4 +41,4 @@ check-generate: @echo "Generated code is up to date." migrate_sql: - migrate create -ext sql -dir migration -seq ${SEQ} \ No newline at end of file + migrate create -ext sql -dir migration -seq ${SEQ} diff --git a/backend/biz/host/handler/v1/host.go b/backend/biz/host/handler/v1/host.go index add2b258..c1684bd8 100644 --- a/backend/biz/host/handler/v1/host.go +++ b/backend/biz/host/handler/v1/host.go @@ -371,9 +371,7 @@ func (h *HostHandler) ConnectVMTerminal(c *web.Context, req domain.TerminalReq) ctx, cancel := context.WithCancel(c.Request().Context()) defer cancel() - var vmInfo *domain.VirtualMachine if err := h.usecase.WithVMPermission(ctx, user.ID, req.ID, func(v *domain.VirtualMachine) error { - vmInfo = v return nil }); err != nil { logger.With("error", err).ErrorContext(ctx, "failed to check permission") @@ -404,20 +402,6 @@ func (h *HostHandler) ConnectVMTerminal(c *web.Context, req domain.TerminalReq) } defer shell.Stop() - // 刷新空闲计时器 - if vmInfo != nil { - hostID := "" - if vmInfo.Host != nil { - hostID = vmInfo.Host.ID - } - _ = h.usecase.RefreshIdleTimers(ctx, vmInfo.ID, &domain.VmIdleInfo{ - UID: user.ID, - VmID: vmInfo.ID, - HostID: hostID, - EnvID: vmInfo.EnvironmentID, - }) - } - go func() { defer cancel() for { @@ -577,17 +561,6 @@ func (h *HostHandler) ShareTerminal(c *web.Context, req domain.ShareTerminalReq) if err != nil { return err } - // 刷新空闲计时器 - hostID := "" - if v.Host != nil { - hostID = v.Host.ID - } - _ = h.usecase.RefreshIdleTimers(c.Request().Context(), v.ID, &domain.VmIdleInfo{ - UID: user.ID, - VmID: v.ID, - HostID: hostID, - EnvID: v.EnvironmentID, - }) return c.Success(resp) }) } @@ -752,21 +725,6 @@ func (h *HostHandler) ApplyPort(c *web.Context, req domain.ApplyPortReq) error { h.logger.With("error", err).ErrorContext(c.Request().Context(), "failed to apply port") return errcode.ErrApplyPortFailed.Wrap(err) } - - // 刷新空闲计时器 - _ = h.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error { - hostID := "" - if v.Host != nil { - hostID = v.Host.ID - } - return h.usecase.RefreshIdleTimers(c.Request().Context(), v.ID, &domain.VmIdleInfo{ - UID: user.ID, - VmID: v.ID, - HostID: hostID, - EnvID: v.EnvironmentID, - }) - }) - return c.Success(port) } diff --git a/backend/biz/host/handler/v1/internal.go b/backend/biz/host/handler/v1/internal.go index 4212394a..950cc6c1 100644 --- a/backend/biz/host/handler/v1/internal.go +++ b/backend/biz/host/handler/v1/internal.go @@ -230,8 +230,8 @@ return nil // 通过 hook 获取关联的 TaskID(内部项目注入时生效) taskID := uuid.Nil - if h.hook != nil { - taskID = h.hook.OnAgentAuth(ctx, vm.ID) + if len(vm.Edges.Tasks) > 0 { + taskID = vm.Edges.Tasks[0].ID } return &taskflow.Token{ @@ -347,6 +347,15 @@ func (h *InternalHostHandler) VmReady(c *web.Context, req taskflow.VirtualMachin h.logger.With("task", t, "error", err).ErrorContext(c.Request().Context(), "failed to transition task to processing") } } + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := h.hostUsecase.RefreshIdleTimers(ctx, vm.ID); err != nil { + h.logger.With("error", err).ErrorContext(ctx, "failed to refresh idel timers") + } + }() + } return c.Success(nil) @@ -421,17 +430,12 @@ type VMActivityReq struct { // VMActivity VM 活动回调,用于刷新空闲计时器 func (h *InternalHostHandler) VMActivity(c *web.Context, req VMActivityReq) error { - vm, err := h.repo.GetVirtualMachine(c.Request().Context(), req.VMID) - if err != nil { - h.logger.ErrorContext(c.Request().Context(), "vm activity: vm not found", "vmID", req.VMID, "error", err) - return err - } - - payload := &domain.VmIdleInfo{ - UID: vm.UserID, - VmID: vm.ID, - HostID: vm.HostID, - EnvID: vm.EnvironmentID, - } - return h.hostUsecase.RefreshIdleTimers(c.Request().Context(), req.VMID, payload) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := h.hostUsecase.RefreshIdleTimers(ctx, req.VMID); err != nil { + h.logger.With("error", err).ErrorContext(ctx, "failed to refresh idel timers") + } + }() + return c.Success(nil) } diff --git a/backend/biz/host/repo/host.go b/backend/biz/host/repo/host.go index 52271bac..de16f181 100644 --- a/backend/biz/host/repo/host.go +++ b/backend/biz/host/repo/host.go @@ -215,6 +215,7 @@ func (h *HostRepo) GetVirtualMachineWithUser(ctx context.Context, uid uuid.UUID, ForUpdate(). WithHost(). WithModel(). + WithTasks(). WithUser(). Where(virtualmachine.HasHostWith(hostWithUserPredicate(uid))). Where(virtualmachine.UserID(uid)). @@ -233,6 +234,7 @@ func (h *HostRepo) GetVirtualMachine(ctx context.Context, id string) (*db.Virtua ForUpdate(). WithHost(). WithModel(). + WithTasks(). WithUser(). Where(virtualmachine.ID(id)). First(ctx) @@ -548,6 +550,7 @@ func (h *HostRepo) UpdateVM(ctx context.Context, req domain.UpdateVMReq, fn func // GetVirtualMachineByEnvID implements domain.HostRepo. func (h *HostRepo) GetVirtualMachineByEnvID(ctx context.Context, envID string) (*db.VirtualMachine, error) { return h.db.VirtualMachine.Query(). + WithTasks(). Where(virtualmachine.EnvironmentID(envID)). First(ctx) } diff --git a/backend/biz/host/usecase/host.go b/backend/biz/host/usecase/host.go index fbc92b05..504637bb 100644 --- a/backend/biz/host/usecase/host.go +++ b/backend/biz/host/usecase/host.go @@ -11,6 +11,7 @@ import ( "net/url" "sort" "strconv" + "strings" "time" "github.com/google/uuid" @@ -25,6 +26,7 @@ import ( "github.com/chaitin/MonkeyCode/backend/pkg/cvt" "github.com/chaitin/MonkeyCode/backend/pkg/delayqueue" "github.com/chaitin/MonkeyCode/backend/pkg/entx" + "github.com/chaitin/MonkeyCode/backend/pkg/notify/dispatcher" "github.com/chaitin/MonkeyCode/backend/pkg/random" "github.com/chaitin/MonkeyCode/backend/pkg/taskflow" "github.com/chaitin/MonkeyCode/backend/templates" @@ -36,7 +38,9 @@ type HostUsecase struct { taskflow taskflow.Clienter logger *slog.Logger repo domain.HostRepo + taskRepo domain.TaskRepo userRepo domain.UserRepo + notifyDispatcher *dispatcher.Dispatcher vmSleepQueue *delayqueue.VMSleepQueue vmNotifyQueue *delayqueue.VMNotifyQueue vmRecycleQueue *delayqueue.VMRecycleQueue @@ -46,16 +50,18 @@ type HostUsecase struct { func NewHostUsecase(i *do.Injector) (domain.HostUsecase, error) { h := &HostUsecase{ - cfg: do.MustInvoke[*config.Config](i), - redis: do.MustInvoke[*redis.Client](i), - taskflow: do.MustInvoke[taskflow.Clienter](i), - logger: do.MustInvoke[*slog.Logger](i).With("module", "HostUsecase"), - repo: do.MustInvoke[domain.HostRepo](i), - userRepo: do.MustInvoke[domain.UserRepo](i), - vmSleepQueue: do.MustInvoke[*delayqueue.VMSleepQueue](i), - vmNotifyQueue: do.MustInvoke[*delayqueue.VMNotifyQueue](i), - vmRecycleQueue: do.MustInvoke[*delayqueue.VMRecycleQueue](i), - vmexpireQueue: do.MustInvoke[*delayqueue.VMExpireQueue](i), + cfg: do.MustInvoke[*config.Config](i), + redis: do.MustInvoke[*redis.Client](i), + taskflow: do.MustInvoke[taskflow.Clienter](i), + logger: do.MustInvoke[*slog.Logger](i).With("module", "HostUsecase"), + repo: do.MustInvoke[domain.HostRepo](i), + taskRepo: do.MustInvoke[domain.TaskRepo](i), + userRepo: do.MustInvoke[domain.UserRepo](i), + notifyDispatcher: do.MustInvoke[*dispatcher.Dispatcher](i), + vmSleepQueue: do.MustInvoke[*delayqueue.VMSleepQueue](i), + vmNotifyQueue: do.MustInvoke[*delayqueue.VMNotifyQueue](i), + vmRecycleQueue: do.MustInvoke[*delayqueue.VMRecycleQueue](i), + vmexpireQueue: do.MustInvoke[*delayqueue.VMExpireQueue](i), } // 可选注入 PrivilegeChecker @@ -76,8 +82,33 @@ const ( VM_NOTIFY_QUEUE_KEY = "vm:idle:notify" VM_RECYCLE_QUEUE_KEY = "vm:idle:recycle" VM_EXPIRE_QUEUE_KEY = "vm:expire" + vmRecycleNotifyLead = time.Hour ) +func (h *HostUsecase) vmIdleSleepDelay() time.Duration { + return time.Duration(h.cfg.VMIdle.SleepSeconds) * time.Second +} + +func (h *HostUsecase) vmIdleRecycleDelay() time.Duration { + return time.Duration(h.cfg.VMIdle.RecycleSeconds) * time.Second +} + +func (h *HostUsecase) vmIdleNotifyDelay() time.Duration { + recycleDelay := h.vmIdleRecycleDelay() + if recycleDelay <= vmRecycleNotifyLead { + return 0 + } + return recycleDelay - vmRecycleNotifyLead +} + +func (h *HostUsecase) vmRecycleNotifyRemaining() time.Duration { + recycleDelay := h.vmIdleRecycleDelay() + if recycleDelay <= vmRecycleNotifyLead { + return recycleDelay + } + return vmRecycleNotifyLead +} + func (h *HostUsecase) periodicEnqueueVm() { t := time.NewTicker(10 * time.Minute) for range t.C { @@ -144,21 +175,25 @@ func (h *HostUsecase) vmexpireConsumer() { } } -func (h *HostUsecase) RefreshIdleTimers(ctx context.Context, vmID string, payload *domain.VmIdleInfo) error { - // 仅对任务创建的 VM 使用空闲检测逻辑 - // 手动创建的 VM(TTLKind=CountDown)保留原有的 TTL 过期逻辑 +func (h *HostUsecase) RefreshIdleTimers(ctx context.Context, vmID string) error { vm, err := h.repo.GetVirtualMachine(ctx, vmID) if err != nil { h.logger.ErrorContext(ctx, "failed to get vm for refresh idle timers", "vmID", vmID, "error", err) return fmt.Errorf("get vm %s: %w", vmID, err) } - // 如果是 CountDown 类型的 VM(手动创建),跳过空闲检测逻辑 - if vm.TTLKind == consts.CountDown { + if len(vm.Edges.Tasks) == 0 { h.logger.DebugContext(ctx, "skip idle timer for countdown VM", "vmID", vmID) return nil } + payload := &domain.VmIdleInfo{ + UID: vm.UserID, + VmID: vm.ID, + HostID: vm.HostID, + EnvID: vm.EnvironmentID, + } + debounceKey := fmt.Sprintf("vm:idle:debounce:%s", vmID) ok, err := h.redis.SetNX(ctx, debounceKey, "1", 30*time.Second).Result() if err != nil { @@ -171,15 +206,15 @@ func (h *HostUsecase) RefreshIdleTimers(ctx context.Context, vmID string, payloa now := time.Now() var errs []error - if _, err := h.vmSleepQueue.Enqueue(ctx, VM_SLEEP_QUEUE_KEY, payload, now.Add(10*time.Minute), vmID); err != nil { + if _, err := h.vmSleepQueue.Enqueue(ctx, VM_SLEEP_QUEUE_KEY, payload, now.Add(h.vmIdleSleepDelay()), vmID); err != nil { h.logger.ErrorContext(ctx, "failed to enqueue sleep", "error", err, "vmID", vmID) errs = append(errs, fmt.Errorf("enqueue sleep: %w", err)) } - if _, err := h.vmNotifyQueue.Enqueue(ctx, VM_NOTIFY_QUEUE_KEY, payload, now.Add(7*24*time.Hour-1*time.Hour), vmID); err != nil { + if _, err := h.vmNotifyQueue.Enqueue(ctx, VM_NOTIFY_QUEUE_KEY, payload, now.Add(h.vmIdleNotifyDelay()), vmID); err != nil { h.logger.ErrorContext(ctx, "failed to enqueue notify", "error", err, "vmID", vmID) errs = append(errs, fmt.Errorf("enqueue notify: %w", err)) } - if _, err := h.vmRecycleQueue.Enqueue(ctx, VM_RECYCLE_QUEUE_KEY, payload, now.Add(7*24*time.Hour), vmID); err != nil { + if _, err := h.vmRecycleQueue.Enqueue(ctx, VM_RECYCLE_QUEUE_KEY, payload, now.Add(h.vmIdleRecycleDelay()), vmID); err != nil { h.logger.ErrorContext(ctx, "failed to enqueue recycle", "error", err, "vmID", vmID) errs = append(errs, fmt.Errorf("enqueue recycle: %w", err)) } @@ -192,8 +227,25 @@ func (h *HostUsecase) vmSleepConsumer() { err := h.vmSleepQueue.StartConsumer(context.Background(), VM_SLEEP_QUEUE_KEY, func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error { logger.InfoContext(ctx, "vm idle sleep triggered", "vmID", job.Payload.VmID) - // TODO: VirtualMachine ent schema does not have a status field yet; - // update this once SetStatus is available on VirtualMachineUpdateOne. + vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID) + if err != nil { + if db.IsNotFound(err) { + logger.InfoContext(ctx, "skip sleeping missing vm", "vmID", job.Payload.VmID) + return nil + } + return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err) + } + if vm.IsRecycled { + return nil + } + + if err := h.taskflow.VirtualMachiner().Hibernate(ctx, &taskflow.HibernateVirtualMachineReq{ + HostID: vm.HostID, + UserID: vm.UserID.String(), + ID: vm.ID, + }); err != nil { + return fmt.Errorf("hibernate vm %s: %w", vm.ID, err) + } return nil }) logger.Warn("sleep consumer error, retrying...", "error", err) @@ -207,8 +259,26 @@ func (h *HostUsecase) vmNotifyConsumer() { err := h.vmNotifyQueue.StartConsumer(context.Background(), VM_NOTIFY_QUEUE_KEY, func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error { logger.InfoContext(ctx, "vm recycle notify triggered", "vmID", job.Payload.VmID) - // TODO: 对接现有 Notify 模块,发送回收预警(任务维度) - return nil + vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID) + if err != nil { + if db.IsNotFound(err) { + return nil + } + return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err) + } + if vm.IsRecycled { + return nil + } + + event, err := h.buildVMRecycleNotifyEvent(ctx, vm, time.Now().Add(h.vmRecycleNotifyRemaining())) + if err != nil { + return err + } + if event == nil { + return nil + } + + return h.notifyDispatcher.Publish(ctx, event) }) logger.Warn("notify consumer error, retrying...", "error", err) time.Sleep(10 * time.Second) @@ -226,9 +296,15 @@ func (h *HostUsecase) vmRecycleConsumer() { ctx = entx.SkipSoftDelete(ctx) vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID) if err != nil { + if db.IsNotFound(err) { + return nil + } innerLogger.ErrorContext(ctx, "failed to get vm", "error", err) return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err) } + if vm.IsRecycled { + return nil + } if err := h.taskflow.VirtualMachiner().Delete(ctx, &taskflow.DeleteVirtualMachineReq{ UserID: vm.UserID.String(), @@ -246,6 +322,11 @@ func (h *HostUsecase) vmRecycleConsumer() { innerLogger.ErrorContext(ctx, "failed to update vm", "error", err) return err } + + if err := h.markRecycledTasksFinished(ctx, vm); err != nil { + innerLogger.ErrorContext(ctx, "failed to mark recycled tasks finished", "error", err) + return err + } return nil }) logger.Warn("recycle consumer error, retrying...", "error", err) @@ -378,6 +459,9 @@ func (h *HostUsecase) CloseTerminal(ctx context.Context, id string, terminalID s // ConnectVMTerminal 连接到虚拟机终端 func (h *HostUsecase) ConnectVMTerminal(ctx context.Context, uid uuid.UUID, req domain.TerminalReq) (taskflow.Sheller, error) { + if err := h.resumeVMIfSleeping(ctx, uid, req.ID); err != nil { + return nil, err + } return h.taskflow.VirtualMachiner().Terminal(ctx, &taskflow.TerminalReq{ ID: req.ID, TerminalID: req.TerminalID, @@ -507,14 +591,6 @@ func (h *HostUsecase) CreateVM(ctx context.Context, user *domain.User, req *doma }, time.Now().Add(time.Duration(req.Life)*time.Second), tfvm.ID); err != nil { h.logger.With("error", err, "vm", tfvm).ErrorContext(ctx, "failed to enqueue countdown vm") } - } else { - // 永久的 VM 使用空闲检测逻辑 - h.RefreshIdleTimers(ctx, tfvm.ID, &domain.VmIdleInfo{ - UID: user.ID, - VmID: tfvm.ID, - HostID: req.HostID, - EnvID: tfvm.EnvironmentID, - }) } return &domain.VirtualMachine{ @@ -590,7 +666,12 @@ func (h *HostUsecase) VMInfo(ctx context.Context, uid uuid.UUID, id string) (*do } status := taskflow.VirtualMachineStatusOffline - if vmonline.OnlineMap[vm.ID] { + if info, err := h.taskflow.VirtualMachiner().Info(ctx, taskflow.VirtualMachineInfoReq{ + ID: vm.ID, + UserID: vm.UserID.String(), + }); err == nil && info != nil && info.Status != taskflow.VirtualMachineStatusUnknown { + status = info.Status + } else if vmonline.OnlineMap[vm.ID] { status = taskflow.VirtualMachineStatusOnline } dvm := cvt.From(vm, &domain.VirtualMachine{ @@ -786,6 +867,9 @@ func (h *HostUsecase) UpdateVM(ctx context.Context, req domain.UpdateVMReq) (*do // ApplyPort implements domain.HostUsecase. func (h *HostUsecase) ApplyPort(ctx context.Context, uid uuid.UUID, req *domain.ApplyPortReq) (*domain.VMPort, error) { + if err := h.resumeVMIfSleeping(ctx, uid, req.ID); err != nil { + return nil, err + } if req.ForwardID == "" { forwardInfo, err := h.taskflow.PortForwarder().Create( ctx, @@ -834,12 +918,101 @@ func (h *HostUsecase) ApplyPort(ctx context.Context, uid uuid.UUID, req *domain. // RecyclePort implements domain.HostUsecase. func (h *HostUsecase) RecyclePort(ctx context.Context, uid uuid.UUID, req *domain.RecyclePortReq) error { + if err := h.resumeVMIfSleeping(ctx, uid, req.ID); err != nil { + return err + } return h.taskflow.PortForwarder().Close(ctx, taskflow.ClosePortForward{ ID: req.ID, ForwardID: req.ForwardID, }) } +func (h *HostUsecase) resumeVMIfSleeping(ctx context.Context, uid uuid.UUID, vmID string) error { + info, err := h.taskflow.VirtualMachiner().Info(ctx, taskflow.VirtualMachineInfoReq{ + ID: vmID, + UserID: uid.String(), + }) + if err != nil || info == nil || info.Status != taskflow.VirtualMachineStatusSleeping { + return nil + } + + if err := h.taskflow.VirtualMachiner().Resume(ctx, &taskflow.ResumeVirtualMachineReq{ + HostID: info.HostID, + UserID: uid.String(), + ID: vmID, + }); err != nil { + return err + } + + return nil +} +func (h *HostUsecase) markRecycledTasksFinished(ctx context.Context, vm *db.VirtualMachine) error { + var errs []error + for _, tk := range vm.Edges.Tasks { + if tk == nil { + continue + } + if tk.Status == consts.TaskStatusFinished || tk.Status == consts.TaskStatusError { + continue + } + err := h.taskRepo.Update(ctx, nil, tk.ID, func(up *db.TaskUpdateOne) error { + up.SetStatus(consts.TaskStatusFinished) + up.SetCompletedAt(time.Now()) + return nil + }) + if err != nil { + errs = append(errs, fmt.Errorf("update task %s: %w", tk.ID, err)) + } + } + return errors.Join(errs...) +} + +func (h *HostUsecase) buildVMRecycleNotifyEvent(ctx context.Context, vm *db.VirtualMachine, expiresAt time.Time) (*domain.NotifyEvent, error) { + if len(vm.Edges.Tasks) == 0 || vm.Edges.Tasks[0] == nil { + return nil, nil + } + + tk, err := h.taskRepo.GetByID(ctx, vm.Edges.Tasks[0].ID) + if err != nil { + return nil, fmt.Errorf("get task %s: %w", vm.Edges.Tasks[0].ID, err) + } + + event := &domain.NotifyEvent{ + EventType: consts.NotifyEventVMExpiringSoon, + SubjectUserID: tk.UserID, + RefID: tk.ID.String(), + OccurredAt: time.Now(), + Payload: domain.NotifyEventPayload{ + TaskID: tk.ID.String(), + TaskContent: tk.Content, + TaskStatus: string(tk.Status), + TaskURL: strings.TrimRight(h.cfg.Server.BaseURL, "/") + "/console/task/" + tk.ID.String(), + VMID: vm.ID, + VMName: vm.Name, + HostID: vm.HostID, + VMArch: vm.Arch, + VMCores: vm.Cores, + VMMemory: vm.Memory, + VMOS: vm.Os, + ExpiresAt: &expiresAt, + }, + } + + if len(tk.Edges.ProjectTasks) > 0 && tk.Edges.ProjectTasks[0] != nil { + pt := tk.Edges.ProjectTasks[0] + event.Payload.RepoURL = pt.RepoURL + if pt.Edges.Model != nil { + event.Payload.ModelName = pt.Edges.Model.Model + } + } + + if vm.Edges.User != nil { + event.Payload.UserName = vm.Edges.User.Name + } + + return event, nil +} + // GetPorts 获取虚拟机端口列表 func (h *HostUsecase) GetPorts(ctx context.Context, vid string) ([]*domain.VMPort, error) { forwardInfos, err := h.taskflow.PortForwarder().List(ctx, vid) diff --git a/backend/biz/register.go b/backend/biz/register.go index 19be2cad..ceec04e1 100644 --- a/backend/biz/register.go +++ b/backend/biz/register.go @@ -26,7 +26,7 @@ func RegisterAll(i *do.Injector) error { return err } - // 注册 task 模块(需在 git 模块之前,因为 webhook handler 依赖 GitTaskUsecase) + // 注册 task 模块的 usecase 和 handler(TaskUsecase 依赖 HostUsecase,需在 host 之后) task.RegisterTask(i) // 注册 git 模块 diff --git a/backend/biz/task/register.go b/backend/biz/task/register.go index f8718284..d73574fe 100644 --- a/backend/biz/task/register.go +++ b/backend/biz/task/register.go @@ -9,13 +9,13 @@ import ( "github.com/chaitin/MonkeyCode/backend/biz/task/usecase" ) -// RegisterTask 注册 task 模块 +// RegisterTask 注册 task 模块的 usecase 和 handler func RegisterTask(i *do.Injector) { - do.Provide(i, repo.NewTaskRepo) - do.Provide(i, repo.NewGitTaskRepo) do.Provide(i, usecase.NewTaskUsecase) do.Provide(i, usecase.NewGitTaskUsecase) do.Provide(i, service.NewTaskSummaryService) do.Provide(i, v1.NewTaskHandler) + do.Provide(i, repo.NewTaskRepo) + do.Provide(i, repo.NewGitTaskRepo) do.MustInvoke[*v1.TaskHandler](i) } diff --git a/backend/biz/task/usecase/gittask.go b/backend/biz/task/usecase/gittask.go index 2076b829..7047987c 100644 --- a/backend/biz/task/usecase/gittask.go +++ b/backend/biz/task/usecase/gittask.go @@ -2,6 +2,7 @@ package usecase import ( "context" + "encoding/json" "fmt" "log/slog" "strings" @@ -119,8 +120,12 @@ func (g *GitTaskUsecase) Create(ctx context.Context, req domain.CreateGitTaskReq }, Env: req.Env, } + b, err := json.Marshal(createTaskReq) + if err != nil { + return vm, err + } reqKey := fmt.Sprintf("task:create_req:%s", t.ID.String()) - if err := g.redis.Set(ctx, reqKey, createTaskReq, 10*time.Minute).Err(); err != nil { + if err := g.redis.Set(ctx, reqKey, string(b), 10*time.Minute).Err(); err != nil { g.logger.WarnContext(ctx, "failed to store CreateTaskReq in Redis", "error", err) } diff --git a/backend/biz/task/usecase/task.go b/backend/biz/task/usecase/task.go index 6b2dd5c4..569d374b 100644 --- a/backend/biz/task/usecase/task.go +++ b/backend/biz/task/usecase/task.go @@ -3,6 +3,7 @@ package usecase import ( "bytes" "context" + "encoding/json" "fmt" "log/slog" "os" @@ -172,8 +173,18 @@ func (a *TaskUsecase) List(ctx context.Context, user *domain.User, req domain.Ta // Stop implements domain.TaskUsecase. func (a *TaskUsecase) Stop(ctx context.Context, user *domain.User, id uuid.UUID) error { + t, err := a.repo.Info(ctx, user, id) + if err != nil { + return err + } + tk := cvt.From(t, &domain.Task{}) return a.repo.Stop(ctx, user, id, func(t *db.Task) error { return a.taskflow.TaskManager().Stop(ctx, taskflow.TaskReq{ + VirtualMachine: &taskflow.VirtualMachine{ + ID: tk.VirtualMachine.ID, + HostID: tk.VirtualMachine.Host.ID, + EnvironmentID: tk.VirtualMachine.EnvironmentID, + }, Task: &taskflow.Task{ ID: id, }, @@ -190,7 +201,11 @@ func (a *TaskUsecase) Cancel(ctx context.Context, user *domain.User, id uuid.UUI tk := cvt.From(t, &domain.Task{}) if err := a.taskflow.TaskManager().Cancel(ctx, taskflow.TaskReq{ - VirtualMachine: &taskflow.VirtualMachine{ID: tk.VirtualMachine.ID}, + VirtualMachine: &taskflow.VirtualMachine{ + ID: tk.VirtualMachine.ID, + HostID: tk.VirtualMachine.Host.ID, + EnvironmentID: tk.VirtualMachine.EnvironmentID, + }, Task: &taskflow.Task{ ID: id, }, @@ -208,9 +223,12 @@ func (a *TaskUsecase) Continue(ctx context.Context, user *domain.User, id uuid.U return err } tk := cvt.From(t, &domain.Task{}) - if err := a.taskflow.TaskManager().Continue(ctx, taskflow.TaskReq{ - VirtualMachine: &taskflow.VirtualMachine{ID: tk.VirtualMachine.ID}, + VirtualMachine: &taskflow.VirtualMachine{ + ID: tk.VirtualMachine.ID, + HostID: tk.VirtualMachine.Host.ID, + EnvironmentID: tk.VirtualMachine.EnvironmentID, + }, Task: &taskflow.Task{ ID: id, Text: content, @@ -351,8 +369,12 @@ func (a *TaskUsecase) Create(ctx context.Context, user *domain.User, req domain. Configs: configs, McpConfigs: mcps, } + b, err := json.Marshal(createTaskReq) + if err != nil { + return vm, err + } reqKey := fmt.Sprintf("task:create_req:%s", t.ID.String()) - if err := a.redis.Set(ctx, reqKey, createTaskReq, 10*time.Minute).Err(); err != nil { + if err := a.redis.Set(ctx, reqKey, string(b), 10*time.Minute).Err(); err != nil { a.logger.WarnContext(ctx, "failed to store CreateTaskReq in Redis", "error", err) } diff --git a/backend/build/Dockerfile.ingress b/backend/build/Dockerfile.ingress new file mode 100644 index 00000000..44152445 --- /dev/null +++ b/backend/build/Dockerfile.ingress @@ -0,0 +1,3 @@ +FROM nginx:1.29.6-alpine3.23 + +COPY build/nginx.conf /etc/nginx/nginx.conf diff --git a/backend/build/nginx.conf b/backend/build/nginx.conf new file mode 100644 index 00000000..13d3e0e8 --- /dev/null +++ b/backend/build/nginx.conf @@ -0,0 +1,100 @@ +events { + worker_connections 1024; +} + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + # 日志格式 + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + # 基本设置 + sendfile on; + tcp_nopush on; + tcp_nodelay on; + keepalive_timeout 65; + types_hash_max_size 2048; + + # Gzip 压缩 + gzip on; + gzip_vary on; + gzip_min_length 1024; + gzip_types text/plain text/css text/xml text/javascript application/javascript application/xml+rss application/json; + + server { + listen 80; + server_name localhost; + + # 处理前端路由(SPA) + location / { + proxy_pass http://monkeycode-ai-frontend; + } + + location /api/ { + client_max_body_size 10m; + proxy_pass http://monkeycode-ai-backend:8888; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # 支持 WebSocket + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + } + + location /api/v1/users/files/upload { + client_max_body_size 10m; + proxy_pass http://monkeycode-ai-backend:8888; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_read_timeout 10m; + proxy_send_timeout 60s; + } + } + + upstream grpcservers { + server monkeycode-ai-taskflow:50051; + keepalive 500; + } + + server { + listen 50051; + server_name _; + + http2 on; + underscores_in_headers on; + + location / { + grpc_pass grpc://grpcservers; + + # 1. 增加读写超时时间 + # gRPC 双向流通常是长连接,默认 60s 超时会导致连接意外断开 + grpc_read_timeout 1h; + grpc_send_timeout 1h; + + grpc_set_header Host $host; + grpc_set_header X-Real-IP $remote_addr; + } + + location /codingmatrix.proto.v1.agent.AgentService/FileManager { + grpc_pass grpc://grpcservers; + + client_max_body_size 10m; + + # 1. 增加读写超时时间 + # gRPC 双向流通常是长连接,默认 60s 超时会导致连接意外断开 + grpc_read_timeout 1h; + grpc_send_timeout 1h; + + grpc_set_header Host $host; + grpc_set_header X-Real-IP $remote_addr; + } + } +} diff --git a/backend/config/config.go b/backend/config/config.go index 055cc655..dcb5221f 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -40,6 +40,7 @@ type Config struct { Loki Loki `mapstructure:"loki"` LLM LLM `mapstructure:"llm"` Notify Notify `mapstructure:"notify"` + VMIdle VMIdle `mapstructure:"vm_idle"` // Context7 API 配置 Context7ApiKey string `mapstructure:"context7_api_key"` @@ -109,6 +110,11 @@ type Notify struct { VMExpireWarningMinutes int `mapstructure:"vm_expire_warning_minutes"` // VM 过期预警时间(分钟) } +type VMIdle struct { + SleepSeconds int `mapstructure:"sleep_seconds"` // VM 空闲休眠时间(秒) + RecycleSeconds int `mapstructure:"recycle_seconds"` // VM 空闲回收时间(秒) +} + type Session struct { ExpireDay int `mapstructure:"expire_day"` } @@ -139,6 +145,7 @@ func Init(dir string) (*Config, error) { v.SetDefault("debug", false) v.SetDefault("server.addr", ":8888") v.SetDefault("server.base_url", "http://localhost:8888") + v.SetDefault("loki.addr", "http://monkeycode-ai-loki:3100") v.SetDefault("database.master", "") v.SetDefault("database.slave", "") v.SetDefault("database.max_open_conns", 100) @@ -157,6 +164,8 @@ func Init(dir string) (*Config, error) { v.SetDefault("redis.port", 6379) v.SetDefault("redis.pass", "") v.SetDefault("redis.db", 0) + v.SetDefault("vm_idle.sleep_seconds", 600) + v.SetDefault("vm_idle.recycle_seconds", 604800) v.SetDefault("init_team.email", "") v.SetDefault("init_team.name", "") v.SetDefault("init_team.password", "") diff --git a/backend/config/server/config.yaml.example b/backend/config/server/config.yaml.example index 836e6318..b2d88550 100644 --- a/backend/config/server/config.yaml.example +++ b/backend/config/server/config.yaml.example @@ -33,3 +33,7 @@ admin_token: "change-me" logger: level: "info" + +vm_idle: + sleep_seconds: 600 + recycle_seconds: 604800 diff --git a/backend/domain/host.go b/backend/domain/host.go index 6e3f3bc0..9206696c 100644 --- a/backend/domain/host.go +++ b/backend/domain/host.go @@ -30,7 +30,7 @@ type HostUsecase interface { DeleteVM(ctx context.Context, uid uuid.UUID, hostID, vmID string) error DeleteHost(ctx context.Context, uid uuid.UUID, id string) error UpdateHost(ctx context.Context, uid uuid.UUID, req *UpdateHostReq) error - RefreshIdleTimers(ctx context.Context, vmID string, payload *VmIdleInfo) error + RefreshIdleTimers(ctx context.Context, vmID string) error FireExpiredVM(ctx context.Context, fire bool) ([]FireExpiredVMItem, error) UpdateVM(ctx context.Context, req UpdateVMReq) (*VirtualMachine, error) ApplyPort(ctx context.Context, uid uuid.UUID, req *ApplyPortReq) (*VMPort, error) @@ -63,8 +63,8 @@ type VmIdleInfo struct { VmID string `json:"vm_id"` HostID string `json:"host_id"` EnvID string `json:"env_id"` - TaskID string `json:"task_id,omitempty"` // 关联的任务 ID,用于通知 - Name string `json:"name,omitempty"` // 任务名称,用于通知内容 + TaskID string `json:"task_id,omitempty"` // 关联的任务 ID,用于通知 + Name string `json:"name,omitempty"` // 任务名称,用于通知内容 } // VmExpireInfo VM 过期信息(手动创建的 VM) @@ -125,21 +125,21 @@ type VMTerminalResizeData struct { // VirtualMachine 虚拟机 type VirtualMachine struct { - ID string `json:"id"` - Hostname string `json:"hostname"` - OS string `json:"os"` - Cores int32 `json:"cores"` - Memory uint64 `json:"memory"` + ID string `json:"id"` + Hostname string `json:"hostname"` + OS string `json:"os"` + Cores int32 `json:"cores"` + Memory uint64 `json:"memory"` Status taskflow.VirtualMachineStatus `json:"status"` - Name string `json:"name"` - LifeTimeSeconds int64 `json:"life_time_seconds"` - Host *Host `json:"host,omitempty"` - Version string `json:"version"` - CreatedAt int64 `json:"created_at"` - EnvironmentID string `json:"environment_id,omitempty"` - Owner *User `json:"owner,omitempty"` - Conditions []*etypes.Condition `json:"conditions"` - Ports []*VMPort `json:"ports,omitempty"` + Name string `json:"name"` + LifeTimeSeconds int64 `json:"life_time_seconds"` + Host *Host `json:"host,omitempty"` + Version string `json:"version"` + CreatedAt int64 `json:"created_at"` + EnvironmentID string `json:"environment_id,omitempty"` + Owner *User `json:"owner,omitempty"` + Conditions []*etypes.Condition `json:"conditions"` + Ports []*VMPort `json:"ports,omitempty"` } // From 从数据库模型转换 @@ -369,4 +369,3 @@ type FireExpiredVMItem struct { ID string `json:"id"` Message string `json:"message"` } - diff --git a/backend/pkg/delayqueue/vmidlequeue.go b/backend/pkg/delayqueue/vmidlequeue.go index 9b57eb72..437cb031 100644 --- a/backend/pkg/delayqueue/vmidlequeue.go +++ b/backend/pkg/delayqueue/vmidlequeue.go @@ -9,7 +9,7 @@ import ( "github.com/chaitin/MonkeyCode/backend/domain" ) -// VMSleepQueue 10 分钟空闲休眠队列 +// VMSleepQueue 空闲休眠队列 type VMSleepQueue struct { *RedisDelayQueue[*domain.VmIdleInfo] } @@ -19,13 +19,13 @@ type VMNotifyQueue struct { *RedisDelayQueue[*domain.VmIdleInfo] } -// VMRecycleQueue 7 天空闲回收队列 +// VMRecycleQueue 空闲回收队列 type VMRecycleQueue struct { *RedisDelayQueue[*domain.VmIdleInfo] } func NewVMSleepQueue(rdb *redis.Client, logger *slog.Logger) *VMSleepQueue { - return &VMSleepQueue{NewRedisDelayQueue[*domain.VmIdleInfo](rdb, logger, + return &VMSleepQueue{NewRedisDelayQueue(rdb, logger, WithPrefix[*domain.VmIdleInfo]("mcai:vmsleep"), WithPollInterval[*domain.VmIdleInfo](5*time.Second), WithRequeueDelay[*domain.VmIdleInfo](1*time.Minute), @@ -33,7 +33,7 @@ func NewVMSleepQueue(rdb *redis.Client, logger *slog.Logger) *VMSleepQueue { } func NewVMNotifyQueue(rdb *redis.Client, logger *slog.Logger) *VMNotifyQueue { - return &VMNotifyQueue{NewRedisDelayQueue[*domain.VmIdleInfo](rdb, logger, + return &VMNotifyQueue{NewRedisDelayQueue(rdb, logger, WithPrefix[*domain.VmIdleInfo]("mcai:vmnotify"), WithPollInterval[*domain.VmIdleInfo](30*time.Second), WithRequeueDelay[*domain.VmIdleInfo](1*time.Minute), @@ -42,7 +42,7 @@ func NewVMNotifyQueue(rdb *redis.Client, logger *slog.Logger) *VMNotifyQueue { } func NewVMRecycleQueue(rdb *redis.Client, logger *slog.Logger) *VMRecycleQueue { - return &VMRecycleQueue{NewRedisDelayQueue[*domain.VmIdleInfo](rdb, logger, + return &VMRecycleQueue{NewRedisDelayQueue(rdb, logger, WithPrefix[*domain.VmIdleInfo]("mcai:vmrecycle"), WithPollInterval[*domain.VmIdleInfo](30*time.Second), WithRequeueDelay[*domain.VmIdleInfo](1*time.Minute), diff --git a/backend/pkg/loki/client.go b/backend/pkg/loki/client.go index 800849f8..215fef62 100644 --- a/backend/pkg/loki/client.go +++ b/backend/pkg/loki/client.go @@ -13,18 +13,20 @@ import ( "strconv" "strings" "time" + + "github.com/coder/websocket" ) // Client Loki 客户端 type Client struct { - baseURL string - httpClient *http.Client - basicUser string - basicPass string + baseURL string + httpClient *http.Client + basicUser string + basicPass string bearerToken string - orgID string - headers http.Header - logger *slog.Logger + orgID string + headers http.Header + logger *slog.Logger } // Option 配置选项 @@ -173,61 +175,359 @@ func (c *Client) History(ctx context.Context, taskID string, start time.Time, fn return nil } -// Tail 实时追踪任务日志,先拉取历史再通过轮询追踪新日志 -// start: 查询起始时间 -// limit: 单次查询最大条数 -// fn: 回调函数,收到日志后调用 +// Tail 使用 WebSocket 替代 HTTP 轮询,提供完整的实时日志流 +// 策略: +// 1. 历史阶段:通过 HTTP 查询从 start 到 now-2s 的所有历史日志 +// 2. 实时阶段:建立 WebSocket 连接,从 lastTS-2s 开始接收实时日志 +// 3. 去重机制:基于"时间戳+日志内容"的复合键去重,处理同一纳秒的多条日志 +// 4. 每收到一条日志立即调用回调函数(无批处理) +// +// start: 日志查询起始时间 +// limit: 单次查询/接收的最大日志条数 +// fn: 日志回调函数,接收单条日志的切片,返回 error 可中断处理 func (c *Client) Tail(ctx context.Context, taskID string, start time.Time, limit int, fn func([]LogEntry) error) error { + // 参数校验与初始化 if limit <= 0 { limit = 200 } - // 阶段 1: 拉取历史日志 - histEnd := time.Now() + const skew = 2 * time.Second // Loki 聚合延迟安全窗口 + + query := fmt.Sprintf(`{task_id="%s"}`, escapeLabelValue(taskID)) + + // 去重状态:仅跟踪最新时间戳的日志 + var lastTS time.Time + seenAtLastTS := make(map[string]struct{}) + + // === 阶段 1: 历史数据查询 === + c.logger.With("task_id", taskID, "start", start).DebugContext(ctx, "Tail: starting historical phase") + + histEnd := time.Now().Add(-skew) histStart := start + historicalFailed := false for { - if ctx.Err() != nil { - return ctx.Err() - } + // 查询历史日志 entries, err := c.QueryByTaskID(ctx, taskID, histStart, histEnd, limit, "forward") if err != nil { - return fmt.Errorf("historical query failed: %w", err) + // 历史查询失败时,记录警告日志并继续进入 WebSocket 实时阶段 + c.logger.With("task_id", taskID, "error", err).WarnContext(ctx, "Tail: historical query failed, continuing to WebSocket phase") + historicalFailed = true + break } - if len(entries) > 0 { - if err := fn(entries); err != nil { - return err + + // 处理历史日志,应用去重逻辑并立即回调 + for _, e := range entries { + key := e.Line // 使用日志内容作为去重键 + + switch { + case lastTS.IsZero(): + // 第一条日志 + lastTS = e.Timestamp + seenAtLastTS = make(map[string]struct{}) + seenAtLastTS[key] = struct{}{} + if err := fn([]LogEntry{e}); err != nil { + return fmt.Errorf("callback error: %w", err) + } + + case e.Timestamp.Before(lastTS): + // 乱序日志(时间戳小于当前最大值),丢弃 + continue + + case e.Timestamp.Equal(lastTS): + // 相同时间戳,按键去重 + if _, exists := seenAtLastTS[key]; exists { + continue + } + seenAtLastTS[key] = struct{}{} + if err := fn([]LogEntry{e}); err != nil { + return fmt.Errorf("callback error: %w", err) + } + + default: // e.Timestamp.After(lastTS) + // 时间戳前进,更新去重状态 + lastTS = e.Timestamp + seenAtLastTS = make(map[string]struct{}) + seenAtLastTS[key] = struct{}{} + if err := fn([]LogEntry{e}); err != nil { + return fmt.Errorf("callback error: %w", err) + } } - histStart = entries[len(entries)-1].Timestamp.Add(time.Nanosecond) } + + // 检查是否完成历史查询 if len(entries) < limit { + // 历史数据已全部获取 break } + + // 继续分页:从最后一条日志的时间戳 + 1ns 开始 + histStart = lastTS.Add(time.Nanosecond) + } + + if historicalFailed { + c.logger.With("task_id", taskID, "last_ts", lastTS).DebugContext(ctx, "Tail: historical phase ended early due to error") + } else { + c.logger.With("task_id", taskID, "last_ts", lastTS).DebugContext(ctx, "Tail: historical phase complete") } - // 阶段 2: 轮询新日志 - pollStart := histStart - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() + // === 阶段 2: WebSocket 实时流(带重连和心跳) === + c.logger.With("task_id", taskID).DebugContext(ctx, "Tail: starting WebSocket phase") + + // 构造 WebSocket 认证头(复用于每次连接) + header := http.Header{} + for k, vals := range c.headers { + for _, v := range vals { + header.Add(k, v) + } + } + if c.orgID != "" { + header.Set("X-Scope-OrgID", c.orgID) + } + if c.bearerToken != "" { + header.Set("Authorization", "Bearer "+c.bearerToken) + } else if c.basicUser != "" || c.basicPass != "" { + header.Set("Authorization", "Basic "+basicAuth(c.basicUser, c.basicPass)) + } + + const ( + maxReconnectAttempts = 10 + initialBackoff = 500 * time.Millisecond + maxBackoff = 10 * time.Second + pingInterval = 30 * time.Second + ) + + reconnectAttempts := 0 for { + if ctx.Err() != nil { + return ctx.Err() + } + + // 构造 WebSocket URL,每次重连从 lastTS-skew 开始以利用去重 + q := url.Values{} + q.Set("query", query) + q.Set("limit", strconv.Itoa(limit)) + wsStart := lastTS + if wsStart.IsZero() { + wsStart = histEnd + } + wsStart = wsStart.Add(-skew) + q.Set("start", strconv.FormatInt(wsStart.UnixNano(), 10)) + + wsURL, err := c.toWebSocketURL("/loki/api/v1/tail", q) + if err != nil { + return fmt.Errorf("failed to build WebSocket URL: %w", err) + } + + c.logger.With("task_id", taskID, "url", wsURL, "reconnect_attempts", reconnectAttempts). + DebugContext(ctx, "Tail: connecting to WebSocket") + + conn, _, err := websocket.Dial(ctx, wsURL, &websocket.DialOptions{ + HTTPHeader: header, + }) + if err != nil { + reconnectAttempts++ + if reconnectAttempts > maxReconnectAttempts { + return fmt.Errorf("WebSocket dial failed after %d attempts: %w", maxReconnectAttempts, err) + } + backoff := min(initialBackoff*time.Duration(1<<(reconnectAttempts-1)), maxBackoff) + c.logger.With("error", err, "task_id", taskID, "attempt", reconnectAttempts, "backoff", backoff). + WarnContext(ctx, "Tail: WebSocket dial failed, retrying") + select { + case <-time.After(backoff): + continue + case <-ctx.Done(): + return ctx.Err() + } + } + conn.SetReadLimit(-1) + + // 连接成功,重置重连计数 + reconnectAttempts = 0 + c.logger.With("task_id", taskID).DebugContext(ctx, "Tail: WebSocket connected") + + // 运行单次 WebSocket 会话(含心跳) + sessionErr := c.tailWebSocketSession(ctx, conn, pingInterval, &lastTS, seenAtLastTS, fn) + conn.Close(websocket.StatusNormalClosure, "session ended") + + if sessionErr == nil { + // 正常关闭(服务端主动断开),不重连 + return nil + } + if ctx.Err() != nil { + return ctx.Err() + } + + // WebSocket 异常断开,尝试重连 + reconnectAttempts++ + if reconnectAttempts > maxReconnectAttempts { + return fmt.Errorf("WebSocket failed after %d reconnect attempts: %w", maxReconnectAttempts, sessionErr) + } + backoff := min(initialBackoff*time.Duration(1<<(reconnectAttempts-1)), maxBackoff) + c.logger.With("error", sessionErr, "task_id", taskID, "attempt", reconnectAttempts, "backoff", backoff). + WarnContext(ctx, "Tail: WebSocket disconnected, reconnecting") select { + case <-time.After(backoff): case <-ctx.Done(): return ctx.Err() - case <-ticker.C: - entries, err := c.QueryByTaskID(ctx, taskID, pollStart, time.Now(), limit, "forward") + } + } +} + +// tailWebSocketSession 运行单次 Loki tail WebSocket 会话,包含心跳 ping 和消息处理。 +// 返回 nil 表示连接正常关闭(不需要重连),返回 error 表示异常断开(需要重连)。 +func (c *Client) tailWebSocketSession( + ctx context.Context, + conn *websocket.Conn, + pingInterval time.Duration, + lastTS *time.Time, + seenAtLastTS map[string]struct{}, + fn func([]LogEntry) error, +) error { + // 心跳 goroutine:定期发送 ping 防止空闲超时 + pingCtx, pingCancel := context.WithCancel(ctx) + defer pingCancel() + go func() { + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() + for { + select { + case <-pingCtx.Done(): + return + case <-ticker.C: + if err := conn.Ping(pingCtx); err != nil { + c.logger.With("error", err).DebugContext(pingCtx, "Tail: ping failed") + return + } + } + } + }() + + // 读取 goroutine + msgCh := make(chan []byte, 32) + errCh := make(chan error, 1) + go func() { + defer close(msgCh) + defer close(errCh) + for { + _, data, err := conn.Read(ctx) if err != nil { - c.logger.Warn("tail poll failed", "error", err, "task_id", taskID) - continue + errCh <- err + return } - if len(entries) > 0 { - if err := fn(entries); err != nil { + select { + case msgCh <- data: + case <-ctx.Done(): + return + } + } + }() + + // 主事件循环 + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case data, ok := <-msgCh: + if !ok { + // 读取 goroutine 退出且 channel 已空,检查是否有 error + select { + case err := <-errCh: return err + default: + return nil + } + } + + var msg lokiTailMessage + if err := json.Unmarshal(data, &msg); err != nil { + continue + } + + for _, s := range msg.Streams { + lbls := s.Stream + for _, v := range s.Values { + if len(v) != 2 { + continue + } + + ns, err := strconv.ParseInt(v[0], 10, 64) + if err != nil { + continue + } + ts := time.Unix(0, ns).UTC() + key := v[1] + + entry := LogEntry{ + Timestamp: ts, + Line: v[1], + Labels: lbls, + } + + switch { + case lastTS.IsZero(): + *lastTS = ts + // 清空并重用 seenAtLastTS + for k := range seenAtLastTS { + delete(seenAtLastTS, k) + } + seenAtLastTS[key] = struct{}{} + if err := fn([]LogEntry{entry}); err != nil { + return fmt.Errorf("callback error: %w", err) + } + + case ts.Before(*lastTS): + continue + + case ts.Equal(*lastTS): + if _, exists := seenAtLastTS[key]; exists { + continue + } + seenAtLastTS[key] = struct{}{} + if err := fn([]LogEntry{entry}); err != nil { + return fmt.Errorf("callback error: %w", err) + } + + default: // ts.After(*lastTS) + *lastTS = ts + for k := range seenAtLastTS { + delete(seenAtLastTS, k) + } + seenAtLastTS[key] = struct{}{} + if err := fn([]LogEntry{entry}); err != nil { + return fmt.Errorf("callback error: %w", err) + } + } } - pollStart = entries[len(entries)-1].Timestamp.Add(time.Nanosecond) } + + case err := <-errCh: + return err + } + } +} + +func (c *Client) toWebSocketURL(path string, q url.Values) (string, error) { + u, err := url.Parse(c.baseURL) + if err != nil { + return "", err + } + switch u.Scheme { + case "http": + u.Scheme = "ws" + case "https": + u.Scheme = "wss" + default: + // 未指定 scheme 时默认按 ws 处理 + if u.Scheme == "" { + u.Scheme = "ws" } } + u.Path = strings.TrimRight(u.Path, "/") + path + u.RawQuery = q.Encode() + return u.String(), nil } func (c *Client) decorateReq(req *http.Request) { @@ -266,3 +566,8 @@ type lokiQueryFrame struct { Stream map[string]string `json:"stream"` Values [][]string `json:"values"` } + +type lokiTailMessage struct { + Streams []lokiQueryFrame `json:"streams"` + // dropped_entries 字段可能存在于某些版本,这里不强依赖 +} diff --git a/backend/pkg/taskflow/client.go b/backend/pkg/taskflow/client.go index 35fa297e..862a5710 100644 --- a/backend/pkg/taskflow/client.go +++ b/backend/pkg/taskflow/client.go @@ -47,6 +47,8 @@ type Hoster interface { type VirtualMachiner interface { Create(ctx context.Context, req *CreateVirtualMachineReq) (*VirtualMachine, error) Delete(ctx context.Context, req *DeleteVirtualMachineReq) error + Hibernate(ctx context.Context, req *HibernateVirtualMachineReq) error + Resume(ctx context.Context, req *ResumeVirtualMachineReq) error List(ctx context.Context, id string) ([]*VirtualMachine, error) Info(ctx context.Context, req VirtualMachineInfoReq) (*VirtualMachine, error) Terminal(ctx context.Context, req *TerminalReq) (Sheller, error) diff --git a/backend/pkg/taskflow/types.go b/backend/pkg/taskflow/types.go index 45c7d095..8dcc2281 100644 --- a/backend/pkg/taskflow/types.go +++ b/backend/pkg/taskflow/types.go @@ -520,7 +520,7 @@ type Task struct { type CodingAgent int const ( - CodingAgentCodex CodingAgent = iota + 1 + CodingAgentCodex CodingAgent = iota + 1 CodingAgentClaude CodingAgentMCAIReview CodingAgentOpenCode @@ -636,3 +636,15 @@ type File struct { AccessedAt int64 `json:"accessed_at"` UpdatedAt int64 `json:"updated_at"` } + +type HibernateVirtualMachineReq struct { + HostID string `json:"host_id" query:"host_id" validate:"required"` // 宿主机 id + UserID string `json:"user_id" query:"user_id" validate:"required"` // 用户id + ID string `json:"id" query:"id" validate:"required"` // 虚拟机 id +} + +type ResumeVirtualMachineReq struct { + HostID string `json:"host_id" query:"host_id" validate:"required"` // 宿主机 id + UserID string `json:"user_id" query:"user_id" validate:"required"` // 用户id + ID string `json:"id" query:"id" validate:"required"` // 虚拟机 id +} diff --git a/backend/pkg/taskflow/vm.go b/backend/pkg/taskflow/vm.go index f5ce14b1..bbc6526d 100644 --- a/backend/pkg/taskflow/vm.go +++ b/backend/pkg/taskflow/vm.go @@ -171,3 +171,21 @@ func (v *virtualMachineClient) CloseTerminal(ctx context.Context, req *CloseTerm _, err := request.Delete[any](v.client, ctx, "/internal/terminal", request.WithBody(req)) return err } + +// Hibernate implements [VirtualMachiner]. +func (v *virtualMachineClient) Hibernate(ctx context.Context, req *HibernateVirtualMachineReq) error { + _, err := request.Post[Resp[any]](v.client, ctx, "/internal/vm/hibernate", req) + if err != nil { + return err + } + return nil +} + +// Resume implements [VirtualMachiner]. +func (v *virtualMachineClient) Resume(ctx context.Context, req *ResumeVirtualMachineReq) error { + _, err := request.Post[Resp[any]](v.client, ctx, "/internal/vm/resume", req) + if err != nil { + return err + } + return nil +} diff --git a/frontend/docker/nginx.conf b/frontend/docker/nginx.conf index 3c0ef096..cc967929 100644 --- a/frontend/docker/nginx.conf +++ b/frontend/docker/nginx.conf @@ -38,74 +38,10 @@ http { try_files $uri $uri/ /index.html; } - location /api/ { - client_max_body_size 10m; - proxy_pass http://monkeycode-ai-backend:8888; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - - # 支持 WebSocket - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - } - - location /api/v1/users/files/upload { - client_max_body_size 10m; - proxy_pass http://monkeycode-ai-backend:8888; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - proxy_read_timeout 10m; - proxy_send_timeout 60s; - } - # 静态资源缓存 location ~* \.(js|css|png|svg|ttf)$ { expires 1y; add_header Cache-Control "public, immutable"; } } - - upstream grpcservers { - server monkeycode-ai-taskflow:50051; - keepalive 500; - } - - server { - listen 50051; - server_name _; - - http2 on; - underscores_in_headers on; - - location / { - grpc_pass grpc://grpcservers; - - # 1. 增加读写超时时间 - # gRPC 双向流通常是长连接,默认 60s 超时会导致连接意外断开 - grpc_read_timeout 1h; - grpc_send_timeout 1h; - - grpc_set_header Host $host; - grpc_set_header X-Real-IP $remote_addr; - } - - location /codingmatrix.proto.v1.agent.AgentService/FileManager { - grpc_pass grpc://grpcservers; - - client_max_body_size 10m; - - # 1. 增加读写超时时间 - # gRPC 双向流通常是长连接,默认 60s 超时会导致连接意外断开 - grpc_read_timeout 1h; - grpc_send_timeout 1h; - - grpc_set_header Host $host; - grpc_set_header X-Real-IP $remote_addr; - } - } }