Skip to content

[aws-cpp-sdk-s3-crt]: TransferManager for S3CrtClient #2380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/sdks.cmake
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ include(sdksCommon)

set(SDK_DEPENDENCY_BUILD_LIST "")

set(NON_GENERATED_CLIENT_LIST access-management text-to-speech core queues s3-encryption identity-management transfer) ## Manually generated code with a name mimicking client name
set(NON_GENERATED_CLIENT_LIST access-management text-to-speech core queues s3-encryption identity-management transfer transfer-crt) ## Manually generated code with a name mimicking client name

if(REGENERATE_CLIENTS OR REGENERATE_DEFAULTS)
message(STATUS "Checking for SDK generation requirements")
1 change: 1 addition & 0 deletions cmake/sdksCommon.cmake
Original file line number Diff line number Diff line change
@@ -171,6 +171,7 @@ list(APPEND SDK_TEST_PROJECT_LIST "s3-encryption:tests/aws-cpp-sdk-s3-encryption
list(APPEND SDK_TEST_PROJECT_LIST "s3control:tests/aws-cpp-sdk-s3control-integration-tests")
list(APPEND SDK_TEST_PROJECT_LIST "sqs:tests/aws-cpp-sdk-sqs-integration-tests")
list(APPEND SDK_TEST_PROJECT_LIST "transfer:tests/aws-cpp-sdk-transfer-tests")
list(APPEND SDK_TEST_PROJECT_LIST "transfer-crt:tests/aws-cpp-sdk-transfer-crt-tests")
list(APPEND SDK_TEST_PROJECT_LIST "text-to-speech:tests/aws-cpp-sdk-text-to-speech-tests,tests/aws-cpp-sdk-polly-sample")
list(APPEND SDK_TEST_PROJECT_LIST "transcribestreaming:tests/aws-cpp-sdk-transcribestreaming-integration-tests")
list(APPEND SDK_TEST_PROJECT_LIST "eventbridge:tests/aws-cpp-sdk-eventbridge-tests")
53 changes: 53 additions & 0 deletions src/aws-cpp-sdk-transfer-crt/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Build script for the aws-cpp-sdk-transfer-crt library.
#
# Declares the project together with the SDK libraries it is built against.
# NOTE(review): add_project() is an SDK-provided macro that is not visible here; it presumably
# sets PROJECT_NAME and PROJECT_LIBS, which are used further down - confirm.
add_project(aws-cpp-sdk-transfer-crt
"High-level C++ SDK for file transfer to/from AWS S3 (CRT variant)"
aws-cpp-sdk-s3-crt
aws-cpp-sdk-core)

# Collect the public headers and the implementation files of the library.
file( GLOB TRANSFER_HEADERS "include/aws/transfer-crt/*.h" )

file( GLOB TRANSFER_SOURCE "source/transfer-crt/*.cpp" )

# Group the files into folders for Visual Studio project views.
if(MSVC)
source_group("Header Files\\aws\\transfer-crt" FILES ${TRANSFER_HEADERS})
source_group("Source Files\\transfer-crt" FILES ${TRANSFER_SOURCE})
endif()

# Aggregate the file lists. The lists are passed through file(GLOB ...) a second time,
# i.e. the already-expanded paths are used as globbing expressions.
file(GLOB ALL_TRANSFER_HEADERS
${TRANSFER_HEADERS}
)

file(GLOB ALL_TRANSFER_SOURCE
${TRANSFER_SOURCE}
)

file(GLOB ALL_TRANSFER
${ALL_TRANSFER_HEADERS}
${ALL_TRANSFER_SOURCE}
)

# Directory-scoped include path (the target-scoped include path is set below as well).
set(TRANSFER_INCLUDES
"${CMAKE_CURRENT_SOURCE_DIR}/include/"
)

include_directories(${TRANSFER_INCLUDES})

# NOTE(review): AWS_TRANSFER_EXPORTS is defined for Windows DLL builds, but none of the
# transfer-crt headers visible in this change reference an export macro - confirm that
# symbols are actually exported in shared Windows builds.
if(USE_WINDOWS_DLL_SEMANTICS AND BUILD_SHARED_LIBS)
add_definitions("-DAWS_TRANSFER_EXPORTS")
endif()

# The library target plus its namespaced alias (AWS::aws-cpp-sdk-transfer-crt).
add_library(${PROJECT_NAME} ${ALL_TRANSFER})
add_library(AWS::${PROJECT_NAME} ALIAS ${PROJECT_NAME})

# Consumers see the in-tree include directory when building, and "include" after installation.
target_include_directories(${PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
target_link_libraries(${PROJECT_NAME} PRIVATE ${PLATFORM_DEP_LIBS} ${PROJECT_LIBS})

# SDK-provided helpers that apply the common compiler flags and warning levels.
set_compiler_flags(${PROJECT_NAME})
set_compiler_warnings(${PROJECT_NAME})

setup_install()

# Install the public headers below <include dir>/aws/transfer-crt.
install (FILES ${ALL_TRANSFER_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/transfer-crt)

do_packaging()
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#pragma once

#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <streambuf>
#include <utility>

namespace Aws {
namespace TransferCrt {

// Used by the classes below to notify the receiver of low-level file errors.
using ErrorCallback = std::function<void(Aws::String someDescriptiveErrorMessage)>;

// Default size for the put buffer (which is bypassed when xsputn is used).
// Measurements in "The Linux Programming Interface" show that a minimum 4096B is
// required when O_SYNC is enabled. Use a larger value to aggregate small writes.
constexpr size_t DEFAULT_BUFSIZE = 1 << 20;

// Helper class for DownloadStream.
//
// This implements only what DownloadStream needs: a simple, file-descriptor based streambuf.
// Hence many std::streambuf operations, such as seekoff/pos, are not supported.
// The expected use-case is that mostly xsputn(const char *, size_t) will be called.
//
// The ErrorCallback that is passed into the constructor is invoked when encountering a
// low-level write error, receiving a string describing the error cause (based on errno).
class FileDescriptorBuf : public std::streambuf {
public:
// Class does not own the file descriptor @fd - caller is responsible for closing it.
FileDescriptorBuf(int fd, ErrorCallback errorCallback, size_t bufsize = DEFAULT_BUFSIZE)
: fd_{fd}, errorCallback_{errorCallback}, buffer_{Aws::MakeUniqueArray<char>(bufsize, "FdBuf")} {
setp(buffer_.get(), buffer_.get() + bufsize);
}

protected:
int sync() override;
int overflow(int_type c) override;
std::streamsize xsputn(const char *data, std::streamsize datalen) override;

private:
int fd_;
ErrorCallback errorCallback_;
Aws::UniqueArrayPtr<char> buffer_;
};

// Download output stream class for a given @dstPath.
//
// This takes an Error Callback which gets invoked with descriptive error message when a failure
// occurs in either this class, or the contained FileDescriptorBuf.
//
// The constructor does the following:
// 1. Create any missing directory components of @dstPath.
// 2. Generate a temporary .partial file to write to. This file will be renamed into @dstPath
// upon successful completion, or removed on failure. The implementation uses mkostemp(3),
// which is the reason we are using a file-descriptor based backend.
// 3. Open a file descriptor to the temporary file and advise the kernel about its use.
// 4. Complete the construction of the iostream, using a FileDescriptorBuf as rdbuf.
//
// The Error Callback @ec may be invoked already before the constructor call returns.
// It is also invoked by the contained FileDescriptorBuf, and during close().
class DownloadStream final : public std::iostream {
public:
// Create a DownloadStream for @dstPath, calling @ec if any failure happens.
// Enabling O_SYNC via @sync_always is optional, as it degrades download performance.
DownloadStream(const Aws::String &dstPath, ErrorCallback ec, bool sync_always = false);
~DownloadStream();

// Set eof, close the temporary file and atomically rename it into @dstPath.
void close() noexcept;

private:
void _error(Aws::String msg) {
setstate(std::ios::badbit);
errorCallback_(std::move(msg));
}

private:
const Aws::String dstPath_;
Aws::String dstTempPath_;
ErrorCallback errorCallback_;

int fd_ = -1;
Aws::UniquePtr<FileDescriptorBuf> buf_;
std::mutex close_mutex_;
};

} // namespace TransferCrt
} // namespace Aws
100 changes: 100 additions & 0 deletions src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#pragma once

#include <aws/core/utils/memory/stl/AWSMap.h>
#include <aws/core/utils/memory/stl/AWSString.h>

#include <time.h>

#include <chrono>
#include <ostream>
#include <utility>

namespace Aws {
namespace TransferCrt {

// WriteMetadata describes the destination of an upload and the metadata to store with it.
//
// The constructors take @uri, @content_type, @content_encoding and @metadata.
// The @tags member is not a constructor argument; assign it after construction.
struct WriteMetadata {
// Constructor for the default case - just create a blob at @uri.
// Content type and content encoding are left empty, metadata is left empty.
explicit WriteMetadata(const Aws::String &uri) : WriteMetadata(uri, "", "") {}

// Full constructor. @metadata defaults to an empty map; @tags starts out empty.
WriteMetadata(const Aws::String &uri,
const Aws::String &content_type,
const Aws::String &content_encoding,
const Aws::Map<Aws::String, Aws::String> &metadata = {})
: uri{uri},
content_type{content_type},
content_encoding{content_encoding},
metadata{metadata} {}

// Destination URI of the blob, in <scheme>://<bucket>/<path> format.
Aws::String uri;

// Content-Type (MIME type) of @uri.
Aws::String content_type;

// Content-Encoding (if any) of @uri.
Aws::String content_encoding;

// Metadata key/value pairs.
Aws::Map<Aws::String, Aws::String> metadata;

// S3 Object Tagging key/value pairs (S3 objects only).
// These require s3:PutObjectTagging permissions on @uri, otherwise requests fail with 403.
// The tags also have to satisfy the following syntax restrictions and limits:
// * https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html
// * https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html
// * https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/allocation-tag-restrictions.html
Aws::Map<Aws::String, Aws::String> tags;
};

// ReadMetadata encapsulates the metadata associated with a given blob.
// All members are value-initialized by default (empty strings/map, size 0, epoch time).
struct ReadMetadata {
// URI of the blob.
Aws::String uri;

// Size of the blob at @uri in bytes.
size_t size = 0;

// Date/time the blob was last modified.
std::chrono::system_clock::time_point last_modified;

// MIME type of the blob.
Aws::String content_type;

// Content encoding that was applied to the data at @uri, e.g. compression
// (RFC 7231, 3.1.2.2). Empty if none.
Aws::String content_encoding;

// ETag value.
Aws::String etag;

// Metadata key/value pairs.
Aws::Map<Aws::String, Aws::String> metadata;
};

// Write a one-line, human-readable summary of @md to @os and return @os.
//
// Output shape: ReadMetadata("<uri>", <size>[, <last modified>][, <etag>]
//               [, <content type>[ (<content encoding>)]][, key=value ...])
// Optional parts are omitted when the corresponding field is empty (or the time is 0).
static inline std::ostream &operator<<(std::ostream &os, const ReadMetadata &md) {
  os << "ReadMetadata(\"" << md.uri << "\", " << md.size;

  const time_t lm = std::chrono::system_clock::to_time_t(md.last_modified);
  if (lm) {
    // Use the re-entrant ctime_r() with a caller-provided buffer (POSIX requires room for at
    // least 26 bytes): ctime() returns a pointer to shared static storage, which is not
    // thread-safe. The conversion can fail, so check the result before using it.
    char stamp[26];
    if (::ctime_r(&lm, stamp) != nullptr) {
      // Format: "Wed Jun 30 21:49:08 1993\n" - truncate before " 1993\n":
      os << ", " << Aws::String(stamp, 19);
    }
  }
  if (!md.etag.empty()) {
    os << ", " << md.etag;
  }
  if (!md.content_type.empty()) {
    os << ", " << md.content_type;
    if (!md.content_encoding.empty()) {
      os << " (" << md.content_encoding << ")";
    }
  }
  if (!md.metadata.empty()) {
    os << ",";
    for (const auto &e : md.metadata) {
      os << " " << e.first << "=" << e.second;
    }
  }
  return os << ")";
}

} // namespace TransferCrt
} // namespace Aws
201 changes: 201 additions & 0 deletions src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#pragma once

#include <aws/core/client/AWSError.h>
#include <aws/core/client/AsyncCallerContext.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/memory/stl/AWSMap.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/utils/DateTime.h>
#include <aws/s3-crt/S3CrtErrors.h>
#include <aws/transfer-crt/Metadata.h>

#include <atomic>
#include <condition_variable>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <utility>
#include <assert.h>

namespace Aws {
namespace TransferCrt {

// Lifecycle state of a TransferHandle (stored in a single byte).
enum class TransferStatus : uint8_t {
//
// TransferHandle states:
//
// The three terminal states are CANCELED, FAILED, and COMPLETED.
//
// NOT_STARTED indicates that no S3CrtClient API call has been made yet. This means that no
// status/progress callbacks will be invoked. Unlike IN_PROGRESS, we do not have to wait for
// these when transitioning to FAILED or CANCELED.
//
// IN_PROGRESS indicates that an API call was made, which means we must wait until its final
// .shutdown_callback (Handle{Put,Get}ObjectResponse()) is called. The AWS SDK and aws-c-s3
// expect all request data structures to still be alive until then (in particular the response
// body, which may be updated several times in order to record errors).
//
// FAILING is an intermediate state that is entered from IN_PROGRESS to record that a failure
// (e.g. local write error) occurred while the transfer was progressing. It will be converted
// into FAILED once the .shutdown_callback is called.
//
// NOTE: FAILING is not a terminal state - TransferHandle::IsFinishedStatus() returns false for it.
//
NOT_STARTED, // No S3CrtClient API call has been made yet.
IN_PROGRESS, // An API call was made, and the transfer is running.
FAILING, // A failure occurred while the transfer was running.
CANCELED, // Transfer was canceled.
FAILED, // Transfer failed.
COMPLETED, // Transfer completed successfully.
};
Aws::String StatusToString(TransferStatus status);
Aws::OStream &operator<<(Aws::OStream &s, TransferStatus status);

// Interface for interacting with asynchronous UPLOAD/DOWNLOAD transfers.
// A TransferHandle tracks one upload or download: the blob location, its metadata, the number
// of bytes transferred so far, the last error, and the current TransferStatus.
class TransferHandle final {
public:
// Upload to the destination specified in @md.
// Copies URI, content type, content encoding and metadata from @md; the tags of @md are
// encoded once into the string returned by GetTagging().
TransferHandle(const WriteMetadata &md,
const std::shared_ptr<const Aws::Client::AsyncCallerContext> &context);

// Download from @srcUri.
TransferHandle(const Aws::String &srcUri,
const std::shared_ptr<const Aws::Client::AsyncCallerContext> &context);

/*
* Thread-safe Getter methods (values set only at initialization time).
*/

// Bucket/key part of blob location in Amazon S3.
// Both are parsed out of the URI on every call, skipping a scheme prefix that has the
// length of "s3://" (the prefix itself is not verified).
Aws::String GetBucket() const;
Aws::String GetKey() const;

// S3 storage tags (key/value pairs).
const Aws::String &GetTagging() const { return tagging_; }

// Get the user-provided context that was passed at construction time.
std::shared_ptr<const Aws::Client::AsyncCallerContext> GetContext() const { return m_context; }

/*
* Get/set methods called after initialization that are based on atomic variables.
* These methods synchronize-with each other via the affected atomic variable.
*/
// Get/Set the CANCEL flag, which cancels any further processing.
// The flag is never reset once Cancel() has been called.
bool ShouldContinue() const { return !m_cancel.load(); }
void Cancel() { m_cancel.store(true); }

// Get/update the cumulative byte count transferred since start of the transfer.
uint64_t GetBytesTransferred() const { return m_bytesTransferred.load(); }
void UpdateBytesTransferred(uint64_t amount) { m_bytesTransferred.fetch_add(amount); }

/*
* Getters/setters called after initialization that synchronize via @m_getterSetterLock.
*/

// Return the blob metadata information (populated for both download and upload).
// Pre-condition: function may only be called if BytesTotalSizeHasBeenSet() returns true.
// NOTE(review): this returns a reference into the handle. The lock is released when the
// function returns, so reads through the reference are not protected against a concurrent
// Set*() call - confirm that no setter runs while callers hold the reference.
const ReadMetadata &GetReadMetadata() const {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
assert(total_size_has_been_set_);
return rmd_;
}

// Set the total size of the object being transferred. May be called at most once.
void SetBytesTotalSize(uint64_t size) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
assert(!total_size_has_been_set_);
total_size_has_been_set_ = true;
rmd_.size = size;
}
// Check whether SetBytesTotalSize has been called (see TransferManager::GetObject for details).
// Reads the atomic flag without taking @m_getterSetterLock.
bool BytesTotalSizeHasBeenSet() const { return total_size_has_been_set_; }

// Set the LastModified time of the blob.
void SetLastModified(const Aws::Utils::DateTime &lastDateTime) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
rmd_.last_modified = lastDateTime.UnderlyingTimestamp();
}

// Set the ETag of the blob.
void SetETag(const Aws::String &etag) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
rmd_.etag = etag;
}

// Set the Content-Type of the blob.
void SetContentType(const Aws::String &contentType) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
rmd_.content_type = contentType;
}

// Set the Content-Encoding of the blob (e.g. "gzip" when compressing content).
void SetContentEncoding(const Aws::String &encoding) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
rmd_.content_encoding = encoding;
}

// Set the metadata key/value pairs associated with the blob.
void SetMetadata(const Aws::Map<Aws::String, Aws::String> &metadata) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
rmd_.metadata = metadata;
}

// Get/set the last error that was encountered (if any).
// Check GetStatus() first, as the default value is S3Crt::S3CrtErrors::UNKNOWN.
// The getter returns a copy that was taken while holding the lock.
const Aws::Client::AWSError<Aws::S3Crt::S3CrtErrors> GetLastError() const {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
return m_lastError;
}
void SetError(const Aws::Client::AWSError<Aws::S3Crt::S3CrtErrors> &error) {
std::lock_guard<std::mutex> guard{m_getterSetterLock};
m_lastError = error;
}

/*
* Modifiers that only synchronize via @m_statusLock.
*/
// Get current TransferStatus of this handle.
// Synchronizes-with UpdateStatus.
TransferStatus GetStatus() const;

// Block on (internal) condition variable until handle reaches a 'finished' status.
// Synchronizes-with UpdateStatus.
// NOTE(review): the implementation waits on the future of @m_finishedSignal (a std::promise),
// not on a condition variable. std::promise::get_future() may be called only once, so a
// second call that has to wait throws std::future_error - confirm there is a single waiter.
void WaitUntilFinished() const;

// Conditionally transition into @status.
// The transition is ignored if the handle is already in a terminal state.
void UpdateStatus(TransferStatus status);

// Return true if @value equals one of the terminal states.
static bool IsFinishedStatus(TransferStatus value) {
return value == TransferStatus::COMPLETED || value == TransferStatus::FAILED ||
value == TransferStatus::CANCELED;
}

private:
// Lock-free counters/flags.
std::atomic<uint64_t> m_bytesTransferred{0};
std::atomic<bool> m_cancel{false};
// Immutable after construction.
const std::shared_ptr<const Aws::Client::AsyncCallerContext> m_context{nullptr};
// S3 storage tags (upload only).
const Aws::String tagging_;

// Variables that are protected by @m_getterSetterLock:
mutable std::mutex m_getterSetterLock;
// Blob metadata (uri, size, ...) - used for both upload and download blobs.
ReadMetadata rmd_{};
// Flag that indicates whether @rmd_.size has been initialized.
// It is atomic so that BytesTotalSizeHasBeenSet() can read it without the lock.
std::atomic<bool> total_size_has_been_set_{false};
Aws::Client::AWSError<S3Crt::S3CrtErrors> m_lastError{S3Crt::S3CrtErrors::UNKNOWN, true};

// Variables that are protected by @m_statusLock:
mutable std::mutex m_statusLock;
// Fulfilled exactly once, when @m_status enters a terminal state.
mutable std::promise<void> m_finishedSignal;
TransferStatus m_status = TransferStatus::NOT_STARTED;
};

} // namespace TransferCrt
} // namespace Aws
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#pragma once

