Skip to content

Commit

Permalink
Rename Deferred to IO to reflect that its an I/O monad.
Browse files Browse the repository at this point in the history
Also use destructive substituion where needed to make function signature
return types more explicit.
  • Loading branch information
zoj613 committed Dec 25, 2024
1 parent 00ade96 commit 5d79457
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 239 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ open Zarr
open Zarr.Codecs
open Zarr.Indexing
open Zarr_sync.Storage
open Deferred.Infix (* opens infix operators >>= and >>| for monadic bind & map *)
open IO.Infix (* opens infix operators >>= and >>| for monadic bind & map *)
let store = FilesystemStore.create "testdata.zarr";;
```
Expand Down
17 changes: 8 additions & 9 deletions zarr-eio/src/storage.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Deferred = struct
module IO = struct
type 'a t = 'a
let return = Fun.id
let bind x f = f x
Expand All @@ -19,14 +19,13 @@ module Deferred = struct
end
end

module ZipStore = Zarr.Zip.Make(Deferred)
module MemoryStore = Zarr.Memory.Make(Deferred)
module ZipStore = Zarr.Zip.Make(IO)
module MemoryStore = Zarr.Memory.Make(IO)

module FilesystemStore = struct
module IO = struct
module Deferred = Deferred

module S = struct
type t = {root : Eio.Fs.dir_ty Eio.Path.t; perm : Eio.File.Unix_perm.t}
type 'a io = 'a IO.t

let fspath_to_key t (path : Eio.Fs.dir_ty Eio.Path.t) =
let s = snd path and pos = String.length (snd t.root) + 1 in
Expand Down Expand Up @@ -121,12 +120,12 @@ module FilesystemStore = struct
let create ?(perm=0o700) ~env dirname =
Zarr.Util.create_parent_dir dirname perm;
Sys.mkdir dirname perm;
IO.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}
S.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}

let open_store ?(perm=0o700) ~env dirname =
if Sys.is_directory dirname
then IO.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}
then S.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}
else raise (Zarr.Storage.Not_a_filesystem_store dirname)

include Zarr.Storage.Make(IO)
include Zarr.Storage.Make(IO)(S)
end
8 changes: 4 additions & 4 deletions zarr-eio/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module Deferred : Zarr.Types.Deferred with type 'a t = 'a
module IO : Zarr.Types.IO with type 'a t = 'a

(** An Eio-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : Zarr.Memory.S with module Deferred = Deferred
module MemoryStore : Zarr.Memory.S with type 'a io := 'a

(** An Eio-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with module Deferred = Deferred
module ZipStore : Zarr.Zip.S with type 'a io := 'a

(** An Eio-aware local filesystem storage backend for a Zarr v3 hierarchy. *)
module FilesystemStore : sig
include Zarr.Storage.STORE with module Deferred = Deferred
include Zarr.Storage.S with type 'a io := 'a

val create : ?perm:Eio.File.Unix_perm.t -> env:<fs : Eio.Fs.dir_ty Eio.Path.t; ..> -> string -> t
(** [create ~perm ~env dir] returns a new filesystem store.
Expand Down
8 changes: 3 additions & 5 deletions zarr-eio/test/test_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ let string_of_list = [%show: string list]
let print_node_pair = [%show: Node.Array.t list * Node.Group.t list]
let print_int_array = [%show : int array]

module type EIO_STORE = sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a
end
module type EIO_STORE = Zarr.Storage.S with type 'a io := 'a

let test_storage
(type a) (module M : EIO_STORE with type t = a) (store : a) =
Expand Down Expand Up @@ -68,7 +66,7 @@ let test_storage
assert_equal exp got;
match codecs with
| [`ShardingIndexed _] -> Array.delete store anode
| _ -> Deferred.return_unit)
| _ -> IO.return_unit)
[[`ShardingIndexed cfg]; [`Bytes BE]];

