Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
GLOBUS_CLIENT_ID=<globus_client_id>
GLOBUS_CLIENT_SECRET=<globus_client_secret>
PREFECT_API_URL=<url_of_prefect_server>
PREFECT_API_KEY=<prefect_client_secret>
PREFECT_API_KEY=<prefect_client_secret>
PUSHGATEWAY_URL=<url_of_pushgateway_server>
JOB_NAME=<jobname_for_pushgateway>
INSTANCE_LABEL=<label_for_pushgateway>
10 changes: 5 additions & 5 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: 3.11
cache: 'pip'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
pip install .
python -m pip install --no-cache-dir --upgrade pip
pip install --no-cache-dir flake8 pytest
pip install --no-cache-dir .
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
- name: Lint with flake8
Expand Down
5 changes: 5 additions & 0 deletions create_deployments_832.sh
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ prefect work-pool create 'scicat_ingest_pool'
# new_file_832_flow_pool
# in docker-compose.yaml:
# command: prefect agent start --pool "new_file_832_flow_pool"

# file transfer from data832 to nersc832 with Grafana Monitoring enabled
prefect deployment build ./orchestration/flows/bl832/move.py:test_transfers_832_grafana -n test_transfers_832_grafana -p new_file_832_flow_pool -q test_transfers_832_queue
prefect deployment apply test_transfers_832_grafana-deployment.yaml

prefect deployment build ./orchestration/flows/bl832/move.py:process_new_832_file -n new_file_832 -p new_file_832_flow_pool -q new_file_832_queue
prefect deployment apply process_new_832_file-deployment.yaml

Expand Down
4 changes: 2 additions & 2 deletions orchestration/_tests/test_globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_succeeded_transfer():
source_endpoint = GlobusEndpoint("123", "source.magrathea.com", "/root")
dest_endpoint = GlobusEndpoint("456", "dest.magrathea.com", "/root")

