MongoDB: Add capability to give type hints and add transformations
amotl committed Aug 14, 2024
1 parent 30d1903 commit 4814bf1
Showing 9 changed files with 106 additions and 16 deletions.
19 changes: 14 additions & 5 deletions cratedb_toolkit/api/main.py
@@ -4,6 +4,7 @@
import logging
import typing as t
from abc import abstractmethod
from pathlib import Path

from yarl import URL

@@ -18,7 +19,7 @@

class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path):
raise NotImplementedError("Child class needs to implement this method")


@@ -35,7 +36,9 @@ class ManagedCluster(ClusterBase):
def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None):
def load_table(
self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None
):
"""
Load data into a database table on CrateDB Cloud.
@@ -96,7 +99,7 @@ class StandaloneCluster(ClusterBase):
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None):
"""
Load data into a database table on a standalone CrateDB Server.
@@ -109,11 +112,11 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
"""
source_url = resource.url
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
source_url_obj = URL(source_url)
if asbool(source_url_obj.query.get("ssl")):
http_scheme = "https://"
source_url = source_url.replace("influxdb2://", http_scheme)
@@ -130,7 +133,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
if not mongodb_copy(
source_url,
target_url,
transformation=transformation,
limit=int(source_url_obj.query.get("limit", 0)),
progress=True,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
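
The branch above also derives an optional document limit from the source URL's query string. A minimal sketch of just that lookup, with an illustrative URL:

```python
# Sketch of the query-string lookup used above; the URL is illustrative.
from yarl import URL

source_url_obj = URL("mongodb://localhost:27017/testdrive/demo?limit=100")
limit = int(source_url_obj.query.get("limit", 0))  # -> 100
```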
5 changes: 4 additions & 1 deletion cratedb_toolkit/io/cli.py
@@ -1,4 +1,5 @@
import logging
from pathlib import Path

import click
from click_aliases import ClickAliasedGroup
@@ -35,6 +36,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.pass_context
def load_table(
ctx: click.Context,
@@ -46,6 +48,7 @@ def load_table(
table: str,
format_: str,
compression: str,
transformation: Path,
):
"""
Import data into CrateDB and CrateDB Cloud clusters.
@@ -82,4 +85,4 @@ def load_table(
cluster = StandaloneCluster(address=address)
else:
raise NotImplementedError("Unable to select backend")
return cluster.load_table(resource=resource, target=target)
return cluster.load_table(resource=resource, target=target, transformation=transformation)
16 changes: 13 additions & 3 deletions cratedb_toolkit/io/mongodb/api.py
@@ -1,5 +1,6 @@
import argparse
import logging
from pathlib import Path

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.core import export, extract, translate
@@ -10,7 +11,7 @@
logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False):
def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
"""
Synopsis
--------
@@ -36,7 +37,12 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False)
# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
scan="full",
transformation=transformation,
limit=limit,
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
@@ -65,7 +71,11 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False)
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
transformation=transformation,
limit=limit,
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)
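
For orientation, a hypothetical direct call against the extended `mongodb_copy()` signature shown above; the URLs, database/collection names, and the transformation file name are illustrative placeholders, not values from this commit:

```python
# Hypothetical invocation; URLs and file name are placeholders.
from pathlib import Path

from cratedb_toolkit.io.mongodb.api import mongodb_copy

mongodb_copy(
    "mongodb://localhost:27017/testdrive/demo",
    "crate://crate@localhost:4200/testdrive/demo",
    transformation=Path("transformation.yaml"),
    limit=100,
    progress=True,
)
```

The same code path is reachable from the command line through the new `--transformation` option on `ctk load table`.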
5 changes: 5 additions & 0 deletions cratedb_toolkit/io/mongodb/cli.py
@@ -2,11 +2,13 @@
import json
import sys
import typing as t
from pathlib import Path

from rich.console import Console

from cratedb_toolkit import __version__
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.util.common import setup_logging

console = Console(stderr=True)
rich = console
@@ -25,6 +27,7 @@ def extract_parser(subargs):
help="Whether to fully scan the MongoDB collections or only partially.",
)
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")
parser.add_argument("-o", "--out", required=False)


@@ -44,6 +47,7 @@ def export_parser(subargs):
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")


def get_args():
@@ -98,6 +102,7 @@ def export_to_stdout(args):


def main():
setup_logging()
args = get_args()
headline_prefix = "[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter"
if args.command == "extract":
14 changes: 10 additions & 4 deletions cratedb_toolkit/io/mongodb/core.py
@@ -2,6 +2,7 @@
import logging
import sys
import typing as t
from collections import OrderedDict

import pymongo
import pymongo.database
@@ -12,6 +13,7 @@

from .export import collection_to_json
from .extract import extract_schema_from_collection
from .transform import TransformationManager
from .translate import translate as translate_schema
from .util import parse_input_numbers

@@ -93,9 +95,12 @@ def extract(args) -> t.Dict[str, t.Any]:

rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan")

schemas = {}
for collection in filtered_collections:
schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit)
tm = TransformationManager(path=args.transformation)
schemas = OrderedDict()
for collection_name in filtered_collections:
collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit)
tm.apply_type_overrides(db.name, collection_name, collection_schema)
schemas[collection_name] = collection_schema
return schemas


@@ -119,8 +124,9 @@ def export(args) -> t.IO[bytes]:
TODO: Run on multiple collections, like `extract`.
"""
tm = TransformationManager(path=args.transformation)
buffer = io.BytesIO()
client, db = get_mongodb_client_database(args, document_class=RawBSONDocument)
collection_to_json(db[args.collection], fp=buffer, limit=args.limit)
collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit)
buffer.seek(0)
return buffer
14 changes: 12 additions & 2 deletions cratedb_toolkit/io/mongodb/export.py
@@ -32,10 +32,15 @@
import orjson as json
import pymongo.collection

from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.io.mongodb.util import sanitize_field_names


def date_converter(value):
""" """
# TODO: This assumes timestamp values represented as integer values are using millisecond precision.
# This routine provides heuristics to compute timestamp precision from the value itself.
# https://github.com/daq-tools/kotori/blob/90e815aa90/kotori/daq/storage/util.py#L87-L120
if isinstance(value, int):
return value
dt = dateparser.parse(value)
@@ -87,7 +92,9 @@ def convert(d):
return newdict


def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0):
def collection_to_json(
collection: pymongo.collection.Collection, fp: t.IO[t.Any], tm: TransformationManager = None, limit: int = 0
):
"""
Export a MongoDB collection's documents to standard JSON.
The output is suitable to be consumed by the `cr8` program.
@@ -101,5 +108,8 @@ def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any
for document in collection.find().limit(limit):
bson_json = bsonjs.dumps(document.raw)
json_object = json.loads(bson_json)
fp.write(json.dumps(convert(json_object)))
data = convert(json_object)
if tm:
data = tm.apply_transformations(collection.database.name, collection.name, data)
fp.write(json.dumps(data))
fp.write(b"\n")
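
The TODO in `date_converter` above points at inferring timestamp precision from the magnitude of an integer value instead of assuming milliseconds. A minimal sketch of such a heuristic, not part of this commit and assuming Unix-epoch values from roughly the current era:

```python
# Rough heuristic: classify an integer epoch value by magnitude.
# Thresholds assume timestamps between ~2001 and ~5000.
def guess_epoch_precision(value: int) -> str:
    if value >= 10**17:
        return "nanoseconds"
    if value >= 10**14:
        return "microseconds"
    if value >= 10**11:
        return "milliseconds"
    return "seconds"


guess_epoch_precision(1660000000)     # 'seconds'
guess_epoch_precision(1660000000000)  # 'milliseconds'
```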
42 changes: 42 additions & 0 deletions cratedb_toolkit/io/mongodb/transform.py
@@ -0,0 +1,42 @@
import logging
import typing as t
from pathlib import Path

from jsonpointer import JsonPointer
from zyp.model.collection import CollectionAddress, CollectionTransformation
from zyp.model.project import TransformationProject

logger = logging.getLogger(__name__)


class TransformationManager:
def __init__(self, path: Path):
self.path = path
self.active = False
if not self.path:
return
if not self.path.exists():
raise FileNotFoundError(f"File does not exist: {self.path}")
self.project = TransformationProject.from_yaml(self.path.read_text())
logger.info("Transformation manager initialized. File: %s", self.path)
self.active = True

def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]):
if not self.active:
return
address = CollectionAddress(database_name, collection_name)
transformation: CollectionTransformation = self.project.get(address)
# TODO: Also support addressing nested elements.
# Hint: Implementation already exists on another machine,
# where it has not been added to the repository. Sigh.
for rule in transformation.schema.rules:
pointer = JsonPointer(f"/document{rule.pointer}/types")
type_stats = pointer.resolve(collection_schema)
type_stats[rule.type] = 1e10

def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]):
if not self.active:
return data
address = CollectionAddress(database_name, collection_name)
transformation: CollectionTransformation = self.project.get(address)
return transformation.bucket.apply(data)
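
A small usage sketch of the new `TransformationManager`, following the methods defined above. The database and collection names, the `transformation.yaml` file, and the shape of the schema dictionary are assumptions for illustration; the file must exist and contain a Zyp `TransformationProject` in YAML:

```python
# Illustrative only: names, file, and schema layout are assumptions.
from pathlib import Path

from cratedb_toolkit.io.mongodb.transform import TransformationManager

tm = TransformationManager(path=Path("transformation.yaml"))

# Schema as produced by the extractor, with per-field type statistics
# reachable under `/document/<field>/types`.
schema = {"document": {"timestamp": {"types": {"STRING": 3, "TIMESTAMP": 97}}}}
tm.apply_type_overrides("testdrive", "demo", schema)

# Apply the collection's bucket transformation to a single decoded document.
record = tm.apply_transformations("testdrive", "demo", {"timestamp": "2024-08-14T00:00:00Z"})
```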
5 changes: 5 additions & 0 deletions doc/io/mongodb/loader.md
@@ -56,3 +56,8 @@ Use `mongoimport`.
mongoimport --uri 'mongodb+srv://MYUSERNAME:[email protected]/test?retryWrites=true&w=majority'
```
:::


docker run -it --rm --network=host --volume ~/Downloads/applications_snp.quotes.json:/data/applications_snp.quotes.json mongo:7 mongoimport --db=testdrive --collection=carrier-quotes /data/applications_snp.quotes.json --jsonArray
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/carrier-quotes
ctk load table mongodb://localhost:27017/testdrive/carrier-quotes
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -154,7 +154,7 @@ kinesis = [
"lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3",
]
mongodb = [
"commons-codec[mongodb]==0.0.2",
"commons-codec[mongodb,zyp] @ git+https://github.com/daq-tools/commons-codec@zyp",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
