@@ -314,15 +314,23 @@ impl Handler<PackagedSplitBatch> for Uploader {
314
314
if batch. publish_lock . is_dead ( ) {
315
315
// TODO: Remove the junk right away?
316
316
info ! ( "splits' publish lock is dead" ) ;
317
- split_update_sender. discard ( ) ?;
318
- return Ok ( ( ) ) ;
317
+ if let Err ( e) = split_update_sender. discard ( ) {
318
+ warn ! ( cause=?e, "could not discard split" ) ;
319
+ }
320
+ return ;
319
321
}
320
322
321
- let split_streamer = SplitPayloadBuilder :: get_split_payload (
323
+ let split_streamer = match SplitPayloadBuilder :: get_split_payload (
322
324
& packaged_split. split_files ,
323
325
& packaged_split. serialized_split_fields ,
324
326
& packaged_split. hotcache_bytes ,
325
- ) ?;
327
+ ) {
328
+ Ok ( split_streamer) => split_streamer,
329
+ Err ( e) => {
330
+ warn ! ( cause=?e, split_id=packaged_split. split_id( ) , "could not create split streamer" ) ;
331
+ return ;
332
+ }
333
+ } ;
326
334
let split_metadata = create_split_metadata (
327
335
& merge_policy,
328
336
retention_policy. as_ref ( ) ,
@@ -340,11 +348,22 @@ impl Handler<PackagedSplitBatch> for Uploader {
340
348
341
349
}
342
350
343
- let stage_splits_request = StageSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , split_metadata_list. clone ( ) ) ?;
344
- metastore
351
+ let stage_splits_request = match StageSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , split_metadata_list. clone ( ) ) {
352
+ Ok ( stage_splits_request) => stage_splits_request,
353
+ Err ( e) => {
354
+ warn ! ( cause=?e, "could not create stage splits request" ) ;
355
+ return ;
356
+ }
357
+ } ;
358
+ if let Err ( e) = metastore
345
359
. clone ( )
346
360
. stage_splits ( stage_splits_request)
347
- . await ?;
361
+ . await
362
+ {
363
+ warn ! ( cause=?e, "failed to stage splits" ) ;
364
+ return ;
365
+ } ;
366
+
348
367
counters. num_staged_splits . fetch_add ( split_metadata_list. len ( ) as u64 , Ordering :: SeqCst ) ;
349
368
350
369
let mut packaged_splits_and_metadata = Vec :: with_capacity ( batch. splits . len ( ) ) ;
@@ -363,7 +382,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
363
382
if let Err ( cause) = upload_result {
364
383
warn ! ( cause=?cause, split_id=packaged_split. split_id( ) , "Failed to upload split. Killing!" ) ;
365
384
kill_switch. kill ( ) ;
366
- bail ! ( "failed to upload split `{}`. killing the actor context" , packaged_split . split_id ( ) ) ;
385
+ return ;
367
386
}
368
387
369
388
packaged_splits_and_metadata. push ( ( packaged_split, metadata) ) ;
@@ -379,11 +398,17 @@ impl Handler<PackagedSplitBatch> for Uploader {
379
398
batch. batch_parent_span ,
380
399
) ;
381
400
382
- split_update_sender. send ( splits_update, & ctx_clone) . await ?;
401
+ let target = match & split_update_sender {
402
+ SplitsUpdateSender :: Sequencer ( _) => "sequencer" ,
403
+ SplitsUpdateSender :: Publisher ( _) => "publisher" ,
404
+ } ;
405
+ if let Err ( e) = split_update_sender. send ( splits_update, & ctx_clone) . await {
406
+ warn ! ( cause=?e, target, "failed to send uploaded split" ) ;
407
+ return ;
408
+ }
383
409
// We explicitly drop it in order to force move the permit guard into the async
384
410
// task.
385
411
mem:: drop ( permit_guard) ;
386
- Result :: < ( ) , anyhow:: Error > :: Ok ( ( ) )
387
412
}
388
413
. instrument ( Span :: current ( ) ) ,
389
414
"upload_single_task"
0 commit comments