164 changes: 164 additions & 0 deletions lib/iris/io/s3_mounts.py
@@ -0,0 +1,164 @@
# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Code to manage access to files stored in S3 buckets.

S3 buckets are mounted by name in temporary directories.
These mounts must persist, so that any data-proxies can re-open the mapped files.
They are removed either at system exit, or under the control of a context manager.
Authentication, access control and storage region are all configured outside of
Iris/Python, by the user's normal AWS configuration (e.g. "aws configure").

"""

import atexit
from pathlib import Path
import shutil
import subprocess
from subprocess import run
import time
from typing import List

_DO_DEBUG = True


def _DEBUG(*args, **kwargs):
if _DO_DEBUG:
print(*args, **kwargs)


class _S3MountsManager:
_N_UNMOUNT_RETRIES = 3
_NULL_PATH = Path("/<nonexist>")

    def __init__(self, base_path: str | Path | None = None):
        # The directory under which bucket mounts are created (created lazily).
        if base_path is None:
            base_path = "/var/tmp/__iris_s3mounts__"
        self._configured_basepath = Path(base_path)
        self.mounts_basepath: Path = self._NULL_PATH
        self.mount_paths: List[Path] = []

    def _ensure_basepath(self) -> Path:
        if self.mounts_basepath is self._NULL_PATH:
            base_path = self._configured_basepath
            base_path.mkdir(parents=True, exist_ok=True)
            self.mounts_basepath = base_path
        assert self.mounts_basepath.exists()
        return self.mounts_basepath

def bucket_mountpath(self, bucket_name: str) -> Path:
self._ensure_basepath()
mount_path = self.mounts_basepath / bucket_name
if mount_path not in self.mount_paths:
_DEBUG(f"\nCreating S3 mount-path dir {mount_path} ...")
mount_path.mkdir(parents=True, exist_ok=False)
_DEBUG("...done.\n")

_DEBUG(f"\nMounting S3 bucket {bucket_name} ...")
try:
                run(
                    ["s3fs", bucket_name, str(mount_path)],
                    capture_output=True,
                    check=True,
                )
except subprocess.CalledProcessError as exc:
print(f"Error mounting s3 bucket {bucket_name} at {mount_path}:")
print(exc.stderr.decode())
raise
_DEBUG("...done.\n")
self.mount_paths.append(mount_path)

return mount_path

def unmount_bucket(self, mount_path: Path, n_tries_done: int) -> bool:
"""Attempt to unmount the specified S3 mount, or force if timed out."""
success = False
if n_tries_done >= self._N_UNMOUNT_RETRIES:
_DEBUG(
f"Unmount of {mount_path} out of retries - final attempt with lazy ..."
)
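            # A lazy unmount detaches the mount immediately; the system completes
            # the cleanup once the filesystem is no longer busy.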
            try:
                run(
                    ["umount", "-l", str(mount_path)],
                    capture_output=True,
                    check=True,
                )
                _DEBUG("...succeeded.\n")
                success = True
            except subprocess.CalledProcessError as exc:
                msg = exc.stderr.decode()
                print(f"Unknown error in 'umount -l {mount_path}':", msg)
                raise
else:
try:
_DEBUG(
f"\nUnmounting mount path {mount_path}, attempt #{n_tries_done} ..."
)
run(f"umount {mount_path}", shell=True, capture_output=True, check=True)
_DEBUG("...succeeded.\n")
success = True
except subprocess.CalledProcessError as exc:
msg = exc.stderr.decode()
if "busy" in msg:
# This is OK. We will just pause before retrying.
_DEBUG("Unmount request failed with 'busy': error=", msg)
else:
print(f"Unknown error attempting to unmount {mount_path}:", msg)
raise
return success

    def unmount_all(self):
        """Unmount all tracked mounts, retrying any that are busy."""
        unmount_tries = {path: 0 for path in self.mount_paths}
        while unmount_tries:
            # Try once (each pass) to unmount each outstanding mount.
            for mount_path in list(unmount_tries.keys()):
                n_tries = unmount_tries[mount_path]
                unmount_tries[mount_path] = n_tries + 1
                try:
                    if self.unmount_bucket(mount_path, n_tries):
                        del unmount_tries[mount_path]
                except subprocess.CalledProcessError as exc:
                    print("Other failure on unmount: ", exc.stderr.decode())
                    raise

            if unmount_tries:
                # If any remain, pause before doing a batch of retries.
                time.sleep(1.0)

        self.mount_paths = []

        # All finally gone: remove the base-path also (if it was ever created).
        if self.mounts_basepath is not self._NULL_PATH:
            try:
                shutil.rmtree(self.mounts_basepath)
            finally:
                self.mounts_basepath = self._NULL_PATH


# Module-level singleton which records all the active mounts.
s3_mounter = _S3MountsManager()


# Ensure that we clean up at exit of the main program.
@atexit.register
def _cleanup():
_DEBUG("\n\nFINAL CLEANUP.\n")
try:
s3_mounter.unmount_all()
except Exception:
        pass  # ignore any exception in the cleanup handler


# #
# # ALSO: force cleanup if an uncaught Exception causes exit.
# #
# _orig_excepthook = sys.excepthook
#
# def _excepthook(*args):
# global _orig_excepthook
# try:
# _cleanup()
# finally:
# _orig_excepthook(*args)
#
# sys.excepthook = _excepthook
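
# A hedged sketch of using the mounts manager directly (hypothetical bucket
# name): normal use goes via iris.load with an "s3://" URI, but the manager
# can also be driven by hand:
#
#   from iris.io.s3_mounts import s3_mounter
#
#   local_dir = s3_mounter.bucket_mountpath("my-bucket")
#   # ... files under local_dir can now be opened as ordinary local files ...
#   s3_mounter.unmount_all()  # optional: otherwise performed at interpreter exit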
23 changes: 20 additions & 3 deletions lib/iris/loading.py
@@ -29,18 +29,35 @@ def _generate_cubes(uris, callback, constraints):

for scheme, groups in itertools.groupby(uri_tuples, key=lambda x: x[0]):
# Call each scheme handler with the appropriate URIs
if scheme == "file":
part_names = [x[1] for x in groups]
for cube in iris.io.load_files(part_names, callback, constraints):
if scheme in ("file", "s3"):
filepaths = []
for group in groups:
filepath = group[1]
if scheme == "s3":
# S3-based files are loaded from a bucket mounted *as a filesystem*.
                    # For this, (a) ensure the mount exists, and (b) adjust the filepath.
from iris.io.s3_mounts import s3_mounter

# Strip off initial "//"
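                    # e.g. "s3://my-bucket/dir/file.nc" (hypothetical) arrives here
                    # with filepath == "//my-bucket/dir/file.nc".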
assert filepath.startswith("//")
bucket_name, filepath = filepath[2:].split("/", maxsplit=1)
bucket_mount_path = s3_mounter.bucket_mountpath(bucket_name)
filepath = str(bucket_mount_path / filepath)
filepaths.append(filepath)

for cube in iris.io.load_files(filepaths, callback, constraints):
yield cube

elif scheme in ["http", "https"]:
urls = [":".join(x) for x in groups]
for cube in iris.io.load_http(urls, callback):
yield cube

elif scheme == "data":
data_objects = [x[1] for x in groups]
for cube in iris.io.load_data_objects(data_objects, callback):
yield cube

else:
raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)
