Skip to content

Commit

Permalink
Chore: Add PyAirbyte performance profiling option without caching (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Jul 31, 2024
1 parent d7a42d3 commit 821c699
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions examples/run_perf_test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@
poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e
```
Testing raw PyAirbyte throughput with and without caching:
```bash
# Test raw PyAirbyte throughput with caching (Source->Cache):
poetry run python ./examples/run_perf_test_reads.py -e=5
# Test raw PyAirbyte throughput without caching (Source->Destination):
poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e --no-cache
```
Note:
- Each record in the Faker stream ('purchases') is assumed to be 220 bytes, meaning
4_500 records are approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s.
- Each record in the E2E stream is assumed to be 180 bytes, meaning 5_500 records are
approximately 1 MB. Based on this: 40K records/second is approximately 7.2 MB/s
and 61K records/second is approximately 11 MB/s.
and 60K records/second is approximately 10.8 MB/s.
"""

from __future__ import annotations
Expand All @@ -58,6 +66,7 @@
import airbyte as ab
from airbyte.caches import BigQueryCache, CacheBase, SnowflakeCache
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from typing_extensions import Literal

if TYPE_CHECKING:
from airbyte.sources.base import Source
Expand All @@ -78,7 +87,12 @@ def get_gsm_secret_json(secret_name: str) -> dict:
return secret.parse_json()


def get_cache(cache_type: str) -> CacheBase:
def get_cache(
cache_type: Literal["duckdb", "snowflake", "bigquery", False],
) -> CacheBase | Literal[False]:
if cache_type is False:
return False

if cache_type == "duckdb":
return ab.new_local_cache()

Expand Down Expand Up @@ -168,25 +182,29 @@ def get_destination(destination_type: str) -> ab.Destination:
def main(
e: int | None = None,
n: int | None = None,
cache_type: str = "duckdb",
cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb",
source_alias: str = "e2e",
destination_type: str | None = None,
) -> None:
num_records: int = n or 5 * (10 ** (e or 3))
cache_type = cache_type or "duckdb"
cache_type = "duckdb" if cache_type is None else cache_type

cache: CacheBase = get_cache(
cache: CacheBase | Literal[False] = get_cache(
cache_type=cache_type,
)
source: Source = get_source(
source_alias=source_alias,
num_records=num_records,
)
source.check()
read_result = source.read(cache)
if destination_type:
destination = get_destination(destination_type=destination_type)
destination.write(read_result)
if cache is not False:
read_result = source.read(cache)
if destination_type:
destination.write(read_result)
else:
destination.write(source, cache=False)


if __name__ == "__main__":
Expand Down Expand Up @@ -216,6 +234,11 @@ def main(
choices=["duckdb", "snowflake", "bigquery"],
default="duckdb",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable caching.",
)
parser.add_argument(
"--source",
type=str,
Expand All @@ -238,7 +261,7 @@ def main(
main(
e=args.e,
n=args.n,
cache_type=args.cache,
cache_type=args.cache if not args.no_cache else False,
source_alias=args.source,
destination_type=args.destination,
)

0 comments on commit 821c699

Please sign in to comment.