A comprehensive data pipeline for monitoring, analyzing, and billing GPU usage in compute clusters. This project demonstrates a complete data engineering solution using Apache Airflow, PyIceberg, and modern analytics tools.
- Overview
- Architecture
- Features
- Quick Start
- Project Structure
- Pipeline Components
- Configuration
- Testing
- Dashboard
- Troubleshooting
- Contributing
This project implements a production-ready GPU usage monitoring and billing pipeline that:
- Generates realistic GPU usage logs for testing
- Ingests raw data into a structured data warehouse
- Transforms data into analytical fact and dimension tables
- Aggregates usage metrics for monitoring and billing
- Visualizes results through an interactive dashboard
- Orchestrates everything with Apache Airflow
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Data Source   │────▶│   Airflow DAG   │────▶│ Iceberg Tables  │
│   (GPU Logs)    │     │ (Orchestrator)  │     │  (Data Lake)    │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                         │
                                                         ▼
                                                ┌─────────────────┐     ┌─────────────────┐
                                                │  Aggregation    │────▶│   Dashboard     │
                                                │  & Analytics    │     │  (Streamlit)    │
                                                └─────────────────┘     └─────────────────┘
```
- Data Generation: `generate_gpu_logs.py` - Creates realistic GPU usage data
- Data Ingestion: `ingest.py` - Transforms and loads data into Iceberg tables
- Data Aggregation: `aggregate.py` - Computes metrics and billing calculations
- Orchestration: Airflow DAG with three tasks: generate → ingest → aggregate
- Storage: Mock Iceberg catalog (easily replaceable with real Iceberg)
- Visualization: Streamlit dashboard with interactive charts
- End-to-end data flow from generation to visualization
- Robust error handling and logging
- Idempotent operations (safe to re-run)
- Time-based aggregations (hourly, daily, weekly, monthly)
- Multi-dimensional analysis (by node, GPU, job, user)
- Derived metrics (efficiency ratios, usage categories)
- GPU-hour based pricing
- Memory overage charges
- Cluster overhead calculations
- User/job cost allocation
- Real-time metrics and KPIs
- Interactive charts and filters
- Cost analysis and trends
- Performance monitoring
- Comprehensive unit tests
- Mock catalog for development
- Schema evolution support
- Data validation
Prerequisites:

- Docker and Docker Compose
- Python 3.12+
- 4GB+ RAM available
1. Clone the repository:

```bash
git clone <repository-url>
cd gpu_usage_pipeline
```

2. Start the Airflow services:

```bash
cd airflow
docker-compose up -d
```

Wait for all services to start (2-3 minutes). Airflow will be available at:

- Web UI: http://localhost:8080
- Username: `airflow`
- Password: `airflow`

3. Run the pipeline:

- Trigger the DAG manually:
  - Go to Airflow UI → DAGs → `gpu_usage_pipeline`
  - Click the "Play" button to trigger a run
- Or wait for scheduled runs:
  - The DAG runs automatically every hour

4. Inspect the results:

- Data: Check `/opt/airflow/data/` for generated files
- Logs: View in the Airflow UI or under `/opt/airflow/logs/`
- Dashboard: Run `streamlit run dashboard/app.py` (see the Dashboard section)
```
gpu_usage_pipeline/
├── airflow/                          # Airflow configuration
│   ├── dags/                         # DAG definitions
│   │   ├── gpu_usage_pipeline_dag.py # Main pipeline DAG
│   │   ├── generate_gpu_logs.py      # Data generation
│   │   ├── ingest.py                 # Data ingestion
│   │   ├── aggregate.py              # Analytics & billing
│   │   ├── test_ingest.py            # Unit tests
│   │   └── test_aggregate.py         # Unit tests
│   ├── plugins/                      # Custom plugins
│   │   ├── iceberg_catalog.py        # Iceberg schema definitions
│   │   └── mock_iceberg_catalog.py   # Mock catalog for testing
│   ├── docker-compose.yaml           # Airflow services
│   ├── Dockerfile                    # Custom Airflow image
│   └── requirements.txt              # Python dependencies
├── dashboard/                        # Streamlit dashboard
│   ├── app.py                        # Main dashboard
│   └── requirements.txt              # Dashboard dependencies
├── data/                             # Data storage
│   ├── raw/                          # Raw GPU logs
│   ├── iceberg/                      # Iceberg tables (CSV format)
│   └── aggregates/                   # Aggregated data
└── README.md                         # This file
```
The data generation step (`generate_gpu_logs.py`) produces realistic GPU usage logs with:
- Timestamps: Epoch milliseconds or ISO format
- Metrics: GPU utilization, memory usage, job IDs
- Variability: Realistic usage patterns and distributions
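As a rough illustration, a generator like this could be as small as the sketch below. The column names and the `rows_per_hour` knob are assumptions for illustration only, not the actual `generate_gpu_logs.py` implementation, which presumably also writes CSV files under `data/raw/`:

```python
import random
from datetime import datetime, timedelta, timezone

import pandas as pd


def generate_logs(hours: int = 1, rows_per_hour: int = 60) -> pd.DataFrame:
    """Return a DataFrame of synthetic GPU usage rows (illustrative sketch)."""
    start = datetime.now(timezone.utc) - timedelta(hours=hours)
    records = []
    for i in range(hours * rows_per_hour):
        ts = start + timedelta(hours=i / rows_per_hour)
        records.append({
            "timestamp_ms": int(ts.timestamp() * 1000),           # epoch milliseconds
            "node_id": f"node-{random.randint(1, 4)}",
            "gpu_id": random.randint(0, 7),
            "job_id": f"job-{random.randint(100, 120)}",
            "user": random.choice(["alice", "bob", "carol"]),
            "gpu_utilization_pct": round(random.uniform(0, 100), 1),
            "memory_used_gb": round(random.uniform(1, 32), 2),
        })
    return pd.DataFrame(records)
```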
```python
# Generate 2 hours of data
generate_logs(hours=2)
```

The ingestion step (`ingest.py`) transforms raw logs into analytical tables:
Fact Table (`gpu_usage_fact`):
- Primary usage metrics
- Derived columns (efficiency ratios, categories)
- Time-series data for analysis
Dimension Tables:
- `nodes_dim`: Node information
- `jobs_dim`: Job metadata
Features:
- Automatic timestamp parsing (epoch/ISO)
- Deduplication and upsert logic
- Ingestion state tracking
- Schema evolution support
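The sketch below shows how the timestamp parsing, derived columns, and dedup/upsert logic described above might look. It assumes pandas; the helper names and columns (`memory_used_gb`, `memory_total_gb`, `gpu_utilization_pct`) are illustrative rather than the actual `ingest.py` API:

```python
import pandas as pd


def parse_timestamp(value) -> pd.Timestamp:
    """Accept either epoch milliseconds or an ISO-8601 string."""
    if isinstance(value, (int, float)):
        return pd.to_datetime(int(value), unit="ms", utc=True)
    return pd.to_datetime(value, utc=True)


def add_derived_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Attach illustrative derived columns: an efficiency ratio and a usage category."""
    df = df.copy()
    df["memory_efficiency"] = df["memory_used_gb"] / df["memory_total_gb"]
    df["usage_category"] = pd.cut(
        df["gpu_utilization_pct"],
        bins=[0, 30, 70, 100],
        labels=["low", "medium", "high"],
        include_lowest=True,
    )
    return df


def upsert(existing: pd.DataFrame, incoming: pd.DataFrame, keys: list[str]) -> pd.DataFrame:
    """Combine old and new rows, keeping the latest copy of each key (dedup/upsert)."""
    combined = pd.concat([existing, incoming], ignore_index=True)
    return combined.drop_duplicates(subset=keys, keep="last")
```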
The aggregation step (`aggregate.py`) computes analytical metrics:
Time Aggregations:
- Hourly, daily, weekly, monthly summaries
- GPU hours, effective utilization
- Memory usage patterns
Dimensional Aggregations:
- Per-node performance metrics
- Per-job resource consumption
- User cost allocation
Billing Calculations:
- GPU-hour based pricing ($2.50/hour)
- Memory overage charges ($0.10/GB over 16GB)
- Cluster overhead (5%)
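Putting those rates together, the billing arithmetic reduces to roughly the following. This is a hedged sketch: `estimate_cost` and the per-GB treatment of the memory overage are assumptions, not the exact logic in `aggregate.py`:

```python
GPU_HOURLY_RATE = 2.50        # USD per GPU-hour
MEMORY_OVERAGE_RATE = 0.10    # USD per GB over 16 GB
MEMORY_INCLUDED_GB = 16.0
CLUSTER_OVERHEAD_RATE = 0.05  # 5% overhead


def estimate_cost(gpu_hours: float, peak_memory_gb: float) -> float:
    """Estimate the billed cost for a single job (illustrative only)."""
    base = gpu_hours * GPU_HOURLY_RATE
    overage_gb = max(0.0, peak_memory_gb - MEMORY_INCLUDED_GB)
    overage = overage_gb * MEMORY_OVERAGE_RATE
    return (base + overage) * (1 + CLUSTER_OVERHEAD_RATE)


# Example: 10 GPU-hours with a 20 GB peak → (25.00 + 0.40) * 1.05 = 26.67 USD
print(round(estimate_cost(10, 20), 2))
```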
The Airflow DAG (`gpu_usage_pipeline_dag.py`) defines a three-task pipeline:

- `generate_logs`: Creates new GPU usage data
- `ingest_to_iceberg`: Loads data into analytical tables
- `aggregate_usage`: Computes metrics and billing

Schedule: Runs every hour
Dependencies: generate → ingest → aggregate
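Wired together, such a DAG can look roughly like the sketch below, using `PythonOperator` and the task names above. The actual `gpu_usage_pipeline_dag.py` may differ in operator choice and default arguments:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumes the dags/ folder is on sys.path inside the Airflow containers.
from generate_gpu_logs import generate_logs
from ingest import ingest_raw_data
from aggregate import run_aggregation_pipeline

with DAG(
    dag_id="gpu_usage_pipeline",
    schedule="@hourly",               # runs every hour
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    generate = PythonOperator(task_id="generate_logs", python_callable=generate_logs)
    ingest = PythonOperator(task_id="ingest_to_iceberg", python_callable=ingest_raw_data)
    aggregate = PythonOperator(task_id="aggregate_usage", python_callable=run_aggregation_pipeline)

    generate >> ingest >> aggregate   # generate → ingest → aggregate
```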
Key configuration values:

```bash
# Airflow configuration
AIRFLOW_UID=50000
AIRFLOW_GID=0
```

```python
# GPU pricing (in aggregate.py)
GPU_HOURLY_RATE=2.50          # USD per GPU-hour
MEMORY_OVERAGE_RATE=0.10      # USD per GB over 16GB
CLUSTER_OVERHEAD_RATE=0.05    # 5% overhead
```

Data layout:

```
/opt/airflow/data/
├── raw/           # Raw GPU logs (CSV)
├── iceberg/       # Iceberg tables (CSV format)
└── aggregates/    # Aggregated data (CSV)
```

The project uses a mock Iceberg catalog for development:

- Location: `/opt/airflow/data/iceberg/`
- Format: CSV files (easily replaceable with real Iceberg)
- Tables: `gpu_usage_fact`, `nodes_dim`, `jobs_dim`
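A mock catalog in this spirit can be as small as one CSV file per table. The class below is a hypothetical sketch, not the actual `mock_iceberg_catalog.py`:

```python
from pathlib import Path

import pandas as pd


class MockIcebergCatalog:
    """File-backed stand-in for an Iceberg catalog: one CSV per table."""

    def __init__(self, warehouse: str = "/opt/airflow/data/iceberg"):
        self.warehouse = Path(warehouse)
        self.warehouse.mkdir(parents=True, exist_ok=True)

    def _path(self, table: str) -> Path:
        return self.warehouse / f"{table}.csv"

    def load_table(self, table: str) -> pd.DataFrame:
        """Read the table's CSV, or return an empty frame if it does not exist yet."""
        path = self._path(table)
        return pd.read_csv(path) if path.exists() else pd.DataFrame()

    def append(self, table: str, rows: pd.DataFrame) -> None:
        """Append rows to the table's CSV, writing a header only on first write."""
        path = self._path(table)
        rows.to_csv(path, mode="a", header=not path.exists(), index=False)
```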
Run the unit tests inside the Airflow worker container:

```bash
# Test ingestion pipeline
cd airflow
docker-compose exec airflow-worker python -m pytest dags/test_ingest.py -v

# Test aggregation functions
docker-compose exec airflow-worker python -m pytest dags/test_aggregate.py -v
```

Each pipeline stage can also be exercised manually:

```bash
# Generate test data
docker-compose exec airflow-worker python -c "
from dags.generate_gpu_logs import generate_logs
generate_logs(hours=1)
"

# Test ingestion pipeline
docker-compose exec airflow-worker python -c "
from dags.ingest import ingest_raw_data
ingest_raw_data()
"

# Test aggregation pipeline
docker-compose exec airflow-worker python -c "
from dags.aggregate import run_aggregation_pipeline
run_aggregation_pipeline()
"
```

To run the Streamlit dashboard locally:

```bash
# Install dashboard dependencies
pip install -r dashboard/requirements.txt

# Start Streamlit dashboard
streamlit run dashboard/app.py
```

The dashboard will be available at http://localhost:8501 and provides:
- Real-time Metrics: GPU hours, utilization, costs
- Interactive Charts: Time series, bar charts, scatter plots
- Filtering: By date range, nodes, jobs
- Cost Analysis: Billing breakdown, cost trends
- Performance Monitoring: Node and job performance
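The core of such a dashboard needs only a few lines of Streamlit and Plotly. The sketch below assumes an hourly aggregate CSV; the file name and column names (`hourly_usage.csv`, `gpu_hours`, `cost_usd`, `node_id`) are placeholders, not necessarily what `aggregate.py` writes:

```python
import pandas as pd
import plotly.express as px
import streamlit as st

st.title("GPU Usage & Billing")

# Hypothetical aggregate file and columns; adjust to the CSVs aggregate.py produces.
df = pd.read_csv("data/aggregates/hourly_usage.csv", parse_dates=["hour"])

st.metric("Total GPU hours", f"{df['gpu_hours'].sum():,.1f}")
st.metric("Total cost (USD)", f"{df['cost_usd'].sum():,.2f}")

fig = px.line(df, x="hour", y="gpu_hours", color="node_id", title="GPU hours per node")
st.plotly_chart(fig, use_container_width=True)
```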
Common issues:

1. Airflow containers not starting
```bash
# Check logs
docker-compose logs airflow-init

# Restart services
docker-compose down
docker-compose up -d
```

2. Permission errors

```bash
# Fix data directory permissions
sudo chown -R 50000:0 data/
```

3. DAG not appearing

```bash
# Check DAG parsing
docker-compose exec airflow-scheduler airflow dags list

# Restart scheduler
docker-compose restart airflow-scheduler
```

4. Mock catalog issues

```bash
# Clear mock data
rm -rf data/iceberg/*
```

Where to look for logs and data:

- Airflow logs: `/opt/airflow/logs/`
- Application logs: Docker container logs
- Data files: `/opt/airflow/data/`
```bash
# Run with debug logging
docker-compose exec airflow-worker python -c "
import logging
logging.basicConfig(level=logging.DEBUG)
from dags.ingest import ingest_raw_data
ingest_raw_data()
"
```

To contribute:

- Fork the repository
- Create a feature branch
- Make changes
- Add tests
- Run test suite
- Submit pull request
Code style:

- Python: PEP 8 with 88-character line length
- Documentation: Google-style docstrings
- Tests: Unit tests for all functions
- Type hints: Use type annotations
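For example, a small function following these conventions (illustrative only):

```python
def gpu_hours(utilization_pct: float, wall_hours: float) -> float:
    """Convert wall-clock hours into effective GPU-hours.

    Args:
        utilization_pct: Average GPU utilization over the interval, 0-100.
        wall_hours: Wall-clock duration of the interval in hours.

    Returns:
        Effective GPU-hours consumed.
    """
    return wall_hours * utilization_pct / 100.0
```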
Before submitting, check that:

- Unit tests pass
- Integration tests pass
- DAG runs successfully
- Dashboard displays correctly
- Documentation updated
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments:

- Apache Airflow for workflow orchestration
- PyIceberg for data lake functionality
- Streamlit for interactive dashboards
- Plotly for beautiful visualizations
Happy GPU monitoring!