Skip to content

Commit 7574e09

Browse files
authored
Integrate canvas course export with google sheet (#1711)
* integrate canvas course export with google sheet * add course_content_metadata to sensor * update cursor * updates
1 parent 48e0214 commit 7574e09

File tree

6 files changed

+195
-70
lines changed

6 files changed

+195
-70
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies = [
3333
"polars ~= 1.19",
3434
"pyarrow ~=21.0.0",
3535
"pydantic ~=2.11.1",
36+
"pygsheets>=2.0.6",
3637
"pymysql ~= 1.1",
3738
"s3fs (>=2025.2.0,<2026.0.0)",
3839
"trino[external-authentication-token-cache]>=0.336.0",

src/ol_orchestrate/assets/canvas.py

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
AssetKey,
1010
AssetOut,
1111
DataVersion,
12+
DynamicPartitionsDefinition,
1213
Output,
13-
StaticPartitionsDefinition,
1414
asset,
1515
multi_asset,
1616
)
@@ -22,68 +22,7 @@
2222
)
2323
from ol_orchestrate.lib.utils import compute_zip_content_hash
2424

25-
# predefined course IDs to export
26-
canvas_course_ids = StaticPartitionsDefinition(
27-
[
28-
"155",
29-
"7023",
30-
"14566",
31-
"28766",
32-
"28768",
33-
"28770",
34-
"28772",
35-
"28774",
36-
"28777",
37-
"28751",
38-
"28753",
39-
"28755",
40-
"28759",
41-
"28765",
42-
"28767",
43-
"28785",
44-
# "28760",
45-
# "28761",
46-
# "28762",
47-
"28803",
48-
"28805",
49-
"28807",
50-
"28808",
51-
"28811",
52-
"28813",
53-
"28815",
54-
"28816",
55-
"28818",
56-
"28821",
57-
"28822",
58-
"28824",
59-
"28839",
60-
"28841",
61-
"28842",
62-
"28845",
63-
"28847",
64-
"28849",
65-
"28880",
66-
"33162",
67-
"33448",
68-
"34062",
69-
"34545",
70-
"34562",
71-
"34580",
72-
"34593",
73-
"34599",
74-
"34625",
75-
"34627",
76-
"34631",
77-
"34633",
78-
"34642",
79-
"34653",
80-
"34681",
81-
"34716",
82-
"34721",
83-
# "34859",
84-
"35054",
85-
]
86-
)
25+
canvas_course_ids = DynamicPartitionsDefinition(name="canvas_course_ids")
8726

8827

8928
def _extract_course_files(context, course_id):

src/ol_orchestrate/definitions/canvas_course_export.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
from typing import Any
2+
13
from dagster import (
4+
ConfigurableResource,
25
Definitions,
3-
build_schedule_from_partitioned_job,
6+
OpExecutionContext,
7+
RunRequest,
48
define_asset_job,
9+
schedule,
510
)
611
from dagster_aws.s3 import S3Resource
712

@@ -15,24 +20,54 @@
1520
default_file_object_io_manager,
1621
default_io_manager,
1722
)
18-
from ol_orchestrate.lib.utils import authenticate_vault, s3_uploads_bucket
23+
from ol_orchestrate.lib.utils import (
24+
authenticate_vault,
25+
s3_uploads_bucket,
26+
)
1927
from ol_orchestrate.resources.api_client_factory import ApiClientFactory
28+
from ol_orchestrate.sensors.canvas import canvas_google_sheet_course_id_sensor
2029

2130
vault = authenticate_vault(DAGSTER_ENV, VAULT_ADDRESS)
2231

32+
gs_secrets = vault.client.secrets.kv.v1.read_secret(
33+
mount_point="secret-data",
34+
path="pipelines/google-service-account",
35+
)["data"]
36+
37+
38+
class GoogleSheetConfig(ConfigurableResource):
39+
service_account_json: dict[str, Any] # Service account JSON credentials
40+
# Google Sheet ID for canvas course IDs
41+
sheet_id: str = "13AoothEhEvWs2cJEEfZETm7E6h3-ZY4tD11KX_ARe1A"
42+
worksheet_id: int = 1472315099 # Worksheet ID (gid) within the Google Sheet
2343

