15 changes: 7 additions & 8 deletions backend/btrixcloud/background_jobs.py
@@ -515,14 +515,13 @@ async def job_finished(
if job.type != job_type:
raise HTTPException(status_code=400, detail="invalid_job_type")

if success:
if job_type == BgJobType.CREATE_REPLICA:
await self.handle_replica_job_finished(cast(CreateReplicaJob, job))
if job_type == BgJobType.DELETE_REPLICA:
await self.handle_delete_replica_job_finished(
cast(DeleteReplicaJob, job)
)
else:
if success and job_type == BgJobType.CREATE_REPLICA:
await self.handle_replica_job_finished(cast(CreateReplicaJob, job))

if job_type == BgJobType.DELETE_REPLICA:
await self.handle_delete_replica_job_finished(cast(DeleteReplicaJob, job))

if not success:
await self._send_bg_job_failure_email(job, finished)

await self.jobs.find_one_and_update(
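The refactor above flattens the old nested conditional: replica creation is still handled only on success, delete-replica handling now runs regardless of outcome, and the failure email is sent independently of job type. A minimal standalone sketch of the resulting control flow (handler names mirror the diff; the string job types and list-of-calls bookkeeping are illustrative only, not the actual ops class):

async def job_finished_flow(job_type: str, success: bool) -> list[str]:
    calls: list[str] = []

    # Replica creation is only acted on when the job succeeded...
    if success and job_type == "create-replica":
        calls.append("handle_replica_job_finished")

    # ...but replica deletion is processed whether or not the job
    # succeeded, so replication state stays consistent after failures.
    if job_type == "delete-replica":
        calls.append("handle_delete_replica_job_finished")

    # Failure notification is independent of job type.
    if not success:
        calls.append("send_bg_job_failure_email")

    return calls

With the old nesting, a failed delete-replica job triggered only the email; now job_finished_flow("delete-replica", False) yields both the delete handler and the failure notification.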
43 changes: 37 additions & 6 deletions backend/btrixcloud/crawlmanager.py
@@ -460,24 +460,55 @@ async def pause_resume_crawl(
crawl_id, {"pausedAt": date_to_str(paused_at) if paused_at else ""}
)

async def delete_crawl_configs_for_org(self, org: str) -> None:
"""Delete all crawl configs for given org"""
await self._delete_crawl_configs(f"btrix.org={org}")
async def delete_all_k8s_resources_for_org(self, oid_str: str) -> None:
"""Delete all k8s resources related to org"""
await self.delete_crawl_config_cron_jobs_for_org(oid_str)
await self.delete_bg_job_cron_jobs_for_org(oid_str)
await self.delete_crawl_jobs_for_org(oid_str)
await self.delete_profile_jobs_for_org(oid_str)

async def delete_crawl_config_by_id(self, cid: str) -> None:
"""Delete all crawl configs by id"""
await self._delete_crawl_configs(f"btrix.crawlconfig={cid}")
await self._delete_cron_jobs(f"btrix.crawlconfig={cid},role=cron-job")

async def delete_crawl_config_cron_jobs_for_org(self, oid_str: str) -> None:
"""Delete all crawl configs for given org"""
await self._delete_cron_jobs(f"btrix.org={oid_str},role=cron-job")

async def delete_bg_job_cron_jobs_for_org(self, oid_str: str) -> None:
"""Delete all background cron jobsf or org"""
await self._delete_cron_jobs(f"btrix.org={oid_str},role=cron-background-job")

async def delete_crawl_jobs_for_org(self, oid_str: str) -> None:
"""Delete all crawl jobs for given org"""
await self._delete_custom_objects(f"btrix.org={oid_str}", plural="crawljobs")

async def delete_profile_jobs_for_org(self, oid_str: str) -> None:
"""Delete all browser profile jobs for given org"""
await self._delete_custom_objects(f"btrix.org={oid_str}", plural="profilejobs")

# ========================================================================
# Internal Methods
async def _delete_crawl_configs(self, label) -> None:
"""Delete any crawl config specific resources (now only cron jobs)"""
async def _delete_cron_jobs(self, label) -> None:
"""Delete namespaced cron jobs (e.g. crawl configs, bg jobs)"""

await self.batch_api.delete_collection_namespaced_cron_job(
namespace=self.namespace,
label_selector=label,
)

async def _delete_custom_objects(self, label, plural="crawljobs") -> None:
"""Delete custom objects (e.g. crawl jobs, profile browser jobs)"""
await self.custom_api.delete_collection_namespaced_custom_object(
group="btrix.cloud",
version="v1",
namespace=self.namespace,
label_selector=label,
plural=plural,
grace_period_seconds=0,
propagation_policy="Background",
)

async def update_scheduled_job(
self, crawlconfig: CrawlConfig, userid: Optional[str] = None
) -> Optional[str]:
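The new helpers all key off the btrix.org label, so org-wide cleanup reduces to four label-selector collection deletes. A rough standalone sketch of the same fan-out, assuming the kubernetes_asyncio client (the namespace value and kubeconfig loading are illustrative; the API calls, group, plurals, and selectors mirror the diff):

from kubernetes_asyncio import client, config


async def delete_org_resources(oid: str, namespace: str = "btrix-cloud") -> None:
    """Standalone sketch of the org-scoped k8s cleanup added above."""
    await config.load_kube_config()  # in-cluster code would use load_incluster_config()
    async with client.ApiClient() as api:
        batch = client.BatchV1Api(api)
        custom = client.CustomObjectsApi(api)

        # Scheduled-crawl and background-job cron jobs differ only in
        # their role label, so one loop covers both helpers.
        for role in ("cron-job", "cron-background-job"):
            await batch.delete_collection_namespaced_cron_job(
                namespace=namespace,
                label_selector=f"btrix.org={oid},role={role}",
            )

        # Crawl and profile-browser jobs are custom resources in the
        # btrix.cloud group, deleted per plural with background propagation.
        for plural in ("crawljobs", "profilejobs"):
            await custom.delete_collection_namespaced_custom_object(
                group="btrix.cloud",
                version="v1",
                namespace=namespace,
                plural=plural,
                label_selector=f"btrix.org={oid}",
                grace_period_seconds=0,
                propagation_policy="Background",
            )

propagation_policy="Background" returns immediately and lets the garbage collector reap dependents, which keeps org deletion fast even when an org has many jobs.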
5 changes: 3 additions & 2 deletions backend/btrixcloud/main.py
@@ -142,6 +142,8 @@ def main() -> None:

dbclient, mdb = init_db()

crawl_manager = CrawlManager()

settings = SettingsResponse(
registrationEnabled=is_bool(os.environ.get("REGISTRATION_ENABLED")),
jwtTokenLifetime=JWT_TOKEN_LIFETIME,
@@ -178,6 +180,7 @@ def main() -> None:
app,
mdb,
user_manager,
crawl_manager,
invites,
current_active_user,
)
@@ -194,8 +197,6 @@
)
sys.exit(1)

crawl_manager = CrawlManager()

crawl_log_ops = CrawlLogOps(mdb, org_ops)

storage_ops = init_storages_api(
1 change: 1 addition & 0 deletions backend/btrixcloud/models.py
@@ -39,6 +39,7 @@
MAX_BROWSER_WINDOWS = os.environ.get("MAX_BROWSER_WINDOWS") or 0

# crawl scale for constraint
# pylint: disable=invalid-name
if MAX_BROWSER_WINDOWS:
MAX_BROWSER_WINDOWS = int(MAX_BROWSER_WINDOWS)
MAX_CRAWL_SCALE = math.ceil(MAX_BROWSER_WINDOWS / NUM_BROWSERS)
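Since MAX_CRAWL_SCALE is a ceiling division of the two env-derived values, a quick worked example (values hypothetical):

import math

MAX_BROWSER_WINDOWS = 8  # hypothetical MAX_BROWSER_WINDOWS=8
NUM_BROWSERS = 3         # hypothetical browsers per crawler pod

# ceil(8 / 3) == 3: three crawler instances cover eight windows
assert math.ceil(MAX_BROWSER_WINDOWS / NUM_BROWSERS) == 3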
14 changes: 13 additions & 1 deletion backend/btrixcloud/operator/crawls.py
@@ -15,6 +15,8 @@
from kubernetes.utils import parse_quantity
from redis import asyncio as exceptions

from fastapi import HTTPException

from btrixcloud.models import (
TYPE_NON_RUNNING_STATES,
TYPE_RUNNING_STATES,
@@ -162,7 +164,17 @@ async def sync_crawls(self, data: MCSyncData):
params["userid"] = spec.get("userid", "")

pods = data.children[POD]
org = await self.org_ops.get_org_by_id(UUID(oid))
try:
org = await self.org_ops.get_org_by_id(UUID(oid))
except HTTPException as e:
# org likely deleted, should delete this crawljob
if e.detail == "invalid_org_id":
return {
"status": status.dict(exclude_none=True),
"children": [],
"finalized": True,
}
raise

crawl = CrawlSpec(
id=crawl_id,
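Returning finalized: True with an empty children list is how a Metacontroller-style operator tells the controller to drop all child resources and release the object; the guard above applies it when the org lookup 404s. A minimal sketch of that guard in isolation (org_ops and status stand in for the operator's attributes; the invalid_org_id detail string matches the diff):

from uuid import UUID

from fastapi import HTTPException


async def org_or_finalize(org_ops, status, oid: str):
    """Return (org, None) normally, or (None, response) if the org is gone."""
    try:
        return await org_ops.get_org_by_id(UUID(oid)), None
    except HTTPException as exc:
        if exc.detail == "invalid_org_id":
            # No children + finalized => the orphaned crawljob is
            # garbage-collected instead of syncing forever.
            return None, {
                "status": status.dict(exclude_none=True),
                "children": [],
                "finalized": True,
            }
        raise  # any other error should still surface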
7 changes: 6 additions & 1 deletion backend/btrixcloud/operator/cronjobs.py
@@ -77,7 +77,12 @@ async def make_new_crawljob(

# get org
oid = crawlconfig.oid
org = await self.org_ops.get_org_by_id(oid)
try:
org = await self.org_ops.get_org_by_id(oid)
# pylint: disable=bare-except
except:
print(f"error: error getting org {oid}, skipping schedulued job")
return self.get_finished_response(metadata)

# db create
user = None
6 changes: 3 additions & 3 deletions backend/btrixcloud/ops.py
@@ -49,16 +49,16 @@ def init_ops() -> Tuple[

dbclient, mdb = init_db()

crawl_manager = CrawlManager()

invite_ops = InviteOps(mdb, email)

user_manager = UserManager(mdb, email, invite_ops)

org_ops = OrgOps(mdb, invite_ops, user_manager)
org_ops = OrgOps(mdb, invite_ops, user_manager, crawl_manager)

event_webhook_ops = EventWebhookOps(mdb, org_ops)

crawl_manager = CrawlManager()

crawl_log_ops = CrawlLogOps(mdb, org_ops)

storage_ops = StorageOps(org_ops, crawl_manager, mdb)
20 changes: 17 additions & 3 deletions backend/btrixcloud/orgs.py
@@ -104,9 +104,10 @@
from .background_jobs import BackgroundJobOps
from .pages import PageOps
from .file_uploads import FileUploadOps
from .crawlmanager import CrawlManager
else:
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object
BackgroundJobOps = UserManager = PageOps = FileUploadOps = object
BackgroundJobOps = UserManager = PageOps = FileUploadOps = CrawlManager = object


DEFAULT_ORG = os.environ.get("DEFAULT_ORG", "My Organization")
@@ -122,6 +123,7 @@ class OrgOps:

invites: InviteOps
user_manager: UserManager
crawl_manager: CrawlManager
register_to_org_id: Optional[str]
base_crawl_ops: BaseCrawlOps
default_primary: Optional[StorageRef]
@@ -132,7 +134,13 @@ class OrgOps:
org_owner_dep: Optional[Callable]
org_public: Optional[Callable]

def __init__(self, mdb, invites: InviteOps, user_manager: UserManager):
def __init__(
self,
mdb,
invites: InviteOps,
user_manager: UserManager,
crawl_manager: CrawlManager,
):
self.orgs = mdb["organizations"]
self.crawls_db = mdb["crawls"]
self.crawl_configs_db = mdb["crawl_configs"]
@@ -154,6 +162,7 @@ def __init__(self, mdb, invites: InviteOps, user_manager: UserManager):

self.invites = invites
self.user_manager = user_manager
self.crawl_manager = crawl_manager
self.register_to_org_id = os.environ.get("REGISTER_TO_ORG_ID")

def set_ops(
@@ -1368,6 +1377,7 @@ async def delete_org_and_data(
) -> None:
"""Delete org and all of its associated data."""
print(f"Deleting org: {org.slug} {org.name} {org.id}")

# Delete archived items
cursor = self.crawls_db.find({"oid": org.id}, projection=["_id"])
items = await cursor.to_list(length=DEL_ITEMS)
@@ -1417,6 +1427,9 @@ async def delete_org_and_data(
# Delete org
await self.orgs.delete_one({"_id": org.id})

# Delete related k8s objects
await self.crawl_manager.delete_all_k8s_resources_for_org(str(org.id))

async def recalculate_storage(self, org: Organization) -> dict[str, bool]:
"""Recalculate org storage use"""
try:
@@ -1483,13 +1496,14 @@ def init_orgs_api(
app,
mdb,
user_manager: UserManager,
crawl_manager: CrawlManager,
invites: InviteOps,
user_dep: Callable,
):
"""Init organizations api router for /orgs"""
# pylint: disable=too-many-locals,invalid-name

ops = OrgOps(mdb, invites, user_manager)
ops = OrgOps(mdb, invites, user_manager, crawl_manager)

async def org_dep(oid: UUID, user: User = Depends(user_dep)):
org = await ops.get_org_for_user_by_id(oid, user)
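Taken together with the operator changes, the ordering here matters: the org document is removed first, then the k8s sweep deletes cron jobs and custom objects, and any crawljob that syncs in the gap fails its org lookup and self-finalizes. A condensed sketch of the new tail of delete_org_and_data (method names from the diff; the surrounding deletes are elided):

async def delete_org_tail(self, org) -> None:
    # ... archived items, workflows, profiles, etc. deleted above ...

    # Remove the org document itself.
    await self.orgs.delete_one({"_id": org.id})

    # Then sweep the cluster for everything labeled btrix.org=<oid>:
    # cron jobs (scheduled crawls + background jobs), crawljobs, and
    # profilejobs (see crawlmanager.py above).
    await self.crawl_manager.delete_all_k8s_resources_for_org(str(org.id))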