Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 181 additions & 126 deletions backend/btrixcloud/orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,114 +8,112 @@
import math
import os
import time

from uuid import UUID, uuid4
from calendar import c
from tempfile import NamedTemporaryFile

from typing import (
Awaitable,
Optional,
TYPE_CHECKING,
Dict,
Any,
AsyncGenerator,
Awaitable,
Callable,
Dict,
List,
Literal,
AsyncGenerator,
Any,
Optional,
)
from uuid import UUID, uuid4

import json_stream
from aiostream import stream
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from motor.motor_asyncio import AsyncIOMotorDatabase
from pydantic import ValidationError
from pymongo import ReturnDocument
from pymongo.collation import Collation
from pymongo.errors import AutoReconnect, DuplicateKeyError

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
import json_stream
from aiostream import stream

from .models import (
SUCCESSFUL_STATES,
ACTIVE,
MAX_BROWSER_WINDOWS,
MAX_CRAWL_SCALE,
PAUSED_PAYMENT_FAILED,
REASON_PAUSED,
RUNNING_STATES,
SUCCESSFUL_STATES,
WAITING_STATES,
AddedResponse,
AddedResponseId,
AddToOrgRequest,
BaseCrawl,
Collection,
ConfigRevision,
Crawl,
CrawlConfig,
CrawlConfigDefaults,
DeleteCrawlList,
DeletedResponseId,
InvitePending,
InviteToOrgRequest,
OrgAcceptInviteResponse,
Organization,
PlansResponse,
StorageRef,
OrgCreate,
OrgDeleteInviteResponse,
OrgImportResponse,
OrgInviteResponse,
OrgMetrics,
OrgOut,
OrgOutExport,
OrgProxies,
OrgPublicProfileUpdate,
OrgQuotas,
OrgQuotasIn,
OrgQuotaUpdate,
OrgReadOnlyUpdate,
OrgReadOnlyOnCancel,
OrgMetrics,
OrgReadOnlyUpdate,
OrgSlugsResponse,
OrgWebhookUrls,
OrgCreate,
OrgProxies,
Subscription,
SubscriptionUpdate,
SubscriptionCancel,
RenameOrg,
UpdateRole,
RemovePendingInvite,
RemoveFromOrg,
AddToOrgRequest,
InvitePending,
InviteToOrgRequest,
UserRole,
User,
PageWithAllQA,
PaginatedInvitePendingResponse,
PaginatedOrgOutResponse,
CrawlConfig,
Crawl,
CrawlConfigDefaults,
UploadedCrawl,
ConfigRevision,
PlansResponse,
Profile,
Collection,
OrgOut,
OrgOutExport,
PageWithAllQA,
DeleteCrawlList,
PAUSED_PAYMENT_FAILED,
REASON_PAUSED,
ACTIVE,
DeletedResponseId,
UpdatedResponse,
AddedResponse,
AddedResponseId,
SuccessResponseId,
OrgInviteResponse,
OrgAcceptInviteResponse,
OrgDeleteInviteResponse,
RemovedResponse,
OrgSlugsResponse,
OrgImportResponse,
OrgPublicProfileUpdate,
MAX_BROWSER_WINDOWS,
MAX_CRAWL_SCALE,
RemoveFromOrg,
RemovePendingInvite,
RenameOrg,
StorageRef,
Subscription,
SubscriptionCancel,
SubscriptionUpdate,
SuccessResponseId,
UpdatedResponse,
UpdateRole,
UploadedCrawl,
User,
UserRole,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import (
JSONSerializer,
browser_windows_from_scale,
dt_now,
slug_from_name,
validate_slug,
get_duplicate_key_error_field,
slug_from_name,
validate_language_code,
JSONSerializer,
browser_windows_from_scale,
validate_slug,
)

if TYPE_CHECKING:
from .invites import InviteOps
from .background_jobs import BackgroundJobOps
from .basecrawls import BaseCrawlOps
from .colls import CollectionOps
from .crawlmanager import CrawlManager
from .file_uploads import FileUploadOps
from .invites import InviteOps
from .pages import PageOps
from .profiles import ProfileOps
from .users import UserManager
from .background_jobs import BackgroundJobOps
from .pages import PageOps
from .file_uploads import FileUploadOps
from .crawlmanager import CrawlManager
else:
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object
BackgroundJobOps = UserManager = PageOps = FileUploadOps = CrawlManager = object
Expand Down Expand Up @@ -628,74 +626,131 @@ async def update_quotas(

quotas.context = None

previous_extra_mins = (
org.quotas.extraExecMinutes
if (org.quotas and org.quotas.extraExecMinutes)
else 0
)
previous_gifted_mins = (
org.quotas.giftedExecMinutes
if (org.quotas and org.quotas.giftedExecMinutes)
else 0
)

if mode == "add":
increment_update: dict[str, Any] = {
"$inc": {},
update: list[dict[str, Any]] = [
{
"$set": {
"quotaUpdates": {
"$concatArrays": [
"$quotaUpdates",
[{"modified": dt_now(), "update": {}}],
]
},
}
}
]

for field, value in quotas.model_dump(
exclude_unset=True, exclude_defaults=True, exclude_none=True
).items():
if field == "context" or value is None:
continue
inc = max(value, -org.quotas.model_dump().get(field, 0))
increment_update["$inc"][f"quotas.{field}"] = inc
computed_quotas = {}

updated_org = await self.orgs.find_one_and_update(
{"_id": org.id},
increment_update,
projection={"quotas": True},
return_document=ReturnDocument.AFTER,
)
quotas = OrgQuotasIn(**updated_org["quotas"])

update: dict[str, dict[str, dict[str, Any] | int]] = {
"$push": {
"quotaUpdates": OrgQuotaUpdate(
modified=dt_now(),
update=OrgQuotas(
**quotas.model_dump(
exclude_unset=True, exclude_defaults=True, exclude_none=True
)
),
context=context,
).model_dump()
},
"$inc": {},
"$set": {},
}

if mode == "set":
if mode == "add":
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0][
"context"
] = context
for field, value in quotas.model_dump().items():
if field == "context":
continue
if value is None:
# set value of field in pushed update to current value in `quotas`
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][
field
] = f"$quotas.{field}"
computed_quotas[field] = f"$quotas.{field}"
continue
new_value = {
"$add": [
{
"$cond": {
"if": {
"$gt": [
{"$multiply": [f"$quotas.{field}", -1]},
value,
]
},
"then": {"$multiply": [f"$quotas.{field}", -1]},
"else": value,
}
},
f"$quotas.{field}",
]
}
# set value of field in pushed update to current value in quotas + increment
update[0]["$set"][f"quotas.{field}"] = new_value
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0]["update"][
field
] = new_value
computed_quotas[field] = new_value

