diff --git a/crates/ov_cli/src/client.rs b/crates/ov_cli/src/client.rs index bbf2354d6..57eeedab1 100644 --- a/crates/ov_cli/src/client.rs +++ b/crates/ov_cli/src/client.rs @@ -497,12 +497,18 @@ impl HttpClient { uri: String, node_limit: i32, threshold: Option, + since: Option<String>, + until: Option<String>, + time_field: Option<String>, ) -> Result { let body = serde_json::json!({ "query": query, "target_uri": uri, "limit": node_limit, "score_threshold": threshold, + "since": since, + "until": until, + "time_field": time_field, }); self.post("/api/v1/search/find", &body).await } @@ -514,6 +520,9 @@ impl HttpClient { session_id: Option, node_limit: i32, threshold: Option, + since: Option<String>, + until: Option<String>, + time_field: Option<String>, ) -> Result { let body = serde_json::json!({ "query": query, @@ -521,6 +530,9 @@ impl HttpClient { "session_id": session_id, "limit": node_limit, "score_threshold": threshold, + "since": since, + "until": until, + "time_field": time_field, }); self.post("/api/v1/search/search", &body).await } diff --git a/crates/ov_cli/src/commands/search.rs b/crates/ov_cli/src/commands/search.rs index f66386a86..0c5c286aa 100644 --- a/crates/ov_cli/src/commands/search.rs +++ b/crates/ov_cli/src/commands/search.rs @@ -8,11 +8,22 @@ pub async fn find( uri: &str, node_limit: i32, threshold: Option, + since: Option<&str>, + until: Option<&str>, + time_field: Option<&str>, output_format: OutputFormat, compact: bool, ) -> Result<()> { let result = client - .find(query.to_string(), uri.to_string(), node_limit, threshold) + .find( + query.to_string(), + uri.to_string(), + node_limit, + threshold, + since.map(|s| s.to_string()), + until.map(|s| s.to_string()), + time_field.map(|s| s.to_string()), + ) .await?; output_success(&result, output_format, compact); Ok(()) } @@ -25,6 +36,9 @@ pub async fn search( session_id: Option, node_limit: i32, threshold: Option, + since: Option<&str>, + until: Option<&str>, + time_field: Option<&str>, output_format: OutputFormat, compact: bool, ) -> Result<()> 
{ @@ -35,12 +49,14 @@ pub async fn search( session_id, node_limit, threshold, + since.map(|s| s.to_string()), + until.map(|s| s.to_string()), + time_field.map(|s| s.to_string()), ) .await?; output_success(&result, output_format, compact); Ok(()) } - pub async fn grep( client: &HttpClient, uri: &str, diff --git a/crates/ov_cli/src/main.rs b/crates/ov_cli/src/main.rs index c3bd50165..cbb15067f 100644 --- a/crates/ov_cli/src/main.rs +++ b/crates/ov_cli/src/main.rs @@ -380,6 +380,12 @@ enum Commands { /// Score threshold #[arg(short, long)] threshold: Option, + /// Only include results on or after this time (e.g. 48h, 7d, 2026-03-10, ISO-8601) + #[arg(long = "after")] + after: Option<String>, + /// Only include results on or before this time (e.g. 24h, 2026-03-15, ISO-8601) + #[arg(long = "before")] + before: Option<String>, }, /// Run context-aware retrieval Search { @@ -402,6 +408,12 @@ /// Score threshold #[arg(short, long)] threshold: Option, + /// Only include results on or after this time (e.g. 48h, 7d, 2026-03-10, ISO-8601) + #[arg(long = "after")] + after: Option<String>, + /// Only include results on or before this time (e.g. 
24h, 2026-03-15, ISO-8601) + #[arg(long = "before")] + before: Option<String>, }, /// Run content pattern search Grep { @@ -799,14 +811,23 @@ async fn main() { uri, node_limit, threshold, - } => handle_find(query, uri, node_limit, threshold, ctx).await, + after, + before, + } => handle_find(query, uri, node_limit, threshold, after, before, ctx).await, Commands::Search { query, uri, session_id, node_limit, threshold, - } => handle_search(query, uri, session_id, node_limit, threshold, ctx).await, + after, + before, + } => { + handle_search( + query, uri, session_id, node_limit, threshold, after, before, ctx, + ) + .await + } Commands::Grep { uri, exclude_uri, @@ -1297,12 +1318,15 @@ async fn handle_find( uri: String, node_limit: i32, threshold: Option, + after: Option<String>, + before: Option<String>, ctx: CliContext, ) -> Result<()> { let mut params = vec![format!("--uri={}", uri), format!("-n {}", node_limit)]; if let Some(t) = threshold { params.push(format!("--threshold {}", t)); } + append_time_filter_params(&mut params, after.as_deref(), before.as_deref()); params.push(format!("\"{}\"", query)); print_command_echo("ov find", &params.join(" "), ctx.config.echo_command); let client = ctx.get_client(); @@ -1312,6 +1336,9 @@ async fn handle_find( &uri, node_limit, threshold, + after.as_deref(), + before.as_deref(), + None, ctx.output_format, ctx.compact, ) @@ -1324,6 +1351,8 @@ async fn handle_search( session_id: Option, node_limit: i32, threshold: Option, + after: Option<String>, + before: Option<String>, ctx: CliContext, ) -> Result<()> { let mut params = vec![format!("--uri={}", uri), format!("-n {}", node_limit)]; @@ -1333,6 +1362,7 @@ if let Some(t) = threshold { params.push(format!("--threshold {}", t)); } + append_time_filter_params(&mut params, after.as_deref(), before.as_deref()); params.push(format!("\"{}\"", query)); print_command_echo("ov search", &params.join(" "), ctx.config.echo_command); let client = ctx.get_client(); @@ -1343,12 +1373,28 @@ 
session_id, node_limit, threshold, + after.as_deref(), + before.as_deref(), + None, ctx.output_format, ctx.compact, ) .await } +fn append_time_filter_params( + params: &mut Vec<String>, + after: Option<&str>, + before: Option<&str>, +) { + if let Some(value) = after { + params.push(format!("--after {}", value)); + } + if let Some(value) = before { + params.push(format!("--before {}", value)); + } +} + /// Print command with specified parameters for debugging fn print_command_echo(command: &str, params: &str, echo_enabled: bool) { if echo_enabled { @@ -1612,4 +1658,15 @@ mod tests { assert!(result.is_err(), "removed write flags should not parse"); } + + #[test] + fn append_time_filter_params_only_emits_after_and_before() { + let mut params = Vec::new(); + let after = Some("7d".to_string()); + let before = Some("2026-03-12".to_string()); + + super::append_time_filter_params(&mut params, after.as_deref(), before.as_deref()); + + assert_eq!(params, vec!["--after 7d", "--before 2026-03-12"]); + } } diff --git a/docs/en/api/06-retrieval.md b/docs/en/api/06-retrieval.md index 18c8ec479..c3559a9c9 100644 --- a/docs/en/api/06-retrieval.md +++ b/docs/en/api/06-retrieval.md @@ -27,6 +27,9 @@ Basic vector similarity search. | limit | int | No | 10 | Maximum number of results | | score_threshold | float | No | None | Minimum relevance score threshold | | filter | Dict | No | None | Metadata filters | +| since | str | No | None | Lower time bound, accepts `2h` or ISO 8601 / `YYYY-MM-DD`, timezone-less values use local time. CLI `--after` maps to this field | +| until | str | No | None | Upper time bound, accepts `30m` or ISO 8601 / `YYYY-MM-DD`, timezone-less values use local time. 
CLI `--before` maps to this field | +| time_field | `"updated_at"` or `"created_at"` | No | `"updated_at"` | Metadata time field used by `since` / `until` | **FindResult Structure** @@ -59,6 +62,13 @@ class MatchedContext: ```python results = client.find("how to authenticate users") +recent_emails = client.find( + "invoice", + target_uri="viking://resources/email/", + since="7d", + time_field="created_at", +) + for ctx in results.resources: print(f"URI: {ctx.uri}") print(f"Score: {ctx.score:.3f}") @@ -87,8 +97,11 @@ curl -X POST http://localhost:1933/api/v1/search/find \ ```bash openviking find "how to authenticate users" [--uri viking://resources/] [--limit 10] +openviking find "invoice" --after 7d ``` +`--after` maps to API `since`, and `--before` maps to API `until`. + **Response** ```json @@ -184,6 +197,9 @@ Search with session context and intent analysis. | limit | int | No | 10 | Maximum number of results | | score_threshold | float | No | None | Minimum relevance score threshold | | filter | Dict | No | None | Metadata filters | +| since | str | No | None | Lower time bound, accepts `2h` or ISO 8601 / `YYYY-MM-DD`, timezone-less values use local time. CLI `--after` maps to this field | +| until | str | No | None | Upper time bound, accepts `30m` or ISO 8601 / `YYYY-MM-DD`, timezone-less values use local time. 
CLI `--before` maps to this field | +| time_field | `"updated_at"` or `"created_at"` | No | `"updated_at"` | Metadata time field used by `since` / `until` | **Python SDK (Embedded / HTTP)** @@ -202,7 +218,8 @@ session.add_message("assistant", [ # Search understands the conversation context results = client.search( "best practices", - session=session + session=session, + since="2h" ) for ctx in results.resources: @@ -223,6 +240,8 @@ curl -X POST http://localhost:1933/api/v1/search/search \ -d '{ "query": "best practices", "session_id": "abc123", + "since": "2h", + "time_field": "updated_at", "limit": 10 }' ``` @@ -231,8 +250,11 @@ curl -X POST http://localhost:1933/api/v1/search/search \ ```bash openviking search "best practices" [--session-id abc123] [--limit 10] +openviking search "watch vs scheduled" --after 2026-03-15 --before 2026-03-15 ``` +`--after` maps to API `since`, and `--before` maps to API `until`. + **Response** ```json diff --git a/docs/zh/api/06-retrieval.md b/docs/zh/api/06-retrieval.md index 6aee6f1f3..a1f932744 100644 --- a/docs/zh/api/06-retrieval.md +++ b/docs/zh/api/06-retrieval.md @@ -27,6 +27,9 @@ OpenViking 提供两种搜索方法:`find` 用于简单的语义搜索,`sear | limit | int | 否 | 10 | 最大返回结果数 | | score_threshold | float | 否 | None | 最低相关性分数阈值 | | filter | Dict | 否 | None | 元数据过滤器 | +| since | str | 否 | None | 时间下界,支持 `2h` 或 ISO 8601 / `YYYY-MM-DD`,不带时区的值按本地时间解释。CLI `--after` 会映射到这个字段 | +| until | str | 否 | None | 时间上界,支持 `30m` 或 ISO 8601 / `YYYY-MM-DD`,不带时区的值按本地时间解释。CLI `--before` 会映射到这个字段 | +| time_field | `"updated_at"` 或 `"created_at"` | 否 | `"updated_at"` | `since` / `until` 使用的元数据时间字段 | **FindResult 结构** @@ -59,6 +62,13 @@ class MatchedContext: ```python results = client.find("how to authenticate users") +recent_emails = client.find( + "invoice", + target_uri="viking://resources/email/", + since="7d", + time_field="created_at", +) + for ctx in results.resources: print(f"URI: {ctx.uri}") print(f"Score: {ctx.score:.3f}") @@ -87,8 +97,11 @@ curl -X POST 
http://localhost:1933/api/v1/search/find \ ```bash openviking find "how to authenticate users" [--uri viking://resources/] [--limit 10] +openviking find "invoice" --after 7d ``` +`--after` 会映射为 API `since`,`--before` 会映射为 API `until`。 + **响应** ```json @@ -184,6 +197,9 @@ curl -X POST http://localhost:1933/api/v1/search/find \ | limit | int | 否 | 10 | 最大返回结果数 | | score_threshold | float | 否 | None | 最低相关性分数阈值 | | filter | Dict | 否 | None | 元数据过滤器 | +| since | str | 否 | None | 时间下界,支持 `2h` 或 ISO 8601 / `YYYY-MM-DD`,不带时区的值按本地时间解释。CLI `--after` 会映射到这个字段 | +| until | str | 否 | None | 时间上界,支持 `30m` 或 ISO 8601 / `YYYY-MM-DD`,不带时区的值按本地时间解释。CLI `--before` 会映射到这个字段 | +| time_field | `"updated_at"` 或 `"created_at"` | 否 | `"updated_at"` | `since` / `until` 使用的元数据时间字段 | **Python SDK (Embedded / HTTP)** @@ -202,7 +218,8 @@ session.add_message("assistant", [ # 搜索能够理解对话上下文 results = client.search( "best practices", - session=session + session=session, + since="2h" ) for ctx in results.resources: @@ -223,6 +240,8 @@ curl -X POST http://localhost:1933/api/v1/search/search \ -d '{ "query": "best practices", "session_id": "abc123", + "since": "2h", + "time_field": "updated_at", "limit": 10 }' ``` @@ -231,8 +250,11 @@ curl -X POST http://localhost:1933/api/v1/search/search \ ```bash openviking search "best practices" [--session-id abc123] [--limit 10] +openviking search "watch vs scheduled" --after 2026-03-15 --before 2026-03-15 ``` +`--after` 会映射为 API `since`,`--before` 会映射为 API `until`。 + **响应** ```json diff --git a/openviking/async_client.py b/openviking/async_client.py index 7c564cb93..ab56d520d 100644 --- a/openviking/async_client.py +++ b/openviking/async_client.py @@ -314,6 +314,9 @@ async def search( score_threshold: Optional[float] = None, filter: Optional[Dict] = None, telemetry: TelemetryRequest = False, + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[str] = None, ): """ Complex search with session context. 
@@ -339,6 +342,9 @@ async def search( score_threshold=score_threshold, filter=filter, telemetry=telemetry, + since=since, + until=until, + time_field=time_field, ) async def find( @@ -349,6 +355,9 @@ async def find( score_threshold: Optional[float] = None, filter: Optional[Dict] = None, telemetry: TelemetryRequest = False, + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[str] = None, ): """Semantic search""" await self._ensure_initialized() @@ -359,6 +368,9 @@ async def find( score_threshold=score_threshold, filter=filter, telemetry=telemetry, + since=since, + until=until, + time_field=time_field, ) # ============= FS methods ============= diff --git a/openviking/client/local.py b/openviking/client/local.py index ab7ab6f51..961d43c6a 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -14,6 +14,7 @@ attach_telemetry_payload, run_with_telemetry, ) +from openviking.utils.search_filters import merge_time_filter from openviking_cli.client.base import BaseClient from openviking_cli.session.user_id import UserIdentifier from openviking_cli.utils import run_async @@ -31,6 +32,21 @@ def _to_jsonable(value: Any) -> Any: return value +def _resolve_search_filter( + filter: Optional[Dict[str, Any]], + since: Optional[str], + until: Optional[str], + time_field: Optional[str], +) -> Optional[Dict[str, Any]]: + """Merge optional retrieval time bounds into the metadata filter.""" + return merge_time_filter( + filter, + since=since, + until=until, + time_field=time_field, + ) + + class LocalClient(BaseClient): """Local Client for OpenViking (embedded mode). 
@@ -264,8 +280,12 @@ async def find( score_threshold: Optional[float] = None, filter: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[str] = None, ) -> Any: """Semantic search without session context.""" + resolved_filter = _resolve_search_filter(filter, since, until, time_field) execution = await run_with_telemetry( operation="search.find", telemetry=telemetry, @@ -275,7 +295,7 @@ async def find( target_uri=target_uri, limit=limit, score_threshold=score_threshold, - filter=filter, + filter=resolved_filter, ), ) return attach_telemetry_payload( @@ -292,8 +312,12 @@ async def search( score_threshold: Optional[float] = None, filter: Optional[Dict[str, Any]] = None, telemetry: TelemetryRequest = False, + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[str] = None, ) -> Any: """Semantic search with optional session context.""" + resolved_filter = _resolve_search_filter(filter, since, until, time_field) async def _search(): session = None @@ -307,7 +331,7 @@ async def _search(): session=session, limit=limit, score_threshold=score_threshold, - filter=filter, + filter=resolved_filter, ) execution = await run_with_telemetry( @@ -443,7 +467,6 @@ async def add_message( If both content and parts are provided, parts takes precedence. 
""" - from openviking.message.part import Part, TextPart, part_from_dict session = self._service.sessions.session(self._ctx, session_id) diff --git a/openviking/server/routers/search.py b/openviking/server/routers/search.py index 001351fca..48389f306 100644 --- a/openviking/server/routers/search.py +++ b/openviking/server/routers/search.py @@ -3,9 +3,9 @@ """Search endpoints for OpenViking HTTP Server.""" import math -from typing import Any, Dict, Optional +from typing import Any, Dict, Literal, Optional -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from openviking.pyagfs.exceptions import AGFSClientError, AGFSNotFoundError @@ -15,6 +15,7 @@ from openviking.server.models import Response from openviking.server.telemetry import run_operation from openviking.telemetry import TelemetryRequest +from openviking.utils.search_filters import merge_time_filter from openviking_cli.exceptions import NotFoundError @@ -32,6 +33,28 @@ def _sanitize_floats(obj: Any) -> Any: router = APIRouter(prefix="/api/v1/search", tags=["search"]) +TimeField = Literal["updated_at", "created_at"] + + +def _resolve_search_limit(limit: int, node_limit: Optional[int]) -> int: + return node_limit if node_limit is not None else limit + + +def _resolve_search_filter( + request_filter: Optional[Dict[str, Any]], + since: Optional[str], + until: Optional[str], + time_field: Optional[TimeField], +) -> Optional[Dict[str, Any]]: + try: + return merge_time_filter( + request_filter, + since=since, + until=until, + time_field=time_field, + ) + except ValueError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc class FindRequest(BaseModel): @@ -44,6 +67,10 @@ class FindRequest(BaseModel): score_threshold: Optional[float] = None filter: Optional[Dict[str, Any]] = None include_provenance: bool = False + + since: Optional[str] = None + until: Optional[str] = None + time_field: Optional[TimeField] = None 
telemetry: TelemetryRequest = False @@ -58,6 +85,10 @@ class SearchRequest(BaseModel): score_threshold: Optional[float] = None filter: Optional[Dict[str, Any]] = None include_provenance: bool = False + + since: Optional[str] = None + until: Optional[str] = None + time_field: Optional[TimeField] = None telemetry: TelemetryRequest = False @@ -87,7 +118,13 @@ async def find( ): """Semantic search without session context.""" service = get_service() - actual_limit = request.node_limit if request.node_limit is not None else request.limit + actual_limit = _resolve_search_limit(request.limit, request.node_limit) + effective_filter = _resolve_search_filter( + request.filter, + request.since, + request.until, + request.time_field, + ) execution = await run_operation( operation="search.find", telemetry=request.telemetry, @@ -97,7 +134,7 @@ async def find( target_uri=request.target_uri, limit=actual_limit, score_threshold=request.score_threshold, - filter=request.filter, + filter=effective_filter, ), ) result = execution.result @@ -118,13 +155,19 @@ async def search( ): """Semantic search with optional session context.""" service = get_service() + actual_limit = _resolve_search_limit(request.limit, request.node_limit) + effective_filter = _resolve_search_filter( + request.filter, + request.since, + request.until, + request.time_field, + ) async def _search(): session = None if request.session_id: session = service.sessions.session(_ctx, request.session_id) await session.load() - actual_limit = request.node_limit if request.node_limit is not None else request.limit return await service.search.search( query=request.query, ctx=_ctx, @@ -132,7 +175,7 @@ async def _search(): session=session, limit=actual_limit, score_threshold=request.score_threshold, - filter=request.filter, + filter=effective_filter, ) execution = await run_operation( diff --git a/openviking/storage/content_write.py b/openviking/storage/content_write.py index 9d5343620..519dfc018 100644 --- 
a/openviking/storage/content_write.py +++ b/openviking/storage/content_write.py @@ -329,6 +329,7 @@ async def _vectorize_single_file( parent_uri=parent.uri, context_type=context_type, ctx=ctx, + preserve_existing_created_at=True, ) async def _summary_dict_for_vectorize( diff --git a/openviking/sync_client.py b/openviking/sync_client.py index 5d65d628c..48efaadb6 100644 --- a/openviking/sync_client.py +++ b/openviking/sync_client.py @@ -165,11 +165,24 @@ def search( score_threshold: Optional[float] = None, filter: Optional[Dict] = None, telemetry: TelemetryRequest = False, + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[str] = None, ): """Execute complex retrieval (intent analysis, hierarchical retrieval).""" return run_async( self._async_client.search( - query, target_uri, session, session_id, limit, score_threshold, filter, telemetry + query=query, + target_uri=target_uri, + session=session, + session_id=session_id, + limit=limit, + score_threshold=score_threshold, + filter=filter, + telemetry=telemetry, + since=since, + until=until, + time_field=time_field, ) ) @@ -181,6 +194,9 @@ def find( score_threshold: Optional[float] = None, filter: Optional[Dict] = None, telemetry: TelemetryRequest = False, + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[str] = None, ): """Quick retrieval""" return run_async( @@ -191,6 +207,9 @@ def find( score_threshold, filter, telemetry, + since, + until, + time_field, ) ) diff --git a/openviking/utils/embedding_utils.py b/openviking/utils/embedding_utils.py index 4d3b25b39..ef9151e0b 100644 --- a/openviking/utils/embedding_utils.py +++ b/openviking/utils/embedding_utils.py @@ -7,7 +7,7 @@ """ import os -from datetime import datetime +from datetime import datetime, timezone from typing import Dict, Optional from openviking.core.context import Context, ContextLevel, ResourceContentType, Vectorize @@ -16,6 +16,7 @@ from openviking.storage.queuefs import 
get_queue_manager from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter from openviking.storage.viking_fs import get_viking_fs +from openviking.utils.time_utils import parse_iso_datetime from openviking_cli.utils import VikingURI, get_logger from openviking_cli.utils.config import get_openviking_config @@ -47,6 +48,61 @@ def _owner_space_for_uri(uri: str, ctx: RequestContext) -> str: return "" +def _coerce_datetime(value: object) -> Optional[datetime]: + if isinstance(value, datetime): + return value + if isinstance(value, str) and value: + try: + return parse_iso_datetime(value) + except Exception: + return None + return None + + +async def _get_existing_created_at( + uri: str, + ctx: Optional[RequestContext], +) -> Optional[datetime]: + if ctx is None: + return None + try: + from openviking.server.dependencies import get_service + + service = get_service() + if not service or not service.vikingdb_manager: + return None + record = await service.vikingdb_manager.fetch_by_uri(uri, ctx=ctx) + if not record: + return None + return _coerce_datetime(record.get("created_at")) + except Exception: + return None + + +async def _resolve_context_timestamps( + uri: str, + ctx: Optional[RequestContext], + *, + preserve_existing_created_at: bool = False, +) -> tuple[datetime, datetime]: + updated_at = datetime.now(timezone.utc) + try: + stat_result = await get_viking_fs().stat(uri, ctx=ctx) + stat_mod_time = _coerce_datetime((stat_result or {}).get("modTime")) + if stat_mod_time is not None: + updated_at = stat_mod_time + except Exception: + pass + + created_at = updated_at + if preserve_existing_created_at: + existing_created_at = await _get_existing_created_at(uri, ctx) + if existing_created_at is not None: + created_at = existing_created_at + + return created_at, updated_at + + def get_resource_content_type(file_name: str) -> Optional[ResourceContentType]: """Determine resource content type based on file extension. 
@@ -155,6 +211,8 @@ async def vectorize_directory_meta( parent_uri = VikingURI(uri).parent.uri owner_space = _owner_space_for_uri(uri, ctx) + created_at, updated_at = await _resolve_context_timestamps(uri, ctx) + # Vectorize L0: .abstract.md (abstract) context_abstract = Context( uri=uri, @@ -163,6 +221,8 @@ async def vectorize_directory_meta( abstract=abstract, context_type=context_type, level=ContextLevel.ABSTRACT, + created_at=created_at, + updated_at=updated_at, user=ctx.user, account_id=ctx.account_id, owner_space=owner_space, @@ -190,6 +250,8 @@ async def vectorize_directory_meta( abstract=abstract, context_type=context_type, level=ContextLevel.OVERVIEW, + created_at=created_at, + updated_at=updated_at, user=ctx.user, account_id=ctx.account_id, owner_space=owner_space, @@ -219,6 +281,7 @@ async def vectorize_file( ctx: Optional[RequestContext] = None, semantic_msg_id: Optional[str] = None, use_summary: bool = False, + preserve_existing_created_at: bool = False, ) -> None: """ Vectorize a single file. @@ -241,13 +304,20 @@ async def vectorize_file( file_name = summary_dict.get("name") or os.path.basename(file_path) summary = summary_dict.get("summary", "") + created_at, updated_at = await _resolve_context_timestamps( + file_path, + ctx, + preserve_existing_created_at=preserve_existing_created_at, + ) + context = Context( uri=file_path, parent_uri=parent_uri, is_leaf=True, abstract=summary, context_type=context_type, - created_at=datetime.now(), + created_at=created_at, + updated_at=updated_at, user=ctx.user, account_id=ctx.account_id, owner_space=_owner_space_for_uri(file_path, ctx), diff --git a/openviking/utils/search_filters.py b/openviking/utils/search_filters.py new file mode 100644 index 000000000..fc6bba822 --- /dev/null +++ b/openviking/utils/search_filters.py @@ -0,0 +1,157 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: AGPL-3.0 + +from __future__ import annotations + +import re +from datetime import datetime, time, timedelta, timezone +from typing import Any, Dict, Literal, Optional + +from openviking.utils.time_utils import format_iso8601, parse_iso_datetime + +_DATE_ONLY_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") +_RELATIVE_RE = re.compile(r"^(?P<value>\d+)(?P<unit>[smhdw])$") +TimeField = Literal["updated_at", "created_at"] +VALID_TIME_FIELDS = {"updated_at", "created_at"} + + +def merge_time_filter( + existing_filter: Optional[Dict[str, Any]], + since: Optional[str] = None, + until: Optional[str] = None, + time_field: Optional[TimeField] = None, + now: Optional[datetime] = None, +) -> Optional[Dict[str, Any]]: + """Merge relative or absolute time bounds into an existing metadata filter tree.""" + since_dt, until_dt = resolve_time_bounds(since=since, until=until, now=now) + if since_dt is None and until_dt is None: + return existing_filter + + time_filter: Dict[str, Any] = { + "op": "time_range", + "field": normalize_time_field(time_field), + } + + if since_dt is not None: + time_filter["gte"] = _serialize_time_value(since_dt) + if until_dt is not None: + time_filter["lte"] = _serialize_time_value(until_dt) + + if not existing_filter: + return time_filter + # Preserve any caller-supplied metadata predicates by AND-ing the time range + # into the existing filter tree instead of replacing it. 
+ return {"op": "and", "conds": [existing_filter, time_filter]} + + +def normalize_time_field(time_field: Optional[str]) -> str: + normalized = (time_field or "updated_at").strip() or "updated_at" + if normalized not in VALID_TIME_FIELDS: + raise ValueError("time_field must be one of: updated_at, created_at") + return normalized + + +def resolve_time_bounds( + since: Optional[str] = None, + until: Optional[str] = None, + now: Optional[datetime] = None, + *, + lower_label: str = "since", + upper_label: str = "until", +) -> tuple[Optional[datetime], Optional[datetime]]: + """Resolve relative or absolute time bounds into parsed datetimes.""" + normalized_since = (since or "").strip() + normalized_until = (until or "").strip() + if not normalized_since and not normalized_until: + return (None, None) + + current_time = now or datetime.now(timezone.utc) + since_dt = None + until_dt = None + if normalized_since: + since_dt = _parse_time_value(normalized_since, current_time, is_upper_bound=False) + if normalized_until: + until_dt = _parse_time_value(normalized_until, current_time, is_upper_bound=True) + + if ( + since_dt + and until_dt + and normalize_datetime_for_comparison(since_dt) + > normalize_datetime_for_comparison(until_dt) + ): + raise ValueError(f"{lower_label} must be earlier than or equal to {upper_label}") + + return (since_dt, until_dt) + + +def normalize_datetime_for_comparison(value: datetime) -> datetime: + """Normalize aware/naive datetimes so they can be compared safely.""" + return _comparison_datetime(value) + + +def matches_time_bounds( + value: Optional[datetime], + since: Optional[datetime] = None, + until: Optional[datetime] = None, +) -> bool: + """Return True when a datetime falls within resolved bounds.""" + if value is None: + return False + + comparable_value = normalize_datetime_for_comparison(value) + if since is not None and comparable_value < normalize_datetime_for_comparison(since): + return False + if until is not None and 
comparable_value > normalize_datetime_for_comparison(until): + return False + return True + + +def _parse_time_value(value: str, now: datetime, *, is_upper_bound: bool) -> datetime: + relative_match = _RELATIVE_RE.fullmatch(value) + if relative_match: + amount = int(relative_match.group("value")) + unit = relative_match.group("unit") + delta = _duration_from_unit(amount, unit) + return now - delta + + if _DATE_ONLY_RE.fullmatch(value): + parsed_date = datetime.strptime(value, "%Y-%m-%d").date() + if is_upper_bound: + combined = datetime.combine(parsed_date, time.max) + else: + combined = datetime.combine(parsed_date, time.min) + if now.tzinfo is not None: + return combined.replace(tzinfo=now.tzinfo) + return combined + + return parse_iso_datetime(value) + + +def _serialize_time_value(value: datetime) -> str: + if value.tzinfo is None: + return value.isoformat(timespec="milliseconds") + return format_iso8601(value) + + +def _comparison_datetime(value: datetime) -> datetime: + if value.tzinfo is not None: + return value + + local_tz = datetime.now().astimezone().tzinfo + if local_tz is None: + raise ValueError("Could not determine local timezone for time filter comparison") + return value.replace(tzinfo=local_tz) + + +def _duration_from_unit(amount: int, unit: str) -> timedelta: + if unit == "s": + return timedelta(seconds=amount) + if unit == "m": + return timedelta(minutes=amount) + if unit == "h": + return timedelta(hours=amount) + if unit == "d": + return timedelta(days=amount) + if unit == "w": + return timedelta(weeks=amount) + raise ValueError(f"Unsupported relative time unit: {unit}") diff --git a/tests/server/test_api_search.py b/tests/server/test_api_search.py index 07a2922ab..f3f376468 100644 --- a/tests/server/test_api_search.py +++ b/tests/server/test_api_search.py @@ -3,10 +3,13 @@ """Tests for search endpoints: find, search, grep, glob.""" +from datetime import datetime, timezone + import httpx import pytest from openviking.models.embedder.base import 
EmbedResult +from openviking.utils.time_utils import parse_iso_datetime @pytest.fixture(autouse=True) @@ -65,6 +68,96 @@ async def test_find_no_results(client: httpx.AsyncClient): assert resp.json()["status"] == "ok" +async def test_find_with_since_compiles_time_range(client: httpx.AsyncClient, service, monkeypatch): + captured = {} + + async def fake_find(*, filter=None, **kwargs): + captured["filter"] = filter + captured["kwargs"] = kwargs + return {"items": []} + + monkeypatch.setattr(service.search, "find", fake_find) + + resp = await client.post( + "/api/v1/search/find", + json={"query": "sample", "since": "2h"}, + ) + + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + assert captured["filter"]["op"] == "time_range" + assert captured["filter"]["field"] == "updated_at" + gte = parse_iso_datetime(captured["filter"]["gte"]) + delta = datetime.now(timezone.utc) - gte + assert 7_100 <= delta.total_seconds() <= 7_300 + + +async def test_find_combines_existing_filter_with_time_range( + client: httpx.AsyncClient, service, monkeypatch +): + captured = {} + + async def fake_find(*, filter=None, **kwargs): + captured["filter"] = filter + return {"items": []} + + monkeypatch.setattr(service.search, "find", fake_find) + + resp = await client.post( + "/api/v1/search/find", + json={ + "query": "sample", + "filter": {"op": "must", "field": "kind", "conds": ["email"]}, + "since": "2026-03-10", + "time_field": "created_at", + }, + ) + + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + assert captured["filter"] == { + "op": "and", + "conds": [ + {"op": "must", "field": "kind", "conds": ["email"]}, + { + "op": "time_range", + "field": "created_at", + "gte": "2026-03-10T00:00:00.000Z", + }, + ], + } + + +async def test_find_with_invalid_time_returns_422(client: httpx.AsyncClient): + resp = await client.post( + "/api/v1/search/find", + json={"query": "sample", "since": "not-a-time"}, + ) + + assert resp.status_code == 422 + assert 
resp.json()["detail"] + + +async def test_find_with_invalid_time_field_returns_422(client: httpx.AsyncClient): + resp = await client.post( + "/api/v1/search/find", + json={"query": "sample", "time_field": "published_at", "since": "2h"}, + ) + + assert resp.status_code == 422 + assert resp.json()["detail"] + + +async def test_find_with_inverted_mixed_time_range_returns_422(client: httpx.AsyncClient): + resp = await client.post( + "/api/v1/search/find", + json={"query": "sample", "since": "2099-01-01", "until": "2h"}, + ) + + assert resp.status_code == 422 + assert "earlier than or equal to" in resp.json()["detail"] + + async def test_search_basic(client_with_resource): client, uri = client_with_resource resp = await client.post( @@ -106,7 +199,7 @@ async def test_find_telemetry_metrics(client_with_resource): summary = body["telemetry"]["summary"] assert summary["operation"] == "search.find" assert "duration_ms" in summary - assert {"total", "llm", "embedding"}.issubset(summary["tokens"].keys()) + assert "tokens" not in summary assert "vector" in summary assert summary["vector"]["searches"] >= 0 assert "queue" not in summary @@ -127,7 +220,10 @@ async def test_search_telemetry_metrics(client_with_resource): body = resp.json() summary = body["telemetry"]["summary"] assert summary["operation"] == "search.search" - assert summary["vector"]["returned"] >= 0 + if body["result"]["total"] > 0: + assert summary["vector"]["returned"] == body["result"]["total"] + else: + assert "returned" not in summary["vector"] assert "queue" not in summary assert "semantic_nodes" not in summary assert "memory" not in summary @@ -169,6 +265,31 @@ async def test_find_rejects_events_telemetry_request(client_with_resource): assert "events" in body["error"]["message"] +async def test_search_with_until_compiles_time_range( + client: httpx.AsyncClient, service, monkeypatch +): + captured = {} + + async def fake_search(*, filter=None, **kwargs): + captured["filter"] = filter + return {"items": []} 
+ + monkeypatch.setattr(service.search, "search", fake_search) + + resp = await client.post( + "/api/v1/search/search", + json={"query": "sample", "until": "2026-03-11", "time_field": "created_at"}, + ) + + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + assert captured["filter"] == { + "op": "time_range", + "field": "created_at", + "lte": "2026-03-11T23:59:59.999Z", + } + + async def test_grep(client_with_resource): client, uri = client_with_resource parent_uri = "/".join(uri.split("/")[:-1]) + "/" @@ -195,8 +316,6 @@ async def test_grep_case_insensitive(client_with_resource): assert resp.json()["status"] == "ok" - - async def test_grep_exclude_uri_excludes_specific_uri_range( client: httpx.AsyncClient, upload_temp_dir, @@ -231,7 +350,7 @@ async def test_grep_exclude_uri_excludes_specific_uri_range( assert body["status"] == "ok" matches = body["result"]["matches"] assert matches - assert all(not m["uri"].startswith(exclude_uri.rstrip('/')) for m in matches) + assert all(not m["uri"].startswith(exclude_uri.rstrip("/")) for m in matches) async def test_grep_exclude_uri_does_not_exclude_same_named_sibling_dirs( @@ -275,6 +394,7 @@ async def test_grep_exclude_uri_does_not_exclude_same_named_sibling_dirs( assert any(uri.startswith("viking://resources/group_b/cache/") for uri in uris) assert all(not uri.startswith("viking://resources/group_a/cache/") for uri in uris) + async def test_glob(client_with_resource): client, _ = client_with_resource resp = await client.post( diff --git a/tests/server/test_sdk_time_filters.py b/tests/server/test_sdk_time_filters.py new file mode 100644 index 000000000..31db3fd33 --- /dev/null +++ b/tests/server/test_sdk_time_filters.py @@ -0,0 +1,133 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: AGPL-3.0 + +from datetime import datetime, timedelta, timezone + +from openviking.server.identity import RequestContext, Role +from openviking.utils.time_utils import format_iso8601 +from openviking_cli.session.user_id import UserIdentifier + + +async def _seed_time_filter_records( + svc, + query: str, + records: dict[str, dict[str, str]], +) -> dict[str, str]: + embedder = svc.vikingdb_manager.get_embedder() + vector = embedder.embed(query).dense_vector + ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.ROOT) + + for record in records.values(): + await svc.vikingdb_manager.upsert( + { + "uri": record["uri"], + "parent_uri": record["parent_uri"], + "is_leaf": True, + "abstract": record["abstract"], + "context_type": "resource", + "category": "", + "created_at": record["created_at"], + "updated_at": record["updated_at"], + "active_count": 0, + "vector": vector, + "meta": {}, + "related_uri": [], + "account_id": "default", + "owner_space": "", + "level": 2, + }, + ctx=ctx, + ) + + return {name: record["uri"] for name, record in records.items()} + + +async def _seed_find_time_filter_records(svc, query: str) -> dict[str, str]: + now = datetime.now(timezone.utc) + return await _seed_time_filter_records( + svc, + query, + { + "recent_email": { + "uri": "viking://resources/email/recent-invoice.md", + "parent_uri": "viking://resources/email", + "abstract": "Recent invoice follow-up thread", + "created_at": format_iso8601(now - timedelta(hours=1)), + "updated_at": format_iso8601(now - timedelta(hours=1)), + }, + "old_email": { + "uri": "viking://resources/email/old-invoice.md", + "parent_uri": "viking://resources/email", + "abstract": "Older invoice follow-up thread", + "created_at": format_iso8601(now - timedelta(days=10)), + "updated_at": format_iso8601(now - timedelta(days=10)), + }, + }, + ) + + +async def _seed_search_time_filter_records(svc, query: str) -> dict[str, str]: + now = datetime.now(timezone.utc) + return 
await _seed_time_filter_records( + svc, + query, + { + "recent_note": { + "uri": "viking://resources/watch-schedule/recent-search-time-filter.md", + "parent_uri": "viking://resources/watch-schedule", + "abstract": "Recent watch vs scheduled discussion", + "created_at": format_iso8601(now - timedelta(minutes=30)), + "updated_at": format_iso8601(now - timedelta(minutes=30)), + }, + "old_note": { + "uri": "viking://resources/watch-schedule/old-search-time-filter.md", + "parent_uri": "viking://resources/watch-schedule", + "abstract": "Old watch vs scheduled discussion", + "created_at": format_iso8601(now - timedelta(days=30)), + "updated_at": format_iso8601(now - timedelta(days=30)), + }, + }, + ) + + +async def test_sdk_find_respects_since_and_time_field(http_client): + client, svc = http_client + uris = await _seed_find_time_filter_records(svc, "invoice follow-up") + + result = await client.find( + query="invoice follow-up", + target_uri="viking://resources/email", + since="2d", + time_field="created_at", + limit=10, + ) + + found_uris = {item.uri for item in result.resources} + assert uris["recent_email"] in found_uris + assert uris["old_email"] not in found_uris + + +async def test_sdk_search_respects_since_default_updated_at(http_client): + client, svc = http_client + uris = await _seed_search_time_filter_records(svc, "watch vs scheduled") + + recent_result = await client.search( + query="watch vs scheduled", + target_uri="viking://resources/watch-schedule", + since="2h", + limit=10, + ) + old_result = await client.search( + query="watch vs scheduled", + target_uri="viking://resources/watch-schedule", + until="7d", + limit=10, + ) + + recent_uris = {item.uri for item in recent_result.resources} + old_uris = {item.uri for item in old_result.resources} + + assert uris["recent_note"] in recent_uris + assert uris["old_note"] not in recent_uris + assert uris["old_note"] in old_uris + assert uris["recent_note"] not in old_uris diff --git 
a/tests/unit/test_search_filters.py b/tests/unit/test_search_filters.py new file mode 100644 index 000000000..32fd239b0 --- /dev/null +++ b/tests/unit/test_search_filters.py @@ -0,0 +1,121 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +from datetime import datetime, timezone + +import pytest + +from openviking.utils.search_filters import merge_time_filter +from openviking.utils.time_utils import parse_iso_datetime + + +def test_merge_time_filter_builds_relative_range(): + now = datetime(2026, 3, 11, 18, 0, tzinfo=timezone.utc) + + result = merge_time_filter(None, since="2h", now=now) + + assert result == { + "op": "time_range", + "field": "updated_at", + "gte": "2026-03-11T16:00:00.000Z", + } + + +def test_merge_time_filter_merges_with_existing_filter(): + now = datetime(2026, 3, 11, 18, 0, tzinfo=timezone.utc) + existing_filter = {"op": "must", "field": "kind", "conds": ["email"]} + + result = merge_time_filter( + existing_filter, + since="2026-03-10", + until="2026-03-11", + time_field="created_at", + now=now, + ) + + assert result == { + "op": "and", + "conds": [ + existing_filter, + { + "op": "time_range", + "field": "created_at", + "gte": "2026-03-10T00:00:00.000Z", + "lte": "2026-03-11T23:59:59.999Z", + }, + ], + } + + +def test_merge_time_filter_accepts_absolute_timestamp(): + result = merge_time_filter(None, until="2026-03-11T15:18:00Z") + + assert result == { + "op": "time_range", + "field": "updated_at", + "lte": "2026-03-11T15:18:00.000Z", + } + + +def test_merge_time_filter_treats_empty_filter_as_missing(): + result = merge_time_filter({}, since="2026-03-11") + + assert result == { + "op": "time_range", + "field": "updated_at", + "gte": "2026-03-11T00:00:00.000Z", + } + + +def test_merge_time_filter_rejects_inverted_range(): + with pytest.raises(ValueError, match="since must be earlier than or equal to until"): + merge_time_filter(None, since="2026-03-12", until="2026-03-11") + + +def 
test_merge_time_filter_handles_mixed_aware_and_naive_bounds(): + now = datetime(2026, 3, 11, 18, 0, tzinfo=timezone.utc) + + result = merge_time_filter(None, since="2h", until="2099-01-01", now=now) + + assert result == { + "op": "time_range", + "field": "updated_at", + "gte": "2026-03-11T16:00:00.000Z", + "lte": "2099-01-01T23:59:59.999Z", + } + + +def test_merge_time_filter_rejects_inverted_mixed_range(): + now = datetime(2026, 3, 11, 18, 0, tzinfo=timezone.utc) + + with pytest.raises(ValueError, match="since must be earlier than or equal to until"): + merge_time_filter(None, since="2099-01-01", until="2h", now=now) + + +def test_merge_time_filter_rejects_invalid_time_value(): + with pytest.raises(ValueError): + merge_time_filter(None, since="not-a-time") + + +def test_merge_time_filter_rejects_invalid_time_field(): + with pytest.raises(ValueError, match="time_field must be one of"): + merge_time_filter(None, since="2h", time_field="published_at") + + +def test_merge_time_filter_output_preserves_timezone_semantics(): + now = datetime(2026, 3, 11, 18, 0, tzinfo=timezone.utc) + + result = merge_time_filter(None, since="30m", until="2026-03-11", now=now) + + assert parse_iso_datetime(result["gte"]).tzinfo is not None + assert parse_iso_datetime(result["lte"]).tzinfo is not None + + +def test_merge_time_filter_date_only_uses_now_timezone(): + local_tz = timezone.utc + now = datetime(2026, 3, 11, 18, 0, tzinfo=local_tz) + + result = merge_time_filter(None, since="2026-03-11", until="2026-03-12", now=now) + + assert result["gte"].endswith("Z") + assert result["lte"].endswith("Z")