#include <aws/core/client/AsyncCallerContext.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/s3-crt/S3CrtClient.h>
#include <aws/s3-crt/model/GetObjectRequest.h>
#include <aws/s3-crt/model/PutObjectRequest.h>
#include <aws/transfer-crt/TransferHandle.h>
#include <aws/transfer-crt/DownloadStream.h>
#include <memory>

namespace Aws {
namespace TransferCrt {

// Callback type used for updates.
using HandleUpdatedCallback = std::function<void(const std::shared_ptr<const TransferHandle> &)>;

// Helper structs to carry around a TransferHandle as part of the caller context.
// Caller context of an upload request: carries the TransferHandle of the transfer.
struct UploadContext final : Aws::Client::AsyncCallerContext {
  // Take (shared) ownership of @transferHandle.
  explicit UploadContext(std::shared_ptr<TransferHandle> transferHandle)
      : handle(std::move(transferHandle)) {}

  // Handle of the upload this context belongs to.
  std::shared_ptr<TransferHandle> handle;
};

struct DownloadContext final : Aws::Client::AsyncCallerContext {
explicit DownloadContext(std::shared_ptr<TransferHandle> th) : handle{std::move(th)} {}

~DownloadContext() {
if (!dstStreamOwnershipHasBeenTransferred) {
delete dstStream;
}
}

std::shared_ptr<TransferHandle> handle;
bool dstStreamOwnershipHasBeenTransferred = false;
std::iostream *dstStream = nullptr;
std::streambuf *dstStreamBuf = nullptr;
};

// S3 TransferManager for the S3CrtClient.
//
// All public methods are non-blocking and return a pointer to an asynchronous TransferHandle.
// Instances are created via Create() only (the constructor is private) and are always held
// in a std::shared_ptr, since the class derives from std::enable_shared_from_this.
class TransferManager final : public std::enable_shared_from_this<TransferManager> {
public:
// Callbacks are invoked under the following conditions:
// - uploadProgressCallback: when the number of bytes-sent changes,
// - downloadProgressCallback: when the number of bytes-received changes,
// - statusChangedCallback: when the handle changes status.
// NOTE: code maintains the invariant that @statusChangedCallback
// is called at most once for a "finished" TransferStatus.
static std::shared_ptr<TransferManager> Create(
std::shared_ptr<Aws::S3Crt::S3CrtClient> s3Client,
const HandleUpdatedCallback &uploadProgressCallback,
const HandleUpdatedCallback &downloadProgressCallback,
const HandleUpdatedCallback &statusChangedCallback);

// Upload from @srcPath or @srcStream to @md.uri via PutObjectAsync.
// If @srcStream is not set, open the input file at @srcPath.
// @context is an optional user context; it is handed to the TransferHandle constructor
// (presumably retrievable via TransferHandle::GetContext() - implementation not visible here).
std::shared_ptr<TransferHandle> UploadFile(
const Aws::String &srcPath,
const std::shared_ptr<Aws::IOStream> &srcStream,
const WriteMetadata &md,
const std::shared_ptr<const Aws::Client::AsyncCallerContext> &context = nullptr);

// Download from @srcUri to local @dstPath or @dstStreamBuf, via GetObjectAsync.
// If both @dstPath and @dstStreamBuf are specified, @dstStreamBuf is used.
// NOTE(review): @dstStreamBuf is a raw, non-owning pointer - it presumably has to stay alive
// until the returned handle reaches a finished status; confirm against the implementation.
std::shared_ptr<TransferHandle> DownloadFile(
const Aws::String &srcUri,
const Aws::String &dstPath,
std::streambuf *dstStreamBuf = nullptr,
const std::shared_ptr<const Aws::Client::AsyncCallerContext> &context = nullptr);

private:
// The constructor supports a "cancel all transfers when the first failure is encountered"
// optional policy, which is not exposed to the outside. It enforces the invariant that a
// bulk transfer succeeds only after all of its managed transfers have succeeded.
TransferManager(std::shared_ptr<Aws::S3Crt::S3CrtClient> s3Client,
const HandleUpdatedCallback &uploadProgressCallback,
const HandleUpdatedCallback &downloadProgressCallback,
const HandleUpdatedCallback &statusChangedCallback,
bool cancel_on_first_failure = true);

// Start the upload of @streamToPut for @handle.
void PutObject(const std::shared_ptr<Aws::IOStream> &streamToPut,
const std::shared_ptr<TransferHandle> &handle);

// Start the download for @handle, writing to the destination carried by @context.
void GetObject(const std::shared_ptr<DownloadContext> &context,
const std::shared_ptr<TransferHandle> &handle);

// Completion handler of an upload request (see the TransferStatus::IN_PROGRESS description).
void HandlePutObjectResponse(const Aws::S3Crt::S3CrtClient *,
const Aws::S3Crt::Model::PutObjectRequest &,
const Aws::S3Crt::Model::PutObjectOutcome &,
const std::shared_ptr<const Aws::Client::AsyncCallerContext> &);

// Completion handler of a download request. The outcome is taken by value.
void HandleGetObjectResponse(const Aws::S3Crt::S3CrtClient *,
const Aws::S3Crt::Model::GetObjectRequest &,
Aws::S3Crt::Model::GetObjectOutcome,
const std::shared_ptr<const Aws::Client::AsyncCallerContext> &);

// Record that an error has occurred.
// Set FAILED state if no API call has been made yet, otherwise transition to FAILING state.
void Fail(std::shared_ptr<TransferHandle> handle,
std::string errorMsg,
std::string exceptionMsg = "FATAL ERROR");

// Thin dispatchers to the user-supplied callbacks below.
void OnUploadProgress(const std::shared_ptr<TransferHandle> &handle);
void OnDownloadProgress(const std::shared_ptr<TransferHandle> &handle);
void OnStatusChanged(const std::shared_ptr<TransferHandle> &handle);

private:
// Client used to issue the S3 requests.
std::shared_ptr<Aws::S3Crt::S3CrtClient> s3Client_;
// Cancel all new/pending transfers after the first failure (optional policy):
const bool cancel_on_first_failure_;
// Records whether any managed transfer has failed so far.
std::atomic<bool> failure_occurred_{false};

// User-supplied callbacks (see Create() for when they are invoked).
HandleUpdatedCallback uploadProgressCallback;
HandleUpdatedCallback downloadProgressCallback;
HandleUpdatedCallback statusChangedCallback;
};

} // namespace TransferCrt
} // namespace Aws
153 changes: 153 additions & 0 deletions src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/transfer-crt/DownloadStream.h>
#include <aws/core/platform/FileSystem.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>

