1515    AssetExecutionContext ,
1616    AssetIn ,
1717    AssetKey ,
18+     AssetRecordsFilter ,
19+     Backoff ,
1820    Config ,
1921    DataVersion ,
2022    DynamicPartitionsDefinition ,
2123    Output ,
24+     RetryPolicy ,
2225    asset ,
2326)
2427from  ol_orchestrate .lib .automation_policies  import  upstream_or_code_changes 
28+ from  s3fs  import  S3FileSystem 
29+ from  upath  import  UPath 
2530
2631from  learning_resources .resources .api_client_factory  import  ApiClientFactory 
2732from  learning_resources .resources .youtube_client  import  YouTubeClientFactory 
@@ -48,6 +53,14 @@ def _assign_s3_path(video_id: str, extension: str) -> str:
4853    return  f"{ video_id } { video_id } { extension }  
4954
5055
56+ def  _build_full_s3_path (config : YouTubeShortsConfig , relative_path : str ) ->  str :
57+     """Build full S3 path from config and relative path.""" 
58+     bucket  =  config .bucket_name 
59+     prefix  =  config .s3_prefix 
60+     full_path  =  f"{ bucket } { prefix } { relative_path }  .replace ("//" , "/" )
61+     return  f"s3://{ full_path }  
62+ 
63+ 
5164def  _load_metadata_from_config (
5265    config : YouTubeShortsConfig ,
5366) ->  dict [str , Any ] |  None :
@@ -84,6 +97,159 @@ def _generate_video_version(video_data: dict[str, Any]) -> str:
8497    return  hashlib .sha256 (version_string .encode ()).hexdigest ()[:16 ]
8598
8699
def _should_skip_asset_materialization(
    context: AssetExecutionContext,
    expected_version: str,
    s3_path: str,
) -> tuple[bool, dict[str, Any]]:
    """
    Decide whether a partitioned asset can skip re-materialization.

    Skipping is allowed only when BOTH conditions hold: the most recent
    materialization of this partition carries the expected data version,
    AND the backing S3 object exists and is non-empty. Any failure while
    probing S3 is treated as "do not skip" so the pipeline errs on the
    side of re-materializing.

    Args:
        context: Dagster execution context with asset_key and partition_key
        expected_version: The expected data version hash for this asset
        s3_path: Full S3 path (s3://bucket/key) to verify file existence

    Returns:
        Tuple of (should_skip, existing_metadata); existing_metadata is the
        prior materialization's metadata when should_skip is True, otherwise
        an empty dict.
    """
    video_id = context.partition_key

    # Fetch only the latest materialization record for this partition.
    history = context.instance.fetch_materializations(
        records_filter=AssetRecordsFilter(
            asset_key=context.asset_key,
            asset_partitions=[video_id],
        ),
        limit=1,
    )

    if not history.records:
        log.debug(
            "No existing materialization found for %s partition %s",
            context.asset_key,
            video_id,
        )
        return (False, {})

    latest = history.records[0]
    recorded_version = latest.asset_materialization.tags.get(
        "dagster/data_version"
    )
    preserved_metadata = {
        key: entry.value
        for key, entry in latest.asset_materialization.metadata.items()
    }

    if recorded_version != expected_version:
        log.info(
            "Asset %s for video %s exists but version mismatch "
            "(existing: %s, expected: %s) - re-materializing",
            context.asset_key,
            video_id,
            recorded_version,
            expected_version,
        )
        return (False, {})

    # Version matches - verify S3 file actually exists before skipping
    try:
        fs = S3FileSystem()
        remote_file = UPath(s3_path, **fs.storage_options)

        if not remote_file.exists():
            log.warning(
                "Asset %s for video %s has correct version but S3 file %s "
                "does not exist - re-materializing",
                context.asset_key,
                video_id,
                s3_path,
            )
            return (False, {})

        # Guard against a zero-byte object left by a failed upload.
        size_bytes = remote_file.stat().st_size
        if size_bytes == 0:
            log.warning(
                "Asset %s for video %s has correct version but S3 file is empty "
                "(0 bytes) - re-materializing",
                context.asset_key,
                video_id,
            )
            return (False, {})

        log.info(
            "Asset %s for video %s already exists with correct version %s "
            "and valid S3 file (%d bytes) - skipping",
            context.asset_key,
            video_id,
            expected_version,
            size_bytes,
        )
    except Exception as err:  # noqa: BLE001
        log.warning(
            "Failed to verify S3 file existence for %s: %s - "
            "re-materializing to be safe",
            s3_path,
            err,
            exc_info=True,
        )
        return (False, {})
    else:
        return (True, preserved_metadata)
