Search optimization and indexing based on datetime #405

Open · wants to merge 6 commits into main

Conversation

@GrzegorzPustulka (Contributor) commented Jun 18, 2025

Related Issue(s):

Index Management System with Time-based Partitioning

Description

This PR introduces a new index management system that partitions indexes by date and controls index size, automatically splitting an index once it exceeds a configured limit.

How it works

System Architecture

The system consists of several main components; a structural sketch follows the list:

1. Search Engine Adapters

  • SearchEngineAdapter - base class
  • ElasticsearchAdapter and OpenSearchAdapter - implementations for specific engines

2. Index Selection Strategies

  • AsyncDatetimeBasedIndexSelector / SyncDatetimeBasedIndexSelector - date-based index filtering
  • UnfilteredIndexSelector - returns all indexes (fallback)
  • Cache with TTL (default 1 hour) for performance

3. Data Insertion Strategies

  • Simple strategy: one index per collection (previous behavior)
  • Datetime strategy: indexes partitioned by date, split automatically when they exceed the size limit
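
A minimal structural sketch of how these pieces might fit together, assuming the class names above and the create_simple_index signature quoted in the review comments; the PR's exact signatures may differ:

from abc import ABC, abstractmethod
from typing import Any

class SearchEngineAdapter(ABC):
    """Base class hiding engine-specific index operations."""

    @abstractmethod
    async def create_simple_index(self, client: Any, collection_id: str) -> str:
        """Create the single per-collection index used by the simple strategy."""

class ElasticsearchAdapter(SearchEngineAdapter):
    async def create_simple_index(self, client: Any, collection_id: str) -> str:
        ...  # Elasticsearch-specific index creation

class OpenSearchAdapter(SearchEngineAdapter):
    async def create_simple_index(self, client: Any, collection_id: str) -> str:
        ...  # OpenSearch-specific index creation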

Datetime Strategy - Operation Details

Index Format:

items_collection-name_2025-01-01-2025-03-31

Item Insertion Process (see the sketch after this list):

  1. The system reads the item's date (properties.datetime)
  2. Looks for an existing index that covers this date
  3. If none is found, creates a new index starting from this date
  4. Checks the target index's size
  5. If it exceeds the limit (DATETIME_INDEX_MAX_SIZE_GB), splits the index
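
A hedged sketch of this flow. The helper functions are hypothetical stand-ins, stubbed out so the control flow is runnable; only properties.datetime and DATETIME_INDEX_MAX_SIZE_GB come from the PR description:

from typing import Optional

DATETIME_INDEX_MAX_SIZE_GB = 25  # the env var's default

# Hypothetical stand-ins for the PR's internals.
async def find_index_covering(day: str) -> Optional[str]: return None
async def create_datetime_index(day: str) -> str: return f"items_collection_{day}"
async def index_size_gb(index: str) -> float: return 0.0
async def split_index(index: str, day: str) -> str: return index
async def write_item(index: str, item: dict) -> None: pass

async def insert_item(item: dict) -> None:
    day = item["properties"]["datetime"][:10]     # 1. read the item's date (YYYY-MM-DD)
    index = await find_index_covering(day)        # 2. look for an index covering it
    if index is None:
        index = await create_datetime_index(day)  # 3. none found: create from this date
    if await index_size_gb(index) > DATETIME_INDEX_MAX_SIZE_GB:
        index = await split_index(index, day)     # 4./5. size check, split if exceeded
    await write_item(index, item)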

Early Date Handling:
If an item's date is earlier than the oldest index (illustrated below):

  1. Creates a new index starting from this earlier date
  2. Sets the new index's alias to end one day before the oldest existing index starts (see Scenario 3)
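
A small illustration of the date arithmetic, using the values from Scenario 3 below:

from datetime import date, timedelta

# The new early index must end one day before the oldest existing index starts,
# so the two date ranges never overlap.
oldest_start = date(2025, 2, 1)                   # start of the current oldest index
new_alias_end = oldest_start - timedelta(days=1)  # -> 2025-01-31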

Index Splitting:
When an index exceeds the size limit (sketched below):

  1. Updates the current index's alias to end on the last item's date
  2. Creates a new index starting from the next day
  3. New items go to the new index
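
An illustration of the resulting names, using the values from Scenario 2 below (only the naming is shown; the alias-update calls are not):

from datetime import date, timedelta

last_item_date = date(2025, 3, 15)
closed_alias = f"items_collection_2025-01-01-{last_item_date.isoformat()}"
next_index = f"items_collection_{(last_item_date + timedelta(days=1)).isoformat()}"
# closed_alias == "items_collection_2025-01-01-2025-03-15"
# next_index   == "items_collection_2025-03-16"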

Cache and Performance

IndexCacheManager:

  • Stores mapping of collection aliases to index lists
  • TTL default 1 hour
  • Automatic refresh on expiration
  • Manual refresh after index modifications

AsyncIndexAliasLoader / SyncIndexAliasLoader:

  • Load alias information from search engine
  • Use cache manager to store results
  • Async and sync versions for different usage contexts

Configuration

New Environment Variables:

# Enable datetime strategy (default false)
ENABLE_DATETIME_INDEX_FILTERING=true

# Maximum index size in GB before splitting (default 25)
DATETIME_INDEX_MAX_SIZE_GB=50
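
For illustration, a minimal sketch of reading these variables; the actual settings handling in the codebase may differ:

import os

ENABLE_DATETIME_INDEX_FILTERING = (
    os.getenv("ENABLE_DATETIME_INDEX_FILTERING", "false").lower() == "true"
)
DATETIME_INDEX_MAX_SIZE_GB = int(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", "25"))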

Usage Examples

Scenario 1: Adding items to new collection

  1. First item with date 2025-01-15 → creates index items_collection_2025-01-15
  2. Subsequent items with similar dates → go to the same index

Scenario 2: Size limit exceeded

  1. Index items_collection_2025-01-01 reaches 25GB
  2. New item with date 2025-03-15 → system splits index:
    • Old: items_collection_2025-01-01-2025-03-15
    • New: items_collection_2025-03-16

Scenario 3: Item with early date

  1. Existing index: items_collection_2025-02-01
  2. New item with date 2024-12-15 → creates:
    • New: items_collection_2024-12-15-2025-01-31

Search

The system automatically filters indexes during search.

Query with date range:

{
  "datetime": {
    "gte": "2025-02-01",
    "lte": "2025-02-28"
  }
}

Only indexes containing items from this period are searched, instead of all of the collection's indexes.
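
A minimal sketch of the overlap rule this filtering relies on; the function and values are illustrative, not the PR's exact code:

from datetime import date

def ranges_overlap(index_start: date, index_end: date, gte: date, lte: date) -> bool:
    """Two closed date ranges overlap iff each starts on or before the other's end."""
    return index_start <= lte and gte <= index_end

# An index covering 2025-01-01..2025-03-31 is kept for the February 2025 query above:
assert ranges_overlap(
    date(2025, 1, 1), date(2025, 3, 31), date(2025, 2, 1), date(2025, 2, 28)
)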

Factories

IndexSelectorFactory:

  • Creates appropriate selector based on configuration
  • create_async_selector() / create_sync_selector()

IndexInsertionFactory:

  • Creates insertion strategy based on configuration
  • Automatically detects engine type and creates appropriate adapter

SearchEngineAdapterFactory:

  • Detects whether you're using Elasticsearch or OpenSearch
  • Creates appropriate adapter with engine-specific methods

Backward Compatibility

  • When ENABLE_DATETIME_INDEX_FILTERING=false → works as before
  • Existing indexes remain unchanged

All operations have sync and async versions for different usage contexts in the application.

PR Checklist:

  • Code is formatted and linted (run pre-commit run --all-files)
  • Tests pass (run make test)
  • Documentation has been updated to reflect changes, if applicable
  • Changes are added to the changelog

@GrzegorzPustulka GrzegorzPustulka marked this pull request as ready for review July 7, 2025 20:01
@GrzegorzPustulka (Contributor, Author) commented Jul 8, 2025

@jonhealy1
@StijnCaerts
@jamesfisher-geo

The PR is finished and ready for code review.

@jonhealy1 (Collaborator) commented

@GrzegorzPustulka There are a couple of conflicts now. They don't look too bad. I have been travelling but am going to try to review this in the next few days.

@jonhealy1 jonhealy1 requested review from jonhealy1, jamesfisher-geo, rhysrevans3 and StijnCaerts and removed request for jamesfisher-geo July 20, 2025 11:33
@jonhealy1 (Collaborator) commented

@jamesfisher-geo @StijnCaerts @rhysrevans3 Hi. Added you guys as reviewers if you have time to have a look :)

@rhysrevans3 (Collaborator) left a comment

Looks okay to me but I have a couple of questions.

Comment on lines +335 to +336
logger.error(f"Invalid interval format: {datetime}, error: {e}")
datetime_search = None

Should this error be returned to the user rather than continuing the search without a datetime filter?

Comment on lines +515 to +520
except (ValueError, TypeError) as e:
    # Handle invalid interval formats if return_date fails
    logger.error(
        f"Invalid interval format: {search_request.datetime}, error: {e}"
    )
    datetime_search = None

As above.

Comment on lines +132 to +143
def create_index_name(collection_id: str, start_date: str) -> str:
    """Create index name from collection ID and start date.

    Args:
        collection_id (str): Collection identifier.
        start_date (str): Start date for the index.

    Returns:
        str: Formatted index name.
    """
    cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE)
    return f"{ITEMS_INDEX_PREFIX}{cleaned.lower()}_{start_date}"

Is this the equivalent of index_by_collection_id for the simple method? If it is, should it not also include the hex of the collection_id and -000001?

What's the benefit of having the start datetime in the index name? Could you just have it in the alias with the end datetime? You could just use a count to prevent index name clashes.

You would then only need to create a new index when you exceed the max size, and not for earlier items. If the item's start datetime is earlier or the end datetime is later than the current alias, then update the alias.

Comment on lines +13 to +21
def __init__(self, cache_ttl_seconds: int = 3600):
    """Initialize the cache manager.

    Args:
        cache_ttl_seconds (int): Time-to-live for cache entries in seconds.
    """
    self._cache: Optional[Dict[str, List[str]]] = None
    self._timestamp: float = 0
    self._ttl = cache_ttl_seconds

Would it be better to just update the cache as aliases are set/updated rather than polling ES every hour?

@jamesfisher-geo (Collaborator) left a comment

Overall looks great. I've got some comments around error handling and some cache handling as well.

This PR will add a lot of future maintenance burden in its current form. How about we implement only the async version and not include the sync code? That would cut down on repetitive code in this PR.

@jonhealy1 @GrzegorzPustulka what are your thoughts on this?

@@ -342,6 +348,7 @@ async def item_collection(
    sort=None,
    token=token,
    collection_ids=[collection_id],
    datetime_search=datetime_search,

Is this needed? We apply the datetime_search to the search variable on line 331. If this is optional, could we omit it?

@@ -560,6 +574,7 @@ async def post_search(
    token=search_request.token,
    sort=sort,
    collection_ids=search_request.collections,
    datetime_search=datetime_search,

Same here -- Is this needed? We apply the datetime_search to the search variable on line 513. If this is optional, could we omit it?

class ElasticsearchAdapter(SearchEngineAdapter):
"""Elasticsearch-specific adapter implementation."""

async def create_simple_index(self, client: Any, collection_id: str) -> str:

The index mappings and settings are missing from ElasticsearchAdapter().create_simple_index(). Could you include the mappings here as is done in OpenSearchAdapter()._create_index_body()?

The patterns for creating an index should be the same between ElasticsearchAdapter() and OpenSearchAdapter() IMO. How about creating a _create_index_body() method in ElasticsearchAdapter()?

    Returns:
        SearchEngineType: Detected engine type.
    """
    return (

How about using isinstance() here rather than matching the string?

return (
    OpenSearchAdapter()
    if isinstance(client, (OpenSearch, AsyncOpenSearch))
    else ElasticsearchAdapter()
)

"""Factory for creating search engine adapters."""

@staticmethod
def create_adapter(engine_type: SearchEngineType) -> SearchEngineAdapter:

Is this function necessary? See comment below

        )
        return product_datetime

    async def handle_new_collection(

Logging statements in handle_new_collection() and handle_new_collection_sync() would be useful.


    _instance = None

    def __new__(cls, client):

I'm a bit confused by this implementation. Maybe I'm missing something. Could this be replaced with the normal method of instance creation using __init__()?

def __init__(self, client: Any):
    self.cache_manager = IndexCacheManager()
    self.alias_loader = AsyncIndexAliasLoader(client, self.cache_manager)

class IndexCacheManager:
"""Manages caching of index aliases with expiration."""

def __init__(self, cache_ttl_seconds: int = 3600):

I believe some concurrency management is needed here because multiple threads may be attempting to access the cache resource at the same time. From what I've found, threading.Lock() should work.

https://docs.python.org/3/library/threading.html#lock-objects

The following (untested) should place a lock on the cache when accessing it and release it when finished:

import threading

class IndexCacheManager:
    def __init__(self, cache_ttl_seconds: int = 3600):
        self._cache: Optional[Dict[str, List[str]]] = None
        self._timestamp: float = 0
        self._ttl = cache_ttl_seconds
        self._lock = threading.Lock()

    def get_cache(self) -> Optional[Dict[str, List[str]]]:
        """Get the current cache if not expired.

        Returns:
            Optional[Dict[str, List[str]]]: Cache data if valid, None if expired.
        """
        with self._lock:
            if self.is_expired:
                return None
            return self._cache

"""
if self.is_expired:
return None
return self._cache
@jamesfisher-geo commented Jul 21, 2025

Returning the _cache object here could be problematic because it is a pointer to the actual cache. How about returning a copy?
return {k: v.copy() for k, v in self._cache.items()}

return (
    SyncDatetimeBasedIndexSelector(sync_client)
    if use_datetime_filtering
    else UnfilteredIndexSelector()

But the UnfilteredIndexSelector() is async.
