Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 68 additions & 60 deletions backend/python/app/connectors/services/base_arango_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""ArangoDB service for interacting with the database"""

# pylint: disable=E1101, W0718
import asyncio
import datetime
Expand All @@ -9,50 +7,41 @@
from typing import Any, Dict, List, Optional, Set, Tuple

import aiohttp # type: ignore
from arango import ArangoClient # type: ignore
from arango.database import TransactionDatabase # type: ignore
from fastapi import Request # type: ignore

from app.config.configuration_service import ConfigurationService
from app.config.constants.arangodb import (
CollectionNames,
Connectors,
DepartmentNames,
GraphNames,
LegacyGraphNames,
OriginTypes,
RecordTypes,
)
from app.config.constants.arangodb import (CollectionNames, Connectors,
DepartmentNames, GraphNames,
LegacyGraphNames, OriginTypes,
RecordTypes)
from app.config.constants.http_status_code import HttpStatusCode
from app.config.constants.service import DefaultEndpoints, config_node_constants
from app.config.constants.service import (DefaultEndpoints,
config_node_constants)
from app.connectors.services.kafka_service import KafkaService
from app.models.entities import AppUserGroup, FileRecord, Record, RecordGroup, User
from app.schema.arango.documents import (
agent_schema,
agent_template_schema,
app_schema,
department_schema,
file_record_schema,
mail_record_schema,
orgs_schema,
record_group_schema,
record_schema,
team_schema,
ticket_record_schema,
user_schema,
webpage_record_schema,
)
from app.schema.arango.edges import (
basic_edge_schema,
belongs_to_schema,
is_of_type_schema,
permissions_schema,
record_relations_schema,
user_app_relation_schema,
user_drive_relation_schema,
)
from app.models.entities import (AppUserGroup, FileRecord, Record, RecordGroup,
User)
from app.schema.arango.documents import (agent_schema, agent_template_schema,
app_schema, department_schema,
file_record_schema,
mail_record_schema, orgs_schema,
record_group_schema, record_schema,
team_schema, ticket_record_schema,
user_schema, webpage_record_schema)
from app.schema.arango.edges import (basic_edge_schema, belongs_to_schema,
is_of_type_schema, permissions_schema,
record_relations_schema,
user_app_relation_schema,
user_drive_relation_schema)
from app.schema.arango.graph import EDGE_DEFINITIONS
from app.utils.time_conversion import get_epoch_timestamp_in_ms
from arango import ArangoClient # type: ignore
from arango.database import TransactionDatabase # type: ignore
from codeflash.code_utils.codeflash_wrap_decorator import \
codeflash_performance_async
from fastapi import Request # type: ignore

"""ArangoDB service for interacting with the database"""




# Collection definitions with their schemas
NODE_COLLECTIONS = [
Expand Down Expand Up @@ -1782,6 +1771,7 @@ async def remove_user_access_to_record(self, connector_name: Connectors, externa
self.logger.error(f"❌ Failed to remove user access {external_id} from {connector_name}: {str(e)}")
raise

@codeflash_performance_async
async def _remove_user_access_from_record(self, record_id: str, user_id: str) -> Dict:
"""Remove a specific user's access to a record"""
try:
Expand All @@ -1796,12 +1786,16 @@ async def _remove_user_access_from_record(self, record_id: str, user_id: str) ->
RETURN OLD
"""

cursor = self.db.aql.execute(user_removal_query, bind_vars={
"record_from": f"records/{record_id}",
"user_to": f"users/{user_id}"
})
# Use asyncio.to_thread to avoid blocking the event loop on sync DB I/O
def _execute_query():
cursor = self.db.aql.execute(user_removal_query, bind_vars={
"record_from": f"records/{record_id}",
"user_to": f"users/{user_id}"
})
return list(cursor)

removed_permissions = await asyncio.to_thread(_execute_query)

removed_permissions = list(cursor)

if removed_permissions:
self.logger.info(f"✅ Removed {len(removed_permissions)} permission(s) for user {user_id} on record {record_id}")
Expand Down Expand Up @@ -3576,6 +3570,7 @@ async def get_record_by_conversation_index(
)
return None

@codeflash_performance_async
async def get_record_owner_source_user_email(
self,
record_id: str,
Expand Down Expand Up @@ -3604,7 +3599,11 @@ async def get_record_owner_source_user_email(
"""

db = transaction if transaction else self.db
cursor = db.aql.execute(query, bind_vars={"record_id": record_id})

# Offload the blocking db.aql.execute to a thread and make it async
cursor = await asyncio.to_thread(
db.aql.execute, query, bind_vars={"record_id": record_id}
)
result = next(cursor, None)
return result

Expand Down Expand Up @@ -6432,7 +6431,8 @@ async def _validate_folder_creation(self, kb_id: str, user_id: str) -> Dict:

# Check permissions
user_role = await self.get_user_kb_permission(kb_id, user_key)
if user_role not in ["OWNER", "WRITER"]:
# Use a set literal for the membership check on allowed roles
if user_role not in {"OWNER", "WRITER"}:
return {
"valid": False,
"success": False,
Expand Down Expand Up @@ -6665,6 +6665,14 @@ async def get_user_kb_permission(
self.logger.info(f"🔍 Checking permissions for user {user_id} on KB {kb_id}")
db = transaction if transaction else self.db


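# AQL queries reuse one bind_vars dict for both the main & debug queries.
# NOTE(review): ArangoDB rejects bind parameters that a query does not declare;
# confirm the debug query references @user_id before sharing this dict.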
bind_vars = {
"kb_id": kb_id,
"user_id": user_id,
"@permissions_collection": CollectionNames.PERMISSIONS_TO_KB.value,
}

query = """
FOR perm IN @@permissions_collection
FILTER perm._from == CONCAT('users/', @user_id)
Expand All @@ -6674,11 +6682,7 @@ async def get_user_kb_permission(

cursor = db.aql.execute(
query,
bind_vars={
"kb_id": kb_id,
"user_id": user_id,
"@permissions_collection": CollectionNames.PERMISSIONS_TO_KB.value,
},
bind_vars=bind_vars,
)

permission = next(cursor, None)
Expand All @@ -6702,10 +6706,7 @@ async def get_user_kb_permission(
"""
debug_cursor = db.aql.execute(
debug_query,
bind_vars={
"kb_id": kb_id,
"@permissions_collection": CollectionNames.PERMISSIONS_TO_KB.value,
},
bind_vars=bind_vars,
)
existing_perms = list(debug_cursor)
self.logger.info(f"🔍 Debug - All permissions for KB {kb_id}: {existing_perms}")
Expand Down Expand Up @@ -10531,12 +10532,19 @@ async def upload_records(
async def get_user_by_user_id(self, user_id: str) -> Optional[Dict]:
"""Get user by user ID"""
try:
query = f"""
FOR user IN {CollectionNames.USERS.value}
# Convert query to use a bind var for collection name (avoid str formatting for every call)
query = """
FOR user IN @@users_collection
FILTER user.userId == @user_id
RETURN user
"""
cursor = self.db.aql.execute(query, bind_vars={"user_id": user_id})
cursor = self.db.aql.execute(
query,
bind_vars={
"user_id": user_id,
"@users_collection": CollectionNames.USERS.value
}
)
result = next(cursor, None)
return result
except Exception as e:
Expand Down