Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions openviking/retrieve/hierarchical_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""

import heapq
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from openviking.models.embedder.base import EmbedResult
Expand All @@ -21,6 +22,7 @@
RelatedContext,
TypedQuery,
)
from openviking.retrieve.memory_lifecycle import hotness_score
from openviking_cli.utils.config import RerankConfig
from openviking_cli.utils.logger import get_logger

Expand All @@ -40,6 +42,7 @@ class HierarchicalRetriever:
SCORE_PROPAGATION_ALPHA = 0.5 # Score propagation coefficient
DIRECTORY_DOMINANCE_RATIO = 1.2 # Directory score must exceed max child score
GLOBAL_SEARCH_TOPK = 3 # Global retrieval count
HOTNESS_ALPHA = 0.2 # Weight for hotness score in final ranking (0 = disabled)

def __init__(
self,
Expand Down Expand Up @@ -416,7 +419,13 @@ async def _convert_to_matched_contexts(
candidates: List[Dict[str, Any]],
ctx: RequestContext,
) -> List[MatchedContext]:
"""Convert candidate results to MatchedContext list."""
"""Convert candidate results to MatchedContext list.

Blends semantic similarity with a hotness score derived from
``active_count`` and ``updated_at`` so that frequently-accessed,
recently-updated contexts get a ranking boost. The blend weight
is controlled by ``HOTNESS_ALPHA`` (0 disables the boost).
"""
results = []

for c in candidates:
Expand All @@ -433,6 +442,28 @@ async def _convert_to_matched_contexts(
if abstract:
relations.append(RelatedContext(uri=uri, abstract=abstract))

semantic_score = c.get("_final_score", c.get("_score", 0.0))

# --- hotness boost ---
updated_at_raw = c.get("updated_at")
if isinstance(updated_at_raw, str):
try:
updated_at_val = datetime.fromisoformat(updated_at_raw)
except (ValueError, TypeError):
updated_at_val = None
elif isinstance(updated_at_raw, datetime):
updated_at_val = updated_at_raw
else:
updated_at_val = None

h_score = hotness_score(
active_count=c.get("active_count", 0),
updated_at=updated_at_val,
)

alpha = self.HOTNESS_ALPHA
final_score = (1 - alpha) * semantic_score + alpha * h_score

results.append(
MatchedContext(
uri=c.get("uri", ""),
Expand All @@ -442,11 +473,13 @@ async def _convert_to_matched_contexts(
level=c.get("level", 2),
abstract=c.get("abstract", ""),
category=c.get("category", ""),
score=c.get("_final_score", c.get("_score", 0.0)),
score=final_score,
relations=relations,
)
)

# Re-sort by blended score so hotness boost can change ranking
results.sort(key=lambda x: x.score, reverse=True)
return results

def _get_root_uris_for_type(
Expand Down
64 changes: 64 additions & 0 deletions openviking/retrieve/memory_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Hotness scoring for cold/hot memory lifecycle management (#296).

Provides a pure function to compute a 0.0–1.0 hotness score based on
access frequency (active_count) and recency (updated_at). The score
can be blended with semantic similarity to boost frequently-accessed,
recently-updated contexts in search results.
"""

import math
from datetime import datetime, timezone
from typing import Optional

# Default half-life in days for the exponential time-decay component.
DEFAULT_HALF_LIFE_DAYS: float = 7.0


def hotness_score(
    active_count: int,
    updated_at: Optional[datetime],
    now: Optional[datetime] = None,
    half_life_days: float = DEFAULT_HALF_LIFE_DAYS,
) -> float:
    """Compute a 0.0–1.0 hotness score.

    Formula::

        score = sigmoid(log1p(active_count)) * time_decay(updated_at)

    * **sigmoid** maps ``log1p(active_count)`` into (0, 1).
    * **time_decay** is an exponential decay with configurable half-life;
      returns 0.0 when *updated_at* is ``None``.

    Args:
        active_count: Number of times this context was retrieved/accessed.
        updated_at: Last update / access timestamp (preferably UTC).
        now: Current time override (useful for deterministic tests).
        half_life_days: Half-life for the recency decay, in days.

    Returns:
        A float in [0.0, 1.0].

    Raises:
        ValueError: If *half_life_days* is not positive (a zero or negative
            half-life would divide by zero or make the "decay" grow,
            breaking the documented [0.0, 1.0] range).
    """
    if half_life_days <= 0:
        raise ValueError(f"half_life_days must be positive, got {half_life_days}")

    # No timestamp means recency cannot be established: fully cold.
    # Early return also avoids computing the frequency term needlessly.
    if updated_at is None:
        return 0.0

    if now is None:
        now = datetime.now(timezone.utc)

    # Normalise both timestamps to aware UTC so subtraction always works
    # (naive datetimes are assumed to already be UTC).
    if updated_at.tzinfo is None:
        updated_at = updated_at.replace(tzinfo=timezone.utc)
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)

    # --- frequency component: sigmoid(log1p(count)) in (0, 1) ---
    freq = 1.0 / (1.0 + math.exp(-math.log1p(active_count)))

    # --- recency component: exponential decay with the given half-life ---
    # Future timestamps (clock skew) clamp to age 0 -> decay factor 1.0.
    age_days = max((now - updated_at).total_seconds() / 86400.0, 0.0)
    recency = math.exp(-(math.log(2) / half_life_days) * age_days)

    return freq * recency
126 changes: 126 additions & 0 deletions tests/test_memory_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Tests for memory lifecycle hotness scoring (#296)."""

import math
from datetime import datetime, timedelta, timezone

import pytest

from openviking.retrieve.memory_lifecycle import DEFAULT_HALF_LIFE_DAYS, hotness_score


NOW = datetime(2026, 2, 26, 12, 0, 0, tzinfo=timezone.utc)


class TestHotnessScore:
    """Unit tests for hotness_score()."""

    def test_zero_active_count_just_now(self):
        """A fresh but never-accessed memory sits at sigmoid(log1p(0)) = 0.5."""
        assert 0.49 < hotness_score(0, NOW, now=NOW) < 0.51

    def test_high_active_count_just_now(self):
        """Heavy access combined with zero age pushes the score near 1.0."""
        assert hotness_score(1000, NOW, now=NOW) > 0.95

    def test_old_memory(self):
        """A month-old memory decays to a very low score even with some access."""
        month_ago = NOW - timedelta(days=30)
        assert hotness_score(10, month_ago, now=NOW) < 0.1

    def test_recent_memory(self):
        """Moderate access an hour ago yields a moderate-to-high score."""
        hour_ago = NOW - timedelta(hours=1)
        assert 0.5 < hotness_score(5, hour_ago, now=NOW) < 1.0

    def test_none_updated_at(self):
        """A missing timestamp collapses the score to exactly 0.0."""
        assert hotness_score(100, None, now=NOW) == 0.0

    def test_half_life_decay(self):
        """At exactly one half-life the recency term halves: 0.5 * 0.5 ≈ 0.25."""
        one_half_life_ago = NOW - timedelta(days=DEFAULT_HALF_LIFE_DAYS)
        assert 0.24 < hotness_score(0, one_half_life_ago, now=NOW) < 0.26

    def test_custom_half_life(self):
        """A longer half-life decays slower, so the same age scores higher."""
        two_weeks_ago = NOW - timedelta(days=14)
        fast_decay = hotness_score(5, two_weeks_ago, now=NOW, half_life_days=7.0)
        slow_decay = hotness_score(5, two_weeks_ago, now=NOW, half_life_days=30.0)
        assert slow_decay > fast_decay

    def test_naive_datetime_treated_as_utc(self):
        """Timezone-naive datetimes must not raise and must stay in range."""
        result = hotness_score(
            5,
            datetime(2026, 2, 26, 11, 0, 0),
            now=datetime(2026, 2, 26, 12, 0, 0),
        )
        assert 0.0 < result < 1.0

    def test_monotonic_with_active_count(self):
        """Strictly higher access counts give strictly higher scores."""
        scores = [hotness_score(c, NOW, now=NOW) for c in (1, 10, 100)]
        assert scores[0] < scores[1] < scores[2]

    def test_monotonic_with_recency(self):
        """Strictly fresher timestamps give strictly higher scores."""
        ages = (timedelta(days=30), timedelta(days=3), timedelta(hours=1))
        scores = [hotness_score(5, NOW - age, now=NOW) for age in ages]
        assert scores[0] < scores[1] < scores[2]


class TestHotnessBlending:
    """Tests for the blending logic (alpha weighting)."""

    @staticmethod
    def _blend(semantic: float, hotness: float, alpha: float) -> float:
        # Mirrors the retriever's blend: (1 - alpha) * semantic + alpha * hotness.
        return (1 - alpha) * semantic + alpha * hotness

    def test_alpha_zero_preserves_semantic_order(self):
        """With alpha=0 the hotness term vanishes and semantic score passes through."""
        semantic = 0.85
        hot = hotness_score(100, NOW, now=NOW)
        assert math.isclose(self._blend(semantic, hot, 0.0), semantic)

    def test_hotness_boost_can_rerank(self):
        """A hot memory with lower semantic score can overtake a cold one."""
        alpha = 0.4  # aggressive weight for demonstration

        # Memory A: high semantic, cold (old, low access).
        cold_hotness = hotness_score(1, NOW - timedelta(days=60), now=NOW)
        cold_blended = self._blend(0.8, cold_hotness, alpha)

        # Memory B: lower semantic, hot (recent, high access).
        hot_hotness = hotness_score(500, NOW, now=NOW)
        hot_blended = self._blend(0.6, hot_hotness, alpha)

        # B should overtake A due to hotness.
        assert hot_blended > cold_blended

    def test_default_alpha_preserves_semantic_dominance(self):
        """With default alpha=0.2, a large semantic gap is not overturned."""
        alpha = 0.2

        # Memory A: much higher semantic, cold.
        cold_hotness = hotness_score(0, NOW - timedelta(days=30), now=NOW)
        cold_blended = self._blend(0.9, cold_hotness, alpha)

        # Memory B: much lower semantic, hot.
        hot_hotness = hotness_score(1000, NOW, now=NOW)
        hot_blended = self._blend(0.3, hot_hotness, alpha)

        # A should still win — semantic dominance preserved.
        assert cold_blended > hot_blended