"""Assets for managing instructor onboarding data in the access-forge GitHub repository.

This module pulls email addresses from the combined user course roles
dbt model and pushes them to a private GitHub repository for instructor
access management.
"""

from datetime import UTC, datetime

import polars as pl
from dagster import AssetExecutionContext, AssetIn, AssetKey, Output, asset
from github.GithubException import UnknownObjectException
from ol_orchestrate.resources.github import GithubApiClientFactory


@asset(
    name="instructor_onboarding_user_list",
    group_name="instructor_onboarding",
    ins={
        "int__combined__user_course_roles": AssetIn(
            key=AssetKey(["int__combined__user_course_roles"])
        )
    },
    description="Generates CSV file with user emails for access-forge repository",
)
def generate_instructor_onboarding_user_list(
    context: AssetExecutionContext,
    int__combined__user_course_roles: pl.DataFrame,
) -> Output[str]:
    """Pull unique email addresses from user course roles and prepare for GitHub upload.

    This asset reads the combined user course roles dbt model, extracts unique email
    addresses, and generates a CSV string formatted for the access-forge repository.

    The output CSV has three columns:
    - email: User's email address (from user_email field)
    - role: Set to 'ol-data-analyst' for all users
    - sent_invite: Set to 1 for all users

    Args:
        context: Dagster execution context
        int__combined__user_course_roles: DataFrame from dbt model containing fields:
            platform, user_username, user_email, user_full_name, courserun_readable_id,
            organization, courseaccess_role

    Returns:
        Output containing CSV string content formatted for access-forge repo
    """
    # Select unique email addresses and filter out nulls
    user_data = (
        int__combined__user_course_roles.select(["user_email"])
        .filter(pl.col("user_email").is_not_null())
        .unique()
        .sort("user_email")
    )

    # Add role and sent_invite columns with fixed values
    user_data = user_data.with_columns(
        [pl.lit("ol-data-analyst").alias("role"), pl.lit(1).alias("sent_invite")]
    )

    # Rename column to match expected format
    user_data = user_data.rename({"user_email": "email"})

    # Reorder columns: email, role, sent_invite
    user_data = user_data.select(["email", "role", "sent_invite"])

    # Convert to CSV string
    csv_content = user_data.write_csv()
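    # write_csv includes a header row by default, so csv_content looks roughly like
    # "email,role,sent_invite\nsomeone@example.edu,ol-data-analyst,1\n..."
    # (the address shown is illustrative, not real data)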

    context.log.info("Generated CSV content with %s unique users", len(user_data))

    return Output(
        value=csv_content,
        metadata={
            "num_users": len(user_data),
            "preview": csv_content[:500],
        },
    )


@asset(
    name="update_access_forge_repo",
    group_name="instructor_onboarding",
    ins={"instructor_onboarding_user_list": AssetIn()},
    required_resource_keys={"github_api"},
    description="Updates the access-forge repository with the generated user list",
)
def update_access_forge_repository(
    context: AssetExecutionContext,
    instructor_onboarding_user_list: str,
    github_api: GithubApiClientFactory,
) -> Output[dict]:
    """Push the generated CSV content to the access-forge GitHub repository.

    This asset updates or creates a CSV file in the private mitodl/access-forge
    repository with the user list generated from the dbt model.

    Args:
        context: Dagster execution context
        instructor_onboarding_user_list: CSV string content to upload with columns:
            email, role, sent_invite
        github_api: GitHub API client factory resource for authentication

    Returns:
        Output containing metadata about the change (repo, file path, action, branch,
        PR number, and PR URL)

    Raises:
        Exception: If the GitHub API call fails or authentication issues occur
    """
    repo_name = "mitodl/access-forge"
    file_path = "users/ci/users.csv"
    base_branch = "main"

    # Create unique branch name with timestamp
    timestamp = datetime.now(tz=UTC).strftime("%Y%m%d-%H%M%S")
    new_branch = f"dagster/update-user-list-{timestamp}"
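    # e.g. "dagster/update-user-list-20250102-143000" (illustrative timestamp)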

    commit_message = "dagster-pipeline - update user list from ol-data-platform"

    try:
        gh_client = github_api.get_client()
        repo = gh_client.get_repo(repo_name)

        # Get the base branch reference
        base_ref = repo.get_git_ref(f"heads/{base_branch}")
        base_sha = base_ref.object.sha

        # Create new branch from base
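        # The timestamped branch name avoids collisions; creating a ref that
        # already exists makes the GitHub API return a 422 error.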
        repo.create_git_ref(ref=f"refs/heads/{new_branch}", sha=base_sha)
        context.log.info("Created branch %s", new_branch)

        # Try to get existing file to update it, or create new file
        try:
            contents = repo.get_contents(file_path, ref=base_branch)
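            # Updating an existing file through the GitHub contents API requires
            # the current blob SHA of that file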
            repo.update_file(
                path=file_path,
                message=commit_message,
                content=instructor_onboarding_user_list,
                sha=contents.sha,
                branch=new_branch,
            )
            context.log.info("Updated file %s in branch %s", file_path, new_branch)
            action = "updated"
        except UnknownObjectException:
            # File doesn't exist, create it
            repo.create_file(
                path=file_path,
                message=commit_message,
                content=instructor_onboarding_user_list,
                branch=new_branch,
            )
            context.log.info("Created file %s in branch %s", file_path, new_branch)
            action = "created"

        # Create pull request
        pr = repo.create_pull(
            title=f"Update user list - {timestamp}",
            body=(
                "Automated update of user list from ol-data-platform Dagster "
                "pipeline.\n\n"
                f"- Action: {action} file\n"
                f"- File: {file_path}\n"
                # Subtract 1 so the CSV header row is not counted as a user entry
                f"- Users: {instructor_onboarding_user_list.count(chr(10)) - 1} entries"
            ),
            head=new_branch,
            base=base_branch,
        )

        context.log.info("Created PR #%s: %s", pr.number, pr.html_url)

        return Output(
            value={
                "repo": repo_name,
                "file_path": file_path,
                "action": action,
                "branch": new_branch,
                "pr_number": pr.number,
                "pr_url": pr.html_url,
            },
            metadata={
                "repository": repo_name,
                "file_path": file_path,
                "action": action,
                "pr_number": pr.number,
                "pr_url": pr.html_url,
            },
        )

    except Exception:
        context.log.exception("Failed to create PR in GitHub repository")
        raise
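

# A minimal sketch of how these assets could be wired into a Dagster code location.
# Illustrative only: the upstream int__combined__user_course_roles asset comes from
# the dbt project assets defined elsewhere, and the GithubApiClientFactory
# constructor arguments are whatever that resource actually requires (omitted here).
#
#     from dagster import Definitions
#
#     defs = Definitions(
#         assets=[
#             generate_instructor_onboarding_user_list,
#             update_access_forge_repository,
#         ],
#         resources={"github_api": GithubApiClientFactory(...)},
#     )
#
# The CSV-generating asset can also be exercised directly with a small polars frame
# (hypothetical data) and dagster.build_asset_context(), since Dagster assets are
# plain callables when invoked directly:
#
#     import polars as pl
#     from dagster import build_asset_context
#
#     sample = pl.DataFrame({"user_email": ["a@example.edu", None, "a@example.edu"]})
#     out = generate_instructor_onboarding_user_list(build_asset_context(), sample)
#     # out is the returned Output; out.value holds the CSV string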