#ifndef _GNU_SOURCE /* mkostemp() */
#define _GNU_SOURCE
#endif
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <sstream>

namespace Aws {
namespace TransferCrt {

namespace {
// Strip the last component from @path, i.e. return everything in front of the last
// path delimiter. Yields an empty string if @path contains no delimiter.
Aws::String ParentPath(const Aws::String &path) {
  const size_t lastDelim = path.find_last_of(Aws::FileSystem::PATH_DELIM);
  if (lastDelim == Aws::String::npos) {
    return "";
  }
  return path.substr(0, lastDelim);
}
} // namespace

/*
* FileDescriptorBuf methods.
*/
// Invoked when the put area has no room left: flush it, then append @c.
// Returns EOF if the flush failed, 0 if @c is EOF (nothing to append), and the result of
// sputc(c) otherwise.
int FileDescriptorBuf::overflow(int_type c) {
  if (sync() == EOF) {
    return EOF;
  }
  if (c == EOF) {
    return 0;
  }
  return sputc(c);
}

// Write out whatever is pending in the put area and rewind the put pointer.
// Returns 0 on success (also when nothing was pending), EOF if the write failed.
int FileDescriptorBuf::sync() {
  const bool nothingPending = !pbase() || pptr() <= pbase();
  if (nothingPending) {
    return 0;
  }

  const std::streamsize flushed = xsputn(pbase(), pptr() - pbase());
  if (flushed == EOF) {
    return EOF;
  }
  // Move the put pointer back by the number of bytes that were written out.
  pbump(-flushed);
  return 0;
}

// On failure, invoke the error callback with the description of the errno value.
// It also throws an ios::failure in case exceptions() has been called on the stream.
std::streamsize FileDescriptorBuf::xsputn(const char *data, std::streamsize datalen) {
for (std::streamsize written = 0, n = 0; written < datalen; written += n) {
n = ::write(fd_, data + written, datalen - written);
if (n < 0 && errno != EINTR && errno != EAGAIN) {
Aws::StringStream ss;
ss << "write error: " << ::strerror(errno);
errorCallback_(ss.str());
return EOF;
}
}
return datalen;
}

/*
* DownloadStream methods.
*/
// Prepare the temporary download file for @dstPath and install a FileDescriptorBuf on it.
//
// @dstPath:     final location of the download; must not be empty.
// @ec:          error callback; must be callable. It may already be invoked from within
//               this constructor.
// @sync_always: open the temporary file with O_SYNC.
//
// On any failure the stream is left in the bad state without an rdbuf().
DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, bool sync_always)
    : std::iostream{nullptr},
      dstPath_{dstPath},
      dstTempPath_{dstPath + ".partial.XXXXXX"},
      errorCallback_{std::move(ec)} {
  assert(!dstPath_.empty());
  assert(errorCallback_);

  // Generate any missing directory components.
  const Aws::String parent_path = ParentPath(dstPath_);
  if (!parent_path.empty() && !Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true)) {
    Aws::StringStream ss;
    ss << "Failed to create " << dstPath << " parent directories";
    // No temporary file exists yet: forget the template name, so that the destructor
    // does not unlink a path that this object never created.
    dstTempPath_.clear();
    _error(ss.str());
    return;
  }

