Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using both AsyncArray and Array from FSSpecStore #2909

Open
ilan-gold opened this issue Mar 13, 2025 · 11 comments
Open

Using both AsyncArray and Array from FSSpecStore #2909

ilan-gold opened this issue Mar 13, 2025 · 11 comments
Labels
bug Potential issues with the zarr-python library

Comments

@ilan-gold
Copy link
Contributor

ilan-gold commented Mar 13, 2025

Zarr version

v3.0.5

Numcodecs version

0.15.1

Python Version

3.13

Operating System

Mac

Installation

uv

Description

Hello! I'm trying to create an api around zarr async and I came across an edge case, maybe. I would like to be able to open the zarr store synchronously (potentially), but then use it asynchronously (hopefully). This seems to be specific to this store and not to zarr async i.e., the following works with a local file instead of an http server. Any advice? It would be great to write code that is synchronous at the top level, but then hooks into async at a lower level.

The weird part is that most of the anndata tests pass with this paradigm - the problem only arises in a script, or ipython sessions. This is likely just specific to remote stores, not some feature of how the program is run. Something to do with the event loop certainly? Any advice?

Steps to reproduce

ran in ipython/jupyter/script

from __future__ import annotations

import asyncio

import zarr.api
import zarr.api.asynchronous


async def main():
    g = await zarr.api.asynchronous.open_array(
        store="http://127.0.0.1:8080/242b59a3-7398-4f0d-89fc-73cbc886d1b9.zarr/X/data"
    )
    await g.getitem(Ellipsis)  # works
    g_sync = zarr.open_array(
        "http://127.0.0.1:8080/242b59a3-7398-4f0d-89fc-73cbc886d1b9.zarr/X/data"
    )
    await (await zarr.api.asynchronous.open_array(store=g_sync.store)).getitem(
        Ellipsis
    )  # fails
    await g_sync._async_array.getitem(()) # fails


if __name__ == "__main__":
    asyncio.run(main())