202+ 
203+ 
def _create_skip_output(
    context: AssetExecutionContext,
    version: str,
    target_path: str,
    file_extension: str,
    existing_metadata: dict[str, Any],
) -> Output[tuple]:
    """
    Build the Output returned when an asset materialization is skipped.

    A lightweight ".skip_"-prefixed marker file is created locally; the IO
    manager recognizes this prefix and skips uploading, which leaves the
    already-correct asset in S3 untouched while still recording a
    materialization with the prior metadata.

    Args:
        context: Dagster execution context
        version: Data version hash
        target_path: Target S3 path for the asset
            (relative path like "video_id/video_id.mp4")
        file_extension: File extension for the marker file
            (e.g., "mp4", "jpg", "json")
        existing_metadata: Metadata from the existing materialization to preserve

    Returns:
        Output with marker file and preserved metadata
    """
    video_id = context.partition_key

    # Create an empty marker file - IO manager will detect .skip_ prefix and skip upload
    placeholder = Path(f".skip_{video_id}.{file_extension}")
    placeholder.touch()

    log.info(
        "Skipping %s for video %s - already exists with correct version %s",
        context.asset_key,
        video_id,
        version,
    )

    # Carry forward prior metadata and tag the run as a skip.
    merged_metadata = dict(existing_metadata)
    merged_metadata["status"] = "skipped_existing"

    return Output(
        value=(placeholder, target_path),
        data_version=DataVersion(version),
        metadata=merged_metadata,
    )