  // Produce unique temporary-file suffix. Use O_SYNC to ensure data gets written out to disk.
  fd_ = ::mkostemp(&dstTempPath_[0], sync_always ? O_SYNC : 0);
  if (fd_ < 0) {
    const int err = errno;
    Aws::StringStream ss;
    ss << "Failed to create " << dstTempPath_ << ": " << ::strerror(err);
    // Same as above - there is no temporary file to clean up.
    dstTempPath_.clear();
    _error(ss.str());
    return;
  }

  // Advise the kernel that the data used by @fd_ will not be accessed in the near future.
  // posix_fadvise() reports failure by returning the error number itself; it does not
  // return -1 and does not set errno.
  const int advise_err = ::posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED);
  if (advise_err != 0) {
    Aws::StringStream ss;
    ss << "Failed to posix_fadvise('" << dstTempPath_ << "'): " << ::strerror(advise_err);
    _error(ss.str());
    return;
  }

  // Low-level write errors are prefixed with the file name and routed through _error().
  buf_ = Aws::MakeUnique<FileDescriptorBuf>("FdBuf", fd_, [this](Aws::String writeError) {
    Aws::StringStream ss;
    ss << "Failed to write " << dstTempPath_ << ": " << std::move(writeError);
    _error(ss.str());
    return;
  });
  rdbuf(buf_.get());
}

// Finish the download: flush pending data, close the temporary file and rename it into
// @dstPath_. Failures are reported via the error callback. The function is idempotent -
// once eofbit has been set, further calls return immediately.
void DownloadStream::close() noexcept {
  std::lock_guard<std::mutex> closer(close_mutex_);

  if (eof()) {  // Idempotent.
    return;
  }

  // Call rdbuf()->pubsync() one last time, to empty the put-buffer:
  flush();

  setstate(std::ios::eofbit);

  // Note: 0 is a valid file descriptor, only negative values mean "not open".
  if (fd_ >= 0 && ::close(fd_) < 0) {
    const int err = errno;
    Aws::StringStream ss;
    ss << "Failed to close " << dstTempPath_ << ": " << ::strerror(err);
    setstate(std::ios::failbit);
    _error(ss.str());
  }
  fd_ = -1;

  if (bad()) {
    _error("Stream is corrupt on close");
  } else if (!dstTempPath_.empty()) {
    if (::rename(dstTempPath_.c_str(), dstPath_.c_str()) != 0) {
      const int err = errno;
      Aws::StringStream ss;
      ss << "Failed to rename " << dstTempPath_ << ": " << ::strerror(err);
      _error(ss.str());
      // Keep @dstTempPath_: the temporary file still exists and is removed by the destructor.
    } else {
      // The temporary file is gone (it now is @dstPath_) - nothing left to clean up.
      dstTempPath_.clear();
    }
  }
}

// Release the file descriptor if it is still open, and remove the temporary file
// if it has not been renamed into its final location.
DownloadStream::~DownloadStream() {
  // Note: 0 is a valid file descriptor, only negative values mean "not open".
  if (fd_ >= 0) {
    ::close(fd_);
  }

  if (!dstTempPath_.empty()) {
    ::unlink(dstTempPath_.c_str());
  }
}

} // namespace TransferCrt
} // namespace Aws
110 changes: 110 additions & 0 deletions src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/transfer-crt/TransferHandle.h>
#include <numeric>
#include <utility>
#include <assert.h>

