This repository was archived by the owner on Oct 18, 2023. It is now read-only.

bottomless: stream gzip snapshot #585

Open · wants to merge 12 commits into base: main

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions bottomless/Cargo.toml
@@ -25,6 +25,7 @@ arc-swap = "1.6"
chrono = "0.4.23"
uuid = "1.4.1"
rand = "0.8.5"
+tempfile = "3.3.0"

[features]
libsql_linked_statically = []
175 changes: 174 additions & 1 deletion bottomless/src/read.rs
@@ -2,7 +2,10 @@ use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::GzipDecoder;
-use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::{
+    primitives::ByteStream,
+    types::{CompletedMultipartUpload, CompletedPart},
+};
use std::io::ErrorKind;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
@@ -55,3 +58,173 @@ impl BatchReader {
Ok(())
}
}

pub async fn upload_s3_multipart(
client: &aws_sdk_s3::Client,
key: &str,
bucket: &str,
reader: impl AsyncRead + Unpin,
) -> Result<()> {
let upload_id = client
.create_multipart_upload()
.bucket(bucket)
.key(key)
.send()
.await?
.upload_id
.ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;

let parts = upload_s3_parts(client, upload_id.as_str(), key, bucket, reader).await;

match parts {
Ok(parts) => {
client
.complete_multipart_upload()
.upload_id(upload_id)
.bucket(bucket)
.key(key)
.multipart_upload(
CompletedMultipartUpload::builder()
.set_parts(Some(parts))
.build(),
)
.send()
.await?;

Ok(())
}
Err(err) => {
client
.abort_multipart_upload()
.upload_id(upload_id)
.bucket(bucket)
.key(key)
.send()
.await?;

Err(err)
}
}
}

async fn upload_s3_parts(
client: &aws_sdk_s3::Client,
upload_id: &str,
key: &str,
bucket: &str,
mut reader: impl AsyncRead + Unpin,
) -> Result<Vec<CompletedPart>> {
let chunk_sizes = &[
5 * 1024 * 1024,
10 * 1024 * 1024,
25 * 1024 * 1024,
50 * 1024 * 1024,
100 * 1024 * 1024,
];

const LAST_PART: i32 = 10_000;
let mut parts = Vec::new();
let mut has_reached_eof = false;

// S3 allows a maximum of 10_000 parts, and each part can range from 5 MiB to
// 5 GiB in size, except for the last one, which has no minimum size.
//
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
for part in 0..LAST_PART - 1 {
// Progressively increase the chunk size every 16 chunks up to the last
// chunk_size. This allows smaller allocations for small databases.
//
// Here's a table of how much data we can chunk:
// ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐
// │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │
// ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
// │ 5 MiB │ 16 │ 80 MiB │ 80 MiB │
// ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
// │ 10 MiB │ 16 │ 160 MiB │ 240 MiB │
// ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
// │ 25 MiB │ 16 │ 400 MiB │ 640 MiB │
// ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
// │ 50 MiB │ 16 │ 800 MiB │ 1.406 GiB │
// ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
// │ 100 MiB │ 9935 │ 970.215 GiB │ 971.621 GiB │
// └────────────┴──────────────────┴───────────────────────┴──────────────────┘
//
// We can send up to 971 GiB in chunks, which is more than enough for the
// majority of use cases.
//
// The last part number is reserved for whatever remains of the `gzip_reader`.
let chunk_size = chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];

let mut buffer = bytes::BytesMut::with_capacity(chunk_size);
loop {
let bytes_written = reader.read_buf(&mut buffer).await?;
// `read_buf` returns 0 both at EOF and once the buffer has no spare capacity
if bytes_written == 0 {
break;
}
}

// EOF
if buffer.is_empty() {
has_reached_eof = true;
break;
}

let part_out = client
.upload_part()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.body(ByteStream::from(buffer.freeze()))
.part_number(part + 1)
.send()
.await?;

parts.push(
CompletedPart::builder()
.part_number(part + 1)
.e_tag(
part_out
.e_tag
.ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?,
)
.build(),
);
}

// If the gzip stream has not reached EOF, we need to send the last part to S3.
// Since we don't know the size of the remaining stream and can't be sure it fits
// in memory, we spill it into a temporary file so it can be streamed.
//
// This only happens for databases whose compressed size exceeds roughly 971 GiB
// (about 1 TiB).
if !has_reached_eof {
let last_chunk_file = tempfile::NamedTempFile::new()?;
let mut last_chunk_tokio_file =
tokio::fs::File::from_std(last_chunk_file.as_file().try_clone()?);

tokio::io::copy(&mut reader, &mut last_chunk_tokio_file).await?;

let part_out = client
.upload_part()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.body(ByteStream::from_path(last_chunk_file.path()).await?)
.part_number(LAST_PART)
.send()
.await?;

parts.push(
CompletedPart::builder()
.part_number(LAST_PART)
.e_tag(
part_out
.e_tag
.ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?,
)
.build(),
);
}

Ok(parts)
}
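
As a side note (not part of the PR), the cumulative totals quoted in the comment table inside `upload_s3_parts` are easy to reproduce with a standalone sketch that replays the `part / 16` schedule and prints how much data fits before the reserved final part would ever be needed:

```rust
/// Standalone sketch replaying the progressive chunk-size schedule used by
/// `upload_s3_parts`: 16 parts each at 5, 10, 25 and 50 MiB, then 100 MiB for
/// every remaining part up to part 9_999 (part 10_000 stays reserved).
fn main() {
    const LAST_PART: i64 = 10_000;
    let chunk_sizes: [i64; 5] = [5, 10, 25, 50, 100].map(|mib: i64| mib * 1024 * 1024);

    let mut cumulative: i64 = 0;
    for part in 0..LAST_PART - 1 {
        let chunk_size = chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
        cumulative += chunk_size;
    }

    // Prints roughly "971.621 GiB", matching the table's cumulative total.
    println!("{:.3} GiB", cumulative as f64 / (1024.0 * 1024.0 * 1024.0));
}
```
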
30 changes: 14 additions & 16 deletions bottomless/src/replicator.rs
@@ -1,5 +1,5 @@
use crate::backup::WalCopier;
-use crate::read::BatchReader;
+use crate::read::{upload_s3_multipart, BatchReader};
use crate::transaction_cache::TransactionPageCache;
use crate::wal::WalFileReader;
use anyhow::anyhow;
@@ -614,8 +614,11 @@ impl Replicator {
return Ok(false);
}
tracing::debug!("Snapshotting {}", self.db_path);

let start = Instant::now();
-let change_counter = match self.use_compression {

+let mut reader = tokio::fs::File::open(&self.db_path).await?;
+match self.use_compression {
CompressionKind::None => {
self.client
.put_object()
@@ -624,23 +627,17 @@
.body(ByteStream::from_path(&self.db_path).await?)
.send()
.await?;
-let mut reader = tokio::fs::File::open(&self.db_path).await?;
-Self::read_change_counter(&mut reader).await?
}
CompressionKind::Gzip => {
-// TODO: find a way to compress ByteStream on the fly instead of creating
-// an intermediary file.
-let (compressed_db_path, change_counter) = self.compress_main_db_file().await?;
+let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?);
+let gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(buf_reader);

let key = format!("{}-{}/db.gz", self.db_name, self.generation);
-self.client
-.put_object()
-.bucket(&self.bucket)
-.key(key)
-.body(ByteStream::from_path(compressed_db_path).await?)
-.send()
-.await?;
-let _ = tokio::fs::remove_file(compressed_db_path).await;
-change_counter

+// Since it's not possible to know the exact size of a gzip stream and the
+// PutObject operation requires the Content-Length header to be set, we need to
+// send the content in chunks of known size.
+upload_s3_multipart(&self.client, &key, &self.bucket, gzip_reader).await?;
}
};

@@ -650,6 +647,7 @@
** incremented on each transaction in WAL mode."
** Instead, we need to consult WAL checksums.
*/
+let change_counter = Self::read_change_counter(&mut reader).await?;
let change_counter_key = format!("{}-{}/.changecounter", self.db_name, self.generation);
self.client
.put_object()
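
`Self::read_change_counter` is reused here but is not part of this diff. As a purely illustrative sketch, assuming the helper reads the 4-byte big-endian file change counter stored at offset 24 of the SQLite database header (the actual signature and return type in `replicator.rs` may differ), it could look roughly like this:

```rust
use anyhow::Result;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

/// Hypothetical stand-in for `Replicator::read_change_counter`: the SQLite
/// header stores the file change counter in bytes 24..28, big-endian.
async fn read_change_counter(
    reader: &mut (impl AsyncRead + AsyncSeek + Unpin),
) -> Result<[u8; 4]> {
    let mut counter = [0u8; 4];
    reader.seek(std::io::SeekFrom::Start(24)).await?;
    reader.read_exact(&mut counter).await?;
    Ok(counter)
}
```

In the new flow this read runs against the plain, uncompressed database file handle opened at the top of the snapshot routine, once the upload (plain or multipart gzip) has completed.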