result = start_transfer(
result, _ = start_transfer(
transfer_client, source_endpoint, "/42/mice.jpg", dest_endpoint, "/42/mice.jpg"
)

Expand All @@ -80,7 +80,7 @@ def test_failed_transfer():
source_endpoint = GlobusEndpoint("123", "source.magrathea.com", "/root")
dest_endpoint = GlobusEndpoint("456", "dest.magrathea.com", "/root")

result = start_transfer(
result, _ = start_transfer(
transfer_client, source_endpoint, "/42/mice.jpg", dest_endpoint, "/42/mice.jpg"
)

Expand Down
49 changes: 47 additions & 2 deletions orchestration/_tests/test_transfer_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .test_globus import MockTransferClient



@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
"""
Expand Down Expand Up @@ -160,7 +161,7 @@ def test_globus_transfer_controller_copy_success(
# Patch any Secret.load calls to avoid real Prefect Cloud calls
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecretClass())

with patch("orchestration.transfer_controller.start_transfer", return_value=True) as mock_start_transfer:
with patch("orchestration.transfer_controller.start_transfer", return_value=(True, "mock-task-id")) as mock_start_transfer:
controller = GlobusTransferController(mock_config832)
result = controller.copy(
file_path="some_dir/test_file.txt",
Expand Down Expand Up @@ -190,7 +191,7 @@ def test_globus_transfer_controller_copy_failure(

mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecretClass())

with patch("orchestration.transfer_controller.start_transfer", return_value=False) as mock_start_transfer:
with patch("orchestration.transfer_controller.start_transfer", return_value=(False, "mock-task-id")) as mock_start_transfer:
controller = GlobusTransferController(mock_config832)
result = controller.copy(
file_path="some_dir/test_file.txt",
Expand Down Expand Up @@ -225,6 +226,50 @@ def test_globus_transfer_controller_copy_exception(
assert result is False, "Expected False when TransferAPIError is raised."
mock_start_transfer.assert_called_once()

def test_globus_transfer_controller_with_metrics(
    mock_config832, mock_globus_endpoint, transfer_controller_module
):
    """
    Test GlobusTransferController with PrometheusMetrics integration.

    Verifies that a successful copy() collects transfer statistics via
    get_transfer_file_info() and forwards a fully populated metrics dict
    to PrometheusMetrics.push_metrics_to_prometheus().
    """
    GlobusTransferController = transfer_controller_module["GlobusTransferController"]
    # Imported inside the test so the module can still be collected if
    # prometheus_utils' dependencies are unavailable at import time.
    from orchestration.prometheus_utils import PrometheusMetrics
    # spec= restricts the mock to the real PrometheusMetrics API surface,
    # so a typo'd method name fails instead of silently passing.
    mock_prometheus = MagicMock(spec=PrometheusMetrics)

    # start_transfer returns (success, task_id); the task id is what the
    # controller later passes to get_transfer_file_info.
    with patch("orchestration.transfer_controller.start_transfer", return_value=(True, "mock-task-id")) as mock_start_transfer:
        # Create the controller with mock prometheus metrics
        controller = GlobusTransferController(mock_config832, prometheus_metrics=mock_prometheus)

        # Set up mock for get_transfer_file_info
        mock_transfer_info = {"bytes_transferred": 1024 * 1024}  # 1MB
        controller.get_transfer_file_info = MagicMock(return_value=mock_transfer_info)

        # Execute the copy operation
        result = controller.copy(
            file_path="some_dir/test_file.txt",
            source=mock_globus_endpoint,
            destination=mock_globus_endpoint,
        )

        # Verify transfer was successful
        assert result is True
        mock_start_transfer.assert_called_once()

        # Verify metrics were collected and pushed, keyed by the task id
        # returned from start_transfer.
        controller.get_transfer_file_info.assert_called_once_with("mock-task-id")
        mock_prometheus.push_metrics_to_prometheus.assert_called_once()

        # Verify the metrics data (first positional arg of the push call).
        metrics_data = mock_prometheus.push_metrics_to_prometheus.call_args[0][0]
        assert metrics_data["bytes_transferred"] == 1024 * 1024
        assert metrics_data["status"] == "success"
        assert "timestamp" in metrics_data
        assert "end_timestamp" in metrics_data
        assert "duration_seconds" in metrics_data
        assert "transfer_speed" in metrics_data
        assert "machine" in metrics_data
        assert "local_path" in metrics_data
        assert "remote_path" in metrics_data

# --------------------------------------------------------------------------
# Tests for SimpleTransferController
Expand Down
4 changes: 2 additions & 2 deletions orchestration/flows/bl7012/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def transfer_data_to_nersc(
f"Transferring {source_path} {source_endpoint.name} to {destination_endpoint.name}"
)

success = start_transfer(
success, _ = start_transfer(
transfer_client,
source_endpoint,
source_path,
Expand Down Expand Up @@ -76,7 +76,7 @@ def transfer_data_within_single_endpoint(

# Start globus transfer
logger.info(f"Transferring {source_path} to {dest_path} at {globus_endpoint.name}")
success = start_transfer(
success, _ = start_transfer(
transfer_client,
globus_endpoint,
source_path,
Expand Down
4 changes: 2 additions & 2 deletions orchestration/flows/bl7012/move_recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def transfer_data_to_nersc(
f"Transferring {source_path} {source_endpoint.name} to {destination_endpoint.name}"
)

success = start_transfer(
success, _ = start_transfer(
transfer_client,
source_endpoint,
source_path,
Expand Down Expand Up @@ -83,7 +83,7 @@ def transfer_data_within_single_endpoint(

# Start globus transfer
logger.info(f"Transferring {source_path} to {dest_path} at {globus_endpoint.name}")
success = start_transfer(
success, _ = start_transfer(
transfer_client,
globus_endpoint,
source_path,
Expand Down
56 changes: 39 additions & 17 deletions orchestration/flows/bl832/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from orchestration.flows.bl832.config import Config832
from orchestration.globus.transfer import GlobusEndpoint, start_transfer
from orchestration.prefect import schedule_prefect_flow
from orchestration.prometheus_utils import PrometheusMetrics


API_KEY = os.getenv("API_KEY")
Expand All @@ -32,7 +33,7 @@ def transfer_spot_to_data(

source_path = os.path.join(spot832.root_path, file_path)
dest_path = os.path.join(data832.root_path, file_path)
success = start_transfer(
success, _ = start_transfer(
transfer_client,
spot832,
source_path,
Expand All @@ -44,7 +45,6 @@ def transfer_spot_to_data(
logger.info(f"spot832 to data832 globus task_id: {task}")
return success


@task(name="transfer_data_to_nersc")
def transfer_data_to_nersc(
file_path: str,
Expand All @@ -53,28 +53,38 @@ def transfer_data_to_nersc(
nersc832: GlobusEndpoint,
):
logger = get_run_logger()

# if source_file begins with "/", it will mess up os.path.join
if file_path[0] == "/":
file_path = file_path[1:]
source_path = os.path.join(data832.root_path, file_path)
dest_path = os.path.join(nersc832.root_path, file_path)

logger.info(f"Transferring {dest_path} data832 to nersc")

success = start_transfer(
transfer_client,
data832,
source_path,
nersc832,
dest_path,
max_wait_seconds=600,
logger=logger,
# Initialize config
config = Config832()

# Import here to avoid circular imports
from orchestration.transfer_controller import get_transfer_controller, CopyMethod

    # Set prometheus_metrics = None below if you do not want to push metrics:
    # prometheus_metrics = None
prometheus_metrics = PrometheusMetrics()
# Get a Globus transfer controller
transfer_controller = get_transfer_controller(
transfer_type=CopyMethod.GLOBUS,
config=config,
prometheus_metrics=prometheus_metrics
)

# Use transfer controller to copy the file
# The controller automatically handles metrics collection and pushing
logger.info(f"Transferring {file_path} from data832 to nersc")
success = transfer_controller.copy(
file_path=file_path,
source=data832,
destination=nersc832
)

return success


@flow(name="new_832_file_flow")
def process_new_832_file(file_path: str,
is_export_control=False,
Expand Down Expand Up @@ -179,7 +189,7 @@ def test_transfers_832(file_path: str = "/raw/transfer_tests/test.txt"):
file = Path(file_path)
new_file = str(file.with_name(f"test_{str(uuid.uuid4())}.txt"))
logger.info(new_file)
success = start_transfer(
success, _ = start_transfer(
config.tc, config.spot832, file_path, config.spot832, new_file, logger=logger
)
logger.info(success)
Expand All @@ -192,3 +202,15 @@ def test_transfers_832(file_path: str = "/raw/transfer_tests/test.txt"):
logger.info(
f"File successfully transferred from data832 to NERSC {spot832_path}. Task {task}"
)


@flow(name="test_832_transfers_grafana")
def test_transfers_832_grafana(file_path: str = "/raw/transfer_tests/test/"):
    """Exercise the data832 -> NERSC transfer path with Grafana/Prometheus
    metrics enabled (transfer_data_to_nersc builds a Globus transfer
    controller wired to PrometheusMetrics).

    Args:
        file_path: Path (relative to the endpoints' root paths) to transfer.
    """
    logger = get_run_logger()
    config = Config832()

    # transfer_data_to_nersc returns the success flag from the transfer
    # controller, not a Globus task object.
    success = transfer_data_to_nersc(file_path, config.tc, config.data832, config.nersc_alsdev)

    # Log the actual outcome instead of unconditionally claiming success.
    if success:
        logger.info(f"File successfully transferred from data832 to NERSC: {file_path}")
    else:
        logger.error(f"Transfer from data832 to NERSC failed: {file_path}")
6 changes: 5 additions & 1 deletion orchestration/globus/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,13 @@ def start_transfer(
# if a transfer failed, like for a file not found globus keeps trying for a long time
# and won't let another be attempted
task_id = task["task_id"]
return task_wait(

success = task_wait(
transfer_client, task_id, max_wait_seconds=max_wait_seconds, logger=logger
)

return success, task_id



def is_globus_file_older(file_obj, older_than_days):
Expand Down
77 changes: 77 additions & 0 deletions orchestration/prometheus_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
import uuid
from prometheus_client import Gauge, CollectorRegistry, push_to_gateway

class PrometheusMetrics:
    """Collects NERSC file-transfer metrics and pushes them to a Prometheus
    Pushgateway.

    A private CollectorRegistry holds gauges for request count, bytes
    transferred, transfer speed, and transfer duration. Values are pushed on
    demand via :meth:`push_metrics_to_prometheus`. All failures are logged
    and swallowed so a monitoring outage can never fail a transfer flow.
    """

    def __init__(self):
        # Best-effort setup: a metrics failure must not break the caller,
        # so errors are reported and swallowed rather than raised.
        try:
            # Private registry so only these metrics are pushed, not the
            # default process-wide collectors.
            self.registry = CollectorRegistry()

            # 1. Count of requests - Gauge (the Pushgateway replaces values
            #    on each push, so a Gauge is used instead of a Counter).
            #    NOTE(review): labelling by a per-push UUID gives every push
            #    a unique label value, so this registry's label set grows
            #    without bound over the process lifetime — confirm intended.
            self.request_counter = Gauge(
                'nersc_transfer_request_count',
                'Number of times the flow has been executed',
                ['execution_id'],
                registry=self.registry,
            )

            # Total bytes for each executed flow, keyed by execution id.
            self.bytes_counter = Gauge(
                'nersc_transfer_total_bytes',
                'Number of bytes for all the executed flows',
                ['execution_id'],
                registry=self.registry,
            )

            # 2. Total bytes transferred per machine - Gauge
            self.transfer_bytes = Gauge(
                'nersc_transfer_file_bytes',
                'Total size of all file transfers to NERSC',
                ['machine'],
                registry=self.registry,
            )

            # 3. Transfer speed per machine (bytes/second) - Gauge
            self.transfer_speed = Gauge(
                'nersc_transfer_speed_bytes_per_second',
                'Transfer speed for NERSC file transfers in bytes per second',
                ['machine'],
                registry=self.registry,
            )

            # 4. Transfer duration per machine (seconds) - Gauge
            self.transfer_time = Gauge(
                'nersc_transfer_time_seconds',
                'Time taken for NERSC file transfers in seconds',
                ['machine'],
                registry=self.registry,
            )
        except Exception as e:
            # No logger is available at construction time; fall back to print.
            print(f"Error initializing Prometheus metrics: {e}")

    def push_metrics_to_prometheus(self, metrics, logger):
        """Push one transfer's metrics directly to the Prometheus Pushgateway.

        Args:
            metrics: Mapping with at least the keys 'bytes_transferred',
                'duration_seconds', 'transfer_speed', and 'machine'.
            logger: Logger used for progress and error reporting.

        Never raises: both metric preparation and the network push are
        wrapped so failures are logged and swallowed.
        """
        # Pushgateway connection settings come from the environment (see
        # .env.example), with local-development defaults. These are plain
        # locals, not module constants, so they follow snake_case naming.
        pushgateway_url = os.getenv('PUSHGATEWAY_URL', 'http://localhost:9091')
        job_name = os.getenv('JOB_NAME', 'nersc_transfer')
        instance_label = os.getenv('INSTANCE_LABEL', 'data_transfer')

        try:
            # Generate a unique execution ID for this transfer
            execution_id = f"exec_{str(uuid.uuid4())}"

            # Set the metrics
            self.request_counter.labels(execution_id=execution_id).set(1)
            self.bytes_counter.labels(execution_id=execution_id).set(metrics['bytes_transferred'])
            self.transfer_bytes.labels(machine=metrics['machine']).set(metrics['bytes_transferred'])
            self.transfer_time.labels(machine=metrics['machine']).set(metrics['duration_seconds'])
            self.transfer_speed.labels(machine=metrics['machine']).set(metrics['transfer_speed'])

            # Log metrics for debugging
            logger.info(f"Pushing metrics: transfer_bytes = {metrics['bytes_transferred']} bytes")
            logger.info(f"Pushing metrics: transfer_speed = {metrics['transfer_speed']} bytes/second")

            # The network push gets its own handler so a gateway outage is
            # reported distinctly from a malformed metrics dict.
            try:
                push_to_gateway(
                    pushgateway_url,
                    job=job_name,
                    registry=self.registry,
                    grouping_key={'instance': instance_label},
                )
                logger.info(f"Successfully pushed metrics to Pushgateway at {pushgateway_url}")
            except Exception as push_error:
                logger.error(f"Error pushing to Pushgateway at {pushgateway_url}: {push_error}")

        except Exception as e:
            logger.error(f"Error preparing metrics for Prometheus: {e}")
Loading