namespace Aws {
namespace TransferCrt {

namespace {
// Encode @key_values as query-parameter string: entries are joined by '&', and each entry is
// written as "key=value", or just "key" if the value is empty.
// Keys and values are appended verbatim.
// NOTE(review): no URL-encoding is applied here - confirm that callers pass encoded values.
Aws::String encode_query_string(const Aws::Map<Aws::String, Aws::String> &key_values) {
  Aws::String encoded;
  // Iterate by reference to the map's own value_type (no per-element pair copy) and append
  // in place instead of re-building the accumulated prefix for every entry.
  for (const auto &kv : key_values) {
    if (!encoded.empty()) {
      encoded += '&';
    }
    encoded += kv.first;
    if (!kv.second.empty()) {
      encoded += '=';
      encoded += kv.second;
    }
  }
  return encoded;
}
}  // namespace

// Upload handle: remember the destination URI, content type/encoding and metadata of @md,
// and encode the tags of @md once into @tagging_.
TransferHandle::TransferHandle(const WriteMetadata &md,
                               const std::shared_ptr<const Aws::Client::AsyncCallerContext> &ctx)
    : m_context(ctx), tagging_(encode_query_string(md.tags)), rmd_() {
  rmd_.uri = md.uri;

  SetContentType(md.content_type);
  SetContentEncoding(md.content_encoding);
  SetMetadata(md.metadata);
}

// Download handle: only the source URI is known up front; the remaining metadata
// fields start out value-initialized.
TransferHandle::TransferHandle(const Aws::String &srcUri,
                               const std::shared_ptr<const Aws::Client::AsyncCallerContext> &ctx)
    : m_context(ctx), rmd_() {
  rmd_.uri = srcUri;
}

// Return the bucket part of the URI, i.e. the text between the scheme prefix (which is
// assumed to have the length of "s3://") and the next '/'. If there is no further '/',
// everything after the prefix is returned.
// Returns an empty string if the URI is too short to contain the prefix - consistent with
// GetKey(), which also returns an empty string for such input (rather than letting
// substr() throw std::out_of_range).
Aws::String TransferHandle::GetBucket() const {
  const size_t start = sizeof("s3://") - 1;
  if (rmd_.uri.size() < start) {
    return "";
  }
  const size_t end = rmd_.uri.find('/', start);
  if (end == Aws::String::npos) {
    return rmd_.uri.substr(start);
  }
  return rmd_.uri.substr(start, end - start);
}

// Return the key part of the URI: everything after the first '/' that follows the scheme
// prefix (assumed to have the length of "s3://"). Empty if there is no such '/'.
Aws::String TransferHandle::GetKey() const {
  const size_t schemeLen = sizeof("s3://") - 1;
  const size_t slashAfterBucket = rmd_.uri.find('/', schemeLen);
  return slashAfterBucket == Aws::String::npos ? Aws::String("")
                                               : rmd_.uri.substr(slashAfterBucket + 1);
}

// Try to move the handle into state @value.
// The request is ignored if the handle already is in a terminal state, or already in @value.
// Entering a terminal state fulfills @m_finishedSignal, which wakes up WaitUntilFinished().
void TransferHandle::UpdateStatus(TransferStatus value) {
  // Release any pending per-chunk requests on failure (done before taking the status lock).
  const bool isFailure = value == TransferStatus::FAILING || value == TransferStatus::FAILED;
  if (isFailure) {
    Cancel();
  }

  std::lock_guard<std::mutex> guard(m_statusLock);
  // The only way out of FAILING is FAILED.
  assert(m_status != TransferStatus::FAILING || value == TransferStatus::FAILED);

  // Guarantee exactly one transition from "not finished" to "finished":
  if (IsFinishedStatus(m_status) || value == m_status) {
    return;
  }
  m_status = value;
  if (IsFinishedStatus(value)) {
    m_finishedSignal.set_value();
  }
}

// Return a snapshot of the current status, read while holding @m_statusLock.
TransferStatus TransferHandle::GetStatus() const {
  std::lock_guard<std::mutex> guard(m_statusLock);
  const TransferStatus snapshot = m_status;
  return snapshot;
}

// Block the caller until the handle has reached a terminal state.
// Returns immediately if that is already the case.
//
// NOTE(review): std::promise::get_future() may be called only once per promise. A second
// call that gets past the status check (e.g. two threads waiting on the same unfinished
// handle) throws std::future_error.
void TransferHandle::WaitUntilFinished() const {
  if (IsFinishedStatus(GetStatus())) {
    return;
  }
  m_finishedSignal.get_future().wait();
}

// Map @status to its enumerator name. Values outside of the enumeration yield "UNKNOWN".
Aws::String StatusToString(TransferStatus status) {
  const char *name = "UNKNOWN";
  switch (status) {
    case TransferStatus::NOT_STARTED:
      name = "NOT_STARTED";
      break;
    case TransferStatus::IN_PROGRESS:
      name = "IN_PROGRESS";
      break;
    case TransferStatus::FAILING:
      name = "FAILING";
      break;
    case TransferStatus::CANCELED:
      name = "CANCELED";
      break;
    case TransferStatus::FAILED:
      name = "FAILED";
      break;
    case TransferStatus::COMPLETED:
      name = "COMPLETED";
      break;
  }
  return name;
}

// Stream the textual name of @status (see StatusToString) into @s and return @s.
Aws::OStream &operator<<(Aws::OStream &s, TransferStatus status) {
  s << StatusToString(status);
  return s;
}

} // namespace TransferCrt
} // namespace Aws
392 changes: 392 additions & 0 deletions src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Build script for the transfer-crt test binary.
#
# Declares the test project together with the libraries it is built against.
# NOTE(review): add_project() is an SDK-provided macro that is not visible here; it presumably
# sets PROJECT_NAME and PROJECT_LIBS, which are used further down - confirm.
add_project(aws-cpp-sdk-transfer-crt-tests
"Tests for the AWS TransferManager (CRT version) of the C++ SDK"
aws-cpp-sdk-transfer-crt
aws-cpp-sdk-s3-crt
testing-resources
aws-cpp-sdk-core)

# Headers are included in the source so that they show up in Visual Studio.
# They are included elsewhere for consistency.

