From 5f601adad84f0353510417871037d474a16fa8f8 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Wed, 8 May 2024 18:28:51 -0700
Subject: [PATCH 1/7] wip

---
 CHANGELOG.md           | 5 +++++
 CONTRIBUTING.md        | 3 ++-
 README.md              | 2 +-
 python/dev_require.txt | 2 +-
 python/require.txt     | 2 +-
 5 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 305894c5..ccad5989 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,11 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.

+### Unreleased - next release is intended to be v0.4.0
+
+#### Changed
+* Minimum spark version is PySpark 3.2.1 which is the minimum version for Databricks Runtime 10.4 LTS
+
### Version 0.3.6

#### Changed
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 9402e2d4..161025e3 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -22,7 +22,8 @@ Dependent packages are not installed automatically by the `dbldatagen` package.

The code has been tested with Python 3.8.12 and later.

Older releases were tested with Python 3.7.5 but as of this release, it requires the Databricks
-runtime 9.1 LTS or later.
+runtime 10.4 LTS or later, and Python 3.8.10 or later. Due to limitations in the runtimes available in
+GitHub actions, testing is performed with Python 3.8.12.

## Checking your code for common issues

diff --git a/README.md b/README.md
index 5695350c..05a08f4c 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ The documentation [installation notes](https://databrickslabs.github.io/dbldatag
contains details of installation using alternative mechanisms.

## Compatibility
-The Databricks Labs Data Generator framework can be used with Pyspark 3.1.2 and Python 3.8 or later. These are
+The Databricks Labs Data Generator framework can be used with PySpark 3.2.1 and Python 3.8 or later. These are
compatible with the Databricks runtime 10.4 LTS and later releases.
For full Unity Catalog support, we recommend using Databricks runtime 13.2 or later (Databricks 13.3 LTS or above preferred)
diff --git a/python/dev_require.txt b/python/dev_require.txt
index a34ed3b2..bb6229b7 100644
--- a/python/dev_require.txt
+++ b/python/dev_require.txt
@@ -5,7 +5,7 @@ pandas==1.2.4
pickleshare==0.7.5
py4j==0.10.9
pyarrow==4.0.1
-pyspark>=3.1.3
+pyspark>=3.2.1
python-dateutil==2.8.1
six==1.15.0
pyparsing==2.4.7
diff --git a/python/require.txt b/python/require.txt
index 5f0e30a4..de5c0250 100644
--- a/python/require.txt
+++ b/python/require.txt
@@ -5,7 +5,7 @@ pandas==1.2.5
pickleshare==0.7.5
py4j==0.10.9
pyarrow==4.0.1
-pyspark>=3.1.3
+pyspark>=3.2.1
python-dateutil==2.8.1
six==1.15.0
pyparsing==2.4.7

From 26adee6d51ee2b9f2014ad397a609bb8acd5bd36 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Thu, 9 May 2024 18:47:02 -0700
Subject: [PATCH 2/7] wip

---
 docs/source/index.rst                |   2 +-
 docs/source/using_streaming_data.rst | 107 +++++++++++++++++++++++++--
 2 files changed, 102 insertions(+), 7 deletions(-)

diff --git a/docs/source/index.rst b/docs/source/index.rst
index 5a879ea4..791a6ceb 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -28,7 +28,7 @@ As it is installable via `%pip install`, it can also be incorporated in environm
   Using data distributions
   Options for column specification
   Generating repeatable data
-   Using streaming data
+   Producing synthetic streaming data
   Generating JSON and structured column data
   Generating synthetic data from existing data
   Generating Change Data Capture (CDC) data
diff --git a/docs/source/using_streaming_data.rst b/docs/source/using_streaming_data.rst
index 20770853..2a3514b6 100644
--- a/docs/source/using_streaming_data.rst
+++ b/docs/source/using_streaming_data.rst
@@ -1,20 +1,51 @@
.. Databricks Labs Data Generator documentation master file, created by
   sphinx-quickstart on Sun Jun 21 10:54:30 2020.

-Using Streaming Data
-====================
+Producing synthetic streaming data
+==================================
+
+The Databricks Labs Data Generator can be used to generate synthetic streaming data backed by a number of different
+streaming sources.
+
+When generating streaming data, the row count specified in the original construction of the data spec is
+ignored; the number of rows produced and the speed at which they are generated depend on the options passed to
+the `build` method.
+
+When generating batch data, the data generation process begins with generating a base dataframe of seed values
+using the `spark.range` method. The rules for additional column generation are then applied to this base data frame.
+
+For generation of streaming data, the base data frame is generated using one of the `spark.readStream` variations.
+This is controlled by passing the argument `withStreaming=True` to the `build` method of the DataGenerator instance.
+Additional options may be passed to control the rate at which synthetic rows are generated.
+
+.. note::
+    Prior to this release, only the structured streaming `rate` source was available for generating streaming data. With
+    this release, you can use the `rate` source, the `rate-micro-batch` source and possibly other sources.
+
+In theory, any of the structured streaming sources are supported, but we do not test compatibility for sources other
+than the `rate` and `rate-micro-batch` sources.
+
+.. 
seealso:: + See the following links for more details: + + * `Spark Structured Streaming data sources `_ + +Generating Streaming Data +------------------------- You can make any data spec into a streaming data frame by passing additional options to the ``build`` method. -If the ``withStreaming`` option is used when building the data set, it will use a streaming rate source to generate -the data. You can control the streaming rate with the option ``rowsPerSecond``. +If the ``withStreaming`` option is used when building the data set, it will use a streaming source to generate +the data. By default, this will use a structured streaming `rate` data source as the base data frame. -In this case, the row count is ignored. +When using the `rate` streaming source, you can control the streaming rate with the option ``rowsPerSecond``. + +When using streaming data sources, the row count specified in the call to the DataGenerator instance constructor +is ignored. In most cases, no further changes are needed to run the data generation as a streaming data generator. -As the generated data frame is a normal spark streaming data frame, all the same caveats and features apply. +As the generated data frame is a spark structured streaming data frame, all the relevant caveats and features apply. Example 1: site code and technology ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -64,7 +95,71 @@ Example 1: site code and technology display(df) +Customizing streaming data generation +------------------------------------- + +You can customize how the streaming data is generated using the options to the ``build`` command. + +There are two types of options: + + * Options that are interpreted by the streaming data generation process. These options begin with `'dbldatagen.'` + + * Options that are passed through to the underlying streaming data frame. All other options are passed through to the + `options` method of the underlying dataframe. + +.. list-table:: **Data generation options for generating streaming data** + :header-rows: 1 + + * - Option + - Usage + + * - `dbldatagen.streaming.source` + - Type of streaming source to generate. Defaults to `rate` + + * - `dbldatagen.streaming.sourcePath` + - Path for file based data sources. + + * - `dbldatagen.streaming.sourceSchema` + - Schema for source of streaming file sources + + * - `dbldatagen.streaming.sourceIdField` + - Name of source id field - defaults to `value` + + * - `dbldatagen.streaming.sourceTimestampField` + - Name of source timestamp field - defaults to `timestamp` + + * - `dbldatagen.streaming.generateTimestamp` + - if set to `True`, automatically generates a timestamp field if none present + + +The type of the streaming source may be the fully qualified name of a custom streaming source, a built in streaming +source such as `rate` or `rate-micro-batch`, or the name of a file format such as `parquet`, `delta`, or `csv`. + +File based data source support `csv`, `parquet` and `delta` format files or folders of files. Files or folders of +files in `delta` format do not require specification of a schema as it is inferred from the underlying file. + +Files or folders of files in `csv` format require a schema. + +Any options that do not begin with the prefix `dbldatagen.` are passed through to the options method of the underlying +based data frame. + +When a schema is specified for a file based source, the schema should only specify the schema of the fields in the +underlying source, not for additional fields added by the data generation rules. + +.. 
note:: + Every streaming data source requires a field that can be designated as the seed field or `id` field. + This takes on the same role of the `id` field when batch data generation is used. + + This field will be renamed to the seed field name `id` (or to the custom seed field name, if it + has been overriden in the data generator constructor). + + Many streaming operations also require the designation of a timestamp field to represent event time. This may + be read from the underlying streaming source, or automatically generated. This is also needed if using + enhanced event time (described in a later section). +What happens if there are other fields in the underlying data source? These are ignored but fields in the generation +spec may refer to them. However, unless a field generation rule replicates the data in the source field, it will not +appear in the generated data. Example 2: IOT style data ^^^^^^^^^^^^^^^^^^^^^^^^^ From ceb4df9de4e0dfc563dbf75cd3ca18303455c763 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Thu, 9 May 2024 18:55:53 -0700 Subject: [PATCH 3/7] wip --- docs/source/using_streaming_data.rst | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/docs/source/using_streaming_data.rst b/docs/source/using_streaming_data.rst index 2a3514b6..fe302569 100644 --- a/docs/source/using_streaming_data.rst +++ b/docs/source/using_streaming_data.rst @@ -132,20 +132,12 @@ There are two types of options: - if set to `True`, automatically generates a timestamp field if none present -The type of the streaming source may be the fully qualified name of a custom streaming source, a built in streaming -source such as `rate` or `rate-micro-batch`, or the name of a file format such as `parquet`, `delta`, or `csv`. - -File based data source support `csv`, `parquet` and `delta` format files or folders of files. Files or folders of -files in `delta` format do not require specification of a schema as it is inferred from the underlying file. - -Files or folders of files in `csv` format require a schema. +The type of the streaming source may be the fully qualified name of a custom streaming source, or a built in streaming +source such as `rate` or `rate-micro-batch`. Any options that do not begin with the prefix `dbldatagen.` are passed through to the options method of the underlying based data frame. -When a schema is specified for a file based source, the schema should only specify the schema of the fields in the -underlying source, not for additional fields added by the data generation rules. - .. note:: Every streaming data source requires a field that can be designated as the seed field or `id` field. This takes on the same role of the `id` field when batch data generation is used. @@ -227,7 +219,9 @@ data generation for. .withColumn("event_ts", "timestamp", expr="now()") ) - dfTestDataStreaming = testDataSpec.build(withStreaming=True, options={'rowsPerSecond': 500}) + dfTestDataStreaming = testDataSpec.build(withStreaming=True, + options={'rowsPerBatch': 500, + 'dbldatagen.streaming.source': 'rate-micro-batch' }) # ... 
do something with your streaming source here
    display(dfTestDataStreaming)

From 4f6a8575be2a2e352cdcb7749ef0b7944bac8b0a Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Thu, 9 May 2024 22:36:55 -0700
Subject: [PATCH 4/7] wip

---
 docs/source/using_streaming_data.rst | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/docs/source/using_streaming_data.rst b/docs/source/using_streaming_data.rst
index fe302569..1f3b41a2 100644
--- a/docs/source/using_streaming_data.rst
+++ b/docs/source/using_streaming_data.rst
@@ -219,9 +219,13 @@ data generation for.
                   .withColumn("event_ts", "timestamp", expr="now()")
                   )

-    dfTestDataStreaming = testDataSpec.build(withStreaming=True,
-                                             options={'rowsPerBatch': 500,
-                                             'dbldatagen.streaming.source': 'rate-micro-batch' })
+    # note that by default, for a rate source, the timestamp starts with the current time
+    # but for `rate-micro-batch` it starts at the start of the epoch (1/1/1970) unless
+    # a value for `startTimestamp` is provided
+    streamingOptions = {'rowsPerSecond': 500,
+                        'dbldatagen.streaming.source': 'rate-micro-batch',
+                        'startTimestamp': int(time.time() * 1000)}
+    dfTestDataStreaming = testDataSpec.build(withStreaming=True, options=streamingOptions)

    # ... do something with your streaming source here
    display(dfTestDataStreaming)

From 996509f3d5fd9b88371b872e8253ead91bb31830 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Fri, 10 May 2024 00:09:28 -0700
Subject: [PATCH 5/7] wip

---
 CHANGELOG.md                      |   2 +
 dbldatagen/data_generator.py      | 102 +++++++++++++++++++++-
 dbldatagen/enhanced_event_time.py | 138 ++++++++++++++++++++++++++++++
 3 files changed, 241 insertions(+), 1 deletion(-)
 create mode 100644 dbldatagen/enhanced_event_time.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ccad5989..17f7ff3a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,8 @@ All notable changes to the Databricks Labs Data Generator will be documented in
#### Changed
* Minimum spark version is PySpark 3.2.1 which is the minimum version for Databricks Runtime 10.4 LTS
+* Changed parsing of build options for the data generator to support use of custom streaming sources
+* Documentation updates in support of new features such as streaming, complex structures etc.

### Version 0.3.6

#### Changed
diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index 9f99e903..a69c9f25 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -25,8 +25,32 @@
_OLD_MIN_OPTION = 'min'
_OLD_MAX_OPTION = 'max'

+_STREAMING_SOURCE_OPTION = "dbldatagen.streaming.source"
+_STREAMING_SCHEMA_OPTION = "dbldatagen.streaming.sourceSchema"
+_STREAMING_PATH_OPTION = "dbldatagen.streaming.sourcePath"
+_STREAMING_TABLE_OPTION = "dbldatagen.streaming.sourceTable"
+_STREAMING_ID_FIELD_OPTION = "dbldatagen.streaming.sourceIdField"
+_STREAMING_TIMESTAMP_FIELD_OPTION = "dbldatagen.streaming.sourceTimestampField"
+_STREAMING_GEN_TIMESTAMP_OPTION = "dbldatagen.streaming.generateTimestamp"
+_STREAMING_USE_SOURCE_FIELDS = "dbldatagen.streaming.sourceFields"
+_BUILD_OPTION_PREFIX = "dbldatagen."
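+# Build options carrying the _BUILD_OPTION_PREFIX above are interpreted by the data
+# generator itself when constructing a streaming dataframe; any other option passed to
+# `build()` is handed through unchanged to the underlying streaming base dataframe
+# (see `_parseBuildOptions` and `_applyStreamingDefaults` below).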
+ _STREAMING_TIMESTAMP_COLUMN = "_source_timestamp" +_STREAMING_SOURCE_RATE = "rate" +_STREAMING_SOURCE_RATE_MICRO_BATCH = "rate-micro-batch" +_STREAMING_SOURCE_NUM_PARTITIONS = "numPartitions" +_STREAMING_SOURCE_ROWS_PER_BATCH = "rowsPerBatch" +_STREAMING_SOURCE_ROWS_PER_SECOND = "rowsPerSecond" +_STREAM_SOURCE_START_TIMESTAMP = "startTimestamp" + +_STREAMING_SOURCE_TEXT = "text" +_STREAMING_SOURCE_PARQUET = "parquet" +_STREAMING_SOURCE_CSV = "csv" +_STREAMING_SOURCE_JSON = "json" +_STREAMING_SOURCE_ORC = "ord" +_STREAMING_SOURCE_DELTA = "delta" + class DataGenerator: """ Main Class for test data set generation @@ -53,6 +77,11 @@ class DataGenerator: Note: in a shared spark session, the sparkContext is not available, so the default parallelism is set to 200. We recommend passing an explicit value for `partitions` in this case. + + Note that the number of partitions requested is not guaranteed to be supplied when generating + a streaming data set using a streaming source that generates a different number of partitions. + However for most batch use cases, the requested number of partitions will be generated. + """ # class vars @@ -955,7 +984,7 @@ def withStructColumn(self, colName, fields=None, asJson=False, **kwargs): if asJson: output_expr = f"to_json({struct_expr})" - newDf = self.withColumn(colName, StringType(), expr=output_expr, **kwargs) + newDf = self.withColumn(colName, StringType(), expr=output_expr, **kwargs) else: newDf = self.withColumn(colName, INFER_DATATYPE, expr=struct_expr, **kwargs) @@ -1044,6 +1073,7 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None): end_id = self._rowCount + startId id_partitions = self.partitions if self.partitions is not None else 4 + build_options, passthrough_options, unsupported_options = self._parseBuildOptions(options) if not streaming: status = f"Generating data frame with ids from {startId} to {end_id} with {id_partitions} partitions" @@ -1233,6 +1263,76 @@ def computeBuildPlan(self): self.buildPlanComputed = True return self + def _parseBuildOptions(self, options): + """ Parse build options + Parse build options into tuple of dictionaries - (datagen options, passthrough options, unsupported options) + where + - `datagen options` is dictionary of options to be interpreted by the data generator + - `passthrough options` is dictionary of options to be passed through to the underlying base dataframe + - `supported options` is dictionary of options that are not supported + :param options: Dict of options to control generating of data + :returns: tuple of options dictionaries - (datagen_options, passthrough_options, unsupported options) + """ + passthrough_options = {} + unsupported_options = {} + datagen_options = {} + + supported_options = [_STREAMING_SOURCE_OPTION, + _STREAMING_SCHEMA_OPTION, + _STREAMING_PATH_OPTION, + _STREAMING_TABLE_OPTION, + _STREAMING_ID_FIELD_OPTION, + _STREAMING_TIMESTAMP_FIELD_OPTION, + _STREAMING_GEN_TIMESTAMP_OPTION, + _STREAMING_USE_SOURCE_FIELDS + ] + + if options is not None: + for k, v in options.items(): + if isinstance(k, str): + if k.startswith(_BUILD_OPTION_PREFIX): + if k in supported_options: + datagen_options[k] = v + else: + unsupported_options[k] = v + else: + passthrough_options[k] = v + else: + unsupported_options[k] = v + + # add defaults + + return datagen_options, passthrough_options, unsupported_options + + def _applyStreamingDefaults(self, build_options, passthrough_options): + """ Apply default options for streaming data generation""" + assert build_options is not None + 
assert passthrough_options is not None + + # default to `rate` streaming source + if _STREAMING_SOURCE_OPTION not in build_options: + build_options[_STREAMING_SOURCE_OPTION] = _STREAMING_SOURCE_RATE + + # setup `numPartitions` if not specified + if build_options[_STREAMING_SOURCE_OPTION] in [_STREAMING_SOURCE_RATE, _STREAMING_SOURCE_RATE_MICRO_BATCH]: + if _STREAMING_SOURCE_NUM_PARTITIONS not in passthrough_options: + passthrough_options[_STREAMING_SOURCE_NUM_PARTITIONS] = self.partitions + + # set up rows per batch if not specified + if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE: + if _STREAMING_SOURCE_ROWS_PER_SECOND not in passthrough_options: + passthrough_options[_STREAMING_SOURCE_ROWS_PER_SECOND] = 1 + + if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE_MICRO_BATCH: + if _STREAMING_SOURCE_ROWS_PER_BATCH not in passthrough_options: + passthrough_options[_STREAMING_SOURCE_ROWS_PER_BATCH] = 1 + if _STREAM_SOURCE_START_TIMESTAMP not in passthrough_options: + currentTs = math.floor(time.mktime(time.localtime())) * 1000 + passthrough_options[_STREAM_SOURCE_START_TIMESTAMP] = currentTs + + if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_TEXT: + self.logger.warning("Use of the `text` format may not work due to lack of type support") + def build(self, withTempView=False, withView=False, withStreaming=False, options=None): """ build the test data set from the column definitions and return a dataframe for it diff --git a/dbldatagen/enhanced_event_time.py b/dbldatagen/enhanced_event_time.py new file mode 100644 index 00000000..bb198c29 --- /dev/null +++ b/dbldatagen/enhanced_event_time.py @@ -0,0 +1,138 @@ +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" +This file defines the `EnhancedEventTime` class + +This defines helper methods for implementing enhanced event time +""" + +import logging + + +from pyspark.sql.functions import col, pandas_udf +from pyspark.sql.functions import lit, concat, rand, round as sql_round, array, expr, when, udf, \ + format_string +from pyspark.sql.types import FloatType, IntegerType, StringType, DoubleType, BooleanType, \ + TimestampType, DataType, DateType + +from .column_spec_options import ColumnSpecOptions +from .datagen_constants import RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, RANDOM_SEED_RANDOM +from .daterange import DateRange +from .distributions import Normal, DataDistribution +from .nrange import NRange +from .text_generators import TemplateGenerator +from .utils import ensure, coalesce_values + +import dbldatagen as dg +import dbldatagen.distributions as dist + +HASH_COMPUTE_METHOD = "hash" +VALUES_COMPUTE_METHOD = "values" +RAW_VALUES_COMPUTE_METHOD = "raw_values" +AUTO_COMPUTE_METHOD = "auto" +COMPUTE_METHOD_VALID_VALUES = [HASH_COMPUTE_METHOD, + AUTO_COMPUTE_METHOD, + VALUES_COMPUTE_METHOD, + RAW_VALUES_COMPUTE_METHOD] + + + +class EnhancedEventTimeHelper(object): + + def init(self): + pass + + def withEnhancedEventTime(self, + dataspec, + startEventTime=None, + acceleratedEventTimeInterval="10 minutes", + fractionLateArriving=0.1, + lateTimeInterval="6 hours", + jitter=(-0.25, 0.9999), + eventTimeName=None, + baseColumn="timestamp", + keepIntermediateColumns=False): + """ + Implement enhanced event time + + :param dataspec: dataspec to apply enhanced event time to + :param startEventTime: start timestamp of output event time + :param acceleratedEventTimeInterval: interval for accelerated event time (i.e "10 minutes") + :param fractionLateArriving: fraction of late arriving data. range [0.0, 1.0] + :param lateTimeInterval: interval for late arriving events (i.e "6 hours") + :param jitter: jitter factor to avoid strictly increasing order in events + :param eventTimeName: Column name for generated event time column + :param baseColumn: Base column name used for computations of adjusted event time + :param keepIntermediateColumns: Flag to retain intermediate columns in the output, [ debug / test only] + + This adjusts the dataframe to produce IOT style event time that normally increases over time but has a + configurable fraction of late arriving data. It uses a base timestamp column (called `timestamp` by default) + to compute data. There must be a column of this name in the source data frame and it should have the value of + "now()". + + By default `rate` and `rate-micro-batch` streaming sources have this column but + it can be added to other dataframes - either batch or streaming data frames. + + While the overall intent is to support synthetic IOT style simulated device data in a stream, it can be used + with a batch data frame (as long as an appropriate timestamp column is added and designated as the base data + timestamp column. 
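+
+        For example, a call such as
+        `EnhancedEventTimeHelper().withEnhancedEventTime(dataspec, startEventTime="2024-05-01 00:00:00",
+        eventTimeName="event_ts")` (the values shown are illustrative only) adds an `event_ts` column to the
+        supplied dataspec using the default late-arrival settings.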
+ """ + + assert startEventTime is not None, "value for `startTime` must be specified" + assert dataspec is not None, "dataspec must be specified" + assert 0.0 <= fractionLateArriving <= 1.0, "fractionLateArriving must be in range [0.0, 1.0]" + assert eventTimeName is not None, "eventTimeName argument must be supplied" + + # determine timestamp for start of generation + start_of_generation = \ + dataspec.sparkSession.sql(f"select cast(now() as string) as start_timestamp").collect()[0][ + 'start_timestamp'] + + omitInterimColumns = not keepIntermediateColumns + + retval = (dataspec + .withColumn("_late_arrival_factor1", "double", minValue=0.0, maxValue=1.0, continuous=True, + random=True, + distribution=dist.Beta(2.0, 5.3), omit=omitInterimColumns) + .withColumn("_late_arrival_factor", "double", expr="least(_late_arrival_factor1 * 1.17, 1.0)", + baseColumn="_late_arrival_factor1", + omit=omitInterimColumns) + .withColumn("_stream_time", "timestamp", expr=f"{baseColumn}", + omit=omitInterimColumns, baseColumn=baseColumn) + .withColumn("_data_gen_time", "timestamp", expr="now()", omit=omitInterimColumns) + .withColumn("_difference_factor", "double", + expr=f"cast(_stream_time as double) - cast(TIMESTAMP '{start_of_generation}' as double)", + baseColumns=["_stream_time"], + omit=omitInterimColumns) + .withColumn("_jitter_within_event_interval", "double", minValue=jitter[0], maxValue=jitter[1], + continuous=True, random=True, + omit=omitInterimColumns) + .withColumn("_late_arrival_prob", "double", minValue=0.0, maxValue=1.0, continuous=True, + random=True, + omit=omitInterimColumns) + .withColumn("_late_arrival", "boolean", + expr=f"case when _late_arrival_prob <= {fractionLateArriving} then true else false end", + baseColumns=["_late_arrival_prob"], + omit=omitInterimColumns) + .withColumn("_ontime_event_ts", "timestamp", + expr=f"""greatest(TIMESTAMP '{startEventTime}' + + ((_difference_factor + _jitter_within_event_interval) + * INTERVAL {acceleratedEventTimeInterval}), + TIMESTAMP '{startEventTime}') """, + baseColumns=["_difference_factor", "_jitter_within_event_interval"], + omit=omitInterimColumns) + .withColumn("_late_arrival_ts", "timestamp", + expr=f"""greatest(_ontime_event_ts - (_late_arrival_factor * INTERVAL {lateTimeInterval}), + TIMESTAMP '{startEventTime}') + """, + baseColumns=["_ontime_event_ts", "_late_arrival_factor"], + omit=omitInterimColumns) + + # generate event time column + .withColumn(eventTimeName, "timestamp", + expr="case when _late_arrival then _late_arrival_ts else _ontime_event_ts end", + baseColumns=["_late_arrival", "_late_arrival_ts", "_ontime_event_ts"]) + ) + return retval From be16b0c2560d8a8da248ddefb41471dd06bb7ca2 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 14 May 2024 16:56:53 -0700 Subject: [PATCH 6/7] wip --- dbldatagen/data_generator.py | 1 + python/dev_require.txt | 2 +- python/require.txt | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index a69c9f25..9b02a9e6 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -28,6 +28,7 @@ _STREAMING_SOURCE_OPTION = "dbldatagen.streaming.source" _STREAMING_SCHEMA_OPTION = "dbldatagen.streaming.sourceSchema" _STREAMING_PATH_OPTION = "dbldatagen.streaming.sourcePath" +_STREAMING_FORMAT_OPTION = "dbldatagen.streaming.sourceFormat" _STREAMING_TABLE_OPTION = "dbldatagen.streaming.sourceTable" _STREAMING_ID_FIELD_OPTION = "dbldatagen.streaming.sourceIdField" _STREAMING_TIMESTAMP_FIELD_OPTION = 
"dbldatagen.streaming.sourceTimestampField" diff --git a/python/dev_require.txt b/python/dev_require.txt index bb6229b7..26f63282 100644 --- a/python/dev_require.txt +++ b/python/dev_require.txt @@ -3,7 +3,7 @@ numpy==1.22.0 pandas==1.2.4 pickleshare==0.7.5 -py4j==0.10.9 +py4j>=0.10.9.3 pyarrow==4.0.1 pyspark>=3.2.1 python-dateutil==2.8.1 diff --git a/python/require.txt b/python/require.txt index de5c0250..e396a5f0 100644 --- a/python/require.txt +++ b/python/require.txt @@ -3,7 +3,7 @@ numpy==1.22.0 pandas==1.2.5 pickleshare==0.7.5 -py4j==0.10.9 +py4j>=0.10.9.3 pyarrow==4.0.1 pyspark>=3.2.1 python-dateutil==2.8.1 From 9d3dbd047b32b9959a38036f5aaeefb1b3608755 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Fri, 17 May 2024 13:09:44 -0700 Subject: [PATCH 7/7] wip --- dbldatagen/data_generator.py | 2 ++ dbldatagen/enhanced_event_time.py | 23 ++--------------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index 9b02a9e6..448e0594 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -7,6 +7,8 @@ """ import copy import logging +import math +import time import re from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType diff --git a/dbldatagen/enhanced_event_time.py b/dbldatagen/enhanced_event_time.py index bb198c29..5f76708a 100644 --- a/dbldatagen/enhanced_event_time.py +++ b/dbldatagen/enhanced_event_time.py @@ -8,24 +8,6 @@ This defines helper methods for implementing enhanced event time """ -import logging - - -from pyspark.sql.functions import col, pandas_udf -from pyspark.sql.functions import lit, concat, rand, round as sql_round, array, expr, when, udf, \ - format_string -from pyspark.sql.types import FloatType, IntegerType, StringType, DoubleType, BooleanType, \ - TimestampType, DataType, DateType - -from .column_spec_options import ColumnSpecOptions -from .datagen_constants import RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, RANDOM_SEED_RANDOM -from .daterange import DateRange -from .distributions import Normal, DataDistribution -from .nrange import NRange -from .text_generators import TemplateGenerator -from .utils import ensure, coalesce_values - -import dbldatagen as dg import dbldatagen.distributions as dist HASH_COMPUTE_METHOD = "hash" @@ -38,7 +20,6 @@ RAW_VALUES_COMPUTE_METHOD] - class EnhancedEventTimeHelper(object): def init(self): @@ -87,8 +68,8 @@ def withEnhancedEventTime(self, # determine timestamp for start of generation start_of_generation = \ - dataspec.sparkSession.sql(f"select cast(now() as string) as start_timestamp").collect()[0][ - 'start_timestamp'] + dataspec.sparkSession.sql("select cast(now() as string) as start_timestamp").collect()[0][ + 'start_timestamp'] omitInterimColumns = not keepIntermediateColumns