diff --git a/wren-ai-service/src/pipelines/indexing/db_schema.py b/wren-ai-service/src/pipelines/indexing/db_schema.py index 394d087b46..9b06b1a39c 100644 --- a/wren-ai-service/src/pipelines/indexing/db_schema.py +++ b/wren-ai-service/src/pipelines/indexing/db_schema.py @@ -1,4 +1,3 @@ -import asyncio import logging import sys import uuid @@ -29,7 +28,7 @@ @component class DDLChunker: @component.output_types(documents=List[Document]) - async def run( + def run( self, mdl: Dict[str, Any], column_batch_size: int, @@ -48,7 +47,7 @@ def _additional_meta() -> Dict[str, Any]: }, "content": chunk["payload"], } - for chunk in await self._get_ddl_commands( + for chunk in self._get_ddl_commands( **mdl, column_batch_size=column_batch_size ) ] @@ -63,7 +62,7 @@ def _additional_meta() -> Dict[str, Any]: ] } - async def _model_preprocessor( + def _model_preprocessor( self, models: List[Dict[str, Any]], **kwargs ) -> List[Dict[str, Any]]: def _column_preprocessor( @@ -81,9 +80,9 @@ def _column_preprocessor( **addition, } - async def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: + def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: addition = { - key: await helper(model, **kwargs) + key: helper(model, **kwargs) for key, helper in helper.MODEL_PREPROCESSORS.items() if helper.condition(model, **kwargs) } @@ -100,11 +99,9 @@ async def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: "primaryKey": model.get("primaryKey", ""), } - tasks = [_preprocessor(model, **kwargs) for model in models] - - return await asyncio.gather(*tasks) + return [_preprocessor(model, **kwargs) for model in models] - async def _get_ddl_commands( + def _get_ddl_commands( self, models: List[Dict[str, Any]], relationships: List[Dict[str, Any]], @@ -115,7 +112,7 @@ async def _get_ddl_commands( ) -> List[dict]: return ( self._convert_models_and_relationships( - await self._model_preprocessor(models, **kwargs), + self._model_preprocessor(models, **kwargs), relationships, column_batch_size, ) @@ -300,13 +297,13 @@ def validate_mdl(mdl_str: str, validator: MDLValidator) -> Dict[str, Any]: @observe(capture_input=False) -async def chunk( +def chunk( mdl: Dict[str, Any], chunker: DDLChunker, column_batch_size: int, project_id: Optional[str] = None, ) -> Dict[str, Any]: - return await chunker.run( + return chunker.run( mdl=mdl, column_batch_size=column_batch_size, project_id=project_id, diff --git a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py index 20bd8ac682..a78d653ee2 100644 --- a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py +++ b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py @@ -8,17 +8,15 @@ from src.pipelines.indexing.db_schema import DBSchema, DDLChunker -@pytest.mark.asyncio -async def test_empty_mdl(): +def test_empty_mdl(): chunker = DDLChunker() mdl = {"models": [], "views": [], "relationships": [], "metrics": []} - document = await chunker.run(mdl, column_batch_size=1) + document = chunker.run(mdl, column_batch_size=1) assert document == {"documents": []} -@pytest.mark.asyncio -async def test_single_model(): +def test_single_model(): chunker = DDLChunker() mdl = { "models": [ @@ -35,7 +33,7 @@ async def test_single_model(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document: Document = actual["documents"][0] @@ -49,8 +47,7 @@ async def test_single_model(): ) -@pytest.mark.asyncio -async def test_multiple_models(): +def test_multiple_models(): chunker = DDLChunker() mdl = { "models": [ @@ -74,7 +71,7 @@ async def test_multiple_models(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_1: Document = actual["documents"][0] @@ -98,8 +95,7 @@ async def test_multiple_models(): ) -@pytest.mark.asyncio -async def test_column_is_primary_key(): +def test_column_is_primary_key(): chunker = DDLChunker() mdl = { "models": [ @@ -119,7 +115,7 @@ async def test_column_is_primary_key(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -140,8 +136,7 @@ async def test_column_is_primary_key(): ) -@pytest.mark.asyncio -async def test_column_with_properties(): +def test_column_with_properties(): chunker = DDLChunker() mdl = { "models": [ @@ -164,7 +159,7 @@ async def test_column_with_properties(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -195,8 +190,7 @@ async def test_column_with_properties(): ) -@pytest.mark.asyncio -async def test_column_with_nested_columns(): +def test_column_with_nested_columns(): chunker = DDLChunker() mdl = { "models": [ @@ -221,7 +215,7 @@ async def test_column_with_nested_columns(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -242,8 +236,7 @@ async def test_column_with_nested_columns(): ) -@pytest.mark.asyncio -async def test_column_with_calculated_property(): +def test_column_with_calculated_property(): chunker = DDLChunker() mdl = { "models": [ @@ -264,7 +257,7 @@ async def test_column_with_calculated_property(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -285,8 +278,7 @@ async def test_column_with_calculated_property(): ) -@pytest.mark.asyncio -async def test_column_with_relationship(): +def test_column_with_relationship(): chunker = DDLChunker() mdl = { "models": [ @@ -328,7 +320,7 @@ async def test_column_with_relationship(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 6 document_0: Document = actual["documents"][0] @@ -381,8 +373,7 @@ async def test_column_with_relationship(): ) -@pytest.mark.asyncio -async def test_column_batch_size(): +def test_column_batch_size(): chunker = DDLChunker() mdl = { "models": [ @@ -399,7 +390,7 @@ async def test_column_batch_size(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=2) + actual = chunker.run(mdl, column_batch_size=2) assert len(actual["documents"]) == 3 document_0: Document = actual["documents"][0] @@ -444,8 +435,7 @@ async def test_column_batch_size(): ) -@pytest.mark.asyncio -async def test_view(): +def test_view(): chunker = DDLChunker() mdl = { "models": [], @@ -453,7 +443,7 @@ async def test_view(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] @@ -468,8 +458,7 @@ async def test_view(): ) -@pytest.mark.asyncio -async def test_view_with_properties(): +def test_view_with_properties(): chunker = DDLChunker() mdl = { "models": [], @@ -483,7 +472,7 @@ async def test_view_with_properties(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] @@ -498,8 +487,7 @@ async def test_view_with_properties(): ) -@pytest.mark.asyncio -async def test_metric(): +def test_metric(): chunker = DDLChunker() mdl = { "models": [], @@ -518,7 +506,7 @@ async def test_metric(): } ], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0]