let child = Node.Group.of_path "/some/child/group" in
Expand Down Expand Up @@ -147,7 +145,7 @@ let _ =
let zpath = tmp_dir ^ ".zip" in
ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z);
(* test just opening the now exisitant archive created by the previous test. *)
ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit);
ZipStore.with_open `Read_only zpath (fun _ -> ());
test_storage (module MemoryStore) @@ MemoryStore.create ();
test_storage (module FilesystemStore) s)
])
63 changes: 31 additions & 32 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Deferred = struct
module IO = struct
type 'a t = 'a Lwt.t
let return = Lwt.return
let bind = Lwt.bind
Expand All @@ -19,16 +19,16 @@ module Deferred = struct
end
end

module ZipStore = Zarr.Zip.Make(Deferred)
module MemoryStore = Zarr.Memory.Make(Deferred)
module ZipStore = Zarr.Zip.Make(IO)
module MemoryStore = Zarr.Memory.Make(IO)

module FilesystemStore = struct
module IO = struct
module Deferred = Deferred
open Deferred.Infix
open Deferred.Syntax
module S = struct
open IO.Infix
open IO.Syntax

type t = {dirname : string; perm : Lwt_unix.file_perm}
type 'a io = 'a IO.t

let fspath_to_key t path =
let pos = String.length t.dirname + 1 in
Expand All @@ -49,7 +49,7 @@ module FilesystemStore = struct
let size t key =
let file_length path () = Lwt.map Int64.to_int (Lwt_io.file_length path)
and filepath = key_to_fspath t key in
Lwt.catch (file_length filepath) (Fun.const @@ Deferred.return 0)
Lwt.catch (file_length filepath) (Fun.const @@ IO.return 0)

let get t key =
let* buf_size = size t key in
Expand Down Expand Up @@ -148,22 +148,22 @@ module FilesystemStore = struct
let create ?(perm=0o700) dirname =
Zarr.Util.create_parent_dir dirname perm;
Sys.mkdir dirname perm;
IO.{dirname = Zarr.Util.sanitize_dir dirname; perm}
S.{dirname = Zarr.Util.sanitize_dir dirname; perm}

let open_store ?(perm=0o700) dirname =
if Sys.is_directory dirname
then IO.{dirname = Zarr.Util.sanitize_dir dirname; perm}
then S.{dirname = Zarr.Util.sanitize_dir dirname; perm}
else raise (Zarr.Storage.Not_a_filesystem_store dirname)

include Zarr.Storage.Make(IO)
include Zarr.Storage.Make(IO)(S)
end

module AmazonS3Store = struct
module Credentials = Aws_s3_lwt.Credentials
module S3 = Aws_s3_lwt.S3

open Deferred.Infix
open Deferred.Syntax
open IO.Infix
open IO.Syntax

exception Request_failed of S3.error

Expand All @@ -178,7 +178,7 @@ module AmazonS3Store = struct

let fold_or_catch ~not_found res =
let return_or_raise r () = match r with
| Ok v -> Deferred.return v
| Ok v -> IO.return v
| Error e -> raise (Request_failed e)
and on_exception ~not_found = function
| Request_failed S3.Not_found -> Lwt.return (not_found ())
Expand All @@ -190,19 +190,18 @@ module AmazonS3Store = struct
let empty_Ls = Fun.const ([], S3.Ls.Done)

let fold_continuation ~return ~more = function
| S3.Ls.Done -> Deferred.return return
| S3.Ls.Done -> IO.return return
| S3.Ls.More continuation ->
continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) ->
more xs cont

module IO = struct
module Deferred = Deferred

module S = struct
type t =
{retries : int
;bucket : string
;cred : Credentials.t
;endpoint : Aws_s3.Region.endpoint}
type 'a io = 'a IO.t

let size t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
Expand All @@ -229,19 +228,19 @@ module AmazonS3Store = struct
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
Lwt.map (fun x -> [x]) (fold_or_catch ~not_found:(raise_not_found key) res)
in
Deferred.concat_map (read_range t key) ranges
IO.concat_map (read_range t key) ranges

let set t key data =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in
Deferred.return_unit
IO.return_unit

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| 0 -> IO.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
Expand All @@ -259,7 +258,7 @@ module AmazonS3Store = struct
S3.retry ~retries:t.retries ~endpoint ~f () >>= fold_or_catch ~not_found:(Fun.const ())

let rec delete_keys t cont () =
let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in
let del t xs c = IO.iter (delete_content t) xs >>= delete_keys t c in
fold_continuation ~return:() ~more:(del t) cont

and delete_content t S3.{key; _} = erase t key
Expand All @@ -269,7 +268,7 @@ module AmazonS3Store = struct
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
Deferred.iter (delete_content t) xs >>= delete_keys t rest
IO.iter (delete_content t) xs >>= delete_keys t rest

let rec list t =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
Expand All @@ -284,32 +283,32 @@ module AmazonS3Store = struct
let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in
fold_continuation ~return:acc ~more:(append acc) cont

module S = Set.Make(String)
module M = Set.Make(String)

let rec partition_keys prefix ((l, r) as acc) cont =
let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in
fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont
fold_continuation ~return:(l, M.elements r) ~more:(split ~acc ~prefix) cont

and add prefix (l, r) (c : S3.content) =
let size = String.length prefix in
if not (String.contains_from c.key size '/') then c.key :: l, r else
l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r
l, M.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r

and list_dir t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
let init = List.fold_left (add prefix) ([], S.empty) xs in
let init = List.fold_left (add prefix) ([], M.empty) xs in
partition_keys prefix init rest

let rec rename t prefix new_prefix =
let upload t (k, v) = set t k v in
let* xs = list t in
let to_delete = List.filter (String.starts_with ~prefix) xs in
let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = Deferred.iter (upload t) data in
Deferred.iter (erase t) to_delete
let* data = IO.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = IO.iter (upload t) data in
IO.iter (erase t) to_delete

and rename_and_add ~t ~prefix ~new_prefix acc k =
let l = String.length prefix in
Expand All @@ -321,7 +320,7 @@ module AmazonS3Store = struct
let* res = Credentials.Helper.get_credentials ~profile () in
let cred = Result.fold ~ok:Fun.id ~error:raise res in
let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
f IO.{bucket; cred; endpoint; retries}
f S.{bucket; cred; endpoint; retries}

include Zarr.Storage.Make(IO)
include Zarr.Storage.Make(IO)(S)
end
10 changes: 5 additions & 5 deletions zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module Deferred : Zarr.Types.Deferred with type 'a t = 'a Lwt.t
module IO : Zarr.Types.IO with type 'a t = 'a Lwt.t

(** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : Zarr.Memory.S with module Deferred = Deferred
module MemoryStore : Zarr.Memory.S with type 'a io := 'a Lwt.t

(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with module Deferred = Deferred
module ZipStore : Zarr.Zip.S with type 'a io := 'a Lwt.t

(** An Lwt-aware local filesystem storage backend for a Zarr V3 hierarchy. *)
module FilesystemStore : sig
include Zarr.Storage.STORE with module Deferred = Deferred
include Zarr.Storage.S with type 'a io := 'a Lwt.t

val create : ?perm:Unix.file_perm -> string -> t
(** [create ~perm dir] returns a new filesystem store.
Expand All @@ -25,7 +25,7 @@ end
module AmazonS3Store : sig
exception Request_failed of Aws_s3_lwt.S3.error

include Zarr.Storage.STORE with module Deferred = Deferred
include Zarr.Storage.S with type 'a io := 'a Lwt.t

val with_open :
?scheme:[ `Http | `Https ] ->
Expand Down
12 changes: 5 additions & 7 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ let string_of_list = [%show: string list]
let print_node_pair = [%show: Node.Array.t list * Node.Group.t list]
let print_int_array = [%show : int array]

module type LWT_STORE = sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t
end
module type LWT_STORE = Zarr.Storage.S with type 'a io := 'a Lwt.t

let test_storage
(type a) (module M : LWT_STORE with type t = a) (store : a) =
let open M in
let open M.Deferred.Infix in
let open IO.Infix in
let gnode = Node.Group.root in

hierarchy store >>= fun nodes ->
Expand Down Expand Up @@ -69,7 +67,7 @@ let test_storage
assert_equal exp got;
match codecs with
| [`ShardingIndexed _] -> Array.delete store anode
| _ -> Deferred.return_unit)
| _ -> IO.return_unit)
[[`ShardingIndexed cfg]; [`Bytes BE]] >>= fun () ->

let child = Node.Group.of_path "/some/child/group" in
Expand Down Expand Up @@ -114,7 +112,7 @@ let test_storage
clear store >>= fun () ->
hierarchy store >>= fun got ->
assert_equal ~printer:print_node_pair ([], []) got;
Deferred.return_unit
IO.return_unit

let _ =
run_test_tt_main @@ ("Run Zarr Lwt API tests" >::: [
Expand Down Expand Up @@ -151,7 +149,7 @@ let _ =
Lwt_main.run @@ Lwt.join
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
(* test just opening the now exisitant archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;ZipStore.with_open `Read_only zpath (fun _ -> Lwt.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module FilesystemStore) s])
Expand Down
Loading

0 comments on commit 5d79457

Please sign in to comment.