elif mode == "set":
increment_update = quotas.model_dump(
exclude_unset=True, exclude_defaults=True, exclude_none=True
)
update["$set"]["quotas"] = increment_update
for field, value in increment_update.items():
update[0]["$set"][f"quotas.{field}"] = value
computed_quotas[field] = value
update[0]["$set"]["quotaUpdates"]["$concatArrays"][1][0] = OrgQuotaUpdate(
modified=dt_now(),
update=OrgQuotas(
**quotas.model_dump(
exclude_unset=True, exclude_defaults=True, exclude_none=True
)
),
context=context,
).model_dump()

# Inc org available fields for extra/gifted execution time as needed
if quotas.extraExecMinutes is not None:
extra_secs_diff = (quotas.extraExecMinutes - previous_extra_mins) * 60
if org.extraExecSecondsAvailable + extra_secs_diff <= 0:
update["$set"]["extraExecSecondsAvailable"] = 0
else:
update["$inc"]["extraExecSecondsAvailable"] = extra_secs_diff
for extra_or_gifted in ["extra", "gifted"]:
previous_mins = {
"$cond": {
"if": {
"$or": [
{"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", 0]},
{"$ne": [f"$quotas.{extra_or_gifted}ExecMinutes", None]},
]
},
"then": f"$quotas.{extra_or_gifted}ExecMinutes",
"else": 0,
}
}

if quotas.giftedExecMinutes is not None:
gifted_secs_diff = (quotas.giftedExecMinutes - previous_gifted_mins) * 60
if org.giftedExecSecondsAvailable + gifted_secs_diff <= 0:
update["$set"]["giftedExecSecondsAvailable"] = 0
else:
update["$inc"]["giftedExecSecondsAvailable"] = gifted_secs_diff
secs_diff = {
"$multiply": [
{
"$subtract": [
computed_quotas[f"{extra_or_gifted}ExecMinutes"],
previous_mins,
]
},
60,
]
}

update[0]["$set"][f"{extra_or_gifted}ExecSecondsAvailable"] = {
"$cond": {
"if": {"$ne": [f"${extra_or_gifted}ExecSecondsAvailable", None]},
"then": {
"$cond": {
"if": {
"$lte": [
{
"$add": [
f"${extra_or_gifted}ExecSecondsAvailable",
secs_diff,
]
},
0,
]
},
"then": 0,
"else": {
"$add": [
f"${extra_or_gifted}ExecSecondsAvailable",
secs_diff,
]
},
}
},
"else": f"${extra_or_gifted}ExecSecondsAvailable",
}
}

await self.orgs.find_one_and_update({"_id": org.id}, update)

Expand Down
Loading