24-
canvas_course_content_job = define_asset_job(
44+
45+
# Asset job that will be executed per partition (course_id)
46+
canvas_course_export_job = define_asset_job(
2547
name="canvas_course_export_job",
2648
selection=[export_course_content, course_content_metadata],
2749
partitions_def=canvas_course_ids,
2850
)
2951

30-
canvas_course_export_schedule = build_schedule_from_partitioned_job(
31-
name="canvas_course_export_schedule",
32-
job=canvas_course_content_job,
52+
53+
@schedule(
3354
cron_schedule="0 */6 * * *",
55+
job=canvas_course_export_job,
3456
execution_timezone="Etc/UTC",
57+
required_resource_keys={"google_sheet_config"},
3558
)
59+
def canvas_course_export_schedule(context: OpExecutionContext):
60+
"""Return a RunRequest for each canvas course ID found in the Google Sheet"""
61+
partition_keys = context.instance.get_dynamic_partitions("canvas_course_ids")
62+
63+
return [
64+
RunRequest(
65+
run_key=partition_key,
66+
partition_key=partition_key,
67+
)
68+
for partition_key in partition_keys
69+
]
70+
3671

3772
canvas_course_export = Definitions(
3873
resources={
@@ -60,7 +95,9 @@
6095
kv_version="2",
6196
vault=vault,
6297
),
98+
"google_sheet_config": GoogleSheetConfig(service_account_json=gs_secrets),
6399
},
64100
assets=[export_course_content, course_content_metadata],
65101
schedules=[canvas_course_export_schedule],
102+
sensors=[canvas_google_sheet_course_id_sensor],
66103
)

src/ol_orchestrate/lib/canvas.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import json
2+
3+
import pygsheets
4+
from dagster import OpExecutionContext
5+
from google.oauth2 import service_account
6+
7+
8+
def fetch_canvas_course_ids_from_google_sheet(context: OpExecutionContext):
9+
"""
10+
Fetch all canvas course IDs from a Google Sheet
11+
"""
12+
sheet_config = context.resources.google_sheet_config
13+
14+
if sheet_config.service_account_json is None:
15+
context.log.error("No google service account credentials found in vault")
16+
return set()
17+
18+
creds_dict = (
19+
sheet_config.service_account_json
20+
if isinstance(sheet_config.service_account_json, dict)
21+
else json.loads(sheet_config.service_account_json)
22+
)
23+
24+
creds_dict["private_key"] = creds_dict["private_key"].replace("\\n", "\n")
25+
26+
scopes = [
27+
"https://www.googleapis.com/auth/spreadsheets",
28+
"https://www.googleapis.com/auth/drive",
29+
]
30+
31+
credentials = service_account.Credentials.from_service_account_info(
32+
creds_dict, scopes=scopes
33+
)
34+
35+
google_sheet_client = pygsheets.authorize(custom_credentials=credentials)
36+
spreadsheet = google_sheet_client.open_by_key(sheet_config.sheet_id)
37+
38+
# Find worksheet by gid
39+
worksheet = next(
40+
(
41+
worksheet
42+
for worksheet in spreadsheet.worksheets()
43+
if worksheet.id == sheet_config.worksheet_id
44+
),
45+
None,
46+
)
47+
if worksheet is None:
48+
context.log.error("No worksheet found with gid %s", sheet_config.worksheet_id)
49+
return set()
50+
51+
# Get all values from the first column and filter to only numeric values
52+
column_values = worksheet.get_col(1, include_tailing_empty=False)
53+
54+
return {value.strip() for value in column_values if value.strip().isdigit()}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import json
2+
3+
from dagster import (
4+
AddDynamicPartitionsRequest,
5+
AssetKey,
6+
DefaultSensorStatus,
7+
DeleteDynamicPartitionsRequest,
8+
RunRequest,
9+
SensorResult,
10+
SkipReason,
11+
sensor,
12+
)
13+
14+
from ol_orchestrate.lib.canvas import fetch_canvas_course_ids_from_google_sheet
15+
16+
17+
@sensor(
18+
description="Sensor to monitor a Google Sheet for Canvas course IDs to export.",
19+
minimum_interval_seconds=60 * 60, # Check every 1 hour
20+
required_resource_keys={"google_sheet_config"},
21+
default_status=DefaultSensorStatus.STOPPED,
22+
asset_selection=[
23+
AssetKey(["canvas", "course_content"]),
24+
AssetKey(["canvas", "course_metadata"]),
25+
AssetKey(["canvas", "course_content_metadata"]),
26+
],
27+
)
28+
def canvas_google_sheet_course_id_sensor(context):
29+
google_sheet_course_ids = fetch_canvas_course_ids_from_google_sheet(context)
30+
context.log.info("google_sheet_course_ids: %s", google_sheet_course_ids)
31+
32+
# Existing dynamic partitions
33+
existing_partitions = set(
34+
context.instance.get_dynamic_partitions("canvas_course_ids")
35+
)
36+
context.log.info("existing_partitions: %s", existing_partitions)
37+
38+
# Register any new course IDs as partitions
39+
new_course_ids = google_sheet_course_ids - existing_partitions
40+
removed_course_ids = existing_partitions - google_sheet_course_ids
41+
42+
if not new_course_ids and not removed_course_ids:
43+
return SkipReason("No changes in canvas course IDs")
44+
45+
run_requests = [
46+
RunRequest(
47+
asset_selection=[
48+
AssetKey(["canvas", "course_content"]),
49+
AssetKey(["canvas", "course_metadata"]),
50+
AssetKey(["canvas", "course_content_metadata"]),
51+
],
52+
partition_key=course_id,
53+
)
54+
for course_id in new_course_ids
55+
]
56+
57+
updated_ids = sorted(google_sheet_course_ids)
58+
dynamic_requests = []
59+
if new_course_ids:
60+
dynamic_requests.append(
61+
AddDynamicPartitionsRequest(
62+
partitions_def_name="canvas_course_ids",
63+
partition_keys=list(new_course_ids),
64+
)
65+
)
66+
67+
if removed_course_ids:
68+
dynamic_requests.append(
69+
DeleteDynamicPartitionsRequest(
70+
partitions_def_name="canvas_course_ids",
71+
partition_keys=list(removed_course_ids),
72+
)
73+
)
74+
75+
return SensorResult(
76+
dynamic_partitions_requests=dynamic_requests,
77+
run_requests=run_requests,
78+
cursor=json.dumps(updated_ids),
79+
)

uv.lock

Lines changed: 16 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)