feature: implement scheduler execution engine with Service Binding architecture #23
Conversation
…chitecture

Phase A-C: execution engine core
- Extend /run endpoint with type (web/data) and mode (test/run) discriminated union
- Add CodeRunnerClient interface + HTTPCodeRunnerClient with retry (5xx, exponential backoff)
- Add scheduler-stage-runs CRUD in supabase-connector
- Implement stage-runner (executeStage, executeFanOut, resolveDefaultInput)
- Implement scheduler-executor (pipeline orchestration, fan-out, partial failure)
- Add POST /schedulers/:id/execute handler with concurrent run guard
- Fix unknown type JSX rendering in RunHistorySection

Phase D-E: Service Binding architecture refactor
- Convert crawler-manager-worker to WorkerEntrypoint with executeCrawler() RPC
- Add crawler-executor.ts (resolveURL + executeCrawler) in crawler-manager
- Create ServiceBindingCrawlerExecutionClient for scheduler → crawler-manager RPC
- Remove crawler knowledge from scheduler-manager (no getCrawlerByID, no resolveURL)
- Configure [[services]] binding in scheduler-manager wrangler.toml
- Update tests: SELF.fetch() pattern for WorkerEntrypoint, auxiliary worker mock

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- crawler-executor: 7 tests (web/data routing, URL resolution, error handling)
- stage-runner: 19 tests (executeStage, executeFanOut, resolveDefaultInput)
- scheduler-executor: 12 tests (pipeline orchestration, fan-out, partial failure)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace `as any` with typed CrawlerManagerRPC interface for Service Binding RPC
- Add fan_out_field input type validation before object cast
- Add JWT_SECRET to crawler-manager-worker secrets comment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Atomic concurrent run guard via DB unique partial index (#1)
- Pipeline timeout 5 minutes with AbortController to prevent state race (#2)
- Per-RPC child logger with crawlerID context (#3, #4)
- Shared CrawlerExecuteResult type with runtime validation (#5)
- Protected updateSchedulerStageRun in catch with .catch() (#6)
- Input validation for resolveDefaultInput (#7)
- Concurrency-ready fan-out with Symbol sentinel for null safety (#9, #10)
- HTTP status codes: 408 timeout, 422 config, 502 upstream, 503 DB (#11)
- Runtime validation for code-runner HTTP responses (#13)
- ReDoS protection at execution time with safe-regex2 (#14, #15)
- Typed logger mocks in tests (#16)
- Added tests: resolveHTTPStatus, validateCrawlerExecuteResult, validateCodeRunnerResult, fan-out null preservation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Walkthrough

Introduces web/data crawler execution modes and scheduler stage/fan-out execution orchestration, restructures the code runner's request validation and sandboxed execution flow, and adds or updates the Supabase schema, permissions, and migrations along with the related workers and tests at scale.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant CrawlerMgr as Crawler Manager<br/>Worker
    participant CodeRunnerFn as Code Runner<br/>Function
    participant TargetSite as Target Website
    Client->>CrawlerMgr: executeCrawler(crawlerID, input)
    CrawlerMgr->>CrawlerMgr: getCrawlerByID / resolveURL / validate pattern
    alt crawler.type == 'web'
        CrawlerMgr->>CodeRunnerFn: POST /run { type: 'web', mode, url, code }
        CodeRunnerFn->>TargetSite: fetch(url)
        TargetSite-->>CodeRunnerFn: response text
        CodeRunnerFn->>CodeRunnerFn: executeInSandbox(code, response text)
    else crawler.type == 'data'
        CrawlerMgr->>CodeRunnerFn: POST /run { type: 'data', mode, data, code }
        CodeRunnerFn->>CodeRunnerFn: executeInSandbox(code, data)
    end
    CodeRunnerFn-->>CrawlerMgr: { type, mode, result }
    CrawlerMgr-->>Client: CrawlerExecuteResult
```
```mermaid
sequenceDiagram
    participant Client
    participant SchedulerMgr as Scheduler Manager<br/>Worker
    participant Supabase
    participant CrawlerMgr as Crawler Manager<br/>Worker
    Client->>SchedulerMgr: POST /schedulers/:id/execute
    SchedulerMgr->>Supabase: verifySchedulerOwnership / create run (pending)
    Supabase-->>SchedulerMgr: run record
    SchedulerMgr->>Supabase: listSchedulerStages
    Supabase-->>SchedulerMgr: stages
    loop stages
        alt stage.fan_out_field set
            SchedulerMgr->>Supabase: create scheduler_stage_run (running)
            loop per item
                SchedulerMgr->>CrawlerMgr: executeCrawler(crawler_id, item)
                CrawlerMgr-->>SchedulerMgr: result
            end
            SchedulerMgr->>Supabase: update scheduler_stage_run (completed/partially_failed/failed)
        else
            SchedulerMgr->>Supabase: create scheduler_stage_run (running)
            SchedulerMgr->>CrawlerMgr: executeCrawler(crawler_id, input)
            CrawlerMgr-->>SchedulerMgr: result
            SchedulerMgr->>Supabase: update scheduler_stage_run
        end
    end
    SchedulerMgr->>Supabase: update run status (completed/failed/partially_failed)
    Supabase-->>SchedulerMgr: updated run
    SchedulerMgr-->>Client: HTTP response with run state
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
@coderabbitai review

Review Context

This PR adds an end-to-end scheduler execution path on Cloudflare Workers. The scheduler-manager worker exposes POST /schedulers/:id/execute, creates a scheduler_run row, invokes executeScheduler, and returns the final run state. executeScheduler loads the ordered stages from Supabase, derives the initial input from the first stage's input_schema defaults, executes each stage on crawler-manager through a service-binding client, and persists per-stage execution records to scheduler_stage_runs and the final scheduler_runs status/result separately. crawler-manager exposes an RPC-style executeCrawler(crawlerID, input) that loads the crawler without ownership checks, validates url_pattern safety, resolves the URL for web crawlers, and then delegates execution to the code-runner HTTP client.

Areas that need careful review
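The web/data discriminated union described above can be sketched in TypeScript. The field names here (type, mode, url, data, code) are taken from the PR description; the exact shapes in the codebase may differ:

```typescript
// Hypothetical sketch of the /run request union described in the review
// context; field names are assumptions based on the PR summary.
type RunRequest =
  | { type: 'web'; mode: 'test' | 'run'; url: string; code: string }
  | { type: 'data'; mode: 'test' | 'run'; data: unknown; code: string };

// Narrowing on the `type` discriminant lets a handler pick the right branch
// without casts: the compiler knows `url` exists only on the 'web' arm.
function describeRun(request: RunRequest): string {
  if (request.type === 'web') {
    return `web ${request.mode}: fetch ${request.url}`;
  }
  return `data ${request.mode}: run against inline data`;
}
```

A `type: 'web'` payload without a `url` would fail to compile, which is the point of modeling the endpoint as a discriminated union rather than a bag of optional fields.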
✅ Actions performed: Review triggered.
Actionable comments posted: 15
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@packages/supabase-connector/migrations/006_add_active_run_unique_constraint.sql`:
- Around line 5-7: The migration must first remove or normalize existing
duplicates before creating the partial unique index; add a pre-step that scans
scheduler_runs for rows with the same scheduler_id and status IN
('pending','running') and keeps one canonical active row (e.g., by newest
created_at or highest id) while marking or deleting the others (e.g., set
status='failed' or remove duplicate rows) to ensure zero duplicates per
scheduler_id, then create the CREATE UNIQUE INDEX
scheduler_runs_one_active_per_scheduler ON scheduler_runs (scheduler_id) WHERE
status IN ('pending', 'running'); use the table name scheduler_runs and the
index name scheduler_runs_one_active_per_scheduler to locate where to insert
this cleanup step in the migration.
In `@packages/supabase-connector/sources/crawlers.ts`:
- Around line 95-131: The handlers handleCreateStage and handleUpdateStage
currently only validate crawler_id format but do not verify ownership; update
both to call getCrawler (or an equivalent ownership-check function like
getCrawler(client, crawlerID, userUUID)) after parsing crawler_id and before
persisting the stage, and if getCrawler returns null (not owned) return a
403/validation error and abort; ensure you use the same Supabase client and the
authenticated user's UUID when calling getCrawler so stages cannot be assigned
to crawlers owned by other users.
In `@workers/crawler-code-runner-worker/sources/index.ts`:
- Around line 132-141: handleRun currently buffers the entire body with
fetchResponse.text(), which risks unbounded memory growth; right after the
fetch call, check fetchResponse.headers.get('content-length') and fail
immediately if it exceeds an allowed limit (FETCH_MAX_BYTES), and when
Content-Length is absent or small, read the body as a stream and, once the
accumulated bytes exceed FETCH_MAX_BYTES, stop reading (AbortController or
reader.cancel()) and return an error; the place to change is the
fetchResponse/responseText handling inside handleRun, reusing the existing
variables (FETCH_TIMEOUT_MILLISECONDS, fetchResponse, responseText, targetURL)
to add the limit check and the streaming read loop.
- Around line 135-147: In handleRun, avoid logging the full user-provided URL
(targetURL) to prevent leaking sensitive query/userinfo; instead construct a
sanitized URL (e.g., clone targetURL, clear username/password and search/query)
and log only sanitizedURL.origin or host plus pathname. Update the four logging
sites referencing targetURL.toString() — the logger.info('Fetching target
URL'...), logger.info('Target URL fetched'...), logger.error('Fetch timed
out'...), and logger.error('Failed to fetch target URL'...) — to use the
sanitized value while keeping the actual fetch call using the original
targetURL; ensure timeout log still reports FETCH_TIMEOUT_MILLISECONDS and error
logs include the sanitized URL field.
In `@workers/crawler-manager-worker/sources/code-runner-client.ts`:
- Around line 128-130: The success-path is directly calling await
response.json() and validateCodeRunnerResult in the response.ok branch (around
validateCodeRunnerResult), so malformed JSON throws a raw SyntaxError instead of
a CodeRunnerExecutionError; wrap the await response.json() call in a try/catch
inside the response.ok branch, on catch throw a new CodeRunnerExecutionError
(use the same error shape used in the 4xx handler, e.g., error type
"invalid_response" and include the original error message and response
status/body context) before calling validateCodeRunnerResult, ensuring all JSON
parse failures are normalized.
In `@workers/crawler-manager-worker/sources/crawler-executor.ts`:
- Around line 47-67: The empty catch swallowing RegExp compilation errors in
executeCrawler makes debugging hard; modify the catch block after new
RegExp(crawler.url_pattern) to log a warning or error via
logger.warn/logger.error (include crawler.id, crawler.url_pattern and url
context) and a clear message like "Invalid url_pattern regex" so failures in
RegExp construction are recorded, while still skipping validation; reference
isSafeURLPattern, executeCrawler, and the RegExp construction to locate the
spot.
In `@workers/crawler-manager-worker/sources/index.ts`:
- Around line 417-434: The code currently skips ownership checks for crawlers,
so add ownership validation immediately after crawler_id validation in
handleCreateStage and handleUpdateStage by calling a scoped lookup (e.g.,
getCrawler(supabaseClient, crawlerID, userUUID) or a new function that verifies
user_uuid equals the crawler owner) and reject the request if not owned;
additionally, update executeCrawler to use a user-scoped fetch instead of
getCrawlerByID (or perform an explicit owner check after getCrawlerByID) so
HTTPCodeRunnerClient executions cannot be invoked on crawlers the caller does
not own.
In `@workers/scheduler-manager-worker/sources/handlers/scheduler-execution.ts`:
- Around line 17-26: In resolveHTTPStatus, currently unknown failures fall back
to 200; change the logic so that when status === 'failed' the default fallback
is 500 (internal server error) instead of 200: keep the existing specific error
substring checks (e.g., 'timed out' -> 408, 'Invalid
input_schema'/'fan_out_field' -> 422, 'CodeRunner error'/'Invalid
CrawlerExecuteResult' -> 502, 'Supabase'/'database' -> 503) but ensure the final
return for failed statuses is 500 (and only return 200 for non-failed/
completed/partially_failed statuses or when status !== 'failed' and error is
null), updating resolveHTTPStatus accordingly so unknown/unaligned failure
messages no longer map to 200.
- Around line 91-103: The timeout path currently unconditionally calls
abortController.abort() and then updateSchedulerRun(..., {status: 'failed',
...}), which can race with executeScheduler()'s own terminal updates; change the
timeout handler in handleExecuteScheduler to perform an atomic conditional
update via updateSchedulerRun/supabaseClient so the status is set to 'failed'
only if the current run.status is one of the non-terminal states (e.g.,
'pending' or 'running'), and skip the patch if the DB shows a terminal status
(completed/partially_failed/failed); keep abortController.abort() but ensure the
conditional WHERE/patch uses run.id and status IN (...) to avoid overwriting an
already-finalized run.
In `@workers/scheduler-manager-worker/sources/index.ts`:
- Around line 252-259: The abort/timeout path currently only marks scheduler_run
as failed but leaves scheduler_stage_run rows in non-terminal states; add
cleanup to executeScheduler (scheduler-executor.ts) so that on abort/timeout it
finds all scheduler_stage_run records for the running scheduler_run (status in
['running','pending','queued','in_progress'] etc.) and updates them to a
terminal status (e.g., 'failed' or 'aborted') with an appropriate
end_time/reason; perform this update in the abort/timeout handler or the finally
block where scheduler.last_run_at is updated, and reuse existing DB/update
helpers used by the scheduler_run update (the same persistence functions called
from handleExecuteScheduler / scheduler-execution.ts) and emit any existing
lifecycle events/notifications so retries aren’t blocked by lingering running
stage_run rows.
- Around line 252-259: Add a JSDoc block to the executeFanOut() function
clarifying that its internal filter() call compresses the returned results array
by removing failed items (so returned results do not preserve original input
indices), that callers must track position or failure details externally using
provided metadata fields itemsTotal, itemsSucceeded, itemsFailed if they need
mapping to original inputs, and that this compressed-results behavior is an
intentional contract of executeFanOut() and must be relied upon by downstream
code (reference executeFanOut and the filter-based compression in
stage-runner.ts).
In `@workers/scheduler-manager-worker/sources/stage-runner.ts`:
- Around line 105-145: The current code removes failed slots by using
results.filter(...) which shifts positions; instead preserve the original index
mapping by returning the full positional results array (keeping FAN_OUT_FAILED
placeholders) or wrapping each entry in an item-level envelope that indicates
success/failure; update the return value used by executeFanOut (and the local
results variable and any callers expecting successfulResults) to return results
as-is (or map to envelopes) so downstream consumers can determine which input
indexes failed, and ensure itemsTotal and status remain correct.
- Around line 45-50: executeStage does not receive the parent timeout/abort
signal, so it cannot be interrupted: thread an AbortSignal through executeStage
(and the two helpers executeScheduler calls), passing it to the crawler client
call and the fan-out loop; check signal.aborted at the start of each iteration
and immediately before status updates, returning early from the loop/function,
and make the crawler call abortable (or invoke the client's cancel API) so
in-flight requests are interrupted. Also ensure executeStage returns
immediately when the signal is already aborted, and update the helpers (find
the helper functions executeStage calls) to accept the signal as an argument or
capture the parent's signal.
In `@workers/scheduler-manager-worker/tests/scheduler-executor.test.ts`:
- Around line 459-484: The test "always updates scheduler.last_run_at in finally
block" currently lacks an assertion that the final PATCH actually occurred;
update the test to explicitly verify mockUpdateScheduler was called (or that the
scheduler update call was made with a payload containing last_run_at) after
calling executeScheduler(dependencies, SCHEDULER_ID, USER_UUID, RUN_ID);
reference the test helper mockUpdateScheduler and the executeScheduler
invocation to locate where to add an assertion like
expect(mockUpdateScheduler).toHaveBeenCalled() or inspecting the call args to
confirm last_run_at was set.
In `@workers/scheduler-manager-worker/vitest.config.ts`:
- Around line 16-21: The test stub response for the worker
"audio-underview-crawler-manager-worker" returns "{}", which lacks the required
type/result fields and will fail validateCrawlerExecuteResult; update the
script's fetch() return to emit a JSON body with the minimal validated shape
(e.g., include "type" and "result" fields, and any subfields
validateCrawlerExecuteResult expects) so the stubbed Response matches the
validation contract used by validateCrawlerExecuteResult.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8697ec7c-6735-450f-9e05-4ba5c89d5084
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (32)
- applications/web/sources/components/schedulers/RunHistorySection.tsx
- functions/crawler-code-runner-function/sources/index.ts
- functions/crawler-code-runner-function/tests/index.test.ts
- packages/supabase-connector/migrations/006_add_active_run_unique_constraint.sql
- packages/supabase-connector/sources/crawlers.ts
- packages/supabase-connector/sources/index.ts
- packages/supabase-connector/sources/scheduler-stage-runs.ts
- packages/supabase-connector/sources/types/database.ts
- workers/crawler-code-runner-worker/sources/create-code-runner.ts
- workers/crawler-code-runner-worker/sources/index.ts
- workers/crawler-code-runner-worker/tests/index.test.ts
- workers/crawler-manager-worker/package.json
- workers/crawler-manager-worker/sources/code-runner-client.ts
- workers/crawler-manager-worker/sources/crawler-executor.ts
- workers/crawler-manager-worker/sources/index.ts
- workers/crawler-manager-worker/sources/safe-url-pattern.ts
- workers/crawler-manager-worker/tests/crawler-executor.test.ts
- workers/crawler-manager-worker/tests/index.test.ts
- workers/crawler-manager-worker/vitest.config.ts
- workers/crawler-manager-worker/wrangler.toml
- workers/scheduler-manager-worker/sources/crawler-execution-client.ts
- workers/scheduler-manager-worker/sources/handlers/scheduler-execution.ts
- workers/scheduler-manager-worker/sources/index.ts
- workers/scheduler-manager-worker/sources/scheduler-executor.ts
- workers/scheduler-manager-worker/sources/stage-runner.ts
- workers/scheduler-manager-worker/tests/scheduler-execution.test.ts
- workers/scheduler-manager-worker/tests/scheduler-executor.test.ts
- workers/scheduler-manager-worker/tests/stage-runner.test.ts
- workers/scheduler-manager-worker/vitest.config.ts
- workers/scheduler-manager-worker/wrangler.toml
- workers/tools/sources/index.ts
- workers/tools/sources/types.ts
```typescript
/**
 * Gets a single crawler by ID without ownership check.
 * Used by the execution engine where the crawler may belong to any user.
 *
 * @param client - Supabase client
 * @param id - Crawler ID
 * @returns Crawler row if found, null otherwise
 */
export async function getCrawlerByID(
  client: SupabaseClientType,
  id: string,
): Promise<CrawlerRow | null> {
  return traceDatabaseOperation(
    { serviceName: 'supabase-connector', operation: 'select', table: 'crawlers' },
    async (span) => {
      span.setAttribute('db.query.id', id);

      const { data, error } = await client
        .from('crawlers')
        .select('*')
        .eq('id', id)
        .single();

      if (error) {
        if (error.code === 'PGRST116') {
          span.setAttribute('db.rows_affected', 0);
          return null;
        }
        span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
        throw new Error(`Failed to get crawler by ID: ${error.message}`);
      }

      span.setAttribute('db.rows_affected', 1);
      return data as CrawlerRow;
    }
  );
}
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
printf '\n== scheduler stage handlers ==\n'
fd -a 'scheduler-stages\.ts$' workers/scheduler-manager-worker/sources/handlers -x sed -n '1,260p' {}
printf '\n== crawler lookup call sites ==\n'
rg -n -C3 --type=ts '\b(getCrawlerByID|getCrawler)\s*\(' workers packages
printf '\n== crawler_id input/persistence paths ==\n'
rg -n -C3 --type=ts '\bcrawler_id\b' workers/scheduler-manager-worker packages/supabase-connector
```

Repository: gignac-cha/audio-underview
Length of output: 33450
Ownership validation for crawler_id is missing in the HTTP handlers.

handleCreateStage() and handleUpdateStage() only verify scheduler ownership and never check that the submitted crawler_id belongs to the current user (only UUID format validation runs). A user can therefore assign another user's crawler to their own scheduler stage, and execution will then run that crawler. Add getCrawler(client, crawlerID, userUUID) or an equivalent ownership check to the stage create and update paths.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/supabase-connector/sources/crawlers.ts` around lines 95 - 131, The
handlers handleCreateStage and handleUpdateStage currently only validate
crawler_id format but do not verify ownership; update both to call getCrawler
(or an equivalent ownership-check function like getCrawler(client, crawlerID,
userUUID)) after parsing crawler_id and before persisting the stage, and if
getCrawler returns null (not owned) return a 403/validation error and abort;
ensure you use the same Supabase client and the authenticated user's UUID when
calling getCrawler so stages cannot be assigned to crawlers owned by other
users.
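The ownership check can be reduced to a small guard like the following sketch. CrawlerRow's shape and the user_uuid column name are assumptions based on the review text, and a real fix would perform the scoped lookup against Supabase:

```typescript
// Hypothetical row shape; the actual CrawlerRow has more columns.
interface CrawlerRow {
  id: string;
  user_uuid: string;
}

// Mirrors a getCrawler(client, crawlerID, userUUID)-style scoped lookup:
// returns the crawler only when it belongs to the requesting user.
function filterOwnedCrawler(crawler: CrawlerRow | null, userUUID: string): CrawlerRow | null {
  if (crawler === null || crawler.user_uuid !== userUUID) {
    return null; // caller should translate null into a 403/validation error
  }
  return crawler;
}
```

Calling this after the UUID-format check and before persisting the stage would close the cross-tenant assignment path the comment describes.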
```typescript
let responseText: string;
try {
  const signal = AbortSignal.timeout(FETCH_TIMEOUT_MILLISECONDS);
  logger.info('Fetching target URL', { url: targetURL.toString() }, { function: 'handleRun' });
  const fetchResponse = await fetch(targetURL.toString(), { signal });
  responseText = await fetchResponse.text();
  logger.info('Target URL fetched', {
    status: fetchResponse.status,
    contentLength: responseText.length,
  }, { function: 'handleRun' });
```
Buffering the remote response body with an unbounded text() call invites a memory blow-up from a single large page.

The current code loads the entire body of an arbitrary URL into memory before execution, so one oversized response can easily pressure worker memory and CPU. At minimum, enforce a Content-Length limit and a streaming cap, and return a failure when either is exceeded.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/crawler-code-runner-worker/sources/index.ts` around lines 132 - 141:
handleRun currently buffers the entire body with fetchResponse.text(), which
risks unbounded memory growth. Right after the fetch call, check
fetchResponse.headers.get('content-length') and fail immediately if it exceeds
an allowed limit (FETCH_MAX_BYTES); when Content-Length is absent or small,
read the body as a stream and, once the accumulated bytes exceed
FETCH_MAX_BYTES, stop reading (AbortController or reader.cancel()) and return
an error. The place to change is the fetchResponse/responseText handling inside
handleRun; reuse the existing variables (FETCH_TIMEOUT_MILLISECONDS,
fetchResponse, responseText, targetURL) to add the limit check and the
streaming read loop.
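The cap logic this prompt describes can be sketched over in-memory chunks. FETCH_MAX_BYTES is an assumed constant (not from the PR), and a real implementation would consume fetchResponse.body with a reader and cancel it once the limit is crossed:

```typescript
// Assumed limit for illustration; the PR would define its own FETCH_MAX_BYTES.
const FETCH_MAX_BYTES = 5 * 1024 * 1024;

// Accumulates chunks until the byte cap is exceeded, in which case it returns
// null so the caller can produce an error response instead of a huge buffer.
function accumulateWithCap(chunks: Uint8Array[], maxBytes: number): Uint8Array | null {
  let total = 0;
  const kept: Uint8Array[] = [];
  for (const chunk of chunks) {
    total += chunk.byteLength;
    if (total > maxBytes) {
      return null; // over the cap: abort the read and fail the request
    }
    kept.push(chunk);
  }
  // Concatenate the accepted chunks into one contiguous body.
  const body = new Uint8Array(total);
  let offset = 0;
  for (const chunk of kept) {
    body.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return body;
}
```

In the worker, the same accumulate-and-check loop would run inside `while (true) { const { done, value } = await reader.read(); … }`, calling `reader.cancel()` on overflow.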
```typescript
logger.info('Fetching target URL', { url: targetURL.toString() }, { function: 'handleRun' });
const fetchResponse = await fetch(targetURL.toString(), { signal });
responseText = await fetchResponse.text();
logger.info('Target URL fetched', {
  status: fetchResponse.status,
  contentLength: responseText.length,
}, { function: 'handleRun' });
} catch (fetchError) {
  if (fetchError instanceof DOMException && fetchError.name === 'TimeoutError') {
    logger.error('Fetch timed out', { url: targetURL.toString(), timeoutMilliseconds: FETCH_TIMEOUT_MILLISECONDS }, { function: 'handleRun' });
    return errorResponse('fetch_timeout', `Fetch timed out after ${FETCH_TIMEOUT_MILLISECONDS}ms`, 504, context);
  }
  logger.error('Failed to fetch target URL', { error: fetchError, url: targetURL.toString() }, { function: 'handleRun' });
```
Do not log the full user-provided URL.

This path accepts arbitrary URLs, so sensitive values such as query strings, signed tokens, and basic-auth credentials can leak straight into the logs. At minimum strip the userinfo and search components, and ideally record only the host.
🔒 Suggested fix

```diff
+ const logURL = new URL(targetURL);
+ logURL.username = '';
+ logURL.password = '';
+ logURL.search = '';
+
- logger.info('Fetching target URL', { url: targetURL.toString() }, { function: 'handleRun' });
+ logger.info('Fetching target URL', { url: logURL.toString() }, { function: 'handleRun' });
  const fetchResponse = await fetch(targetURL.toString(), { signal });
  responseText = await fetchResponse.text();
  logger.info('Target URL fetched', {
    status: fetchResponse.status,
    contentLength: responseText.length,
  }, { function: 'handleRun' });
} catch (fetchError) {
  if (fetchError instanceof DOMException && fetchError.name === 'TimeoutError') {
-   logger.error('Fetch timed out', { url: targetURL.toString(), timeoutMilliseconds: FETCH_TIMEOUT_MILLISECONDS }, { function: 'handleRun' });
+   logger.error('Fetch timed out', { url: logURL.toString(), timeoutMilliseconds: FETCH_TIMEOUT_MILLISECONDS }, { function: 'handleRun' });
    return errorResponse('fetch_timeout', `Fetch timed out after ${FETCH_TIMEOUT_MILLISECONDS}ms`, 504, context);
  }
- logger.error('Failed to fetch target URL', { error: fetchError, url: targetURL.toString() }, { function: 'handleRun' });
+ logger.error('Failed to fetch target URL', { error: fetchError, url: logURL.toString() }, { function: 'handleRun' });
  return errorResponse('fetch_failed', 'Failed to fetch the target URL', 502, context);
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
const logURL = new URL(targetURL);
logURL.username = '';
logURL.password = '';
logURL.search = '';

logger.info('Fetching target URL', { url: logURL.toString() }, { function: 'handleRun' });
const fetchResponse = await fetch(targetURL.toString(), { signal });
responseText = await fetchResponse.text();
logger.info('Target URL fetched', {
  status: fetchResponse.status,
  contentLength: responseText.length,
}, { function: 'handleRun' });
} catch (fetchError) {
  if (fetchError instanceof DOMException && fetchError.name === 'TimeoutError') {
    logger.error('Fetch timed out', { url: logURL.toString(), timeoutMilliseconds: FETCH_TIMEOUT_MILLISECONDS }, { function: 'handleRun' });
    return errorResponse('fetch_timeout', `Fetch timed out after ${FETCH_TIMEOUT_MILLISECONDS}ms`, 504, context);
  }
  logger.error('Failed to fetch target URL', { error: fetchError, url: logURL.toString() }, { function: 'handleRun' });
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/crawler-code-runner-worker/sources/index.ts` around lines 135 - 147,
In handleRun, avoid logging the full user-provided URL (targetURL) to prevent
leaking sensitive query/userinfo; instead construct a sanitized URL (e.g., clone
targetURL, clear username/password and search/query) and log only
sanitizedURL.origin or host plus pathname. Update the four logging sites
referencing targetURL.toString() — the logger.info('Fetching target URL'...),
logger.info('Target URL fetched'...), logger.error('Fetch timed out'...), and
logger.error('Failed to fetch target URL'...) — to use the sanitized value while
keeping the actual fetch call using the original targetURL; ensure timeout log
still reports FETCH_TIMEOUT_MILLISECONDS and error logs include the sanitized
URL field.
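The sanitization in the suggested fix can also live in one small helper, which keeps all four log sites consistent. This is a sketch, not the code in the PR:

```typescript
// Strips credentials and the query string for logging purposes only; the
// original URL is kept untouched for the actual fetch.
function sanitizeURLForLogging(target: URL): string {
  const clone = new URL(target.toString());
  clone.username = '';
  clone.password = '';
  clone.search = '';
  return clone.toString();
}
```

With the WHATWG URL serializer, clearing username and password also drops the `user:pw@` prefix, so signed tokens in the query and basic-auth userinfo never reach the logs.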
```typescript
case 'scheduler_execute': {
  if (request.method === 'POST') {
    return await handleExecuteScheduler(environment, context, route.schedulerID!, userUUID);
  }
  const response = errorResponse('method_not_allowed', 'Method not allowed', 405, context);
  response.headers.set('Allow', 'POST');
  return response;
}
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
printf '\n== scheduler execution handler ==\n'
fd -a 'scheduler-execution\.ts$' workers/scheduler-manager-worker/sources/handlers -x sed -n '1,260p' {}
printf '\n== scheduler executor ==\n'
fd -a 'scheduler-executor\.ts$' workers/scheduler-manager-worker/sources -x sed -n '1,260p' {}
printf '\n== stage runner ==\n'
fd -a 'stage-runner\.ts$' workers/scheduler-manager-worker/sources -x sed -n '1,260p' {}
printf '\n== abort and terminal status transitions ==\n'
rg -n -C3 --type=ts 'AbortController|AbortSignal|updateSchedulerRun|updateSchedulerStageRun|pending|running|failed|partially_failed|completed' workers/scheduler-manager-worker/sources
```

Repository: gignac-cha/audio-underview
Length of output: 40329
Not every stage_run may reach a terminal state after an abort.

On timeout the handler updates the scheduler_run status to 'failed' (scheduler-execution.ts:93-97), but in-progress scheduler_stage_run rows are not updated. In particular, if a timeout hits during a fan-out execution, the stage_run stays 'running' and re-executing the same scheduler keeps getting blocked with a 409. The finally block of executeScheduler (scheduler-executor.ts:177-187) only updates scheduler.last_run_at and performs no stage_run cleanup. Add cleanup logic that moves all in-progress stage_runs to a terminal state when the abort signal fires.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/sources/index.ts` around lines 252 - 259,
The abort/timeout path currently only marks scheduler_run as failed but leaves
scheduler_stage_run rows in non-terminal states; add cleanup to executeScheduler
(scheduler-executor.ts) so that on abort/timeout it finds all
scheduler_stage_run records for the running scheduler_run (status in
['running','pending','queued','in_progress'] etc.) and updates them to a
terminal status (e.g., 'failed' or 'aborted') with an appropriate
end_time/reason; perform this update in the abort/timeout handler or the finally
block where scheduler.last_run_at is updated, and reuse existing DB/update
helpers used by the scheduler_run update (the same persistence functions called
from handleExecuteScheduler / scheduler-execution.ts) and emit any existing
lifecycle events/notifications so retries aren’t blocked by lingering running
stage_run rows.
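The cleanup can start from a pure selection step, which is easy to unit-test separately from the Supabase update itself. The status names here follow the review text and may not match the actual enum in the schema:

```typescript
// Statuses considered non-terminal for a scheduler_stage_run; assumed names.
const NON_TERMINAL_STATUSES = new Set(['pending', 'running']);

// Minimal row shape for illustration.
interface StageRunLike {
  id: string;
  status: string;
}

// Picks the stage-run IDs that should be patched to a terminal state on
// abort/timeout; already-terminal rows are left untouched so completed work
// is never overwritten.
function stageRunsToFail(stageRuns: StageRunLike[]): string[] {
  return stageRuns
    .filter((run) => NON_TERMINAL_STATUSES.has(run.status))
    .map((run) => run.id);
}
```

The abort handler would then issue one conditional update per selected ID (or a single `status IN (...)` patch), reusing the same persistence helpers the scheduler_run update already calls.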
🧹 Nitpick | 🔵 Trivial
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
printf '\n== stage runner ==\n'
fd -a 'stage-runner\.ts$' workers/scheduler-manager-worker/sources -x sed -n '1,260p' {}
printf '\n== fan-out result handling ==\n'
rg -n -C3 --type=ts 'executeFanOut|Symbol|filter\(|results\[|itemsSucceeded|itemsFailed|fan[_-]?out' workers/scheduler-manager-worker/sources
```

Repository: gignac-cha/audio-underview
Length of output: 21954
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
printf '\n== Tests for executeFanOut ==\n'
fd -a '\.(test|spec)\.(ts|tsx|js|jsx)$' workers/scheduler-manager-worker -x rg -l 'executeFanOut' {}
printf '\n== Documentation files ==\n'
fd -a '\.(md|txt)$' workers/scheduler-manager-worker -x rg -l 'fan.out|fan_out|fanOut' {} 2>/dev/null || true
printf '\n== How fanOutResult is used downstream ==\n'
rg -n 'fanOutResult\.' workers/scheduler-manager-worker/sources -A2 -B2
printf '\n== currentInput usage after fan-out ==\n'
rg -n 'currentInput.*=' workers/scheduler-manager-worker/sources/scheduler-executor.ts | head -20
```

Repository: gignac-cha/audio-underview
Length of output: 264
🏁 Script executed:
#!/bin/bash
set -euo pipefail
printf '\n== stage-runner.test.ts (full) ==\n'
wc -l workers/scheduler-manager-worker/tests/stage-runner.test.ts
printf '\n== scheduler-executor.test.ts (full) ==\n'
wc -l workers/scheduler-manager-worker/tests/scheduler-executor.test.ts
printf '\n== stage-runner test content (search for executeFanOut tests) ==\n'
rg -A30 'executeFanOut|fan.out|fan_out' workers/scheduler-manager-worker/tests/stage-runner.test.ts | head -100
printf '\n== scheduler-executor test content (fan-out related) ==\n'
rg -A30 'executeFanOut|fan.out|fan_out' workers/scheduler-manager-worker/tests/scheduler-executor.test.ts | head -100
printf '\n== How currentInput is reassigned after fan-out ==\n'
rg -n 'currentInput' workers/scheduler-manager-worker/sources/scheduler-executor.ts | head -20

Repository: gignac-cha/audio-underview
Length of output: 8203
Please document explicitly that the fan-out result compaction behavior is by design.
In the current implementation, filter() removes failed items and returns a compacted array (stage-runner.ts line 141). As the tests confirm, this is intended behavior, but it is not explicitly documented that the next stage relies on it.
Please add a JSDoc comment to executeFanOut() clarifying the following:
- the results array is compacted and carries no index correspondence to the original inputs
- when the position of a failed item is needed, it must be tracked externally alongside the itemsTotal, itemsSucceeded, itemsFailed metadata
- this behavior is an intentional contract
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/sources/index.ts` around lines 252 - 259,
Add a JSDoc block to the executeFanOut() function clarifying that its internal
filter() call compresses the returned results array by removing failed items (so
returned results do not preserve original input indices), that callers must
track position or failure details externally using provided metadata fields
itemsTotal, itemsSucceeded, itemsFailed if they need mapping to original inputs,
and that this compressed-results behavior is an intentional contract of
executeFanOut() and must be relied upon by downstream code (reference
executeFanOut and the filter-based compression in stage-runner.ts).
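The contract the prompt asks to document can be illustrated with a small sketch; FAILED stands in for the PR's Symbol sentinel, and the function shape is assumed rather than the repository's actual executeFanOut:

```typescript
// Illustrative sketch of the documented contract, not the actual implementation.
const FAILED = Symbol('fan-out-failed');

/**
 * Collects fan-out results, dropping failed items.
 *
 * Contract: `results` is compacted: failed items are filtered out, so its
 * indices do not correspond to the original inputs. Callers that need
 * positional information must track it externally using the
 * `itemsTotal` / `itemsSucceeded` / `itemsFailed` metadata.
 */
function collectFanOutResults(raw: (unknown | typeof FAILED)[]) {
  const results = raw.filter((item) => item !== FAILED);
  return {
    results,
    itemsTotal: raw.length,
    itemsSucceeded: results.length,
    itemsFailed: raw.length - results.length,
  };
}
```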
export async function executeStage(
  dependencies: StageRunnerDependencies,
  runID: string,
  stage: SchedulerStageRow,
  input: unknown,
): Promise<StageResult> {
The parent timeout signal does not reach stage execution, so cancellation never actually happens.
executeScheduler() can return immediately after an abort, but these two helpers do not receive the signal, so an already-started crawler call and the fan-out loop keep running to completion. At the timeout point, a stage_run can stay in 'running' for a long time, and follow-up work can continue even after the handler has returned 408. The abort signal should be threaded through to the helpers and the crawler client, and checked before each iteration and status update.
Also applies to: 97-102, 111-130
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/sources/stage-runner.ts` around lines 45 -
50, executeStage does not receive the parent timeout/abort signal and therefore
cannot be interrupted: introduce a threaded AbortSignal, modify executeStage
(and the two helpers called from executeScheduler), and pass this signal to the
crawler client call and the fan-out loop; check signal.aborted before each
iteration and status update and return immediately from the loop/function, and
make the crawler call an abort-able request (or call the client's cancel API) so
in-flight requests are interrupted. Also ensure executeStage returns immediately
when the signal is aborted, and update the helpers (find the names of the helper
functions executeStage calls) to accept the signal as an argument or capture the
parent signal.
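The threading pattern this comment describes can be sketched as below; processItem is a hypothetical stand-in for the crawler client call, and the loop shape is an assumption rather than the repository's actual helper:

```typescript
// Sketch: check the signal before starting each item AND re-check after each
// await, so neither new work nor a stale status write happens after an abort.
async function runFanOut(
  items: string[],
  processItem: (item: string, signal: AbortSignal) => Promise<string>,
  signal: AbortSignal,
): Promise<string[]> {
  const results: string[] = [];
  for (const item of items) {
    if (signal.aborted) break; // do not start new work after an abort
    const result = await processItem(item, signal);
    if (signal.aborted) break; // do not record results that arrive after an abort
    results.push(result);
  }
  return results;
}
```

Passing the same signal into processItem lets the client cancel the in-flight request instead of merely ignoring its result.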
workers: [
  {
    name: 'audio-underview-crawler-manager-worker',
    modules: true,
    script: 'export default { async fetch() { return new Response("{}"); } }',
  },
The test stub response does not match the execution-result validation contract.
The "{}" on line 20 has no type/result fields, so it can fail on the validateCrawlerExecuteResult path. At minimum, align the stub with a validatable shape.
Suggested fix (align with a validatable response shape)
- script: 'export default { async fetch() { return new Response("{}"); } }',
+ script: `
+ export default {
+ async executeCrawler() {
+ return { type: 'data', result: null };
+ },
+ async fetch() {
+ return new Response('{"type":"data","result":null}', {
+ headers: { 'Content-Type': 'application/json' },
+ });
+ },
+ };
+ `,

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/vitest.config.ts` around lines 16 - 21, The
test stub response for the worker "audio-underview-crawler-manager-worker"
returns "{}", which lacks the required type/result fields and will fail
validateCrawlerExecuteResult; update the script's fetch() return to emit a JSON
body with the minimal validated shape (e.g., include "type" and "result" fields,
and any subfields validateCrawlerExecuteResult expects) so the stubbed Response
matches the validation contract used by validateCrawlerExecuteResult.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add crawler_permissions table with owner/subscriber levels (migration 007)
- Add fan_out_strategy option (compact/preserve) for positional alignment (migration 008)
- Add crawler permission check in stage create/update handlers
- Add Content-Length check for fetch response size limit
- Add conditional update to prevent timeout race on terminal status
- Propagate abort signal to executeStage/executeFanOut
- Wrap response.json() SyntaxError in CodeRunnerExecutionError
- Change resolveHTTPStatus fallback from 200 to 500 for unknown errors
- Add logging to empty catch blocks
- Fix vitest stub response to match CrawlerExecuteResult schema
- Add explicit finally block assertion in scheduler-executor tests
- Add duplicate cleanup in migration 006 before unique index creation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai review

Context since the previous review: these changes address the issues raised in the previous CodeRabbit review. Main changes:

Security/permissions
Timeout/race condition
Fan-out improvements
Input validation/error handling
Tests
DB migrations
✅ Actions performed

Review triggered.
Actionable comments posted: 12
♻️ Duplicate comments (2)
workers/scheduler-manager-worker/vitest.config.ts (1)
16-21: ⚠️ Potential issue | 🟠 Major

The Service Binding stub does not satisfy the RPC/validation contract.

The stub on line 20 only provides fetch() and returns { success, result }. The actual call site, however, invokes CRAWLER_MANAGER.executeCrawler(...) and then validates the result with validateCrawlerExecuteResult(...) (workers/scheduler-manager-worker/sources/crawler-execution-client.ts). The current shape mismatches both the RPC method contract and the result schema (including type).

Suggested fix

workers: [
  {
    name: 'audio-underview-crawler-manager-worker',
    modules: true,
-   script: 'export default { async fetch() { return Response.json({ success: true, result: null }); } }',
+   script: `
+     import { WorkerEntrypoint } from 'cloudflare:workers';
+     export default class extends WorkerEntrypoint {
+       async executeCrawler() {
+         return { type: 'data', result: null };
+       }
+       async fetch() {
+         return Response.json({ type: 'data', result: null });
+       }
+     }
+   `,
  },
],

Please verify against the symbols below.
#!/bin/bash
set -euo pipefail
echo "== Check RPC call sites =="
rg -nP --type=ts -C3 'interface\s+CrawlerManagerRPC|executeCrawler\s*\(' workers/scheduler-manager-worker
echo
echo "== Check the result validation path =="
rg -nP --type=ts -C3 'validateCrawlerExecuteResult|CrawlerExecuteResult'
echo
echo "== Check the current Miniflare stub =="
nl -ba workers/scheduler-manager-worker/vitest.config.ts | sed -n '14,24p'

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/scheduler-manager-worker/vitest.config.ts` around lines 16 - 21, The Miniflare stub currently returns only fetch() with { success, result } which does not match the RPC contract; update the stubbed script (the object exported in the workers' entry with async fetch()) to implement the same RPC shape that CRAWLER_MANAGER.executeCrawler expects and that validateCrawlerExecuteResult/CrawlerExecuteResult validate: call/handle the executeCrawler RPC method and return an object including the required "type" field and the full result schema (not just null) along with success; ensure the stubbed response shape exactly matches the CrawlerExecuteResult interface and any fields used by validateCrawlerExecuteResult so the test-side validation will pass.

workers/scheduler-manager-worker/sources/stage-runner.ts (1)
54-76: ⚠️ Potential issue | 🟠 Major

A stage can still be recorded as completed after a timeout.

signal is checked only before the call and is not passed to crawlerExecutionClient.execute(). If the timeout fires during the await, the parent scheduler_run has already become failed, yet this code can still overwrite scheduler_stage_runs to completed once the response arrives. Make the RPC cancelable, and re-check signal.aborted immediately after each await before updating the DB.
Verify each finding against the current code and only fix it if needed. In `@workers/scheduler-manager-worker/sources/stage-runner.ts` around lines 54 - 76, The stage run can be marked "completed" after a timeout because the abort signal isn't propagated and not re-checked after awaits; update createSchedulerStageRun / crawlerExecutionClient.execute / updateSchedulerStageRun usage so that crawlerExecutionClient.execute is called with the abort signal (make the RPC cancelable) and then check signal?.aborted immediately after each await (especially after the await on crawlerExecutionClient.execute and before calling updateSchedulerStageRun) and if aborted throw or return without writing a completed status, ensuring updateSchedulerStageRun is only called when not aborted.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/supabase-connector/sources/crawler-permissions.ts`:
- Around line 72-97: The function deleteCrawlerPermission currently
unconditionally sets span attribute db.rows_affected to 1; change the delete
call to request the deleted rows (e.g., use .delete().select(...) or the
appropriate Supabase return option) and use the returned data length to set
db.rows_affected (0 if no rows deleted) and only mark error/throw on actual
error; update references in the trace callback (span, client, crawlerID,
userUUID) so span.setAttribute('db.rows_affected', data?.length ?? 0) instead of
the hardcoded 1.
- Around line 26-30: The test helper mockCrawlerPermission is only stubbing GET
calls, but createCrawlerPermission uses
client.from('crawler_permissions').insert(...) which issues a POST; add a POST
mock inside mockCrawlerPermission to intercept POST requests to the
/rest/v1/crawler_permissions path (same regex used for GET) and return a 201
response with the expected crawler permission JSON (same shape as the GET reply)
so createCrawlerPermission receives the inserted record; reference
mockCrawlerPermission and createCrawlerPermission when making the change.
In `@workers/crawler-code-runner-worker/sources/index.ts`:
- Around line 139-145: The current size check only uses
fetchResponse.headers.get('Content-Length') and skips when absent, letting
chunked or headerless responses be buffered; update the logic in handleRun to
read fetchResponse.body as a stream (or use a reader on fetchResponse) and
accumulate bytes while enforcing MAX_RESPONSE_BYTES, aborting and returning
errorResponse('response_too_large', ...) if the accumulated size exceeds
MAX_RESPONSE_BYTES; also keep the existing logger.warn call (referencing
logger.warn, fetchResponse, MAX_RESPONSE_BYTES, responseText, and errorResponse)
and as a fallback, after calling fetchResponse.text() ensure responseText.length
(or byte length) is checked against MAX_RESPONSE_BYTES before proceeding.
In `@workers/crawler-manager-worker/sources/code-runner-client.ts`:
- Around line 102-106: The backoff wait in the retry loop uses delay() which
doesn't accept an AbortSignal, so waits won't be cancellable; update the retry
logic in code-runner-client.ts to pass an AbortSignal into an abort-aware sleep
(either extend delay(signal) to accept a signal or call an existing
abortableSleep) and short-circuit when signal.aborted; specifically, change the
loop around MAX_RETRY_ATTEMPTS / INITIAL_BACKOFF_MILLISECONDS to await
delay(backoffMilliseconds, signal) (or call abortableSleep(backoffMilliseconds,
signal)) and handle the case where the signal triggers (throw an AbortError or
return) so the backoff stops immediately when the provided AbortSignal is
triggered.
- Around line 168-177: The final post-loop throw block that constructs a new
CodeRunnerExecutionError using lastError is unreachable because every path
inside the retry for-loop already returns or throws; remove the unreachable
lines (the throw that creates a new CodeRunnerExecutionError) and either rely on
the existing throws/returns inside the retry loop or, if you intended a
fallback, change the retry logic so it only throws inside the loop on the final
attempt (use the existing lastError and CodeRunnerExecutionError symbols to
rethrow appropriately). Ensure the retry loop consistently returns a result or
throws a CodeRunnerExecutionError so no post-loop fallback is needed.
In `@workers/scheduler-manager-worker/sources/handlers/scheduler-execution.ts`:
- Around line 18-27: The current resolveHTTPStatus function relies on substring
matches in the human-readable error message, which is brittle; change the API to
use a typed error code instead: update resolveHTTPStatus(status: string,
errorCode?: string | number) to decide codes from a stable errorCode enum or
well-known strings (e.g., TIMEOUT, INVALID_INPUT_SCHEMA, CODE_RUNNER_ERROR,
DATABASE_ERROR) and only fall back to mapping from message when errorCode is
absent; ensure the producer that writes scheduler_run stores a stable error_code
field and that callers populate that field so resolveHTTPStatus (and any
callers) use error_code instead of inspecting error text.
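The suggested stable-code mapping could look like the sketch below; the code names mirror the statuses already used in this PR (408 timeout, 422 config, 502 upstream, 503 DB), but the enum itself is an assumption, not the repository's actual type:

```typescript
// Sketch: map stable error codes, not message substrings, to HTTP statuses.
type SchedulerErrorCode =
  | 'TIMEOUT'
  | 'INVALID_INPUT_SCHEMA'
  | 'CODE_RUNNER_ERROR'
  | 'DATABASE_ERROR';

function resolveHTTPStatus(errorCode?: SchedulerErrorCode): number {
  switch (errorCode) {
    case 'TIMEOUT':
      return 408;
    case 'INVALID_INPUT_SCHEMA':
      return 422;
    case 'CODE_RUNNER_ERROR':
      return 502;
    case 'DATABASE_ERROR':
      return 503;
    default:
      return 500; // unknown or absent codes fall back to 500
  }
}
```

A switch over a closed union stays exhaustive under the TypeScript compiler, unlike substring checks against human-readable messages.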
In `@workers/scheduler-manager-worker/sources/handlers/scheduler-stages.ts`:
- Line 224: The long if condition checking body.crawler_id, body.input_schema,
body.output_schema, body.fan_out_field, and body.fan_out_strategy is hard to
read; refactor by extracting the check into a clearly named helper (e.g.,
isAllSchemasAndCrawlerUndefined) or by grouping the keys into an array and using
a single predicates check (e.g., keys.every(k => body[k] === undefined)) and
replace the inline condition in the if inside scheduler-stages.ts with a call to
that helper to improve readability and maintainability.
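The refactor could be sketched as a key-list predicate; the field names come from the review comment, while the helper name is hypothetical:

```typescript
// Sketch: collapse the long undefined-check into one predicate over a key list.
const STAGE_FIELD_KEYS = [
  'crawler_id',
  'input_schema',
  'output_schema',
  'fan_out_field',
  'fan_out_strategy',
] as const;

function hasNoStageFieldUpdates(body: Record<string, unknown>): boolean {
  return STAGE_FIELD_KEYS.every((key) => body[key] === undefined);
}
```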
In `@workers/scheduler-manager-worker/sources/scheduler-executor.ts`:
- Around line 23-29: executeScheduler currently accepts userUUID but does not
pass it to the crawler RPC or re-check ownership, so revoked permissions still
allow runs; update executeScheduler to forward the caller identity to the
crawler manager by including userUUID (or a credentials token) when invoking the
RPC method executeCrawler (referenced from stage-runner.ts calls at the crawler
invocation sites) and ensure the crawler service’s getCrawlerByID or
executeCrawler validates ownership/permissions for that user before running the
crawler; also update the RPC signature and any callers (stage-runner.ts and the
crawler manager) so the identity flows to the authorization check and aborts
execution if the user no longer owns or is allowed to run the crawler.
In `@workers/scheduler-manager-worker/sources/stage-runner.ts`:
- Around line 117-159: The worker loop exits on signal?.aborted with a bare
break, leaving unprocessed slots as FAN_OUT_FAILED but not counted toward
itemsFailed, so status can be computed incorrectly; update worker() (and the
post-processing) so that when signal?.aborted is observed you mark the current
and all remaining unprocessed indices as failed and increment itemsFailed
accordingly (use nextIndex, items.length, results and FAN_OUT_FAILED to set
those slots), or alternatively after Promise.all compute itemsFailed =
results.filter(r => r === FAN_OUT_FAILED).length if abort occurred; ensure
itemsSucceeded remains accurate and status calculation uses the corrected
itemsFailed/itemsSucceeded values.
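The post-abort accounting fix can be sketched by deriving the counts from the sentinel-filled results array; FAN_OUT_FAILED mirrors the PR's Symbol sentinel, and the function shape is an assumption:

```typescript
// Sketch: after an abort, unprocessed slots remain FAN_OUT_FAILED, so counting
// sentinels in the results array keeps itemsFailed consistent with status.
const FAN_OUT_FAILED = Symbol('fan-out-failed');

function tallyFanOut(results: (unknown | typeof FAN_OUT_FAILED)[]) {
  const itemsFailed = results.filter((item) => item === FAN_OUT_FAILED).length;
  return {
    itemsTotal: results.length,
    itemsFailed,
    itemsSucceeded: results.length - itemsFailed,
  };
}
```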
In `@workers/scheduler-manager-worker/tests/scheduler-executor.test.ts`:
- Around line 531-533: The test extracts unused variables from
createDependencies; remove unused destructured identifiers (e.g.,
crawlerExecutionClient) or prefix them with an underscore to silence the linter
in the failing tests around the createDependencies call (the one that currently
does const { dependencies, crawlerExecutionClient, logger } =
createDependencies([...]) and the similar call at the other location). Update
the destructuring to only include used symbols (e.g., const { dependencies,
logger } = createDependencies(...)) or rename to const { dependencies,
crawlerExecutionClient: _crawlerExecutionClient, logger } =
createDependencies(...) in both places referenced.
- Around line 129-140: The mock factory createMockCrawlerExecutionClient always
returns { type: 'data', result } from its execute implementation, so the 'web'
execution path isn't covered; update createMockCrawlerExecutionClient to accept
optional per-call response types (e.g., allow results array entries to be {
type: 'data'|'web', result: unknown } or accept a parallel types array/tuple)
and have execute return the provided type for each call (or default to 'data')
so tests can simulate 'web' responses; modify the function signature and the
execute mock implementation (referencing createMockCrawlerExecutionClient and
execute) to read and return the specified type for each call.
In `@workers/scheduler-manager-worker/tests/stage-runner.test.ts`:
- Around line 310-326: Add a regression test that runs executeFanOut with
fan_out_strategy: 'preserve' (use createDependencies()/mockStage() and the same
crawlerExecutionClient.execute mock sequence) and assert that on partial failure
the overall status is 'partially_failed', itemsSucceeded/itemsFailed/itemsTotal
are 2/1/3, and importantly the results array preserves positional alignment
(e.g., ['ok', null, 'ok']) so the failed slot is null rather than compacted; add
the same mirrored test for the other spot mentioned (the block around lines
345-359).
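The difference the regression test should pin down can be sketched as follows; the sentinel and option names mirror the review discussion and migration 008, but the function itself is illustrative:

```typescript
// Sketch: 'compact' drops failed slots; 'preserve' keeps positional alignment
// by substituting null for each failed slot.
const FAILED_SLOT = Symbol('fan-out-failed');

function applyFanOutStrategy(
  raw: (unknown | typeof FAILED_SLOT)[],
  strategy: 'compact' | 'preserve',
): unknown[] {
  return strategy === 'compact'
    ? raw.filter((item) => item !== FAILED_SLOT)
    : raw.map((item) => (item === FAILED_SLOT ? null : item));
}
```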
---
Duplicate comments:
In `@workers/scheduler-manager-worker/sources/stage-runner.ts`:
- Around line 54-76: The stage run can be marked "completed" after a timeout
because the abort signal isn't propagated and not re-checked after awaits;
update createSchedulerStageRun / crawlerExecutionClient.execute /
updateSchedulerStageRun usage so that crawlerExecutionClient.execute is called
with the abort signal (make the RPC cancelable) and then check signal?.aborted
immediately after each await (especially after the await on
crawlerExecutionClient.execute and before calling updateSchedulerStageRun) and
if aborted throw or return without writing a completed status, ensuring
updateSchedulerStageRun is only called when not aborted.
In `@workers/scheduler-manager-worker/vitest.config.ts`:
- Around line 16-21: The Miniflare stub currently returns only fetch() with {
success, result } which does not match the RPC contract; update the stubbed
script (the object exported in the workers' entry with async fetch()) to
implement the same RPC shape that CRAWLER_MANAGER.executeCrawler expects and
that validateCrawlerExecuteResult/CrawlerExecuteResult validate: call/handle the
executeCrawler RPC method and return an object including the required "type"
field and the full result schema (not just null) along with success; ensure the
stubbed response shape exactly matches the CrawlerExecuteResult interface and
any fields used by validateCrawlerExecuteResult so the test-side validation will
pass.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0ed848d6-d3f4-4659-8a5b-8a5a8968b5eb
📒 Files selected for processing (22)
packages/supabase-connector/migrations/006_add_active_run_unique_constraint.sql
packages/supabase-connector/migrations/007_create_crawler_permissions.sql
packages/supabase-connector/migrations/008_add_fan_out_strategy.sql
packages/supabase-connector/sources/crawler-permissions.ts
packages/supabase-connector/sources/index.ts
packages/supabase-connector/sources/scheduler-runs.ts
packages/supabase-connector/sources/types/database.ts
packages/supabase-connector/sources/types/index.ts
workers/crawler-code-runner-worker/sources/index.ts
workers/crawler-manager-worker/sources/code-runner-client.ts
workers/crawler-manager-worker/sources/crawler-executor.ts
workers/crawler-manager-worker/sources/index.ts
workers/scheduler-manager-worker/sources/handlers/scheduler-execution.ts
workers/scheduler-manager-worker/sources/handlers/scheduler-stages.ts
workers/scheduler-manager-worker/sources/scheduler-executor.ts
workers/scheduler-manager-worker/sources/stage-runner.ts
workers/scheduler-manager-worker/tests/index.test.ts
workers/scheduler-manager-worker/tests/scheduler-execution.test.ts
workers/scheduler-manager-worker/tests/scheduler-executor.test.ts
workers/scheduler-manager-worker/tests/stage-runner.test.ts
workers/scheduler-manager-worker/vitest.config.ts
workers/tools/sources/types.ts
const { data, error } = await client
  .from('crawler_permissions')
  .insert(parameters)
  .select()
  .single();
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check whether crawler_permissions mocks are set up in test files
rg -n 'crawler_permissions' workers/scheduler-manager-worker/tests/ workers/crawler-manager-worker/tests/ --type=ts -C3

Repository: gignac-cha/audio-underview
Length of output: 743
🏁 Script executed:
# Check crawler_permissions mocks for the POST method
rg -n "post|POST|insert" workers/scheduler-manager-worker/tests/index.test.ts -C 3 | grep -A 5 -B 5 "crawler_permissions"

Repository: gignac-cha/audio-underview
Length of output: 52
🏁 Script executed:
# Check whether the mockCrawlerPermission function is called in tests
rg -n "mockCrawlerPermission\(\)" workers/scheduler-manager-worker/tests/index.test.ts

Repository: gignac-cha/audio-underview
Length of output: 142
🏁 Script executed:
# Survey the overall test file structure (especially INSERT-related parts)
rg -n "insert|createCrawlerPermission" workers/scheduler-manager-worker/tests/index.test.ts --type=ts

Repository: gignac-cha/audio-underview
Length of output: 52
🏁 Script executed:
# Inspect the mockCrawlerPermission function body (lines 93-120)
sed -n '93,120p' workers/scheduler-manager-worker/tests/index.test.ts

Repository: gignac-cha/audio-underview
Length of output: 903
🏁 Script executed:
# Inspect the context around line 420
sed -n '410,430p' workers/scheduler-manager-worker/tests/index.test.ts

Repository: gignac-cha/audio-underview
Length of output: 809
🏁 Script executed:
# Check POST/insert method mocks across the test file
rg -n "\.post|\.insert|method.*POST|method.*post" workers/scheduler-manager-worker/tests/index.test.ts --type=ts

Repository: gignac-cha/audio-underview
Length of output: 594
🏁 Script executed:
# Find test files in the packages/supabase-connector directory
find . -path "*/packages/supabase-connector*" -name "*.test.ts" -o -name "*.test.tsx" -o -name "*.spec.ts" 2>/dev/null

Repository: gignac-cha/audio-underview
Length of output: 1337
🏁 Script executed:
# Search for the createCrawlerPermission function
rg -n "createCrawlerPermission" . --type=ts

Repository: gignac-cha/audio-underview
Length of output: 441
🏁 Script executed:
# Inspect the contents of crawler-permissions.ts
cat -n packages/supabase-connector/sources/crawler-permissions.ts

Repository: gignac-cha/audio-underview
Length of output: 3852
The POST method is missing from the mockCrawlerPermission() mock.
The createCrawlerPermission function issues a POST /rest/v1/crawler_permissions request through its .insert(parameters) call, but the test's mockCrawlerPermission() function only mocks the GET method. The POST method needs to be added:
Required mock addition
function mockCrawlerPermission() {
fetchMock
.get('https://supabase.example.com')
.intercept({ path: /^\/rest\/v1\/crawler_permissions/, method: 'GET' })
.reply(200, JSON.stringify({
id: '00000000-0000-0000-0000-000000000099',
crawler_id: MOCK_CRAWLER_ID,
user_uuid: MOCK_USER_UUID,
level: 'owner',
created_at: '2026-01-01T00:00:00Z',
}));
  // Added: POST method mock
fetchMock
.post('https://supabase.example.com')
.intercept({ path: /^\/rest\/v1\/crawler_permissions/, method: 'POST' })
.reply(201, JSON.stringify({
id: '00000000-0000-0000-0000-000000000099',
crawler_id: MOCK_CRAWLER_ID,
user_uuid: MOCK_USER_UUID,
level: 'owner',
created_at: '2026-01-01T00:00:00Z',
}));
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/supabase-connector/sources/crawler-permissions.ts` around lines 26 -
30, The test helper mockCrawlerPermission is only stubbing GET calls, but
createCrawlerPermission uses client.from('crawler_permissions').insert(...)
which issues a POST; add a POST mock inside mockCrawlerPermission to intercept
POST requests to the /rest/v1/crawler_permissions path (same regex used for GET)
and return a 201 response with the expected crawler permission JSON (same shape
as the GET reply) so createCrawlerPermission receives the inserted record;
reference mockCrawlerPermission and createCrawlerPermission when making the
change.
export async function deleteCrawlerPermission(
  client: SupabaseClientType,
  crawlerID: string,
  userUUID: string,
): Promise<void> {
  return traceDatabaseOperation(
    { serviceName: 'supabase-connector', operation: 'delete', table: 'crawler_permissions' },
    async (span) => {
      span.setAttribute('db.query.crawler_id', crawlerID);
      span.setAttribute('db.query.user_uuid', userUUID);

      const { error } = await client
        .from('crawler_permissions')
        .delete()
        .eq('crawler_id', crawlerID)
        .eq('user_uuid', userUUID);

      if (error) {
        span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
        throw new Error(`Failed to delete crawler permission: ${error.message}`);
      }

      span.setAttribute('db.rows_affected', 1);
    },
  );
}
deleteCrawlerPermission sets the db.rows_affected attribute to 1 regardless of whether a row was actually deleted.
Supabase's delete() method does not return the number of deleted rows, so there is no way to confirm that a row was actually removed. The current implementation always sets 1, which can make the tracing data inaccurate.
🔧 Suggested deletion-confirmation logic
export async function deleteCrawlerPermission(
client: SupabaseClientType,
crawlerID: string,
userUUID: string,
-): Promise<void> {
+): Promise<boolean> {
return traceDatabaseOperation(
{ serviceName: 'supabase-connector', operation: 'delete', table: 'crawler_permissions' },
async (span) => {
span.setAttribute('db.query.crawler_id', crawlerID);
span.setAttribute('db.query.user_uuid', userUUID);
- const { error } = await client
+ const { data, error, count } = await client
.from('crawler_permissions')
.delete()
.eq('crawler_id', crawlerID)
- .eq('user_uuid', userUUID);
+ .eq('user_uuid', userUUID)
+ .select()
+ .maybeSingle();
if (error) {
span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
throw new Error(`Failed to delete crawler permission: ${error.message}`);
}
- span.setAttribute('db.rows_affected', 1);
+ const deleted = data !== null;
+ span.setAttribute('db.rows_affected', deleted ? 1 : 0);
+ return deleted;
},
);
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/supabase-connector/sources/crawler-permissions.ts` around lines 72 -
97, The function deleteCrawlerPermission currently unconditionally sets span
attribute db.rows_affected to 1; change the delete call to request the deleted
rows (e.g., use .delete().select(...) or the appropriate Supabase return option)
and use the returned data length to set db.rows_affected (0 if no rows deleted)
and only mark error/throw on actual error; update references in the trace
callback (span, client, crawlerID, userUUID) so
span.setAttribute('db.rows_affected', data?.length ?? 0) instead of the
hardcoded 1.
const contentLength = fetchResponse.headers.get('Content-Length');
if (contentLength !== null && Number(contentLength) > MAX_RESPONSE_BYTES) {
  logger.warn('Response too large', { url: targetURL.toString(), contentLength }, { function: 'handleRun' });
  return errorResponse('response_too_large', `Response exceeds maximum size of ${MAX_RESPONSE_BYTES} bytes`, 413, context);
}

responseText = await fetchResponse.text();
Responses without a Content-Length header can bypass the size limit.
The size check runs only when the Content-Length header is present, so responses without one (e.g., chunked encoding) can be buffered without bound. Read the body as a stream while checking the accumulated byte count, or add a defensive length check after calling text().
🛡️ Suggested fix
responseText = await fetchResponse.text();
+
+ if (responseText.length > MAX_RESPONSE_BYTES) {
+ logger.warn('Response too large after fetch', { contentLength: responseText.length }, { function: 'handleRun' });
+ return errorResponse('response_too_large', `Response exceeds maximum size of ${MAX_RESPONSE_BYTES} bytes`, 413, context);
+ }
+
 logger.info('Target URL fetched', {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/crawler-code-runner-worker/sources/index.ts` around lines 139 - 145,
The current size check only uses fetchResponse.headers.get('Content-Length') and
skips when absent, letting chunked or headerless responses be buffered; update
the logic in handleRun to read fetchResponse.body as a stream (or use a reader
on fetchResponse) and accumulate bytes while enforcing MAX_RESPONSE_BYTES,
aborting and returning errorResponse('response_too_large', ...) if the
accumulated size exceeds MAX_RESPONSE_BYTES; also keep the existing logger.warn
call (referencing logger.warn, fetchResponse, MAX_RESPONSE_BYTES, responseText,
and errorResponse) and as a fallback, after calling fetchResponse.text() ensure
responseText.length (or byte length) is checked against MAX_RESPONSE_BYTES
before proceeding.
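The streaming enforcement this prompt describes can be sketched like this; readBodyWithLimit and the thrown error string are assumptions, not the worker's actual helpers (which would return errorResponse('response_too_large', ...)):

```typescript
// Sketch: read the body as a stream, accumulating bytes and aborting as soon
// as the budget is exceeded, so responses without Content-Length are bounded.
async function readBodyWithLimit(
  body: ReadableStream<Uint8Array>,
  maxBytes: number,
): Promise<string> {
  const reader = body.getReader();
  const chunks: Uint8Array[] = [];
  let total = 0;
  for (;;) {
    const { done, value } = await reader.read();
    if (done || value === undefined) break;
    total += value.byteLength;
    if (total > maxBytes) {
      await reader.cancel(); // stop pulling further chunks
      throw new Error(`response_too_large: exceeds ${maxBytes} bytes`);
    }
    chunks.push(value);
  }
  const merged = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    merged.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return new TextDecoder().decode(merged);
}
```

Unlike the Content-Length check, this bounds memory even for chunked responses, since the limit is enforced per chunk before buffering continues.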
for (let attempt = 0; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
  if (attempt > 0) {
    const backoffMilliseconds = INITIAL_BACKOFF_MILLISECONDS * Math.pow(2, attempt - 1);
    await delay(backoffMilliseconds);
  }
🧹 Nitpick | 🔵 Trivial
The abort signal is not propagated into the backoff delay between retries.
The delay() function does not accept an AbortSignal, so even if the request is cancelled, the backoff wait is not interrupted immediately. Since the PR goals emphasize abort signal propagation, consider passing the signal through so the wait can be cancelled.
♻️ Suggested fix
-async function delay(milliseconds: number): Promise<void> {
- return new Promise((resolve) => setTimeout(resolve, milliseconds));
+async function delay(milliseconds: number, signal?: AbortSignal): Promise<void> {
+ return new Promise((resolve, reject) => {
+ if (signal?.aborted) {
+ reject(signal.reason);
+ return;
+ }
+ const timeoutID = setTimeout(resolve, milliseconds);
+ signal?.addEventListener('abort', () => {
+ clearTimeout(timeoutID);
+ reject(signal.reason);
+ }, { once: true });
+ });
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/crawler-manager-worker/sources/code-runner-client.ts` around lines
102 - 106, The backoff wait in the retry loop uses delay() which doesn't accept
an AbortSignal, so waits won't be cancellable; update the retry logic in
code-runner-client.ts to pass an AbortSignal into an abort-aware sleep (either
extend delay(signal) to accept a signal or call an existing abortableSleep) and
short-circuit when signal.aborted; specifically, change the loop around
MAX_RETRY_ATTEMPTS / INITIAL_BACKOFF_MILLISECONDS to await
delay(backoffMilliseconds, signal) (or call abortableSleep(backoffMilliseconds,
signal)) and handle the case where the signal triggers (throw an AbortError or
return) so the backoff stops immediately when the provided AbortSignal is
triggered.
  // Exhausted all retry attempts — throw the last captured error
  if (lastError instanceof CodeRunnerExecutionError) {
    throw lastError;
  }
  throw new CodeRunnerExecutionError(
    'network_error',
    lastError instanceof Error ? lastError.message : 'Unknown error after retries',
    0,
  );
}
🧹 Nitpick | 🔵 Trivial
Unreachable code.
Every path inside the for loop exits via return or throw:
- on success: `return result` (Line 131)
- on a network error during the final attempt: `throw` (Lines 121-125)
- on a 5xx error during the final attempt: `throw lastError` (Line 150)
- on a 4xx error: immediate `throw` (Line 165)

Therefore Lines 168-177 can never execute.
♻️ Suggested fix
```diff
   throw new CodeRunnerExecutionError(errorCode, errorDescription, response.status);
 }
-
-  // Exhausted all retry attempts — throw the last captured error
-  if (lastError instanceof CodeRunnerExecutionError) {
-    throw lastError;
-  }
-  throw new CodeRunnerExecutionError(
-    'network_error',
-    lastError instanceof Error ? lastError.message : 'Unknown error after retries',
-    0,
-  );
+
+  // TypeScript: unreachable, but satisfies return type
+  throw new CodeRunnerExecutionError('network_error', 'Unexpected retry loop exit', 0);
 }
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/crawler-manager-worker/sources/code-runner-client.ts` around lines
168 - 177, The final post-loop throw block that constructs a new
CodeRunnerExecutionError using lastError is unreachable because every path
inside the retry for-loop already returns or throws; remove the unreachable
lines (the throw that creates a new CodeRunnerExecutionError) and either rely on
the existing throws/returns inside the retry loop or, if you intended a
fallback, change the retry logic so it only throws inside the loop on the final
attempt (use the existing lastError and CodeRunnerExecutionError symbols to
rethrow appropriately). Ensure the retry loop consistently returns a result or
throws a CodeRunnerExecutionError so no post-loop fallback is needed.
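For context, a minimal self-contained retry skeleton shows why such a post-loop fallback is dead code yet still demanded by TypeScript's return-type analysis; `MAX_ATTEMPTS`, `FlakyError`, and `callFlaky` are illustrative stand-ins, not the PR's code:

```typescript
// Minimal retry skeleton; every iteration either returns or (on the final
// attempt) rethrows, so the post-loop line can never run.
const MAX_ATTEMPTS = 3;

class FlakyError extends Error {}

async function callFlaky(attempt: number): Promise<string> {
  // Fails on every attempt except the last one.
  if (attempt < MAX_ATTEMPTS) throw new FlakyError(`attempt ${attempt} failed`);
  return 'ok';
}

async function withRetry(): Promise<string> {
  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
    try {
      return await callFlaky(attempt); // success path returns here
    } catch (error) {
      if (attempt === MAX_ATTEMPTS) throw error; // final attempt rethrows
      // otherwise fall through to the next attempt
    }
  }
  // Unreachable at runtime, but TypeScript cannot prove the loop bound,
  // so a terminal throw (or non-void return) is still required here.
  throw new Error('unreachable');
}

withRetry().then((value) => console.log(value)); // prints "ok"
```

Keeping the terminal throw as a one-line sentinel, as the suggested fix does, documents the invariant without duplicating error-construction logic.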
```ts
export async function executeScheduler(
  dependencies: ExecutorDependencies,
  schedulerID: string,
  userUUID: string,
  runID: string,
  signal?: AbortSignal,
): Promise<void> {
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
echo "== execution call sites =="
rg -nC2 'crawlerExecutionClient\.execute\(|executeCrawler\(' \
  workers/scheduler-manager-worker \
  workers/crawler-manager-worker
echo
echo "== identity threaded into execution =="
rg -nC2 'userUUID|user_uuid' \
  workers/scheduler-manager-worker/sources/scheduler-executor.ts \
  workers/scheduler-manager-worker/sources/stage-runner.ts \
  workers/scheduler-manager-worker/sources/crawler-execution-client.ts \
  workers/crawler-manager-worker
echo
echo "== permission checks around runtime execution =="
rg -nC3 'crawler_permissions|permission|getCrawlerByID' \
  workers/scheduler-manager-worker \
  workers/crawler-manager-worker \
  packages/supabase-connector
```

Repository: gignac-cha/audio-underview
Length of output: 35380
Permissions are not re-validated at execution time, so a crawler can still be invoked after access is revoked.
executeScheduler receives userUUID, but the actual crawler execution calls (stage-runner.ts:68, 123) pass only crawler_id and input. The crawler manager's RPC method (executeCrawler) receives no user identity at all, and the getCrawlerByID lookup performs no ownership check either.
Permission validation happens only when stages are created or updated, in scheduler-manager-worker/handlers/scheduler-stages.ts:72-74, so a previously stored scheduler_stage.crawler_id keeps executing even after the permission is revoked. The caller's identity should be propagated across the RPC boundary and re-validated immediately before crawler execution.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/sources/scheduler-executor.ts` around lines
23 - 29, executeScheduler currently accepts userUUID but does not pass it to the
crawler RPC or re-check ownership, so revoked permissions still allow runs;
update executeScheduler to forward the caller identity to the crawler manager by
including userUUID (or a credentials token) when invoking the RPC method
executeCrawler (referenced from stage-runner.ts calls at the crawler invocation
sites) and ensure the crawler service’s getCrawlerByID or executeCrawler
validates ownership/permissions for that user before running the crawler; also
update the RPC signature and any callers (stage-runner.ts and the crawler
manager) so the identity flows to the authorization check and aborts execution
if the user no longer owns or is allowed to run the crawler.
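A hedged sketch of the shape such a fix could take: thread the caller identity through the RPC boundary and re-check ownership right before execution. The interfaces, `owner_uuid` field, and `executeCrawlerForUser` helper are illustrative assumptions, not the PR's actual schema or API:

```typescript
// Illustrative types; the real crawler record and store live in
// packages/supabase-connector and may differ.
interface Crawler {
  id: string;
  owner_uuid: string;
}

interface CrawlerStore {
  getCrawlerByID(id: string): Promise<Crawler | undefined>;
}

class PermissionError extends Error {}

async function executeCrawlerForUser(
  store: CrawlerStore,
  crawlerID: string,
  userUUID: string,
): Promise<string> {
  const crawler = await store.getCrawlerByID(crawlerID);
  if (!crawler || crawler.owner_uuid !== userUUID) {
    // Re-validated at execution time, so a revoked permission fails here
    // even though the stage's crawler_id was stored while access existed.
    throw new PermissionError(`user ${userUUID} may not run crawler ${crawlerID}`);
  }
  return `running ${crawler.id}`;
}

// A stale scheduler_stage.crawler_id owned by someone else is now rejected.
const store: CrawlerStore = {
  getCrawlerByID: async (id) => ({ id, owner_uuid: 'user-1' }),
};

executeCrawlerForUser(store, 'crawler-9', 'user-2')
  .catch((error) => console.log(error instanceof PermissionError)); // prints true
```

The same check could equally live inside the crawler manager's `executeCrawler` RPC, which keeps authorization next to the resource it protects.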
```ts
async function worker(): Promise<void> {
  while (nextIndex < items.length) {
    if (signal?.aborted) break;
    const index = nextIndex++;
    const item = items[index];
    try {
      const response = await crawlerExecutionClient.execute(stage.crawler_id, item);
      results[index] = response.result;
      itemsSucceeded++;
    } catch (error: unknown) {
      logger.warn('Fan-out item failed', error, {
        function: 'executeFanOut',
        metadata: { stageID: stage.id, itemIndex: index },
      });
      itemsFailed++;
    }
  }
}

const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker());
await Promise.all(workers);

let status: 'completed' | 'partially_failed' | 'failed';
if (itemsFailed === 0) {
  status = 'completed';
} else if (itemsSucceeded > 0) {
  status = 'partially_failed';
} else {
  status = 'failed';
}

const strategy = stage.fan_out_strategy ?? 'compact';
const finalResults = strategy === 'preserve'
  ? results.map((result) => result === FAN_OUT_FAILED ? null : result)
  : results.filter((result) => result !== FAN_OUT_FAILED);

return {
  results: finalResults,
  itemsTotal: items.length,
  itemsSucceeded,
  itemsFailed,
  status,
};
```
An aborted fan-out is aggregated as a normal completion.
When signal.aborted fires, the worker simply breaks; slots left as FAN_OUT_FAILED are never counted toward itemsFailed. For example, if only 1 of 10 items has been processed before an abort, the result is itemsTotal=10, itemsSucceeded=1, itemsFailed=0, status='completed', and the stage run is stored as if it finished normally. An abort should take a separate failure/interruption path before status aggregation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/sources/stage-runner.ts` around lines 117 -
159, The worker loop exits on signal?.aborted with a bare break, leaving
unprocessed slots as FAN_OUT_FAILED but not counted toward itemsFailed, so
status can be computed incorrectly; update worker() (and the post-processing) so
that when signal?.aborted is observed you mark the current and all remaining
unprocessed indices as failed and increment itemsFailed accordingly (use
nextIndex, items.length, results and FAN_OUT_FAILED to set those slots), or
alternatively after Promise.all compute itemsFailed = results.filter(r => r ===
FAN_OUT_FAILED).length if abort occurred; ensure itemsSucceeded remains accurate
and status calculation uses the corrected itemsFailed/itemsSucceeded values.
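One possible shape of the fix, as a self-contained sketch: after the workers settle, recount failures from the results array so never-started slots count as failed, and give aborts their own terminal status. `FAN_OUT_FAILED` mirrors the PR's sentinel; the `aggregate` helper and the extra `'aborted'` status value are assumptions, not the PR's actual union:

```typescript
// Sentinel marking slots whose item failed or never ran, mirroring the PR.
const FAN_OUT_FAILED: unique symbol = Symbol('fan-out-failed');

function aggregate(
  results: Array<unknown | typeof FAN_OUT_FAILED>,
  itemsSucceeded: number,
  aborted: boolean,
): { itemsFailed: number; status: 'completed' | 'partially_failed' | 'failed' | 'aborted' } {
  // Count every slot still holding the sentinel, including ones the
  // workers never reached before the abort.
  const itemsFailed = results.filter((result) => result === FAN_OUT_FAILED).length;
  if (aborted) return { itemsFailed, status: 'aborted' };
  if (itemsFailed === 0) return { itemsFailed, status: 'completed' };
  return { itemsFailed, status: itemsSucceeded > 0 ? 'partially_failed' : 'failed' };
}

// 1 of 10 items processed before abort: no longer reported as 'completed'.
const results = ['ok', ...Array.from({ length: 9 }, () => FAN_OUT_FAILED)];
console.log(aggregate(results, 1, true)); // { itemsFailed: 9, status: 'aborted' }
```

If adding a status value is undesirable, recomputing itemsFailed this way and mapping aborts onto the existing 'failed' status would at least stop aborted runs from being stored as successes.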
```ts
function createMockCrawlerExecutionClient(
  results: unknown[] = [],
): CrawlerExecutionClient & { execute: ReturnType<typeof vi.fn> } {
  let callIndex = 0;
  return {
    execute: vi.fn().mockImplementation(async () => {
      const result = results[callIndex] ?? { extracted: 'data' };
      callIndex++;
      return { type: 'data' as const, result };
    }),
  };
}
```
🧹 Nitpick | 🔵 Trivial
The mock client always returns the 'data' type.
createMockCrawlerExecutionClient always returns the { type: 'data', result } shape, so the 'web'-type crawler execution path is never exercised. This is fine for the current test scope, but worth revisiting when 'web'-type tests are added.
♻️ Suggested mock factory with per-call types
```diff
 function createMockCrawlerExecutionClient(
-  results: unknown[] = [],
+  results: Array<{ type?: 'web' | 'data'; result: unknown }> = [],
 ): CrawlerExecutionClient & { execute: ReturnType<typeof vi.fn> } {
   let callIndex = 0;
   return {
     execute: vi.fn().mockImplementation(async () => {
-      const result = results[callIndex] ?? { extracted: 'data' };
+      const item = results[callIndex] ?? { result: { extracted: 'data' } };
       callIndex++;
-      return { type: 'data' as const, result };
+      return { type: item.type ?? 'data', result: item.result };
     }),
   };
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
function createMockCrawlerExecutionClient(
  results: Array<{ type?: 'web' | 'data'; result: unknown }> = [],
): CrawlerExecutionClient & { execute: ReturnType<typeof vi.fn> } {
  let callIndex = 0;
  return {
    execute: vi.fn().mockImplementation(async () => {
      const item = results[callIndex] ?? { result: { extracted: 'data' } };
      callIndex++;
      return { type: item.type ?? 'data', result: item.result };
    }),
  };
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/tests/scheduler-executor.test.ts` around
lines 129 - 140, The mock factory createMockCrawlerExecutionClient always
returns { type: 'data', result } from its execute implementation, so the 'web'
execution path isn't covered; update createMockCrawlerExecutionClient to accept
optional per-call response types (e.g., allow results array entries to be {
type: 'data'|'web', result: unknown } or accept a parallel types array/tuple)
and have execute return the provided type for each call (or default to 'data')
so tests can simulate 'web' responses; modify the function signature and the
execute mock implementation (referencing createMockCrawlerExecutionClient and
execute) to read and return the specified type for each call.
```ts
const { dependencies, crawlerExecutionClient, logger } = createDependencies([
  { data: 'no-links-field' },
]);
```
🧹 Nitpick | 🔵 Trivial
Unused destructured variable.
crawlerExecutionClient is destructured but never used in these tests. Remove it, or prefix it with an underscore, to avoid linter warnings.
♻️ Remove the unused variable
```diff
 it('throws error when fan_out_field references a non-existent field', async () => {
-  const { dependencies, crawlerExecutionClient, logger } = createDependencies([
+  const { dependencies, logger } = createDependencies([
     { data: 'no-links-field' },
   ]);
```

```diff
 it('throws error when fan_out_field references a non-array value', async () => {
-  const { dependencies, crawlerExecutionClient, logger } = createDependencies([
+  const { dependencies, logger } = createDependencies([
     { links: 'not-an-array' },
   ]);
```

Also applies to: 568-570
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/tests/scheduler-executor.test.ts` around
lines 531 - 533, The test extracts unused variables from createDependencies;
remove unused destructured identifiers (e.g., crawlerExecutionClient) or prefix
them with an underscore to silence the linter in the failing tests around the
createDependencies call (the one that currently does const { dependencies,
crawlerExecutionClient, logger } = createDependencies([...]) and the similar
call at the other location). Update the destructuring to only include used
symbols (e.g., const { dependencies, logger } = createDependencies(...)) or
rename to const { dependencies, crawlerExecutionClient: _crawlerExecutionClient,
logger } = createDependencies(...) in both places referenced.
```ts
it('returns partially_failed when some items fail', async () => {
  const { dependencies, crawlerExecutionClient } = createDependencies();
  const stage = mockStage();

  crawlerExecutionClient.execute
    .mockResolvedValueOnce({ type: 'data', result: 'ok' })
    .mockRejectedValueOnce(new Error('item failed'))
    .mockResolvedValueOnce({ type: 'data', result: 'ok' });

  const result = await executeFanOut(dependencies, stage, ['a', 'b', 'c']);

  expect(result.status).toBe('partially_failed');
  expect(result.itemsSucceeded).toBe(2);
  expect(result.itemsFailed).toBe(1);
  expect(result.itemsTotal).toBe(3);
  expect(result.results).toEqual(['ok', 'ok']);
});
```
🧹 Nitpick | 🔵 Trivial
Please also add a regression test that the preserve strategy keeps failed slots.
The partial-failure test currently checks only the default compact results; it does not verify the core contract that with fan_out_strategy: 'preserve' the failed index remains null. Since positional alignment is exactly what this fix targets, that case will regress easily without a dedicated test.
Also applies to: 345-359
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@workers/scheduler-manager-worker/tests/stage-runner.test.ts` around lines 310
- 326, Add a regression test that runs executeFanOut with fan_out_strategy:
'preserve' (use createDependencies()/mockStage() and the same
crawlerExecutionClient.execute mock sequence) and assert that on partial failure
the overall status is 'partially_failed', itemsSucceeded/itemsFailed/itemsTotal
are 2/1/3, and importantly the results array preserves positional alignment
(e.g., ['ok', null, 'ok']) so the failed slot is null rather than compacted; add
the same mirrored test for the other spot mentioned (the block around lines
345-359).
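A runnable sketch of the contract such a test would pin down; `fanOut` here is a minimal sequential stand-in for the PR's `executeFanOut`, and the `FAILED` sentinel mirrors its `FAN_OUT_FAILED`:

```typescript
// Minimal stand-in for executeFanOut, just enough to show the
// compact-vs-preserve contract; not the PR's implementation.
const FAILED: unique symbol = Symbol('failed');

async function fanOut(
  items: string[],
  execute: (item: string) => Promise<string>,
  strategy: 'compact' | 'preserve',
): Promise<Array<string | null>> {
  const slots: Array<string | typeof FAILED> = [];
  for (const [index, item] of items.entries()) {
    try {
      slots[index] = await execute(item);
    } catch {
      slots[index] = FAILED; // keep the slot so position survives
    }
  }
  return strategy === 'preserve'
    ? slots.map((slot) => (slot === FAILED ? null : slot)) // null keeps alignment
    : slots.filter((slot): slot is string => slot !== FAILED); // failures dropped
}

// Second item fails, mirroring the mockRejectedValueOnce sequence above.
const execute = async (item: string) => {
  if (item === 'b') throw new Error('item failed');
  return 'ok';
};

fanOut(['a', 'b', 'c'], execute, 'preserve').then((results) => {
  console.log(results); // [ 'ok', null, 'ok' ]
});
```

The regression test would assert exactly this `['ok', null, 'ok']` shape against `executeFanOut`, so any future change that compacts preserve-mode results fails loudly.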
Summary
- `executeCrawler()` RPC method

Linear Issue
Closes TES-23

Changes
- `scheduler-executor.ts`, `stage-runner.ts`, `crawler-execution-client.ts`, `handlers/scheduler-execution.ts`
- `executeCrawler()` RPC, `code-runner-client.ts`, `crawler-executor.ts`, `safe-url-pattern.ts`
- `CrawlerExecuteResult` type + `validateCrawlerExecuteResult()`
- `type: web|data`, `mode: test|run` discriminated union support
- `scheduler-stage-runs` CRUD, `getCrawlerByID`, type updates

Review Fixes Applied

Test plan
- `pnpm --filter scheduler-manager-worker test` — 85 tests pass
- `pnpm --filter @audio-underview/crawler-manager-worker test` — 63 tests pass
- `pnpm typecheck` — all 29 projects pass

🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Stability Improvements