A modular data pipeline for extracting data from the Superset API and loading it into PostgreSQL, with Airflow integration for scheduling and monitoring.
pip install -r requirements.txt
Required dependencies:
- psycopg2-binary (>=2.9.0)
- PyYAML (>=6.0)
- requests (>=2.28.0)
- python-dotenv (>=1.0.0)
You can configure the pipeline using either a config file (JSON or YAML) or environment variables.
Copy and customize one of the example config files:
# For JSON configuration
cp config.example.json config.json
# For YAML configuration
cp config.example.yaml config.yaml
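If you need to read one of these files programmatically, a minimal sketch looks like the following (it assumes PyYAML for YAML and the standard library for JSON; the key names printed at the end are illustrative, not the pipeline's actual schema):

```python
# Minimal sketch of loading either config format; the exact schema used by
# ucs_data_pipeline.py may differ -- the keys below are illustrative only.
import json
from pathlib import Path

import yaml  # provided by the PyYAML dependency


def load_config(path: str) -> dict:
    """Load a JSON or YAML config file based on its extension."""
    config_path = Path(path)
    text = config_path.read_text(encoding="utf-8")
    if config_path.suffix in (".yaml", ".yml"):
        return yaml.safe_load(text)
    return json.loads(text)


if __name__ == "__main__":
    config = load_config("config.yaml")
    print(config.get("superset", {}))  # e.g. a hypothetical 'superset' section
```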
Copy and customize the environment file:
cp .env.example .env
Then edit the .env file with your specific configuration values.
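python-dotenv (already listed in the requirements) can load these values at runtime; a minimal sketch:

```python
# Sketch of pulling settings from .env using python-dotenv; the variable
# names match the environment variables documented below.
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory by default

superset_url = os.getenv("SUPERSET_BASE_URL")
pg_host = os.getenv("PG_HOST", "localhost")
print(superset_url, pg_host)
```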
Run the database setup script to create the required tables:
psql -U your_postgres_user -d your_database -f setup_database.sql
Or manually create the PostgreSQL database and table:
CREATE DATABASE ucs_data;
\c ucs_data;
CREATE TABLE superset_data (
id VARCHAR(255) PRIMARY KEY,
value JSONB,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_superset_data_updated_at ON superset_data(updated_at);
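For reference, here is a hedged sketch of how rows matching this schema could be upserted with psycopg2; the connection parameters are placeholders and the pipeline's loader may batch differently:

```python
# Illustrative upsert into the superset_data table created above, using
# psycopg2; connection parameters are placeholders.
import psycopg2
from psycopg2.extras import Json, execute_values

rows = [("record-1", {"metric": 42}), ("record-2", {"metric": 7})]

conn = psycopg2.connect(
    host="localhost", dbname="ucs_data", user="your_postgres_user", password="your_password"
)
with conn, conn.cursor() as cur:
    execute_values(
        cur,
        """
        INSERT INTO superset_data (id, value)
        VALUES %s
        ON CONFLICT (id) DO UPDATE
        SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP
        """,
        [(row_id, Json(payload)) for row_id, payload in rows],
    )
conn.close()
```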
# Using config file
python ucs_data_pipeline.py -c config.yaml
# Using environment variables only
python ucs_data_pipeline.py
# With verbose logging
python ucs_data_pipeline.py -c config.yaml -v
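The flags above (-c and -v) could be wired up roughly like this; the actual argument parsing in ucs_data_pipeline.py may differ:

```python
# Rough sketch of the command-line interface described above (-c / -v).
import argparse
import logging


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="UCS data pipeline")
    parser.add_argument("-c", "--config", help="Path to config.json or config.yaml")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable DEBUG logging")
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
    logging.info("Running pipeline with config=%s", args.config)
```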
- Set up the Airflow environment:
./setup_airflow.sh
- Run the pipeline using Airflow:
./run_pipeline.sh
- Check the pipeline status:
./check_dag_status.sh
- `SUPERSET_BASE_URL`: Superset API endpoint URL
- `SUPERSET_USERNAME`: Username for user login authentication
- `SUPERSET_PASSWORD`: Password for user login authentication
- `SUPERSET_CHART_ID`: ID of the Superset chart to extract data from
- `SUPERSET_TIMEOUT`: Request timeout in seconds (default: 30)
- `SUPERSET_RETRIES`: Number of retry attempts (default: 3)
- `SUPERSET_BATCH_SIZE`: Batch size for data extraction (default: 5000)
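For orientation, here is a rough sketch of authenticating and pulling chart data with requests using these variables; Superset endpoint paths and payload shapes vary between versions, so treat this as an outline rather than the pipeline's actual extractor:

```python
# Hedged sketch of logging in to the Superset API and requesting chart data
# with retries; endpoint paths may differ for your Superset version.
import os

import requests

BASE_URL = os.getenv("SUPERSET_BASE_URL", "http://localhost:8088")
TIMEOUT = int(os.getenv("SUPERSET_TIMEOUT", "30"))
RETRIES = int(os.getenv("SUPERSET_RETRIES", "3"))


def login(session: requests.Session) -> None:
    """Obtain a bearer token via username/password login."""
    resp = session.post(
        f"{BASE_URL}/api/v1/security/login",
        json={
            "username": os.getenv("SUPERSET_USERNAME"),
            "password": os.getenv("SUPERSET_PASSWORD"),
            "provider": "db",
            "refresh": True,
        },
        timeout=TIMEOUT,
    )
    resp.raise_for_status()
    session.headers["Authorization"] = f"Bearer {resp.json()['access_token']}"


def fetch_chart_data(session: requests.Session, chart_id: str) -> dict:
    """Fetch chart data, retrying on transient failures."""
    last_error = None
    for _ in range(RETRIES):
        try:
            resp = session.get(f"{BASE_URL}/api/v1/chart/{chart_id}/data/", timeout=TIMEOUT)
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException as exc:
            last_error = exc
    raise RuntimeError(f"Chart {chart_id} could not be fetched") from last_error


if __name__ == "__main__":
    with requests.Session() as session:
        login(session)
        data = fetch_chart_data(session, os.getenv("SUPERSET_CHART_ID", "1"))
        print(type(data))
```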
- `PG_HOST`: Database host (default: localhost)
- `PG_DB`: Database name
- `PG_USER`: Database user
- `PG_PASSWORD`: Database password
- `PG_PORT`: Database port (default: 5432)
- `PG_BATCH_SIZE`: Batch size for data loading (default: 1000)
- `LOG_LEVEL`: Log level - DEBUG, INFO, WARNING, ERROR, CRITICAL (default: INFO)
- `LOG_FORMAT`: Log message format
- `LOG_FILE`: Log file path (default: logs/ucs_pipeline.log)
- `PIPELINE_PARALLEL`: Enable parallel processing (default: false)
- `PIPELINE_MAX_WORKERS`: Maximum worker threads for parallel processing (default: 4)
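A small sketch of resolving the logging and pipeline settings above from the environment, falling back to the documented defaults (illustrative only):

```python
# Resolve logging and parallelism settings from the environment, using the
# defaults documented above when a variable is unset.
import logging
import os

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_FILE = os.getenv("LOG_FILE", "logs/ucs_pipeline.log")
PIPELINE_PARALLEL = os.getenv("PIPELINE_PARALLEL", "false").lower() == "true"
PIPELINE_MAX_WORKERS = int(os.getenv("PIPELINE_MAX_WORKERS", "4"))

log_dir = os.path.dirname(LOG_FILE)
if log_dir:
    os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.INFO),
    filename=LOG_FILE,
    format=os.getenv("LOG_FORMAT", "%(asctime)s %(levelname)s %(name)s: %(message)s"),
)
logging.getLogger(__name__).info(
    "parallel=%s workers=%s", PIPELINE_PARALLEL, PIPELINE_MAX_WORKERS
)
```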
The project includes Airflow integration for scheduled pipeline execution:
- DAG ID: `ucs_data_pipeline`
- Schedule: Daily
- Start Date: July 1, 2025
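A DAG with this ID, schedule, and start date might look roughly as follows; the real definition in airflow/dags/ may use different operators, tasks, and parameter names (e.g. `schedule` vs. `schedule_interval` across Airflow versions):

```python
# Approximate shape of the DAG described above; paths are placeholders and
# the actual DAG file in airflow/dags/ may differ.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="ucs_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 7, 1),
    catchup=False,
) as dag:
    run_pipeline = BashOperator(
        task_id="run_pipeline",
        bash_command="python /path/to/ucs_data_pipeline.py -c /path/to/config.yaml",
    )
```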
- `setup_airflow.sh`: Sets up the Airflow environment
- `run_pipeline.sh`: Triggers the pipeline DAG
- `check_dag_status.sh`: Checks the status of the pipeline DAG
- `reset_airflow.sh`: Resets the Airflow database
- `restart_scheduler.sh`: Restarts the Airflow scheduler
ucs_pipeline/
├── config/ # Configuration management
├── extractor/ # Data extraction from Superset
├── transformer/ # Data transformation logic
├── loader/ # Data loading to PostgreSQL
└── utils/ # Utilities (logging, error handling)
airflow/
├── dags/ # Airflow DAG definitions
└── logs/ # Airflow execution logs
logs/ # Pipeline execution logs
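The run-time flow mirrors this layout (config → extractor → transformer → loader); the stub below is purely illustrative and does not reflect the modules' real APIs:

```python
# Rough end-to-end flow mirroring the package layout above; every function
# here is a stand-in for the corresponding package, not its real interface.
from typing import Any


def extract(config: dict[str, Any]) -> list[dict[str, Any]]:
    """extractor/: pull chart data from the Superset API."""
    return [{"id": "record-1", "metric": 42}]


def transform(rows: list[dict[str, Any]]) -> list[tuple[str, dict[str, Any]]]:
    """transformer/: reshape API rows into (id, value) pairs for superset_data."""
    return [(row["id"], {k: v for k, v in row.items() if k != "id"}) for row in rows]


def load(records: list[tuple[str, dict[str, Any]]]) -> int:
    """loader/: upsert records into PostgreSQL (see the psycopg2 sketch above)."""
    return len(records)


if __name__ == "__main__":
    loaded = load(transform(extract({"chart_id": "123"})))
    print(f"Loaded {loaded} records")
```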
After running the pipeline, an execution summary is displayed with:
- Execution time
- Records extracted
- Records loaded
- Any errors encountered
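A summary with these fields could be modeled like the sketch below; the actual fields and formatting come from the pipeline itself:

```python
# Illustrative container for the run summary listed above.
from dataclasses import dataclass, field


@dataclass
class ExecutionSummary:
    duration_seconds: float
    records_extracted: int
    records_loaded: int
    errors: list[str] = field(default_factory=list)

    def report(self) -> str:
        return (
            f"Execution time: {self.duration_seconds:.1f}s | "
            f"extracted: {self.records_extracted} | "
            f"loaded: {self.records_loaded} | "
            f"errors: {len(self.errors)}"
        )


print(ExecutionSummary(12.3, 5000, 5000).report())
```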