diff --git a/.env.example b/.env.example index 2b8fdb89..e3728e89 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,7 @@ GLOBUS_CLIENT_ID= GLOBUS_CLIENT_SECRET= PREFECT_API_URL= -PREFECT_API_KEY= \ No newline at end of file +PREFECT_API_KEY= +PUSHGATEWAY_URL= +JOB_NAME= +INSTANCE_LABEL= \ No newline at end of file diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 5d38fb01..1164dede 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -12,16 +12,16 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up Python 3.9 - uses: actions/setup-python@v2 + - name: Set up Python 3.11 + uses: actions/setup-python@v5 with: python-version: 3.11 cache: 'pip' - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install flake8 pytest - pip install . + python -m pip install --no-cache-dir --upgrade pip + pip install --no-cache-dir flake8 pytest + pip install --no-cache-dir . if [ -f requirements.txt ]; then pip install -r requirements.txt; fi if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi - name: Lint with flake8 diff --git a/create_deployments_832.sh b/create_deployments_832.sh old mode 100755 new mode 100644 index 0d51438e..0c9bbd3e --- a/create_deployments_832.sh +++ b/create_deployments_832.sh @@ -9,6 +9,11 @@ prefect work-pool create 'scicat_ingest_pool' # new_file_832_flow_pool # in docker-compose.yaml: # command: prefect agent start --pool "new_file_832_flow_pool" + +# file transfer from data832 to nersc832 with Grafana Monitoring enabled +prefect deployment build ./orchestration/flows/bl832/move.py:test_transfers_832_grafana -n test_transfers_832_grafana -p new_file_832_flow_pool -q test_transfers_832_queue +prefect deployment apply test_transfers_832_grafana-deployment.yaml + prefect deployment build ./orchestration/flows/bl832/move.py:process_new_832_file -n new_file_832 -p new_file_832_flow_pool -q new_file_832_queue prefect deployment apply process_new_832_file-deployment.yaml diff --git a/orchestration/_tests/test_globus.py b/orchestration/_tests/test_globus.py index 26f48880..0d96d6a6 100644 --- a/orchestration/_tests/test_globus.py +++ b/orchestration/_tests/test_globus.py @@ -68,7 +68,7 @@ def test_succeeded_transfer(): source_endpoint = GlobusEndpoint("123", "source.magrathea.com", "/root") dest_endpoint = GlobusEndpoint("456", "dest.magrathea.com", "/root") - result = start_transfer( + result, _ = start_transfer( transfer_client, source_endpoint, "/42/mice.jpg", dest_endpoint, "/42/mice.jpg" ) @@ -80,7 +80,7 @@ def test_failed_transfer(): source_endpoint = GlobusEndpoint("123", "source.magrathea.com", "/root") dest_endpoint = GlobusEndpoint("456", "dest.magrathea.com", "/root") - result = start_transfer( + result, _ = start_transfer( transfer_client, source_endpoint, "/42/mice.jpg", dest_endpoint, "/42/mice.jpg" ) diff --git a/orchestration/_tests/test_transfer_controller.py b/orchestration/_tests/test_transfer_controller.py index 15b07f54..a1cae916 100644 --- a/orchestration/_tests/test_transfer_controller.py +++ b/orchestration/_tests/test_transfer_controller.py @@ -12,6 +12,7 @@ from .test_globus import MockTransferClient + @pytest.fixture(autouse=True, scope="session") def prefect_test_fixture(): """ @@ -160,7 +161,7 @@ def test_globus_transfer_controller_copy_success( # Patch any Secret.load calls to avoid real Prefect Cloud calls mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecretClass()) - with patch("orchestration.transfer_controller.start_transfer", return_value=True) as mock_start_transfer: + with patch("orchestration.transfer_controller.start_transfer", return_value=(True, "mock-task-id")) as mock_start_transfer: controller = GlobusTransferController(mock_config832) result = controller.copy( file_path="some_dir/test_file.txt", @@ -190,7 +191,7 @@ def test_globus_transfer_controller_copy_failure( mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecretClass()) - with patch("orchestration.transfer_controller.start_transfer", return_value=False) as mock_start_transfer: + with patch("orchestration.transfer_controller.start_transfer", return_value=(False, "mock-task-id")) as mock_start_transfer: controller = GlobusTransferController(mock_config832) result = controller.copy( file_path="some_dir/test_file.txt", @@ -225,6 +226,50 @@ def test_globus_transfer_controller_copy_exception( assert result is False, "Expected False when TransferAPIError is raised." mock_start_transfer.assert_called_once() +def test_globus_transfer_controller_with_metrics( + mock_config832, mock_globus_endpoint, transfer_controller_module +): + """ + Test GlobusTransferController with PrometheusMetrics integration. + """ + GlobusTransferController = transfer_controller_module["GlobusTransferController"] + from orchestration.prometheus_utils import PrometheusMetrics + mock_prometheus = MagicMock(spec=PrometheusMetrics) + + with patch("orchestration.transfer_controller.start_transfer", return_value=(True, "mock-task-id")) as mock_start_transfer: + # Create the controller with mock prometheus metrics + controller = GlobusTransferController(mock_config832, prometheus_metrics=mock_prometheus) + + # Set up mock for get_transfer_file_info + mock_transfer_info = {"bytes_transferred": 1024 * 1024} # 1MB + controller.get_transfer_file_info = MagicMock(return_value=mock_transfer_info) + + # Execute the copy operation + result = controller.copy( + file_path="some_dir/test_file.txt", + source=mock_globus_endpoint, + destination=mock_globus_endpoint, + ) + + # Verify transfer was successful + assert result is True + mock_start_transfer.assert_called_once() + + # Verify metrics were collected and pushed + controller.get_transfer_file_info.assert_called_once_with("mock-task-id") + mock_prometheus.push_metrics_to_prometheus.assert_called_once() + + # Verify the metrics data + metrics_data = mock_prometheus.push_metrics_to_prometheus.call_args[0][0] + assert metrics_data["bytes_transferred"] == 1024 * 1024 + assert metrics_data["status"] == "success" + assert "timestamp" in metrics_data + assert "end_timestamp" in metrics_data + assert "duration_seconds" in metrics_data + assert "transfer_speed" in metrics_data + assert "machine" in metrics_data + assert "local_path" in metrics_data + assert "remote_path" in metrics_data # -------------------------------------------------------------------------- # Tests for SimpleTransferController diff --git a/orchestration/flows/bl7012/move.py b/orchestration/flows/bl7012/move.py index 43c2233d..4b141120 100644 --- a/orchestration/flows/bl7012/move.py +++ b/orchestration/flows/bl7012/move.py @@ -38,7 +38,7 @@ def transfer_data_to_nersc( f"Transferring {source_path} {source_endpoint.name} to {destination_endpoint.name}" ) - success = start_transfer( + success, _ = start_transfer( transfer_client, source_endpoint, source_path, @@ -76,7 +76,7 @@ def transfer_data_within_single_endpoint( # Start globus transfer logger.info(f"Transferring {source_path} to {dest_path} at {globus_endpoint.name}") - success = start_transfer( + success, _ = start_transfer( transfer_client, globus_endpoint, source_path, diff --git a/orchestration/flows/bl7012/move_recon.py b/orchestration/flows/bl7012/move_recon.py index 290604fa..62f1c4ab 100644 --- a/orchestration/flows/bl7012/move_recon.py +++ b/orchestration/flows/bl7012/move_recon.py @@ -45,7 +45,7 @@ def transfer_data_to_nersc( f"Transferring {source_path} {source_endpoint.name} to {destination_endpoint.name}" ) - success = start_transfer( + success, _ = start_transfer( transfer_client, source_endpoint, source_path, @@ -83,7 +83,7 @@ def transfer_data_within_single_endpoint( # Start globus transfer logger.info(f"Transferring {source_path} to {dest_path} at {globus_endpoint.name}") - success = start_transfer( + success, _ = start_transfer( transfer_client, globus_endpoint, source_path, diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 946bbec2..6c61a90c 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -11,6 +11,7 @@ from orchestration.flows.bl832.config import Config832 from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prefect import schedule_prefect_flow +from orchestration.prometheus_utils import PrometheusMetrics API_KEY = os.getenv("API_KEY") @@ -32,7 +33,7 @@ def transfer_spot_to_data( source_path = os.path.join(spot832.root_path, file_path) dest_path = os.path.join(data832.root_path, file_path) - success = start_transfer( + success, _ = start_transfer( transfer_client, spot832, source_path, @@ -44,7 +45,6 @@ def transfer_spot_to_data( logger.info(f"spot832 to data832 globus task_id: {task}") return success - @task(name="transfer_data_to_nersc") def transfer_data_to_nersc( file_path: str, @@ -53,28 +53,38 @@ def transfer_data_to_nersc( nersc832: GlobusEndpoint, ): logger = get_run_logger() - + # if source_file begins with "/", it will mess up os.path.join if file_path[0] == "/": file_path = file_path[1:] - source_path = os.path.join(data832.root_path, file_path) - dest_path = os.path.join(nersc832.root_path, file_path) - - logger.info(f"Transferring {dest_path} data832 to nersc") - success = start_transfer( - transfer_client, - data832, - source_path, - nersc832, - dest_path, - max_wait_seconds=600, - logger=logger, + # Initialize config + config = Config832() + + # Import here to avoid circular imports + from orchestration.transfer_controller import get_transfer_controller, CopyMethod + + # Change prometheus_metrics=None if do not want to push metrics + # prometheus_metrics = None + prometheus_metrics = PrometheusMetrics() + # Get a Globus transfer controller + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config, + prometheus_metrics=prometheus_metrics + ) + + # Use transfer controller to copy the file + # The controller automatically handles metrics collection and pushing + logger.info(f"Transferring {file_path} from data832 to nersc") + success = transfer_controller.copy( + file_path=file_path, + source=data832, + destination=nersc832 ) return success - @flow(name="new_832_file_flow") def process_new_832_file(file_path: str, is_export_control=False, @@ -179,7 +189,7 @@ def test_transfers_832(file_path: str = "/raw/transfer_tests/test.txt"): file = Path(file_path) new_file = str(file.with_name(f"test_{str(uuid.uuid4())}.txt")) logger.info(new_file) - success = start_transfer( + success, _ = start_transfer( config.tc, config.spot832, file_path, config.spot832, new_file, logger=logger ) logger.info(success) @@ -192,3 +202,15 @@ def test_transfers_832(file_path: str = "/raw/transfer_tests/test.txt"): logger.info( f"File successfully transferred from data832 to NERSC {spot832_path}. Task {task}" ) + + +@flow(name="test_832_transfers_grafana") +def test_transfers_832_grafana(file_path: str = "/raw/transfer_tests/test/"): + logger = get_run_logger() + config = Config832() + + task = transfer_data_to_nersc(file_path, config.tc, config.data832, config.nersc_alsdev) + + logger.info( + f"File successfully transferred from data832 to NERSC {file_path}. Task {task}" + ) \ No newline at end of file diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index f6947065..37ea3dfd 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -132,9 +132,13 @@ def start_transfer( # if a transfer failed, like for a file not found globus keeps trying for a long time # and won't let another be attempted task_id = task["task_id"] - return task_wait( + + success = task_wait( transfer_client, task_id, max_wait_seconds=max_wait_seconds, logger=logger ) + + return success, task_id + def is_globus_file_older(file_obj, older_than_days): diff --git a/orchestration/prometheus_utils.py b/orchestration/prometheus_utils.py new file mode 100644 index 00000000..9b3ca566 --- /dev/null +++ b/orchestration/prometheus_utils.py @@ -0,0 +1,77 @@ +import os +import uuid +from prometheus_client import Gauge, CollectorRegistry, push_to_gateway + +class PrometheusMetrics(): + def __init__(self): + try: + # Create a new registry + self.registry = CollectorRegistry() + + # Define the required metrics + # 1. Count of requests - Gauge + self.request_counter = Gauge('nersc_transfer_request_count', + 'Number of times the flow has been executed', + ['execution_id'], + registry=self.registry) + + self.bytes_counter = Gauge('nersc_transfer_total_bytes', + 'Number of bytes for all the executed flows', + ['execution_id'], + registry=self.registry) + + # 2. Total bytes transferred - Gauge + self.transfer_bytes = Gauge('nersc_transfer_file_bytes', + 'Total size of all file transfers to NERSC', + ['machine'], + registry=self.registry) + + # 3. Transfer speed - Gauge + self.transfer_speed = Gauge('nersc_transfer_speed_bytes_per_second', + 'Transfer speed for NERSC file transfers in bytes per second', + ['machine'], + registry=self.registry) + + # 4. Transfer time - Gauge + self.transfer_time = Gauge('nersc_transfer_time_seconds', + 'Time taken for NERSC file transfers in seconds', + ['machine'], + registry=self.registry) + except Exception as e: + print(f"Error initializing Prometheus metrics: {e}") + + def push_metrics_to_prometheus(self, metrics, logger): + """Push metrics directly to Prometheus Pushgateway.""" + PUSHGATEWAY_URL = os.getenv('PUSHGATEWAY_URL', 'http://localhost:9091') + JOB_NAME = os.getenv('JOB_NAME', 'nersc_transfer') + INSTANCE_LABEL = os.getenv('INSTANCE_LABEL', 'data_transfer') + + try: + # Generate a unique execution ID for this transfer + execution_id = f"exec_{str(uuid.uuid4())}" + + # Set the metrics + self.request_counter.labels(execution_id=execution_id).set(1) + self.bytes_counter.labels(execution_id=execution_id).set(metrics['bytes_transferred']) + self.transfer_bytes.labels(machine=metrics['machine']).set(metrics['bytes_transferred']) + self.transfer_time.labels(machine=metrics['machine']).set(metrics['duration_seconds']) + self.transfer_speed.labels(machine=metrics['machine']).set(metrics['transfer_speed']) + + # Log metrics for debugging + logger.info(f"Pushing metrics: transfer_bytes = {metrics['bytes_transferred']} bytes") + logger.info(f"Pushing metrics: transfer_speed = {metrics['transfer_speed']} bytes/second") + + # Push to Pushgateway with error handling + try: + push_to_gateway( + PUSHGATEWAY_URL, + job=JOB_NAME, + registry=self.registry, + grouping_key={'instance': INSTANCE_LABEL} + ) + logger.info(f"Successfully pushed metrics to Pushgateway at {PUSHGATEWAY_URL}") + except Exception as push_error: + logger.error(f"Error pushing to Pushgateway at {PUSHGATEWAY_URL}: {push_error}") + + except Exception as e: + logger.error(f"Error preparing metrics for Prometheus: {e}") \ No newline at end of file diff --git a/orchestration/transfer_controller.py b/orchestration/transfer_controller.py index 973b796d..09e82649 100644 --- a/orchestration/transfer_controller.py +++ b/orchestration/transfer_controller.py @@ -1,15 +1,18 @@ from abc import ABC, abstractmethod from dotenv import load_dotenv from enum import Enum +import datetime import logging import os import time -from typing import Generic, TypeVar +from typing import Generic, TypeVar, Optional import globus_sdk from orchestration.flows.bl832.config import Config832 +from orchestration.flows.bl832.job_controller import HPC from orchestration.globus.transfer import GlobusEndpoint, start_transfer +from orchestration.prometheus_utils import PrometheusMetrics logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -111,17 +114,119 @@ def copy( class GlobusTransferController(TransferController[GlobusEndpoint]): - def __init__( - self, - config: Config832 - ) -> None: - super().__init__(config) """ Use Globus Transfer to move data between endpoints. Args: TransferController: Abstract class for transferring data. """ + def __init__( + self, + config: Config832, + prometheus_metrics: Optional[PrometheusMetrics] = None + ) -> None: + super().__init__(config) + self.prometheus_metrics = prometheus_metrics + + def get_transfer_file_info( + self, + task_id: str, + transfer_client: Optional[globus_sdk.TransferClient] = None + ) -> Optional[dict]: + """ + Get information about a completed transfer from the Globus API. + + Args: + task_id (str): The Globus transfer task ID + transfer_client (TransferClient, optional): TransferClient instance + + Returns: + Optional[dict]: Task information including bytes_transferred, or None if unavailable + """ + if transfer_client is None: + transfer_client = self.config.tc + + try: + task_info = transfer_client.get_task(task_id) + task_dict = task_info.data + + if task_dict.get('status') == 'SUCCEEDED': + bytes_transferred = task_dict.get('bytes_transferred', 0) + bytes_checksummed = task_dict.get('bytes_checksummed', 0) + files_transferred = task_dict.get('files_transferred', 0) + effective_bytes_per_second = task_dict.get('effective_bytes_per_second', 0) + return { + 'bytes_transferred': bytes_transferred, + 'bytes_checksummed': bytes_checksummed, + 'files_transferred': files_transferred, + 'effective_bytes_per_second': effective_bytes_per_second + } + + return None + + except Exception as e: + logger.error(f"Error getting transfer task info: {e}") + return None + + def collect_and_push_metrics( + self, + start_time: float, + end_time: float, + file_path: str, + source: GlobusEndpoint, + destination: GlobusEndpoint, + file_size: int, + transfer_speed: float, + success: bool + ) -> None: + """ + Collect transfer metrics and push them to Prometheus. + + Args: + start_time (float): Transfer start time as UNIX timestamp. + end_time (float): Transfer end time as UNIX timestamp. + file_path (str): The path of the transferred file. + source (GlobusEndpoint): The source endpoint. + destination (GlobusEndpoint): The destination endpoint. + file_size (int): Size of the transferred file in bytes. + transfer_speed (float): Transfer speed (bytes/second) provided by Globus + success (bool): Whether the transfer was successful. + """ + try: + # Get machine_name + machine_name = destination.name + + # Convert UNIX timestamps to ISO format strings + start_datetime = datetime.datetime.fromtimestamp(start_time, tz=datetime.timezone.utc) + end_datetime = datetime.datetime.fromtimestamp(end_time, tz=datetime.timezone.utc) + start_timestamp = start_datetime.isoformat() + end_timestamp = end_datetime.isoformat() + + # Calculate duration in seconds + duration_seconds = end_time - start_time + + # Calculate transfer speed (bytes per second) + # transfer_speed = file_size / duration_seconds if duration_seconds > 0 and file_size > 0 else 0 + + # Prepare metrics dictionary + metrics = { + "timestamp": start_timestamp, + "end_timestamp": end_timestamp, + "local_path": os.path.join(source.root_path, file_path), + "remote_path": os.path.join(destination.root_path, file_path), + "bytes_transferred": file_size, + "duration_seconds": duration_seconds, + "transfer_speed": transfer_speed, + "status": "success" if success else "failed", + "machine": machine_name + } + + # Push metrics to Prometheus + self.prometheus_metrics.push_metrics_to_prometheus(metrics, logger) + + except Exception as e: + logger.error(f"Error collecting or pushing metrics: {e}") + def copy( self, file_path: str = None, @@ -135,22 +240,37 @@ def copy( file_path (str): The path of the file to copy. source (GlobusEndpoint): The source endpoint. destination (GlobusEndpoint): The destination endpoint. - transfer_client (TransferClient): The Globus transfer client. + + Returns: + bool: True if the transfer was successful, False otherwise. """ + + if not file_path: + logger.error("No file_path provided") + return False + + if not source or not destination: + logger.error("Source or destination endpoint not provided") + return False logger.info(f"Transferring {file_path} from {source.name} to {destination.name}") + # Remove leading slash if present if file_path[0] == "/": file_path = file_path[1:] source_path = os.path.join(source.root_path, file_path) dest_path = os.path.join(destination.root_path, file_path) logger.info(f"Transferring {source_path} to {dest_path}") + # Start the timer - start_time = time.time() + transfer_start_time = time.time() success = False + task_id = None # Initialize task_id here to prevent UnboundLocalError + file_size = 0 # Initialize file_size here as well + try: - success = start_transfer( + success, task_id = start_transfer( transfer_client=self.config.tc, source_endpoint=source, source_path=source_path, @@ -159,18 +279,41 @@ def copy( max_wait_seconds=600, logger=logger, ) + if success: logger.info("Transfer completed successfully.") else: logger.error("Transfer failed.") - return success + except globus_sdk.services.transfer.errors.TransferAPIError as e: logger.error(f"Failed to submit transfer: {e}") - return success + finally: # Stop the timer and calculate the duration - elapsed_time = time.time() - start_time - logger.info(f"Transfer process took {elapsed_time:.2f} seconds.") + transfer_end_time = time.time() + + # Try to get transfer info from the completed task + if task_id: + transfer_info = self.get_transfer_file_info(task_id) + if transfer_info: + file_size = transfer_info.get('bytes_transferred', 0) + transfer_speed = transfer_info.get('effective_bytes_per_second', 0) + logger.info(f"Globus Task Info: Transferred {file_size} bytes ") + logger.info(f"Globus Task Info: Effective speed: {transfer_speed} bytes/second") + + # Collect and push metrics if enabled + if self.prometheus_metrics and file_size > 0: + self.collect_and_push_metrics( + start_time=transfer_start_time, + end_time=transfer_end_time, + file_path=file_path, + source=source, + destination=destination, + file_size=file_size, + transfer_speed=transfer_speed, + success=success, + ) + return success @@ -248,7 +391,8 @@ class CopyMethod(Enum): def get_transfer_controller( transfer_type: CopyMethod, - config: Config832 + config: Config832, + prometheus_metrics: Optional[PrometheusMetrics] = None ) -> TransferController: """ Get the appropriate transfer controller based on the transfer type. @@ -261,7 +405,7 @@ def get_transfer_controller( TransferController: The transfer controller object. """ if transfer_type == CopyMethod.GLOBUS: - return GlobusTransferController(config) + return GlobusTransferController(config, prometheus_metrics) elif transfer_type == CopyMethod.SIMPLE: return SimpleTransferController(config) else: @@ -288,4 +432,4 @@ def get_transfer_controller( if success: logger.info("Simple transfer succeeded.") else: - logger.error("Simple transfer failed.") + logger.error("Simple transfer failed.") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4c31b498..b5b1cc2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,5 @@ pyyaml authlib sfapi_client globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk -griffe==0.47 \ No newline at end of file +griffe==0.47 +prometheus_client==0.21.1 \ No newline at end of file