diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ded5b6a..72c65fcf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,11 +3,20 @@
 ## Change History
 All notable changes to the Databricks Labs Data Generator will be documented in this file.
 
-### Unreleased
+### Version 0.3.5
 
 #### Changed
 * Added formatting of generated code as Html for script methods
 * Modified pipfile to use newer version of package specifications
+* Additional migration of tests to use `pytest`
+* Changed parsing of build options for the data generator to support use of custom streaming sources
+* Documentation updates in support of new features such as streaming, complex structures, etc.
+
+#### Added
+* Added support for additional streaming source types and for use of custom streaming sources
+* Added support for use of file reads as a streaming source (for seed and timestamp columns only)
+* Added support for complex event time in streaming scenarios. It may also be used in batch scenarios for testing
+
 ### Version 0.3.4 Post 3
diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index 02095490..1656af94 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -8,6 +8,8 @@
 import copy
 import logging
 import re
+import time
+import math
 
 from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType
 
 from .spark_singleton import SparkSingleton
@@ -19,13 +21,37 @@
 from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition
 from .html_utils import HtmlUtils
 from . _version import _get_spark_version
+from .enhanced_event_time import EnhancedEventTimeHelper
 from .schema_parser import SchemaParser
 
 _OLD_MIN_OPTION = 'min'
 _OLD_MAX_OPTION = 'max'
 
+_STREAMING_SOURCE_OPTION = "dbldatagen.streaming.source"
+_STREAMING_SCHEMA_OPTION = "dbldatagen.streaming.sourceSchema"
+_STREAMING_PATH_OPTION = "dbldatagen.streaming.sourcePath"
+_STREAMING_TABLE_OPTION = "dbldatagen.streaming.sourceTable"
+_STREAMING_ID_FIELD_OPTION = "dbldatagen.streaming.sourceIdField"
+_STREAMING_TIMESTAMP_FIELD_OPTION = "dbldatagen.streaming.sourceTimestampField"
+_STREAMING_GEN_TIMESTAMP_OPTION = "dbldatagen.streaming.generateTimestamp"
+_STREAMING_USE_SOURCE_FIELDS = "dbldatagen.streaming.sourceFields"
+_BUILD_OPTION_PREFIX = "dbldatagen."
+
 _STREAMING_TIMESTAMP_COLUMN = "_source_timestamp"
+
+_STREAMING_SOURCE_RATE = "rate"
+_STREAMING_SOURCE_RATE_MICRO_BATCH = "rate-micro-batch"
+_STREAMING_SOURCE_NUM_PARTITIONS = "numPartitions"
+_STREAMING_SOURCE_ROWS_PER_BATCH = "rowsPerBatch"
+_STREAMING_SOURCE_ROWS_PER_SECOND = "rowsPerSecond"
+_STREAM_SOURCE_START_TIMESTAMP = "startTimestamp"
+
+_STREAMING_SOURCE_TEXT = "text"
+_STREAMING_SOURCE_PARQUET = "parquet"
+_STREAMING_SOURCE_CSV = "csv"
+_STREAMING_SOURCE_JSON = "json"
+_STREAMING_SOURCE_ORC = "orc"
+_STREAMING_SOURCE_DELTA = "delta"
 
 
 class DataGenerator:
     """ Main Class for test data set generation
@@ -49,6 +75,11 @@ class DataGenerator:
     it is recommended that you use a different name for the seed column - for example `_id`.
 
     This may be specified by setting the `seedColumnName` attribute to `_id`
+
+    Note that the number of partitions requested is not guaranteed to be honored when generating
+    a streaming data set using a streaming source that generates a different number of partitions.
+    However, for most batch use cases, the requested number of partitions will be generated.
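+
+    A minimal sketch of streaming use (illustrative only - the column definition is arbitrary and an existing
+    `spark` session is assumed)::
+
+        import dbldatagen as dg
+
+        df = (dg.DataGenerator(sparkSession=spark, name="streaming_example", rows=100000, partitions=4)
+              .withColumn("code1", "long", minValue=100, maxValue=200)
+              .build(withStreaming=True, options={"rowsPerSecond": 50}))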
+ """ # class vars @@ -154,6 +185,10 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None, self._inferredSchemaFields = [] self.buildPlanComputed = False + # set of columns that must be present during processing + # will be map from column name to reason it is needed + self._requiredColumns = {} + # lets add the seed column self.withColumn(self._seedColumnName, LongType(), nullable=False, implicit=True, omit=True, noWarn=True) self._batchSize = batchSize @@ -885,9 +920,12 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None): :returns: Spark data frame for base data that drives the data generation """ + assert self.partitions is not None, "Expecting partition count to be initialized" + id_partitions = self.partitions end_id = self._rowCount + startId - id_partitions = self.partitions if self.partitions is not None else 4 + + build_options, passthrough_options, unsupported_options = self._parseBuildOptions(options) if not streaming: status = f"Generating data frame with ids from {startId} to {end_id} with {id_partitions} partitions" @@ -902,34 +940,120 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None): df1 = df1.withColumnRenamed(SPARK_RANGE_COLUMN, self._seedColumnName) else: - status = ( - f"Generating streaming data frame with ids from {startId} to {end_id} with {id_partitions} partitions") + self._applyStreamingDefaults(build_options, passthrough_options) + + assert _STREAMING_SOURCE_OPTION in build_options.keys(), "There must be a source type specified" + streaming_source_format = build_options[_STREAMING_SOURCE_OPTION] + + if streaming_source_format in [ _STREAMING_SOURCE_RATE, _STREAMING_SOURCE_RATE_MICRO_BATCH]: + streaming_partitions = passthrough_options[_STREAMING_SOURCE_NUM_PARTITIONS] + status = ( + f"Generating streaming data frame with {streaming_partitions} partitions") + else: + status = ( + f"Generating streaming data frame with '{streaming_source_format}' streaming source") + self.logger.info(status) self.executionHistory.append(status) df1 = (self.sparkSession.readStream - .format("rate")) - if options is not None: - if "rowsPerSecond" not in options: - options['rowsPerSecond'] = 1 - if "numPartitions" not in options: - options['numPartitions'] = id_partitions - - for k, v in options.items(): - df1 = df1.option(k, v) - df1 = (df1.load() - .withColumnRenamed("value", self._seedColumnName) - ) + .format(streaming_source_format)) + + for k, v in passthrough_options.items(): + df1 = df1.option(k, v) + + file_formats = [_STREAMING_SOURCE_TEXT, _STREAMING_SOURCE_JSON, _STREAMING_SOURCE_CSV, + _STREAMING_SOURCE_PARQUET, _STREAMING_SOURCE_DELTA, _STREAMING_SOURCE_ORC] + + data_path = None + source_table = None + id_column = "value" + if _STREAMING_ID_FIELD_OPTION in build_options: + id_column = build_options[_STREAMING_ID_FIELD_OPTION] + + if _STREAMING_TABLE_OPTION in build_options: + source_table = build_options[_STREAMING_TABLE_OPTION] + + if _STREAMING_SCHEMA_OPTION in build_options: + source_schema = build_options[_STREAMING_SCHEMA_OPTION] + df1 = df1.schema(source_schema) + + # get path for file based reads + if _STREAMING_PATH_OPTION in build_options: + data_path = build_options[_STREAMING_PATH_OPTION] + elif streaming_source_format in file_formats: + if "path" in passthrough_options: + data_path = passthrough_options["path"] + + if data_path is not None: + df1 = df1.load(data_path) + elif source_table is not None: + df1 = df1.table(source_table) else: - df1 = (df1.option("rowsPerSecond", 1) - 
.option("numPartitions", id_partitions) - .load() - .withColumnRenamed("value", self._seedColumnName) - ) + df1 = df1.load() + + if id_column != self._seedColumnName: + df1 = df1.withColumnRenamed(id_column, self._seedColumnName) return df1 + def withEnhancedEventTime(self, startEventTime=None, + acceleratedEventTimeInterval="10 minutes", + fractionLateArriving=0.1, + lateTimeInterval="6 hours", + jitter=(-0.25, 0.9999), + eventTimeName="event_ts", + baseColumn="timestamp", + keepIntermediateColumns=False): + """ + Delegate to EnhancedEventTimeHelper to implement enhanced event time + + :param startEventTime: start timestamp of output event time + :param acceleratedEventTimeInterval: interval for accelerated event time (i.e "10 minutes") + :param fractionLateArriving: fraction of late arriving data. range [0.0, 1.0] + :param lateTimeInterval: interval for late arriving events (i.e "6 hours") + :param jitter: jitter factor to avoid strictly increasing order in events + :param eventTimeName: Column name for generated event time column + :param baseColumn: Base column name used for computations of adjusted event time + :param keepIntermediateColumns: Flag to retain intermediate columns in the output, [ debug / test only] + + :returns instance of data generator (note caller object is changed) + + This adjusts the dataframe to produce IOT style event time that normally increases over time but has a + configurable fraction of late arriving data. It uses a base timestamp column (called timestamp) + to compute data. There must be a column of the same name as the base timestamp column in the source data frame + and it should have the value of "now()". + + The modified dataframe will have a new column named using the `eventTimeName` parameter containing the + new timestamp value. + + By default `rate` and `rate-micro-batch` streaming sources have this column but + it can be added to other dataframes - either batch or streaming data frames. + + While the overall intent is to support synthetic IOT style simulated device data in a stream, it can be used + with a batch data frame (as long as an appropriate timestamp column is added and designated as the base data + timestamp column. 
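+
+        A minimal usage sketch (illustrative only - `ds` is assumed to be an existing data spec; for batch use a
+        `timestamp` column is added explicitly, while the `rate` and `rate-micro-batch` streaming sources already
+        provide one)::
+
+            ds = (ds.withColumn("timestamp", "timestamp", expr="now()")
+                    .withEnhancedEventTime(startEventTime="2022-06-01 01:00:00",
+                                           baseColumn="timestamp",
+                                           eventTimeName="event_ts"))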
+ """ + + helper = EnhancedEventTimeHelper() + assert baseColumn is not None and len(baseColumn) > 0, "baseColumn argument must be supplied" + + # add metadata for required timestamp field + self._requiredColumns[baseColumn] = f"Column '{baseColumn}' is required by `withEnhancedEventTime` processing" + + helper.withEnhancedEventTime(self, + startEventTime, + acceleratedEventTimeInterval, + fractionLateArriving, + lateTimeInterval, + jitter, + eventTimeName, + baseColumn, + keepIntermediateColumns) + + return self + def _computeColumnBuildOrder(self): """ compute the build ordering using a topological sort on dependencies @@ -1077,6 +1201,81 @@ def computeBuildPlan(self): self.buildPlanComputed = True return self + def _parseBuildOptions(self, options): + """ Parse build options + + Parse build options into tuple of dictionaries - (datagen options, passthrough options, unsupported options) + + where + + - `datagen options` is dictionary of options to be interpreted by the data generator + - `passthrough options` is dictionary of options to be passed through to the underlying base dataframe + - `supported options` is dictionary of options that are not supported + + :param options: Dict of options to control generating of data + :returns: tuple of options dictionaries - (datagen_options, passthrough_options, unsupported options) + + """ + passthrough_options = {} + unsupported_options = {} + datagen_options = {} + + supported_options = [_STREAMING_SOURCE_OPTION, + _STREAMING_SCHEMA_OPTION, + _STREAMING_PATH_OPTION, + _STREAMING_TABLE_OPTION, + _STREAMING_ID_FIELD_OPTION, + _STREAMING_TIMESTAMP_FIELD_OPTION, + _STREAMING_GEN_TIMESTAMP_OPTION, + _STREAMING_USE_SOURCE_FIELDS + ] + + if options is not None: + for k, v in options.items(): + if isinstance(k, str): + if k.startswith(_BUILD_OPTION_PREFIX): + if k in supported_options: + datagen_options[k] = v + else: + unsupported_options[k] = v + else: + passthrough_options[k] = v + else: + unsupported_options[k] = v + + # add defaults + + return datagen_options, passthrough_options, unsupported_options + + def _applyStreamingDefaults(self, build_options, passthrough_options): + assert build_options is not None + assert passthrough_options is not None + + # default to `rate` streaming source + if _STREAMING_SOURCE_OPTION not in build_options: + build_options[_STREAMING_SOURCE_OPTION] = _STREAMING_SOURCE_RATE + + # setup `numPartitions` if not specified + if build_options[_STREAMING_SOURCE_OPTION] in [_STREAMING_SOURCE_RATE, _STREAMING_SOURCE_RATE_MICRO_BATCH]: + if _STREAMING_SOURCE_NUM_PARTITIONS not in passthrough_options: + passthrough_options[_STREAMING_SOURCE_NUM_PARTITIONS] = self.partitions + + # set up rows per batch if not specified + if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE: + if _STREAMING_SOURCE_ROWS_PER_SECOND not in passthrough_options: + passthrough_options[_STREAMING_SOURCE_ROWS_PER_SECOND] = 1 + + if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE_MICRO_BATCH: + if _STREAMING_SOURCE_ROWS_PER_BATCH not in passthrough_options: + passthrough_options[_STREAMING_SOURCE_ROWS_PER_BATCH] = 1 + if _STREAM_SOURCE_START_TIMESTAMP not in passthrough_options: + currentTs = math.floor(time.mktime(time.localtime())) * 1000 + passthrough_options[_STREAM_SOURCE_START_TIMESTAMP] = currentTs + + if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_TEXT: + self.logger.warning("Use of the `text` format may not work due to lack of type support") + + def build(self, withTempView=False, 
withView=False, withStreaming=False, options=None): """ build the test data set from the column definitions and return a dataframe for it @@ -1107,6 +1306,8 @@ def build(self, withTempView=False, withView=False, withStreaming=False, options df1 = self._getBaseDataFrame(self.starting_id, streaming=withStreaming, options=options) + # TODO: check that the required columns are present in the base data frame + self.executionHistory.append("Using Pandas Optimizations {True}") # build columns diff --git a/dbldatagen/enhanced_event_time.py b/dbldatagen/enhanced_event_time.py new file mode 100644 index 00000000..bb198c29 --- /dev/null +++ b/dbldatagen/enhanced_event_time.py @@ -0,0 +1,138 @@ +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This file defines the `EnhancedEventTime` class + +This defines helper methods for implementing enhanced event time +""" + +import logging + + +from pyspark.sql.functions import col, pandas_udf +from pyspark.sql.functions import lit, concat, rand, round as sql_round, array, expr, when, udf, \ + format_string +from pyspark.sql.types import FloatType, IntegerType, StringType, DoubleType, BooleanType, \ + TimestampType, DataType, DateType + +from .column_spec_options import ColumnSpecOptions +from .datagen_constants import RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, RANDOM_SEED_RANDOM +from .daterange import DateRange +from .distributions import Normal, DataDistribution +from .nrange import NRange +from .text_generators import TemplateGenerator +from .utils import ensure, coalesce_values + +import dbldatagen as dg +import dbldatagen.distributions as dist + +HASH_COMPUTE_METHOD = "hash" +VALUES_COMPUTE_METHOD = "values" +RAW_VALUES_COMPUTE_METHOD = "raw_values" +AUTO_COMPUTE_METHOD = "auto" +COMPUTE_METHOD_VALID_VALUES = [HASH_COMPUTE_METHOD, + AUTO_COMPUTE_METHOD, + VALUES_COMPUTE_METHOD, + RAW_VALUES_COMPUTE_METHOD] + + + +class EnhancedEventTimeHelper(object): + + def init(self): + pass + + def withEnhancedEventTime(self, + dataspec, + startEventTime=None, + acceleratedEventTimeInterval="10 minutes", + fractionLateArriving=0.1, + lateTimeInterval="6 hours", + jitter=(-0.25, 0.9999), + eventTimeName=None, + baseColumn="timestamp", + keepIntermediateColumns=False): + """ + Implement enhanced event time + + :param dataspec: dataspec to apply enhanced event time to + :param startEventTime: start timestamp of output event time + :param acceleratedEventTimeInterval: interval for accelerated event time (i.e "10 minutes") + :param fractionLateArriving: fraction of late arriving data. range [0.0, 1.0] + :param lateTimeInterval: interval for late arriving events (i.e "6 hours") + :param jitter: jitter factor to avoid strictly increasing order in events + :param eventTimeName: Column name for generated event time column + :param baseColumn: Base column name used for computations of adjusted event time + :param keepIntermediateColumns: Flag to retain intermediate columns in the output, [ debug / test only] + + This adjusts the dataframe to produce IOT style event time that normally increases over time but has a + configurable fraction of late arriving data. It uses a base timestamp column (called `timestamp` by default) + to compute data. There must be a column of this name in the source data frame and it should have the value of + "now()". + + By default `rate` and `rate-micro-batch` streaming sources have this column but + it can be added to other dataframes - either batch or streaming data frames. 
+ + While the overall intent is to support synthetic IOT style simulated device data in a stream, it can be used + with a batch data frame (as long as an appropriate timestamp column is added and designated as the base data + timestamp column. + """ + + assert startEventTime is not None, "value for `startTime` must be specified" + assert dataspec is not None, "dataspec must be specified" + assert 0.0 <= fractionLateArriving <= 1.0, "fractionLateArriving must be in range [0.0, 1.0]" + assert eventTimeName is not None, "eventTimeName argument must be supplied" + + # determine timestamp for start of generation + start_of_generation = \ + dataspec.sparkSession.sql(f"select cast(now() as string) as start_timestamp").collect()[0][ + 'start_timestamp'] + + omitInterimColumns = not keepIntermediateColumns + + retval = (dataspec + .withColumn("_late_arrival_factor1", "double", minValue=0.0, maxValue=1.0, continuous=True, + random=True, + distribution=dist.Beta(2.0, 5.3), omit=omitInterimColumns) + .withColumn("_late_arrival_factor", "double", expr="least(_late_arrival_factor1 * 1.17, 1.0)", + baseColumn="_late_arrival_factor1", + omit=omitInterimColumns) + .withColumn("_stream_time", "timestamp", expr=f"{baseColumn}", + omit=omitInterimColumns, baseColumn=baseColumn) + .withColumn("_data_gen_time", "timestamp", expr="now()", omit=omitInterimColumns) + .withColumn("_difference_factor", "double", + expr=f"cast(_stream_time as double) - cast(TIMESTAMP '{start_of_generation}' as double)", + baseColumns=["_stream_time"], + omit=omitInterimColumns) + .withColumn("_jitter_within_event_interval", "double", minValue=jitter[0], maxValue=jitter[1], + continuous=True, random=True, + omit=omitInterimColumns) + .withColumn("_late_arrival_prob", "double", minValue=0.0, maxValue=1.0, continuous=True, + random=True, + omit=omitInterimColumns) + .withColumn("_late_arrival", "boolean", + expr=f"case when _late_arrival_prob <= {fractionLateArriving} then true else false end", + baseColumns=["_late_arrival_prob"], + omit=omitInterimColumns) + .withColumn("_ontime_event_ts", "timestamp", + expr=f"""greatest(TIMESTAMP '{startEventTime}' + + ((_difference_factor + _jitter_within_event_interval) + * INTERVAL {acceleratedEventTimeInterval}), + TIMESTAMP '{startEventTime}') """, + baseColumns=["_difference_factor", "_jitter_within_event_interval"], + omit=omitInterimColumns) + .withColumn("_late_arrival_ts", "timestamp", + expr=f"""greatest(_ontime_event_ts - (_late_arrival_factor * INTERVAL {lateTimeInterval}), + TIMESTAMP '{startEventTime}') + """, + baseColumns=["_ontime_event_ts", "_late_arrival_factor"], + omit=omitInterimColumns) + + # generate event time column + .withColumn(eventTimeName, "timestamp", + expr="case when _late_arrival then _late_arrival_ts else _ontime_event_ts end", + baseColumns=["_late_arrival", "_late_arrival_ts", "_ontime_event_ts"]) + ) + return retval diff --git a/docs/source/using_streaming_data.rst b/docs/source/using_streaming_data.rst index 20770853..12a72daa 100644 --- a/docs/source/using_streaming_data.rst +++ b/docs/source/using_streaming_data.rst @@ -1,25 +1,61 @@ .. Databricks Labs Data Generator documentation master file, created by sphinx-quickstart on Sun Jun 21 10:54:30 2020. -Using Streaming Data -==================== +Producing synthetic streaming data +=================================== +The Databricks Labs Data Generator can be used to generate synthetic streaming data using a variety of +streaming sources. 
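+
+The basic pattern is shown in the sketch below (illustrative only - it assumes an existing data spec named
+`testDataSpec`); the rest of this section describes the underlying mechanics and the available options:
+
+.. code-block:: python
+
+   dfStream = testDataSpec.build(withStreaming=True, options={'rowsPerSecond': 100})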
+
+When generating streaming data, the number of rows specified in the original construction of the data spec
+is ignored; the number of rows produced and the speed at which they are generated depend on the options passed to
+the `build` method.
+
+When generating batch data, the data generation process begins by generating a base dataframe of seed values
+using the `spark.range` method.
+The rules for additional column generation are then applied to this base data frame.
+
+For generation of streaming data, the base data frame is generated using one of the `spark.readStream` variations.
+This is controlled by passing the argument `withStreaming=True` to the `build` method of the DataGenerator instance.
+Additional options may be passed to control the rate at which synthetic rows are generated.
+
+.. note::
+    Prior to this release, only the structured streaming `rate` source was available to generate streaming data. With
+    this release, you can use a `rate-micro-batch` source or a file source (for streaming reads from data in files
+    such as delta, parquet or other file formats). Custom formats should also be supported through the new streaming
+    mechanisms.
+
+In theory, any of the structured streaming sources are supported, but we do not test compatibility for sources other
+than the `rate`, `rate-micro-batch` and file-based data sources.
+
+.. seealso::
+    See the following links for more details:
+
+    * `Spark Structured Streaming data sources `_
+
+Generating Streaming Data
+-------------------------
 
 You can make any data spec into a streaming data frame by passing additional options to the ``build`` method.
 
-If the ``withStreaming`` option is used when building the data set, it will use a streaming rate source to generate
-the data. You can control the streaming rate with the option ``rowsPerSecond``.
+If the ``withStreaming`` option is used when building the data set, it will use a streaming source to generate
+the data. By default, this will use a structured streaming `rate` data source as the base data frame.
+
+When using the `rate` streaming source, you can control the streaming rate with the option ``rowsPerSecond``.
 
-In this case, the row count is ignored.
+When using streaming data sources, the row count specified in the call to the DataGenerator instance constructor
+is ignored.
 
 In most cases, no further changes are needed to run the data generation as a streaming data generator.
 
-As the generated data frame is a normal spark streaming data frame, all the same caveats and features apply.
+As the generated data frame is a normal Spark streaming data frame, all of the structured streaming caveats
+and features apply.
 
 Example 1: site code and technology
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. code-block:: python
+   :linenos:
+   :emphasize-lines: 35
 
    from datetime import timedelta, datetime
    import math
@@ -65,6 +101,71 @@ Example 1: site code and technology
 
    display(df)
 
+Customizing streaming data generation
+-------------------------------------
+
+You can customize how the streaming data is generated using the options passed to the ``build`` method.
+
+There are two types of options:
+
+  * Options that are interpreted by the streaming data generation process. These options begin with `'dbldatagen.'`
+
+  * Options that are passed through to the underlying streaming data frame. Any option that does not begin with the
+    `dbldatagen.` prefix is passed to the `options` method of the underlying dataframe.
+
+..
list-table:: **Data generation options for generating streaming data** + :header-rows: 1 + + * - Option + - Usage + + * - `dbldatagen.streaming.source` + - Type of streaming source to generate. Defaults to `rate` + + * - `dbldatagen.streaming.sourcePath` + - Path for file based data sources. + + * - `dbldatagen.streaming.sourceSchema` + - Schema for source of streaming file sources + + * - `dbldatagen.streaming.sourceIdField` + - Name of source id field - defaults to `value` + + * - `dbldatagen.streaming.sourceTimestampField` + - Name of source timestamp field - defaults to `timestamp` + + * - `dbldatagen.streaming.generateTimestamp` + - if set to `True`, automatically generates a timestamp field if none present + + +The type of the streaming source may be the fully qualified name of a custom streaming source, a built in streaming +source such as `rate` or `rate-micro-batch`, or the name of a file format such as `parquet`, `delta`, or `csv`. + +File based data source support `csv`, `parquet` and `delta` format files or folders of files. Files or folders of +files in `delta` format do not require specification of a schema as it is inferred from the underlying file. + +Files or folders of files in `csv` format require a schema. + +Any options that do not begin with the prefix `dbldatagen.` are passed through to the options method of the underlying +based data frame. + +When a schema is specified for a file based source, the schema should only specify the schema of the fields in the +underlying source, not for additional fields added by the data generation rules. + +.. note:: + Every streaming data source requires a field that can be designated as the seed field or `id` field. + This takes on the same role of the `id` field when batch data generation is used. + + This field will be renamed to the seed field name `id` (or to the custom seed field name, if it + has been overriden in the data generator constructor). + + Many streaming operations also require the designation of a timestamp field to represent event time. This may + be read from the underlying streaming source, or automatically generated. This is also needed if using + enhanced event time (described in a later section). + +What happens if there are other fields in the underlying data source? These are ignored but fields in the generation +spec may refer to them. However, unless a field generation rule replicates the data in the source field, it will not +appear in the generated data. Example 2: IOT style data ^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -73,6 +174,8 @@ The following example shows how to control the length of time to run the streami data generation for. .. code-block:: python + :emphasize-lines: 60,61,62 + :linenos: import time time_to_run = 180 @@ -132,7 +235,8 @@ data generation for. .withColumn("event_ts", "timestamp", expr="now()") ) - dfTestDataStreaming = testDataSpec.build(withStreaming=True, options={'rowsPerSecond': 500}) + dfTestDataStreaming = testDataSpec.build(withStreaming=True, + options={'rowsPerSecond': 500}) # ... do something with your streaming source here display(dfTestDataStreaming) @@ -151,8 +255,38 @@ terminate the streaming after a specified period of time. except RuntimeError: pass +Using file based streaming sources +---------------------------------- + +Spark structured streaming allows for the use of a dataset storage that has been written in `delta`, +`parquet`, `csv` or other file formats as a streaming source. 
In particular, the Databricks `delta` format includes
+additional enhancements such as transactional isolation that make it suitable as a structured streaming source.
+
+The Databricks Labs Data Generator allows for the use of files or datasets in `delta`, `parquet` or `csv` format as a
+source for the generation of structured streaming data.
+
+When a file-based source is used as a source of structured streaming data, the data generator will use the `seed` or
+`id` field from the source and, optionally, the timestamp field that represents event time.
+
+When the file contains other fields, these fields may be referred to by other data generation rules (for example, in
+the body of an `expr` attribute). However, these underlying source fields are not propagated to the output data.
+
+Simulating late arriving events
+-------------------------------
+
+The data generator also supports simulating the common IOT event time pattern in streaming device data where
+most of the events advance in event time as messages are delivered but with a certain percentage of events
+arriving late.
+
+By specifying the use of enhanced event time, you can designate that events increase in event time for most events
+but that a certain percentage of the events are delayed by up to a configurable time interval.
+
+This allows for validation of handling of device data with late arriving data, watermarks, stream/stream joins and many
+other typical streaming scenarios.
+
+
 Using streaming data with Delta tables
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+--------------------------------------
 
 If you write the streaming data to a Delta table using a streaming writer, then the Delta table itself
 can be used as a streaming source
diff --git a/docs/source/working_with_dlt.rst b/docs/source/working_with_dlt.rst
new file mode 100644
index 00000000..cdd204bd
--- /dev/null
+++ b/docs/source/working_with_dlt.rst
@@ -0,0 +1,33 @@
+.. Databricks Labs Data Generator documentation master file, created by
+   sphinx-quickstart on Sun Jun 21 10:54:30 2020.
+   You can adapt this file completely to your liking, but it should at least
+   contain the root `toctree` directive.
+
+Working with Delta Live Tables
+==============================
+The Databricks Labs Data Generator can be used within a Delta Live Tables Python pipeline to generate one or more
+synthetic data sources that can be used to test or benchmark a Delta Live Tables pipeline.
+
+To install the library in a Delta Live Tables pipeline, you will need to create a notebook cell as the first cell
+of the notebook to install the library via the `%pip` command.
+
+.. code-block:: python
+
+   %pip install dbldatagen
+
+Once the Data Generator has been installed, you can use it to create DLT live tables and live streaming tables.
+
+.. note::
+   We recommend using the `rate-micro-batch` streaming source when creating a Delta Live Tables streaming source.
+
+Creating a batch data source
+----------------------------
+
+Creating a batch data source in DLT is similar to creating a batch data source in a classic Databricks notebook.
+
+
+Creating a streaming data source
+--------------------------------
+
+You can use the Data Generator to generate a synthetic source for a streaming live table.
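+
+For example, the following sketch (illustrative only - it assumes that the `dlt` module provided by the pipeline
+runtime is available and that a data spec named `testDataSpec` has already been defined) returns a streaming
+data frame built from a `rate-micro-batch` source:
+
+.. code-block:: python
+
+   import dlt
+
+   @dlt.table(name="synthetic_events")
+   def synthetic_events():
+       return testDataSpec.build(withStreaming=True,
+                                 options={"dbldatagen.streaming.source": "rate-micro-batch",
+                                          "rowsPerBatch": 100})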
+ diff --git a/tests/test_streaming.py b/tests/test_streaming.py index cf10273a..15fae53e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -2,20 +2,55 @@ import shutil import time import pytest +import logging from pyspark.sql.types import IntegerType, StringType, FloatType +import pyspark.sql.functions as F import dbldatagen as dg spark = dg.SparkSingleton.getLocalInstance("streaming tests") +@pytest.fixture(scope="class") +def setupLogging(): + FORMAT = '%(asctime)-15s %(message)s' + logging.basicConfig(format=FORMAT) + + class TestStreaming(): row_count = 100000 column_count = 10 time_to_run = 10 rows_per_second = 5000 + def setup_log_capture(self, caplog_object): + """ set up log capture fixture + + Sets up log capture fixture to only capture messages after setup and only + capture warnings and errors + + """ + caplog_object.set_level(logging.WARNING) + + # clear messages from setup + caplog_object.clear() + + def get_log_capture_warnings_and_errors(self, caplog_object, textFlag): + """ + gets count of errors containing specified text + + :param caplog_object: log capture object from fixture + :param textFlag: text to search for to include error or warning in count + :return: count of errors containg text specified in `textFlag` + """ + streaming_warnings_and_errors = 0 + for r in caplog_object.records: + if (r.levelname == "WARNING" or r.levelname == "ERROR") and textFlag in r.message: + streaming_warnings_and_errors += 1 + + return streaming_warnings_and_errors + @pytest.fixture def getStreamingDirs(self): time_now = int(round(time.time() * 1000)) @@ -32,7 +67,26 @@ def getStreamingDirs(self): shutil.rmtree(base_dir, ignore_errors=True) print(f"\n\n*** test dir [{base_dir}] deleted") - @pytest.mark.parametrize("seedColumnName", ["id", "_id", None]) + @pytest.fixture + def getDataDir(self): + time_now = int(round(time.time() * 1000)) + base_dir = "/tmp/testdata_{}".format(time_now) + data_dir = os.path.join(base_dir, "data") + print(f"test data dir created '{base_dir}'") + + # dont need to create the data dir + os.makedirs(base_dir) + + try: + yield data_dir + finally: + shutil.rmtree(base_dir, ignore_errors=True) + print(f"\n\n*** test data dir [{base_dir}] deleted") + + + @pytest.mark.parametrize("seedColumnName", ["id", + "_id", + None]) def test_streaming(self, getStreamingDirs, seedColumnName): base_dir, test_dir, checkpoint_dir = getStreamingDirs @@ -104,7 +158,9 @@ def test_streaming(self, getStreamingDirs, seedColumnName): # check that we have at least one second of data assert rows_retrieved >= self.rows_per_second - @pytest.mark.parametrize("seedColumnName", ["id", "_id", None]) + @pytest.mark.parametrize("seedColumnName", ["id", + "_id", + None]) def test_streaming_trigger_once(self, getStreamingDirs, seedColumnName): base_dir, test_dir, checkpoint_dir = getStreamingDirs @@ -178,3 +234,236 @@ def test_streaming_trigger_once(self, getStreamingDirs, seedColumnName): # check that we have at least one second of data assert rows_retrieved >= self.rows_per_second + + @pytest.mark.parametrize("options,optionsExpected", + [ ({"dbldatagen.streaming.source": "rate"}, + ({"dbldatagen.streaming.source": "rate"}, {}, {})), + ({"dbldatagen.streaming.source": "rate-micro-batch", "rowsPerSecond": 50}, + ({"dbldatagen.streaming.source": "rate-micro-batch"}, {"rowsPerSecond": 50}, {})), + ({"dbldatagen.streaming.source": "rate-micro-batch", + "rowsPerSecond": 50, + "dbldatagen.rows": 100000}, + ({"dbldatagen.streaming.source": "rate-micro-batch"}, + {"rowsPerSecond": 
50}, + {"dbldatagen.rows": 100000})) + ]) + def test_option_parsing(self, options, optionsExpected): + testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count, + partitions=4, seedMethod='hash_fieldname') + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code2", IntegerType(), minValue=0, maxValue=10) + .withColumn("code3", StringType(), values=['a', 'b', 'c']) + .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + ) + + datagen_options, passthrough_options, unsupported_options = testDataSpec._parseBuildOptions(options) + + expected_datagen_options, expected_passthrough_options, expected_unsupported_options = optionsExpected + + assert datagen_options == expected_datagen_options + assert passthrough_options == expected_passthrough_options + assert unsupported_options == expected_unsupported_options + + @pytest.mark.parametrize("options", + [ {"dbldatagen.streaming.source": "parquet", + "dbldatagen.streaming.sourcePath": "/tmp/testStreamingFiles/data1"}, + {"dbldatagen.streaming.source": "csv", + "dbldatagen.streaming.sourcePath": "/tmp/testStreamingFiles/data2"}, + ]) + def test_basic_file_streaming(self, options, getStreamingDirs): + base_dir, test_dir, checkpoint_dir = getStreamingDirs + + # generate file for base of streaming generator + testDataSpecBase = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count, + seedMethod='hash_fieldname') + .withColumn('value', "long", expr="id") + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + ) + + dfBase = testDataSpecBase.build() + dfBase.write.format(options["dbldatagen.streaming.source"])\ + .mode('overwrite')\ + .save(options["dbldatagen.streaming.source"]) + + # generate streaming data frame + testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set2", rows=self.row_count, + partitions=4, seedMethod='hash_fieldname') + .withColumn("a", IntegerType(), minValue=100, maxValue=200) + .withColumn("b", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + ) + dfStreaming = testDataSpecBase.build(withStreaming=True, options=options) + + sq = (dfStreaming + .writeStream + .format("parquet") + .outputMode("append") + .option("path", test_dir) + .option("checkpointLocation", checkpoint_dir) + .trigger(once=True) + .start()) + + sq.processAllAvailable() + + dfStreamDataRead = spark.read.format("parquet").load(test_dir) + rows_read = dfStreamDataRead.count() + + assert rows_read == self.row_count + + def test_withEventTime_batch(self): + # test it in batch mode + starting_datetime = "2022-06-01 01:00:00" + testDataSpecBase = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count, + partitions=4, seedMethod='hash_fieldname') + .withColumn('value', "long", expr="id") + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + .withColumn("timestamp", "timestamp", expr="now()") + .withEnhancedEventTime(startEventTime=starting_datetime, baseColumn="timestamp", + eventTimeName="event_ts") + ) + + df = testDataSpecBase.build() + assert df.count() == self.row_count + + df.show() + + def test_withEventTime_streaming(self, getStreamingDirs): + base_dir, test_dir, checkpoint_dir = 
getStreamingDirs + + # test it in streaming mode + starting_datetime = "2022-06-01 01:00:00" + testDataSpecBase = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count, + partitions=4, seedMethod='hash_fieldname') + .withColumn('value', "long", expr="id") + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + .withColumn("timestamp2", "timestamp", expr="timestamp") + .withEnhancedEventTime(startEventTime=starting_datetime, baseColumn="timestamp2", + eventTimeName="event_ts") + ) + + dfStreaming = testDataSpecBase.build(withStreaming=True) + + sq = (dfStreaming + .writeStream + .format("parquet") + .outputMode("append") + .option("path", test_dir) + .option("checkpointLocation", checkpoint_dir) + .start()) + + sq.awaitTermination(5) + if sq.isActive: + sq.stop() + + dfStreamDataRead = spark.read.format("parquet").load(test_dir) + rows_read = dfStreamDataRead.count() + assert rows_read > 0 + + @pytest.mark.parametrize("options,optionsExpected", + [ ({"dbldatagen.streaming.source": "rate"}, + ({"dbldatagen.streaming.source": "rate"}, + {"rowsPerSecond": 1, 'numPartitions': 10}, {})), + ({"dbldatagen.streaming.source": "rate-micro-batch"}, + ({"dbldatagen.streaming.source": "rate-micro-batch"}, {'numPartitions': 10, 'rowsPerBatch':1}, {})), + ]) + def test_default_options(self, options, optionsExpected): + testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count, + partitions=10, seedMethod='hash_fieldname') + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code2", IntegerType(), minValue=0, maxValue=10) + .withColumn("code3", StringType(), values=['a', 'b', 'c']) + .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + ) + + datagen_options, passthrough_options, unsupported_options = testDataSpec._parseBuildOptions(options) + testDataSpec._applyStreamingDefaults(datagen_options, passthrough_options) + if "startTimestamp" in passthrough_options.keys(): + passthrough_options.pop("startTimestamp", None) + + # remove start timestamp from both options and expected options + + expected_datagen_options, expected_passthrough_options, expected_unsupported_options = optionsExpected + + assert datagen_options == expected_datagen_options + assert passthrough_options == expected_passthrough_options + + def test_text_streaming(self, getDataDir, caplog, getStreamingDirs): + datadir = getDataDir + base_dir, test_dir, checkpoint_dir = getStreamingDirs + + # caplog fixture captures log content + self.setup_log_capture(caplog) + + df = spark.range(10000).select(F.expr("cast(id as string)").alias("id")) + df.write.format("text").save(datadir) + + testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count, + partitions=10, seedMethod='hash_fieldname') + .withColumn("code1", IntegerType(), minValue=100, maxValue=200) + .withColumn("code2", IntegerType(), minValue=0, maxValue=10) + .withColumn("code3", StringType(), values=['a', 'b', 'c']) + .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + ) + + streamingOptions = { + 'dbldatagen.streaming.source': 'text', + 'dbldatagen.streaming.sourcePath': datadir, + + } + df_streaming = 
testDataSpec.build(withStreaming=True, options=streamingOptions) + + # check that there warnings about `text` format + text_format_warnings_and_errors = self.get_log_capture_warnings_and_errors(caplog, "text") + assert text_format_warnings_and_errors > 0, "Should have error or warning messages about text format" + + # loop until we get one seconds worth of data + start_time = time.time() + elapsed_time = 0 + rows_retrieved = 0 + time_limit = 10.0 + + while elapsed_time < time_limit and rows_retrieved < self.rows_per_second: + sq = (df_streaming + .writeStream + .format("parquet") + .outputMode("append") + .option("path", test_dir) + .option("checkpointLocation", checkpoint_dir) + .trigger(once=True) + .start()) + + # wait for trigger once to terminate + sq.awaitTermination(5) + + elapsed_time = time.time() - start_time + + try: + df2 = spark.read.format("parquet").load(test_dir) + rows_retrieved = df2.count() + + # ignore file or metadata not found issues arising from read before stream has written first batch + except Exception as exc: + print("Exception:", exc) + + if sq.isActive: + sq.stop() + + end_time = time.time() + + print("*** Done ***") + print("read {} rows from newly written data".format(rows_retrieved)) + print("elapsed time (seconds)", end_time - start_time) + + # check that we have at least one second of data + assert rows_retrieved >= self.rows_per_second + + + + + +