Skip to content

Commit

Permalink
MongoDB: Add capability to give type hints and add transformations
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 14, 2024
1 parent 135188b commit 85afd95
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- MongoDB: Map OID types to CrateDB TEXT columns
- MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option
- MongoDB: Fix indentation in prettified SQL output of `migr8 translate`
- MongoDB: Add capability to give type hints and add transformations

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
19 changes: 14 additions & 5 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import typing as t
from abc import abstractmethod
from pathlib import Path

from yarl import URL

Expand All @@ -18,7 +19,7 @@

class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path):
raise NotImplementedError("Child class needs to implement this method")


Expand All @@ -35,7 +36,9 @@ class ManagedCluster(ClusterBase):
def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None):
def load_table(
self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None
):
"""
Load data into a database table on CrateDB Cloud.
Expand Down Expand Up @@ -96,7 +99,7 @@ class StandaloneCluster(ClusterBase):
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None):
"""
Load data into a database table on a standalone CrateDB Server.
Expand All @@ -109,11 +112,11 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
"""
source_url = resource.url
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
source_url_obj = URL(source_url)
if asbool(source_url_obj.query.get("ssl")):
http_scheme = "https://"
source_url = source_url.replace("influxdb2://", http_scheme)
Expand All @@ -130,7 +133,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
if not mongodb_copy(
source_url,
target_url,
transformation=transformation,
limit=int(source_url_obj.query.get("limit", 0)),
progress=True,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
Expand Down
5 changes: 4 additions & 1 deletion cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from pathlib import Path

import click
from click_aliases import ClickAliasedGroup
Expand Down Expand Up @@ -35,6 +36,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.pass_context
def load_table(
ctx: click.Context,
Expand All @@ -46,6 +48,7 @@ def load_table(
table: str,
format_: str,
compression: str,
transformation: Path,
):
"""
Import data into CrateDB and CrateDB Cloud clusters.
Expand Down Expand Up @@ -82,4 +85,4 @@ def load_table(
cluster = StandaloneCluster(address=address)
else:
raise NotImplementedError("Unable to select backend")
return cluster.load_table(resource=resource, target=target)
return cluster.load_table(resource=resource, target=target, transformation=transformation)
16 changes: 13 additions & 3 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import logging
from pathlib import Path

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.core import export, extract, translate
Expand All @@ -10,7 +11,7 @@
logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False):
def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
"""
Synopsis
--------
Expand All @@ -36,7 +37,12 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False)
# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
scan="full",
transformation=transformation,
limit=limit,
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
Expand Down Expand Up @@ -65,7 +71,11 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False)
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
transformation=transformation,
limit=limit,
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)
Expand Down
5 changes: 5 additions & 0 deletions cratedb_toolkit/io/mongodb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import json
import sys
import typing as t
from pathlib import Path

from rich.console import Console

from cratedb_toolkit import __version__
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.util.common import setup_logging

console = Console(stderr=True)
rich = console
Expand All @@ -25,6 +27,7 @@ def extract_parser(subargs):
help="Whether to fully scan the MongoDB collections or only partially.",
)
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")
parser.add_argument("-o", "--out", required=False)


Expand All @@ -44,6 +47,7 @@ def export_parser(subargs):
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")


def get_args():
Expand Down Expand Up @@ -98,6 +102,7 @@ def export_to_stdout(args):


def main():
setup_logging()
args = get_args()
headline_prefix = "[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter"
if args.command == "extract":
Expand Down
16 changes: 11 additions & 5 deletions cratedb_toolkit/io/mongodb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import sys
import typing as t
from collections import OrderedDict

import pymongo
import pymongo.database
Expand All @@ -12,6 +13,7 @@

from .export import collection_to_json
from .extract import extract_schema_from_collection
from .transform import TransformationManager
from .translate import translate as translate_schema
from .util import parse_input_numbers

Expand All @@ -38,7 +40,7 @@ def gather_collections(database) -> t.List[str]:

rich.print(tbl)

rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). Leave empty for using all connections.")
rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). Leave empty to use all collections.")

sys.stderr.write("> ")
collections_to_ignore = parse_input_numbers(input())
Expand Down Expand Up @@ -93,9 +95,12 @@ def extract(args) -> t.Dict[str, t.Any]:

rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan")

schemas = {}
for collection in filtered_collections:
schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit)
tm = TransformationManager(path=args.transformation)
schemas = OrderedDict()
for collection_name in filtered_collections:
collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit)
tm.apply_type_overrides(db.name, collection_name, collection_schema)
schemas[collection_name] = collection_schema
return schemas


Expand All @@ -119,8 +124,9 @@ def export(args) -> t.IO[bytes]:
TODO: Run on multiple collections, like `extract`.
"""
tm = TransformationManager(path=args.transformation)
buffer = io.BytesIO()
client, db = get_mongodb_client_database(args, document_class=RawBSONDocument)
collection_to_json(db[args.collection], fp=buffer, limit=args.limit)
collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit)
buffer.seek(0)
return buffer
10 changes: 8 additions & 2 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import orjson as json
import pymongo.collection

