Skip to content

Commit

Permalink
Use black, the canonical python code formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
xiashang0624 authored and paolomorandini committed Jun 13, 2022
1 parent 9f04625 commit a1b0911
Show file tree
Hide file tree
Showing 13 changed files with 319 additions and 156 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ build
*#
*.pyc
.gradle
.idea
venv
20 changes: 20 additions & 0 deletions client/CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1 +1,21 @@
# Contributing to Batch SQL Translation Client

## Install

Install the client dependencies (including dev dependencies) in a
virtualenv.

```shell
git clone [email protected]:google/dwh-migration-tools.git
cd dwh-migration-tools
python3 -m venv venv
source venv/bin/activate
pip install -r client/requirements.txt
pip install -r client/requirements_dev.txt
```

## Before committing

```shell
cd client && make format
```
6 changes: 6 additions & 0 deletions client/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SHELL := /bin/bash -ex

.PHONY: format
format:
black .
isort .
126 changes: 80 additions & 46 deletions client/batch_sql_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import shutil
import config_parser
import gcs_util
import os
import shutil
import sys
import uuid
import time
from typing import Optional

import uuid
from datetime import datetime
from os.path import dirname, join
from typing import Optional

from google.cloud import bigquery_migration_v2

import config_parser
import gcs_util
from config_parser import TranslationConfig
from macro_processor import MacroProcessor
from object_mapping_parser import ObjectMappingParser
from config_parser import TranslationConfig
from google.cloud import bigquery_migration_v2


class BatchSqlTranslator:
Expand All @@ -35,19 +36,26 @@ class BatchSqlTranslator:
"""

def __init__(self, config: TranslationConfig, input_directory: str, output_directory: str, preprocessor: Optional[MacroProcessor] = None, object_name_mapping_list: Optional[ObjectMappingParser] = None):
def __init__(
self,
config: TranslationConfig,
input_directory: str,
output_directory: str,
preprocessor: Optional[MacroProcessor] = None,
object_name_mapping_list: Optional[ObjectMappingParser] = None,
):
self.config = config
self._input_directory = input_directory
self._output_directory = output_directory
self.client = bigquery_migration_v2.MigrationServiceClient()
self.gcs_path = None
self.preprocessor = preprocessor # May be None
self.preprocessor = preprocessor # May be None
self._object_name_mapping_list = object_name_mapping_list
self.tmp_dir = join(dirname(self._input_directory), self.__TMP_DIR_NAME)

__JOB_FINISHED_STATES = {
bigquery_migration_v2.types.MigrationWorkflow.State.COMPLETED,
bigquery_migration_v2.types.MigrationWorkflow.State.PAUSED
bigquery_migration_v2.types.MigrationWorkflow.State.PAUSED,
}

# The name of a hidden directory that stores temporary processed files if user defines a macro replacement map.
Expand All @@ -69,44 +77,61 @@ def start_translation(self):
self.preprocessor.preprocess(self._input_directory, local_input_dir)

self.gcs_path = self.__generate_gcs_path()
gcs_input_path = join("gs://%s" % self.config.gcs_bucket, self.gcs_path, "input")
gcs_output_path = join("gs://%s" % self.config.gcs_bucket, self.gcs_path, "output")
gcs_input_path = join(
"gs://%s" % self.config.gcs_bucket, self.gcs_path, "input"
)
gcs_output_path = join(
"gs://%s" % self.config.gcs_bucket, self.gcs_path, "output"
)
print("\nUploading inputs to gcs ...")
gcs_util.upload_directory(local_input_dir, self.config.gcs_bucket, join(self.gcs_path, "input"))
gcs_util.upload_directory(
local_input_dir, self.config.gcs_bucket, join(self.gcs_path, "input")
)
print("\nStart translation job...")
job_name = self.create_migration_workflow(gcs_input_path, gcs_output_path)
self.__wait_until_job_finished(job_name)
print("\nDownloading outputs...")
gcs_util.download_directory(local_output_dir, self.config.gcs_bucket, join(self.gcs_path, "output"))
gcs_util.download_directory(
local_output_dir, self.config.gcs_bucket, join(self.gcs_path, "output")
)

if self.preprocessor is not None:
print("\nStart post-processing by reverting the macros substitution...")
self.preprocessor.postprocess(local_output_dir, self._output_directory)

print("Finished postprocessing. The outputs are in %s\n" % self._output_directory)
print(
"Finished postprocessing. The outputs are in %s\n" % self._output_directory
)

if self.config.clean_up_tmp_files and os.path.exists(self.tmp_dir):
print("Cleaning up tmp files under \"%s\"..." % self.tmp_dir)
print('Cleaning up tmp files under "%s"...' % self.tmp_dir)
shutil.rmtree(self.tmp_dir)
print("Finished cleanup.")

print("\nThe job finished successfully!")
print(
"To view the job details, please go to the link: %s" % self.__get_ui_link())
print("Thank you for using BigQuery SQL Translation Service with the Python exemplary client!")
"To view the job details, please go to the link: %s" % self.__get_ui_link()
)
print(
"Thank you for using BigQuery SQL Translation Service with the Python exemplary client!"
)

def __generate_gcs_path(self) -> str:
"""Generates a gcs_path in the format of {translation_type}-{yyyy-mm-dd}-xxxx-xxxx-xxx-xxxx-xxxxxx.
The suffix is a random generated uuid string.
"""
return ("%s-%s-%s" % (self.config.translation_type, datetime.now().strftime('%Y-%m-%d')
, str(uuid.uuid4())))
return "%s-%s-%s" % (
self.config.translation_type,
datetime.now().strftime("%Y-%m-%d"),
str(uuid.uuid4()),
)

def __get_ui_link(self) -> str:
"""Returns the http link to the offline translation page for this project.
"""
return ("https://console.cloud.google.com/bigquery/migrations/batch-translation?project=%s" %
self.config.project_number)
"""Returns the http link to the offline translation page for this project."""
return (
"https://console.cloud.google.com/bigquery/migrations/batch-translation?project=%s"
% self.config.project_number
)

def __wait_until_job_finished(self, workflow_id: str, length_seconds: int = 600):
"""Waits until the workflow finishes by calling the Migration Service API every 5 seconds.
Expand All @@ -118,13 +143,18 @@ def __wait_until_job_finished(self, workflow_id: str, length_seconds: int = 600)
processing_seconds = 0
while processing_seconds < length_seconds:
time.sleep(5)
processing_seconds = int(time.time() - start_time);
processing_seconds = int(time.time() - start_time)
job_status = self.get_migration_workflow(workflow_id)
print("Translation job status is %s. Processing time: %s seconds" % (job_status.state, processing_seconds))
print(
"Translation job status is %s. Processing time: %s seconds"
% (job_status.state, processing_seconds)
)
if job_status.state in self.__JOB_FINISHED_STATES:
return
print("The job is still running after %d seconds. Please go to the UI page and download the outputs manually %s"
% (processing_seconds, self.__get_ui_link()))
print(
"The job is still running after %d seconds. Please go to the UI page and download the outputs manually %s"
% (processing_seconds, self.__get_ui_link())
)
sys.exit()

