Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies = [
"dagster-gcp ~=0.27.0",
"dagster-postgres ~=0.27.0",
"dagster-slack ~=0.27.0",
"dagster-sling>=0.27.1",
"dagster-webserver ~= 1.10",
"dbt-duckdb ~= 1.9.0",
"dbt-trino ~= 1.9.0",
Expand Down
13 changes: 13 additions & 0 deletions src/ol_orchestrate/assets/edxorg_sling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Any

from dagster_sling import SlingResource, sling_assets


def generate_edxorg_raw_table_assets(replication_config: dict[str, Any]):
@sling_assets(name="edxorg_raw_tables", replication_config=replication_config)
def edxorg_raw_table_assets(context, sling: SlingResource):
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

return edxorg_raw_table_assets
38 changes: 38 additions & 0 deletions src/ol_orchestrate/definitions/edx/generate_edxorg_raw_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from dagster import (
Definitions,
)
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.edxorg_sling import generate_edxorg_raw_table_assets
from ol_orchestrate.lib.constants import DAGSTER_ENV
from ol_orchestrate.lib.sling_config import (
create_sling_resource,
edxorg_replication_config,
)

if DAGSTER_ENV == "production":
source_bucket_name = "ol-data-lake-landing-zone-production"
glue_warehouse = "s3://ol-data-lake-raw-production/"
glue_namespace = "ol_data_lake_raw_production"
else:
source_bucket_name = "ol-data-lake-landing-zone-qa"
glue_warehouse = "s3://ol-data-lake-raw-qa/"
glue_namespace = "ol_data_lake_raw_qa"

source_bucket_prefix = (
f"s3://{source_bucket_name}/edxorg-raw-data/edxorg/raw_data/db_table"
)

s3_resource = S3Resource()

sling_resource = create_sling_resource(source_bucket_name, glue_warehouse)
replication_config = edxorg_replication_config(source_bucket_prefix, glue_namespace)
assets = generate_edxorg_raw_table_assets(replication_config)

edxorg_raw_table_defs = Definitions(
assets=[assets],
resources={
"sling": sling_resource,
"s3": s3_resource,
},
)
70 changes: 70 additions & 0 deletions src/ol_orchestrate/lib/sling_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from dagster_sling import SlingConnectionResource, SlingResource


def create_sling_resource(source_bucket_name, glue_warehouse):
"""Create SlingResource instance with the appropriate connections.

Args:
source_bucket_name (str): The name of the S3 source bucket.
e.g., "ol-data-lake-landing-zone-qa".
glue_warehouse (str): The Destination Glue warehouse location.
e.g., "s3://ol-data-lake-raw-qa/".

Returns:
SlingResource: Configured SlingResource instance.
"""
return SlingResource(
connections=[
SlingConnectionResource(
name="S3_SOURCE",
type="s3",
bucket=source_bucket_name,
s3_region="us-east-1",
),
SlingConnectionResource(
name="ICEBERG",
type="iceberg",
catalog_type="glue",
glue_warehouse=glue_warehouse,
s3_region="us-east-1",
),
]
)


def edxorg_replication_config(source_bucket_prefix, glue_namespace):
"""Generate the Sling replication configuration for edxorg data streams.

Args:
source_bucket_prefix (str): The S3 bucket prefix for edxorg raw data.
glue_namespace (str): The Destination Glue namespace for the raw data.

Returns:
dict: Replication configuration dictionary for edxorg streams.
"""
return {
"source": "S3_SOURCE",
"target": "ICEBERG",
"defaults": {
"mode": "incremental",
"source_options": {"delimiter": "\t"},
# uses the _sling_loaded_at column in the target table
"update_key": "_sling_loaded_at",
"debug": True,
},
"streams": {
f"{source_bucket_prefix}/auth_user/prod/": {
"object": f"{glue_namespace}.raw__edxorg__s3__auth_user",
},
f"{source_bucket_prefix}/student_courseenrollment/prod/": {
"object": f"{glue_namespace}.raw__edxorg__s3__student_courseenrollment",
"source_options": {
"delimiter": "\t",
"chunk_size": 10000,
},
},
},
"env": {
"SLING_LOADED_AT_COLUMN": "timestamp",
},
}
1 change: 1 addition & 0 deletions src/ol_orchestrate/workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load_from:
- python_module: ol_orchestrate.definitions.edx.openedx_data_extract
- python_module: ol_orchestrate.definitions.edx.retrieve_edxorg_raw_data
- python_module: ol_orchestrate.definitions.edx.sync_program_credential_reports
- python_module: ol_orchestrate.definitions.edx.generate_edxorg_raw_table
- python_module: ol_orchestrate.definitions.learning_resource.extract_api_data
- python_module: ol_orchestrate.definitions.canvas_course_export
- python_module: ol_orchestrate.definitions.lakehouse.elt
Expand Down
34 changes: 34 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.