# Every .cpp file in this directory is part of the test binary.
file(GLOB TRANSFER_TEST_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/*.cpp"
)

# Tell the GoogleTest headers that gtest is linked as a shared library (MSVC shared builds).
if(MSVC AND BUILD_SHARED_LIBS)
add_definitions(-DGTEST_LINKED_AS_SHARED_LIBRARY=1)
endif()

# NOTE(review): testing is enabled, but no add_test() call is visible in this file.
enable_testing()

# Shared Android builds produce a library instead of an executable.
if(PLATFORM_ANDROID AND BUILD_SHARED_LIBS)
add_library(${PROJECT_NAME} ${TRANSFER_TEST_SRC})
else()
add_executable(${PROJECT_NAME} ${TRANSFER_TEST_SRC})
endif()

# SDK-provided helpers that apply the common compiler flags and warning levels.
set_compiler_flags(${PROJECT_NAME})
set_compiler_warnings(${PROJECT_NAME})

target_link_libraries(${PROJECT_NAME} ${PROJECT_LIBS})
323 changes: 323 additions & 0 deletions tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/core/platform/FileSystem.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <aws/transfer-crt/DownloadStream.h>

#include <fstream>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <gtest/gtest.h>

namespace Aws {
namespace TransferCrt {
namespace {

// Compile-time contract of FileDescriptorBuf: move-only, not default-constructible,
// with user-provided (non-trivial) destructor and move operations.
TEST(Construction, FileDescriptorBuf) {
    using Buf = FileDescriptorBuf;

    static_assert(!std::is_default_constructible<Buf>::value, "Should not permit default construction");
    static_assert(!std::is_trivially_destructible<Buf>::value, "Non-trivial destructor");
    static_assert(std::is_nothrow_destructible<Buf>::value, "Should not throw during destruction");
    static_assert(!std::is_copy_constructible<Buf>::value, "Should not have copy constructor");
    static_assert(!std::is_copy_assignable<Buf>::value, "Should not permit copy assignment");
    static_assert(std::is_move_constructible<Buf>::value, "Should be move-constructible");
    static_assert(std::is_move_assignable<Buf>::value, "Should support move assignment");
    static_assert(!std::is_trivially_move_constructible<Buf>::value, "Non-trivial move constructor");
    static_assert(!std::is_trivially_move_assignable<Buf>::value, "Non-trivial move assignment");
}

// Compile-time contract of DownloadStream: neither default-constructible,
// copyable nor movable; its destructor is non-trivial and must not throw.
TEST(Construction, DownloadStream) {
    using Stream = DownloadStream;

    static_assert(!std::is_default_constructible<Stream>::value, "Should not permit default construction");
    static_assert(!std::is_trivially_destructible<Stream>::value, "Non-trivial destructor");
    static_assert(std::is_nothrow_destructible<Stream>::value, "Should not throw during destruction");
    static_assert(!std::is_copy_constructible<Stream>::value, "Should not have copy constructor");
    static_assert(!std::is_copy_assignable<Stream>::value, "Should not permit copy assignment");
    static_assert(!std::is_move_constructible<Stream>::value, "Should not be move-constructible");
    static_assert(!std::is_move_assignable<Stream>::value, "Should not support move assignment");
}

namespace {
// Strip the last path component from @path.
// Returns everything before the final path delimiter, or an empty string
// when @path contains no delimiter at all.
Aws::String ParentPath(const Aws::String &path) {
    const size_t delimPos = path.find_last_of(Aws::FileSystem::PATH_DELIM);
    if (delimPos == Aws::String::npos) {
        return "";
    }
    return path.substr(0, delimPos);
}

// Same contract as the helper in endpoint/BuiltInParameters.cpp:
// true when @str ends with @suffix (an empty suffix always matches).
bool StringEndsWith(const Aws::String& str, const Aws::String& suffix) {
    const size_t suffixLen = suffix.size();
    if (suffixLen > str.size()) {
        return false;
    }
    // Compare the last suffixLen characters of str against the whole suffix.
    return str.compare(str.size() - suffixLen, suffixLen, suffix) == 0;
}
} // namespace

// Test fixture to help set up / tear down DownloadStream test cases.
//
// Every test gets a destination path (dst_) inside the scratch directory returned by
// GetTestFilesDirectory(). The fixture destructor unlinks that file again, and the
// suite-level hooks remove the whole scratch directory before and after the suite.
class DownloadStreamtest : public ::testing::Test {
public:
    // Start the suite from a clean slate by removing any leftover scratch directory.
    static void SetUpTestCase() {
        Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str());
    }
    // Remove the scratch directory once all tests of the suite have run.
    static void TearDownTestCase() {
        Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str());
    }

    // GetTestFilesDirectory() also (re-)creates the scratch directory for this test.
    DownloadStreamtest() : dst_{GetTestFilesDirectory() + "/test.file"} {}
    // Best-effort removal of the test file; the result is deliberately ignored
    // because some tests never create the file or have already removed it.
    ~DownloadStreamtest() { (void)UnlinkTestFile(); }

    // Open up a file descriptor to @dst_, creating any missing directory components.
    // Returns the descriptor from ::open() (write-only, created/truncated, mode 0600).
    int TestFile() {
        const Aws::String parent_path = ParentPath(dst_);
        Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true);
        return ::open(dst_.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
    }

    // Remove @dst_ from the filesystem. Returns the result of ::unlink().
    int UnlinkTestFile() { return ::unlink(dst_.c_str()); }

    // Return the contents of @dst_.
    Aws::String TestFileContents() {
        Aws::StringStream ss;
        // Pass a C string: Aws::String is not std::string when the SDK is built with
        // custom memory management, so it cannot be handed to std::ifstream directly.
        std::ifstream is{dst_.c_str()};

        is >> std::noskipws >> ss.rdbuf();
        return ss.str();
    }

    // Return the scratch directory used by these tests, creating it if necessary.
    static Aws::String GetTestFilesDirectory() {
        Aws::String directory;
#ifdef __ANDROID__
        directory = Aws::FileSystem::Join(Aws::Platform::GetCacheDirectory(), "TransferCrtTests");
#else
        directory = "TransferCrtTests";
#endif // __ANDROID__

        Aws::FileSystem::CreateDirectoryIfNotExists(directory.c_str());
        return directory;
    }

protected:
    // Destination file written by the tests.
    Aws::String dst_;
    // Payload written through the stream / stream buffer under test.
    Aws::String test_data_{"the quick brown fox jumps over the lazy lackadaisical lapdog"};
};

// Writing through a FileDescriptorBuf that wraps an invalid descriptor (-1)
// must fail and hand the error text to the registered callback.
TEST_F(DownloadStreamtest, FdEnsureCallbackAndExceptionWork) {
    Aws::String reported;
    FileDescriptorBuf buf(-1, [&reported](Aws::String msg) { reported = std::move(msg); });

    // ACTION
    const auto written = buf.sputn(test_data_.c_str(), test_data_.size());
    ASSERT_EQ(written, EOF);

    // VERIFICATION
    EXPECT_EQ(reported, "write error: Bad file descriptor");
}

// Exercise the FileDescriptorBuf operations that are expected to work on a valid,
// writable descriptor: pubsync, sputc, sputn and (as a no-op) pubsetbuf.
TEST_F(DownloadStreamtest, FdSupportedMethods) {
// Test assumptions as to which methods are supported. Mostly VERIFICATION in this test.
Aws::String errMsg;
int fd = TestFile();
FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); });
char alternateBuf[128] = {0};

// The descriptor must be valid and must not be one of stdin/stdout/stderr.
ASSERT_GT(fd, 2);
ASSERT_EQ(fdb.pubsync(), 0);
ASSERT_EQ(fdb.sputc('a'), 'a');
ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size());
ASSERT_EQ(errMsg, "");

// Show that pubsetbuf has no effect on this class.
// First rewind and empty the file so that only the next write is left in it.
ASSERT_EQ(::lseek(fd, 0, SEEK_SET), 0);
ASSERT_EQ(::ftruncate(fd, 0), 0);

ASSERT_EQ(fdb.pubsetbuf(alternateBuf, sizeof(alternateBuf)), &fdb);
ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size());
// The test closes the descriptor itself before reading the file back.
ASSERT_EQ(::close(fd), 0);
ASSERT_STREQ(alternateBuf, ""); // Nothing got transferred.
ASSERT_EQ(TestFileContents(), test_data_); // Wrote to fd, as intended.

// None of the operations above may have triggered the error callback.
ASSERT_EQ(errMsg, "");
}

// Pin down the FileDescriptorBuf operations that are not supported: every seek
// and every read-side call reports failure, without invoking the error callback.
TEST_F(DownloadStreamtest, FdUnsupportedMethods) {
// Document which methods are not supported. Mostly VERIFICATION in this test.
Aws::String errMsg;
int fd = TestFile();
FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); });
char buf[3] = {0};

ASSERT_GT(fd, 2);

// Relative seeks fail from every origin.
ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::beg), -1);
ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::cur), -1);
ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::end), -1);

// Absolute seeks fail as well, even after data has been written.
ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size());
ASSERT_EQ(fdb.pubseekpos(0), -1);
ASSERT_EQ(fdb.pubseekpos(test_data_.size()), -1);

// Read-side operations: nothing is available and nothing can be put back.
ASSERT_EQ(fdb.in_avail(), 0);
ASSERT_EQ(fdb.snextc(), -1);
ASSERT_EQ(fdb.sbumpc(), -1);
ASSERT_EQ(fdb.sgetc(), -1);
ASSERT_EQ(fdb.sgetn(buf, sizeof(buf)), 0);
ASSERT_EQ(fdb.sputbackc('a'), -1);
ASSERT_EQ(fdb.sungetc(), -1);

// The unsupported calls fail quietly: the error callback was never invoked.
ASSERT_EQ(errMsg, "");
}

// Expected use of DownloadStream: write the payload, close the stream, and find
// the payload in the destination file without the error callback having fired.
TEST_F(DownloadStreamtest, DownloadStreamHappyPath) {
    Aws::String failure;
    DownloadStream stream{dst_, [&failure](Aws::String msg) { failure = std::move(msg); }};

    // ACTION: write the payload.
    stream << test_data_;
    ASSERT_TRUE(stream.good());
    ASSERT_FALSE(stream.eof());

    // ACTION: close the stream; only eofbit is expected afterwards.
    stream.close();
    ASSERT_FALSE(stream.bad());
    ASSERT_FALSE(stream.fail());
    ASSERT_TRUE(stream.eof());

    // VERIFICATION
    ASSERT_EQ(TestFileContents(), test_data_);
    ASSERT_EQ(failure, "");
}

// Write the payload three times through different output paths (put, operator<<, write)
// and check that all of it reaches the destination file after close().
TEST_F(DownloadStreamtest, SupportedMethodsShouldSucceed) {
// Document expected use-cases for supported methods in ACTION/VERIFICATION blocks.
Aws::String errMsg;
DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }};

// These characters use the internal buffer (testing flush), for strings xsputn is called.
for (const char &c : test_data_) {
d.put(c);
}
ASSERT_TRUE(d.good());

d << test_data_;
ASSERT_TRUE(d.good());

d.write(test_data_.c_str(), test_data_.size());
ASSERT_TRUE(d.good());

d.close();
ASSERT_FALSE(d.fail());

// Call flush() after the file has been closed; to require that close() flushed the buffer.
d.flush();
ASSERT_FALSE(d.fail());
ASSERT_TRUE(d.eof());

// Three writes of the payload, in order, and no error reported through the callback.
ASSERT_EQ(TestFileContents(), test_data_ + test_data_ + test_data_);
ASSERT_EQ(errMsg, "");
}

// Pin down how DownloadStream reacts to read-side and seek operations, and to
// sync()/flush() before and after close(). The stream state is cleared between
// steps so that each step is observed in isolation.
TEST_F(DownloadStreamtest, DownloadStreamUnsupportedMethods) {
// Document unsupported methods via VERIFICATION (ASSERT) statements.
Aws::String errMsg;
DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }};

d << test_data_;
ASSERT_TRUE(d.good());

// Reading a character fails and leaves failbit and eofbit (but not badbit) set.
ASSERT_EQ(d.get(), -1);
ASSERT_TRUE(d.fail() && !d.bad() && d.eof());

// Peeking fails as well and sets eofbit.
d.clear();
ASSERT_EQ(d.peek(), -1);
ASSERT_FALSE(d.good());
ASSERT_TRUE(d.eof());

// tellg() reports -1 but leaves the stream state untouched.
d.clear();
ASSERT_EQ(d.tellg(), -1);
ASSERT_TRUE(d.good());

// seekg() sets failbit only.
d.seekg(0);
ASSERT_TRUE(d.fail() && !d.bad() && !d.eof());

// sync() and flush() succeed while the stream is still open.
d.clear();
ASSERT_EQ(d.sync(), 0);
ASSERT_TRUE(d.good());
ASSERT_TRUE(d.flush().good());

// Ensure the data is written out despite the failures in between:
d.close();
ASSERT_EQ(TestFileContents(), test_data_);

// It is now too late to flush any data:
ASSERT_TRUE(d.eof());
ASSERT_EQ(d.sync(), -1);
ASSERT_TRUE(d.fail() && !d.bad() && d.eof());

// Flush proceeds without error, since the put buffer was empty:
d.clear();
ASSERT_TRUE(d.flush().good());

// No error was reported so far; remove the output so the second stream starts fresh.
ASSERT_EQ(errMsg, "");
ASSERT_EQ(UnlinkTestFile(), 0);

// Try flush again with non-empty put buffer:
DownloadStream d2{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }};
for (const char &c : test_data_) {
d2.put(c);
}
ASSERT_TRUE(d2.good());

d2.close();
ASSERT_TRUE(!d2.fail() && !d2.bad() && d2.eof());

// The following is only false at eof since we empty the put-buffer in close():
ASSERT_FALSE(d2.flush().bad());

ASSERT_EQ(TestFileContents(), test_data_);
ASSERT_EQ(errMsg, "");
}

// Closing a stream whose badbit is set must report the corruption through the
// callback and must not leave an output file behind.
TEST_F(DownloadStreamtest, EnsureOutputDoesNotExistIfStreamIsCorrupted) {
    Aws::String failure;
    DownloadStream stream{dst_, [&failure](Aws::String msg) { failure = std::move(msg); }};

    // PRECONDITION: a healthy stream that has accepted the payload.
    stream << test_data_;
    ASSERT_TRUE(stream.good());
    ASSERT_FALSE(stream.eof());

    // ACTION: mark the stream as corrupt, then close it.
    stream.setstate(std::ios::badbit);
    stream.close();

    // VERIFICATION
    const auto expectedState = std::ios::badbit | std::ios::eofbit | std::ios::failbit;
    ASSERT_EQ(stream.rdstate(), expectedState);
    ASSERT_EQ(failure, "Stream is corrupt on close");
    ASSERT_EQ(::access(dst_.c_str(), F_OK), -1);
}

// When the destination directory is made inaccessible, close() must fail: the stream
// ends up with bad/fail/eof set and the callback receives a "Permission denied" message.
TEST_F(DownloadStreamtest, PermissionsError) {
    Aws::String errMsg;
    DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }};

    // ACTION
    // Change the directory permissions so that renaming the file will fail:
    ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 0), 0);

    // From here on use EXPECT_* rather than ASSERT_*: a failed check must not skip
    // the clean-up below, which would leave the directory inaccessible.
    d << test_data_;
    EXPECT_TRUE(d.good());

    d.close();
    EXPECT_TRUE(d.fail() && d.bad() && d.eof());

    // VERIFICATION
    EXPECT_TRUE(StringEndsWith(errMsg, "Permission denied"));

    // CLEAN-UP (need to restore directory permissions to enable deletion).
    // The mode must be written in octal: decimal 755 is 01363, which would leave the
    // owner without read permission on the directory.
    ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 0755), 0);
}

} // namespace
} // namespace TransferCrt
} // namespace Aws
29 changes: 29 additions & 0 deletions tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <gtest/gtest.h>
#include <aws/core/Aws.h>
#include <aws/testing/platform/PlatformTesting.h>
#include <aws/testing/TestingEnvironment.h>
#include <aws/testing/MemoryTesting.h>

// Entry point of the transfer-crt test binary.
// Initializes the SDK with trace-level logging, runs every registered GoogleTest
// test between Aws::InitAPI() and Aws::ShutdownAPI(), and returns the value of
// RUN_ALL_TESTS() as the process exit code.
int main(int argc, char** argv)
{
Aws::SDKOptions options;
options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Trace;

// NOTE(review): AWS_BEGIN_MEMORY_TEST_EX / AWS_END_MEMORY_TEST_EX come from
// aws/testing/MemoryTesting.h; presumably they bracket the run with memory checks - confirm.
AWS_BEGIN_MEMORY_TEST_EX(options, 1024, 128);
Aws::Testing::InitPlatformTest(options);
Aws::Testing::ParseArgs(argc, argv);

// The SDK is initialized before GoogleTest parses its own command-line flags.
Aws::InitAPI(options);
::testing::InitGoogleTest(&argc, argv);
int exitCode = RUN_ALL_TESTS();
Aws::ShutdownAPI(options);

AWS_END_MEMORY_TEST_EX;
Aws::Testing::ShutdownPlatformTest(options);
return exitCode;
}
295 changes: 295 additions & 0 deletions tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <aws/transfer-crt/TransferManager.h>
#include <fstream>
#include <iostream>

#include <gtest/gtest.h>

namespace Aws {
namespace TransferCrt {
namespace {
/*
* TransferHandle Tests.
*/
// A freshly constructed TransferHandle has not started, has transferred nothing,
// and carries an empty, retryable "UNKNOWN" error.
TEST(TransferHandleTest, DefaultValues) {
    TransferHandle handle{"s3://some.bucket/some.path", {}};

    // Progress / status defaults.
    ASSERT_TRUE(handle.ShouldContinue());
    ASSERT_FALSE(handle.BytesTotalSizeHasBeenSet());
    ASSERT_EQ(handle.GetBytesTransferred(), 0u);
    ASSERT_EQ(handle.GetStatus(), TransferStatus::NOT_STARTED);

    // Error defaults.
    ASSERT_EQ(handle.GetLastError().GetErrorType(), S3Crt::S3CrtErrors::UNKNOWN);
    ASSERT_EQ(handle.GetLastError().GetMessage(), "");
    ASSERT_EQ(handle.GetLastError().GetExceptionName(), "");
    ASSERT_TRUE(handle.GetLastError().ShouldRetry());
}

// After the total size has been set, GetReadMetadata() reports that size and
// otherwise holds default (empty) values.
TEST(TransferHandleTest, ReadMetadata) {
    TransferHandle handle{"s3://some.bucket/some.path", {}};

    // ACTION
    handle.SetBytesTotalSize(42);

    // VERIFICATION
    EXPECT_EQ(handle.GetReadMetadata().size, 42u);

    // The remaining fields keep their defaults.
    EXPECT_NE(handle.GetReadMetadata().content_encoding, "gzip");
    EXPECT_EQ(handle.GetReadMetadata().content_type, "");
    EXPECT_EQ(handle.GetReadMetadata().metadata.size(), 0u);
}

// Walk the handle through NOT_STARTED => IN_PROGRESS => NOT_STARTED => COMPLETED
// and check that every transition is accepted.
TEST(TransferHandleTest, StateMachine) {
    TransferHandle handle{"s3://some.bucket/some.path", {}};

    // Initial state.
    ASSERT_EQ(handle.GetStatus(), TransferStatus::NOT_STARTED);

    // Each entry is the next state to request; the handle must report it right after.
    for (const TransferStatus next : {
             TransferStatus::IN_PROGRESS,  // NOT_STARTED => IN_PROGRESS
             TransferStatus::NOT_STARTED,  // IN_PROGRESS => NOT_STARTED
             TransferStatus::COMPLETED,    // NOT_STARTED => COMPLETED
         }) {
        handle.UpdateStatus(next);
        ASSERT_EQ(handle.GetStatus(), next);
    }
}

// Mimic a TransferHandle on which FailWithError is called shortly after construction:
// the handle must stop, report FAILED and expose the error that was set.
TEST(TransferHandleTest, FailingATransfer) {
    TransferHandle handle{"s3://some.bucket/some.path", {}};

    // PRECONDITION
    ASSERT_TRUE(handle.ShouldContinue());

    // ACTION
    handle.UpdateStatus(TransferStatus::FAILED);
    handle.SetError({S3Crt::S3CrtErrors::UNKNOWN, "FATAL ERROR", "Something went wrong", false});

    // VERIFICATION: status first, then the stored error.
    EXPECT_FALSE(handle.ShouldContinue());
    EXPECT_EQ(handle.GetStatus(), TransferStatus::FAILED);
    EXPECT_EQ(handle.GetLastError().GetErrorType(), S3Crt::S3CrtErrors::UNKNOWN);
    EXPECT_EQ(handle.GetLastError().GetMessage(), "Something went wrong");
    EXPECT_EQ(handle.GetLastError().GetExceptionName(), "FATAL ERROR");
}

// Repeating an UpdateStatus() call with the same state must not change the outcome,
// and once a final state (CANCELED, FAILED, COMPLETED) is reached every further
// update is ignored. Each scenario uses its own, freshly constructed handle.
TEST(TransferHandleTest, UpdateStatusIsIdempotent) {
// NOT_STARTED is the initial state. Can not set it again.
{
TransferHandle th{"s3://some.bucket/some.path", {}};

ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED);
th.UpdateStatus(TransferStatus::NOT_STARTED);
EXPECT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED);
}

