
Conversation

@davramov (Contributor) commented Feb 11, 2025

I am opening this PR to address issue #59.

Key components in this initial commit:

  • Includes a new transfer controller, CFSToHPSSTransferController(), with logic for handling single files vs. directories using SFAPI, Slurm, hsi, and htar, following HPSS best practices (a rough sketch follows this list).
  • Moves create_sfapi_client() to a new file, orchestration/sfapi.py, so it can be easily accessed by multiple components, including the NERSC tomography workflow.
  • Includes new documentation in MkDocs for HPSS.
  • Added an HPSS endpoint to config.yml that we can use to define the root storage location.
  • Updates orchestration/_tests/test_sfapi_flow.py to reflect the new location of create_sfapi_client().
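
As a rough sketch of the single-file case: the controller builds an xfer-queue Slurm script around hsi and submits it through SFAPI. Everything below is illustrative; the helper name, paths, and the submit call are assumptions about the SFAPI client interface, not the exact code in this PR.

```python
from orchestration.sfapi import create_sfapi_client  # new module added in this PR


def put_single_file_to_hpss(local_path: str, hpss_path: str) -> str:
    """Copy one file from CFS to HPSS via an xfer-queue Slurm job (illustrative only)."""
    job_script = f"""#!/bin/bash
#SBATCH -q xfer                  # HPSS transfers belong in the xfer queue
#SBATCH -A als                   # account; see the later note about an env-var override
#SBATCH -t 02:00:00
#SBATCH -J hsi_put_single_file

set -e
hsi "mkdir -p $(dirname {hpss_path}); put {local_path} : {hpss_path}"
"""
    client = create_sfapi_client()
    # Assumption: the client exposes a compute resource with a submit_job() method;
    # adjust to whatever the project's SFAPI wrapper actually provides.
    perlmutter = client.compute("perlmutter")
    job = perlmutter.submit_job(job_script)
    return job.jobid
```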

This is still a WIP and requires thorough testing; there are a few outstanding tasks:

  • Add logic to the new HPSSToCFSTransferController() class (currently just a placeholder)
  • Incorporate the HPSS transfer into our Prefect Flows.
  • We need a strategy for storing the SFAPI id/key pair. Our application must make it easy to update these values since they expire fairly quickly.
  • Refactor all of the TransferControllers to accept a generic config=Config rather than config=Config832, which will help generalize our codebase as we extend support to additional beamlines.
  • Build a new script orchestration/pruning_controller.py that centralizes and implements Pruning logic for:
    • HPSS as a minimum for this PR, and then start thinking about:
    • Globus Endpoints
    • Local File System Endpoints
  • Store HPSS file paths in SciCat

@davramov (Contributor, Author):

Progress:

HPSS

  • Created two new classes in orchestration/transfer_controller.py: CFSToHPSSTransferController() and HPSSToCFSTransferController(), including SFAPI-submitted Slurm job scripts that handle a few cases (see the sketch after this list):
    • Put and retrieve single files with hsi
    • Archive a project into a tar file directly on HPSS using htar, breaking it up into multiple archives if necessary (if > 2 TB)
    • Retrieve either an entire tar archive, or just specific files from the archive, using htar
  • Includes new documentation in MkDocs for HPSS.
  • Added an HPSS endpoint to config.yml that we can use to define the root storage location.
  • Updated orchestration/_tests/test_transfer_controller.py with cases for the new HPSS controllers
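
The htar portions of those job scripts boil down to a handful of commands. The helper below is a hedged illustration of the archive/list/retrieve cases (the function, paths, and 2 TB note are made up for this sketch, not the generated scripts themselves):

```python
def build_htar_commands(archive_path: str, project_dir: str,
                        members: list[str] | None = None) -> dict[str, str]:
    """Illustrative htar invocations for the archive/list/retrieve cases above."""
    return {
        # Create the tar directly on HPSS; the controller splits bundles that would exceed ~2 TB.
        "create": f"htar -cvf {archive_path} {project_dir}",
        # List members without staging the archive back to CFS.
        "list": f"htar -tvf {archive_path}",
        # Extract everything, or only selected members for a partial retrieval.
        "extract": f"htar -xvf {archive_path}" + (f" {' '.join(members)}" if members else ""),
    }
```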

New file: orchestration/sfapi.py

  • Moved create_sfapi_client() to a new file called orchestration/sfapi.py so it can be easily accessed by the HPSS TransferControllers and the bl832/nersc.py flow (and any other future flow requiring SFAPI).
  • Updated orchestration/_tests/test_sfapi_flow.py to reflect the new location of create_sfapi_client().

New BeamlineConfig() class in orchestration/config.py

  • Created a generic BeamlineConfig(ABC) class
  • Updated the Config832() class in orchestration/flows/bl832/config.py to be an implementation of the generic BeamlineConfig() class.
  • Updated the typing in orchestration/transfer_controller.py to accept a generic BeamlineConfig() rather than the specific Config832(). This will allow us to continue scaling our approach to more beamlines.
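
In outline, the relationship looks something like the following; the property and method names here are placeholders, not the actual fields of Config832:

```python
from abc import ABC, abstractmethod


class BeamlineConfig(ABC):
    """Generic per-beamline configuration (orchestration/config.py)."""

    @property
    @abstractmethod
    def beamline_id(self) -> str:
        """Short identifier such as 'bl832' (placeholder example)."""


class Config832(BeamlineConfig):
    """BL 8.3.2 configuration (orchestration/flows/bl832/config.py)."""

    @property
    def beamline_id(self) -> str:
        return "bl832"


def copy_to_hpss(file_path: str, config: BeamlineConfig) -> None:
    # Transfer controllers are annotated with BeamlineConfig, so supporting a new
    # beamline only requires a new subclass, not new controller code.
    ...
```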

New file: orchestration/transfer_endpoints.py

  • Moved TransferEndpoint(ABC) and the implementations FileSystemEndpoint() and HPSSEndpoint() into their own file so they can be referenced by the new prune_controller.py as well as transfer_controller.py.
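
Roughly, the endpoint hierarchy is as follows (the fields shown are illustrative, not the exact attributes):

```python
from abc import ABC
from dataclasses import dataclass


@dataclass
class TransferEndpoint(ABC):
    """A place data can live; subclasses tell controllers how to reach it."""
    name: str
    root_path: str


@dataclass
class FileSystemEndpoint(TransferEndpoint):
    """A POSIX location such as CFS or beamline storage."""


@dataclass
class HPSSEndpoint(TransferEndpoint):
    """The HPSS tape root defined by the new endpoint entry in config.yml."""
```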

New file: orchestration/prune_controller.py

  • Refactored the pruning code into a new PruneController(ABC) class with implementations for HPSSPruneController(), FileSystemPruneController(), and GlobusPruneController().
  • These implementations use the existing schedule_prefect_flow() method to schedule endpoint-specific Prefect Flows that are defined within this script. This should reduce the number of pruning deployments we have at each beamline.
  • Created a get_prune_controller() method that selects the correct controller based on the type of TransferEndpoint that is passed in (maybe this should be an Enum; see the sketch after this list)
  • Created a new pytest script orchestration/_tests/test_prune_controller.py
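
A hedged sketch of that selection logic (class names come from this PR, but the constructor arguments and dispatch order are assumptions):

```python
from orchestration.config import BeamlineConfig
from orchestration.prune_controller import (  # constructor signatures assumed
    PruneController, HPSSPruneController, FileSystemPruneController,
)
from orchestration.transfer_endpoints import TransferEndpoint, FileSystemEndpoint, HPSSEndpoint


def get_prune_controller(endpoint: TransferEndpoint, config: BeamlineConfig) -> PruneController:
    """Pick the prune controller that matches the endpoint type."""
    if isinstance(endpoint, HPSSEndpoint):
        return HPSSPruneController(endpoint, config)
    if isinstance(endpoint, FileSystemEndpoint):
        return FileSystemPruneController(endpoint, config)
    # A GlobusEndpoint branch returning GlobusPruneController would sit here as well;
    # an Enum of endpoint kinds, as suggested above, would make this dispatch explicit.
    raise ValueError(f"No prune controller for endpoint type: {type(endpoint).__name__}")
```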

Next steps:

HPSS

  • Create a new script orchestration/flows/bl832/hpss.py that calls the new TransferControllers
  • Test the implementation
  • Deploy the Prefect Flow at bl832 and connect it with our current pipeline
  • Begin archiving old projects off of NERSC (urgent)

PruneController

  • Implement filesystem and hpss pruning
  • Schedule pruning on CFS after moving to HPSS
  • Test locally
  • Refactor existing code (bl832 flows) to use the new PruneController implementations (maybe a different PR... since switching the pruning implementation will break all of our registered Prefect flows, and those still work)

SciCat

  • Confirm the SciCat version we want to support
  • Investigate how SciCat currently works in this project when ingesting data
  • Find a way to automatically update the filepaths in SciCat metadata when moving from CFS to HPSS (or anywhere really)

@davramov (Contributor, Author):

Notes when talking to Dylan:

Moving to tape strategies:

  • detect when the user proposal changes
    • would require a database, but maybe we could leverage SciCat....
    • look into how SciCat moves things to tape (look at source code) -- collaborate with Garrett
      • "embargo period"
      • origdatablock and datablock for keeping track of file paths

@davramov davramov requested a review from rajasriramoju March 14, 2025 22:07
@davramov davramov self-assigned this Mar 14, 2025
@davramov (Contributor, Author):

Summary of Issue 59 Changes

Key New Features

  • New Prefect HPSS Workflows:
    • Added dispatcher flows to coordinate a few modes
      • archive_832_project_dispatcher flow for individual project archiving
      • archive_832_projects_from_previous_cycle_dispatcher flow that implements a twice-yearly archiving policy (see the sketch after this list):
        • January 2: Archives projects from January 1 to July 15 of the previous year
        • July 4: Archives projects from July 16 to December 31 of the previous year
    • Added a new module (orchestration/hpss.py) for tape archive operations at NERSC
    • Implemented transfer controllers for CFS (Community File System) to HPSS and HPSS to CFS operations
      • CFSToHPSSTransferController that generates SLURM job scripts to bundle files by date ranges into tar archives for efficient tape storage
      • HPSSToCFSTransferController with support for three retrieval modes: single file, full tar extraction, and partial tar extraction
    • Added pruning controller specific to HPSS requirements
  • Transfer and Prune Controller Refactoring
    • Implemented abstract base classes for both transfer and pruning operations
    • Added type-safe controllers with better error handling
    • Created a factory pattern for selecting appropriate controllers based on operation type
  • Endpoint Abstractions
    • Added new endpoint classes in transfer_endpoints.py:
      • TransferEndpoint abstract base class
      • FileSystemEndpoint for local file operations
      • HPSSEndpoint for tape interactions
    • Type-safe controller implementations using Python's Generic[Endpoint] pattern
  • SciCat Integration Improvements
    • Enhanced dataset ingestor with better error handling
    • Added support for derived datasets in addition to raw datasets
    • Implemented tomo recon thumbnails from tiffs and zarr
  • Configuration Management: Implemented Dynaconf for more robust settings management
  • Code Structure: Reorganized flows by separating concerns into modular components
  • Testing: Added pytest test suite to ensure reliability
  • Documentation: Enhanced README with workflow status tables and detailed setup instructions
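
A minimal sketch of the previous-cycle window the dispatcher targets (the function name is hypothetical; the dates come from the policy above):

```python
import datetime


def previous_cycle_range(today: datetime.date) -> tuple[datetime.date, datetime.date]:
    """Return the date window archived by the twice-yearly dispatcher run."""
    year = today.year - 1
    if today.month < 7:
        # The ~January 2 run archives cycle 1 of the previous year.
        return datetime.date(year, 1, 1), datetime.date(year, 7, 15)
    # The ~July 4 run archives cycle 2 of the previous year.
    return datetime.date(year, 7, 16), datetime.date(year, 12, 31)
```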

@davramov (Contributor, Author):

I met with @rajasriramoju and came up with a few final steps to test and verify the feature changes.

End-to-end tests

  • Document how to test each feature
  • Dummy data to move around, but real endpoints (if possible)
  • Test each method for each controller
  • Define a logical path to test each portion

Define test for tape:

  • Look at code and see if the logic makes sense
  • Come up with a simple test, move some of Dula's data to tape

@dylanmcreynolds (Contributor) left a comment

A couple preliminary requests.

As discussed, let's move a lot of this stuff into the scicat_beamline project.

Contributor:

2 questions:

  • I expect the BL scientist or Prefect dispatcher to start off the HPSS transfer, because they would have access. So when is a user, as mentioned in the docs, involved in this?
  • When talking about transferring specific user data from HPSS to CFS, where is the operation taking place and who is performing it?

Contributor Author (@davramov):

I'm updating the documentation to define what we mean by NERSC user, ALS user, and service accounts.

NERSC users can move data to HPSS. In this case, "user" is assumed to be our alsdev service account.
In production, ALS users would not be moving the data themselves, but they would still have access to their own data and know where it is located on HPSS via SciCat. As ALS staff, we could retrieve their data on request using our flows, or the user could follow the documentation, log in to HPSS from Jupyter, and use their existing file permissions for their projects.

I'm also thinking we should set the NERSC account in the Slurm job via an environment variable so it's easier for other people to use this themselves. A fallback could be set in config.yml and overridden by the env variable.

Change it from this:

#SBATCH -A als                            # Specify the account.

to something like this:

nersc_account = os.environ.get("NERSC_ACCOUNT")  # optionally fall back to a value from config.yml
job_script = rf"""#!/bin/bash
...
#SBATCH -A {nersc_account}                            # Specify the account.
  • The flow is set up so that the alsdev service account handles the transfer using the xfer QOS (#SBATCH -q xfer) to launch the Slurm job with the specific hsi and htar commands.
  • It would be launched via a Prefect Agent running on the beamline's respective VM (i.e. flow-prd). We have options if we want that to be a scheduled transfer, or a manual transfer of one project directory.
  • ALS users who are NERSC users could look up their data in SciCat and retrieve it via hsi/htar commands.

@davramov (Contributor, Author) commented Apr 9, 2025

Notes from Raja

  • which env variables need to be set
  • add comments in the code marking what will move to the beamline_scicat repo
  • credentials to use for the end-to-end tests
    • scicatlive: ingest user, API URL, password
    • Globus
  • ALS users / HPSS tape
    • how do we expect users to access their data?
    • do we need a form or clear guide to teach beamline scientists?
  • small hackathon next week to test

@davramov (Contributor, Author) commented Apr 15, 2025

Notes from today's testing hackathon with Raja, just a few items to clean up

TODO:

  • Write a script to ls a tar on HPSS with alsdev (or whatever account moved the data)
  • Within the Slurm scripts (or when determining the bundles), we might need to filter for files greater than 60 GB and move those with hsi instead of htar (see the sketch below). That would move just the large files individually, so they would not be in the .tar archive. Also update the logs to reflect where those files are.
  • In the logs, it does not print the HPSS tree, despite saying it does!
  • For HPSS to CFS, make sure that the logs are not saved in a complicated tree. Maybe convert slashes to underscores for the log file name to make it easier to find?
  • Update documentation in the root README
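
A hedged sketch of that bundling filter (the threshold constant and function name are placeholders, not the controller's actual code):

```python
import os

HSI_THRESHOLD_BYTES = 60 * 1024**3  # ~60 GB; large members are better moved outside the tar


def split_for_transfer(file_paths: list[str]) -> tuple[list[str], list[str]]:
    """Return (htar_members, hsi_singles): large files go to HPSS individually via hsi."""
    htar_members: list[str] = []
    hsi_singles: list[str] = []
    for path in file_paths:
        if os.path.getsize(path) > HSI_THRESHOLD_BYTES:
            hsi_singles.append(path)   # transferred with `hsi put`, logged separately
        else:
            htar_members.append(path)  # bundled into the cycle's .tar archive
    return htar_members, hsi_singles
```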

@davramov davramov requested a review from GBirkel April 30, 2025 21:49
@davramov (Contributor, Author):

Hi @GBirkel, adding you as a reviewer to this PR, but don't worry about it until I rebase with other recent changes.

@davramov davramov marked this pull request as ready for review May 5, 2025 21:40
1. Define the source (CFS) and destination (HPSS) paths.
2. Create the destination directory on HPSS if it doesn't exist.
- Recursively check whether each folder in the incoming file path exists
- If a folder does not exist, use `hsi mkdir`
Contributor:

Is there an equivalent to mkdir -p for recursive folder creation? E.g. hsi mkdir -p?

Contributor Author (@davramov):

Good catch. I found this in the documentation: https://hpss-collaboration.org/wp-content/uploads/2023/09/hpss_hsi_10.2_reference_manual.pdf

19.63. MKDIR command
Synopsis
mkd[ir] [-A annotation] [-m mode] [-p] path …
Description
This command creates HPSS subdirectories.
HSI Version
All
Aliases
add, md
Options
...
-p Creates missing intermediate path name directories. If the -p flag is not specified,
the parent directory of each newly-created directory must already exist
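
So the per-directory existence checks could presumably collapse into a single call, something like this (the path is a made-up example):

```python
# Hypothetical simplification: let hsi create all missing parent directories at once.
destination = "/home/a/alsdev/bl832/2025/some_project"  # placeholder HPSS path
mkdir_command = f'hsi "mkdir -p {destination}"'
```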

- If a directory, group files by beam cycle and archive them.
* Cycle 1: Jan 1 - Jul 15
* Cycle 2: Jul 16 - Dec 31
* If a group exceeds 2 TB, it is partitioned into multiple tar archives.
Contributor:

Should there be code that warns if the resulting archive would be less than 100GB, and skips it, so the content is consolidated into the next archiving run instead?

Contributor Author (@davramov):

Interesting thought. The lower bound (<100 GB) seems more like a suggestion so that small files are not scattered across many different tapes. I think if all of the files are bundled together for a cycle/proposal, then users could still retrieve their data easily.

def add_new_dataset_location(
self,
dataset_id: str = None,
source_folder: str = None,
@GBirkel (Contributor) commented May 7, 2025:

Why are we giving add_new_dataset_location a path to a folder, then having it construct a file path by appending the name of the data set to the folder? It seems like a dangerous assumption to make that the file would always have that name, especially since it appears we are quietly falling back to a generic name of "dataset".
If a user wants to put two data sets in the same place and renames one for this purpose, there is no way to supply the changed name to add_new_dataset_location.
Perhaps we should leave it to the caller to compose the full filename?


# Add location information to the path if host is provided
if source_folder_host:
datafile.path = f"{source_folder_host}:{file_path}"
Contributor:

Perhaps prepare this string before creating the DataFile object above, so we're not modifying the object after creation?

def _find_dataset(
self,
proposal_id: Optional[str] = None, # The ALS proposal ID, not the SciCat ID
file_name: Optional[str] = None
Contributor:

I'm not sure if SciCat's advanced query API supports this, but, if we're actually searching by file_name, wouldn't it make more sense to look in all the dataFileList entries for all datasets?

This comes to mind because at 733, scientists organize their data mostly by creating dated folders, and there's no guarantee that the files in those folders have unique names relative to the other folders. So if they were searching for a particular data file, they would need to use a path fragment, e.g. '20241216_153047/new_run.h5'

If this function could search dataFileLists by path fragment, it would be some future-proofing for those users...

Contributor Author (@davramov):

I am adding this use case in a comment as TODO. Right now, the _find_dataset method isn't used anywhere, so we can build on this as needed.

sample_name: str
) -> str:
"""extract search terms from sample name to provide something pleasing to search on"""
terms = re.split("[^a-zA-Z0-9]", sample_name)
Contributor:

Should this include dashes or underscores?

Contributor Author (@davramov):

Good question... I will leave this as is for now, since I'm not sure.

"""Create a thumbnail from an image array."""
image_array = image_array - np.min(image_array) + 1.001
image_array = np.log(image_array)
image_array = 205 * image_array / (np.max(image_array))
@GBirkel (Contributor) commented May 7, 2025:

Can an image_array ever contain all 0s? (Don't want a "divide by zero" error)

Contributor Author (@davramov):

I'm adding a try-except statement, where if an error occurs while building the thumbnail (e.g., dividing by 0), it will return a blank image. This way if it fails, it won't block the rest of the ingestion flow from completing.
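
A minimal sketch of that guard, reusing the scaling logic quoted above (the function name and blank-image size are placeholders):

```python
import numpy as np


def safe_log_thumbnail(image_array: np.ndarray, blank_size: int = 256) -> np.ndarray:
    """Log-scale an image for a thumbnail; fall back to a blank image on failure."""
    try:
        scaled = image_array - np.min(image_array) + 1.001
        scaled = np.log(scaled)
        scaled = 205 * scaled / np.max(scaled)
        if not np.isfinite(scaled).all():
            # e.g. np.max(scaled) == 0 yields inf/nan rather than raising
            raise ValueError("non-finite values while scaling thumbnail")
        return scaled.astype(np.uint8)
    except Exception:
        # Never block the rest of the ingestion flow on a bad thumbnail.
        return np.zeros((blank_size, blank_size), dtype=np.uint8)
```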

f"in {days_from_now.total_seconds()/86400:.1f} days")

try:
schedule_prefect_flow(
@GBirkel (Contributor) commented May 7, 2025:

What happens if this is called twice, and two pruning jobs for the same location are triggered at the same time? Will we create a race between the jobs and end up with file errors?

Contributor Author (@davramov):

From what I've seen, if the file is already pruned, then the flow would fail with a "file not found" error

file_path: str = None,
source_endpoint: FileSystemEndpoint = None,
check_endpoint: Optional[FileSystemEndpoint] = None,
days_from_now: datetime.timedelta = 0
Contributor:

This is asking for days_from_now but expects a timedelta. Might be more clear to either:

  1. Change the argument to time_delta
  2. Expect a floating point value, and create the timedelta internally from it
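
For example, option 2 might look like this (the surrounding function name and signature are hypothetical; the conversion line is the point):

```python
import datetime
from typing import Optional

from orchestration.transfer_endpoints import FileSystemEndpoint


def schedule_prune(
    file_path: Optional[str] = None,
    source_endpoint: Optional[FileSystemEndpoint] = None,
    check_endpoint: Optional[FileSystemEndpoint] = None,
    days_from_now: float = 0.0,
) -> None:
    # Accept a plain number of days and build the timedelta internally (option 2 above).
    delay = datetime.timedelta(days=days_from_now)
    ...
```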

…rces requested than allowed for logical queue xfer (0.0390625 requested core-equivalents > 0.025)