Skip to content

Commit bdf1c50

Browse files
committed
Only retry failed assets and simplify sensor function
1 parent c53ddce commit bdf1c50

File tree

1 file changed

+96
-66
lines changed

1 file changed

+96
-66
lines changed

dg_projects/learning_resources/learning_resources/sensors/youtube_shorts.py

Lines changed: 96 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,90 @@ def _create_dynamic_partition_requests(
191191
return dynamic_requests
192192

193193

194+
def fetch_and_log_video_metadata(youtube_client, youtube_config_provider):
    """
    Fetch YouTube Shorts metadata and index it by video ID.

    Despite the name, this helper does not log anything itself; callers
    that want a count or sample in the logs must emit it themselves.

    Args:
        youtube_client: Client passed through to
            fetch_youtube_shorts_api_data.
        youtube_config_provider: Config provider passed through to
            fetch_youtube_shorts_api_data.

    Returns:
        current_video_ids: Set of video IDs present in the fetched metadata
        metadata_by_video_id: Dictionary mapping each video ID to its
            metadata item
    """
    # NOTE(review): the calling sensor describes this as "the 24 most recent
    # videos"; presumably that limit is applied inside the fetch helper.
    current_videos_metadata = fetch_youtube_shorts_api_data(
        youtube_client, youtube_config_provider
    )

    # Walk the result exactly once so a one-shot iterable (e.g. a generator)
    # cannot leave the lookup dict empty after the IDs were already consumed.
    metadata_by_video_id = {}
    for item in current_videos_metadata:
        metadata_by_video_id[item["contentDetails"]["videoId"]] = item

    # The ID set is by construction the key set of the lookup dict.
    current_video_ids = set(metadata_by_video_id)

    return current_video_ids, metadata_by_video_id
215+
216+
217+
def determine_videos_to_process(current_video_ids, successfully_processed):
    """
    Determine which videos need processing (new or failed).

    A video needs processing when it is currently listed but is not among
    the successfully processed ones.

    Args:
        current_video_ids: Iterable of video IDs currently listed.
        successfully_processed: Iterable of video IDs that were already
            processed successfully.

    Returns:
        Set of video IDs to process. The set is empty when nothing needs
        processing; the caller then simply has no run requests to create.
    """
    # NOTE: this used to `raise SkipReason(...)` on an empty result. Dagster's
    # SkipReason is a plain result object rather than an exception class, so
    # the raise itself fails with "TypeError: exceptions must derive from
    # BaseException". Returning the empty set keeps the sensor tick healthy.
    return set(current_video_ids) - set(successfully_processed)
234+
235+
236+
def create_run_requests(videos_to_process, metadata_by_video_id):
    """
    Build one RunRequest per video that needs processing.

    Each request targets the four youtube_shorts assets for the video's
    partition. When metadata for the video is available it is serialized to
    JSON and handed to the thumbnail and metadata ops as ``cached_metadata``;
    otherwise the request carries an empty run config.

    Returns:
        List of RunRequest objects
    """

    def _request_for(video_id):
        # Serialize the cached metadata only for videos we have an entry for.
        if video_id in metadata_by_video_id:
            serialized = json.dumps(metadata_by_video_id.get(video_id))
        else:
            serialized = None

        if serialized:
            config_for_run = {
                "ops": {
                    "youtube_shorts__video_thumbnail": {
                        "config": {"cached_metadata": serialized}
                    },
                    "youtube_shorts__video_metadata": {
                        "config": {"cached_metadata": serialized}
                    },
                }
            }
        else:
            config_for_run = {}

        # Asset keys are rebuilt per request so no list is shared between them.
        targets = [
            AssetKey(["youtube_shorts", asset_name])
            for asset_name in (
                "video_content",
                "video_thumbnail",
                "video_metadata",
                "video_webhook",
            )
        ]

        return RunRequest(
            asset_selection=targets,
            partition_key=video_id,
            tags={"video_id": video_id},
            run_config=config_for_run,
        )

    return [_request_for(video_id) for video_id in videos_to_process]
276+
277+
194278
@sensor(
195279
description=(
196280
"Sensor to monitor YouTube channels for new video shorts and cleanup "
@@ -226,22 +310,10 @@ def youtube_shorts_sensor(context):
226310
youtube_config_provider = context.resources.youtube_config_provider
227311

228312
# Fetch video metadata for the 24 most recent videos
229-
current_videos_metadata = fetch_youtube_shorts_api_data(
313+
current_video_ids, metadata_by_video_id = fetch_and_log_video_metadata(
230314
youtube_client, youtube_config_provider
231315
)
232316

233-
# Extract video IDs from metadata and create a lookup dict
234-
current_video_ids = {
235-
item["contentDetails"]["videoId"] for item in current_videos_metadata
236-
}
237-
238-
# Create a metadata lookup dictionary for passing to assets
239-
metadata_by_video_id = {
240-
item["contentDetails"]["videoId"]: item for item in current_videos_metadata
241-
}
242-
243-
context.log.info("Found %d videos in YouTube channels", len(current_video_ids))
244-
245317
# Get existing dynamic partitions
246318
existing_partitions = set(
247319
context.instance.get_dynamic_partitions("youtube_video_ids")
@@ -254,58 +326,12 @@ def youtube_shorts_sensor(context):
254326
)
255327

256328
# Determine videos that need processing (new or failed)
257-
videos_to_process = current_video_ids - successfully_processed
258-
259-
# Log which specific videos need processing
260-
if videos_to_process:
261-
sample_to_process = list(videos_to_process)[:10]
262-
context.log.info("Sample videos needing processing: %s", sample_to_process)
263-
264-
if not videos_to_process:
265-
return SkipReason(
266-
"No videos need processing and none were removed from YouTube"
267-
)
268-
269-
context.log.info(
270-
"Videos needing processing (new or failed): %d", len(videos_to_process)
329+
videos_to_process = determine_videos_to_process(
330+
current_video_ids, successfully_processed
271331
)
272332

273-
# Create run requests for videos that need processing (new or failed)
274-
run_requests = []
275-
total_config_size = 0
276-
for video_id in videos_to_process:
277-
metadata_json = (
278-
json.dumps(metadata_by_video_id.get(video_id))
279-
if video_id in metadata_by_video_id
280-
else None
281-
)
282-
if metadata_json:
283-
total_config_size += len(metadata_json)
284-
285-
run_requests.append(
286-
RunRequest(
287-
asset_selection=[
288-
AssetKey(["youtube_shorts", "video_content"]),
289-
AssetKey(["youtube_shorts", "video_thumbnail"]),
290-
AssetKey(["youtube_shorts", "video_metadata"]),
291-
AssetKey(["youtube_shorts", "video_webhook"]),
292-
],
293-
partition_key=video_id,
294-
tags={"video_id": video_id},
295-
run_config={
296-
"ops": {
297-
"youtube_shorts__video_thumbnail": {
298-
"config": {"cached_metadata": metadata_json}
299-
},
300-
"youtube_shorts__video_metadata": {
301-
"config": {"cached_metadata": metadata_json}
302-
},
303-
}
304-
}
305-
if metadata_json
306-
else {},
307-
)
308-
)
333+
# Create run requests for videos that need processing
334+
run_requests = create_run_requests(videos_to_process, metadata_by_video_id)
309335

310336
# Create dynamic partition requests
311337
dynamic_requests = _create_dynamic_partition_requests(
@@ -328,6 +354,10 @@ def youtube_shorts_sensor(context):
328354
cursor=cursor_json,
329355
)
330356

331-
except Exception:
332-
context.log.exception("Error in YouTube shorts sensor")
333-
return SkipReason("Sensor failed due to error - check logs")
357+
except SkipReason as skip:
358+
return skip
359+
360+
except Exception as e:
361+
msg = f"Error in youtube_shorts_sensor: {e!s}"
362+
context.log.exception(msg)
363+
raise

0 commit comments

Comments
 (0)