1212from  dagster  import  (
1313    AddDynamicPartitionsRequest ,
1414    AssetKey ,
15-     DagsterEventType ,
1615    DefaultSensorStatus ,
17-     EventRecordsFilter ,
1816    RunRequest ,
1917    SensorResult ,
2018    sensor ,
2119)
2220
23- MAX_VIDEO_RESULTS  =  1 
21+ MAX_VIDEO_RESULTS  =  16 
2422
2523
2624def  fetch_youtube_shorts_api_data (
@@ -130,61 +128,12 @@ def fetch_and_log_video_metadata(youtube_client, youtube_config_provider):
130128    return  current_video_ids , metadata_by_video_id 
131129
132130
133- def  _check_asset_materialization (context , asset_key , partition_key ):
134-     """ 
135-     Check if a specific asset partition has been successfully materialized. 
136- 
137-     Args: 
138-         context: Sensor evaluation context 
139-         asset_key: The asset key to check 
140-         partition_key: The partition key (video ID) to check 
141- 
142-     Returns: 
143-         Boolean indicating if the asset partition has been successfully materialized 
144-     """ 
145-     # Check for successful materialization 
146-     materializations  =  context .instance .get_event_records (
147-         event_records_filter = EventRecordsFilter (
148-             asset_key = asset_key ,
149-             asset_partitions = [partition_key ],
150-             event_type = DagsterEventType .ASSET_MATERIALIZATION ,
151-         ),
152-         limit = 1 ,
153-     )
154- 
155-     return  len (materializations ) >  0 
156- 
157- 
158- def  _is_video_fully_processed (context , video_id ):
159-     """ 
160-     Check if all assets for a video have been successfully materialized. 
161- 
162-     Args: 
163-         context: Sensor evaluation context 
164-         video_id: Video ID to check 
165- 
166-     Returns: 
167-         Boolean indicating if all assets have been materialized 
168-     """ 
169-     required_assets  =  [
170-         AssetKey (["youtube_shorts" , "video_content" ]),
171-         AssetKey (["youtube_shorts" , "video_thumbnail" ]),
172-         AssetKey (["youtube_shorts" , "video_metadata" ]),
173-         AssetKey (["youtube_shorts" , "video_webhook" ]),
174-     ]
175- 
176-     for  asset_key  in  required_assets :
177-         if  not  _check_asset_materialization (context , asset_key , video_id ):
178-             return  False 
179- 
180-     return  True 
181- 
182- 
183131def  create_run_requests (context , videos_to_process , metadata_by_video_id ):
184132    """ 
185133    Create run requests for videos that need processing. 
186134
187-     Only creates run requests for videos that haven't been fully processed yet. 
135+     Individual assets will determine if they need to be re-materialized based on 
136+     version and S3 file existence checks. 
188137
189138    Args: 
190139        context: Sensor evaluation context 
@@ -196,11 +145,6 @@ def create_run_requests(context, videos_to_process, metadata_by_video_id):
196145    """ 
197146    run_requests  =  []
198147    for  video_id  in  videos_to_process :
199-         # Skip videos that have already been fully processed 
200-         if  _is_video_fully_processed (context , video_id ):
201-             context .log .debug ("Video %s already fully processed, skipping" , video_id )
202-             continue 
203- 
204148        context .log .info ("Creating run request for video %s" , video_id )
205149
206150        metadata_json  =  (
0 commit comments