// Can update to IN_PROGRESS at most once.
{
TransferHandle th{"s3://some.bucket/some.path", {}};
th.UpdateStatus(TransferStatus::NOT_STARTED);
th.UpdateStatus(TransferStatus::IN_PROGRESS);
th.UpdateStatus(TransferStatus::IN_PROGRESS);
EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS);
}

// May transition from IN_PROGRESS back to NOT_STARTED (not currently used by the code).
{
TransferHandle th{"s3://some.bucket/some.path", {}};

ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED);
th.UpdateStatus(TransferStatus::IN_PROGRESS);
EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS);

th.UpdateStatus(TransferStatus::NOT_STARTED);
th.UpdateStatus(TransferStatus::NOT_STARTED);
EXPECT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED);

th.UpdateStatus(TransferStatus::IN_PROGRESS);
th.UpdateStatus(TransferStatus::IN_PROGRESS);
EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS);
}

// Once a final state is reached, no more state transitions are possible.
{
for (auto &&finalState : {
TransferStatus::CANCELED,
TransferStatus::FAILED,
TransferStatus::COMPLETED,
}) {
TransferHandle th{"s3://some.bucket/some.path", {}};

th.UpdateStatus(finalState);

// Try every state in turn; the handle must keep reporting the final state.
for (auto &&testState : {
TransferStatus::NOT_STARTED,
TransferStatus::IN_PROGRESS,
TransferStatus::FAILING,
TransferStatus::CANCELED,
TransferStatus::FAILED,
TransferStatus::COMPLETED,
}) {
th.UpdateStatus(testState);
EXPECT_EQ(th.GetStatus(), finalState);
}
}
}
}

