Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 67 additions & 58 deletions backend/python/app/connectors/services/base_arango_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""ArangoDB service for interacting with the database"""

# pylint: disable=E1101, W0718
import asyncio
import datetime
Expand All @@ -9,50 +7,41 @@
from typing import Any, Dict, List, Optional, Set, Tuple

import aiohttp # type: ignore
from arango import ArangoClient # type: ignore
from arango.database import TransactionDatabase # type: ignore
from fastapi import Request # type: ignore

from app.config.configuration_service import ConfigurationService
from app.config.constants.arangodb import (
CollectionNames,
Connectors,
DepartmentNames,
GraphNames,
LegacyGraphNames,
OriginTypes,
RecordTypes,
)
from app.config.constants.arangodb import (CollectionNames, Connectors,
DepartmentNames, GraphNames,
LegacyGraphNames, OriginTypes,
RecordTypes)
from app.config.constants.http_status_code import HttpStatusCode
from app.config.constants.service import DefaultEndpoints, config_node_constants
from app.config.constants.service import (DefaultEndpoints,
config_node_constants)
from app.connectors.services.kafka_service import KafkaService
from app.models.entities import AppUserGroup, FileRecord, Record, RecordGroup, User
from app.schema.arango.documents import (
agent_schema,
agent_template_schema,
app_schema,
department_schema,
file_record_schema,
mail_record_schema,
orgs_schema,
record_group_schema,
record_schema,
team_schema,
ticket_record_schema,
user_schema,
webpage_record_schema,
)
from app.schema.arango.edges import (
basic_edge_schema,
belongs_to_schema,
is_of_type_schema,
permissions_schema,
record_relations_schema,
user_app_relation_schema,
user_drive_relation_schema,
)
from app.models.entities import (AppUserGroup, FileRecord, Record, RecordGroup,
User)
from app.schema.arango.documents import (agent_schema, agent_template_schema,
app_schema, department_schema,
file_record_schema,
mail_record_schema, orgs_schema,
record_group_schema, record_schema,
team_schema, ticket_record_schema,
user_schema, webpage_record_schema)
from app.schema.arango.edges import (basic_edge_schema, belongs_to_schema,
is_of_type_schema, permissions_schema,
record_relations_schema,
user_app_relation_schema,
user_drive_relation_schema)
from app.schema.arango.graph import EDGE_DEFINITIONS
from app.utils.time_conversion import get_epoch_timestamp_in_ms
from arango import ArangoClient # type: ignore
from arango.database import TransactionDatabase # type: ignore
from codeflash.code_utils.codeflash_wrap_decorator import \
codeflash_performance_async
from fastapi import Request # type: ignore

"""ArangoDB service for interacting with the database"""




# Collection definitions with their schemas
NODE_COLLECTIONS = [
Expand Down Expand Up @@ -1782,6 +1771,7 @@ async def remove_user_access_to_record(self, connector_name: Connectors, externa
self.logger.error(f"❌ Failed to remove user access {external_id} from {connector_name}: {str(e)}")
raise

@codeflash_performance_async
async def _remove_user_access_from_record(self, record_id: str, user_id: str) -> Dict:
"""Remove a specific user's access to a record"""
try:
Expand All @@ -1796,12 +1786,16 @@ async def _remove_user_access_from_record(self, record_id: str, user_id: str) ->
RETURN OLD
"""

cursor = self.db.aql.execute(user_removal_query, bind_vars={
"record_from": f"records/{record_id}",
"user_to": f"users/{user_id}"
})
# Use run_in_executor to avoid blocking event loop on sync DB I/O
def _execute_query():
cursor = self.db.aql.execute(user_removal_query, bind_vars={
"record_from": f"records/{record_id}",
"user_to": f"users/{user_id}"
})
return list(cursor)

removed_permissions = await asyncio.to_thread(_execute_query)

removed_permissions = list(cursor)

if removed_permissions:
self.logger.info(f"✅ Removed {len(removed_permissions)} permission(s) for user {user_id} on record {record_id}")
Expand Down Expand Up @@ -3576,6 +3570,7 @@ async def get_record_by_conversation_index(
)
return None

@codeflash_performance_async
async def get_record_owner_source_user_email(
self,
record_id: str,
Expand Down Expand Up @@ -3604,7 +3599,11 @@ async def get_record_owner_source_user_email(
"""

db = transaction if transaction else self.db
cursor = db.aql.execute(query, bind_vars={"record_id": record_id})

# Offload the blocking db.aql.execute to a thread and make it async
cursor = await asyncio.to_thread(
db.aql.execute, query, bind_vars={"record_id": record_id}
)
result = next(cursor, None)
return result

Expand All @@ -3628,33 +3627,43 @@ async def get_record_by_path(
Optional[Record]: The Record object if found, otherwise None.
"""
try:
self.logger.info(
# Move logging outside the tight path as much as possible
logger_info = self.logger.info
logger_warning = self.logger.warning
logger_error = self.logger.error

logger_info(
"🚀 Retrieving record by path for connector %s and path %s", connector_name.value, path
)

query = f"""
FOR fileRecord IN {CollectionNames.FILES.value}
FILTER fileRecord.path == @path
RETURN fileRecord
"""
query = (
f"FOR fileRecord IN {CollectionNames.FILES.value} "
"FILTER fileRecord.path == @path "
"RETURN fileRecord"
)


db = transaction if transaction else self.db
cursor = db.aql.execute(
query, bind_vars={"path": path}
query, bind_vars={"path": path}, full_count=False
)
result = next(cursor, None)

if result:
try:
result = next(cursor)
except StopIteration:
result = None

self.logger.info(

if result:
logger_info(
"✅ Successfully retrieved file record for path: %s", path
)
# record = await self.get_record_by_id(result["_key"])

# return record.id
return result
else:
self.logger.warning(
logger_warning(
"⚠️ No record found for path: %s", path
)
return None
Expand Down