Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 92 additions & 8 deletions test/copy-to-s3/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import csv
import json
import os
import random
import string
import time
from io import BytesIO, StringIO
from textwrap import dedent

import pyarrow.parquet #
import boto3
import pyarrow.parquet
from minio import Minio

from materialize.mzcompose.composition import (
Expand Down Expand Up @@ -178,15 +180,103 @@ def workflow_nightly(c: Composition, parser: WorkflowArgumentParser) -> None:
del actual_parquet


def workflow_gcs(c: Composition) -> None:
    """
    Test COPY TO S3 with GCS using HMAC authentication. This test requires access to GCS.

    Required environment variables: GCS_BUCKET, GCS_REGION, GCS_ACCESS_KEY and
    GCS_SECRET_KEY. GCS_ENDPOINT is optional and defaults to
    https://storage.googleapis.com.

    The test copies a two-row table to a randomly named prefix in the bucket
    as CSV, then reads the single resulting object back through the
    S3-compatible API and compares its rows against the inserted data.
    """
    # We can't use the current GCP_SERVICE_ACCOUNT_JSON as that is a credentials file used by GCP
    # SDK. See <https://cloud.google.com/docs/authentication/application-default-credentials>
    #
    # We need HMAC keys, which contain access and secret keys (these don't exist in CI today)
    # see <https://cloud.google.com/storage/docs/authentication/hmackeys>

    def env_get_or_fail(key: str) -> str:
        """Return the value of environment variable `key`, failing if it is unset."""
        value = os.environ.get(key)
        assert value is not None, f"{key} environment variable must be set"
        return value

    gcs_bucket = env_get_or_fail("GCS_BUCKET")
    gcs_region = env_get_or_fail("GCS_REGION")
    gcs_access_key = env_get_or_fail("GCS_ACCESS_KEY")
    gcs_secret_key = env_get_or_fail("GCS_SECRET_KEY")
    gcs_endpoint = os.environ.get("GCS_ENDPOINT", "https://storage.googleapis.com")

    # A random prefix keeps each run's output separate within the bucket.
    key_prefix = f"copy_to/{make_random_key(10)}/"

    s3 = boto3.client(
        "s3",
        endpoint_url=gcs_endpoint,
        region_name=gcs_region,
        aws_access_key_id=gcs_access_key,
        aws_secret_access_key=gcs_secret_key,
    )
    # Initialised before the `try` so the cleanup in `finally` is always safe to run.
    contents = []
    try:
        with c.override(Testdrive(no_reset=True)):
            c.up("materialized")
            c.testdrive(
                dedent(
                    f"""
                    > CREATE SECRET gcs_secret AS '{gcs_secret_key}';
                    > CREATE CONNECTION gcs_conn
                      TO AWS (
                        ACCESS KEY ID = '{gcs_access_key}',
                        SECRET ACCESS KEY = SECRET gcs_secret,
                        ENDPOINT = '{gcs_endpoint}',
                        REGION = '{gcs_region}'
                      );
                    > CREATE TABLE t (a int, b text, c float);
                    > INSERT INTO t VALUES (1, 'a', 1.1), (2, 'b', 2.2);

                    > COPY (SELECT * FROM t)
                      TO 's3://{gcs_bucket}/{key_prefix}'
                      WITH (
                        AWS CONNECTION = gcs_conn, FORMAT = 'csv', HEADER
                      );
                    """
                )
            )
            # list should contain a single file (we won't know the name ahead of time, just the prefix).
            listing = s3.list_objects_v2(Bucket=gcs_bucket, Prefix=key_prefix)

            # "Contents" is omitted from the response when nothing matches the
            # prefix; default to an empty list so the assertion below reports
            # the problem instead of a bare KeyError.
            contents = listing.get("Contents", [])
            assert len(contents) == 1, f"expected 1 file, got {contents}"

            key = contents[0]["Key"]
            assert key.endswith(".csv"), f"expected .csv suffix, got {key}"

            object_response = s3.get_object(Bucket=gcs_bucket, Key=key)
            body = object_response["Body"].read().decode("utf-8")
            csv_reader = csv.DictReader(StringIO(body))
            actual = list(csv_reader)

            # all fields are read in as strings
            expected = [
                {"a": "1", "b": "a", "c": "1.1"},
                {"a": "2", "b": "b", "c": "2.2"},
            ]
            assert actual == expected, f"actual: {actual} != expected: {expected}"

    finally:
        # This is a best effort cleanup, as the process may exit before reaching this point.
        # Only objects found by the listing above are removed.
        for obj in contents:
            key = obj["Key"]
            s3.delete_object(Bucket=gcs_bucket, Key=key)


def workflow_ci(c: Composition, _parser: WorkflowArgumentParser) -> None:
    """
    Workflows to run during CI

    Each named workflow is run once, inside its own test case. Note that the
    "gcs" workflow asserts that the GCS_BUCKET, GCS_REGION, GCS_ACCESS_KEY and
    GCS_SECRET_KEY environment variables are set.
    """
    for name in ["auth", "http", "gcs"]:
        with c.test_case(name):
            c.workflow(name)


def make_random_key(n: int):
    """Return a random string of `n` characters drawn from A-Z and 0-9.

    Characters are picked with `random.SystemRandom`. A non-positive `n`
    yields the empty string.
    """
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    picked = [rng.choice(alphabet) for _ in range(n)]
    return "".join(picked)


def workflow_auth(c: Composition) -> None:
c.up(Service("mc", idle=True), "materialized", "minio")

Expand All @@ -195,12 +285,6 @@ def workflow_auth(c: Composition) -> None:
# User 'nodelete': PutObject, ListBucket
# User 'read': GetObject, ListBucket

def make_random_key(n: int):
return "".join(
random.SystemRandom().choice(string.ascii_uppercase + string.digits)
for _ in range(n)
)

def make_user(username: str, actions: list[str]):
return (
username,
Expand Down