// A handle built from WriteMetadata splits the S3 URI into bucket and key and
// leaves content type / encoding empty when none were given.
TEST(TransferHandleTest, Upload) {
    WriteMetadata uploadSpec{"s3://some.bucket/some.path"};
    TransferHandle handle{uploadSpec, {}};

    // Need to initialize the ReadMetadata (size must be set in order to call GetReadMetadata()).
    handle.SetBytesTotalSize(42);

    // VERIFICATION: URI components.
    EXPECT_EQ(handle.GetBucket(), "some.bucket");
    EXPECT_EQ(handle.GetKey(), "some.path");

    // VERIFICATION: content attributes.
    EXPECT_EQ(handle.GetReadMetadata().content_type, "");
    EXPECT_EQ(handle.GetReadMetadata().content_encoding, "");
}

// User metadata given in WriteMetadata (including non-ASCII values) must be
// reported unchanged by GetReadMetadata().
TEST(TransferHandleTest, UploadMetadata) {
// Validate encoding of non-ASCII metadata.
WriteMetadata wmd{"s3://some.bucket/some.path"};
wmd.metadata = {{"src", "Âûröræ"}, {"dst", "ÄMÄZÕÑ S3"}, {"purpose", "upload"}};
TransferHandle th{wmd, {}};

// Need to initialize the ReadMetadata (see above).
th.SetBytesTotalSize(0);

// VERIFICATION
// Compare the sizes first (fatal), then the full contents.
ASSERT_EQ(th.GetReadMetadata().metadata.size(), wmd.metadata.size());
EXPECT_EQ(th.GetReadMetadata().metadata, wmd.metadata);
}

// Content type and content encoding ("gzip") given in WriteMetadata must be
// carried over into the handle's ReadMetadata.
TEST(TransferHandleTest, UploadGzipped) {
    WriteMetadata uploadSpec{"s3://", "text/plain", "gzip"};
    TransferHandle handle{uploadSpec, {}};

    // Needed to initialize ReadMetadata.
    handle.SetBytesTotalSize(0);

    // VERIFICATION
    ASSERT_EQ(handle.GetReadMetadata().content_type, "text/plain");
    ASSERT_EQ(handle.GetReadMetadata().content_encoding, "gzip");
}

/*
* Parameterized test to check combinations of (terminal) TransferStatus states.
*/
// Value-parameterized fixture: the parameter is a TransferStatus, and every test
// gets its own TransferHandle constructed from an S3 URI string.
class DownloadHandleFixture : public ::testing::TestWithParam<TransferStatus> {
protected:
// Handle under test; created anew for each test instance.
TransferHandle th{"s3://some.bucket/some.path", {}};
};

// Put the handle into the terminal state given by the test parameter, then request
// every TransferStatus in turn: the handle must keep reporting the terminal state.
TEST_P(DownloadHandleFixture, TerminalStatesMustNotBeChanged) {
    const TransferStatus cs = GetParam();

    th.UpdateStatus(cs);
    ASSERT_EQ(th.GetStatus(), cs);

    // FAILING is probed as well, matching the list used by UpdateStatusIsIdempotent.
    for (const auto &ts : {TransferStatus::NOT_STARTED,
                           TransferStatus::IN_PROGRESS,
                           TransferStatus::FAILING,
                           TransferStatus::CANCELED,
                           TransferStatus::FAILED,
                           TransferStatus::COMPLETED}) {
        th.UpdateStatus(ts);
        ASSERT_EQ(th.GetStatus(), cs);
        if (ts != cs) {
            ASSERT_NE(th.GetStatus(), ts);
        }
    }
}

// Test the combinations of terminal states.
// Runs every TEST_P of DownloadHandleFixture once per terminal state listed below.
INSTANTIATE_TEST_SUITE_P(TerminalStatesTests,
DownloadHandleFixture,
::testing::Values(TransferStatus::CANCELED,
TransferStatus::FAILED,
TransferStatus::COMPLETED));

/*
* Limited TransferManager tests (full tests require network).
*/
// Empty fixture; it only groups the TEST_F cases below under one suite name.
class TransferManagerTest : public ::testing::Test {};

// A TransferHandle must hold its own reference to the caller context for as long
// as the handle lives, and release it when the handle is destroyed.
TEST_F(TransferManagerTest, AsyncCallerContext) {
auto ctx = std::make_shared<const Aws::Client::AsyncCallerContext>();
// Pre-condition
EXPECT_EQ(ctx.use_count(), 1);
{
// Context is passed as const std::shared_ptr<>&, but copy-constructed internally.
// Verify that the use-count behaves as expected.
TransferHandle th{"s3://some.bucket/some.path", ctx};
EXPECT_EQ(ctx.use_count(), 2);
// NOTE(review): presumably GetContext() returns the shared_ptr by value, so the
// temporary is a third owner for the duration of this expression only - confirm.
EXPECT_EQ(th.GetContext().use_count(), 3);
EXPECT_EQ(ctx.use_count(), 2);
}
// The handle is gone; only the local ctx owns the context again.
EXPECT_EQ(ctx.use_count(), 1);
}

// A download whose destination cannot be created must come back as a FAILED
// handle with the exception name "DownloadStream Failure".
TEST_F(TransferManagerTest, DownloadPaths) {
// Ensure that an initialization failure is caught and handled.
// The manager is created with a null first argument and default-constructed others.
auto mgr = TransferManager::Create(nullptr, {}, {}, {});

// Attempt to create a file below an invalid path.
{
auto th = mgr->DownloadFile("s3://", "/a.path **that does not exist**!");

EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED);
EXPECT_FALSE(th->ShouldContinue());
EXPECT_EQ(th->GetLastError().GetExceptionName(), "DownloadStream Failure");
}
}

// Uploads from unusable sources (missing file, closed stream, non-seekable stream)
// must each come back as a FAILED handle with the exception name "FATAL ERROR".
// NOTE(review): two of the expected messages end in what looks like strerror() text
// ("Permission denied", "Illegal seek"); presumably this is platform dependent - confirm.
TEST_F(TransferManagerTest, UploadPaths) {
WriteMetadata s3_uri{"s3://some.bucket/some.path"};

// Open a non-existing file.
{
auto mgr = TransferManager::Create(nullptr, {}, {}, {});
auto th = mgr->UploadFile("/.no::such^path!", {}, s3_uri);

EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED);
EXPECT_FALSE(th->ShouldContinue());
EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR");
}

// Pass a bad (closed) input stream.
{
auto mgr = TransferManager::Create(nullptr, {}, {}, {});
// Open an existing file, then close it again so that the stream is unusable.
auto is = std::make_shared<std::fstream>("/bin/ls");
is->close();

auto th = mgr->UploadFile("", is, s3_uri);
EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED);
EXPECT_FALSE(th->ShouldContinue());
EXPECT_EQ(th->GetLastError().GetMessage(), "Failed to open stream '': Permission denied");
EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR");
}

// Pass a non-seekable input stream.
{
auto mgr = TransferManager::Create(nullptr, {}, {}, {});
// A stream on top of std::cout's buffer stands in for a source that cannot be seeked.
auto is = std::make_shared<std::iostream>(std::cout.rdbuf());
auto th = mgr->UploadFile("", is, s3_uri);

EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED);
EXPECT_FALSE(th->ShouldContinue());
EXPECT_EQ(th->GetLastError().GetMessage(), "Failed to determine size of '': Illegal seek");
EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR");
}
}

} // namespace
} // namespace TransferCrt
} // namespace Aws