Skip to content

Commit

Permalink
Fix: Auto-use manifests when declared in the connector registry (#347)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Aug 31, 2024
1 parent c5e4c20 commit 47d7c16
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 167 deletions.
7 changes: 2 additions & 5 deletions airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import IO, TYPE_CHECKING, cast
Expand Down Expand Up @@ -63,13 +62,11 @@ def __init__(
self._validate_manifest(self._manifest_dict)
self.declarative_source = ManifestDeclarativeSource(source_config=self._manifest_dict)

# TODO: Consider adding version detection
# https://github.com/airbytehq/airbyte/issues/318
self.reported_version: str | None = None
self.reported_version: str | None = self._manifest_dict.get("version", None)

def _validate_manifest(self, manifest_dict: dict) -> None:
"""Validate the manifest."""
manifest_text = json.dumps(manifest_dict)
manifest_text = yaml.safe_dump(manifest_dict)
if "class_name:" in manifest_text:
raise exc.AirbyteConnectorInstallationError(
message=(
Expand Down
143 changes: 84 additions & 59 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import shutil
import sys
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Literal, cast

import requests
import yaml
Expand All @@ -17,8 +15,9 @@
from airbyte._executors.docker import DockerExecutor
from airbyte._executors.local import PathExecutor
from airbyte._executors.python import VenvExecutor
from airbyte._util.meta import which
from airbyte._util.telemetry import EventState, log_install_state # Non-public API
from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata
from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata


if TYPE_CHECKING:
Expand Down Expand Up @@ -77,33 +76,71 @@ def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict
return result_1


def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too complex
def _get_local_executor(
name: str,
local_executable: Path | str | Literal[True],
version: str | None,
) -> Executor:
"""Get a local executor for a connector."""
if version:
raise exc.PyAirbyteInputError(
message="Param 'version' is not supported when 'local_executable' is set."
)

if local_executable is True:
# Use the default executable name for the connector
local_executable = name

if isinstance(local_executable, str):
if "/" in local_executable or "\\" in local_executable:
# Assume this is a path
local_executable = Path(local_executable).absolute()
else:
which_executable: Path | None = which(local_executable)
if not which_executable:
raise exc.AirbyteConnectorExecutableNotFoundError(
connector_name=name,
context={
"executable": name,
"working_directory": Path.cwd().absolute(),
},
) from FileNotFoundError(name)
local_executable = Path(which_executable).absolute()

# `local_executable` is now a Path object

print(f"Using local `{name}` executable: {local_executable!s}")
return PathExecutor(
name=name,
path=local_executable,
)


def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex
name: str,
*,
version: str | None = None,
pip_url: str | None = None,
local_executable: Path | str | None = None,
docker_image: bool | str | None = False,
docker_image: bool | str | None = None,
use_host_network: bool = False,
source_manifest: bool | dict | Path | str = False,
source_manifest: bool | dict | Path | str | None = None,
install_if_missing: bool = True,
install_root: Path | None = None,
) -> Executor:
"""This factory function creates an executor for a connector.
For documentation of each arg, see the function `airbyte.sources.util.get_source()`.
"""
if (
sum(
[
bool(local_executable),
bool(docker_image),
bool(pip_url),
bool(source_manifest),
]
)
> 1
):
install_method_count = sum(
[
bool(local_executable),
bool(docker_image),
bool(pip_url),
bool(source_manifest),
]
)
if install_method_count > 1:
raise exc.PyAirbyteInputError(
message=(
"You can only specify one of the settings: 'local_executable', 'docker_image', "
Expand All @@ -116,40 +153,37 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too complex
"source_manifest": source_manifest,
},
)
metadata: ConnectorMetadata | None = None
try:
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError as ex:
if install_method_count == 0:
# User has not specified how to install the connector, and it is not registered.
# Fail the install.
log_install_state(name, state=EventState.FAILED, exception=ex)
raise

if local_executable:
if version:
raise exc.PyAirbyteInputError(
message="Param 'version' is not supported when 'local_executable' is set."
)
if install_method_count == 0:
# User has not specified how to install the connector.
# Prefer local executable if found, then manifests, then python, then docker, depending upon
# how the connector is declared in the connector registry.
if which(name):
local_executable = name
elif metadata and metadata.install_types:
match metadata.default_install_type:
case InstallType.YAML:
source_manifest = True
case InstallType.PYTHON:
pip_url = metadata.pypi_package_name
case _:
docker_image = True

if isinstance(local_executable, str):
if "/" in local_executable or "\\" in local_executable:
# Assume this is a path
local_executable = Path(local_executable).absolute()
else:
which_executable: str | None = None
which_executable = shutil.which(local_executable)
if not which_executable and sys.platform == "win32":
# Try with the .exe extension
local_executable = f"{local_executable}.exe"
which_executable = shutil.which(local_executable)

if which_executable is None:
raise exc.AirbyteConnectorExecutableNotFoundError(
connector_name=name,
context={
"executable": local_executable,
"working_directory": Path.cwd().absolute(),
},
) from FileNotFoundError(local_executable)
local_executable = Path(which_executable).absolute()

print(f"Using local `{name}` executable: {local_executable!s}")
return PathExecutor(
name=name,
path=local_executable,
)
if local_executable:
return _get_local_executor(
name=name,
local_executable=local_executable,
version=version,
)

if docker_image:
if docker_image is True:
Expand Down Expand Up @@ -215,15 +249,6 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too complex

# else: we are installing a connector in a Python virtual environment:

metadata: ConnectorMetadata | None = None
try:
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError as ex:
if not pip_url:
log_install_state(name, state=EventState.FAILED, exception=ex)
# We don't have a pip url or registry entry, so we can't install the connector
raise

try:
executor = VenvExecutor(
name=name,
Expand Down
20 changes: 20 additions & 0 deletions airbyte/_util/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

import os
import shutil
import sys
from contextlib import suppress
from functools import lru_cache
Expand Down Expand Up @@ -147,3 +148,22 @@ def get_os() -> str:
return f"Google Colab ({get_colab_release_version()})"

return f"{system()}"


@lru_cache
def which(executable_name: str) -> Path | None:
"""Return the path to an executable which would be run if the given name were called.
This function is a cross-platform wrapper for the `shutil.which()` function.
"""
which_executable: str | None = None
which_executable = shutil.which(executable_name)
if not which_executable and is_windows():
# Try with the .exe extension
which_executable = shutil.which(f"{executable_name}.exe")

return Path(which_executable) if which_executable else None


def is_docker_installed() -> bool:
return bool(which("docker"))
10 changes: 8 additions & 2 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@


NEW_ISSUE_URL = "https://github.com/airbytehq/airbyte/issues/new/choose"
DOCS_URL = "https://airbytehq.github.io/PyAirbyte/airbyte.html"
DOCS_URL_BASE = "https://airbytehq.github.io/PyAirbyte"
DOCS_URL = f"{DOCS_URL_BASE}/airbyte.html"


# Base error class
Expand Down Expand Up @@ -246,7 +247,12 @@ class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
"""Connector not found in registry."""

connector_name: str | None = None
guidance = "Please double check the connector name."
guidance = (
"Please double check the connector name. "
"Alternatively, you can provide an explicit connector install method to `get_source()`: "
"`pip_url`, `local_executable`, `docker_image`, or `source_manifest`."
)
help_url = DOCS_URL_BASE + "/airbyte/sources/util.html#get_source"


@dataclass
Expand Down
Loading

0 comments on commit 47d7c16

Please sign in to comment.