Skip to content

Commit

Permalink
Cloud API: Add ManagedCluster.get_client_bundle method
Browse files Browse the repository at this point in the history
It can be used to acquire corresponding client handles (adapter, dbapi,
sqlalchemy), in order to communicate with the database.
  • Loading branch information
amotl committed Nov 15, 2023
1 parent d1e4c0f commit 2171623
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
50 changes: 50 additions & 0 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,44 @@
import dataclasses
import json
import logging
import os
import sys
import time
import typing as t
from abc import abstractmethod

import crate.client
import sqlalchemy as sa

from cratedb_toolkit.api.guide import GuidingTexts
from cratedb_toolkit.cluster.util import deploy_cluster, get_cluster_by_name, get_cluster_info
from cratedb_toolkit.config import CONFIG
from cratedb_toolkit.exception import CroudException, OperationFailed
from cratedb_toolkit.io.croud import CloudIo, CloudJob
from cratedb_toolkit.model import ClusterInformation, DatabaseAddress, InputOutputResource, TableAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.runtime import flexfun
from cratedb_toolkit.util.setting import RequiredMutuallyExclusiveSettingsGroup, Setting

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ClientBundle:
adapter: DatabaseAdapter
dbapi: crate.client.connection.Connection
sqlalchemy: sa.Engine


class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, source: InputOutputResource, target: TableAddress):
raise NotImplementedError("Child class needs to implement this method")

@abstractmethod
def get_client_bundle(self) -> ClientBundle:
raise NotImplementedError("Child class needs to implement this method")


class ManagedCluster(ClusterBase):
"""
Expand Down Expand Up @@ -102,6 +119,8 @@ def delete(self) -> "ManagedCluster":
def probe(self) -> "ManagedCluster":
"""
Probe a CrateDB Cloud cluster, API-wise.
TODO: Investigate callers, and reduce number of invocations.
"""
try:
if self.id:
Expand Down Expand Up @@ -144,6 +163,11 @@ def acquire(self) -> "ManagedCluster":
if not self.exists:
logger.info(f"Cluster does not exist, deploying it: id={self.id}, name={self.name}")
self.deploy()
logger.info(f"Cluster deployed: id={self.id}, name={self.name}")

# Wait a bit, to let the deployment settle, to work around DNS propagation problems.
time.sleep(3.25)

self.probe()
if not self.exists:
# TODO: Is it possible to gather and propagate more information why the deployment failed?
Expand Down Expand Up @@ -210,6 +234,29 @@ def load_table(self, source: InputOutputResource, target: t.Optional[TableAddres
logger.exception(msg)
raise OperationFailed(msg) from ex

def get_client_bundle(self, username: str = None, password: str = None) -> ClientBundle:
"""
Return a bundle of client handles to the CrateDB Cloud cluster.
- adapter: A high-level `DatabaseAdapter` instance, offering a few convenience methods.
- dbapi: A DBAPI connection object, as provided by SQLAlchemy's `dbapi_connection`.
- sqlalchemy: An SQLAlchemy `Engine` object.
"""
if username is None:
username = os.environ.get("CRATEDB_USERNAME")
if password is None:
password = os.environ.get("CRATEDB_PASSWORD")
cratedb_http_url = self.info.cloud["url"]
logger.info(f"Connecting to database cluster at: {cratedb_http_url}")
address = DatabaseAddress.from_httpuri(cratedb_http_url)
address.with_credentials(username=username, password=password)
adapter = DatabaseAdapter(address.dburi)
return ClientBundle(
adapter=adapter,
dbapi=adapter.connection.connection.dbapi_connection,
sqlalchemy=adapter.engine,
)


@dataclasses.dataclass
class StandaloneCluster(ClusterBase):
Expand Down Expand Up @@ -250,3 +297,6 @@ def load_table(self, source: InputOutputResource, target: TableAddress):
raise OperationFailed(msg)
else:
raise NotImplementedError("Importing resource not implemented yet")

def get_client_bundle(self, username: str = None, password: str = None) -> ClientBundle:
raise NotImplementedError("Not implemented for `StandaloneCluster` yet")
11 changes: 9 additions & 2 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ class DatabaseAddress:
uri: URL

@classmethod
def from_string(cls, url):
def from_string(cls, url: str) -> "DatabaseAddress":
"""
Factory method to create an instance from an SQLAlchemy database URL in string format.
"""
return cls(uri=URL(url))

@classmethod
def from_httpuri(cls, url):
def from_httpuri(cls, url: str) -> "DatabaseAddress":
"""
Factory method to create an instance from an HTTP URL in string format.
"""
Expand All @@ -34,6 +34,13 @@ def from_httpuri(cls, url):
uri.scheme = "crate"
return cls(uri=uri)

def with_credentials(self, username: str = None, password: str = None):
if username is not None:
self.uri.username = username
if password is not None:
self.uri.password = password
return self

@property
def dburi(self) -> str:
"""
Expand Down
13 changes: 7 additions & 6 deletions examples/cloud_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
export CRATEDB_CLOUD_CLUSTER_ID='e1e38d92-a650-48f1-8a70-8133f2d5c400'
export CRATEDB_USERNAME='admin'
export CRATEDB_PASSWORD='H3IgNXNvQBJM3CiElOiVHuSp6CjXMCiQYhB4I9dLccVHGvvvitPSYr1vTpt4'
ctk shell --command "SELECT * from sys.summits LIMIT 5;"
ctk shell --command "SELECT * from sys.summits LIMIT 2;"
Usage
=====
Expand Down Expand Up @@ -60,6 +60,7 @@
"""
import json
import logging
import sys

import cratedb_toolkit
from cratedb_toolkit.util import setup_logging
Expand All @@ -72,7 +73,7 @@ def workload():
Run a workload on a CrateDB database cluster on CrateDB Cloud.
ctk cluster start Hotzenplotz
ctk shell --command "SELECT * from sys.summits LIMIT 5;"
ctk shell --command "SELECT * from sys.summits LIMIT 2;"
"""

from cratedb_toolkit import ManagedCluster
Expand All @@ -83,12 +84,12 @@ def workload():

# Report information about cluster.
# TODO: Use `cluster.{print,format}_info()`.
print(json.dumps(cluster.info.cloud)) # noqa: T201
print(json.dumps(cluster.info.cloud), file=sys.stderr) # noqa: T201

# Run database workload.
# TODO: Enable acquiring a client handle.
# cratedb = cluster.get_connection_client() # noqa: ERA001
# cratedb.run_sql("SELECT * from sys.summits LIMIT 5;") # noqa: ERA001
cratedb = cluster.get_client_bundle()
results = cratedb.adapter.run_sql("SELECT * from sys.summits LIMIT 2;", records=True)
print(json.dumps(results, indent=2)) # noqa: T201

# Stop cluster again.
cluster.stop()
Expand Down

0 comments on commit 2171623

Please sign in to comment.