def list_migration_workflows(self, num_jobs=5):
Expand All @@ -134,7 +164,8 @@ def list_migration_workflows(self, num_jobs=5):
"""
print("List migration workflows for project %s" % self.config.project_number)
request = bigquery_migration_v2.ListMigrationWorkflowsRequest(
parent="projects/%s/locations/%s" % (self.config.project_number, self.config.location),
parent="projects/%s/locations/%s"
% (self.config.project_number, self.config.location),
)

page_result = self.client.list_migration_workflows(request=request)
Expand All @@ -144,8 +175,7 @@ def list_migration_workflows(self, num_jobs=5):
print(response)

def get_migration_workflow(self, job_name):
"""Starts a get API call for a migration workflow and print out the status on terminal.
"""
"""Starts a get API call for a migration workflow and print out the status on terminal."""
print("Get migration workflows for %s" % job_name)
request = bigquery_migration_v2.GetMigrationWorkflowRequest(
name=job_name,
Expand All @@ -154,40 +184,43 @@ def get_migration_workflow(self, job_name):
page_result = self.client.get_migration_workflow(request=request)
return page_result

def create_migration_workflow(self, gcs_input_path: str, gcs_output_path: str) -> str:
"""Creates a migration workflow and returns the name of the workflow.
"""
target_dialect = bigquery_migration_v2.Dialect();
target_dialect.bigquery_dialect = bigquery_migration_v2.BigQueryDialect();
def create_migration_workflow(
self, gcs_input_path: str, gcs_output_path: str
) -> str:
"""Creates a migration workflow and returns the name of the workflow."""
target_dialect = bigquery_migration_v2.Dialect()
target_dialect.bigquery_dialect = bigquery_migration_v2.BigQueryDialect()

translation_config = bigquery_migration_v2.TranslationConfigDetails(
gcs_source_path=gcs_input_path,
gcs_target_path=gcs_output_path,
source_dialect=self.get_input_dialect(),
target_dialect=target_dialect
target_dialect=target_dialect,
)

if self.config.default_database or self.config.schema_search_path:
translation_config.source_env = bigquery_migration_v2.types.SourceEnv(
default_database=self.config.default_database,
schema_search_path=self.config.schema_search_path
schema_search_path=self.config.schema_search_path,
)

if self._object_name_mapping_list:
translation_config.name_mapping_list = self._object_name_mapping_list

migration_task = bigquery_migration_v2.MigrationTask(
type=self.config.translation_type,
translation_config_details=translation_config
translation_config_details=translation_config,
)

workflow = bigquery_migration_v2.MigrationWorkflow(
display_name="%s-cli-%s" % (self.config.translation_type, datetime.now().strftime('%m-%d-%H:%M'))
display_name="%s-cli-%s"
% (self.config.translation_type, datetime.now().strftime("%m-%d-%H:%M"))
)

workflow.tasks["translation-task"] = migration_task
request = bigquery_migration_v2.CreateMigrationWorkflowRequest(
parent="projects/%s/locations/%s" % (self.config.project_number, self.config.location),
parent="projects/%s/locations/%s"
% (self.config.project_number, self.config.location),
migration_workflow=workflow,
)

Expand All @@ -196,15 +229,16 @@ def create_migration_workflow(self, gcs_input_path: str, gcs_output_path: str) -
return response.name

def get_input_dialect(self) -> bigquery_migration_v2.Dialect:
"""Returns the input dialect proto based on the translation type in the config.
"""
"""Returns the input dialect proto based on the translation type in the config."""
dialect = bigquery_migration_v2.Dialect()
if self.config.translation_type == config_parser.TERADATA2BQ:
dialect.teradata_dialect = bigquery_migration_v2.TeradataDialect(
mode=bigquery_migration_v2.TeradataDialect.Mode.SQL)
mode=bigquery_migration_v2.TeradataDialect.Mode.SQL
)
elif self.config.translation_type == config_parser.BTEQ2BQ:
dialect.teradata_dialect = bigquery_migration_v2.TeradataDialect(
mode=bigquery_migration_v2.TeradataDialect.Mode.BTEQ)
mode=bigquery_migration_v2.TeradataDialect.Mode.BTEQ
)
elif self.config.translation_type == config_parser.REDSHIFT2BQ:
dialect.redshift_dialect = bigquery_migration_v2.RedshiftDialect()
elif self.config.translation_type == config_parser.ORACLE2BQ:
Expand Down
38 changes: 22 additions & 16 deletions client/config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import yaml
from os.path import abspath

import yaml
from yaml.loader import SafeLoader

AZURESYNAPSE2BQ = "Translation_AzureSynapse2BQ"
Expand All @@ -31,8 +31,7 @@


class TranslationConfig:
"""A structure for holding the config info of the translation job.
"""
"""A structure for holding the config info of the translation job."""

def __init__(self):
self.project_number = None
Expand All @@ -45,8 +44,7 @@ def __init__(self):


class ConfigParser:
"""A parser for the config file.
"""
"""A parser for the config file."""

def __init__(self, config_file_path: str):
self._config_file_path = abspath(config_file_path)
Expand All @@ -70,7 +68,7 @@ def __init__(self, config_file_path: str):
SPARKSQL2BQ,
TERADATA2BQ,
VERTICA2BQ,
SQLSERVER2BQ
SQLSERVER2BQ,
}

def parse_config(self) -> TranslationConfig:
Expand All @@ -93,23 +91,31 @@ def parse_config(self) -> TranslationConfig:
config.location = translation_config_input["location"]
config.translation_type = translation_config_input[self.__TRANSLATION_TYPE]

config.clean_up_tmp_files = True if self.__CLEAN_UP not in translation_config_input \
config.clean_up_tmp_files = (
True
if self.__CLEAN_UP not in translation_config_input
else translation_config_input[self.__CLEAN_UP]
)

config.default_database = translation_config_input.get(self.__DEFAULT_DATABASE)
config.schema_search_path = translation_config_input.get(self.__SCHEMA_SEARCH_PATH)
config.schema_search_path = translation_config_input.get(
self.__SCHEMA_SEARCH_PATH
)

print("Finished parsing translation config.")
print("The config is:")
print('\n'.join(" %s: %s" % item for item in vars(config).items()))
print("\n".join(" %s: %s" % item for item in vars(config).items()))
return config

def __validate_config_yaml(self, yaml_data):
"""Validate the data in the config yaml file.
"""
assert self.__TRANSLATION_CONFIG in yaml_data, "Missing translation_config field in config.yaml."
assert self.__TRANSLATION_TYPE in yaml_data[
self.__TRANSLATION_CONFIG], "Missing translation_type field in config.yaml."
"""Validate the data in the config yaml file."""
assert (
self.__TRANSLATION_CONFIG in yaml_data
), "Missing translation_config field in config.yaml."
assert (
self.__TRANSLATION_TYPE in yaml_data[self.__TRANSLATION_CONFIG]
), "Missing translation_type field in config.yaml."
translation_type = yaml_data[self.__TRANSLATION_CONFIG][self.__TRANSLATION_TYPE]
assert translation_type in self.__SUPPORTED_TYPES, "The type \"%s\" is not supported." \
% translation_type
assert translation_type in self.__SUPPORTED_TYPES, (
'The type "%s" is not supported.' % translation_type
)
Loading

0 comments on commit a1b0911

Please sign in to comment.