Docs: Improve docstrings and automated API ref docs (#313)
aaronsteers authored Jul 30, 2024
1 parent 4591a6d commit 4a5722a
Showing 25 changed files with 168 additions and 64 deletions.
6 changes: 3 additions & 3 deletions airbyte/_connector_base.py

@@ -188,9 +188,9 @@ def print_config_spec(
         """Print the configuration spec for this connector.
 
         Args:
-            format: The format to print the spec in. Must be "yaml" or "json".
-            output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
-                it will be printed to the console.
+            format: The format to print the spec in. Must be "yaml" or "json".
+            output_file: Optional. If set, the spec will be written to the given file path.
+                Otherwise, it will be printed to the console.
         """
         if format not in {"yaml", "json"}:
             raise exc.PyAirbyteInputError(
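
For context, the documented parameters can be exercised roughly like this; a minimal sketch assuming a source obtained via `ab.get_source`, with an illustrative connector name:

    import airbyte as ab

    source = ab.get_source("source-faker")

    # Print the spec to the console as YAML:
    source.print_config_spec(format="yaml")

    # Or write it to a file as JSON:
    source.print_config_spec(format="json", output_file="spec.json")
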
4 changes: 1 addition & 3 deletions airbyte/_future_cdk/sql_processor.py

@@ -431,10 +431,8 @@ def _ensure_compatible_table_schema(
         Raises an exception if the table schema is not compatible with the schema of the
         input stream.
-
-        TODO:
-        - Expand this to check for column types and sizes.
         """
+        # TODO: Expand this to check for column types and sizes.
         self._add_missing_columns_to_table(
             stream_name=stream_name,
             table_name=table_name,
1 change: 0 additions & 1 deletion airbyte/_message_iterators.py

@@ -59,7 +59,6 @@ def read(self) -> str:
     @classmethod
     def from_read_result(cls, read_result: ReadResult) -> AirbyteMessageIterator:
         """Create a iterator from a `ReadResult` object."""
-
         state_provider = read_result.cache.get_state_provider(
             source_name=read_result.source_name,
             refresh=True,
2 changes: 1 addition & 1 deletion airbyte/_processors/sql/snowflakecortex.py

@@ -164,7 +164,7 @@ def _add_missing_columns_to_table(
         stream_name: str,
         table_name: str,
     ) -> None:
-        """Use Snowflake Python connector to add new columns to the table"""
+        """Use Snowflake Python connector to add new columns to the table."""
         columns = self._get_sql_column_definitions(stream_name)
         existing_columns = self._get_column_list_from_table(table_name)
         for column_name, column_type in columns.items():
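
As a rough illustration of what this method does under the hood, here is a minimal sketch using the Snowflake Python connector directly; the connection parameters and the table and column names are placeholders, not values from the PyAirbyte codebase:

    import snowflake.connector

    # Placeholder credentials; real code would pull these from the cache config.
    conn = snowflake.connector.connect(
        account="my_account",
        user="my_user",
        password="my_password",
        database="my_db",
        schema="airbyte_raw",
    )
    try:
        cur = conn.cursor()
        # Add the column only if it is missing, mirroring the method's intent.
        cur.execute('ALTER TABLE my_table ADD COLUMN IF NOT EXISTS "new_col" VARCHAR')
        cur.close()
    finally:
        conn.close()
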
16 changes: 9 additions & 7 deletions airbyte/caches/_catalog_backend.py

@@ -49,10 +49,11 @@ class CachedStream(SqlAlchemyModel):  # type: ignore[valid-type,misc]
 
 
 class CatalogBackendBase(abc.ABC):
-    """
-    A class to manage the stream catalog of data synced to a cache:
-    * What streams exist and to what tables they map
-    * The JSON schema for each stream
+    """A class to manage the stream catalog of data synced to a cache.
+
+    This includes:
+    - What streams exist and to what tables they map
+    - The JSON schema for each stream
     """
 
     # Abstract implementations

@@ -101,10 +102,11 @@ def get_source_catalog_provider(self, source_name: str) -> CatalogProvider:
 
 
 class SqlCatalogBackend(CatalogBackendBase):
-    """
-    A class to manage the stream catalog of data synced to a cache:
+    """A class to manage the stream catalog of data synced to a cache.
+
+    This includes:
     - What streams exist and to what tables they map
-    - The JSON schema for each stream
+    - The JSON schema for each stream.
     """
 
     def __init__(
4 changes: 2 additions & 2 deletions airbyte/caches/_state_backend.py

@@ -179,9 +179,9 @@ def _write_state(
 
 
 class SqlStateBackend(StateBackendBase):
-    """
-    A class to manage the stream catalog of data synced to a cache:
+    """A class to manage the stream catalog of data synced to a cache.
+    This includes:
     - What streams exist and to what tables they map
     - The JSON schema for each stream
     """
3 changes: 2 additions & 1 deletion airbyte/caches/bigquery.py

@@ -40,7 +40,8 @@ def get_arrow_dataset(
         max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
     ) -> NoReturn:
         """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
-        https://github.com/airbytehq/PyAirbyte/issues/165
+
+        See: https://github.com/airbytehq/PyAirbyte/issues/165
         """
         raise NotImplementedError(
             "BigQuery doesn't currently support to_arrow"
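
Callers should therefore guard against the error; a minimal sketch, assuming a `BigQueryCache` class in this module with illustrative constructor arguments:

    from airbyte.caches.bigquery import BigQueryCache

    cache = BigQueryCache(project_name="my-project", dataset_name="my_dataset")
    try:
        dataset = cache.get_arrow_dataset("users")
    except NotImplementedError:
        # Expected for now; see PyAirbyte issue #165 linked above.
        dataset = None
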
2 changes: 2 additions & 0 deletions airbyte/cloud/connections.py

@@ -97,10 +97,12 @@ def table_prefix(self) -> str:
 
     @property
    def connection_url(self) -> str | None:
+        """The URL to the connection."""
         return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
 
     @property
     def job_history_url(self) -> str | None:
+        """The URL to the job history for the connection."""
         return f"{self.connection_url}/job-history"
 
     # Run Sync
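
A short sketch of the two newly documented properties in use; the credentials and IDs are placeholders, and `get_connection` is assumed to be the accessor for existing connections:

    from airbyte.cloud import CloudWorkspace

    workspace = CloudWorkspace(workspace_id="...", api_key="...")
    connection = workspace.get_connection(connection_id="...")

    print(connection.connection_url)   # .../connections/<connection_id>
    print(connection.job_history_url)  # .../connections/<connection_id>/job-history
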
4 changes: 2 additions & 2 deletions airbyte/cloud/experimental.py

@@ -41,7 +41,7 @@
 )
 
 
-class CloudWorkspace(Stable_CloudWorkspace):
+class CloudWorkspace(Stable_CloudWorkspace):  # noqa: D101  # Docstring inherited from parent.
     __doc__ = (
         f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}"
     )

@@ -53,7 +53,7 @@ class CloudWorkspace(Stable_CloudWorkspace):
     permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination
 
 
-class CloudConnection(Stable_CloudConnection):
+class CloudConnection(Stable_CloudConnection):  # noqa: D101  # Docstring inherited from parent.
     __doc__ = (
         f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}"
     )
1 change: 0 additions & 1 deletion airbyte/cloud/sync_results.py

@@ -346,7 +346,6 @@ def __getitem__(self, key: str) -> CachedDataset:
         return self.parent.get_dataset(stream_name=key)
 
     def __iter__(self) -> Iterator[str]:
-        """TODO"""
         return iter(self.parent.stream_names)
 
     def __len__(self) -> int:
5 changes: 5 additions & 0 deletions airbyte/cloud/workspaces.py

@@ -46,6 +46,7 @@ class CloudWorkspace:
 
     @property
     def workspace_url(self) -> str | None:
+        """The URL of the workspace."""
         return f"{self.api_root}/workspaces/{self.workspace_id}"
 
     # Test connection and creds

@@ -202,6 +203,10 @@ def _deploy_connection(
             `cache` or `destination`, but not both.
         destination (str, optional): The destination ID to use. You can provide
             `cache` or `destination`, but not both.
+        table_prefix (str, optional): The table prefix to use for the cache. If not provided,
+            the cache's table prefix will be used.
+        selected_streams (list[str], optional): The selected stream names to use for the
+            connection. If not provided, the source's selected streams will be used.
         """
         # Resolve source ID
         source_id: str
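
The newly documented arguments suggest a call shape roughly like the following. This is a hypothetical sketch via the experimental workspace API; the method name and argument names are taken from the docstring above rather than from a verified public signature:

    from airbyte.cloud.experimental import CloudWorkspace  # experimental, subject to change

    workspace = CloudWorkspace(workspace_id="...", api_key="...")  # placeholder credentials
    connection = workspace.deploy_connection(
        source="<source-id>",
        destination="<destination-id>",           # provide `cache` or `destination`, not both
        table_prefix="airbyte_",                  # optional; defaults to the cache's prefix
        selected_streams=["users", "purchases"],  # optional; defaults to the source's selection
    )
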
1 change: 1 addition & 0 deletions airbyte/documents.py

@@ -44,6 +44,7 @@ class Document(BaseModel):
     last_modified: Optional[datetime.datetime] = Field(default=None)
 
     def __str__(self) -> str:
+        """Return a string representation of the document."""
         return self.content
 
     @property
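
The effect of the documented `__str__`, in a sketch that assumes `content` is the model's only required field:

    from airbyte.documents import Document

    doc = Document(content="Hello, world!")
    assert str(doc) == "Hello, world!"  # __str__ returns the raw content
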
6 changes: 6 additions & 0 deletions airbyte/exceptions.py

@@ -80,6 +80,7 @@ def get_message(self) -> str:
         return self.__doc__.split("\n")[0] if self.__doc__ else ""
 
     def __str__(self) -> str:
+        """Return a string representation of the exception."""
         special_properties = ["message", "guidance", "help_url", "log_text", "context"]
         display_properties = {
             k: v

@@ -109,6 +110,7 @@ def __str__(self) -> str:
         return exception_str
 
     def __repr__(self) -> str:
+        """Return a string representation of the exception."""
         class_name = self.__class__.__name__
         properties_str = ", ".join(
             f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")

@@ -383,6 +385,7 @@ class AirbyteError(PyAirbyteError):
 
     @property
     def workspace_url(self) -> str | None:
+        """The URL to the workspace where the error occurred."""
         if self.workspace:
             return self.workspace.workspace_url
 

@@ -404,20 +407,23 @@ class AirbyteConnectionError(AirbyteError):
 
     @property
     def connection_url(self) -> str | None:
+        """The URL to the connection where the error occurred."""
         if self.workspace_url and self.connection_id:
             return f"{self.workspace_url}/connections/{self.connection_id}"
 
         return None
 
     @property
     def job_history_url(self) -> str | None:
+        """The URL to the job history where the error occurred."""
         if self.connection_url:
             return f"{self.connection_url}/job-history"
 
         return None
 
     @property
     def job_url(self) -> str | None:
+        """The URL to the job where the error occurred."""
         if self.job_history_url and self.job_id:
             return f"{self.job_history_url}#{self.job_id}::0"
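
These URL properties make sync failures easier to debug. A sketch of catching one and jumping to the failed job in the Airbyte UI; the `run_sync` call and the specific error subclass are assumptions based on the surrounding cloud API:

    from airbyte import exceptions as exc

    try:
        connection.run_sync()  # `connection` is assumed to be a CloudConnection handle
    except exc.AirbyteConnectionSyncError as err:
        # Fall back through the new properties, most specific first.
        print(err.job_url or err.job_history_url or err.workspace_url)
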
10 changes: 10 additions & 0 deletions airbyte/records.py

@@ -208,6 +208,9 @@ def __init__(
         Args:
             from_dict: The dictionary to initialize the StreamRecord with.
             stream_record_handler: The StreamRecordHandler to use for processing the record.
+            with_internal_columns: If `True`, the internal columns will be added to the record.
+            extracted_at: The time the record was extracted. If not provided, the current time will
+                be used.
         """
         self._stream_handler: StreamRecordHandler = stream_record_handler
 

@@ -252,12 +255,14 @@ def from_record_message(
         )
 
     def __getitem__(self, key: str) -> Any:  # noqa: ANN401
+        """Return the item with the given key."""
         try:
             return super().__getitem__(key)
         except KeyError:
             return super().__getitem__(self._stream_handler.to_index_case(key))
 
     def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
+        """Set the item with the given key to the given value."""
         index_case_key = self._stream_handler.to_index_case(key)
         if (
             self._stream_handler.prune_extra_fields

@@ -268,6 +273,7 @@ def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
             super().__setitem__(index_case_key, value)
 
     def __delitem__(self, key: str) -> None:
+        """Delete the item with the given key."""
         try:
             super().__delitem__(key)
         except KeyError:

@@ -282,18 +288,22 @@ def __delitem__(self, key: str) -> None:
             raise KeyError(key)
 
     def __contains__(self, key: object) -> bool:
+        """Return whether the dictionary contains the given key."""
         assert isinstance(key, str), "Key must be a string."
         return super().__contains__(key) or super().__contains__(
             self._stream_handler.to_index_case(key)
         )
 
     def __iter__(self) -> Iterator[str]:
+        """Return an iterator over the keys of the dictionary."""
         return iter(super().__iter__())
 
     def __len__(self) -> int:
+        """Return the number of items in the dictionary."""
         return super().__len__()
 
     def __eq__(self, other: object) -> bool:
+        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
         if isinstance(other, StreamRecord):
             return dict(self) == dict(other)
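
A simplified, self-contained stand-in for the case-insensitive mapping behavior these dunders implement; the real `StreamRecord` delegates key normalization to its `StreamRecordHandler` rather than hard-coding `str.lower`:

    class CaseInsensitiveRecord(dict):
        """Toy version of StreamRecord's key handling."""

        @staticmethod
        def _to_index_case(key: str) -> str:
            return key.lower()

        def __getitem__(self, key: str):
            try:
                return super().__getitem__(key)
            except KeyError:
                # Retry with the normalized ("index case") form of the key.
                return super().__getitem__(self._to_index_case(key))

        def __contains__(self, key: object) -> bool:
            return super().__contains__(key) or (
                isinstance(key, str) and super().__contains__(self._to_index_case(key))
            )

    record = CaseInsensitiveRecord({"user_id": 42})
    assert record["USER_ID"] == 42
    assert "User_Id" in record
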
26 changes: 26 additions & 0 deletions airbyte/results.py

@@ -1,4 +1,11 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+"""Module which defines the `ReadResult` and `WriteResult` classes.
+
+These classes are used to return information about read and write operations, respectively. They
+contain information such as the number of records read or written, the cache object, and the
+state handlers for a sync.
+"""
+
 from __future__ import annotations
 
 from collections.abc import Mapping

@@ -37,45 +44,58 @@ def __init__(
         cache: CacheBase,
         progress_tracker: ProgressTracker,
     ) -> None:
+        """Initialize a read result.
+
+        This class should not be created directly. Instead, it should be returned by the `read`
+        method of the `Source` class.
+        """
         self.source_name = source_name
         self._progress_tracker = progress_tracker
         self._cache = cache
         self._processed_streams = processed_streams
 
     def __getitem__(self, stream: str) -> CachedDataset:
+        """Return the cached dataset for a given stream name."""
         if stream not in self._processed_streams:
             raise KeyError(stream)
 
         return CachedDataset(self._cache, stream)
 
     def __contains__(self, stream: object) -> bool:
+        """Return whether a given stream name was included in processing."""
         if not isinstance(stream, str):
             return False
 
         return stream in self._processed_streams
 
     def __iter__(self) -> Iterator[str]:
+        """Return an iterator over the stream names that were processed."""
         return self._processed_streams.__iter__()
 
     def __len__(self) -> int:
+        """Return the number of streams that were processed."""
         return len(self._processed_streams)
 
     def get_sql_engine(self) -> Engine:
+        """Return the SQL engine used by the cache."""
         return self._cache.get_sql_engine()
 
     @property
     def processed_records(self) -> int:
+        """The total number of records read from the source."""
         return self._progress_tracker.total_records_read
 
     @property
     def streams(self) -> Mapping[str, CachedDataset]:
+        """Return a mapping of stream names to cached datasets."""
         return {
             stream_name: CachedDataset(self._cache, stream_name)
             for stream_name in self._processed_streams
         }
 
     @property
     def cache(self) -> CacheBase:
+        """Return the cache object."""
         return self._cache
 

@@ -96,6 +116,11 @@ def __init__(
         state_writer: StateWriterBase,
         progress_tracker: ProgressTracker,
     ) -> None:
+        """Initialize a write result.
+
+        This class should not be created directly. Instead, it should be returned by the `write`
+        method of the `Destination` class.
+        """
         self._destination: Destination = destination
         self._source_data: Source | ReadResult = source_data
         self._catalog_provider: CatalogProvider = catalog_provider

@@ -104,6 +129,7 @@ def __init__(
 
     @property
     def processed_records(self) -> int:
+        """The total number of records written to the destination."""
         return self._progress_tracker.total_destination_records_delivered
 
     def get_state_provider(self) -> StateProviderBase:
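
Taken together, the new docstrings describe the standard read flow. A sketch, assuming the `source-faker` connector is installed and using its `count` option illustratively:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()
    result = source.read()  # returns a ReadResult backed by the default cache

    print(result.processed_records)    # total records read from the source
    for stream_name in result:         # __iter__ over processed stream names
        dataset = result[stream_name]  # __getitem__ -> CachedDataset
        print(stream_name, len(dataset.to_pandas()))
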