From 2d90641759f1d5e15bd2eb94833c60ab126dcd87 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 19 Mar 2024 15:29:45 -0700 Subject: [PATCH] Chore: Add logging for config validation, source check, and successful installs (#135) --- airbyte/_executor.py | 2 ++ airbyte/_util/telemetry.py | 47 ++++++++++++++++++++++++++++++++++++++ airbyte/sources/base.py | 30 +++++++++++++++++++++--- airbyte/sources/util.py | 21 +++-------------- 4 files changed, 79 insertions(+), 21 deletions(-) diff --git a/airbyte/_executor.py b/airbyte/_executor.py index 03f4087b..139d3ef5 100644 --- a/airbyte/_executor.py +++ b/airbyte/_executor.py @@ -15,6 +15,7 @@ from airbyte import exceptions as exc from airbyte._util.meta import is_windows +from airbyte._util.telemetry import EventState, log_install_state from airbyte.sources.registry import ConnectorMetadata @@ -238,6 +239,7 @@ def install(self) -> None: # Assuming the installation succeeded, store the installed version self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True) + log_install_state(self.name, state=EventState.SUCCEEDED) print( f"Connector '{self.name}' installed successfully!\n" f"For more information, see the {self.name} documentation:\n" diff --git a/airbyte/_util/telemetry.py b/airbyte/_util/telemetry.py index 92ab65ec..a32dc9ea 100644 --- a/airbyte/_util/telemetry.py +++ b/airbyte/_util/telemetry.py @@ -176,6 +176,8 @@ class EventState(str, Enum): class EventType(str, Enum): INSTALL = "install" SYNC = "sync" + VALIDATE = "validate" + CHECK = "check" @dataclass @@ -293,3 +295,48 @@ def send_telemetry( "timestamp": datetime.datetime.utcnow().isoformat(), # noqa: DTZ003 }, ) + + +def log_config_validation_result( + name: str, + state: EventState, + exception: Exception | None = None, +) -> None: + """Log a config validation event.""" + send_telemetry( + source=name, + cache=None, + state=state, + event_type=EventType.VALIDATE, + exception=exception, + ) + + +def log_source_check_result( + name: str, + state: EventState, + exception: Exception | None = None, +) -> None: + """Log a source `check` result.""" + send_telemetry( + source=name, + cache=None, + state=state, + event_type=EventType.CHECK, + exception=exception, + ) + + +def log_install_state( + name: str, + state: EventState, + exception: Exception | None = None, +) -> None: + """Log an install event.""" + send_telemetry( + source=name, + cache=None, + state=state, + event_type=EventType.INSTALL, + exception=exception, + ) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index e81e8b3d..81d2722d 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -29,7 +29,13 @@ from airbyte import exceptions as exc from airbyte._util import protocol_util from airbyte._util.name_normalizers import normalize_records -from airbyte._util.telemetry import EventState, EventType, send_telemetry +from airbyte._util.telemetry import ( + EventState, + EventType, + log_config_validation_result, + log_source_check_result, + send_telemetry, +) from airbyte.caches.util import get_default_cache from airbyte.datasets._lazy import LazyDataset from airbyte.progress import progress @@ -200,8 +206,12 @@ def validate_config(self, config: dict[str, Any] | None = None) -> None: config = self._config if config is None else config try: jsonschema.validate(config, spec.connectionSpecification) + log_config_validation_result( + name=self.name, + state=EventState.SUCCEEDED, + ) except jsonschema.ValidationError as ex: - raise exc.AirbyteConnectorValidationFailedError( + validation_ex = exc.AirbyteConnectorValidationFailedError( message="The provided config is not valid.", context={ "error_message": ex.message, @@ -209,7 +219,13 @@ def validate_config(self, config: dict[str, Any] | None = None) -> None: "error_instance": ex.instance, "error_schema": ex.schema, }, - ) from ex + ) + log_config_validation_result( + name=self.name, + state=EventState.FAILED, + exception=validation_ex, + ) + raise validation_ex from ex def get_available_streams(self) -> list[str]: """Get the available streams from the spec.""" @@ -398,8 +414,16 @@ def check(self) -> None: if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: if msg.connectionStatus.status != Status.FAILED: print(f"Connection check succeeded for `{self.name}`.") + log_source_check_result( + name=self.name, + state=EventState.SUCCEEDED, + ) return + log_source_check_result( + name=self.name, + state=EventState.FAILED, + ) raise exc.AirbyteConnectorCheckFailedError( help_url=self.docs_url, context={ diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index c21fb427..518ea132 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -11,7 +11,7 @@ from airbyte import exceptions as exc from airbyte._executor import PathExecutor, VenvExecutor -from airbyte._util.telemetry import EventState, EventType, send_telemetry +from airbyte._util.telemetry import EventState, log_install_state from airbyte.sources.base import Source from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata @@ -122,7 +122,7 @@ def get_source( metadata = get_connector_metadata(name) except exc.AirbyteConnectorNotRegisteredError as ex: if not pip_url: - _log_install_state(name, state=EventState.FAILED, exception=ex) + log_install_state(name, state=EventState.FAILED, exception=ex) # We don't have a pip url or registry entry, so we can't install the connector raise @@ -143,25 +143,10 @@ def get_source( executor=executor, ) except Exception as e: - _log_install_state(name, state=EventState.FAILED, exception=e) + log_install_state(name, state=EventState.FAILED, exception=e) raise __all__ = [ "get_source", ] - - -def _log_install_state( - name: str, - state: EventState, - exception: Exception | None = None, -) -> None: - """Log an install event.""" - send_telemetry( - source=name, - cache=None, - state=state, - event_type=EventType.INSTALL, - exception=exception, - )