diff --git a/.github/workflows/k3d-ci.yaml b/.github/workflows/k3d-ci.yaml index 9bcb10b3aa..17cf9a564a 100644 --- a/.github/workflows/k3d-ci.yaml +++ b/.github/workflows/k3d-ci.yaml @@ -116,6 +116,11 @@ jobs: - name: Wait for all pods to be ready run: kubectl wait --for=condition=ready pod --all --timeout=240s + - name: Create Extra Test Buckets + run: | + kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary && + kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica + - name: Run Tests timeout-minutes: 30 run: pytest -vv ./backend/test/test_*.py diff --git a/.github/workflows/microk8s-ci.yaml b/.github/workflows/microk8s-ci.yaml index 07a3a2124e..0111915ac5 100644 --- a/.github/workflows/microk8s-ci.yaml +++ b/.github/workflows/microk8s-ci.yaml @@ -68,6 +68,11 @@ jobs: - name: Wait for all pods to be ready run: sudo microk8s kubectl wait --for=condition=ready pod --all --timeout=240s + - name: Create Extra Test Buckets + run: | + kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary && + kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica + - name: Run Tests run: pytest -vv ./backend/test/test_*.py diff --git a/backend/btrixcloud/background_jobs.py b/backend/btrixcloud/background_jobs.py index 49f82d12e1..4236c685b5 100644 --- a/backend/btrixcloud/background_jobs.py +++ b/backend/btrixcloud/background_jobs.py @@ -16,7 +16,6 @@ from .models import ( BaseFile, Organization, - BackgroundJob, BgJobType, CreateReplicaJob, DeleteReplicaJob, @@ -24,12 +23,15 @@ RecalculateOrgStatsJob, ReAddOrgPagesJob, OptimizePagesJob, + CopyBucketJob, PaginatedBackgroundJobResponse, AnyJob, StorageRef, User, SuccessResponse, SuccessResponseId, + JobProgress, + BackgroundJob, ) from .pagination import DEFAULT_PAGE_SIZE, paginated_format from .utils import dt_now @@ -43,7 +45,7 @@ # ============================================================================ -# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-instance-attributes, too-many-lines, too-many-return-statements, too-many-public-methods class BackgroundJobOps: """k8s background job management""" @@ -56,7 +58,7 @@ class BackgroundJobOps: migration_jobs_scale: int - # pylint: disable=too-many-locals, too-many-arguments, invalid-name + # pylint: disable=too-many-locals, too-many-arguments, too-many-positional-arguments, invalid-name def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops): self.jobs = mdb["jobs"] @@ -302,7 +304,7 @@ async def create_delete_org_job( self, org: Organization, existing_job_id: Optional[str] = None, - ) -> Optional[str]: + ) -> str: """Create background job to delete org and its data""" try: @@ -339,7 +341,7 @@ async def create_delete_org_job( except Exception as exc: # pylint: disable=raise-missing-from print(f"warning: delete org job could not be started: {exc}") - return None + return "" async def create_recalculate_org_stats_job( self, @@ -473,6 +475,73 @@ async def create_optimize_crawl_pages_job( print(f"warning: optimize pages job could not be started: {exc}") return None + async def create_copy_bucket_job( + self, + org: Organization, + prev_storage_ref: StorageRef, + new_storage_ref: StorageRef, + existing_job_id: Optional[str] = None, + ) -> str: + """Start background job to copy entire s3 bucket and return job id""" + prev_storage = self.storage_ops.get_org_storage_by_ref(org, prev_storage_ref) + prev_endpoint, prev_bucket = self.strip_bucket(prev_storage.endpoint_url) + 
+ new_storage = self.storage_ops.get_org_storage_by_ref(org, new_storage_ref) + new_endpoint, new_bucket = self.strip_bucket(new_storage.endpoint_url) + + # Ensure buckets terminate with trailing slash + prev_bucket = os.path.join(prev_bucket, "") + new_bucket = os.path.join(new_bucket, "") + + job_type = BgJobType.COPY_BUCKET.value + + try: + job_id = await self.crawl_manager.run_copy_bucket_job( + oid=str(org.id), + job_type=job_type, + prev_storage=prev_storage_ref, + prev_endpoint=prev_endpoint, + prev_bucket=prev_bucket, + new_storage=new_storage_ref, + new_endpoint=new_endpoint, + new_bucket=new_bucket, + job_id_prefix=f"{job_type}-{org.id}", + existing_job_id=existing_job_id, + ) + if existing_job_id: + copy_job = await self.get_background_job(existing_job_id, org.id) + previous_attempt = { + "started": copy_job.started, + "finished": copy_job.finished, + } + if copy_job.previousAttempts: + copy_job.previousAttempts.append(previous_attempt) + else: + copy_job.previousAttempts = [previous_attempt] + copy_job.started = dt_now() + copy_job.finished = None + copy_job.success = None + else: + copy_job = CopyBucketJob( + id=job_id, + oid=org.id, + started=dt_now(), + prev_storage=prev_storage_ref, + new_storage=new_storage_ref, + ) + + await self.jobs.find_one_and_update( + {"_id": job_id}, {"$set": copy_job.to_dict()}, upsert=True + ) + + return job_id + # pylint: disable=broad-exception-caught + except Exception as exc: + print( + f"warning: copy bucket job could not be started for org {org.id}: {exc}" + ) + return "" + async def job_finished( self, job_id: str, @@ -498,6 +567,9 @@ async def job_finished( await self.handle_delete_replica_job_finished( cast(DeleteReplicaJob, job) ) + if job_type == BgJobType.COPY_BUCKET and job.oid: + org = await self.org_ops.get_org_by_id(job.oid) + await self.org_ops.update_read_only(org, False) else: print( f"Background job {job.id} failed, sending email to superuser", @@ -528,6 +600,9 @@ async def get_background_job( DeleteReplicaJob, DeleteOrgJob, RecalculateOrgStatsJob, + CopyBucketJob, + DeleteOrgJob, + RecalculateOrgStatsJob, ReAddOrgPagesJob, OptimizePagesJob, ]: @@ -544,33 +619,84 @@ async def get_background_job( def _get_job_by_type_from_data(self, data: dict[str, object]): """convert dict to propert background job type""" - if data["type"] == BgJobType.CREATE_REPLICA: + if data["type"] == BgJobType.CREATE_REPLICA.value: return CreateReplicaJob.from_dict(data) - if data["type"] == BgJobType.DELETE_REPLICA: + if data["type"] == BgJobType.DELETE_REPLICA.value: return DeleteReplicaJob.from_dict(data) - if data["type"] == BgJobType.RECALCULATE_ORG_STATS: + if data["type"] == BgJobType.RECALCULATE_ORG_STATS.value: return RecalculateOrgStatsJob.from_dict(data) - if data["type"] == BgJobType.READD_ORG_PAGES: + if data["type"] == BgJobType.READD_ORG_PAGES.value: return ReAddOrgPagesJob.from_dict(data) - if data["type"] == BgJobType.OPTIMIZE_PAGES: + if data["type"] == BgJobType.OPTIMIZE_PAGES.value: return OptimizePagesJob.from_dict(data) + if data["type"] == BgJobType.COPY_BUCKET.value: + return CopyBucketJob.from_dict(data) + return DeleteOrgJob.from_dict(data) + async def get_job_progress(self, job_id: str) -> JobProgress: + """Return progress of background job for supported types""" + job = await self.get_background_job(job_id) + + if job.type != BgJobType.COPY_BUCKET: + raise HTTPException(status_code=403, detail="job_type_not_supported") + + if job.success is False: + raise HTTPException(status_code=400, detail="job_failed") + + if job.finished: 
+ return JobProgress(percentage=1.0) + + log_tail = await self.crawl_manager.tail_background_job(job_id) + if not log_tail: + raise HTTPException(status_code=400, detail="job_log_not_available") + + lines = log_tail.splitlines() + reversed_lines = list(reversed(lines)) + + progress = JobProgress(percentage=0.0) + + # Parse lines in reverse order until we find one with latest stats + for line in reversed_lines: + try: + if "ETA" not in line: + continue + + stats_groups = line.split(",") + for group in stats_groups: + group = group.strip() + if "%" in group: + progress.percentage = float(group.strip("%")) / 100 + if "ETA" in group: + eta_str = group.strip("ETA ") + # Split on white space to remove byte mark rclone sometimes + # adds to end of stats line + eta_list = eta_str.split(" ") + progress.eta = eta_list[0] + + break + # pylint: disable=bare-except + except: + continue + + return progress + async def list_background_jobs( self, org: Optional[Organization] = None, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1, success: Optional[bool] = None, + running: Optional[bool] = None, job_type: Optional[str] = None, sort_by: Optional[str] = None, sort_direction: Optional[int] = -1, - ) -> Tuple[List[BackgroundJob], int]: + ) -> Tuple[List[Union[CreateReplicaJob, DeleteReplicaJob, CopyBucketJob]], int]: """List all background jobs""" # pylint: disable=duplicate-code # Zero-index page for query @@ -585,6 +711,12 @@ async def list_background_jobs( if success in (True, False): query["success"] = success + if running: + query["success"] = None + + if running is False: + query["success"] = {"$in": [True, False]} + if job_type: query["type"] = job_type @@ -676,6 +808,7 @@ async def retry_org_background_job( self, job: BackgroundJob, org: Organization ) -> Dict[str, Union[bool, Optional[str]]]: """Retry background job specific to one org""" + # pylint: disable=too-many-return-statements if job.type == BgJobType.CREATE_REPLICA: job = cast(CreateReplicaJob, job) file = await self.get_replica_job_file(job, org) @@ -736,6 +869,16 @@ async def retry_org_background_job( ) return {"success": True} + if job.type == BgJobType.COPY_BUCKET: + job = cast(CopyBucketJob, job) + await self.create_copy_bucket_job( + org, + job.prev_storage, + job.new_storage, + existing_job_id=job.id, + ) + return {"success": True} + return {"success": False} async def retry_failed_org_background_jobs( @@ -773,7 +916,7 @@ async def retry_all_failed_background_jobs( # ============================================================================ -# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme +# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme, too-many-positional-arguments def init_background_jobs_api( app, mdb, email, user_manager, org_ops, crawl_manager, storage_ops, user_dep ): @@ -800,6 +943,18 @@ async def get_org_background_job( """Retrieve information for background job""" return await ops.get_background_job(job_id, org.id) + @router.get( + "/{job_id}/progress", + response_model=JobProgress, + ) + async def get_job_progress( + job_id: str, + # pylint: disable=unused-argument + org: Organization = Depends(org_crawl_dep), + ): + """Return progress information for background job""" + return await ops.get_job_progress(job_id) + @app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"]) async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)): """Get background job from any org""" @@ -894,6 +1049,7 @@ async def list_background_jobs( 
pageSize: int = DEFAULT_PAGE_SIZE, page: int = 1, success: Optional[bool] = None, + running: Optional[bool] = None, jobType: Optional[str] = None, sortBy: Optional[str] = None, sortDirection: Optional[int] = -1, @@ -904,6 +1060,7 @@ async def list_background_jobs( page_size=pageSize, page=page, success=success, + running=running, job_type=jobType, sort_by=sortBy, sort_direction=sortDirection, diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py index 48ed89402f..b0b77bafbb 100644 --- a/backend/btrixcloud/basecrawls.py +++ b/backend/btrixcloud/basecrawls.py @@ -49,7 +49,7 @@ # ============================================================================ -# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines, too-many-branches +# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines, too-many-branches, too-many-positional-arguments class BaseCrawlOps: """operations that apply to all crawls""" diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index ff9cc70959..352b5b670c 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -66,6 +66,7 @@ # ============================================================================ +# pylint: disable=too-many-positional-arguments class CollectionOps: """ops for working with named collections of crawls""" diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py index 1b1e236589..2160c943ae 100644 --- a/backend/btrixcloud/crawlconfigs.py +++ b/backend/btrixcloud/crawlconfigs.py @@ -76,7 +76,7 @@ class CrawlConfigOps: """Crawl Config Operations""" - # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods + # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-positional-arguments user_manager: UserManager org_ops: OrgOps @@ -1265,7 +1265,7 @@ async def stats_recompute_all(crawl_configs, crawls, cid: UUID): # ============================================================================ -# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments +# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments,too-many-positional-arguments def init_crawl_config_api( app, dbclient, diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py index bbadb2424c..90aed035c6 100644 --- a/backend/btrixcloud/crawlmanager.py +++ b/backend/btrixcloud/crawlmanager.py @@ -21,7 +21,7 @@ # ============================================================================ -# pylint: disable=too-many-public-methods +# pylint: disable=too-many-public-methods, too-many-positional-arguments class CrawlManager(K8sAPI): """abstract crawl manager""" @@ -124,11 +124,12 @@ async def run_delete_org_job( existing_job_id: Optional[str] = None, ) -> str: """run job to delete org and all of its data""" - if existing_job_id: job_id = existing_job_id else: - job_id = f"delete-org-{oid}-{secrets.token_hex(5)}" + job_id_prefix = f"delete-org-{oid}" + # ensure name is <=63 characters + job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}" return await self._run_bg_job_with_ops_classes( job_id, job_type=BgJobType.DELETE_ORG.value, oid=oid @@ -212,6 +213,47 @@ async def _run_bg_job_with_ops_classes( return job_id + async def run_copy_bucket_job( + self, + oid: str, + job_type: str, + prev_storage: StorageRef, + prev_endpoint: str, + prev_bucket: str, + new_storage: StorageRef, + 
new_endpoint: str, + new_bucket: str, + job_id_prefix: Optional[str] = None, + existing_job_id: Optional[str] = None, + ): + """run job to copy entire contents of one s3 bucket to another""" + if existing_job_id: + job_id = existing_job_id + else: + if not job_id_prefix: + job_id_prefix = job_type + + # ensure name is <=63 characters + job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}" + + params = { + "id": job_id, + "oid": oid, + "job_type": job_type, + "prev_secret_name": prev_storage.get_storage_secret_name(oid), + "prev_endpoint": prev_endpoint, + "prev_bucket": prev_bucket, + "new_secret_name": new_storage.get_storage_secret_name(oid), + "new_endpoint": new_endpoint, + "new_bucket": new_bucket, + } + + data = self.templates.env.get_template("copy_job.yaml").render(params) + + await self.create_from_yaml(data) + + return job_id + async def create_crawl_job( self, crawlconfig: CrawlConfig, @@ -394,6 +436,22 @@ async def delete_crawl_config_by_id(self, cid: str) -> None: """Delete all crawl configs by id""" await self._delete_crawl_configs(f"btrix.crawlconfig={cid}") + async def tail_background_job(self, job_id: str) -> str: + """Tail running background job pod""" + pods = await self.core_api.list_namespaced_pod( + namespace=self.namespace, + label_selector=f"batch.kubernetes.io/job-name={job_id}", + ) + + if not pods.items: + return "" + + pod_name = pods.items[0].metadata.name + + return await self.core_api.read_namespaced_pod_log( + pod_name, self.namespace, tail_lines=10 + ) + # ======================================================================== # Internal Methods async def _delete_crawl_configs(self, label) -> None: diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index 3718a34bed..7661897ae6 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -72,7 +72,7 @@ # ============================================================================ -# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods +# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-positional-arguments class CrawlOps(BaseCrawlOps): """Crawl Ops""" @@ -1115,7 +1115,7 @@ async def recompute_crawl_file_count_and_size(crawls, crawl_id: str): # ============================================================================ -# pylint: disable=too-many-arguments, too-many-locals, too-many-statements +# pylint: disable=too-many-arguments, too-many-locals, too-many-statements, too-many-positional-arguments def init_crawls_api(crawl_manager: CrawlManager, app, user_dep, *args): """API for crawl management, including crawl done callback""" # pylint: disable=invalid-name, duplicate-code diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py index 22df847515..dafd99f59d 100644 --- a/backend/btrixcloud/db.py +++ b/backend/btrixcloud/db.py @@ -87,7 +87,7 @@ async def ping_db(mdb) -> None: # ============================================================================ async def update_and_prepare_db( - # pylint: disable=R0913 + # pylint: disable=R0913, too-many-positional-arguments mdb: AsyncIOMotorDatabase, user_manager: UserManager, org_ops: OrgOps, @@ -131,7 +131,7 @@ async def update_and_prepare_db( # ============================================================================ -# pylint: disable=too-many-locals, too-many-arguments +# pylint: disable=too-many-locals, too-many-arguments, too-many-positional-arguments async def run_db_migrations( mdb, user_manager, page_ops, org_ops, 
background_job_ops, coll_ops ): @@ -223,7 +223,7 @@ async def drop_indexes(mdb): # ============================================================================ -# pylint: disable=too-many-arguments +# pylint: disable=too-many-arguments, too-many-positional-arguments async def create_indexes( org_ops, crawl_ops, diff --git a/backend/btrixcloud/emailsender.py b/backend/btrixcloud/emailsender.py index 7651e8dc3f..c82682db2e 100644 --- a/backend/btrixcloud/emailsender.py +++ b/backend/btrixcloud/emailsender.py @@ -17,7 +17,7 @@ from .utils import is_bool, get_origin -# pylint: disable=too-few-public-methods, too-many-instance-attributes +# pylint: disable=too-few-public-methods, too-many-instance-attributes, too-many-positional-arguments class EmailSender: """SMTP Email Sender""" diff --git a/backend/btrixcloud/invites.py b/backend/btrixcloud/invites.py index fdab122d22..e5f84ca785 100644 --- a/backend/btrixcloud/invites.py +++ b/backend/btrixcloud/invites.py @@ -27,6 +27,7 @@ # ============================================================================ +# pylint: disable=too-many-positional-arguments class InviteOps: """invite users (optionally to an org), send emails and delete invites""" diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py index bfaaabb656..b0a901a824 100644 --- a/backend/btrixcloud/k8sapi.py +++ b/backend/btrixcloud/k8sapi.py @@ -22,7 +22,7 @@ # ============================================================================ -# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-instance-attributes, too-many-positional-arguments class K8sAPI: """K8S API accessors""" @@ -328,7 +328,7 @@ async def has_custom_jobs_with_label(self, plural, label) -> bool: """return true/false if any crawljobs or profilejobs match given label""" try: - await self.custom_api.list_namespaced_custom_object( + resp = await self.custom_api.list_namespaced_custom_object( group="btrix.cloud", version="v1", namespace=self.namespace, @@ -336,6 +336,9 @@ async def has_custom_jobs_with_label(self, plural, label) -> bool: label_selector=label, limit=1, ) + matching_items = resp.get("items", []) + if not matching_items: + return False return True # pylint: disable=broad-exception-caught except Exception: diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py index 71dd62da95..ad08a0ec24 100644 --- a/backend/btrixcloud/main.py +++ b/backend/btrixcloud/main.py @@ -266,6 +266,8 @@ def main() -> None: org_ops.set_ops(base_crawl_ops, profiles, coll_ops, background_job_ops, page_ops) + storage_ops.set_ops(background_job_ops) + user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops) background_job_ops.set_ops(base_crawl_ops, profiles) diff --git a/backend/btrixcloud/main_op.py b/backend/btrixcloud/main_op.py index c1d79c1617..9c8ea34587 100644 --- a/backend/btrixcloud/main_op.py +++ b/backend/btrixcloud/main_op.py @@ -9,7 +9,6 @@ from .ops import init_ops from .utils import register_exit_handler - app_root = FastAPI() diff --git a/backend/btrixcloud/migrations/migration_0032_dupe_org_names.py b/backend/btrixcloud/migrations/migration_0032_dupe_org_names.py index d2c06781e0..c1f417a4d5 100644 --- a/backend/btrixcloud/migrations/migration_0032_dupe_org_names.py +++ b/backend/btrixcloud/migrations/migration_0032_dupe_org_names.py @@ -54,7 +54,7 @@ async def migrate_up(self): orgs_db, org_name_set, org_slug_set, name, org_dict.get("_id") ) - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments, too-many-positional-arguments async def 
update_org_name_and_slug( self, orgs_db, diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py index fa333f832f..6306d5cab4 100644 --- a/backend/btrixcloud/models.py +++ b/backend/btrixcloud/models.py @@ -693,11 +693,8 @@ def get_storage_secret_name(self, oid: str) -> str: return f"storage-cs-{self.name}-{oid[:12]}" def get_storage_extra_path(self, oid: str) -> str: - """return extra path added to the endpoint - using oid for default storages, no extra path for custom""" - if not self.custom: - return oid + "/" - return "" + """return extra oid path added to the endpoint""" + return oid + "/" # ============================================================================ @@ -1632,15 +1629,36 @@ class OrgPublicCollections(BaseModel): collections: List[PublicCollOut] = [] +# ============================================================================ +class OrgStorageRef(BaseModel): + """Input model for setting primary storage""" + + storage: StorageRef + + +# ============================================================================ +class OrgStorageReplicaRefs(BaseModel): + """Input model for setting replica storages""" + + storageReplicas: List[StorageRef] + + # ============================================================================ class OrgStorageRefs(BaseModel): - """Input model for setting primary storage + optional replicas""" + """Model for org storage references""" storage: StorageRef storageReplicas: List[StorageRef] = [] +# ============================================================================ +class OrgAllStorages(BaseModel): + """Response model for listing all available storages""" + + allStorages: List[StorageRef] + + # ============================================================================ class S3StorageIn(BaseModel): """Custom S3 Storage input model""" @@ -1655,6 +1673,7 @@ class S3StorageIn(BaseModel): bucket: str access_endpoint_url: Optional[str] = None region: str = "" + provider: str = "Other" # ============================================================================ @@ -1669,6 +1688,7 @@ class S3Storage(BaseModel): secret_key: str access_endpoint_url: str region: str = "" + provider: str = "Other" # ============================================================================ @@ -2570,6 +2590,7 @@ class BgJobType(str, Enum): RECALCULATE_ORG_STATS = "recalculate-org-stats" READD_ORG_PAGES = "readd-org-pages" OPTIMIZE_PAGES = "optimize-pages" + COPY_BUCKET = "copy-bucket" # ============================================================================ @@ -2588,7 +2609,7 @@ class BackgroundJob(BaseMongoModel): # ============================================================================ class CreateReplicaJob(BackgroundJob): - """Model for tracking create of replica jobs""" + """Model for tracking creation of replica jobs""" type: Literal[BgJobType.CREATE_REPLICA] = BgJobType.CREATE_REPLICA file_path: str @@ -2639,15 +2660,25 @@ class OptimizePagesJob(BackgroundJob): type: Literal[BgJobType.OPTIMIZE_PAGES] = BgJobType.OPTIMIZE_PAGES +# ============================================================================ +class CopyBucketJob(BackgroundJob): + """Model for tracking job to copy entire s3 bucket""" + + type: Literal[BgJobType.COPY_BUCKET] = BgJobType.COPY_BUCKET + prev_storage: StorageRef + new_storage: StorageRef + + # ============================================================================ # Union of all job types, for response model AnyJob = RootModel[ Union[ + BackgroundJob, CreateReplicaJob, DeleteReplicaJob, - 
BackgroundJob, DeleteOrgJob, + CopyBucketJob, RecalculateOrgStatsJob, ReAddOrgPagesJob, OptimizePagesJob, @@ -2655,6 +2686,14 @@ class OptimizePagesJob(BackgroundJob): ] +# ============================================================================ +class JobProgress(BaseModel): + """Model for reporting background job progress""" + + percentage: float + eta: Optional[str] = None + + # ============================================================================ ### GENERIC RESPONSE MODELS ### @@ -2667,6 +2706,13 @@ class UpdatedResponse(BaseModel): updated: bool +# ============================================================================ +class UpdatedResponseId(UpdatedResponse): + """Response for API endpoints that return updated + id""" + + id: Optional[str] = None + + # ============================================================================ class SuccessResponse(BaseModel): """Response for API endpoints that return success""" diff --git a/backend/btrixcloud/operator/baseoperator.py b/backend/btrixcloud/operator/baseoperator.py index b63ff9f4ae..ff3bb5ea54 100644 --- a/backend/btrixcloud/operator/baseoperator.py +++ b/backend/btrixcloud/operator/baseoperator.py @@ -136,7 +136,7 @@ async def async_init(self) -> None: print("Auto-Resize Enabled", self.enable_auto_resize) -# pylint: disable=too-many-instance-attributes, too-many-arguments +# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments # ============================================================================ class BaseOperator: """BaseOperator""" diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index 70bcec66a0..9d9fae5743 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -78,7 +78,7 @@ # pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements -# pylint: disable=invalid-name, too-many-lines, too-many-return-statements +# pylint: disable=invalid-name, too-many-lines, too-many-return-statements, too-many-positional-arguments # ============================================================================ class CrawlOperator(BaseOperator): """CrawlOperator Handler""" diff --git a/backend/btrixcloud/operator/cronjobs.py b/backend/btrixcloud/operator/cronjobs.py index 9a411431e5..b811d8e26b 100644 --- a/backend/btrixcloud/operator/cronjobs.py +++ b/backend/btrixcloud/operator/cronjobs.py @@ -51,7 +51,7 @@ def get_finished_response( status=status, ) - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments, too-many-positional-arguments async def make_new_crawljob( self, cid: UUID, diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index ac939391f1..907361f9a1 100644 --- a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -111,7 +111,7 @@ # ============================================================================ -# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-locals, too-many-arguments +# pylint: disable=too-many-public-methods, too-many-instance-attributes, too-many-locals, too-many-arguments, too-many-positional-arguments class OrgOps: """Organization API operations""" @@ -496,13 +496,61 @@ async def update_slug_and_name(self, org: Organization) -> bool: ) return res is not None - async def update_storage_refs(self, org: Organization) -> bool: - """Update storage + replicas for given org""" - set_dict = org.dict(include={"storage": True, "storageReplicas": True}) + 
async def update_storage_refs( + self, org: Organization, replicas: bool = False + ) -> bool: + """Update storage or replica refs for given org""" + include = {} + if replicas: + include["storageReplicas"] = True + else: + include["storage"] = True + + set_dict = org.dict(include=include) res = await self.orgs.find_one_and_update({"_id": org.id}, {"$set": set_dict}) return res is not None + async def update_file_storage_refs( + self, org: Organization, previous_storage: StorageRef, new_storage: StorageRef + ) -> None: + """Update storage refs for all crawl and profile files in given org""" + await self.crawls_db.update_many( + {"oid": org.id}, + {"$set": {"files.$[].storage": dict(new_storage)}}, + ) + + await self.profiles_db.update_many( + {"oid": org.id, "resource.storage": dict(previous_storage)}, + {"$set": {"resource.storage": dict(new_storage)}}, + ) + + async def add_or_remove_file_replica_storage_refs( + self, org: Organization, storage_ref: StorageRef, remove=False + ) -> None: + """Add replica storage ref for all files in given org""" + if remove: + verb = "$pull" + else: + verb = "$push" + + await self.crawls_db.update_many( + {"oid": org.id}, + {verb: {"files.$[].replicas": dict(storage_ref)}}, + ) + + await self.profiles_db.update_many( + {"oid": org.id}, + {verb: {"resource.replicas": dict(storage_ref)}}, + ) + + async def unset_file_presigned_urls(self, org: Organization) -> None: + """Unset all presigned URLs for files in org""" + await self.crawls_db.update_many( + {"oid": org.id}, + {"$set": {"files.$[].presignedUrl": None}}, + ) + async def update_subscription_data( self, update: SubscriptionUpdate ) -> Optional[Organization]: @@ -993,6 +1041,31 @@ async def get_org_metrics(self, org: Organization) -> dict[str, int]: "publicCollectionsCount": public_collections_count, } + async def is_crawl_running(self, org: Organization) -> bool: + """Return boolean indicating whether any crawls are currently running in org""" + running_count = await self.crawls_db.count_documents( + {"oid": org.id, "state": {"$in": RUNNING_STATES}} + ) + if running_count > 0: + return True + return False + + async def has_files_stored(self, org: Organization) -> bool: + """Return boolean indicating whether any files are stored on org""" + crawl_count = await self.crawls_db.count_documents( + {"oid": org.id, "files": {"$exists": True, "$ne": []}}, + ) + if crawl_count > 0: + return True + + profile_count = await self.profiles_db.count_documents( + {"oid": org.id, "resource": {"$exists": True}}, + ) + if profile_count > 0: + return True + + return False + async def get_all_org_slugs(self) -> dict[str, list[str]]: """Return list of all org slugs.""" slugs = await self.orgs.distinct("slug", {}) diff --git a/backend/btrixcloud/pages.py b/backend/btrixcloud/pages.py index 189c2108d9..b80ff4b8d3 100644 --- a/backend/btrixcloud/pages.py +++ b/backend/btrixcloud/pages.py @@ -52,7 +52,7 @@ # ============================================================================ -# pylint: disable=too-many-instance-attributes, too-many-arguments,too-many-public-methods +# pylint: disable=too-many-instance-attributes, too-many-arguments,too-many-public-methods, too-many-positional-arguments class PageOps: """crawl pages""" @@ -1013,7 +1013,7 @@ async def process_finished_crawls(): # ============================================================================ -# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme +# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme, 
too-many-positional-arguments def init_pages_api( app, mdb, crawl_ops, org_ops, storage_ops, background_job_ops, coll_ops, user_dep ) -> PageOps: diff --git a/backend/btrixcloud/profiles.py b/backend/btrixcloud/profiles.py index 06010551d3..2693e79fe0 100644 --- a/backend/btrixcloud/profiles.py +++ b/backend/btrixcloud/profiles.py @@ -49,7 +49,7 @@ # ============================================================================ -# pylint: disable=too-many-instance-attributes, too-many-arguments +# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments class ProfileOps: """Profile management""" @@ -500,7 +500,7 @@ async def calculate_org_profile_file_storage(self, oid: UUID) -> int: # ============================================================================ -# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments +# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments, too-many-positional-arguments def init_profiles_api( mdb, org_ops: OrgOps, diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 1e58521717..3e866894ac 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -12,6 +12,8 @@ TYPE_CHECKING, Any, cast, + Callable, + Union, ) from urllib.parse import urlsplit from contextlib import asynccontextmanager @@ -27,7 +29,7 @@ from datetime import datetime, timedelta from zipfile import ZipInfo -from fastapi import Depends, HTTPException, APIRouter +from fastapi import Depends, HTTPException, APIRouter, BackgroundTasks from stream_zip import stream_zip, NO_COMPRESSION_64, Method from remotezip import RemoteZip from aiobotocore.config import AioConfig @@ -47,8 +49,12 @@ S3Storage, S3StorageIn, OrgStorageRefs, + OrgStorageRef, + OrgStorageReplicaRefs, + OrgAllStorages, DeletedResponse, UpdatedResponse, + UpdatedResponseId, AddedResponseName, PRESIGN_DURATION_SECONDS, PresignedUrl, @@ -63,14 +69,16 @@ if TYPE_CHECKING: from .orgs import OrgOps from .crawlmanager import CrawlManager + from .background_jobs import BackgroundJobOps else: - OrgOps = CrawlManager = object + OrgOps = CrawlManager = BackgroundJobOps = object CHUNK_SIZE = 1024 * 256 # ============================================================================ -# pylint: disable=broad-except,raise-missing-from +# pylint: disable=broad-except,raise-missing-from,too-many-public-methods, too-many-positional-arguments +# pylint: disable=too-many-lines,too-many-instance-attributes class StorageOps: """All storage handling, download/upload operations""" @@ -104,6 +112,8 @@ def __init__(self, org_ops, crawl_manager, mdb) -> None: default_namespace = os.environ.get("DEFAULT_NAMESPACE", "default") self.frontend_origin = f"{frontend_origin}.{default_namespace}" + self.background_job_ops = cast(BackgroundJobOps, None) + with open(os.environ["STORAGES_JSON"], encoding="utf-8") as fh: storage_list = json.loads(fh.read()) @@ -148,6 +158,10 @@ async def init_index(self): "signedAt", expireAfterSeconds=self.expire_at_duration_seconds ) + def set_ops(self, background_job_ops: BackgroundJobOps) -> None: + """Set background job ops""" + self.background_job_ops = background_job_ops + def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage: """create S3Storage object""" endpoint_url = storage["endpoint_url"] @@ -165,13 +179,14 @@ def _create_s3_storage(self, storage: dict[str, str]) -> S3Storage: endpoint_url=endpoint_url, endpoint_no_bucket_url=endpoint_no_bucket_url, 
access_endpoint_url=access_endpoint_url, + provider=storage.get("provider", "Other"), ) async def add_custom_storage( self, storagein: S3StorageIn, org: Organization ) -> dict: """Add new custom storage""" - name = "!" + slug_from_name(storagein.name) + name = slug_from_name(storagein.name) if name in org.customStorages: raise HTTPException(status_code=400, detail="storage_already_exists") @@ -188,7 +203,8 @@ async def add_custom_storage( region=storagein.region, endpoint_url=endpoint_url, endpoint_no_bucket_url=endpoint_no_bucket_url, - access_endpoint_url=storagein.access_endpoint_url or storagein.endpoint_url, + access_endpoint_url=storagein.access_endpoint_url or endpoint_url, + provider=storagein.provider, ) try: @@ -207,6 +223,8 @@ async def add_custom_storage( "STORE_ENDPOINT_NO_BUCKET_URL": storage.endpoint_no_bucket_url, "STORE_ACCESS_KEY": storage.access_key, "STORE_SECRET_KEY": storage.secret_key, + "STORE_REGION": storage.region, + "STORE_S3_PROVIDER": storage.provider, } await self.crawl_manager.add_org_storage( @@ -220,7 +238,7 @@ async def add_custom_storage( async def remove_custom_storage( self, name: str, org: Organization ) -> dict[str, bool]: - """remove custom storage""" + """Remove custom storage from org""" if org.storage.custom and org.storage.name == name: raise HTTPException(status_code=400, detail="storage_in_use") @@ -241,37 +259,154 @@ async def remove_custom_storage( return {"deleted": True} - async def update_storage_refs( + async def update_storage_ref( self, - storage_refs: OrgStorageRefs, + storage_refs: OrgStorageRef, org: Organization, - ) -> dict[str, bool]: - """update storage for org""" + background_tasks: BackgroundTasks, + ) -> dict[str, Union[bool, Optional[str]]]: + """Update storage for org""" + storage_ref = storage_refs.storage try: - self.get_org_storage_by_ref(org, storage_refs.storage) + self.get_org_storage_by_ref(org, storage_ref) + except: + raise HTTPException(status_code=400, detail="invalid_storage_ref") - for replica in storage_refs.storageReplicas: - self.get_org_storage_by_ref(org, replica) + if org.storage == storage_ref: + raise HTTPException(status_code=400, detail="identical_storage_ref") + + if await self.org_ops.is_crawl_running(org): + raise HTTPException(status_code=403, detail="crawl_running") + + if org.readOnly: + raise HTTPException(status_code=403, detail="org_set_to_read_only") + + _, jobs_running_count = await self.background_job_ops.list_background_jobs( + org=org, running=True + ) + if jobs_running_count > 0: + raise HTTPException(status_code=403, detail="background_jobs_running") + + prev_storage_ref = org.storage + org.storage = storage_ref + + await self.org_ops.update_storage_refs(org) + + if not await self.org_ops.has_files_stored(org): + print("No files stored, no updates to do", flush=True) + return {"updated": True, "id": None} + + await self.org_ops.update_read_only(org, True, "Updating storage") + + job_id = await self.background_job_ops.create_copy_bucket_job( + org, prev_storage_ref, storage_ref + ) + + background_tasks.add_task( + self._run_post_storage_update_tasks, + org, + prev_storage_ref, + storage_ref, + ) + + return {"updated": True, "id": job_id} + async def _run_post_storage_update_tasks( + self, + org: Organization, + prev_storage_ref: StorageRef, + new_storage_ref: StorageRef, + ): + """Handle tasks necessary after changing org storage""" + await self.org_ops.update_file_storage_refs( + org, prev_storage_ref, new_storage_ref + ) + + await self.org_ops.unset_file_presigned_urls(org) + + async 
def update_storage_replica_refs( + self, + storage_refs: OrgStorageReplicaRefs, + org: Organization, + background_tasks: BackgroundTasks, + ) -> dict[str, bool]: + """Update replica storage for org""" + + replicas = storage_refs.storageReplicas + + try: + for replica in replicas: + self.get_org_storage_by_ref(org, replica) except: raise HTTPException(status_code=400, detail="invalid_storage_ref") - org.storage = storage_refs.storage - org.storageReplicas = storage_refs.storageReplicas + if org.storageReplicas == replicas: + raise HTTPException(status_code=400, detail="identical_storage_ref") - await self.org_ops.update_storage_refs(org) + if await self.org_ops.is_crawl_running(org): + raise HTTPException(status_code=403, detail="crawl_running") + + if org.readOnly: + raise HTTPException(status_code=403, detail="org_set_to_read_only") + + _, jobs_running_count = await self.background_job_ops.list_background_jobs( + org=org, running=True + ) + if jobs_running_count > 0: + raise HTTPException(status_code=403, detail="background_jobs_running") + + prev_storage_replicas = org.storageReplicas + org.storageReplicas = replicas + + await self.org_ops.update_storage_refs(org, replicas=True) + + background_tasks.add_task( + self._run_post_storage_replica_update_tasks, + org, + prev_storage_replicas, + replicas, + ) return {"updated": True} - def get_available_storages(self, org: Organization) -> List[StorageRef]: + async def _run_post_storage_replica_update_tasks( + self, + org: Organization, + prev_replica_refs: List[StorageRef], + new_replica_refs: List[StorageRef], + ): + """Handle tasks necessary after updating org replica storages""" + if not await self.org_ops.has_files_stored(org): + print("No files stored, no updates to do", flush=True) + return + + # Replicate files to any new replica locations + for replica_storage in new_replica_refs: + if replica_storage not in prev_replica_refs: + await self.background_job_ops.create_copy_bucket_job( + org, org.storage, replica_storage + ) + await self.org_ops.add_or_remove_file_replica_storage_refs( + org, replica_storage + ) + + # Delete no-longer-used replica storage refs from files + # Determine if we want to delete files from the buckets as well + for replica_storage in prev_replica_refs: + if replica_storage not in new_replica_refs: + await self.org_ops.add_or_remove_file_replica_storage_refs( + org, replica_storage, remove=True + ) + + def get_available_storages(self, org: Organization) -> Dict[str, List[StorageRef]]: """return a list of available default + custom storages""" refs: List[StorageRef] = [] for name in self.default_storages: refs.append(StorageRef(name=name, custom=False)) for name in org.customStorages: refs.append(StorageRef(name=name, custom=True)) - return refs + return {"allStorages": refs} @asynccontextmanager async def get_s3_client( @@ -798,8 +933,13 @@ def _parse_json(line) -> dict: # ============================================================================ +# pylint: disable=too-many-locals def init_storages_api( - org_ops: OrgOps, crawl_manager: CrawlManager, app: APIRouter, mdb, user_dep + org_ops: OrgOps, + crawl_manager: CrawlManager, + app: APIRouter, + mdb, + user_dep: Callable, ) -> StorageOps: """API for updating storage for an org""" @@ -811,14 +951,24 @@ def init_storages_api( router = org_ops.router org_owner_dep = org_ops.org_owner_dep - @router.get("/storage", tags=["organizations"], response_model=OrgStorageRefs) + @router.get( + "/storage", tags=["organizations", "storage"], response_model=OrgStorageRefs + 
) def get_storage_refs( org: Organization = Depends(org_owner_dep), ): """get storage refs for an org""" - return OrgStorageRefs(storage=org.storage, storageReplicas=org.storageReplicas) + if org.storageReplicas: + replica_refs = org.storageReplicas + else: + replica_refs = storage_ops.default_replicas + return OrgStorageRefs(storage=org.storage, storageReplicas=replica_refs) - @router.get("/allStorages", tags=["organizations"], response_model=List[StorageRef]) + @router.get( + "/all-storages", + tags=["organizations", "storage"], + response_model=OrgAllStorages, + ) def get_available_storages(org: Organization = Depends(org_owner_dep)): return storage_ops.get_available_storages(org) @@ -845,31 +995,66 @@ async def clear_all_presigned_urls(user: User = Depends(user_dep)): return {"success": True} - # pylint: disable=unreachable, fixme - # todo: enable when ready to support custom storage - return storage_ops - @router.post( - "/customStorage", tags=["organizations"], response_model=AddedResponseName + "/custom-storage", + tags=["organizations", "storage"], + response_model=AddedResponseName, ) async def add_custom_storage( - storage: S3StorageIn, org: Organization = Depends(org_owner_dep) + storage: S3StorageIn, + org: Organization = Depends(org_owner_dep), + user: User = Depends(user_dep), ): + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + return await storage_ops.add_custom_storage(storage, org) @router.delete( - "/customStorage/{name}", tags=["organizations"], response_model=DeletedResponse + "/custom-storage/{name}", + tags=["organizations", "storage"], + response_model=DeletedResponse, ) async def remove_custom_storage( - name: str, org: Organization = Depends(org_owner_dep) + name: str, + org: Organization = Depends(org_owner_dep), + user: User = Depends(user_dep), ): + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + return await storage_ops.remove_custom_storage(name, org) - @router.post("/storage", tags=["organizations"], response_model=UpdatedResponse) - async def update_storage_refs( - storage: OrgStorageRefs, + @router.post( + "/storage", tags=["organizations", "storage"], response_model=UpdatedResponseId + ) + async def update_storage_ref( + storage: OrgStorageRef, + background_tasks: BackgroundTasks, org: Organization = Depends(org_owner_dep), + user: User = Depends(user_dep), ): - return await storage_ops.update_storage_refs(storage, org) + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + + return await storage_ops.update_storage_ref(storage, org, background_tasks) + + @router.post( + "/storage-replicas", + tags=["organizations", "storage"], + response_model=UpdatedResponse, + ) + async def update_storage_replica_refs( + storage: OrgStorageReplicaRefs, + background_tasks: BackgroundTasks, + org: Organization = Depends(org_owner_dep), + user: User = Depends(user_dep), + ): + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + + return await storage_ops.update_storage_replica_refs( + storage, org, background_tasks + ) return storage_ops diff --git a/backend/btrixcloud/subs.py b/backend/btrixcloud/subs.py index fd4a359686..95c17dd158 100644 --- a/backend/btrixcloud/subs.py +++ b/backend/btrixcloud/subs.py @@ -56,6 +56,8 @@ class SubOps: """API for managing subscriptions. 
Only enabled if billing is enabled""" + # pylint: disable=too-many-positional-arguments + org_ops: OrgOps user_manager: UserManager @@ -332,7 +334,7 @@ async def get_billing_portal_url( return SubscriptionPortalUrlResponse() -# pylint: disable=invalid-name,too-many-arguments +# pylint: disable=invalid-name,too-many-arguments,too-many-positional-arguments def init_subs_api( app, mdb, diff --git a/backend/btrixcloud/uploads.py b/backend/btrixcloud/uploads.py index 80771f3c17..ab11ead663 100644 --- a/backend/btrixcloud/uploads.py +++ b/backend/btrixcloud/uploads.py @@ -47,7 +47,7 @@ async def get_upload( return UploadedCrawl.from_dict(res) # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-function-args - # pylint: disable=too-many-arguments, too-many-locals, duplicate-code, invalid-name + # pylint: disable=too-many-positional-arguments, too-many-locals, duplicate-code, invalid-name async def upload_stream( self, stream, @@ -247,7 +247,7 @@ def read(self, size: Optional[int] = CHUNK_SIZE) -> bytes: # ============================================================================ -# pylint: disable=too-many-arguments, too-many-locals, invalid-name +# pylint: disable=too-many-arguments, too-many-locals, invalid-name, too-many-positional-arguments def init_uploads_api(app, user_dep, *args): """uploads api""" diff --git a/backend/btrixcloud/users.py b/backend/btrixcloud/users.py index e41a583b81..ae4e808103 100644 --- a/backend/btrixcloud/users.py +++ b/backend/btrixcloud/users.py @@ -327,7 +327,7 @@ async def request_verify( user.email, token, dict(request.headers) if request else None ) - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments, too-many-positional-arguments async def create_user( self, name: str, diff --git a/backend/btrixcloud/webhooks.py b/backend/btrixcloud/webhooks.py index 1222ccea17..4259308627 100644 --- a/backend/btrixcloud/webhooks.py +++ b/backend/btrixcloud/webhooks.py @@ -40,7 +40,7 @@ class EventWebhookOps: """Event webhook notification management""" - # pylint: disable=invalid-name, too-many-arguments, too-many-locals + # pylint: disable=invalid-name, too-many-arguments, too-many-locals, too-many-positional-arguments org_ops: OrgOps crawl_ops: CrawlOps @@ -556,7 +556,7 @@ async def create_collection_deleted_notification( ) -# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme +# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme, too-many-positional-arguments def init_event_webhooks_api(mdb, org_ops, app): """init event webhooks system""" # pylint: disable=invalid-name diff --git a/backend/test/test_org.py b/backend/test/test_org.py index 755e3711b2..78011ac1b6 100644 --- a/backend/test/test_org.py +++ b/backend/test/test_org.py @@ -13,6 +13,9 @@ invite_email = "test-user@example.com" +CUSTOM_PRIMARY_STORAGE_NAME = "custom-primary" +CUSTOM_REPLICA_STORAGE_NAME = "custom-replica" + def test_ensure_only_one_default_org(admin_auth_headers): r = requests.get(f"{API_PREFIX}/orgs", headers=admin_auth_headers) @@ -231,9 +234,8 @@ def test_create_org_duplicate_slug(admin_auth_headers, non_default_org_id, slug) assert data["detail"] == "duplicate_org_slug" -# disable until storage customization is enabled -def _test_change_org_storage(admin_auth_headers): - # change to invalid storage +def test_change_storage_invalid(admin_auth_headers): + # try to change to invalid storage r = requests.post( f"{API_PREFIX}/orgs/{new_oid}/storage", headers=admin_auth_headers, @@ 
-242,7 +244,6 @@ def _test_change_org_storage(admin_auth_headers): assert r.status_code == 400 - # change to invalid storage r = requests.post( f"{API_PREFIX}/orgs/{new_oid}/storage", headers=admin_auth_headers, @@ -251,16 +252,159 @@ def _test_change_org_storage(admin_auth_headers): assert r.status_code == 400 - # change to valid storage + +def test_add_custom_storage_doesnt_verify(admin_auth_headers): + # verify that custom storage that can't be verified with + # a test file isn't added + r = requests.post( + f"{API_PREFIX}/orgs/{new_oid}/custom-storage", + headers=admin_auth_headers, + json={ + "name": "custom-bucket-doesnt-exist", + "access_key": "ADMIN", + "secret_key": "PASSW0RD", + "bucket": "custom-bucket-doesnt-exist", + "endpoint_url": "http://local-minio.default:9000/", + }, + ) + assert r.status_code == 400 + assert ( + r.json()["detail"] + == "Could not verify custom storage. Check credentials are valid?" + ) + + +def test_add_custom_storage(admin_auth_headers): + # add custom storages + r = requests.post( + f"{API_PREFIX}/orgs/{new_oid}/custom-storage", + headers=admin_auth_headers, + json={ + "name": CUSTOM_PRIMARY_STORAGE_NAME, + "access_key": "ADMIN", + "secret_key": "PASSW0RD", + "bucket": CUSTOM_PRIMARY_STORAGE_NAME, + "endpoint_url": "http://local-minio.default:9000/", + }, + ) + assert r.status_code == 200 + data = r.json() + assert data["added"] + assert data["name"] == CUSTOM_PRIMARY_STORAGE_NAME + + r = requests.post( + f"{API_PREFIX}/orgs/{new_oid}/custom-storage", + headers=admin_auth_headers, + json={ + "name": CUSTOM_REPLICA_STORAGE_NAME, + "access_key": "ADMIN", + "secret_key": "PASSW0RD", + "bucket": CUSTOM_REPLICA_STORAGE_NAME, + "endpoint_url": "http://local-minio.default:9000/", + }, + ) + assert r.status_code == 200 + data = r.json() + assert data["added"] + assert data["name"] == CUSTOM_REPLICA_STORAGE_NAME + + # verify custom storages are now available on org + r = requests.get( + f"{API_PREFIX}/orgs/{new_oid}/all-storages", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + all_storages = r.json()["allStorages"] + assert {"name": CUSTOM_PRIMARY_STORAGE_NAME, "custom": True} in all_storages + assert {"name": CUSTOM_REPLICA_STORAGE_NAME, "custom": True} in all_storages + + # set org to use custom storage moving forward r = requests.post( f"{API_PREFIX}/orgs/{new_oid}/storage", headers=admin_auth_headers, - json={"storage": {"name": "alt-storage", "custom": False}}, + json={ + "storage": {"name": CUSTOM_PRIMARY_STORAGE_NAME, "custom": True}, + }, ) assert r.status_code == 200 assert r.json()["updated"] + # set org to use custom storage replica moving forward + r = requests.post( + f"{API_PREFIX}/orgs/{new_oid}/storage-replicas", + headers=admin_auth_headers, + json={ + "storageReplicas": [{"name": CUSTOM_REPLICA_STORAGE_NAME, "custom": True}], + }, + ) + + # check org was updated as expected + r = requests.get( + f"{API_PREFIX}/orgs/{new_oid}/storage", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + + storage = data["storage"] + assert storage["name"] == CUSTOM_PRIMARY_STORAGE_NAME + assert storage["custom"] + + replicas = data["storageReplicas"] + assert len(replicas) == 1 + replica = replicas[0] + assert replica["name"] == CUSTOM_REPLICA_STORAGE_NAME + assert replica["custom"] + + +def test_remove_custom_storage(admin_auth_headers): + # Try to remove in-use storages, verify we get expected 400 response + r = requests.delete( + f"{API_PREFIX}/orgs/{new_oid}/custom-storage/{CUSTOM_PRIMARY_STORAGE_NAME}", + 
headers=admin_auth_headers, + ) + assert r.status_code == 400 + assert r.json()["detail"] == "storage_in_use" + + r = requests.delete( + f"{API_PREFIX}/orgs/{new_oid}/custom-storage/{CUSTOM_REPLICA_STORAGE_NAME}", + headers=admin_auth_headers, + ) + assert r.status_code == 400 + assert r.json()["detail"] == "storage_in_use" + + # Unset replica storage from org + r = requests.post( + f"{API_PREFIX}/orgs/{new_oid}/storage-replicas", + headers=admin_auth_headers, + json={ + "storageReplicas": [], + }, + ) + + # Delete no longer used replica storage location + r = requests.delete( + f"{API_PREFIX}/orgs/{new_oid}/custom-storage/{CUSTOM_REPLICA_STORAGE_NAME}", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + assert r.json()["deleted"] + + # Check org + r = requests.get( + f"{API_PREFIX}/orgs/{new_oid}/storage", + headers=admin_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + + storage = data["storage"] + assert storage["name"] == CUSTOM_PRIMARY_STORAGE_NAME + assert storage["custom"] + + assert data["storageReplicas"] == [] + def test_remove_user_from_org(admin_auth_headers, default_org_id): # Add new user to org diff --git a/chart/app-templates/copy_job.yaml b/chart/app-templates/copy_job.yaml new file mode 100644 index 0000000000..61a8febcde --- /dev/null +++ b/chart/app-templates/copy_job.yaml @@ -0,0 +1,97 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: "{{ id }}" + labels: + job-id: "{{ id }}" + role: "background-job" + job_type: {{ job_type }} + btrix.org: {{ oid }} + +spec: + ttlSecondsAfterFinished: 30 + backoffLimit: 3 + template: + spec: + restartPolicy: Never + priorityClassName: bg-job + podFailurePolicy: + rules: + - action: FailJob + onExitCodes: + containerName: rclone + operator: NotIn + values: [0] + containers: + - name: rclone + image: rclone/rclone:latest + env: + + - name: RCLONE_CONFIG_PREV_TYPE + value: "s3" + + - name: RCLONE_CONFIG_PREV_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: "{{ prev_secret_name }}" + key: STORE_ACCESS_KEY + + - name: RCLONE_CONFIG_PREV_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: "{{ prev_secret_name }}" + key: STORE_SECRET_KEY + + - name: RCLONE_CONFIG_PREV_REGION + valueFrom: + secretKeyRef: + name: "{{ prev_secret_name }}" + key: STORE_REGION + + - name: RCLONE_CONFIG_PREV_PROVIDER + valueFrom: + secretKeyRef: + name: "{{ prev_secret_name }}" + key: STORE_S3_PROVIDER + + - name: RCLONE_CONFIG_PREV_ENDPOINT + value: "{{ prev_endpoint }}" + + - name: RCLONE_CONFIG_NEW_TYPE + value: "s3" + + - name: RCLONE_CONFIG_NEW_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: "{{ new_secret_name }}" + key: STORE_ACCESS_KEY + + - name: RCLONE_CONFIG_NEW_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: "{{ new_secret_name }}" + key: STORE_SECRET_KEY + + - name: RCLONE_CONFIG_NEW_REGION + valueFrom: + secretKeyRef: + name: "{{ new_secret_name }}" + key: STORE_REGION + + - name: RCLONE_CONFIG_NEW_PROVIDER + valueFrom: + secretKeyRef: + name: "{{ new_secret_name }}" + key: STORE_S3_PROVIDER + + - name: RCLONE_CONFIG_NEW_ENDPOINT + value: "{{ new_endpoint }}" + + command: ["rclone", "-v", "--stats-one-line", "--stats", "10s", "copy", "--checksum", "--use-mmap", "--transfers=2", "--checkers=2", "prev:{{ prev_bucket }}{{ oid }}", "new:{{ new_bucket }}{{ oid }}"] + resources: + limits: + memory: "350Mi" + + requests: + memory: "350Mi" + cpu: "50m" diff --git a/frontend/docs/docs/deploy/admin/org-storage.md b/frontend/docs/docs/deploy/admin/org-storage.md new file mode 100644 index 
0000000000..ad5563f77d
--- /dev/null
+++ b/frontend/docs/docs/deploy/admin/org-storage.md
@@ -0,0 +1,76 @@
+# Org Storage
+
+This guide covers configuring storage for an organization in Browsertrix.
+
+By default, all organizations use the default storage and default replica locations (if any are configured) set in the Helm chart.
+
+The Browsertrix API supports adding custom storage locations per organization and configuring the organization to use custom storages for primary and/or replica storage. These endpoints are available to superusers only.
+
+## Adding Custom Storage
+
+The first step in configuring custom storage for an organization is to add the S3 bucket to the organization, e.g.:
+
+```sh
+curl -X POST -H "Content-type: application/json" -H "Authorization: Bearer <jwt-token>" https://app.browsertrix.com/api/orgs/<org-id>/custom-storage --data '{"name": "new-custom-storage", "access_key": "<access-key>", "secret_key": "<secret-key>", "bucket": "new-custom-storage", "endpoint_url": "https://s3-provider.example.com/"}'
+```
+
+If desired, the `provider` field can be set to any of the [values supported by rclone for S3 providers](https://rclone.org/s3/#s3-provider). By default, this field is set to "Other", which should work for nearly all S3 storage providers. This value is used solely for migrating existing files with rclone when updating org storage and/or replica locations.
+
+Verify that the custom storage has been added to the organization by checking the `/all-storages` API endpoint:
+
+```sh
+curl -H "Content-type: application/json" -H "Authorization: Bearer <jwt-token>" https://app.browsertrix.com/api/orgs/<org-id>/all-storages
+```
+
+The storage reference for the new custom storage should be present in the returned JSON, e.g.:
+
+```json
+{
+  "allStorages": [
+    {"name": "default", "custom": false},
+    {"name": "default-replica", "custom": false},
+    {"name": "new-custom-storage", "custom": true}
+  ]
+}
+```
+
+The custom storage is now ready to be configured.
+
+
+## Updating Org Storage
+
+Each organization has one primary storage location. It is possible to configure the organization to use any of the storage options listed in the `/all-storages` API endpoint as primary storage, e.g.:
+
+```sh
+curl -X POST -H "Content-type: application/json" -H "Authorization: Bearer <jwt-token>" https://app.browsertrix.com/api/orgs/<org-id>/storage --data '{"storage": {"name": "new-custom-storage", "custom": true}}'
+```
+
+If any crawls, uploads, or browser profiles have been created on the organization, modifying the primary storage will disable archiving on the organization while files are migrated from the previous to the new storage location. Archiving is re-enabled when the migration completes.
+
+At this time, all files are copied from the previous storage location to the new storage location, and are not automatically deleted from their source location.
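+
+### Checking Migration Progress
+
+When a migration is needed, the response to the storage update request includes an `id` for the copy background job (`id` is `null` if the org has no stored files and no copy is needed). As a sketch, progress for that job should be retrievable from the background job progress endpoint added alongside this feature; the path below assumes the org-scoped `/api/orgs/<org-id>/jobs/` prefix used by the other background job endpoints:
+
+```sh
+curl -H "Authorization: Bearer <jwt-token>" https://app.browsertrix.com/api/orgs/<org-id>/jobs/<job-id>/progress
+```
+
+While the rclone copy is running, the response reports an estimated completion percentage between 0 and 1 and, when available, an ETA, e.g. `{"percentage": 0.45, "eta": "2m30s"}`; once the job has finished, the percentage is reported as `1.0`.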
+
+
+## Updating Org Replica Storage
+
+Each organization can have any number of replica storage locations. These locations serve as replicas of the content in the primary storage location and are most commonly used as backups.
+
+It is possible to configure the organization to use any of the storage options listed in the `/all-storages` API endpoint as replica storage, e.g.:
+
+```sh
+curl -X POST -H "Content-type: application/json" -H "Authorization: Bearer <jwt-token>" https://app.browsertrix.com/api/orgs/<org-id>/storage-replicas --data '{"storageReplicas": [{"name": "default-replica", "custom": false}, {"name": "new-custom-storage", "custom": true}]}'
+```
+
+If any crawls, uploads, or browser profiles have been created on the organization, adding a new replica location will start a background job to replicate all of the organization's files from primary storage to the new replica location. Unlike updating primary storage, this process does not disable archiving on the organization.
+
+If any crawls, uploads, or browser profiles have been created on the organization, removing a previously used replica location will result in database updates to reflect that the prior replica location is no longer available. At this time, no files are automatically deleted from the removed replica location.
+
+
+## Removing Custom Storage
+
+It is also possible to remove a custom storage from an organization by referencing the name of the storage to be deleted in the API endpoint, e.g.:
+
+```sh
+curl -X DELETE -H "Content-type: application/json" -H "Authorization: Bearer <jwt-token>" https://app.browsertrix.com/api/orgs/<org-id>/custom-storage/new-custom-storage
+```
+
+The custom storage location to be deleted must not be in use on the organization, or the endpoint will return `400`. Default storage locations shared between organizations cannot be deleted with this endpoint.
diff --git a/frontend/docs/mkdocs.yml b/frontend/docs/mkdocs.yml
index bc7d16a203..5982e7d094 100644
--- a/frontend/docs/mkdocs.yml
+++ b/frontend/docs/mkdocs.yml
@@ -86,6 +86,7 @@ nav:
   - Administration:
     - deploy/admin/upgrade-notes.md
     - deploy/admin/org-import-export.md
+    - deploy/admin/org-storage.md
   - Development:
     - develop/index.md
     - develop/local-dev-setup.md