diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 28cbc6a535..d2817442ee 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -11,7 +11,7 @@
 import re
 import os
 import traceback
-from datetime import datetime
+from datetime import datetime, timedelta
 from uuid import UUID, uuid4
 import urllib.parse
 
@@ -95,6 +95,8 @@ class CrawlConfigOps:
     crawler_images_map: dict[str, str]
     crawler_image_pull_policy_map: dict[str, str]
 
+    paused_expiry_delta: timedelta
+
     def __init__(
         self,
         dbclient,
@@ -121,6 +123,10 @@ def __init__(
             "DEFAULT_CRAWLER_IMAGE_PULL_POLICY", "IfNotPresent"
         )
 
+        self.paused_expiry_delta = timedelta(
+            minutes=int(os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080"))
+        )
+
         self.router = APIRouter(
             prefix="/crawlconfigs",
             tags=["crawlconfigs"],
@@ -765,6 +771,14 @@ async def _add_running_curr_crawl_stats(self, crawlconfig: CrawlConfigOut):
         crawlconfig.lastCrawlState = crawl.state
         crawlconfig.lastCrawlSize = crawl.stats.size if crawl.stats else 0
         crawlconfig.lastCrawlStopping = crawl.stopping
+        crawlconfig.lastCrawlPausing = crawl.pausing
+        crawlconfig.lastCrawlPausedAt = crawl.pausedAt
+        crawlconfig.lastCrawlPausedExpiry = None
+        crawlconfig.lastCrawlExecSeconds = crawl.crawlExecSeconds
+        if crawl.pausedAt:
+            crawlconfig.lastCrawlPausedExpiry = (
+                crawl.pausedAt + self.paused_expiry_delta
+            )
         crawlconfig.isCrawlRunning = True
 
     async def get_crawl_config_out(self, cid: UUID, org: Organization):
diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py
index bbadb2424c..88999b28f7 100644
--- a/backend/btrixcloud/crawlmanager.py
+++ b/backend/btrixcloud/crawlmanager.py
@@ -4,7 +4,7 @@
 import secrets
 from typing import Optional, Dict, Tuple
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 from fastapi import HTTPException
 
@@ -386,6 +386,14 @@ async def shutdown_crawl(self, crawl_id: str, graceful=True) -> dict:
 
         return await self.delete_crawl_job(crawl_id)
 
+    async def pause_resume_crawl(
+        self, crawl_id: str, paused_at: Optional[datetime] = None
+    ) -> dict:
+        """pause or resume a crawl"""
+        return await self._patch_job(
+            crawl_id, {"pausedAt": date_to_str(paused_at) if paused_at else ""}
+        )
+
     async def delete_crawl_configs_for_org(self, org: str) -> None:
         """Delete all crawl configs for given org"""
        await self._delete_crawl_configs(f"btrix.org={org}")
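Note on the expiry math: `lastCrawlPausedExpiry` is derived entirely from `pausedAt` plus the configured limit, so API clients never compute it themselves. A minimal sketch of the derivation (the sample timestamp is hypothetical; only the env var name and default come from this diff):

```python
# Sketch of the lastCrawlPausedExpiry computation added above.
import os
from datetime import datetime, timedelta, timezone

paused_expiry_delta = timedelta(
    minutes=int(os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080"))  # 7 days
)

paused_at = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)  # hypothetical pausedAt
expiry = paused_at + paused_expiry_delta
print(expiry.isoformat())  # 2025-01-08T12:00:00+00:00 -- expires one week later
```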
diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 3718a34bed..1d70744eeb 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -769,6 +769,39 @@ async def get_crawl_stats(
 
         return crawls_data
 
+    async def pause_crawl(
+        self, crawl_id: str, org: Organization, pause: bool
+    ) -> Dict[str, bool]:
+        """pause or resume a crawl temporarily"""
+        crawl = await self.get_base_crawl(crawl_id, org)
+        if crawl and crawl.type != "crawl":
+            raise HTTPException(status_code=400, detail="not_a_crawl")
+
+        result = None
+
+        if pause:
+            paused_at = dt_now()
+        else:
+            paused_at = None
+
+        try:
+            result = await self.crawl_manager.pause_resume_crawl(
+                crawl_id, paused_at=paused_at
+            )
+
+            if result.get("success"):
+                await self.crawls.find_one_and_update(
+                    {"_id": crawl_id, "type": "crawl", "oid": org.id},
+                    {"$set": {"pausing": pause, "pausedAt": paused_at}},
+                )
+
+                return {"success": True}
+        # pylint: disable=bare-except
+        except:
+            pass
+
+        raise HTTPException(status_code=404, detail="crawl_not_found")
+
     async def shutdown_crawl(
         self, crawl_id: str, org: Organization, graceful: bool
     ) -> Dict[str, bool]:
@@ -1242,6 +1275,22 @@ async def crawl_cancel_immediately(
     async def crawl_graceful_stop(crawl_id, org: Organization = Depends(org_crawl_dep)):
         return await ops.shutdown_crawl(crawl_id, org, graceful=True)
 
+    @app.post(
+        "/orgs/{oid}/crawls/{crawl_id}/pause",
+        tags=["crawls"],
+        response_model=SuccessResponse,
+    )
+    async def pause_crawl(crawl_id, org: Organization = Depends(org_crawl_dep)):
+        return await ops.pause_crawl(crawl_id, org, pause=True)
+
+    @app.post(
+        "/orgs/{oid}/crawls/{crawl_id}/resume",
+        tags=["crawls"],
+        response_model=SuccessResponse,
+    )
+    async def resume_crawl(crawl_id, org: Organization = Depends(org_crawl_dep)):
+        return await ops.pause_crawl(crawl_id, org, pause=False)
+
     @app.post(
         "/orgs/{oid}/crawls/delete",
         tags=["crawls"],
diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py
index 71dd62da95..0958c6a31f 100644
--- a/backend/btrixcloud/main.py
+++ b/backend/btrixcloud/main.py
@@ -124,6 +124,8 @@ class SettingsResponse(BaseModel):
 
     localesEnabled: Optional[List[str]]
 
+    pausedExpiryMinutes: int
+
 
 # ============================================================================
 # pylint: disable=too-many-locals, duplicate-code
@@ -158,6 +160,7 @@ def main() -> None:
             if os.environ.get("LOCALES_ENABLED")
             else None
         ),
+        pausedExpiryMinutes=int(os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", 10080)),
     )
 
     invites = init_invites(mdb, email)
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 27405b451f..cf250b08fa 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -222,7 +222,9 @@ class UserOrgInfoOut(BaseModel):
 ]
 RUNNING_STATES = get_args(TYPE_RUNNING_STATES)
 
-TYPE_WAITING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
+TYPE_WAITING_STATES = Literal[
+    "starting", "waiting_capacity", "waiting_org_limit", "paused"
+]
 WAITING_STATES = get_args(TYPE_WAITING_STATES)
 
 TYPE_FAILED_STATES = Literal[
@@ -236,6 +238,7 @@ class UserOrgInfoOut(BaseModel):
 TYPE_SUCCESSFUL_STATES = Literal[
     "complete",
     "stopped_by_user",
+    "stopped_pause_expired",
     "stopped_storage_quota_reached",
     "stopped_time_quota_reached",
     "stopped_org_readonly",
@@ -478,6 +481,10 @@ class CrawlConfigOut(CrawlConfigCore, CrawlConfigAdditional):
     id: UUID
 
     lastCrawlStopping: Optional[bool] = False
+    lastCrawlPausing: Optional[bool] = False
+    lastCrawlPausedAt: Optional[datetime] = None
+    lastCrawlPausedExpiry: Optional[datetime] = None
+    lastCrawlExecSeconds: Optional[int] = None
     profileName: Optional[str] = None
     firstSeed: Optional[str] = None
     seedCount: int = 0
@@ -863,6 +870,8 @@ class CrawlOut(BaseMongoModel):
     seedCount: Optional[int] = None
     profileName: Optional[str] = None
     stopping: Optional[bool] = False
+    pausing: Optional[bool] = False
+    pausedAt: Optional[datetime] = None
     manual: bool = False
     cid_rev: Optional[int] = None
     scale: Scale = 1
@@ -1017,6 +1026,7 @@ class Crawl(BaseCrawl, CrawlConfigCore):
     manual: bool = False
 
     stopping: Optional[bool] = False
+    pausing: Optional[bool] = False
 
     qaCrawlExecSeconds: int = 0
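For anyone trying this out, the two new endpoints are symmetric and both return a `SuccessResponse`. A hypothetical client session (base URL, token, and ids are placeholders; only the endpoint paths come from this diff):

```python
# Hypothetical calls against the new pause/resume endpoints.
import requests

API = "https://app.example.com/api"          # placeholder deployment
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder auth
org_id, crawl_id = "<org-uuid>", "<crawl-id>"

# Request a pause: backend sets pausing=True and records pausedAt
r = requests.post(f"{API}/orgs/{org_id}/crawls/{crawl_id}/pause", headers=HEADERS)
r.raise_for_status()
print(r.json())  # {"success": true}

# Resume later: clears pausedAt on the crawl job
r = requests.post(f"{API}/orgs/{org_id}/crawls/{crawl_id}/resume", headers=HEADERS)
r.raise_for_status()
print(r.json())  # {"success": true}
```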
diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py
index 70bcec66a0..e05528624f 100644
--- a/backend/btrixcloud/operator/crawls.py
+++ b/backend/btrixcloud/operator/crawls.py
@@ -5,7 +5,7 @@
 import math
 from pprint import pprint
 from typing import Optional, Any, Sequence
-from datetime import datetime
+from datetime import datetime, timedelta
 from uuid import UUID
 
 import json
@@ -79,6 +79,7 @@
 # pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
 # pylint: disable=invalid-name, too-many-lines, too-many-return-statements
+# pylint: disable=too-many-instance-attributes
 # ============================================================================
 class CrawlOperator(BaseOperator):
     """CrawlOperator Handler"""
@@ -93,6 +94,8 @@ class CrawlOperator(BaseOperator):
 
     min_avail_storage_ratio: float
 
+    paused_expires_delta: timedelta
+
     def __init__(self, *args):
         super().__init__(*args)
 
@@ -110,6 +113,13 @@ def __init__(self, *args):
             os.environ.get("CRAWLER_MIN_AVAIL_STORAGE_RATIO") or 0
         )
 
+        # time in minutes before a paused crawl is stopped - default is 7 days
+        paused_crawl_limit_minutes = int(
+            os.environ.get("PAUSED_CRAWL_LIMIT_MINUTES", "10080")
+        )
+
+        self.paused_expires_delta = timedelta(minutes=paused_crawl_limit_minutes)
+
     def init_routes(self, app):
         """init routes for this operator"""
 
@@ -160,6 +170,7 @@ async def sync_crawls(self, data: MCSyncData):
             scale=spec.get("scale", 1),
             started=data.parent["metadata"]["creationTimestamp"],
             stopping=spec.get("stopping", False),
+            paused_at=str_to_date(spec.get("pausedAt")),
             timeout=spec.get("timeout") or 0,
             max_crawl_size=int(spec.get("maxCrawlSize") or 0),
             scheduled=spec.get("manual") != "1",
@@ -263,6 +274,27 @@ async def sync_crawls(self, data: MCSyncData):
         else:
             status.scale = 1
 
+        # stopping paused crawls
+        if crawl.paused_at:
+            stop_reason: Optional[StopReason] = None
+            state: Optional[TYPE_NON_RUNNING_STATES] = None
+            # Check if pause expiry limit is reached and if so, stop crawl
+            if dt_now() >= (crawl.paused_at + self.paused_expires_delta):
+                print(f"Paused crawl expiry reached, stopping crawl, id: {crawl.id}")
+                stop_reason = "stopped_pause_expired"
+                state = "stopped_pause_expired"
+
+            # Check if paused crawl was stopped manually
+            elif crawl.stopping:
+                print(f"Paused crawl stopped by user, id: {crawl.id}")
+                stop_reason = "stopped_by_user"
+                state = "stopped_by_user"
+
+            if stop_reason and state:
+                status.stopping = True
+                status.stopReason = stop_reason
+                await self.mark_finished(crawl, status, state)
+
         children = self._load_redis(params, status, data.children)
 
         storage_path = crawl.storage.get_storage_extra_path(oid)
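The operator-side stop decision for a paused crawl reduces to two checks in priority order: expiry first, then a manual stop. A standalone sketch of that logic (the helper name is illustrative, and `datetime.now(timezone.utc)` stands in for btrixcloud's `dt_now()` helper):

```python
# Sketch of the paused-crawl stop decision from sync_crawls above.
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

def paused_stop_decision(
    paused_at: datetime,
    stopping: bool,
    expires_delta: timedelta = timedelta(minutes=10080),
    now: Optional[datetime] = None,
) -> Optional[Tuple[str, str]]:
    """Return (stop_reason, final_state), or None to remain paused."""
    now = now or datetime.now(timezone.utc)
    if now >= paused_at + expires_delta:
        # paused too long: crawl ends in a successful-but-expired state
        return ("stopped_pause_expired", "stopped_pause_expired")
    if stopping:
        # user hit Stop while the crawl was paused
        return ("stopped_by_user", "stopped_by_user")
    return None  # crawl stays paused
```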
@@ -326,7 +358,11 @@ async def sync_crawls(self, data: MCSyncData):
             children.extend(await self._load_qa_configmap(params, data.children))
 
         for i in range(0, status.scale):
-            children.extend(self._load_crawler(params, i, status, data.children))
+            children.extend(
+                self._load_crawler(
+                    params, i, status, data.children, bool(crawl.paused_at)
+                )
+            )
 
         return {
             "status": status.dict(exclude_none=True),
@@ -430,7 +466,8 @@ async def _load_qa_configmap(self, params, children):
         params["qa_source_replay_json"] = crawl_replay.json(include={"resources"})
         return self.load_from_yaml("qa_configmap.yaml", params)
 
-    def _load_crawler(self, params, i, status: CrawlStatus, children):
+    # pylint: disable=too-many-arguments
+    def _load_crawler(self, params, i, status: CrawlStatus, children, paused: bool):
         name = f"crawl-{params['id']}-{i}"
         has_pod = name in children[POD]
@@ -456,12 +493,12 @@ def _load_crawler(self, params, i, status: CrawlStatus, children):
         params["memory_limit"] = self.k8s.max_crawler_memory_size
         params["storage"] = pod_info.newStorage or params.get("crawler_storage")
         params["workers"] = params.get(worker_field) or 1
-        params["do_restart"] = False
-        if has_pod:
+        params["init_crawler"] = not paused
+        if has_pod and not paused:
             restart_reason = pod_info.should_restart_pod(params.get("force_restart"))
             if restart_reason:
                 print(f"Restarting {name}, reason: {restart_reason}")
-                params["do_restart"] = True
+                params["init_crawler"] = False
 
         return self.load_from_yaml("crawler.yaml", params)
@@ -840,6 +877,28 @@ async def sync_crawl_state(
         if status.anyCrawlPodNewExit:
             await self.log_crashes(crawl.id, status.podStatus, redis)
 
+        if crawl.paused_at and redis:
+            for name in pods.keys():
+                pod_status = status.podStatus[name]
+                if (
+                    pod_status.isNewExit
+                    and pod_status.exitTime
+                    and pod_status.reason == "done"
+                ):
+                    await redis.hset(f"{crawl.id}:status", name, "interrupted")
+
+        # remove stopping key (used for pause), unless actually stopping, if:
+        # 1. no more crawler pods are running (to allow easy resume)
+        # 2. crawl has already been resumed, to allow pods to resume instantly
+        if (
+            not crawl.stopping
+            and redis
+            and status.stopReason == "paused"
+            and (not crawler_running or not crawl.paused_at)
+        ):
+            await redis.delete(f"{crawl.id}:stopping")
+            await redis.delete("crawl-stop")
+
         if not crawler_running or not redis:
             # if either crawler is not running or redis is inaccessible
             if not pod_done_count and self.should_mark_waiting(
                 status.state, crawl.started
             ):
                 # mark as waiting (if already running)
                 await self.set_state(
-                    "waiting_capacity",
+                    "waiting_capacity" if not crawl.paused_at else "paused",
                     status,
                     crawl,
-                    allowed_from=RUNNING_AND_STARTING_ONLY,
+                    allowed_from=(
+                        RUNNING_AND_STARTING_ONLY
+                        if not crawl.paused_at
+                        else RUNNING_AND_WAITING_STATES
+                    ),
                 )
 
             if not crawler_running and redis:
@@ -866,6 +929,7 @@ async def sync_crawl_state(
                         f"Pausing redis, no running crawler pods for >{REDIS_TTL} secs"
                     )
                     status.initRedis = False
+
         elif crawler_running and not redis:
             # if crawler is running, but no redis, init redis
             status.initRedis = True
@@ -874,6 +938,8 @@ async def sync_crawl_state(
             status.resync_after = self.fast_retry_secs
             return status
 
+        # only get here if at least one crawler pod is running
+
         # update lastActiveTime if crawler is running
         if crawler_running:
             status.lastActiveTime = date_to_str(dt_now())
@@ -1303,6 +1369,9 @@ async def is_crawl_stopping(
         if crawl.stopping:
             return "stopped_by_user"
 
+        if crawl.paused_at:
+            return "paused"
+
         # check timeout if timeout time exceeds elapsed time
         if crawl.timeout:
             elapsed = status.elapsedCrawlTime
@@ -1424,11 +1493,14 @@ async def update_crawl_state(
                     f"Attempting to adjust storage to {pod_info.newStorage} for {key}"
                 )
 
+        # if no longer paused, clear the paused stopping state
+        if status.stopReason == "paused" and not crawl.paused_at:
+            status.stopReason = None
+            status.stopping = False
+
         if not status.stopReason:
             status.stopReason = await self.is_crawl_stopping(crawl, status, data)
             status.stopping = status.stopReason is not None
-            if status.stopping:
-                print("Crawl gracefully stopping: {status.stopReason}, id: {crawl.id}")
 
         # mark crawl as stopping
         if status.stopping:
@@ -1436,6 +1508,12 @@ async def update_crawl_state(
             # backwards compatibility with older crawler
             await redis.set("crawl-stop", "1")
 
+            if status.stopReason == "paused":
+                print(f"Crawl paused, id: {crawl.id}")
+                return status
+
+            print(f"Crawl gracefully stopping: {status.stopReason}, id: {crawl.id}")
+
         # resolve scale
         if crawl.scale != status.scale:
             status.scale = await self._resolve_scale(
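The stopping-key cleanup above is subtle: pause reuses the crawler's stop signal, so the keys must be deleted again in two situations. The same condition extracted as a sketch (the `redis` argument is assumed to be an async client such as `redis.asyncio`; the wrapper function itself is illustrative):

```python
# Sketch of the stopping-key cleanup condition from sync_crawl_state above.
from datetime import datetime
from typing import Optional

async def maybe_clear_pause_stop_keys(
    redis,
    crawl_id: str,
    stopping: bool,
    stop_reason: Optional[str],
    crawler_running: bool,
    paused_at: Optional[datetime],
) -> None:
    # pause reuses the crawler's stop signal ("<id>:stopping" / "crawl-stop"),
    # so it must be cleared once all pods have exited (for an easy resume) or
    # once the crawl has been resumed (so new pods don't see a stale stop)
    if (
        not stopping
        and stop_reason == "paused"
        and (not crawler_running or not paused_at)
    ):
        await redis.delete(f"{crawl_id}:stopping")
        await redis.delete("crawl-stop")
```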
diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py
index 3bd449d842..555f32deda 100644
--- a/backend/btrixcloud/operator/models.py
+++ b/backend/btrixcloud/operator/models.py
@@ -1,6 +1,7 @@
 """Operator Models"""
 
 from collections import defaultdict
+from datetime import datetime
 from uuid import UUID
 from typing import Optional, DefaultDict, Literal, Annotated, Any
 from pydantic import BaseModel, Field
@@ -17,6 +18,8 @@
 
 StopReason = Literal[
     "stopped_by_user",
+    "paused",
+    "stopped_pause_expired",
     "time-limit",
     "size-limit",
     "stopped_storage_quota_reached",
@@ -76,6 +79,7 @@ class CrawlSpec(BaseModel):
     started: str
     crawler_channel: str
     stopping: bool = False
+    paused_at: Optional[datetime] = None
     scheduled: bool = False
     timeout: int = 0
     max_crawl_size: int = 0
diff --git a/backend/test/test_api.py b/backend/test/test_api.py
index dd2b148eed..88cb0e3806 100644
--- a/backend/test/test_api.py
+++ b/backend/test/test_api.py
@@ -51,4 +51,5 @@ def test_api_settings():
         "salesEmail": "",
         "supportEmail": "",
         "localesEnabled": None,
+        "pausedExpiryMinutes": 10080,
     }
diff --git a/chart/app-templates/crawl_job.yaml b/chart/app-templates/crawl_job.yaml
index 002372c65d..9f9d966d26 100644
--- a/chart/app-templates/crawl_job.yaml
+++ b/chart/app-templates/crawl_job.yaml
@@ -35,3 +35,6 @@ spec:
   storageName: "{{ storage_name }}"
 
   proxyId: "{{ proxy_id }}"
+
+  pausedAt: "{{ pausedAt }}"
+
diff --git a/chart/app-templates/crawler.yaml b/chart/app-templates/crawler.yaml
index 566ce4f082..4862f85cb9 100644
--- a/chart/app-templates/crawler.yaml
+++ b/chart/app-templates/crawler.yaml
@@ -28,7 +28,7 @@ spec:
 # -------
 # CRAWLER
 # -------
-{% if not do_restart %}
+{% if init_crawler %}
 ---
 apiVersion: v1
 kind: Pod
diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml
index 185684a0eb..fffed02ea6 100644
--- a/chart/templates/configmap.yaml
+++ b/chart/templates/configmap.yaml
@@ -20,6 +20,8 @@ data:
 
   INVITE_EXPIRE_SECONDS: "{{ .Values.invite_expire_seconds }}"
 
+  PAUSED_CRAWL_LIMIT_MINUTES: "{{ .Values.paused_crawl_limit_minutes }}"
+
   REGISTRATION_ENABLED: "{{ .Values.registration_enabled | default 0 }}"
 
   REGISTER_TO_ORG_ID: "{{ .Values.registration_org_id }}"
diff --git a/chart/values.yaml b/chart/values.yaml
index b7f2f204c4..5d0d52d58b 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -82,6 +82,9 @@ allow_dupe_invites: "0"
 # number of seconds before pending invites expire - default is 7 days
 invite_expire_seconds: 604800
 
+# number of minutes before paused crawls are stopped - default is 7 days
+paused_crawl_limit_minutes: 10080
+
 # base url for replayweb.page
 rwp_base_url: "https://cdn.jsdelivr.net/npm/replaywebpage@2.3.3/"
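Worth noting how `pausedAt` round-trips through the crawl job spec: the crawl manager patches the CR with an ISO string (or `""` to resume), and the operator parses it back via `str_to_date`. A sketch of that round trip (these minimal stand-ins approximate btrixcloud's `date_to_str`/`str_to_date` helpers, which are not shown in this diff):

```python
# Sketch of the pausedAt string round trip between crawlmanager and operator.
from datetime import datetime, timezone
from typing import Optional

def date_to_str(dt: datetime) -> str:          # stand-in for the real helper
    return dt.isoformat()

def str_to_date(val: Optional[str]) -> Optional[datetime]:  # stand-in
    return datetime.fromisoformat(val) if val else None     # "" -> None (resumed)

paused = datetime.now(timezone.utc)
patch = {"pausedAt": date_to_str(paused)}      # pause: patch CR with timestamp
assert str_to_date(patch["pausedAt"]) == paused

resume_patch = {"pausedAt": ""}                # resume: empty string clears it
assert str_to_date(resume_patch["pausedAt"]) is None
```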
break; + case "stopped_storage_quota_reached": color = "var(--warning)"; icon = html` diff --git a/frontend/src/features/crawl-workflows/live-workflow-status.ts b/frontend/src/features/crawl-workflows/live-workflow-status.ts index a4e11e2e08..b069103624 100644 --- a/frontend/src/features/crawl-workflows/live-workflow-status.ts +++ b/frontend/src/features/crawl-workflows/live-workflow-status.ts @@ -106,6 +106,8 @@ export class LiveWorkflowStatus extends BtrixElement { `; }); diff --git a/frontend/src/features/crawl-workflows/workflow-list.ts b/frontend/src/features/crawl-workflows/workflow-list.ts index 752fcf1c12..dc05efcdce 100644 --- a/frontend/src/features/crawl-workflows/workflow-list.ts +++ b/frontend/src/features/crawl-workflows/workflow-list.ts @@ -250,6 +250,7 @@ export class WorkflowListItem extends BtrixElement { `, )} @@ -280,11 +281,15 @@ export class WorkflowListItem extends BtrixElement { if (diff < 1000) { return ""; } - return msg( - str`Running for ${this.localize.humanizeDuration(diff, { - compact: true, - })}`, - ); + const duration = this.localize.humanizeDuration(diff, { + compact: true, + }); + + if (workflow.lastCrawlState === "paused") { + return msg(str`Active for ${duration}`); + } + + return msg(str`Running for ${duration}`); } return notSpecified; })} diff --git a/frontend/src/pages/org/workflow-detail.ts b/frontend/src/pages/org/workflow-detail.ts index 6416ef97d5..c5aaa724be 100644 --- a/frontend/src/pages/org/workflow-detail.ts +++ b/frontend/src/pages/org/workflow-detail.ts @@ -562,12 +562,41 @@ export class WorkflowDetail extends BtrixElement { const workflow = this.workflow; const archivingDisabled = isArchivingDisabled(this.org, true); + const paused = workflow.lastCrawlState === "paused"; + + const hidePauseResume = + !this.lastCrawlId || + this.isCancelingOrStoppingCrawl || + this.workflow.lastCrawlStopping; + // disable pause/resume button if desired state is already in the process of being set. + // if crawl is running, and already pausing, don't allow clicking Pausing + // if crawl not running, and already unpausing (lastCrawlPausing is false), don't allow clicking Resume + const disablePauseResume = + this.workflow.lastCrawlPausing === + (this.workflow.lastCrawlState === "running"); return html` ${when( this.workflow.isCrawlRunning, () => html` + ${when( + !hidePauseResume, + () => html` + + + ${paused ? msg("Resume") : msg("Pause")} + + `, + )} (this.openDialogName = "stop")} @@ -709,6 +738,7 @@ export class WorkflowDetail extends BtrixElement { `, )} @@ -1654,6 +1684,42 @@ export class WorkflowDetail extends BtrixElement { } } + private async pauseResume() { + if (!this.lastCrawlId) return; + + const pause = this.workflow?.lastCrawlState !== "paused"; + + try { + const data = await this.api.fetch<{ success: boolean }>( + `/orgs/${this.orgId}/crawls/${this.lastCrawlId}/${pause ? "pause" : "resume"}`, + { + method: "POST", + }, + ); + if (data.success) { + void this.fetchWorkflow(); + } else { + throw data; + } + + this.notify.toast({ + message: pause ? msg("Pausing crawl.") : msg("Resuming paused crawl."), + variant: "success", + icon: "check2-circle", + id: "crawl-pause-resume-status", + }); + } catch { + this.notify.toast({ + message: pause + ? 
msg("Something went wrong, couldn't pause crawl.") + : msg("Something went wrong, couldn't resume paused crawl."), + variant: "danger", + icon: "exclamation-octagon", + id: "crawl-pause-resume-status", + }); + } + } + private async cancel() { if (!this.lastCrawlId) return; diff --git a/frontend/src/types/crawlState.ts b/frontend/src/types/crawlState.ts index d8fa497a20..1a63aecb8b 100644 --- a/frontend/src/types/crawlState.ts +++ b/frontend/src/types/crawlState.ts @@ -11,12 +11,14 @@ export const WAITING_STATES = [ "starting", "waiting_capacity", "waiting_org_limit", + "paused", ] as const; // Match backend TYPE_SUCCESSFUL_STATES in models.py export const SUCCESSFUL_STATES = [ "complete", "stopped_by_user", + "stopped_pause_expired", "stopped_storage_quota_reached", "stopped_time_quota_reached", "stopped_org_readonly", diff --git a/frontend/src/types/crawler.ts b/frontend/src/types/crawler.ts index 7909373c81..bd7894505e 100644 --- a/frontend/src/types/crawler.ts +++ b/frontend/src/types/crawler.ts @@ -92,6 +92,8 @@ export type Workflow = CrawlConfig & { lastCrawlSize: number | null; lastStartedByName: string | null; lastCrawlStopping: boolean | null; + lastCrawlPausing: boolean | null; + lastCrawlExecSeconds: number | null; lastRun: string; totalSize: string | null; inactive: boolean;