diff --git a/Dockerfile.pyflink b/Dockerfile.pyflink
new file mode 100644
index 0000000..aa7356c
--- /dev/null
+++ b/Dockerfile.pyflink
@@ -0,0 +1,24 @@
+FROM flink:1.20.0-scala_2.12
+
+# Install Python 3.11.11
+RUN apt-get update && apt-get install -y \
+    software-properties-common && \
+    add-apt-repository ppa:deadsnakes/ppa && \
+    apt-get update && \
+    apt-get install -y python3.11 python3.11-venv python3.11-dev && \
+    ln -sf /usr/bin/python3.11 /usr/bin/python && \
+    ln -sf /usr/bin/python3.11 /usr/bin/python3 && \
+    rm -rf /var/lib/apt/lists/*
+
+# Install pip
+RUN python3.11 -m ensurepip --upgrade && \
+    ln -sf /usr/local/bin/pip3 /usr/bin/pip && \
+    ln -sf /usr/local/bin/pip3 /usr/bin/pip3
+
+# Copy requirements file
+COPY requirements.txt /requirements.txt
+
+# Install Python dependencies
+RUN pip install --no-cache-dir -r /requirements.txt
+
+CMD ["bash"]
diff --git a/README.md b/README.md
index b9e673a..78fe221 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,90 @@
 # Cascade
+## Benchmarking
+
+Requirements:
+- Docker
+- Conda
+- A local Flink client
+
+1. Create the conda environment with:
+
+```
+conda env create -f environment.yml
+```
+
+2. Activate the environment with:
+
+```
+conda activate cascade_env
+```
+
+3. Start the local Kafka and PyFlink clusters:
+
+```
+docker compose up
+```
+
+This will launch:
+
+- a Kafka broker at `localhost:9092` (`kafka:9093` for communication between containers),
+- a [Kafbat UI](https://github.com/kafbat/kafka-ui) at http://localhost:8080, and
+- a local Flink cluster with `PyFlink` and all requirements, with a web UI at http://localhost:8081
+
+By default the Flink cluster runs with 16 task slots. This can be changed by
+setting the `TASK_SLOTS` environment variable, for example:
+
+```
+TASK_SLOTS=32 docker compose up
+```
+
+You can also scale up the number of task managers, each with the same number
+of task slots (untested):
+
+```
+docker compose up --scale taskmanager=3
+```
+
+Once everything has started (for example, the web UIs are reachable), you can
+upload the benchmark job to the cluster. Note that the Kafka topics must be
+emptied first, otherwise the job will immediately start consuming old events.
+You can use the Kafbat UI for this, for example by deleting topics or purging
+messages. To start the job, first navigate to the cascade repo directory, e.g.
+`cd /path/to/cascade`. Then run the following command, where `X` is the desired
+default parallelism:
+
+```
+flink run --pyFiles /path/to/cascade/src,/path/to/cascade --pyModule deathstar_movie_review.demo -p X
+```
+
+> This command runs `FlinkRuntime.init`, which requires the location of a
+> flink-python jar file.
+> The location is currently hardcoded in `src/cascade/runtime/flink_runtime.py` and
+> should be changed to match your environment. The jar is included in the Flink
+> installation itself, available at https://flink.apache.org/downloads/ (1.20.1).
+
+Once the job is submitted, you can start the benchmark. Open another terminal in
+the same directory (and conda environment) and run:
+
+```
+python -m deathstar_movie_review.start_benchmark
+```
+
+This will start the benchmark by sending events to Kafka. The first phase
+initialises the state required for the benchmark and is not measured. The
+second phase runs the actual benchmark.
+
+
+### Notes
+
+Scaling beyond `-p 16` has not worked yet: running with `-p 64` and
+`TASK_SLOTS=128` failed with an "Insufficient number of network buffers"
+error, so more configuration is likely required.
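+
+One possible (untested) mitigation, based on the configuration keys named in
+the error message, is to give each task manager more network memory by
+extending `FLINK_PROPERTIES` for the `taskmanager` service in
+`docker-compose.yml`; the exact values below are only a guess and may need
+tuning:
+
+```
+FLINK_PROPERTIES=
+jobmanager.rpc.address: jobmanager
+taskmanager.numberOfTaskSlots: ${TASK_SLOTS:-16}
+taskmanager.memory.network.fraction: 0.2
+taskmanager.memory.network.max: 1gb
+```
+
+For reference, the full error was: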
+ +``` +Caused by: java.io.IOException: Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'. +``` + + ## Development Cascade should work with Python 3.10 / 3.11 although other versions could work. Dependencies should first be installed with: @@ -8,7 +93,7 @@ Cascade should work with Python 3.10 / 3.11 although other versions could work. pip install -r requirements.txt ``` -## Testing +## (old) Testing The `pip install` command should have installed a suitable version of `pytest`. diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index 6e546e7..60a623b 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -1,189 +1,73 @@ -import hashlib -import uuid - -from .movie_data import movie_data -from .workload_data import movie_titles, charset -import random -from timeit import default_timer as timer -import sys +from typing import Literal +from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination +from cascade.runtime.flink_runtime import FlinkRuntime + +from .entities.user import user_op +from .entities.compose_review import compose_review_op +from .entities.frontend import frontend_df_parallel, frontend_df_serial, frontend_op, text_op, unique_id_op +from .entities.movie import movie_id_op, movie_info_op, plot_op + import os +from confluent_kafka.admin import AdminClient, NewTopic -# import cascade -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) +KAFKA_BROKER = "localhost:9092" +KAFKA_FLINK_BROKER = "kafka:9093" # If running a flink cluster and kafka inside docker, the broker url might be different -from cascade.dataflow.dataflow import Event, EventResult, InitClass, OpNode -from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime -from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination +IN_TOPIC = "ds-movie-in" +OUT_TOPIC = "ds-movie-out" +INTERNAL_TOPIC = "ds-movie-internal" -from .entities.user import user_op, User -from .entities.compose_review import compose_review_op -from .entities.frontend import frontend_op, text_op, unique_id_op -from .entities.movie import MovieInfo, movie_id_op, movie_info_op, plot_op, Plot, MovieId - -import time -import pandas as pd - -def populate_user(client: FlinkRuntime): - init_user = OpNode(User, InitClass(), read_key_from="username") - for i in range(1000): - user_id = f'user{i}' - username = f'username_{i}' - password = f'password_{i}' - hasher = hashlib.new('sha512') - salt = uuid.uuid1().bytes - hasher.update(password.encode()) - hasher.update(salt) - - password_hash = hasher.hexdigest() - - user_data = { - "userId": user_id, - "FirstName": "firstname", - "LastName": "lastname", - "Username": username, - "Password": password_hash, - "Salt": salt - } - event = Event(init_user, {"username": username, "user_data": user_data}, None) - client.send(event) - - -def populate_movie(client: FlinkRuntime): - init_movie_info = OpNode(MovieInfo, InitClass(), read_key_from="movie_id") - init_plot = OpNode(Plot, InitClass(), read_key_from="movie_id") - init_movie_id = OpNode(MovieId, InitClass(), read_key_from="title") - - for movie in movie_data: - movie_id = movie["MovieId"] - - # movie info -> write `movie` - event = Event(init_movie_info, 
{"movie_id": movie_id, "info": movie}, None) - client.send(event) - - # plot -> write "plot" - event = Event(init_plot, {"movie_id": movie_id, "plot": "plot"}, None) - client.send(event) - - # movie_id_op -> register movie id - event = Event(init_movie_id, {"title": movie["Title"], "movie_id": movie_id}, None) - client.send(event) - - -def compose_review(req_id): - user_index = random.randint(0, 999) - username = f"username_{user_index}" - password = f"password_{user_index}" - title = random.choice(movie_titles) - rating = random.randint(0, 10) - text = ''.join(random.choice(charset) for _ in range(256)) - - return frontend_op.dataflow.generate_event({ - "review": req_id, - "user": username, - "title": title, - "rating": rating, - "text": text - }) - -def deathstar_workload_generator(): - c = 1 - while True: - yield compose_review(c) - c += 1 - -threads = 1 -messages_per_burst = 10 -sleeps_per_burst = 10 -sleep_time = 0.08 #0.0085 -seconds_per_burst = 1 -bursts = 100 - - -def benchmark_runner(proc_num) -> dict[int, dict]: - print(f'Generator: {proc_num} starting') - client = FlinkClientSync("ds-movie-in", "ds-movie-out") - deathstar_generator = deathstar_workload_generator() - start = timer() - - for _ in range(bursts): - sec_start = timer() - - # send burst of messages - for i in range(messages_per_burst): - - # sleep sometimes between messages - if i % (messages_per_burst // sleeps_per_burst) == 0: - time.sleep(sleep_time) - event = next(deathstar_generator) - client.send(event) - - client.flush() - sec_end = timer() - - # wait out the second - lps = sec_end - sec_start - if lps < seconds_per_burst: - time.sleep(1 - lps) - sec_end2 = timer() - print(f'Latency per burst: {sec_end2 - sec_start} ({seconds_per_burst})') +EXPERIMENT: Literal["baseline", "pipelined", "parallel"] = os.getenv("EXPERIMENT", "baseline") + +def create_topics(*required_topics): + conf = { + "bootstrap.servers": KAFKA_BROKER + } + + admin_client = AdminClient(conf) + + # Fetch existing topics + existing_topics = admin_client.list_topics(timeout=5).topics.keys() + + # Find missing topics + missing_topics = [topic for topic in required_topics if topic not in existing_topics] + + if missing_topics: + print(f"Creating missing topics: {missing_topics}") - end = timer() - print(f'Average latency per burst: {(end - start) / bursts} ({seconds_per_burst})') - - done = False - while not done: - done = True - for event_id, fut in client._futures.items(): - result = fut["ret"] - if result is None: - done = False - time.sleep(0.5) - break - futures = client._futures - client.close() - return futures - - -def write_dict_to_pkl(futures_dict, filename): - """ - Writes a dictionary of event data to a pickle file. - - Args: - futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. - filename (str): The name of the pickle file to write to. 
- """ - - # Prepare the data for the DataFrame - data = [] - for event_id, event_data in futures_dict.items(): - ret: EventResult = event_data.get("ret") - row = { - "event_id": event_id, - "sent": str(event_data.get("sent")), - "sent_t": event_data.get("sent_t"), - "ret": str(event_data.get("ret")), - "ret_t": event_data.get("ret_t"), - "roundtrip": ret.metadata["roundtrip"] if ret else None, - "flink_time": ret.metadata["flink_time"] if ret else None, - "deser_times": ret.metadata["deser_times"] if ret else None, - "loops": ret.metadata["loops"] if ret else None, - "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None - } - data.append(row) - - # Create a DataFrame and save it as a pickle file - df = pd.DataFrame(data) - df.to_pickle(filename) + # Define new topics (default: 1 partition, replication factor 1) + new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in missing_topics] + + # Create topics + futures = admin_client.create_topics(new_topics) + + # Wait for topic creation to complete + for topic, future in futures.items(): + try: + future.result() # Block until the operation is complete + print(f"Topic '{topic}' created successfully") + except Exception as e: + print(f"Failed to create topic '{topic}': {e}") + else: + print("All required topics exist.") + def main(): - runtime = FlinkRuntime("ds-movie-in", "ds-movie-out", 8081) - runtime.init(bundle_time=5, bundle_size=10) + create_topics(IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC) - print(frontend_op.dataflow.to_dot()) - # dead_node_elimination([], [frontend_op]) - print(frontend_op.dataflow.to_dot()) - input() + runtime = FlinkRuntime(IN_TOPIC, OUT_TOPIC, internal_topic=INTERNAL_TOPIC) + runtime.init(kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10) + if EXPERIMENT == "baseline": + frontend_op.dataflow = frontend_df_serial() + elif EXPERIMENT == "pipelined": + frontend_op.dataflow = frontend_df_serial() + dead_node_elimination([], [frontend_op]) + elif EXPERIMENT == "parallel": + frontend_op.dataflow = frontend_df_parallel() + + print(frontend_op.dataflow.to_dot()) + print(f"Creating dataflow [{EXPERIMENT}]") runtime.add_operator(compose_review_op) runtime.add_operator(user_op) @@ -194,30 +78,7 @@ def main(): runtime.add_stateless_operator(unique_id_op) runtime.add_stateless_operator(text_op) - runtime.run(run_async=True) - populate_user(runtime) - populate_movie(runtime) - runtime.producer.flush() - time.sleep(1) - - input() - - # with Pool(threads) as p: - # results = p.map(benchmark_runner, range(threads)) - - # results = {k: v for d in results for k, v in d.items()} - results = benchmark_runner(0) - - print("last result:") - print(list(results.values())[-1]) - t = len(results) - r = 0 - for result in results.values(): - if result["ret"] is not None: - print(result) - r += 1 - print(f"{r}/{t} results recieved.") - write_dict_to_pkl(results, "test2.pkl") + runtime.run() if __name__ == "__main__": main() diff --git a/deathstar_movie_review/entities/compose_review.py b/deathstar_movie_review/entities/compose_review.py index 7423c4b..853e34b 100644 --- a/deathstar_movie_review/entities/compose_review.py +++ b/deathstar_movie_review/entities/compose_review.py @@ -1,6 +1,6 @@ from typing import Any -from src.cascade.dataflow.operator import StatefulOperator +from cascade.dataflow.operator import StatefulOperator class ComposeReview: diff --git a/deathstar_movie_review/entities/user.py b/deathstar_movie_review/entities/user.py index 2b244a9..e883277 100644 --- 
a/deathstar_movie_review/entities/user.py +++ b/deathstar_movie_review/entities/user.py @@ -1,7 +1,7 @@ from typing import Any from deathstar_movie_review.entities.compose_review import ComposeReview -from src.cascade.dataflow.dataflow import DataFlow, Edge, InvokeMethod, OpNode -from src.cascade.dataflow.operator import StatefulOperator +from cascade.dataflow.dataflow import DataFlow, Edge, InvokeMethod, OpNode +from cascade.dataflow.operator import StatefulOperator class User: diff --git a/deathstar_movie_review/start_benchmark.py b/deathstar_movie_review/start_benchmark.py new file mode 100644 index 0000000..7664b86 --- /dev/null +++ b/deathstar_movie_review/start_benchmark.py @@ -0,0 +1,237 @@ +import hashlib +import time +import uuid +import pandas as pd +import random +from .movie_data import movie_data +from .workload_data import movie_titles, charset +import sys +import os +from timeit import default_timer as timer +import argparse + +# import cascade +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.dataflow.dataflow import Event, EventResult, InitClass, OpNode +from cascade.runtime.flink_runtime import FlinkClientSync + +from .entities.user import User +from .entities.frontend import frontend_op +from .entities.movie import MovieInfo, Plot, MovieId + +IN_TOPIC = "ds-movie-in" +OUT_TOPIC = "ds-movie-out" +# threads = 1 +# messages_per_burst = 10 +# sleeps_per_burst = 10 +# sleep_time = 0.08 +# seconds_per_burst = 1 +# bursts = 100 + +def populate_user(client: FlinkClientSync): + init_user = OpNode(User, InitClass(), read_key_from="username") + for i in range(1000): + user_id = f'user{i}' + username = f'username_{i}' + password = f'password_{i}' + hasher = hashlib.new('sha512') + salt = uuid.uuid1().bytes + hasher.update(password.encode()) + hasher.update(salt) + + password_hash = hasher.hexdigest() + + user_data = { + "userId": user_id, + "FirstName": "firstname", + "LastName": "lastname", + "Username": username, + "Password": password_hash, + "Salt": salt + } + event = Event(init_user, {"username": username, "user_data": user_data}, None) + client.send(event) + + +def populate_movie(client: FlinkClientSync): + init_movie_info = OpNode(MovieInfo, InitClass(), read_key_from="movie_id") + init_plot = OpNode(Plot, InitClass(), read_key_from="movie_id") + init_movie_id = OpNode(MovieId, InitClass(), read_key_from="title") + + for movie in movie_data: + movie_id = movie["MovieId"] + + # movie info -> write `movie` + event = Event(init_movie_info, {"movie_id": movie_id, "info": movie}, None) + client.send(event) + + # plot -> write "plot" + event = Event(init_plot, {"movie_id": movie_id, "plot": "plot"}, None) + client.send(event) + + # movie_id_op -> register movie id + event = Event(init_movie_id, {"title": movie["Title"], "movie_id": movie_id}, None) + client.send(event) + + +def compose_review(req_id): + user_index = random.randint(0, 999) + username = f"username_{user_index}" + password = f"password_{user_index}" + title = random.choice(movie_titles) + rating = random.randint(0, 10) + text = ''.join(random.choice(charset) for _ in range(256)) + + return frontend_op.dataflow.generate_event({ + "review": req_id, + "user": username, + "title": title, + "rating": rating, + "text": text + }) + +def deathstar_workload_generator(): + c = 1 + while True: + yield compose_review(c) + c += 1 + + +def benchmark_runner(proc_num, messages_per_burst, sleeps_per_burst, sleep_time, seconds_per_burst, bursts) -> dict[int, dict]: + print(f'Generator: 
{proc_num} starting') + client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + deathstar_generator = deathstar_workload_generator() + start = timer() + + for b in range(bursts): + sec_start = timer() + + # send burst of messages + for i in range(messages_per_burst): + + # sleep sometimes between messages + if i % (messages_per_burst // sleeps_per_burst) == 0: + time.sleep(sleep_time) + event = next(deathstar_generator) + client.send(event) + + client.flush() + sec_end = timer() + + # wait out the second + lps = sec_end - sec_start + if lps < seconds_per_burst: + time.sleep(1 - lps) + sec_end2 = timer() + print(f'Latency per burst: {sec_end2 - sec_start} ({b+1}/{bursts})') + + end = timer() + print(f'Average latency per burst: {(end - start) / bursts} ({seconds_per_burst})') + futures = wait_for_futures(client) + client.close() + return futures + +def wait_for_futures(client: FlinkClientSync): + done = False + while not done: + done = True + for event_id, fut in client._futures.items(): + result = fut["ret"] + if result is None: + done = False + time.sleep(0.5) + break + futures = client._futures + return futures + + +def write_dict_to_pkl(futures_dict, filename): + """ + Writes a dictionary of event data to a pickle file. + + Args: + futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. + filename (str): The name of the pickle file to write to. + """ + + # Prepare the data for the DataFrame + data = [] + for event_id, event_data in futures_dict.items(): + ret: EventResult = event_data.get("ret") + row = { + "event_id": event_id, + "sent": str(event_data.get("sent")), + "sent_t": event_data.get("sent_t"), + "ret": str(event_data.get("ret")), + "ret_t": event_data.get("ret_t"), + "roundtrip": ret.metadata["roundtrip"] if ret else None, + "flink_time": ret.metadata["flink_time"] if ret else None, + "deser_times": ret.metadata["deser_times"] if ret else None, + "loops": ret.metadata["loops"] if ret else None, + "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None + } + data.append(row) + + # Create a DataFrame and save it as a pickle file + df = pd.DataFrame(data) + + # Multiply flink_time by 1000 to convert to milliseconds + df['flink_time'] = df['flink_time'] * 1000 + + df.to_pickle(filename) + return df + +def main(): + parser = argparse.ArgumentParser(description="Run the benchmark and save results.") + parser.add_argument("-o", "--output", type=str, default="benchmark_results.pkl", help="Output file name for the results") + parser.add_argument("--messages_per_burst", type=int, default=10, help="Number of messages per burst") + parser.add_argument("--sleeps_per_burst", type=int, default=10, help="Number of sleep cycles per burst") + parser.add_argument("--sleep_time", type=float, default=0.08, help="Sleep time between messages") + parser.add_argument("--seconds_per_burst", type=int, default=1, help="Seconds per burst") + parser.add_argument("--bursts", type=int, default=100, help="Number of bursts") + args = parser.parse_args() + + print(f"Starting with args:\n{args}") + + init_client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + + print("Populating...") + populate_user(init_client) + populate_movie(init_client) + init_client.producer.flush() + wait_for_futures(init_client) + print("Done.") + time.sleep(1) + + print("Starting benchmark") + + # with Pool(threads) as p: + # results = p.map(benchmark_runner, range(threads)) + + # results = {k: v for d in results for k, v in d.items()} + results = benchmark_runner(0, args.messages_per_burst, 
args.sleeps_per_burst, args.sleep_time, args.seconds_per_burst, args.bursts)
+
+    print("last result:")
+    print(list(results.values())[-1])
+    t = len(results)
+    r = 0
+    for result in results.values():
+        if result["ret"] is not None:
+            # print(result)
+            r += 1
+
+    print(f"{r}/{t} results received.")
+    print(f"Writing results to {args.output}")
+
+    df = write_dict_to_pkl(results, args.output)
+
+    flink_time = df['flink_time'].median()
+    latency = df['latency'].median()
+    flink_prct = float(flink_time) * 100 / latency
+    print(f"Median latency : {latency:.2f} ms")
+    print(f"Median Flink time : {flink_time:.2f} ms ({flink_prct:.2f}%)")
+    init_client.close()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/display_results.ipynb b/display_results.ipynb
index f47e1d5..548f528 100644
--- a/display_results.ipynb
+++ b/display_results.ipynb
@@ -1910,7 +1910,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 33,
+    "execution_count": null,
     "metadata": {},
     "outputs": [
     {
@@ -2917,9 +2917,6 @@
     "    # Read the DataFrame from the pickle file\n",
     "    df = pd.read_pickle(pickle_file_path)\n",
     "\n",
-    "    # Multiply flink_time by 1000 to convert to milliseconds\n",
-    "    df['flink_time'] = df['flink_time'] * 1000\n",
-    "\n",
     "    # Calculate the additional Kafka overhead\n",
     "    df['kafka_overhead'] = df['latency'] - df['flink_time']\n",
     "\n",
diff --git a/docker-compose.yml b/docker-compose.yml
index ce9e1b4..22b5bb8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -40,3 +40,31 @@ services:
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
     depends_on:
       - kafka
+
+  # https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/standalone/docker/#flink-with-docker-compose
+
+  jobmanager:
+    build:
+      context: .
+ dockerfile: Dockerfile.pyflink + depends_on: + - jobmanager + command: taskmanager + scale: 1 + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: ${TASK_SLOTS:-16} \ No newline at end of file diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000..e31bd80 --- /dev/null +++ b/environment.yml @@ -0,0 +1,94 @@ +name: cascade_env +channels: + - defaults + - https://repo.anaconda.com/pkgs/main + - https://repo.anaconda.com/pkgs/r +dependencies: + - _libgcc_mutex=0.1=main + - _openmp_mutex=5.1=1_gnu + - bzip2=1.0.8=h5eee18b_6 + - ca-certificates=2024.12.31=h06a4308_0 + - ld_impl_linux-64=2.40=h12ee557_0 + - libffi=3.4.4=h6a678d5_1 + - libgcc-ng=11.2.0=h1234567_1 + - libgomp=11.2.0=h1234567_1 + - libstdcxx-ng=11.2.0=h1234567_1 + - libuuid=1.41.5=h5eee18b_0 + - ncurses=6.4=h6a678d5_0 + - openssl=3.0.15=h5eee18b_0 + - pip=25.0=py311h06a4308_0 + - python=3.11.11=he870216_0 + - readline=8.2=h5eee18b_0 + - setuptools=75.8.0=py311h06a4308_0 + - sqlite=3.45.3=h5eee18b_0 + - tk=8.6.14=h39e8969_0 + - wheel=0.45.1=py311h06a4308_0 + - xz=5.6.4=h5eee18b_1 + - zlib=1.2.13=h5eee18b_1 + - pip: + - apache-beam==2.48.0 + - apache-flink==1.20.0 + - apache-flink-libraries==1.20.0 + - astor==0.8.1 + - avro-python3==1.10.2 + - certifi==2024.8.30 + - charset-normalizer==3.4.0 + - cloudpickle==2.2.1 + - configargparse==1.7 + - confluent-kafka==2.6.1 + - contourpy==1.3.1 + - crcmod==1.7 + - cycler==0.12.1 + - dill==0.3.1.1 + - dnspython==2.7.0 + - docopt==0.6.2 + - exceptiongroup==1.2.2 + - fastavro==1.9.7 + - fasteners==0.19 + - find-libpython==0.4.0 + - fonttools==4.56.0 + - geographiclib==2.0 + - geopy==2.4.1 + - grpcio==1.68.1 + - hdfs==2.7.3 + - httplib2==0.22.0 + - idna==3.10 + - iniconfig==2.0.0 + - jinja2==3.1.4 + - kiwisolver==1.4.8 + - klara==0.6.3 + - markupsafe==3.0.2 + - matplotlib==3.9.2 + - networkx==3.2.1 + - numpy==1.24.4 + - objsize==0.6.1 + - orjson==3.10.12 + - packaging==24.2 + - pandas==2.2.3 + - pdoc==15.0.0 + - pemja==0.4.1 + - pillow==11.1.0 + - pluggy==1.5.0 + - proto-plus==1.25.0 + - protobuf==4.23.4 + - py4j==0.10.9.7 + - pyarrow==11.0.0 + - pydot==1.4.2 + - pygments==2.18.0 + - pymongo==4.10.1 + - pyparsing==3.2.0 + - pytest==8.3.4 + - python-dateutil==2.9.0.post0 + - pytz==2024.2 + - regex==2024.11.6 + - requests==2.32.3 + - ruamel-yaml==0.18.6 + - ruamel-yaml-clib==0.2.12 + - six==1.17.0 + - tomli==2.2.1 + - typed-ast==1.5.5 + - typing-extensions==4.12.2 + - tzdata==2024.2 + - urllib3==2.2.3 + - z3-solver==4.14.0.0 + - zstandard==0.23.0 diff --git a/run_experiments.py b/run_experiments.py new file mode 100755 index 0000000..3cf327e --- /dev/null +++ b/run_experiments.py @@ -0,0 +1,96 @@ +import os +import subprocess +import time + +args = { + "messages_per_burst": 10, + "sleeps_per_burst": 10, + "sleep_time": 0.08, + "seconds_per_burst": 1, + "bursts": 100 +} + +mps_20 = { + **args, + "messages_per_burst": 20, + "sleeps_per_burst": 20, + "sleep_time": 0.08/2, +} + +mps_50 = { + **args, + "messages_per_burst": 50, + "sleeps_per_burst": 50, + "sleep_time": 0.08/5, +} + +# Define experiment parameters as a list of dictionaries +experiments = [ + {"parallelism": 16, "benchmark_args": {**args}}, + {"parallelism": 16, "benchmark_args": {**mps_20}}, + {"parallelism": 16, "benchmark_args": {**mps_50}}, + + {"parallelism": 8, "benchmark_args": {**args}}, + {"parallelism": 8, "benchmark_args": {**mps_20}}, + + {"parallelism": 4, "benchmark_args": {**mps_20}}, + {"parallelism": 4, "benchmark_args": {**args}}, + + 
{"parallelism": 2, "benchmark_args": {**args}}, + {"parallelism": 2, "benchmark_args": {**mps_20}}, + + {"parallelism": 1, "benchmark_args": {**args}}, + {"parallelism": 1, "benchmark_args": {**mps_20}}, + + {"parallelism": 8, "benchmark_args": {**mps_50}}, + {"parallelism": 4, "benchmark_args": {**mps_50}}, + {"parallelism": 2, "benchmark_args": {**mps_50}}, + {"parallelism": 1, "benchmark_args": {**mps_50}}, +] + + + + +print("Tearing down docker containers") +subprocess.run(["docker", "compose", "down"], check=True) + +for e in ["parallel", "base", "piplined"]: + for exp in experiments: + print(f"Starting experiment {exp}") + + # Start docker compose + subprocess.run(["docker", "compose", "up", "-d"], check=True) + + time.sleep(10) + + # Run Flink job + + flink_cmd = [ + "flink", "run", "--pyFiles", "/home/lvanmol/cascade/src,/home/lvanmol/cascade", + "--pyModule", "deathstar_movie_review.demo", "-d", "-p", str(exp['parallelism']) + ] + env = os.environ + env["EXPERIMENT"] = e + subprocess.run(flink_cmd, check=True, env=env) + + # Start benchmark + filename = f"{e}_p-{exp['parallelism']}_mps-{exp['benchmark_args']['messages_per_burst']}.plk" + benchmark_cmd = [ + "python", "-u", "-m", "deathstar_movie_review.start_benchmark", "--output", filename + ] + + for arg, val in exp['benchmark_args'].items(): + benchmark_cmd.append(f"--{arg}") + benchmark_cmd.append(str(val)) + subprocess.run(benchmark_cmd, check=True) + + # Sleep for experiment duration + # print(f"Sleeping for {exp['sleep']} seconds...") + # time.sleep(exp['sleep']) + + # Stop docker compose + subprocess.run(["docker", "compose", "down"], check=True) + + print(f"Experiment completed.") + +print("All experiments completed.") diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index febfc83..5afd53f 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -7,7 +7,6 @@ from typing import Any, Literal, Optional, Type, Union from pyflink.common.typeinfo import Types, get_gateway from pyflink.common import Configuration, DeserializationSchema, SerializationSchema, WatermarkStrategy -from pyflink.datastream.connectors import DeliveryGuarantee from pyflink.datastream.data_stream import CloseableIterator from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext, ValueState, ValueStateDescriptor from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSource, KafkaSink @@ -25,6 +24,9 @@ console_handler.setFormatter(formatter) logger.addHandler(console_handler) +# Required if SelectAll nodes are used +SELECT_ALL_ENABLED = False + @dataclass class FlinkRegisterKeyNode(Node): """A node that will register a key with the SelectAll operator. 
@@ -69,14 +71,15 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): result = self.operator.handle_init_class(*event.variable_map.values()) # Register the created key in FlinkSelectAllOperator - # register_key_event = Event( - # FlinkRegisterKeyNode(key, self.operator.entity), - # {}, - # None, - # _id = event._id - # ) - # logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") - # yield register_key_event + if SELECT_ALL_ENABLED: + register_key_event = Event( + FlinkRegisterKeyNode(key, self.operator.entity), + {}, + None, + _id = event._id + ) + logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") + yield register_key_event self.state.update(pickle.dumps(result)) elif isinstance(event.target.method_type, InvokeMethod): @@ -92,6 +95,7 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): # TODO: check if state actually needs to be updated if state is not None: self.state.update(pickle.dumps(state)) + # Filter targets are used in cases of [hotel for hotel in Hotel.__all__() *if hotel....*] # elif isinstance(event.target.method_type, Filter): # state = pickle.loads(self.state.value()) # result = event.target.method_type.filter_fn(event.variable_map, state) @@ -317,18 +321,18 @@ def debug(x, msg=""): class FlinkRuntime(): """A Runtime that runs Dataflows on Flink.""" - def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_port: Optional[int] = None): + def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_port: Optional[int] = None, internal_topic="internal-topic"): self.env: Optional[StreamExecutionEnvironment] = None """@private""" - self.producer: Producer = None - """@private""" - self.sent_events = 0 """The number of events that were sent using `send()`.""" self.input_topic = input_topic - """The topic to use for internal communications.""" + """The topic to use read new events/requests.""" + + self.internal_topic = internal_topic + """The topic used for internal messages.""" self.output_topic = output_topic """The topic to use for external communications, i.e. 
when a dataflow is @@ -368,30 +372,27 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para config.set_integer("python.fn-execution.bundle.size", bundle_size) # optimize for low latency - config.set_integer("taskmanager.memory.managed.size", 0) - config.set_integer("execution.buffer-timeout", 0) + # config.set_integer("taskmanager.memory.managed.size", 0) + config.set_integer("execution.buffer-timeout", 5) + + + kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), + 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') + serializer_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') + flink_jar = "/home/lvanmol/flink-1.20.1/opt/flink-python-1.20.1.jar" + + # Add the required jars https://issues.apache.org/jira/browse/FLINK-36457q + config.set_string("pipeline.jars",f"file://{flink_jar};file://{kafka_jar};file://{serializer_jar}") self.env = StreamExecutionEnvironment.get_execution_environment(config) if parallelism: self.env.set_parallelism(parallelism) logger.debug(f"FlinkRuntime: parellelism {self.env.get_parallelism()}") - kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), - 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') - serializer_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') - - if os.name == 'nt': - self.env.add_jars(f"file:///{kafka_jar}",f"file:///{serializer_jar}") - else: - self.env.add_jars(f"file://{kafka_jar}",f"file://{serializer_jar}") deserialization_schema = ByteSerializer() - properties: dict = { - "bootstrap.servers": kafka_broker, - "auto.offset.reset": "earliest", - "group.id": "test_group_1", - } - kafka_source = ( + + kafka_external_source = ( KafkaSource.builder() .set_bootstrap_servers(kafka_broker) .set_topics(self.input_topic) @@ -400,16 +401,24 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_only_deserializer(deserialization_schema) .build() ) + kafka_internal_source = ( + KafkaSource.builder() + .set_bootstrap_servers(kafka_broker) + .set_topics(self.internal_topic) + .set_group_id("test_group_1") + .set_starting_offsets(KafkaOffsetsInitializer.earliest()) + .set_value_only_deserializer(deserialization_schema) + .build() + ) self.kafka_internal_sink = ( KafkaSink.builder() .set_bootstrap_servers(kafka_broker) .set_record_serializer( KafkaRecordSerializationSchema.builder() - .set_topic(self.input_topic) + .set_topic(self.internal_topic) .set_value_serialization_schema(deserialization_schema) .build() ) - .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) """Kafka sink that will be ingested again by the Flink runtime.""" @@ -423,60 +432,55 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_serialization_schema(deserialization_schema) .build() ) - .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) """Kafka sink corresponding to outputs of calls (`EventResult`s).""" - event_stream = ( - self.env.from_source( - kafka_source, + self.env.from_source( + kafka_external_source, WatermarkStrategy.no_watermarks(), - "Kafka Source" + "Kafka External Source" ) .map(lambda x: deserialize_and_timestamp(x)) - # .map(lambda x: debug(x, msg=f"entry: {x}")) - .name("DESERIALIZE") + .name("DESERIALIZE external") # .filter(lambda e: isinstance(e, Event)) # Enforced by `send` type safety + ).union( + self.env.from_source( + kafka_internal_source, + WatermarkStrategy.no_watermarks(), + "Kafka 
External Source" + ) + .map(lambda x: deserialize_and_timestamp(x)) + .name("DESERIALIZE internal") ) - """REMOVE SELECT ALL NODES # Events with a `SelectAllNode` will first be processed by the select # all operator, which will send out multiple other Events that can # then be processed by operators in the same steam. - select_all_stream = ( - event_stream.filter(lambda e: - isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode)) - .key_by(lambda e: e.target.cls) - .process(FlinkSelectAllOperator()).name("SELECT ALL OP") - ) - # Stream that ingests events with an `SelectAllNode` or `FlinkRegisterKeyNode` - not_select_all_stream = ( - event_stream.filter(lambda e: - not (isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode))) - ) + if SELECT_ALL_ENABLED: + select_all_stream = ( + event_stream.filter(lambda e: + isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode)) + .key_by(lambda e: e.target.cls) + .process(FlinkSelectAllOperator()).name("SELECT ALL OP") + ) + # Stream that ingests events with an `SelectAllNode` or `FlinkRegisterKeyNode` + not_select_all_stream = ( + event_stream.filter(lambda e: + not (isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode))) + ) - operator_stream = select_all_stream.union(not_select_all_stream) - """ + event_stream = select_all_stream.union(not_select_all_stream) self.stateful_op_stream = event_stream self.stateless_op_stream = event_stream - # MOVED TO END OF OP STREAMS! - # self.merge_op_stream = ( - # event_stream.filter(lambda e: isinstance(e.target, CollectNode)) - # .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? - # .process(FlinkCollectOperator()) - # .name("Collect") - # ) - # """Stream that ingests events with an `cascade.dataflow.dataflow.CollectNode` target""" self.stateless_op_streams = [] self.stateful_op_streams = [] """List of stateful operator streams, which gets appended at `add_operator`.""" - self.producer = Producer({'bootstrap.servers': kafka_broker}) logger.debug("FlinkRuntime initialized") def add_operator(self, op: StatefulOperator): @@ -485,10 +489,8 @@ def add_operator(self, op: StatefulOperator): op_stream = ( self.stateful_op_stream.filter(lambda e: isinstance(e.target, OpNode) and e.target.entity == flink_op.operator.entity) - # .map(lambda x: debug(x, msg=f"filtered op: {op.entity}")) .key_by(lambda e: e.variable_map[e.target.read_key_from]) .process(flink_op) - # .map(lambda x: debug(x, msg=f"processed op: {op.entity}")) .name("STATEFUL OP: " + flink_op.operator.entity.__name__) ) self.stateful_op_streams.append(op_stream) @@ -505,17 +507,6 @@ def add_stateless_operator(self, op: StatelessOperator): ) self.stateless_op_streams.append(op_stream) - def send(self, event: Event, flush=False): - """Send an event to the Kafka source. - Once `run` has been called, the Flink runtime will start ingesting these - messages. Messages can always be sent after `init` is called - Flink - will continue ingesting messages after `run` is called asynchronously. - """ - self.producer.produce(self.input_topic, value=pickle.dumps(event)) - if flush: - self.producer.flush() - self.sent_events += 1 - def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="kafka") -> Union[CloseableIterator, None]: """Start ingesting and processing messages from the Kafka source. 
@@ -526,10 +517,16 @@ def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="ka logger.debug("FlinkRuntime merging operator streams...") # Combine all the operator streams - # operator_streams = self.merge_op_stream.union(*self.stateful_op_streams[1:], *self.stateless_op_streams)#.map(lambda x: debug(x, msg="combined ops")) - s1 = self.stateful_op_streams[0] - rest = self.stateful_op_streams[1:] - operator_streams = s1.union(*rest, *self.stateless_op_streams)#.map(lambda x: debug(x, msg="combined ops")) + if len(self.stateful_op_streams) >= 1: + s1 = self.stateful_op_streams[0] + rest = self.stateful_op_streams[1:] + operator_streams = s1.union(*rest, *self.stateless_op_streams) + elif len(self.stateless_op_streams) >= 1: + s1 = self.stateless_op_streams[0] + rest = self.stateless_op_streams[1:] + operator_streams = s1.union(*rest, *self.stateful_op_streams) + else: + raise RuntimeError("No operators found, were they added to the flink runtime with .add_*_operator()") merge_op_stream = ( operator_streams.filter(lambda e: isinstance(e, Event) and isinstance(e.target, CollectNode)) @@ -539,20 +536,6 @@ def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="ka ) """Stream that ingests events with an `cascade.dataflow.dataflow.CollectNode` target""" - - """ - # Add filtering for nodes with a `Filter` target - full_stream_filtered = ( - operator_streams - .filter(lambda e: isinstance(e, Event) and isinstance(e.target, Filter)) - .filter(lambda e: e.target.filter_fn()) - ) - full_stream_unfiltered = ( - operator_streams - .filter(lambda e: not (isinstance(e, Event) and isinstance(e.target, Filter))) - ) - ds = full_stream_filtered.union(full_stream_unfiltered) - """ # union with EventResults or Events that don't have a CollectNode target ds = merge_op_stream.union(operator_streams.filter(lambda e: not (isinstance(e, Event) and isinstance(e.target, CollectNode))))