diff --git a/CHANGES.md b/CHANGES.md index c2eea613..a9589fa6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ - MongoDB: Map OID types to CrateDB TEXT columns - MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option - MongoDB: Fix indentation in prettified SQL output of `migr8 translate` +- MongoDB: Add capability to give type hints and add transformations ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 5ea2e5fe..93a355b3 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -4,6 +4,7 @@ import logging import typing as t from abc import abstractmethod +from pathlib import Path from yarl import URL @@ -18,7 +19,7 @@ class ClusterBase(abc.ABC): @abstractmethod - def load_table(self, resource: InputOutputResource, target: TableAddress): + def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path): raise NotImplementedError("Child class needs to implement this method") @@ -35,7 +36,9 @@ class ManagedCluster(ClusterBase): def __post_init__(self): logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}") - def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None): + def load_table( + self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None + ): """ Load data into a database table on CrateDB Cloud. @@ -96,7 +99,7 @@ class StandaloneCluster(ClusterBase): address: DatabaseAddress info: t.Optional[ClusterInformation] = None - def load_table(self, resource: InputOutputResource, target: TableAddress): + def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None): """ Load data into a database table on a standalone CrateDB Server. 
@@ -109,11 +112,11 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): """ source_url = resource.url target_url = self.address.dburi + source_url_obj = URL(source_url) if source_url.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy http_scheme = "http://" - source_url_obj = URL(source_url) if asbool(source_url_obj.query.get("ssl")): http_scheme = "https://" source_url = source_url.replace("influxdb2://", http_scheme) @@ -130,7 +133,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): else: from cratedb_toolkit.io.mongodb.api import mongodb_copy - if not mongodb_copy(source_url, target_url, progress=True): + if not mongodb_copy( + source_url, + target_url, + transformation=transformation, + limit=int(source_url_obj.query.get("limit", 0)), + progress=True, + ): msg = "Data loading failed" logger.error(msg) raise OperationFailed(msg) diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 6c866e58..6d188be0 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -1,4 +1,5 @@ import logging +from pathlib import Path import click from click_aliases import ClickAliasedGroup @@ -35,6 +36,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): @click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") @click.option("--format", "format_", type=str, required=False, help="File format of the import resource") @click.option("--compression", type=str, required=False, help="Compression format of the import resource") +@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file") @click.pass_context def load_table( ctx: click.Context, @@ -46,6 +48,7 @@ def load_table( table: str, format_: str, compression: str, + transformation: Path, ): """ Import data into CrateDB and CrateDB Cloud clusters. 
@@ -82,4 +85,4 @@ def load_table( cluster = StandaloneCluster(address=address) else: raise NotImplementedError("Unable to select backend") - return cluster.load_table(resource=resource, target=target) + return cluster.load_table(resource=resource, target=target, transformation=transformation) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index cd22ad45..c4b2d202 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,5 +1,6 @@ import argparse import logging +from pathlib import Path from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.core import export, extract, translate @@ -10,7 +11,7 @@ logger = logging.getLogger(__name__) -def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False): +def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): """ Synopsis -------- @@ -36,7 +37,12 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False) # 1. Extract schema from MongoDB collection. 
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}") extract_args = argparse.Namespace( - url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit + url=str(mongodb_uri), + database=mongodb_database, + collection=mongodb_collection, + scan="full", + transformation=transformation, + limit=limit, ) mongodb_schema = extract(extract_args) count = mongodb_schema[mongodb_collection]["count"] @@ -65,7 +71,11 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False) f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}" ) export_args = argparse.Namespace( - url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit + url=str(mongodb_uri), + database=mongodb_database, + collection=mongodb_collection, + transformation=transformation, + limit=limit, ) buffer = export(export_args) cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index 563af816..48cf27c6 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -2,11 +2,13 @@ import json import sys import typing as t +from pathlib import Path from rich.console import Console from cratedb_toolkit import __version__ from cratedb_toolkit.io.mongodb.core import export, extract, translate +from cratedb_toolkit.util.common import setup_logging console = Console(stderr=True) rich = console @@ -25,6 +27,7 @@ def extract_parser(subargs): help="Whether to fully scan the MongoDB collections or only partially.", ) parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") + parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file") parser.add_argument("-o", "--out", required=False) @@ -44,6 +47,7 @@ def 
export_parser(subargs): parser.add_argument("--port", default=27017, help="MongoDB port") parser.add_argument("--database", required=True, help="MongoDB database") parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") + parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file") def get_args(): @@ -98,6 +102,7 @@ def export_to_stdout(args): def main(): + setup_logging() args = get_args() headline_prefix = "[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter" if args.command == "extract": diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index fae1f8ad..ca0e0772 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -2,6 +2,7 @@ import logging import sys import typing as t +from collections import OrderedDict import pymongo import pymongo.database @@ -12,6 +13,7 @@ from .export import collection_to_json from .extract import extract_schema_from_collection +from .transform import TransformationManager from .translate import translate as translate_schema from .util import parse_input_numbers @@ -38,7 +40,7 @@ def gather_collections(database) -> t.List[str]: rich.print(tbl) - rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). Leave empty for using all connections.") + rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). 
Leave empty to use all collections.") sys.stderr.write("> ") collections_to_ignore = parse_input_numbers(input()) @@ -93,9 +95,12 @@ def extract(args) -> t.Dict[str, t.Any]: rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan") - schemas = {} - for collection in filtered_collections: - schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit) + tm = TransformationManager(path=args.transformation) + schemas = OrderedDict() + for collection_name in filtered_collections: + collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit) + tm.apply_type_overrides(db.name, collection_name, collection_schema) + schemas[collection_name] = collection_schema return schemas @@ -119,8 +124,9 @@ def export(args) -> t.IO[bytes]: TODO: Run on multiple collections, like `extract`. """ + tm = TransformationManager(path=args.transformation) buffer = io.BytesIO() client, db = get_mongodb_client_database(args, document_class=RawBSONDocument) - collection_to_json(db[args.collection], fp=buffer, limit=args.limit) + collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit) buffer.seek(0) return buffer diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index c4ca0257..87cafc15 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -32,6 +32,7 @@ import orjson as json import pymongo.collection +from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names @@ -87,7 +88,9 @@ def convert(d): return newdict -def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0): +def collection_to_json( + collection: pymongo.collection.Collection, fp: t.IO[t.Any], tm: TransformationManager = None, limit: int = 0 +): """ Export a MongoDB collection's documents to standard JSON. 
The output is suitable to be consumed by the `cr8` program. @@ -101,5 +104,8 @@ def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any for document in collection.find().limit(limit): bson_json = bsonjs.dumps(document.raw) json_object = json.loads(bson_json) - fp.write(json.dumps(convert(json_object))) + data = convert(json_object) + if tm: + data = tm.apply_transformations(collection.database.name, collection.name, data) + fp.write(json.dumps(data)) fp.write(b"\n") diff --git a/cratedb_toolkit/io/mongodb/transform.py b/cratedb_toolkit/io/mongodb/transform.py new file mode 100644 index 00000000..84047037 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/transform.py @@ -0,0 +1,49 @@ +import logging +import typing as t +from pathlib import Path + +from jsonpointer import JsonPointer +from zyp.model.collection import CollectionAddress, CollectionTransformation +from zyp.model.project import TransformationProject + +logger = logging.getLogger(__name__) + + +class TransformationManager: + def __init__(self, path: Path): + self.path = path + self.active = False + if not self.path: + return + if not self.path.exists(): + raise FileNotFoundError(f"File does not exist: {self.path}") + self.project = TransformationProject.from_yaml(self.path.read_text()) + logger.info("Transformation manager initialized. File: %s", self.path) + self.active = True + + def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]): + if not self.active: + return + address = CollectionAddress(database_name, collection_name) + try: + transformation: CollectionTransformation = self.project.get(address) + except KeyError: + return + logger.info(f"Applying type overrides for {database_name}/{collection_name}") + # TODO: Also support addressing nested elements. + # Hint: Implementation already exists on another machine, + # where it has not been added to the repository. Sigh. 
+ for rule in transformation.schema.rules: + pointer = JsonPointer(f"/document{rule.pointer}/types") + type_stats = pointer.resolve(collection_schema) + type_stats[rule.type] = 1e10 + + def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]): + if not self.active: + return data + address = CollectionAddress(database_name, collection_name) + try: + transformation: CollectionTransformation = self.project.get(address) + except KeyError: + return data + return transformation.bucket.apply(data) diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index 369dd5a2..7242687a 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -49,6 +49,13 @@ ctk shell --command "SELECT * FROM testdrive.demo;" ctk show table "testdrive.demo" ``` +## Using Zyp transformations +You can use [Zyp transformations] to change the shape of the data while it is being +transferred. To add one to the pipeline, use the `--transformation` +command line option of the `migr8 extract` and `migr8 export` commands. + +You can find an example file at `examples/zyp-transformation.yaml`. + :::{todo} Use `mongoimport`. @@ -56,3 +63,6 @@ Use `mongoimport`. mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority' ``` ::: + + +[Zyp transformations]: https://commons-codec.readthedocs.io/zyp/index.html diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index 65b19e12..377820f6 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -222,4 +222,13 @@ Alternatively, use [cr8] to directly write the MongoDB collection into a CrateDB cr8 insert-json --hosts localhost:4200 --table test +### Using Zyp transformations +You can use [Zyp transformations] to change the shape of the data while it is being +transferred. To add one to the pipeline, use the `--transformation` +command line option of the `migr8 extract` and `migr8 export` commands.
+ +You can find an example file at `examples/zyp-transformation.yaml`. + + [cr8]: https://github.com/mfussenegger/cr8 +[Zyp transformations]: https://commons-codec.readthedocs.io/zyp/index.html diff --git a/examples/zyp-transformation.yaml b/examples/zyp-transformation.yaml new file mode 100644 index 00000000..183caff0 --- /dev/null +++ b/examples/zyp-transformation.yaml @@ -0,0 +1,20 @@ +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive-db + name: foobar-collection + schema: + rules: + - pointer: /some_date + type: DATETIME + - pointer: /another_date + type: DATETIME + bucket: + values: + rules: + - pointer: /some_date + transformer: to_unixtime + - pointer: /another_date + transformer: to_unixtime diff --git a/pyproject.toml b/pyproject.toml index 081130e6..9c8ca002 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -154,7 +154,7 @@ kinesis = [ "lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3", ] mongodb = [ - "commons-codec[mongodb]==0.0.3", + "commons-codec[mongodb,zyp]==0.0.4", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",