from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.io.mongodb.util import sanitize_field_names


Expand Down Expand Up @@ -87,7 +88,9 @@ def convert(d):
return newdict


def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0):
def collection_to_json(
collection: pymongo.collection.Collection, fp: t.IO[t.Any], tm: TransformationManager = None, limit: int = 0
):
"""
Export a MongoDB collection's documents to standard JSON.
The output is suitable to be consumed by the `cr8` program.
Expand All @@ -101,5 +104,8 @@ def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any
for document in collection.find().limit(limit):
bson_json = bsonjs.dumps(document.raw)
json_object = json.loads(bson_json)
fp.write(json.dumps(convert(json_object)))
data = convert(json_object)
if tm:
data = tm.apply_transformations(collection.database.name, collection.name, data)
fp.write(json.dumps(data))
fp.write(b"\n")
49 changes: 49 additions & 0 deletions cratedb_toolkit/io/mongodb/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
import typing as t
from pathlib import Path

from jsonpointer import JsonPointer
from zyp.model.collection import CollectionAddress, CollectionTransformation
from zyp.model.project import TransformationProject

logger = logging.getLogger(__name__)


class TransformationManager:
def __init__(self, path: Path):
self.path = path
self.active = False
if not self.path:
return
if not self.path.exists():
raise FileNotFoundError(f"File does not exist: {self.path}")
self.project = TransformationProject.from_yaml(self.path.read_text())
logger.info("Transformation manager initialized. File: %s", self.path)
self.active = True

def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]):
if not self.active:
return
address = CollectionAddress(database_name, collection_name)
try:
transformation: CollectionTransformation = self.project.get(address)
except KeyError:
return
logger.info(f"Applying type overrides for {database_name}/{collection_name}")
# TODO: Also support addressing nested elements.
# Hint: Implementation already exists on another machine,
# where it has not been added to the repository. Sigh.
for rule in transformation.schema.rules:
pointer = JsonPointer(f"/document{rule.pointer}/types")
type_stats = pointer.resolve(collection_schema)
type_stats[rule.type] = 1e10

def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]):
if not self.active:
return data
address = CollectionAddress(database_name, collection_name)
try:
transformation: CollectionTransformation = self.project.get(address)
except KeyError:
return data
return transformation.bucket.apply(data)
10 changes: 10 additions & 0 deletions doc/io/mongodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,20 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```

## Using Zyp transformations
You can use [Zyp transformations] to change the shape of the data while being
transferred. In order to add it to the pipeline, use the `--transformation`
command line option on the `migr8 extract` and `migr8 export` commands.

You can find an example file at `examples/zyp-transformation.yaml`.


:::{todo}
Use `mongoimport`.
```shell
mongoimport --uri 'mongodb+srv://MYUSERNAME:[email protected]/test?retryWrites=true&w=majority'
```
:::


[Zyp transformations]: https://commons-codec.readthedocs.io/zyp/index.html
9 changes: 9 additions & 0 deletions doc/io/mongodb/migr8.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,13 @@ Alternatively, use [cr8] to directly write the MongoDB collection into a CrateDB
cr8 insert-json --hosts localhost:4200 --table test


### Using Zyp transformations
You can use [Zyp transformations] to change the shape of the data while being
transferred. In order to add it to the pipeline, use the `--transformation`
command line option on the `migr8 extract` and `migr8 export` commands.

You can find an example file at `examples/zyp-transformation.yaml`.


[cr8]: https://github.com/mfussenegger/cr8
[Zyp transformations]: https://commons-codec.readthedocs.io/zyp/index.html
20 changes: 20 additions & 0 deletions examples/zyp-transformation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
meta:
type: zyp-project
version: 1
collections:
- address:
container: testdrive-db
name: foobar-collection
schema:
rules:
- pointer: /some_date
type: DATETIME
- pointer: /another_date
type: DATETIME
bucket:
values:
rules:
- pointer: /some_date
transformer: to_unixtime
- pointer: /another_date
transformer: to_unixtime
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ kinesis = [
"lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3",
]
mongodb = [
"commons-codec[mongodb]==0.0.3",
"commons-codec[mongodb,zyp]==0.0.4",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down

0 comments on commit 85afd95

Please sign in to comment.