Merge pull request #1006 from dlt-hub/devel
0.4.5 release master merge
rudolfix authored Feb 26, 2024
2 parents 2704ed2 + 5a46b09 commit d6c93fe
Showing 117 changed files with 9,646 additions and 5,391 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test_common.yml
@@ -32,6 +32,8 @@ jobs:
os: "ubuntu-latest"
- python-version: "3.10.x"
os: "ubuntu-latest"
- python-version: "3.12.x"
os: "ubuntu-latest"

defaults:
run:
9 changes: 5 additions & 4 deletions Makefile
@@ -74,7 +74,7 @@ test-load-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)'

test-common:
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common tests/load/test_dummy_client.py tests/libs tests/destinations

reset-test-storage:
-rm -r _storage
@@ -89,9 +89,10 @@ publish-library: build-library
poetry publish

test-build-images: build-library
poetry export -f requirements.txt --output _gen_requirements.txt --without-hashes --extras gcp --extras redshift
grep `cat compiled_packages.txt` _gen_requirements.txt > compiled_requirements.txt
# TODO: enable when we can remove special duckdb setting for python 3.12
# poetry export -f requirements.txt --output _gen_requirements.txt --without-hashes --extras gcp --extras redshift
# grep `cat compiled_packages.txt` _gen_requirements.txt > compiled_requirements.txt
docker build -f deploy/dlt/Dockerfile.airflow --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
# docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .


42 changes: 40 additions & 2 deletions dlt/common/configuration/providers/google_secrets.py
@@ -1,12 +1,38 @@
import base64
import string
import re
from typing import Tuple

from dlt.common import json
from dlt.common.configuration.specs import GcpServiceAccountCredentials
from dlt.common.exceptions import MissingDependencyException

from .toml import VaultTomlProvider
from .provider import get_key_name

# Create a translation table to replace punctuation with ""
# since google secrets allow "-" and "_" we need to exclude them
punctuation = "".join(set(string.punctuation) - {"-", "_"})
translator = str.maketrans("", "", punctuation)


def normalize_key(in_string: str) -> str:
"""Replaces punctuation characters in a string
Note: We exclude `_` and `-` from punctuation characters
Args:
in_string(str): input string
Returns:
(str): a string without punctuation characters and whitespace
"""

# Strip punctuation from the string
stripped_text = in_string.translate(translator)
whitespace = re.compile(r"\s+")
stripped_whitespace = whitespace.sub("", stripped_text)
return stripped_whitespace


class GoogleSecretsProvider(VaultTomlProvider):
def __init__(
@@ -20,7 +46,19 @@ def __init__(

@staticmethod
def get_key_name(key: str, *sections: str) -> str:
return get_key_name(key, "-", *sections)
"""Make key name for the secret
Per Google, the secret name can only contain the following characters, so we normalize keys accordingly:
1. Uppercase and lowercase letters,
2. Numerals,
3. Hyphens,
4. Underscores.
"""
key = normalize_key(key)
normalized_sections = [normalize_key(section) for section in sections if section]
key_name = get_key_name(normalize_key(key), "-", *normalized_sections)
return key_name

@property
def name(self) -> str:
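A minimal sketch of the normalization this hunk introduces (the translation logic is copied from the hunk above; the example key is made up): punctuation other than `-` and `_` is stripped and whitespace is removed before the secret name is assembled.

```python
import re
import string

# replicate the translation table from the hunk above: keep "-" and "_"
punctuation = "".join(set(string.punctuation) - {"-", "_"})
translator = str.maketrans("", "", punctuation)


def normalize_key(in_string: str) -> str:
    # strip punctuation, then remove all whitespace
    return re.sub(r"\s+", "", in_string.translate(translator))


# illustrative only: a sectioned key becomes a valid Google secret name fragment
print(normalize_key("sources.my source.api_key!"))  # -> sourcesmysourceapi_key
```
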
9 changes: 5 additions & 4 deletions dlt/common/data_types/type_helpers.py
@@ -1,5 +1,6 @@
import binascii
import base64
import dataclasses
import datetime # noqa: I251
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from typing import Any, Type, Literal, Union, cast
@@ -55,7 +56,7 @@ def py_type_to_sc_type(t: Type[Any]) -> TDataType:
return "bigint"
if issubclass(t, bytes):
return "binary"
if issubclass(t, (C_Mapping, C_Sequence)):
if dataclasses.is_dataclass(t) or issubclass(t, (C_Mapping, C_Sequence)):
return "complex"
# Enum is coerced to str or int respectively
if issubclass(t, Enum):
@@ -81,13 +82,13 @@ def coerce_from_date_types(
if to_type == "text":
return v.isoformat()
if to_type == "bigint":
return v.int_timestamp # type: ignore
return v.int_timestamp
if to_type == "double":
return v.timestamp() # type: ignore
return v.timestamp()
if to_type == "date":
return ensure_pendulum_date(v)
if to_type == "time":
return v.time() # type: ignore[no-any-return]
return v.time()
raise TypeError(f"Cannot convert timestamp to {to_type}")


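A short sketch of what the dataclass change means in practice (the import path comes from the file header above; the `Address` dataclass is made up): dataclass types now map to the `complex` data type, just like mappings and sequences.

```python
import dataclasses

from dlt.common.data_types.type_helpers import py_type_to_sc_type


@dataclasses.dataclass
class Address:
    city: str
    zip_code: str


# dataclasses now resolve to "complex", like dicts and lists
print(py_type_to_sc_type(Address))  # "complex"
print(py_type_to_sc_type(dict))     # "complex"
print(py_type_to_sc_type(int))      # "bigint"
```
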
67 changes: 57 additions & 10 deletions dlt/common/destination/reference.py
@@ -20,7 +20,6 @@
Generic,
Final,
)
from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect
@@ -32,18 +31,22 @@
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
get_table_format,
get_columns_names_with_prop,
has_column_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.configuration import configspec, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


@@ -252,7 +255,8 @@ def new_file_path(self) -> str:
class FollowupJob:
"""Adds a trait that allows to create a followup job"""

def create_followup_jobs(self, next_state: str) -> List[NewLoadJob]:
def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
"""Return list of new jobs. `final_state` is state to which this job transits"""
return []


@@ -345,6 +349,49 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
f'Found multiple "hard_delete" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "hard_delete"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "hard_delete" column hint for column "{get_first_column_name_with_prop(table, 'hard_delete')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "hard_delete" column hint is only applied when using'
' the "merge" write disposition.'
)
if has_column_with_prop(table, "dedup_sort"):
if len(get_columns_names_with_prop(table, "dedup_sort")) > 1:
raise SchemaException(
f'Found multiple "dedup_sort" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "dedup_sort"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when using'
' the "merge" write disposition.'
)
if table.get("write_disposition") == "merge" and not has_column_with_prop(
table, "primary_key"
):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when a'
" primary key has been specified."
)
for column_name, column in dict(table["columns"]).items():
if len(column_name) > self.capabilities.max_column_identifier_length:
raise IdentifierTooLongException(
@@ -361,9 +408,9 @@ def _verify_schema(self) -> None:
" column manually in code ie. as a merge key?"
)

def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> TTableSchema:
if table_name not in self.schema.tables:
return None
def prepare_load_table(
self, table_name: str, prepare_for_staging: bool = False
) -> TTableSchema:
try:
# make a copy of the schema so modifications do not affect the original document
table = deepcopy(self.schema.tables[table_name])
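The validation added above only warns or raises; the hints themselves are declared on resources. A hedged sketch of how that might look (the `columns` hint syntax and the example resource are assumptions, not part of this diff):

```python
import dlt


# Sketch (not from this diff): the column hints that _verify_schema now checks.
# Only one "hard_delete" and one "dedup_sort" column is allowed per table, and both
# hints only take effect with the "merge" write disposition (dedup_sort additionally
# needs a primary key).
@dlt.resource(
    primary_key="id",
    write_disposition="merge",
    columns={
        "deleted": {"hard_delete": True},      # truthy values remove the row in the destination
        "updated_at": {"dedup_sort": "desc"},  # keep the newest row per primary key
    },
)
def users():
    yield {"id": 1, "updated_at": "2024-02-26", "deleted": False}
```
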
36 changes: 24 additions & 12 deletions dlt/common/json/__init__.py
@@ -80,14 +80,18 @@ def custom_encode(obj: Any) -> str:


# use PUA range to encode additional types
_DECIMAL = "\uf026"
_DATETIME = "\uf027"
_DATE = "\uf028"
_UUIDT = "\uf029"
_HEXBYTES = "\uf02a"
_B64BYTES = "\uf02b"
_WEI = "\uf02c"
_TIME = "\uf02d"
PUA_START = int(os.environ.get("DLT_JSON_TYPED_PUA_START", "0xf026"), 16)

_DECIMAL = chr(PUA_START)
_DATETIME = chr(PUA_START + 1)
_DATE = chr(PUA_START + 2)
_UUIDT = chr(PUA_START + 3)
_HEXBYTES = chr(PUA_START + 4)
_B64BYTES = chr(PUA_START + 5)
_WEI = chr(PUA_START + 6)
_TIME = chr(PUA_START + 7)

PUA_START_UTF8_MAGIC = _DECIMAL.encode("utf-8")[:2]


def _datetime_decoder(obj: str) -> datetime:
@@ -148,10 +152,17 @@ def custom_pua_encode(obj: Any) -> str:

def custom_pua_decode(obj: Any) -> Any:
if isinstance(obj, str) and len(obj) > 1:
c = ord(obj[0]) - 0xF026
c = ord(obj[0]) - PUA_START
# decode only the PUA space defined in DECODERS
if c >= 0 and c <= PUA_CHARACTER_MAX:
return DECODERS[c](obj[1:])
try:
return DECODERS[c](obj[1:])
except Exception:
# return strings that cannot be parsed
# this may be due to:
# (1) someone exposing strings with PUA characters to external systems (ie. via API)
# (2) using custom types ie. DateTime that does not create correct iso strings
return obj
return obj


Expand All @@ -166,7 +177,7 @@ def custom_pua_decode_nested(obj: Any) -> Any:
def custom_pua_remove(obj: Any) -> Any:
"""Removes the PUA data type marker and leaves the correctly serialized type representation. Unmarked values are returned as-is."""
if isinstance(obj, str) and len(obj) > 1:
c = ord(obj[0]) - 0xF026
c = ord(obj[0]) - PUA_START
# decode only the PUA space defined in DECODERS
if c >= 0 and c <= PUA_CHARACTER_MAX:
return obj[1:]
@@ -175,7 +186,7 @@ def custom_pua_remove(obj: Any) -> Any:

def may_have_pua(line: bytes) -> bool:
"""Checks if bytes string contains pua marker"""
return b"\xef\x80" in line
return PUA_START_UTF8_MAGIC in line


# pick the right impl
@@ -203,4 +214,5 @@ def may_have_pua(line: bytes) -> bool:
"custom_pua_decode_nested",
"custom_pua_remove",
"SupportsJson",
"may_have_pua",
]
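A small round-trip sketch of the typed-JSON helpers touched above (the public names come from `dlt.common.json`; the example values are made up). The start of the PUA range is now configurable via the `DLT_JSON_TYPED_PUA_START` environment variable, and decoding falls back to returning the raw string when a tagged value cannot be parsed.

```python
from decimal import Decimal

from dlt.common.json import custom_pua_encode, custom_pua_decode, may_have_pua

# typed values are tagged with a single PUA character on encode ...
encoded = custom_pua_encode(Decimal("10.5"))
# ... and restored to the original type on decode
print(custom_pua_decode(encoded))             # Decimal('10.5')
# the marker is detectable on raw bytes before parsing a whole line
print(may_have_pua(encoded.encode("utf-8")))  # True
# strings without a marker (or with an unparsable payload) come back unchanged
print(custom_pua_decode("plain string"))      # plain string
```
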
23 changes: 21 additions & 2 deletions dlt/common/libs/pyarrow.py
@@ -1,7 +1,8 @@
from datetime import datetime, date # noqa: I251
from typing import Any, Tuple, Optional, Union, Callable, Iterable, Iterator, Sequence, Tuple
from copy import copy

from dlt import version
from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema.typing import DLT_NAME_PREFIX, TTableSchemaColumns

@@ -15,7 +16,7 @@
import pyarrow.parquet
except ModuleNotFoundError:
raise MissingDependencyException(
"DLT parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "DLT Helpers for for parquet."
"dlt parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "dlt Helpers for for parquet."
)


@@ -313,6 +314,24 @@ def is_arrow_item(item: Any) -> bool:
return isinstance(item, (pyarrow.Table, pyarrow.RecordBatch))


def to_arrow_compute_input(value: Any, arrow_type: pyarrow.DataType) -> Any:
"""Converts python value to an arrow compute friendly version"""
return pyarrow.scalar(value, type=arrow_type)


def from_arrow_compute_output(arrow_value: pyarrow.Scalar) -> Any:
"""Converts arrow scalar into Python type. Currently adds "UTC" to naive date times."""
row_value = arrow_value.as_py()
# dates are not represented as datetimes, but connector-x seems to represent
# datetimes as dates while keeping the exact time inside. probably a bug,
# but it can be corrected this way
if isinstance(row_value, date) and not isinstance(row_value, datetime):
row_value = pendulum.from_timestamp(arrow_value.cast(pyarrow.int64()).as_py() / 1000)
elif isinstance(row_value, datetime):
row_value = pendulum.instance(row_value)
return row_value


TNewColumns = Sequence[Tuple[int, pyarrow.Field, Callable[[pyarrow.Table], Iterable[Any]]]]
"""Sequence of tuples: (field index, field, generating function)"""

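A hedged usage sketch for the two new scalar helpers (the function names come from the hunk above; the values are illustrative and pyarrow from the `dlt[parquet]` extra is assumed to be installed):

```python
from datetime import datetime

import pyarrow as pa

from dlt.common.libs.pyarrow import to_arrow_compute_input, from_arrow_compute_output

# wrap a Python value so it can be passed to pyarrow.compute kernels
scalar = to_arrow_compute_input(42, pa.int64())
print(from_arrow_compute_output(scalar))  # 42

# naive datetimes come back as timezone-aware pendulum instances (UTC is added)
ts = to_arrow_compute_input(datetime(2024, 2, 26, 12, 0), pa.timestamp("us"))
print(from_arrow_compute_output(ts))      # pendulum DateTime in UTC
```
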
13 changes: 7 additions & 6 deletions dlt/common/libs/pydantic.py
@@ -100,15 +100,16 @@ def pydantic_to_table_schema_columns(
# This applies to pydantic.Json fields, the inner type is the type after json parsing
# (In pydantic 2 the outer annotation is the final type)
annotation = inner_annotation

nullable = is_optional_type(annotation)

if is_union_type(annotation):
inner_type = get_args(annotation)[0]
else:
inner_type = extract_inner_type(annotation)
inner_type = extract_inner_type(annotation)
if is_union_type(inner_type):
first_argument_type = get_args(inner_type)[0]
inner_type = extract_inner_type(first_argument_type)

if inner_type is Json: # Same as `field: Json[Any]`
inner_type = Any
inner_type = Any # type: ignore[assignment]

if inner_type is Any: # Any fields will be inferred from data
continue
@@ -229,7 +230,7 @@ def _process_annotation(t_: Type[Any]) -> Type[Any]:
"""Recursively recreates models with applied schema contract"""
if is_annotated(t_):
a_t, *a_m = get_args(t_)
return Annotated[_process_annotation(a_t), a_m] # type: ignore
return Annotated[_process_annotation(a_t), tuple(a_m)] # type: ignore[return-value]
elif is_list_generic_type(t_):
l_t: Type[Any] = get_args(t_)[0]
try:
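The union handling fix above changes how Optional/Union annotations resolve. A hedged sketch of the resulting mapping (the `Item` model and the exact output keys are illustrative):

```python
from typing import Optional

from pydantic import BaseModel

from dlt.common.libs.pydantic import pydantic_to_table_schema_columns


class Item(BaseModel):
    id: int
    price: Optional[float] = None


# Optional[float] now resolves via the first union argument to a nullable "double"
columns = pydantic_to_table_schema_columns(Item)
print(columns["id"]["data_type"], columns["id"]["nullable"])        # bigint False
print(columns["price"]["data_type"], columns["price"]["nullable"])  # double True
```
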
2 changes: 1 addition & 1 deletion dlt/common/normalizers/naming/direct.py
@@ -6,7 +6,7 @@
class NamingConvention(BaseNamingConvention):
PATH_SEPARATOR = "▶"

_CLEANUP_TABLE = str.maketrans("\n\r'\"▶", "_____")
_CLEANUP_TABLE = str.maketrans(".\n\r'\"▶", "______")

def normalize_identifier(self, identifier: str) -> str:
identifier = super().normalize_identifier(identifier)
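A tiny sketch of the extended cleanup table (assuming the default constructor; the identifier is made up): `.` is now replaced with `_`, like newlines, quotes and the `▶` path separator.

```python
from dlt.common.normalizers.naming.direct import NamingConvention

naming = NamingConvention()
# "." joins the set of replaced characters, so dotted identifiers stay flat
print(naming.normalize_identifier("event.payload"))  # event_payload
```
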
2 changes: 1 addition & 1 deletion dlt/common/pendulum.py
@@ -1,4 +1,4 @@
from datetime import timedelta # noqa: I251
from datetime import timedelta, timezone # noqa: I251
import pendulum # noqa: I251

# force UTC as the local timezone to prevent local dates to be written to dbs
