This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit f09e491

send gzip in chunks of known size
1 parent ea9c3bd commit f09e491

1 file changed: +150 -76 lines changed

bottomless/src/replicator.rs

Lines changed: 150 additions & 76 deletions
@@ -8,7 +8,7 @@ use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
 use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder;
 use aws_sdk_s3::operation::list_objects::ListObjectsOutput;
-use aws_sdk_s3::primitives::{ByteStream, SdkBody};
+use aws_sdk_s3::primitives::{ByteStream};
 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
 use aws_sdk_s3::{Client, Config};
 use bytes::{Buf, Bytes, BytesMut};
@@ -608,92 +608,166 @@ impl Replicator {
             return Ok(());
         }
         tracing::debug!("Snapshotting {}", self.db_path);
-        let change_counter = match self.use_compression {
-            CompressionKind::None => {
-                self.client
-                    .put_object()
-                    .bucket(&self.bucket)
-                    .key(format!("{}-{}/db.db", self.db_name, self.generation))
-                    .body(ByteStream::from_path(&self.db_path).await?)
-                    .send()
-                    .await?;
-                let mut reader = tokio::fs::File::open(&self.db_path).await?;
-                Self::read_change_counter(&mut reader).await?
-            }
-            CompressionKind::Gzip => {
-                let mut reader = tokio::fs::File::open(&self.db_path).await?;
-
-                let stream = tokio::io::BufReader::new(reader.try_clone().await?);
-                let mut gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(stream);
-
-                let key = format!("{}-{}/db.gz", self.db_name, self.generation);
-                let upload_id = self
-                    .client
-                    .create_multipart_upload()
-                    .bucket(&self.bucket)
-                    .key(key.clone())
-                    .send()
-                    .await?
-                    .upload_id
-                    .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
-
-                const CHUNK_SIZE: usize = 5 * 1024 * 1024;
-                let mut parts = Vec::new();
-                // S3 takes an 1-based index
-                for part in 1..=10000 {
-                    let mut buffer = bytes::BytesMut::with_capacity(CHUNK_SIZE);
-                    loop {
-                        let bytes_written = gzip_reader.read_buf(&mut buffer).await?;
-                        // EOF or buffer is full
-                        if bytes_written == 0 {
+        let change_counter =
+            match self.use_compression {
+                CompressionKind::None => {
+                    self.client
+                        .put_object()
+                        .bucket(&self.bucket)
+                        .key(format!("{}-{}/db.db", self.db_name, self.generation))
+                        .body(ByteStream::from_path(&self.db_path).await?)
+                        .send()
+                        .await?;
+                    let mut reader = tokio::fs::File::open(&self.db_path).await?;
+                    Self::read_change_counter(&mut reader).await?
+                }
+                CompressionKind::Gzip => {
+                    let mut reader = tokio::fs::File::open(&self.db_path).await?;
+                    let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?);
+                    let mut gzip_reader =
+                        async_compression::tokio::bufread::GzipEncoder::new(buf_reader);
+
+                    let key = format!("{}-{}/db.gz", self.db_name, self.generation);
+
+                    // Unfortunately we can't send the gzip output in a single call without
+                    // buffering the whole snapshot in memory, because S3 requires the
+                    // `Content-Length` header to be set.
+                    let upload_id = self
+                        .client
+                        .create_multipart_upload()
+                        .bucket(&self.bucket)
+                        .key(key.clone())
+                        .send()
+                        .await?
+                        .upload_id
+                        .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
+
+                    let chunk_sizes = &[
+                        5 * 1024 * 1024,
+                        10 * 1024 * 1024,
+                        25 * 1024 * 1024,
+                        50 * 1024 * 1024,
+                        100 * 1024 * 1024,
+                    ];
+
+                    const LAST_PART: i32 = 10_000;
+                    let mut parts = Vec::new();
+                    let mut has_reached_eof = false;
+
+                    // S3 allows a maximum of 10_000 parts and each part must be between 5 MiB
+                    // and 5 GiB, except for the last one, which has no minimum size.
+                    //
+                    // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+                    for part in 0..LAST_PART - 1 {
+                        // Progressively increase the chunk size every 16 chunks, up to the last
+                        // chunk_size. This keeps allocations small for small databases.
+                        //
+                        // Here's a table of how much data we can chunk:
+                        // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐
+                        // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 5 MiB      │ 16               │ 80 MiB                │ 80 MiB           │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 10 MiB     │ 16               │ 160 MiB               │ 240 MiB          │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 25 MiB     │ 16               │ 400 MiB               │ 560 MiB          │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 50 MiB     │ 16               │ 800 MiB               │ 1.406 GiB        │
+                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+                        // │ 100 MiB    │ 9935             │ 970.215 GiB           │ 971.621 GiB      │
+                        // └────────────┴──────────────────┴───────────────────────┴──────────────────┘
+                        //
+                        // We can send up to 971 GiB in chunks, which is more than enough for the
+                        // majority of use cases.
+                        //
+                        // The last chunk is reserved for the remainder of the `gzip_reader`.
+                        let chunk_size =
+                            chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
+
+                        let mut buffer = bytes::BytesMut::with_capacity(chunk_size);
+                        loop {
+                            let bytes_written = gzip_reader.read_buf(&mut buffer).await?;
+                            // EOF or buffer is full
+                            if bytes_written == 0 {
+                                break;
+                            }
+                        }
+
+                        // EOF
+                        if buffer.is_empty() {
+                            has_reached_eof = true;
                             break;
                         }
+
+                        let part_out = self
+                            .client
+                            .upload_part()
+                            .bucket(&self.bucket)
+                            .key(key.clone())
+                            .upload_id(upload_id.clone())
+                            .body(ByteStream::from(buffer.freeze()))
+                            .part_number(part + 1)
+                            .send()
+                            .await?;
+
+                        parts.push(
+                            CompletedPart::builder()
+                                .part_number(part + 1)
+                                .e_tag(part_out.e_tag.ok_or_else(|| {
+                                    anyhow::anyhow!("e_tag missing from part upload")
+                                })?)
+                                .build(),
+                        );
                     }
 
-                    // EOF
-                    if buffer.is_empty() {
-                        break;
+                    // If the gzip stream has not reached EOF, we still need to send the last
+                    // part to S3. Since we don't know the size of the remaining stream and
+                    // can't be sure it fits in memory, we spill it into a file so it can be
+                    // streamed.
+                    //
+                    // This would only happen for databases that are around ~1 TiB.
+                    if !has_reached_eof {
+                        let last_chunk_path =
+                            format!("{}-{}/db.last-chunk.gz", self.db_name, self.generation);
+                        let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?;
+                        tokio::io::copy(&mut gzip_reader, &mut last_chunk_file).await?;
+
+                        let part_out = self
+                            .client
+                            .upload_part()
+                            .bucket(&self.bucket)
+                            .key(key.clone())
+                            .upload_id(upload_id.clone())
+                            .body(ByteStream::from_path(last_chunk_path).await?)
+                            .part_number(LAST_PART) // last chunk
+                            .send()
+                            .await?;
+
+                        parts.push(
+                            CompletedPart::builder()
+                                .part_number(LAST_PART)
+                                .e_tag(part_out.e_tag.ok_or_else(|| {
+                                    anyhow::anyhow!("e_tag missing from part upload")
+                                })?)
+                                .build(),
+                        );
                     }
 
-                    let part_out = self
-                        .client
-                        .upload_part()
+                    self.client
+                        .complete_multipart_upload()
+                        .upload_id(upload_id)
                         .bucket(&self.bucket)
-                        .key(key.clone())
-                        .upload_id(upload_id.clone())
-                        .body(ByteStream::from(buffer.freeze()))
-                        .part_number(part)
+                        .key(key)
+                        .multipart_upload(
+                            CompletedMultipartUpload::builder()
+                                .set_parts(Some(parts))
+                                .build(),
+                        )
                         .send()
                         .await?;
 
-                    parts.push(
-                        CompletedPart::builder()
-                            .part_number(part)
-                            .e_tag(
-                                part_out.e_tag.ok_or_else(|| {
-                                    anyhow::anyhow!("e_tag missing from part upload")
-                                })?,
-                            )
-                            .build(),
-                    );
+                    Self::read_change_counter(&mut reader).await?
                 }
-
-                self.client
-                    .complete_multipart_upload()
-                    .upload_id(upload_id)
-                    .bucket(&self.bucket)
-                    .key(key)
-                    .multipart_upload(
-                        CompletedMultipartUpload::builder()
-                            .set_parts(Some(parts))
-                            .build(),
-                    )
-                    .send()
-                    .await?;
-
-                Self::read_change_counter(&mut reader).await?
-            }
-        };
+            };
 
         /* FIXME: we can't rely on the change counter in WAL mode:
         ** "In WAL mode, changes to the database are detected using the wal-index and
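
As a sanity check on the capacity table in the comment above, here is a small standalone Rust sketch (not part of the commit; the constants simply mirror the diff, and `chunk_size_for` is a hypothetical helper name) that recomputes the chunk schedule and the total amount of data the buffered parts can carry before the file-backed last part is needed:

    // Standalone sketch; mirrors the chunk schedule used in the diff above.
    const MIB: usize = 1024 * 1024;
    const CHUNK_SIZES: [usize; 5] = [5 * MIB, 10 * MIB, 25 * MIB, 50 * MIB, 100 * MIB];
    const LAST_PART: i32 = 10_000;

    // Chunk size for a 0-based part index: it grows every 16 parts and caps at 100 MiB.
    fn chunk_size_for(part: i32) -> usize {
        CHUNK_SIZES[((part / 16) as usize).min(CHUNK_SIZES.len() - 1)]
    }

    fn main() {
        // Parts 0..LAST_PART-1 are read into a buffer of known size; part number
        // 10_000 is reserved for whatever is left of the gzip stream.
        let buffered: usize = (0..LAST_PART - 1).map(chunk_size_for).sum();
        println!(
            "buffered capacity: {} MiB (~{:.3} GiB)",
            buffered / MIB,
            buffered as f64 / (1024.0 * MIB as f64)
        );
    }

This prints a buffered capacity of 994,940 MiB, roughly 971.6 GiB, so only a snapshot whose compressed size approaches ~1 TiB ever falls through to the `db.last-chunk.gz` spill file uploaded as part number 10,000.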