Traceback (most recent call last):
  File "/Users/ilangold/Projects/Theis/anndata/tester.py", line 23, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 721, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/Users/ilangold/Projects/Theis/anndata/tester.py", line 17, in main
    await (await zarr.api.asynchronous.open_array(store=g_sync.store)).getitem(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/zarr/api/asynchronous.py", line 1256, in open_array
    return await AsyncArray.open(store_path, zarr_format=zarr_format)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/zarr/core/array.py", line 896, in open
    metadata_dict = await get_array_metadata(store_path, zarr_format=zarr_format)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/zarr/core/array.py", line 177, in get_array_metadata
    zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
                                                  ^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/zarr/storage/_common.py", line 124, in get
    return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/zarr/storage/_fsspec.py", line 230, in get
    value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/fsspec/implementations/http.py", line 234, in _cat_file
    async with session.get(self.encode_url(url), **kw) as r:
               ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/aiohttp/client.py", line 1425, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/aiohttp/client.py", line 607, in _request
    with timer:
         ^^^^^
  File "/Users/ilangold/Projects/Theis/anndata/venv/lib/python3.13/site-packages/aiohttp/helpers.py", line 636, in __enter__
    raise RuntimeError("Timeout context manager should be used inside a task")

Additional output

No response

@ilan-gold ilan-gold added the bug Potential issues with the zarr-python library label Mar 13, 2025
@ilan-gold ilan-gold changed the title async from sync in zarr async from sync in zarr (scripts? ipython?) Mar 13, 2025
@ilan-gold
Copy link
Contributor Author

Some additional context: a lot of our code is generic over zarr and hdf5 so being able to do things like g["key"] is not possible if we have to open things asynchronously if I am not mistaken

@ilan-gold ilan-gold changed the title async from sync in zarr (scripts? ipython?) async from sync in zarr Mar 13, 2025
@ilan-gold
Copy link
Contributor Author

So it seems it really does have to do with the event loop under which the store is created: aio-libs/aiohttp#10153

@ilan-gold ilan-gold changed the title async from sync in zarr AsyncArray from Array in zarr Mar 13, 2025
@ilan-gold
Copy link
Contributor Author

I see this comment in _get_loop but this issue would lead me to conclude the opposite:

"""Create or return the default fsspec IO loop

@ilan-gold ilan-gold changed the title AsyncArray from Array in zarr Using both AsyncArray and Array Mar 13, 2025
@ilan-gold
Copy link
Contributor Author

ilan-gold commented Mar 13, 2025

Here's a more interesting example perhaps

from __future__ import annotations

import asyncio

import zarr
import zarr.api.asynchronous
from zarr.storage import FsspecStore


async def main():
    store = FsspecStore.from_url(
        "http://127.0.0.1:8080/242b59a3-7398-4f0d-89fc-73cbc886d1b9.zarr/X/data"
    )

    zarr.open_array(store)

    store = FsspecStore.from_url(
        "http://127.0.0.1:8080/242b59a3-7398-4f0d-89fc-73cbc886d1b9.zarr/X/data"
    )

    await zarr.api.asynchronous.open_array(store=store) # fails, same error as above


if __name__ == "__main__":
    asyncio.run(main())

To me this example says "once you start the zarr event loop, you can't use asynchronous zarr anymore"

@ilan-gold ilan-gold changed the title Using both AsyncArray and Array Using both AsyncArray and Array from FSSpecStore Mar 13, 2025
@jhamman
Copy link
Member

jhamman commented Mar 15, 2025

Thanks for the report @ilan-gold. This feels like it may be the same issue underlying #2878. That said, I'll respond to a few points in this ticket:

  1. The Async/Sync APIs in Zarr were not really built to be used in concert, within one runtime. I'm not particularly surprised things break when you try. I'm curious why you are trying to combine them? Are there parts of the Async API that are missing, causing you to fall back to the Sync API? Is it just the Group.__getitem__ issue you mention above?
  2. I suspect the core issues here are:
    • Fsspec's instance caching is not handling the loop identity (this is async zarr.create_array fails second call through async.run() #2878)
    • Zarr's caches its own IO loop when you start running the sync API. Issues may arise if you try using a different loop. This could be solved by adding an (advanced) API for specifying which loop you want Zarr to use.
  3. Note about the comment you see in _get_loop: I suspect that is just left over from when we copied the sync module from fsspec.

@ilan-gold
Copy link
Contributor Author

  1. I'm trying to combine them for performance since our i/o looks something like this:
from __future__ import annotations

import asyncio
import random
import time

async def child(parent, id, should_log=True, is_async=True) -> None:
    if is_async:
        await asyncio.sleep(random.random())  # some non-trivial request for data
    else:
        time.sleep(random.random()) # some non-trivial request, but now blocking :/
    time.sleep(random.random()) # do someting with the data
    if should_log:
        print("child", id, "done from parent", parent)


async def encapsulated(i, should_log=True, is_async=True) -> None:
    time.sleep(random.random()) # prepare for the data
    children = [child(i, j, is_async=is_async) for j in range(10)]
    await asyncio.gather(*children)
    time.sleep(random.random()) # do something with the children
    if should_log:
        print("parent", i, "done")


async def main(is_async=True) -> None:
    t = time.time()
    await asyncio.gather(
        encapsulated(1, is_async=is_async),
        encapsulated(2, is_async=is_async),
        child("orphaned", "orphan 1", is_async=is_async),
        child("orphaned", "orphan 2", is_async=is_async),
        child("orphaned", "orphan 3", is_async=is_async),
    )
    print("total time taken:", time.time() - t)


if __name__ == "__main__":
    asyncio.run(main(is_async=False))

We let users pass in their own zarr stores at the moment so they can do partial writes/reads (among other reasons), so this issue would basically mean users who want to do that with remote data and the synchronous api cannot (the merit of which can be debated certainly, no arguments from me). Not the end of the world, but also not great. For example, someone might want to create a data loader that loads chunks of an AnnData object - we could tell them "if you want to do this with remote data, you must create the store in its own async context and wrap asyncio.run yourself etc. to load" but this issue feels solvable maybe? I will end up powering through it if need be but now I'm quite interested in what is going on, bringing me to (no pun intended):

  1. What's interesting is I passed in zarr's default event loop _get_loop to url_to_fs

    fs, path = url_to_fs(url, **opts)
    and even though the underlying store was using zarr's event loop (including a small change in fsspec locally as well to force it to use that loop trying to get around this asychronnous argument, which I don't fully understand) i.e., assert store.fs.loop == _get_loop, this problem still came up, so I'm a bit lost cc @martindurant .

  2. As my answer to 2. might imply, I've been digging around a bit and came to a similar conclusion but didn't want to spam this post more :)

Would be cool to solve this but also I think we could work around it!

@rabernat
Copy link
Contributor

I will just comment that this is a really hard problem that kind of plagues all of Python. Once you introduce async functions, it sort of infects your entire stack. Everything upstack from the async function call also needs to be async. This makes it really hard to incrementally introduce async behavior in places that it can improve performance (e.g. with I/O) without forcing users to rewrite their entire code as async. It's particularly hard in data science environments where users really don't want to have to learn / use async concepts.

One solution, prototyped by fsspec and copied by Zarr Python 3, is to manage a dedicated async event loop and hide it in a separate thread. This works okay as long as the calling code is all sync. But if you try to mix this approach with other async code, it gets really messy (multiple different event loops). I think that's what we are seeing here.

@martindurant
Copy link
Member

even though the underlying store was using zarr's event loop

Not only the value of the loop matters, but potentially where the instance is made (in a coroutine or not). I think this should only happen on the right loop (here), but it's possible other resources get allocated along the way which tie themselves to the "current" event loop wherever they are called.

@martindurant
Copy link
Member

For the instance caching/loop situation, can someone try

--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import io
 import json
 import logging
@@ -67,8 +68,15 @@ class _Cached(type):
         extra_tokens = tuple(
             getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
         )
+        if "loop" not in kwargs and cls.async_impl:
+            try:
+                loop = asyncio.get_running_loop()
+            except RuntimeError:
+                loop = None
+        else:
+            loop = None
         token = tokenize(
-            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
+            cls, cls._pid, threading.get_ident(), loop, *args, *extra_tokens, **kwargs
         )
         skip = kwargs.pop("skip_instance_cache", False)

@ilan-gold
Copy link
Contributor Author

ilan-gold commented Mar 18, 2025

Not only the value of the loop matters, but potentially where the instance is made (in a coroutine or not). I think this should only happen on the right loop (here), but it's possible other resources get allocated along the way which tie themselves to the "current" event loop wherever they are called.

Interesting, so if the loop is made outside a coroutine but called inside one, it won't work? or?

As for the changes, I made them but got:

Traceback (most recent call last):
  File "/Users/ilangold/Projects/Theis/zarr-python/tester.py", line 11, in <module>
    asyncio.run(main())
  File "/Users/ilangold/.pyenv/versions/3.12.1/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/ilangold/.pyenv/versions/3.12.1/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/.pyenv/versions/3.12.1/lib/python3.12/asyncio/base_events.py", line 684, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/tester.py", line 8, in main
    await z._async_array.getitem(())
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/array.py", line 1346, in getitem
    return await self._get_selection(indexer, prototype=prototype)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/array.py", line 1287, in _get_selection
    await self.codec_pipeline.read(
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/codec_pipeline.py", line 464, in read
    await concurrent_map(
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/common.py", line 68, in concurrent_map
    return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/common.py", line 66, in run
    return await func(*item)
           ^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/codec_pipeline.py", line 265, in read_batch
    chunk_bytes_batch = await concurrent_map(
                        ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/common.py", line 68, in concurrent_map
    return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/core/common.py", line 66, in run
    return await func(*item)
           ^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/storage/_common.py", line 124, in get
    return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/src/zarr/storage/_fsspec.py", line 230, in get
    value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/filesystem_spec/fsspec/implementations/http.py", line 234, in _cat_file
    async with session.get(self.encode_url(url), **kw) as r:
  File "/Users/ilangold/Projects/Theis/zarr-python/venv/lib/python3.12/site-packages/aiohttp/client.py", line 1423, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^
  File "/Users/ilangold/Projects/Theis/zarr-python/venv/lib/python3.12/site-packages/aiohttp/client.py", line 607, in _request
    with timer:
  File "/Users/ilangold/Projects/Theis/zarr-python/venv/lib/python3.12/site-packages/aiohttp/helpers.py", line 636, in __enter__
    raise RuntimeError("Timeout context manager should be used inside a task")
RuntimeError: Timeout context manager should be used inside a task
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x1029cdf70>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x1034cdf10>, 981138.594737125), (<aiohttp.client_proto.ResponseHandler object at 0x1034ced50>, 981138.594759583), (<aiohttp.client_proto.ResponseHandler object at 0x1034ce390>, 981138.594787125), (<aiohttp.client_proto.ResponseHandler object at 0x1034ce7b0>, 981138.59481), (<aiohttp.client_proto.ResponseHandler object at 0x1034cee70>, 981138.594949916), (<aiohttp.client_proto.ResponseHandler object at 0x1034ce0f0>, 981138.596215125), (<aiohttp.client_proto.ResponseHandler object at 0x1034cf530>, 981138.59787275), (<aiohttp.client_proto.ResponseHandler object at 0x1034cef90>, 981138.600228291), (<aiohttp.client_proto.ResponseHandler object at 0x1034cdeb0>, 981138.600392833), (<aiohttp.client_proto.ResponseHandler object at 0x1034cec30>, 981138.6009755)])']
connector: <aiohttp.connector.TCPConnector object at 0x1021f6720>
venv(base) 

@martindurant
Copy link
Member

martindurant commented Mar 18, 2025

if the loop is made outside a coroutine but called inside one, it won't work

It is possible, if you are explicit about when and how you call the loop and always maintain the reference in all places. I don't recommend it. In fact, you cannot run a coroutine without a loop pre-existing, right?

The with timer line definitely happens inside a coroutine (in aiohttp.client.ClientSession._request), so I don't know what that means at all.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Potential issues with the zarr-python library
Projects
None yet
Development

No branches or pull requests

4 participants