This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 43d035f

extract upload_s3_multipart
1 parent d0dd4c7 commit 43d035f

File tree

2 files changed: +174 -162 lines changed


bottomless/src/read.rs

Lines changed: 149 additions & 1 deletion
@@ -2,7 +2,10 @@ use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
 use async_compression::tokio::bufread::GzipDecoder;
-use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::{
+    primitives::ByteStream,
+    types::{CompletedMultipartUpload, CompletedPart},
+};
 use std::io::ErrorKind;
 use std::pin::Pin;
 use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
@@ -55,3 +58,148 @@ impl BatchReader {
         Ok(())
     }
 }
+
+pub async fn upload_s3_multipart(
+    client: &aws_sdk_s3::Client,
+    key: &str,
+    bucket: &str,
+    mut reader: impl AsyncRead + Unpin,
+) -> Result<()> {
+    let upload_id = client
+        .create_multipart_upload()
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await?
+        .upload_id
+        .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
+
+    let chunk_sizes = &[
+        5 * 1024 * 1024,
+        10 * 1024 * 1024,
+        25 * 1024 * 1024,
+        50 * 1024 * 1024,
+        100 * 1024 * 1024,
+    ];
+
+    const LAST_PART: i32 = 10_000;
+    let mut parts = Vec::new();
+    let mut has_reached_eof = false;
+
+    // S3 allows a maximum of 10_000 parts and each part can size from 5 MiB to
+    // 5 GiB, except for the last one that has no limits.
+    //
+    // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+    for part in 0..LAST_PART - 1 {
+        // Progressively increase the chunk size every 16 chunks up to the last
+        // chunk_size. This allows smaller allocations for small databases.
+        //
+        // Here's a table of how much data we can chunk:
+        // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐
+        // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 5 MiB      │ 16               │ 80 MiB                │ 80 MiB           │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 10 MiB     │ 16               │ 160 MiB               │ 240 MiB          │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 25 MiB     │ 16               │ 400 MiB               │ 640 MiB          │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 50 MiB     │ 16               │ 800 MiB               │ 1.406 GiB        │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 100 MiB    │ 9935             │ 970.215 GiB           │ 971.621 GiB      │
+        // └────────────┴──────────────────┴───────────────────────┴──────────────────┘
+        //
+        // We can send up to 971 GiB in chunks, which is more than enough for the
+        // majority of use cases.
+        //
+        // The last chunk is reserved for the remaining of the `gzip_reader`
+        let chunk_size = chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
+
+        let mut buffer = bytes::BytesMut::with_capacity(chunk_size);
+        loop {
+            let bytes_written = reader.read_buf(&mut buffer).await?;
+            // EOF or buffer is full
+            if bytes_written == 0 {
+                break;
+            }
+        }
+
+        // EOF
+        if buffer.is_empty() {
+            has_reached_eof = true;
+            break;
+        }
+
+        let part_out = client
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .upload_id(upload_id.clone())
+            .body(ByteStream::from(buffer.freeze()))
+            .part_number(part + 1)
+            .send()
+            .await?;
+
+        parts.push(
+            CompletedPart::builder()
+                .part_number(part + 1)
+                .e_tag(
+                    part_out
+                        .e_tag
+                        .ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?,
+                )
+                .build(),
+        );
+    }
+
+    // If the gzip stream has not reached EOF we need to send the last part to S3.
+    // Since we don't know the size of the stream and we can't be sure if it fits in
+    // memory, we save it into a file to allow streaming.
+    //
+    // This would only happen to databases that are around ~1 TiB.
+    if !has_reached_eof {
+        let mut last_chunk_path = std::env::temp_dir();
+        last_chunk_path.push(rand::random::<u32>().to_string());
+
+        let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?;
+        tokio::io::copy(&mut reader, &mut last_chunk_file).await?;
+
+        let part_out = client
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .upload_id(upload_id.clone())
+            .body(ByteStream::from_path(&last_chunk_path).await?)
+            .part_number(LAST_PART)
+            .send()
+            .await?;
+
+        parts.push(
+            CompletedPart::builder()
+                .part_number(LAST_PART)
+                .e_tag(
+                    part_out
+                        .e_tag
+                        .ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?,
+                )
+                .build(),
+        );
+
+        let _ = tokio::fs::remove_file(last_chunk_path).await;
+    }
+
+    client
+        .complete_multipart_upload()
+        .upload_id(upload_id)
+        .bucket(bucket)
+        .key(key)
+        .multipart_upload(
+            CompletedMultipartUpload::builder()
+                .set_parts(Some(parts))
+                .build(),
+        )
+        .send()
+        .await?;
+
+    Ok(())
+}
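
The extracted helper takes any `AsyncRead + Unpin` source, so it is no longer tied to the replicator's snapshot path. Below is a minimal sketch of a caller, assuming a configured `aws_sdk_s3::Client`; the function name `upload_compressed_file`, the file path, the key, and the bucket name are hypothetical, not part of the commit:

use crate::read::upload_s3_multipart;

use anyhow::Result;
use async_compression::tokio::bufread::GzipEncoder;
use tokio::io::BufReader;

// Hypothetical caller: gzip-compress a local file on the fly and stream it to S3,
// the same way the replicator now does for snapshots.
async fn upload_compressed_file(client: &aws_sdk_s3::Client) -> Result<()> {
    let file = tokio::fs::File::open("local.db").await?; // hypothetical path
    let gzip_reader = GzipEncoder::new(BufReader::new(file));
    // Argument order is client, key, bucket, reader.
    upload_s3_multipart(client, "backups/local.db.gz", "example-bucket", gzip_reader).await
}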

bottomless/src/replicator.rs

Lines changed: 25 additions & 161 deletions
@@ -1,5 +1,5 @@
 use crate::backup::WalCopier;
-use crate::read::BatchReader;
+use crate::read::{upload_s3_multipart, BatchReader};
 use crate::transaction_cache::TransactionPageCache;
 use crate::wal::WalFileReader;
 use anyhow::anyhow;
@@ -9,7 +9,6 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
 use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder;
 use aws_sdk_s3::operation::list_objects::ListObjectsOutput;
 use aws_sdk_s3::primitives::ByteStream;
-use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
 use aws_sdk_s3::{Client, Config};
 use bytes::{Buf, Bytes, BytesMut};
 use chrono::{DateTime, LocalResult, TimeZone, Utc};
@@ -608,168 +607,33 @@ impl Replicator {
             return Ok(());
         }
         tracing::debug!("Snapshotting {}", self.db_path);
-        let change_counter =
-            match self.use_compression {
-                CompressionKind::None => {
-                    self.client
-                        .put_object()
-                        .bucket(&self.bucket)
-                        .key(format!("{}-{}/db.db", self.db_name, self.generation))
-                        .body(ByteStream::from_path(&self.db_path).await?)
-                        .send()
-                        .await?;
-                    let mut reader = tokio::fs::File::open(&self.db_path).await?;
-                    Self::read_change_counter(&mut reader).await?
-                }
-                CompressionKind::Gzip => {
-                    let mut reader = tokio::fs::File::open(&self.db_path).await?;
-                    let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?);
-                    let mut gzip_reader =
-                        async_compression::tokio::bufread::GzipEncoder::new(buf_reader);
-
-                    let key = format!("{}-{}/db.gz", self.db_name, self.generation);
-
-                    // Unfortunally we can't send the gzip output in a single call without buffering
-                    // the whole snapshot in memory because S3 requires the `Content-Length` header
-                    // to be set.
-                    let upload_id = self
-                        .client
-                        .create_multipart_upload()
-                        .bucket(&self.bucket)
-                        .key(key.clone())
-                        .send()
-                        .await?
-                        .upload_id
-                        .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
-
-                    let chunk_sizes = &[
-                        5 * 1024 * 1024,
-                        10 * 1024 * 1024,
-                        25 * 1024 * 1024,
-                        50 * 1024 * 1024,
-                        100 * 1024 * 1024,
-                    ];
-
-                    const LAST_PART: i32 = 10_000;
-                    let mut parts = Vec::new();
-                    let mut has_reached_eof = false;
-
-                    // S3 allows a maximum of 10_000 parts and each part can size from 5 MiB to
-                    // 5 GiB, except for the last one that has no limits.
-                    //
-                    // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
-                    for part in 0..LAST_PART - 1 {
-                        // Progressively increase the chunk size every 16 chunks up to the last
-                        // chunk_size. This allows smaller allocations for small databases.
-                        //
-                        // Here's a table of how much data we can chunk:
-                        // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐
-                        // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │
-                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
-                        // │ 5 MiB      │ 16               │ 80 MiB                │ 80 MiB           │
-                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
-                        // │ 10 MiB     │ 16               │ 160 MiB               │ 240 MiB          │
-                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
-                        // │ 25 MiB     │ 16               │ 400 MiB               │ 640 MiB          │
-                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
-                        // │ 50 MiB     │ 16               │ 800 MiB               │ 1.406 GiB        │
-                        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
-                        // │ 100 MiB    │ 9935             │ 970.215 GiB           │ 971.621 GiB      │
-                        // └────────────┴──────────────────┴───────────────────────┴──────────────────┘
-                        //
-                        // We can send up to 971 GiB in chunks, which is more than enough for the
-                        // majority of use cases.
-                        //
-                        // The last chunk is reserved for the remaining of the `gzip_reader`
-                        let chunk_size =
-                            chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
-
-                        let mut buffer = bytes::BytesMut::with_capacity(chunk_size);
-                        loop {
-                            let bytes_written = gzip_reader.read_buf(&mut buffer).await?;
-                            // EOF or buffer is full
-                            if bytes_written == 0 {
-                                break;
-                            }
-                        }
-
-                        // EOF
-                        if buffer.is_empty() {
-                            has_reached_eof = true;
-                            break;
-                        }
-
-                        let part_out = self
-                            .client
-                            .upload_part()
-                            .bucket(&self.bucket)
-                            .key(key.clone())
-                            .upload_id(upload_id.clone())
-                            .body(ByteStream::from(buffer.freeze()))
-                            .part_number(part + 1)
-                            .send()
-                            .await?;
-
-                        parts.push(
-                            CompletedPart::builder()
-                                .part_number(part + 1)
-                                .e_tag(part_out.e_tag.ok_or_else(|| {
-                                    anyhow::anyhow!("e_tag missing from part upload")
-                                })?)
-                                .build(),
-                        );
-                    }
+        let change_counter = match self.use_compression {
+            CompressionKind::None => {
+                self.client
+                    .put_object()
+                    .bucket(&self.bucket)
+                    .key(format!("{}-{}/db.db", self.db_name, self.generation))
+                    .body(ByteStream::from_path(&self.db_path).await?)
+                    .send()
+                    .await?;
+                let mut reader = tokio::fs::File::open(&self.db_path).await?;
+                Self::read_change_counter(&mut reader).await?
+            }
+            CompressionKind::Gzip => {
+                let mut reader = tokio::fs::File::open(&self.db_path).await?;
+                let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?);
+                let gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(buf_reader);

-                    // If the gzip stream has not reached EOF we need to send the last part to S3.
-                    // Since we don't know the size of the stream and we can't be sure if it fits in
-                    // memory, we save it into a file to allow streaming.
-                    //
-                    // This would only happen to databases that are around ~1 TiB.
-                    if !has_reached_eof {
-                        let last_chunk_path =
-                            format!("{}-{}/db.last-chunk.gz", self.db_name, self.generation);
-                        let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?;
-                        tokio::io::copy(&mut gzip_reader, &mut last_chunk_file).await?;
-
-                        let part_out = self
-                            .client
-                            .upload_part()
-                            .bucket(&self.bucket)
-                            .key(key.clone())
-                            .upload_id(upload_id.clone())
-                            .body(ByteStream::from_path(&last_chunk_path).await?)
-                            .part_number(LAST_PART) // last chunk
-                            .send()
-                            .await?;
-
-                        parts.push(
-                            CompletedPart::builder()
-                                .part_number(LAST_PART)
-                                .e_tag(part_out.e_tag.ok_or_else(|| {
-                                    anyhow::anyhow!("e_tag missing from part upload")
-                                })?)
-                                .build(),
-                        );
+                let key = format!("{}-{}/db.gz", self.db_name, self.generation);

-                        let _ = tokio::fs::remove_file(last_chunk_path).await;
-                    }
+                // Unfortunally we can't send the gzip output in a single call without buffering
+                // the whole snapshot in memory because S3 requires the `Content-Length` header
+                // to be set.
+                upload_s3_multipart(&self.client, &key, &self.bucket, gzip_reader).await?;

-                    self.client
-                        .complete_multipart_upload()
-                        .upload_id(upload_id)
-                        .bucket(&self.bucket)
-                        .key(key)
-                        .multipart_upload(
-                            CompletedMultipartUpload::builder()
-                                .set_parts(Some(parts))
-                                .build(),
-                        )
-                        .send()
-                        .await?;
-
-                    Self::read_change_counter(&mut reader).await?
-                }
-            };
+                Self::read_change_counter(&mut reader).await?
+            }
+        };

         /* FIXME: we can't rely on the change counter in WAL mode:
         ** "In WAL mode, changes to the database are detected using the wal-index and
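
The cumulative totals in the chunk-size table can be reproduced with a few lines of arithmetic. A standalone sketch (not part of the commit) that replays the part-size schedule used by `upload_s3_multipart`:

fn main() {
    const MIB: u64 = 1024 * 1024;
    const LAST_PART: u64 = 10_000;
    // Same chunk-size ladder as upload_s3_multipart: step up every 16 parts,
    // then stay at 100 MiB for the remaining numbered parts.
    let chunk_sizes = [5 * MIB, 10 * MIB, 25 * MIB, 50 * MIB, 100 * MIB];

    let mut total = 0u64;
    for part in 0..LAST_PART - 1 {
        total += chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
    }
    // Prints ~971.621 GiB, matching the table in the code comment; anything beyond
    // that spills into the single unbounded final part (part number 10_000).
    println!("capacity of numbered parts: {:.3} GiB", total as f64 / (1024.0 * MIB as f64));
}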