251+ 
252+ 
87253def  fetch_video_metadata (youtube_client , video_id ):
88254    request  =  youtube_client .client .videos ().list (
89255        part = "snippet,contentDetails" , id = video_id 
@@ -149,6 +315,18 @@ def youtube_video_metadata(
149315        # Generate version for change tracking 
150316        version  =  _generate_video_version (video_data )
151317
318+         target_path  =  _assign_s3_path (video_id , "json" )
319+         full_s3_path  =  _build_full_s3_path (config , target_path )
320+ 
321+         # Check if asset already exists with correct version - skip re-processing 
322+         should_skip , existing_metadata  =  _should_skip_asset_materialization (
323+             context , version , full_s3_path 
324+         )
325+         if  should_skip :
326+             return  _create_skip_output (
327+                 context , version , target_path , "json" , existing_metadata 
328+             )
329+ 
152330        # Write metadata to local JSON file for io_manager to upload 
153331        metadata_file  =  Path (f"{ version }  )
154332        with  metadata_file .open ("w" , encoding = "utf-8" ) as  f :
@@ -206,6 +384,18 @@ def youtube_video_content(
206384    # Generate version for change tracking 
207385    version  =  _generate_video_version (video_data  or  {"id" : video_id })
208386
387+     target_path  =  _assign_s3_path (video_id , "mp4" )
388+     full_s3_path  =  _build_full_s3_path (config , target_path )
389+ 
390+     # Check if asset already exists with correct version - skip expensive download 
391+     should_skip , existing_metadata  =  _should_skip_asset_materialization (
392+         context , version , full_s3_path 
393+     )
394+     if  should_skip :
395+         return  _create_skip_output (
396+             context , version , target_path , "mp4" , existing_metadata 
397+         )
398+ 
209399    # Download video using yt-dlp to local file 
210400    output_file  =  Path (f"{ video_id }  )
211401
@@ -317,6 +507,18 @@ def youtube_video_thumbnail(
317507    # Generate version for change tracking 
318508    version  =  _generate_video_version (video_metadata )
319509
510+     target_path  =  _assign_s3_path (video_id , "jpg" )
511+     full_s3_path  =  _build_full_s3_path (config , target_path )
512+ 
513+     # Check if asset already exists with correct version - skip expensive download 
514+     should_skip , existing_metadata  =  _should_skip_asset_materialization (
515+         context , version , full_s3_path 
516+     )
517+     if  should_skip :
518+         return  _create_skip_output (
519+             context , version , target_path , "jpg" , existing_metadata 
520+         )
521+ 
320522    try :
321523        # Download thumbnail 
322524        response  =  requests .get (thumbnail_url , timeout = 30 )
@@ -360,27 +562,59 @@ def youtube_video_thumbnail(
360562    code_version = "youtube_webhook_v1" , 
361563    partitions_def = youtube_video_ids , 
362564    ins = {"video_metadata" : AssetIn (key = AssetKey (["youtube_shorts" , "video_metadata" ]))}, 
565+     retry_policy = RetryPolicy (max_retries = 3 , delay = 1 , backoff = Backoff .EXPONENTIAL ), 
363566) 
364567def  youtube_webhook (
365568    context : AssetExecutionContext ,
569+     config : YouTubeShortsConfig ,
366570    learn_api : ApiClientFactory ,
571+     youtube_client : YouTubeClientFactory ,
367572    video_metadata ,  # noqa: ARG001 
368573) ->  Output [bool ]:
369574    """ 
370575    Send a webhook notification for a processed YouTube video. 
371576
372577    Args: 
373578        context: Dagster execution context containing partition key (video ID) 
579+         config: Configuration containing cached metadata from sensor 
374580        learn_api: MIT Learn API client factory 
375-         video_metadata: S3 path to metadata file 
581+         youtube_client: YouTube API client factory 
582+         video_metadata: Dependency to ensure metadata is processed first 
376583
377584    Returns: 
378585        Boolean indicating success of the webhook notification 
379586    """ 
380587    video_id  =  context .partition_key 
381588
382589    try :
383-         webhook_response  =  learn_api .client .notify_shorts_processed (video_id )
590+         # Try to get metadata from the sensor via run_config 
591+         metadata_dict  =  _load_metadata_from_config (config )
592+ 
593+         if  metadata_dict :
594+             log .info ("Using cached metadata from sensor for webhook: %s" , video_id )
595+         else :
596+             # Fallback: Fetch metadata from YouTube API 
597+             log .info ("Fetching metadata from YouTube for %s" , video_id )
598+             try :
599+                 response  =  fetch_video_metadata (youtube_client , video_id )
600+                 videos  =  response .get ("items" , [])
601+                 if  not  videos :
602+                     msg  =  f"Video { video_id }  
603+                     raise  ValueError (msg )  # noqa: TRY301 
604+                 metadata_dict  =  videos [0 ]
605+             except  Exception  as  fetch_error :
606+                 log .exception (
607+                     "Failed to fetch metadata from YouTube API for video %s" ,
608+                     video_id ,
609+                 )
610+                 msg  =  (
611+                     f"No metadata found in run config and failed to fetch " 
612+                     f"from YouTube API for video { video_id }  
613+                     "Cannot send webhook without metadata." 
614+                 )
615+                 raise  ValueError (msg ) from  fetch_error 
616+ 
617+         webhook_response  =  learn_api .client .notify_shorts_processed (metadata_dict )
384618        log .info (
385619            "Successfully sent webhook notification for video %s: %s" ,
386620            video_id ,
0 commit comments