55import  hashlib 
66import  json 
77import  logging 
8- import  re 
98import  tempfile 
109from  pathlib  import  Path 
1110from  tempfile  import  NamedTemporaryFile 
2928
3029from  learning_resources .resources .youtube_client  import  YouTubeClientFactory 
3130
32- 
33- def  _check_video_duration (video_data : dict [str , Any ], max_duration : int  =  60 ) ->  bool :
34-     """ 
35-     Check if video duration is within acceptable limits for shorts. 
36- 
37-     YouTube shorts should typically be 60 seconds or less. 
38-     """ 
39-     duration_str  =  video_data .get ("contentDetails" , {}).get ("duration" , "" )
40-     if  not  duration_str :
41-         return  False 
42- 
43-     # Parse ISO 8601 duration format (PT1M30S) 
44-     pattern  =  r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?" 
45-     match  =  re .match (pattern , duration_str )
46- 
47-     if  not  match :
48-         return  False 
49- 
50-     hours  =  int (match .group (1 ) or  0 )
51-     minutes  =  int (match .group (2 ) or  0 )
52-     seconds  =  int (match .group (3 ) or  0 )
53- 
54-     total_seconds  =  hours  *  3600  +  minutes  *  60  +  seconds 
55-     return  total_seconds  <=  max_duration 
56- 
57- 
5831log  =  logging .getLogger (__name__ )
5932
6033# Dynamic partitioning for individual video processing 
@@ -67,7 +40,6 @@ class YouTubeShortsConfig(Config):
6740    bucket_name : str  =  "ol-devops-sandbox" 
6841    s3_prefix : str  =  "youtube_shorts" 
6942    max_workers : int  =  4 
70-     max_video_duration : int  =  60   # seconds 
7143    cached_metadata : str  |  None  =  None   # JSON string of video metadata from sensor 
7244
7345
@@ -101,8 +73,6 @@ def _generate_video_version(video_data: dict[str, Any]) -> str:
10173        "id" : video_data .get ("id" ),
10274        "etag" : video_data .get ("etag" ),
10375        "published_at" : video_data .get ("snippet" , {}).get ("publishedAt" ),
104-         "title" : video_data .get ("snippet" , {}).get ("title" ),
105-         "view_count" : video_data .get ("statistics" , {}).get ("viewCount" ),
10676    }
10777
10878    version_string  =  json .dumps (version_fields , sort_keys = True )
@@ -111,7 +81,7 @@ def _generate_video_version(video_data: dict[str, Any]) -> str:
11181
11282def  fetch_video_metadata (youtube_client , video_id ):
11383    request  =  youtube_client .client .videos ().list (
114-         part = "snippet,contentDetails,statistics " , id = video_id 
84+         part = "snippet,contentDetails" , id = video_id 
11585    )
11686    return  request .execute ()
11787
@@ -184,13 +154,6 @@ def youtube_video_metadata(  # noqa: PLR0913
184154
185155            video_data  =  videos [0 ]
186156
187-         # Check if this is actually a short (duration filter) 
188-         # Only check duration if we fetched from API (sensor data is pre-filtered) 
189-         if  not  from_sensor  and  not  _check_video_duration (video_data , max_duration = 60 ):
190-             duration  =  video_data .get ("contentDetails" , {}).get ("duration" , "" )
191-             msg  =  f"Video { video_id } { duration }  
192-             raise  ValueError (msg )  # noqa: TRY301 
193- 
194157        # Generate version for change tracking 
195158        version  =  _generate_video_version (video_data )
196159
@@ -200,13 +163,6 @@ def youtube_video_metadata(  # noqa: PLR0913
200163        # Convert video_data to JSON and upload to S3 
201164        metadata_json  =  json .dumps (video_data , indent = 2 , ensure_ascii = False )
202165
203-         try :
204-             # Check if metadata already exists 
205-             s3 .get_client ().head_object (Bucket = config .bucket_name , Key = s3_key )
206-             log .info ("Metadata %s already exists in S3, updating..." , video_id )
207-         except  s3 .get_client ().exceptions .NoSuchKey :
208-             log .info ("Uploading new metadata for %s to S3" , video_id )
209- 
210166        # Upload metadata to S3 
211167        upload_to_s3 (
212168            s3 .get_client (),
@@ -266,24 +222,8 @@ def youtube_video_content(
266222    # Check if video already exists in S3 
267223    s3_key  =  f"{ config .s3_prefix } { video_id }  
268224
269-     try :
270-         # Check if file exists 
271-         s3 .get_client ().head_object (Bucket = config .bucket_name , Key = s3_key )
272-         log .info ("Video %s already exists in S3, skipping download" , video_id )
273- 
274-         return  Output (
275-             value = s3_key ,
276-             metadata = {
277-                 "video_id" : video_id ,
278-                 "s3_bucket" : config .bucket_name ,
279-                 "s3_key" : s3_key ,
280-                 "status" : "already_exists" ,
281-             },
282-         )
283- 
284-     except  s3 .get_client ().exceptions .NoSuchKey :
285-         # File doesn't exist, proceed with download 
286-         pass 
225+     # Generate version for change tracking 
226+     version  =  _generate_video_version ({"id" : video_id })
287227
288228    # Download video using yt-dlp 
289229    with  tempfile .TemporaryDirectory () as  temp_dir :
@@ -331,6 +271,7 @@ def youtube_video_content(
331271
332272            return  Output (
333273                value = s3_key ,
274+                 data_version = DataVersion (version ),
334275                metadata = {
335276                    "video_id" : video_id ,
336277                    "s3_bucket" : config .bucket_name ,
@@ -398,6 +339,9 @@ def youtube_video_thumbnail(
398339
399340        video_metadata  =  videos [0 ]
400341
342+     # Generate version for change tracking 
343+     version  =  _generate_video_version (video_metadata )
344+ 
401345    thumbnails  =  video_metadata .get ("snippet" , {}).get ("thumbnails" , {})
402346
403347    # Try to get high, medium, or default quality thumbnail 
@@ -413,25 +357,6 @@ def youtube_video_thumbnail(
413357
414358    s3_key  =  f"{ config .s3_prefix } { video_id }  
415359
416-     try :
417-         # Check if thumbnail already exists 
418-         s3 .get_client ().head_object (Bucket = config .bucket_name , Key = s3_key )
419-         log .info ("Thumbnail %s already exists in S3, skipping download" , video_id )
420- 
421-         return  Output (
422-             value = s3_key ,
423-             metadata = {
424-                 "video_id" : video_id ,
425-                 "s3_bucket" : config .bucket_name ,
426-                 "s3_key" : s3_key ,
427-                 "status" : "already_exists" ,
428-             },
429-         )
430- 
431-     except  s3 .get_client ().exceptions .NoSuchKey :
432-         # File doesn't exist, proceed with download 
433-         pass 
434- 
435360    try :
436361        # Download thumbnail 
437362        response  =  requests .get (thumbnail_url , timeout = 30 )
@@ -454,6 +379,7 @@ def youtube_video_thumbnail(
454379
455380        return  Output (
456381            value = s3_key ,
382+             data_version = DataVersion (version ),
457383            metadata = {
458384                "video_id" : video_id ,
459385                "s3_bucket" : config .bucket_name ,
@@ -470,17 +396,17 @@ def youtube_video_thumbnail(
470396
471397
472398@asset ( 
473-     key = AssetKey (["youtube_shorts" , "webhook_notification " ]), 
399+     key = AssetKey (["youtube_shorts" , "video_webhook " ]), 
474400    group_name = "youtube_shorts" , 
475401    description = "Send webhook notification after metadata extraction." , 
476402    code_version = "youtube_webhook_v1" , 
477403    partitions_def = youtube_video_ids , 
478404    ins = {"video_metadata" : AssetIn (key = AssetKey (["youtube_shorts" , "video_metadata" ]))}, 
479405) 
480- def  youtube_webhook_notification (
406+ def  youtube_webhook (
481407    context : AssetExecutionContext ,
482408    learn_api : MITLearnApiClient ,
483-     video_metadata : dict [str , Any ],   # noqa: ARG001 
409+     video_metadata : dict [str , Any ],
484410) ->  Output [bool ]:
485411    """ 
486412    Send a webhook notification for a processed YouTube video. 
@@ -494,6 +420,10 @@ def youtube_webhook_notification(
494420        Boolean indicating success of the webhook notification 
495421    """ 
496422    video_id  =  context .partition_key 
423+ 
424+     # Generate version for change tracking 
425+     version  =  _generate_video_version (video_metadata )
426+ 
497427    try :
498428        webhook_response  =  send_webhook (learn_api , video_id )
499429        log .info (
@@ -503,6 +433,7 @@ def youtube_webhook_notification(
503433        )
504434        return  Output (
505435            value = True ,
436+             data_version = DataVersion (version ),
506437            metadata = {
507438                "video_id" : video_id ,
508439                "webhook_response" : webhook_response ,
0 commit comments