diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py index e5dee85f..87c7f450 100644 --- a/examples/run_perf_test_reads.py +++ b/examples/run_perf_test_reads.py @@ -40,13 +40,21 @@ poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e ``` +Testing raw PyAirbyte throughput with and without caching: + +```bash +# Test raw PyAirbyte throughput with caching (Source->Cache): +poetry run python ./examples/run_perf_test_reads.py -e=5 +# Test raw PyAirbyte throughput without caching (Source->Destination): +poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e --no-cache +``` + Note: - The Faker stream ('purchases') is assumed to be 220 bytes, meaning 4_500 records is approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s. - The E2E stream is assumed to be 180 bytes, meaning 5_500 records is approximately 1 MB. Based on this: 40K records/second is approximately 7.2 MB/s - and 61K records/second is approximately 11 MB/s. - + and 60K records/second is approximately 10.5 MB/s. """ from __future__ import annotations @@ -58,6 +66,7 @@ import airbyte as ab from airbyte.caches import BigQueryCache, CacheBase, SnowflakeCache from airbyte.secrets.google_gsm import GoogleGSMSecretManager +from typing_extensions import Literal if TYPE_CHECKING: from airbyte.sources.base import Source @@ -78,7 +87,12 @@ def get_gsm_secret_json(secret_name: str) -> dict: return secret.parse_json() -def get_cache(cache_type: str) -> CacheBase: +def get_cache( + cache_type: Literal["duckdb", "snowflake", "bigquery", False], +) -> CacheBase | Literal[False]: + if cache_type is False: + return False + if cache_type == "duckdb": return ab.new_local_cache() @@ -168,14 +182,14 @@ def get_destination(destination_type: str) -> ab.Destination: def main( e: int | None = None, n: int | None = None, - cache_type: str = "duckdb", + cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb", source_alias: str = "e2e", destination_type: str | None = None, ) -> None: num_records: int = n or 5 * (10 ** (e or 3)) - cache_type = cache_type or "duckdb" + cache_type = "duckdb" if cache_type is None else cache_type - cache: CacheBase = get_cache( + cache: CacheBase | Literal[False] = get_cache( cache_type=cache_type, ) source: Source = get_source( @@ -183,10 +197,14 @@ def main( num_records=num_records, ) source.check() - read_result = source.read(cache) if destination_type: destination = get_destination(destination_type=destination_type) - destination.write(read_result) + if cache is not False: + read_result = source.read(cache) + if destination_type: + destination.write(read_result) + else: + destination.write(source, cache=False) if __name__ == "__main__": @@ -216,6 +234,11 @@ def main( choices=["duckdb", "snowflake", "bigquery"], default="duckdb", ) + parser.add_argument( + "--no-cache", + action="store_true", + help="Disable caching.", + ) parser.add_argument( "--source", type=str, @@ -238,7 +261,7 @@ def main( main( e=args.e, n=args.n, - cache_type=args.cache, + cache_type=args.cache if not args.no_cache else False, source_alias=args.source, destination_type=args.destination, )