
Feature streaming enhancements #127


Open - wants to merge 40 commits into base: master
Commits (40)
738d220  wip (ronanstokes-db, Oct 20, 2022)
94a90e8  wip (ronanstokes-db, Oct 20, 2022)
51adf39  wip (ronanstokes-db, Oct 20, 2022)
46f8406  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Oct 21, 2022)
e0f7887  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Oct 25, 2022)
2b3060e  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Nov 1, 2022)
1b91312  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Nov 28, 2022)
8bf2e22  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Feb 14, 2023)
305a16a  wip (ronanstokes-db, Feb 16, 2023)
8f95ca5  wip (ronanstokes-db, Feb 16, 2023)
13c3a91  wip (ronanstokes-db, Feb 17, 2023)
11cfacd  documentation updates (ronanstokes-db, Feb 28, 2023)
fa9365b  modified base examples for better compatibility with notebooks (ronanstokes-db, Feb 28, 2023)
f19b014  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Mar 2, 2023)
423b405  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Mar 2, 2023)
680e425  merge from origin (ronanstokes-db, Mar 2, 2023)
855410c  updated changelog (ronanstokes-db, Mar 2, 2023)
7a90397  separated documentation changes to separate branch (ronanstokes-db, Mar 2, 2023)
f62d728  wip (ronanstokes-db, Mar 3, 2023)
1c9e268  wip (ronanstokes-db, Mar 3, 2023)
12fb038  wip (ronanstokes-db, Mar 3, 2023)
60d56e6  wip (ronanstokes-db, Mar 3, 2023)
b7fafe9  wip (ronanstokes-db, Mar 5, 2023)
4a646f2  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Mar 6, 2023)
2425b37  wip (ronanstokes-db, Mar 6, 2023)
16b27fd  wip (ronanstokes-db, Mar 6, 2023)
104de6b  wip (ronanstokes-db, Mar 6, 2023)
d7f66bc  wip (ronanstokes-db, Mar 6, 2023)
a2d98ae  wip (ronanstokes-db, Mar 7, 2023)
b363acc  wip (ronanstokes-db, Mar 7, 2023)
5f51db1  wip (ronanstokes-db, Mar 10, 2023)
bf00500  wip (ronanstokes-db, Mar 10, 2023)
51ba870  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Mar 11, 2023)
c24aa4f  merge from master (ronanstokes-db, Mar 11, 2023)
ab5908a  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Mar 17, 2023)
85a0312  merged from master (ronanstokes-db, Jul 2, 2023)
92fad0c  Merge branch 'master' into feature_streaming_enhancments (ronanstokes-db, Jul 2, 2023)
84f3383  merged from remote (ronanstokes-db, Jul 2, 2023)
4ac18c9  changes to changelog (ronanstokes-db, Jul 2, 2023)
041b3dc  changes to changelog (ronanstokes-db, Jul 2, 2023)
11 changes: 10 additions & 1 deletion CHANGELOG.md
@@ -3,11 +3,20 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.

### Unreleased
### Version 0.3.5

#### Changed
* Added formatting of generated code as HTML for script methods
* Modified Pipfile to use newer versions of package specifications
* Additional migration of tests to use of `pytest`
* Changed parsing of build options for the data generator to support use of custom streaming sources
* Documentation updates in support of new features such as streaming, complex structures, etc.

#### Added
* Added support for additional streaming source types and for use of custom streaming sources
* Added support for use of file reads as a streaming source (for seed and timestamp columns only)
* Added support for complex event time in streaming scenarios. It may also be used in batch scenarios for testing


### Version 0.3.4 Post 3

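As a quick orientation for the features listed in the changelog above, here is a minimal sketch of how the new streaming build options are intended to be used. The generator spec `df_spec`, the column definitions, and the option values are illustrative assumptions, not part of this diff:

```python
import dbldatagen as dg

# illustrative generator spec; `spark` is an existing SparkSession
df_spec = (dg.DataGenerator(sparkSession=spark, name="stream_test", rows=100000)
           .withColumn("device_id", "long", minValue=1, maxValue=1000)
           .withColumn("reading", "float", minValue=0.0, maxValue=100.0))

# `dbldatagen.*` option keys are interpreted by the generator itself;
# all other keys are passed through to the underlying Spark stream reader
streaming_df = df_spec.build(withStreaming=True,
                             options={"dbldatagen.streaming.source": "rate-micro-batch",
                                      "rowsPerBatch": 100})
```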
241 changes: 221 additions & 20 deletions dbldatagen/data_generator.py
@@ -8,6 +8,8 @@
import copy
import logging
import re
import time
import math

from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType
from .spark_singleton import SparkSingleton
@@ -19,13 +21,37 @@
from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition
from .html_utils import HtmlUtils
from ._version import _get_spark_version
from .enhanced_event_time import EnhancedEventTimeHelper
from .schema_parser import SchemaParser

_OLD_MIN_OPTION = 'min'
_OLD_MAX_OPTION = 'max'

_STREAMING_SOURCE_OPTION = "dbldatagen.streaming.source"
_STREAMING_SCHEMA_OPTION = "dbldatagen.streaming.sourceSchema"
_STREAMING_PATH_OPTION = "dbldatagen.streaming.sourcePath"
_STREAMING_TABLE_OPTION = "dbldatagen.streaming.sourceTable"
_STREAMING_ID_FIELD_OPTION = "dbldatagen.streaming.sourceIdField"
_STREAMING_TIMESTAMP_FIELD_OPTION = "dbldatagen.streaming.sourceTimestampField"
_STREAMING_GEN_TIMESTAMP_OPTION = "dbldatagen.streaming.generateTimestamp"
_STREAMING_USE_SOURCE_FIELDS = "dbldatagen.streaming.sourceFields"
_BUILD_OPTION_PREFIX = "dbldatagen."

_STREAMING_TIMESTAMP_COLUMN = "_source_timestamp"

_STREAMING_SOURCE_RATE = "rate"
_STREAMING_SOURCE_RATE_MICRO_BATCH = "rate-micro-batch"
_STREAMING_SOURCE_NUM_PARTITIONS = "numPartitions"
_STREAMING_SOURCE_ROWS_PER_BATCH = "rowsPerBatch"
_STREAMING_SOURCE_ROWS_PER_SECOND = "rowsPerSecond"
_STREAM_SOURCE_START_TIMESTAMP = "startTimestamp"

_STREAMING_SOURCE_TEXT = "text"
_STREAMING_SOURCE_PARQUET = "parquet"
_STREAMING_SOURCE_CSV = "csv"
_STREAMING_SOURCE_JSON = "json"
_STREAMING_SOURCE_ORC = "ord"
_STREAMING_SOURCE_DELTA = "delta"

class DataGenerator:
""" Main Class for test data set generation
Expand All @@ -49,6 +75,11 @@ class DataGenerator:
it is recommended that you use a different name for the seed column - for example `_id`.

This may be specified by setting the `seedColumnName` attribute to `_id`

Note that the requested number of partitions is not guaranteed when generating a streaming
data set with a streaming source that determines its own partitioning.
However, for most batch use cases, the requested number of partitions will be honored.

"""

# class vars
@@ -154,6 +185,10 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
self._inferredSchemaFields = []
self.buildPlanComputed = False

# columns that must be present during processing;
# maps column name to the reason it is needed
self._requiredColumns = {}

# let's add the seed column
self.withColumn(self._seedColumnName, LongType(), nullable=False, implicit=True, omit=True, noWarn=True)
self._batchSize = batchSize
@@ -885,9 +920,12 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):

:returns: Spark data frame for base data that drives the data generation
"""
assert self.partitions is not None, "Expecting partition count to be initialized"

id_partitions = self.partitions
end_id = self._rowCount + startId
id_partitions = self.partitions if self.partitions is not None else 4

build_options, passthrough_options, unsupported_options = self._parseBuildOptions(options)

if not streaming:
status = f"Generating data frame with ids from {startId} to {end_id} with {id_partitions} partitions"
@@ -902,34 +940,120 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
df1 = df1.withColumnRenamed(SPARK_RANGE_COLUMN, self._seedColumnName)

else:
status = (
f"Generating streaming data frame with ids from {startId} to {end_id} with {id_partitions} partitions")
self._applyStreamingDefaults(build_options, passthrough_options)

assert _STREAMING_SOURCE_OPTION in build_options.keys(), "There must be a source type specified"
streaming_source_format = build_options[_STREAMING_SOURCE_OPTION]

if streaming_source_format in [_STREAMING_SOURCE_RATE, _STREAMING_SOURCE_RATE_MICRO_BATCH]:
streaming_partitions = passthrough_options[_STREAMING_SOURCE_NUM_PARTITIONS]
status = (
f"Generating streaming data frame with {streaming_partitions} partitions")
else:
status = (
f"Generating streaming data frame with '{streaming_source_format}' streaming source")

self.logger.info(status)
self.executionHistory.append(status)

df1 = (self.sparkSession.readStream
.format("rate"))
if options is not None:
if "rowsPerSecond" not in options:
options['rowsPerSecond'] = 1
if "numPartitions" not in options:
options['numPartitions'] = id_partitions

for k, v in options.items():
df1 = df1.option(k, v)
df1 = (df1.load()
.withColumnRenamed("value", self._seedColumnName)
)
.format(streaming_source_format))

for k, v in passthrough_options.items():
df1 = df1.option(k, v)

file_formats = [_STREAMING_SOURCE_TEXT, _STREAMING_SOURCE_JSON, _STREAMING_SOURCE_CSV,
_STREAMING_SOURCE_PARQUET, _STREAMING_SOURCE_DELTA, _STREAMING_SOURCE_ORC]

data_path = None
source_table = None
id_column = "value"

if _STREAMING_ID_FIELD_OPTION in build_options:
id_column = build_options[_STREAMING_ID_FIELD_OPTION]

if _STREAMING_TABLE_OPTION in build_options:
source_table = build_options[_STREAMING_TABLE_OPTION]

if _STREAMING_SCHEMA_OPTION in build_options:
source_schema = build_options[_STREAMING_SCHEMA_OPTION]
df1 = df1.schema(source_schema)

# get path for file based reads
if _STREAMING_PATH_OPTION in build_options:
data_path = build_options[_STREAMING_PATH_OPTION]
elif streaming_source_format in file_formats:
if "path" in passthrough_options:
data_path = passthrough_options["path"]

if data_path is not None:
df1 = df1.load(data_path)
elif source_table is not None:
df1 = df1.table(source_table)
else:
df1 = (df1.option("rowsPerSecond", 1)
.option("numPartitions", id_partitions)
.load()
.withColumnRenamed("value", self._seedColumnName)
)
df1 = df1.load()

if id_column != self._seedColumnName:
df1 = df1.withColumnRenamed(id_column, self._seedColumnName)

return df1
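
# Illustrative sketch (not part of the original diff) of how the options above
# drive this method when streaming from an existing table; the spec `df_spec`,
# table name and id field are hypothetical:
#
#   df = df_spec.build(withStreaming=True,
#                      options={"dbldatagen.streaming.source": "delta",
#                               "dbldatagen.streaming.sourceTable": "test.seed_events",
#                               "dbldatagen.streaming.sourceIdField": "id"})
#
# Here no path is supplied, so the reader falls through to `df1.table(source_table)`,
# and the `id` column is renamed to the seed column.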

def withEnhancedEventTime(self, startEventTime=None,
acceleratedEventTimeInterval="10 minutes",
fractionLateArriving=0.1,
lateTimeInterval="6 hours",
jitter=(-0.25, 0.9999),
eventTimeName="event_ts",
baseColumn="timestamp",
keepIntermediateColumns=False):
"""
Delegate to EnhancedEventTimeHelper to implement enhanced event time

:param startEventTime: start timestamp of output event time
:param acceleratedEventTimeInterval: interval for accelerated event time (e.g. "10 minutes")
:param fractionLateArriving: fraction of late arriving data, in the range [0.0, 1.0]
:param lateTimeInterval: interval for late arriving events (e.g. "6 hours")
:param jitter: jitter factor to avoid strictly increasing order in events
:param eventTimeName: Column name for generated event time column
:param baseColumn: Base column name used for computations of adjusted event time
:param keepIntermediateColumns: Flag to retain intermediate columns in the output [debug / test only]

:returns: instance of data generator (note: the caller object is modified)

This adjusts the dataframe to produce IoT-style event times that normally increase over time but include a
configurable fraction of late arriving data. It uses a base timestamp column (named `timestamp` by default)
to compute the event time. A column with the same name as the base timestamp column must be present in the
source data frame, and it should have the value of `now()`.

The modified dataframe will have a new column, named using the `eventTimeName` parameter, containing the
new timestamp value.

By default the `rate` and `rate-micro-batch` streaming sources provide this column, but
it can be added to other dataframes - either batch or streaming data frames.

While the overall intent is to support synthetic IoT-style simulated device data in a stream, this can be used
with a batch data frame (as long as an appropriate timestamp column is added and designated as the base
timestamp column).
"""

helper = EnhancedEventTimeHelper()
assert baseColumn is not None and len(baseColumn) > 0, "baseColumn argument must be supplied"

# add metadata for required timestamp field
self._requiredColumns[baseColumn] = f"Column '{baseColumn}' is required by `withEnhancedEventTime` processing"

helper.withEnhancedEventTime(self,
startEventTime,
acceleratedEventTimeInterval,
fractionLateArriving,
lateTimeInterval,
jitter,
eventTimeName,
baseColumn,
keepIntermediateColumns)

return self
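
# Illustrative usage sketch (not part of the original diff); `df_spec` is a
# hypothetical generator spec that will be built over the default `rate`
# streaming source, which already supplies the `timestamp` base column:
#
#   df_spec = df_spec.withEnhancedEventTime(startEventTime="2023-03-01 00:00:00",
#                                           acceleratedEventTimeInterval="10 minutes",
#                                           fractionLateArriving=0.1,
#                                           lateTimeInterval="6 hours",
#                                           eventTimeName="event_ts")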

def _computeColumnBuildOrder(self):
""" compute the build ordering using a topological sort on dependencies

@@ -1077,6 +1201,81 @@ def computeBuildPlan(self):
self.buildPlanComputed = True
return self

def _parseBuildOptions(self, options):
""" Parse build options

Parse build options into a tuple of dictionaries - (datagen options, passthrough options, unsupported options)

where

- `datagen options` is a dictionary of options to be interpreted by the data generator
- `passthrough options` is a dictionary of options to be passed through to the underlying base dataframe
- `unsupported options` is a dictionary of options that are not supported

:param options: dict of options to control generation of data
:returns: tuple of option dictionaries - (datagen_options, passthrough_options, unsupported_options)

"""
passthrough_options = {}
unsupported_options = {}
datagen_options = {}

supported_options = [_STREAMING_SOURCE_OPTION,
_STREAMING_SCHEMA_OPTION,
_STREAMING_PATH_OPTION,
_STREAMING_TABLE_OPTION,
_STREAMING_ID_FIELD_OPTION,
_STREAMING_TIMESTAMP_FIELD_OPTION,
_STREAMING_GEN_TIMESTAMP_OPTION,
_STREAMING_USE_SOURCE_FIELDS
]

if options is not None:
for k, v in options.items():
if isinstance(k, str):
if k.startswith(_BUILD_OPTION_PREFIX):
if k in supported_options:
datagen_options[k] = v
else:
unsupported_options[k] = v
else:
passthrough_options[k] = v
else:
unsupported_options[k] = v


return datagen_options, passthrough_options, unsupported_options
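
# Illustrative sketch (not part of the original diff) of the three-way split
# performed above, given a mixed options dictionary:
#
#   options = {"dbldatagen.streaming.source": "rate",   # -> datagen_options
#              "rowsPerSecond": 50,                     # -> passthrough_options
#              "dbldatagen.streaming.badOption": 1,     # -> unsupported_options (unknown `dbldatagen.` key)
#              42: "value"}                             # -> unsupported_options (non-string key)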

def _applyStreamingDefaults(self, build_options, passthrough_options):
assert build_options is not None
assert passthrough_options is not None

# default to `rate` streaming source
if _STREAMING_SOURCE_OPTION not in build_options:
build_options[_STREAMING_SOURCE_OPTION] = _STREAMING_SOURCE_RATE

# setup `numPartitions` if not specified
if build_options[_STREAMING_SOURCE_OPTION] in [_STREAMING_SOURCE_RATE, _STREAMING_SOURCE_RATE_MICRO_BATCH]:
if _STREAMING_SOURCE_NUM_PARTITIONS not in passthrough_options:
passthrough_options[_STREAMING_SOURCE_NUM_PARTITIONS] = self.partitions

# set up rows per batch if not specified
if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE:
if _STREAMING_SOURCE_ROWS_PER_SECOND not in passthrough_options:
passthrough_options[_STREAMING_SOURCE_ROWS_PER_SECOND] = 1

if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_RATE_MICRO_BATCH:
if _STREAMING_SOURCE_ROWS_PER_BATCH not in passthrough_options:
passthrough_options[_STREAMING_SOURCE_ROWS_PER_BATCH] = 1
if _STREAM_SOURCE_START_TIMESTAMP not in passthrough_options:
currentTs = math.floor(time.mktime(time.localtime())) * 1000
passthrough_options[_STREAM_SOURCE_START_TIMESTAMP] = currentTs

if build_options[_STREAMING_SOURCE_OPTION] == _STREAMING_SOURCE_TEXT:
self.logger.warning("Use of the `text` format may not work due to lack of type support")

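
# Illustrative sketch (not part of the original diff): with no options supplied,
# the defaults above resolve to a `rate` source producing one row per second:
#
#   build_options, passthrough_options = {}, {}
#   self._applyStreamingDefaults(build_options, passthrough_options)
#   # build_options       -> {"dbldatagen.streaming.source": "rate"}
#   # passthrough_options -> {"numPartitions": self.partitions, "rowsPerSecond": 1}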

def build(self, withTempView=False, withView=False, withStreaming=False, options=None):
""" build the test data set from the column definitions and return a dataframe for it

Expand Down Expand Up @@ -1107,6 +1306,8 @@ def build(self, withTempView=False, withView=False, withStreaming=False, options

df1 = self._getBaseDataFrame(self.starting_id, streaming=withStreaming, options=options)

# TODO: check that the required columns are present in the base data frame

self.executionHistory.append("Using Pandas Optimizations {True}")

# build columns