Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit ea9c3bd

Browse files
committed
bottomless: stream gzip snapshot
1 parent be9a6ef commit ea9c3bd

File tree

1 file changed

+67
-8
lines changed

1 file changed

+67
-8
lines changed

bottomless/src/replicator.rs

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use aws_sdk_s3::error::SdkError;
88
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
99
use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder;
1010
use aws_sdk_s3::operation::list_objects::ListObjectsOutput;
11-
use aws_sdk_s3::primitives::ByteStream;
11+
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
12+
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
1213
use aws_sdk_s3::{Client, Config};
1314
use bytes::{Buf, Bytes, BytesMut};
1415
use chrono::{DateTime, LocalResult, TimeZone, Utc};
@@ -620,19 +621,77 @@ impl Replicator {
620621
Self::read_change_counter(&mut reader).await?
621622
}
622623
CompressionKind::Gzip => {
623-
// TODO: find a way to compress ByteStream on the fly instead of creating
624-
// an intermediary file.
625-
let (compressed_db_path, change_counter) = self.compress_main_db_file().await?;
624+
let mut reader = tokio::fs::File::open(&self.db_path).await?;
625+
626+
let stream = tokio::io::BufReader::new(reader.try_clone().await?);
627+
let mut gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(stream);
628+
626629
let key = format!("{}-{}/db.gz", self.db_name, self.generation);
630+
let upload_id = self
631+
.client
632+
.create_multipart_upload()
633+
.bucket(&self.bucket)
634+
.key(key.clone())
635+
.send()
636+
.await?
637+
.upload_id
638+
.ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
639+
640+
const CHUNK_SIZE: usize = 5 * 1024 * 1024;
641+
let mut parts = Vec::new();
642+
// S3 takes a 1-based index
643+
for part in 1..=10000 {
644+
let mut buffer = bytes::BytesMut::with_capacity(CHUNK_SIZE);
645+
loop {
646+
let bytes_written = gzip_reader.read_buf(&mut buffer).await?;
647+
// EOF or buffer is full
648+
if bytes_written == 0 {
649+
break;
650+
}
651+
}
652+
653+
// EOF
654+
if buffer.is_empty() {
655+
break;
656+
}
657+
658+
let part_out = self
659+
.client
660+
.upload_part()
661+
.bucket(&self.bucket)
662+
.key(key.clone())
663+
.upload_id(upload_id.clone())
664+
.body(ByteStream::from(buffer.freeze()))
665+
.part_number(part)
666+
.send()
667+
.await?;
668+
669+
parts.push(
670+
CompletedPart::builder()
671+
.part_number(part)
672+
.e_tag(
673+
part_out.e_tag.ok_or_else(|| {
674+
anyhow::anyhow!("e_tag missing from part upload")
675+
})?,
676+
)
677+
.build(),
678+
);
679+
}
680+
627681
self.client
628-
.put_object()
682+
.complete_multipart_upload()
683+
.upload_id(upload_id)
629684
.bucket(&self.bucket)
630685
.key(key)
631-
.body(ByteStream::from_path(compressed_db_path).await?)
686+
.multipart_upload(
687+
CompletedMultipartUpload::builder()
688+
.set_parts(Some(parts))
689+
.build(),
690+
)
632691
.send()
633692
.await?;
634-
let _ = tokio::fs::remove_file(compressed_db_path).await;
635-
change_counter
693+
694+
Self::read_change_counter(&mut reader).await?
636695
}
637696
};
638697

0 commit comments

Comments
 (0)