Skip to content

Comments

feat: add checkpoint aput method and test#269

Merged
averikitsch merged 13 commits intogoogleapis:langgraph-basefrom
carloszuag:langgraph-base-aput
Mar 5, 2025
Merged

feat: add checkpoint aput method and test#269
averikitsch merged 13 commits intogoogleapis:langgraph-basefrom
carloszuag:langgraph-base-aput

Conversation

@carloszuag
Copy link

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@carloszuag carloszuag requested review from a team as code owners February 14, 2025 22:49
@product-auto-label product-auto-label bot added the api: cloudsql-postgres Issues related to the googleapis/langchain-google-cloud-sql-pg-python API. label Feb 14, 2025
@carloszuag carloszuag changed the title feat : create checkpoint aput method and test feat: add checkpoint aput method and test Feb 14, 2025
@averikitsch
Copy link
Collaborator

/gcbrun

@averikitsch
Copy link
Collaborator

/gcbrun

@@ -754,14 +753,14 @@ async def _ainit_checkpoint_table(
self,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure this is the exact same as the AlloyDB library? For example the arguments are in a different order https://github.com/googleapis/langchain-google-alloydb-pg-python/blob/4f24eea67ff0aeec547463f3a77a07f6da209718/src/langchain_google_alloydb_pg/engine.py#L770
and missing quotes around the table name https://github.com/googleapis/langchain-google-alloydb-pg-python/blob/4f24eea67ff0aeec547463f3a77a07f6da209718/src/langchain_google_alloydb_pg/engine.py#L783. please make sure there are no other difference beyond naming differences

Copy link
Author

@carloszuag carloszuag Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the same implementation of the method but, you are right. There was a mismatch in the arguments. It's fixed now.

self.schema_name = schema_name

@classmethod
async def create(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure the argument order matches AlloyDB

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idem, there was a mismatching. It's addressed.

assert dict(next_config) == test_config

# Verify if the checkpoint is stored correctly in the database
results = await afetch(async_engine, f"SELECT * FROM {table_name}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Table names need to be in quotes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added too all table names the quotes.

yield async_engine
# use default table for AsyncPostgresSaver.
await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name}"')
await aexecute(async_engine, f'DROP TABLE IF EXISTS"{table_name_writes}"')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing space

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


project_id = os.environ["PROJECT_ID"]
region = os.environ["REGION"]
cluster_id = os.environ["CLUSTER_ID"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

async_engine = await PostgresEngine.afrom_instance(
project_id=project_id,
region=region,
cluster=cluster_id,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

return result_fetch


@pytest_asyncio.fixture ##(scope="module")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove comment

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. There was another comment just like that and, I have also removed it.

@averikitsch
Copy link
Collaborator

/gcbrun

@averikitsch
Copy link
Collaborator

/usr/local/lib/python3.9/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/test_engine.py:31: in <module>
    from langchain_google_cloud_sql_pg.engine import (
E   ImportError: cannot import name 'CHECKPOINT_WRITES_TABLE' from 'langchain_google_cloud_sql_pg.engine' (/builder/home/.local/lib/python3.9/site-packages/langchain_google_cloud_sql_pg/engine.py)

@averikitsch
Copy link
Collaborator

/gcbrun

@averikitsch
Copy link
Collaborator

/gcbrun

)

return metadata.tables[f"{schema_name}.{table_name}"]
return metadata.tables[f'"{schema_name}"."{table_name}"']
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove change

await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name}"')
await aexecute(async_engine, f'DROP TABLE IF EXISTS "{table_name_writes}"')
await async_engine.close()
await async_engine._connector.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use .close_async()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a doubt here, do I need to use await "async_engine.close_async()" instead of "await async_engine.close()"?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The for the connector use close_async() for the engine continue to use close(). https://github.com/GoogleCloudPlatform/cloud-sql-python-connector?tab=readme-ov-file#async-context-manager

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this and I added the "async_engine.close_asyn()" just as you suggested.

@averikitsch
Copy link
Collaborator

/gcbrun


# Verify that the query is executed on the custom table.
stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{custom_table_name}';"
stmt = f'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = "{table_name}";'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These values shouldn't be in double quotes only single quotes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removes double quotes here.

Copy link
Collaborator

@averikitsch averikitsch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

____________________________ test_checkpoint_alist _____________________________

async_engine = <langchain_google_cloud_sql_pg.engine.PostgresEngine object at 0x7f8ed8342310>
checkpointer = <langchain_google_cloud_sql_pg.async_checkpoint.AsyncPostgresSaver object at 0x7f8ed83ffbd0>
test_data = {'checkpoints': [{'channel_values': {'my_key': 'meow', 'node': 'node'}, 'channel_versions': {'start': 2, 'my_key':...rce': 'input', 'step': 2, 'writes': {}}, {'parents': None, 'source': 'loop', 'step': 1, 'writes': {'foo': 'bar'}}, {}]}

@pytest.mark.asyncio
async def test_checkpoint_alist(
    async_engine: PostgresEngine,
    checkpointer: AsyncPostgresSaver,
    test_data: dict[str, Any],
) -> None:
    configs = test_data["configs"]
    checkpoints = test_data["checkpoints"]
    metadata = test_data["metadata"]

    await checkpointer.aput(configs[1], checkpoints[1], metadata[0], {})
    await checkpointer.aput(configs[2], checkpoints[2], metadata[1], {})
    await checkpointer.aput(configs[3], checkpoints[3], metadata[2], {})

    # call method / assertions
    query_1 = {"source": "input"}  # search by 1 key
    query_2 = {
        "step": 1,
        "writes": {"foo": "bar"},
    }  # search by multiple keys
    query_3: dict[str, Any] = {}  # search by no keys, return all checkpoints
    query_4 = {"source": "update", "step": 1}  # no match
  search_results_1 = [c async for c in checkpointer.alist(None, filter=query_1)]

tests/test_async_checkpoint.py:242:


tests/test_async_checkpoint.py:242: in
search_results_1 = [c async for c in checkpointer.alist(None, filter=query_1)]
/builder/home/.local/lib/python3.11/site-packages/langchain_google_cloud_sql_pg/async_checkpoint.py:444: in alist
result = await conn.stream(text(query), args)
/builder/home/.local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/base.py:151: in start
start_value = await util.anext_(self.gen)
/builder/home/.local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py:582: in stream
result = await greenlet_spawn(
/builder/home/.local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py:190: in greenlet_spawn
result = context.switch(*args, **kwargs)
/builder/home/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1412: in execute
distilled_parameters = _distill_params_20(parameters)
lib/sqlalchemy/cyextension/util.pyx:32: in sqlalchemy.cyextension.util._distill_params_20
???


???
E sqlalchemy.exc.ArgumentError: List argument must consist only of tuples or dictionaries

lib/sqlalchemy/cyextension/util.pyx:23: ArgumentError

@averikitsch
Copy link
Collaborator

/gcbrun


# Verify that the query is executed on the custom table.
stmt = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{custom_table_name}';"
stmt = f'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = "{table_name_writes}";'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update to use single quotes around table name

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've replaced the single quotes for double quotes instead.

table_name = f"checkpoint{uuid.uuid4()}"
table_name_writes = f"{table_name}_writes"
await engine.ainit_checkpoint_table(table_name=table_name)
stmt = f'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = "{table_name_writes}";'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update to use single quotes around table name

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was addressed.

table_name = f"checkpoint{uuid.uuid4()}"
table_name_writes = f"{table_name}_writes"
engine.init_checkpoint_table(table_name=table_name)
stmt = f'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = "{table_name}";'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update to use single quotes around table name

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also addressed. Thank you for all the comments.

@averikitsch
Copy link
Collaborator

/gcbrun

@averikitsch averikitsch merged commit ea22dd0 into googleapis:langgraph-base Mar 5, 2025
4 of 8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: cloudsql-postgres Issues related to the googleapis/langchain-google-cloud-sql-pg-python API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants