Merged
38 commits
f0e8a90
fix run_experiments
lucasvanmol Mar 6, 2025
174d85b
Run python in thread mode
lucasvanmol Mar 7, 2025
76f01d7
GIL workaround exp
lucasvanmol Mar 11, 2025
f9b27a0
Add monitoring
lucasvanmol Mar 18, 2025
46a950c
test side outputs
lucasvanmol Mar 18, 2025
676cd71
Add monitoring switch
lucasvanmol Mar 20, 2025
d749bc5
Merge commit '676cd71' into side-output-fanout
lucasvanmol Mar 20, 2025
ace6b52
Finalize experiments
lucasvanmol Mar 26, 2025
c5f202a
Rework Dataflow IR
lucasvanmol Mar 31, 2025
76e261b
Add deathstar benchmark test for new IR
lucasvanmol Apr 1, 2025
eb53b89
Get started on Flink Runtime conversion
lucasvanmol Apr 1, 2025
ce80d92
Implement DeathstarBench on Flink with new IR
lucasvanmol Apr 2, 2025
d3c45fe
Remove OpNode
lucasvanmol Apr 3, 2025
fabdc18
Use __dict__ as state instead of class instance
lucasvanmol Apr 3, 2025
850c39c
Test setting __dict__ variables in init method
lucasvanmol Apr 3, 2025
c5b1dea
include self in writes when used as attribute
lucasvanmol Apr 3, 2025
6339f4d
move test_programs
lucasvanmol Apr 3, 2025
14446ca
Add branching to CFG
lucasvanmol Apr 4, 2025
00220ac
Add blocked CFG
lucasvanmol Apr 4, 2025
74e3bc8
Implement split function on entities for new IR
lucasvanmol Apr 7, 2025
5542b4c
Renaming
lucasvanmol Apr 7, 2025
446ccd6
Use dataflow ref instead of dataflow on Event objects
lucasvanmol Apr 7, 2025
1bf3369
Fix integration tests
lucasvanmol Apr 7, 2025
0cdc9a7
Improve if/else test coverage
lucasvanmol Apr 7, 2025
243d3dd
Add ifnode to integration to pyruntime
lucasvanmol Apr 8, 2025
0ae5708
Add branching test to pyflink integration tests
lucasvanmol Apr 8, 2025
9fa0d21
Run experiments in new IR
lucasvanmol Apr 8, 2025
fae5b28
Experimentally try thread mode
lucasvanmol Apr 8, 2025
7d7dc12
Fix parallelism issues on experiments
lucasvanmol Apr 9, 2025
9c6b6f7
Add some support for external libraries
lucasvanmol Apr 9, 2025
1c98c4c
Add dynamic prefetch experiment
lucasvanmol Apr 10, 2025
1c8d3fd
Tune prefetcher experiment
lucasvanmol Apr 10, 2025
6a68183
Add code motion test
lucasvanmol Apr 10, 2025
1db6856
Fix parallelize for deathstar bench
lucasvanmol Apr 15, 2025
96b2534
Fixed outdated tests
lucasvanmol Apr 15, 2025
7ac5e40
Move collect operator to chained
lucasvanmol Apr 16, 2025
27d8b15
Add prefetch experiment
lucasvanmol Apr 18, 2025
fc7900a
Add return node & preprocessing
lucasvanmol Apr 24, 2025
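Two of the commits above (fabdc18, 850c39c) switch entity state from class instances to __dict__. A minimal, illustrative sketch of that general Python pattern, not the PR's actual implementation, for readers unfamiliar with it:

# Illustrative only: the plain-Python idea behind commits fabdc18/850c39c,
# not code from this PR. An object's __dict__ holds its instance attributes
# as a plain dict, which is easy to snapshot and serialize as operator state.
class Hotel:
    def __init__(self, name: str, rooms: int):
        self.name = name
        self.rooms = rooms

hotel = Hotel("Ritz", 10)
state = hotel.__dict__.copy()        # {'name': 'Ritz', 'rooms': 10}

# Restore onto a bare instance without re-running __init__:
restored = Hotel.__new__(Hotel)
restored.__dict__.update(state)
assert restored.rooms == 10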
7 changes: 6 additions & 1 deletion .gitignore
@@ -8,6 +8,11 @@ __pycache__
*.egg-info
build

.vscode/

# Experiment artifacts
*.png
*.pkl
*.csv
nohup.out
*.zip
17 changes: 0 additions & 17 deletions .vscode/launch.json

This file was deleted.

8 changes: 0 additions & 8 deletions .vscode/settings.json

This file was deleted.

2 changes: 1 addition & 1 deletion deathstar_hotel_reservation/demo.py
Expand Up @@ -268,7 +268,7 @@ def user_login_workload_generator():
def benchmark_runner(proc_num) -> dict[int, dict]:
print(f'Generator: {proc_num} starting')
client = FlinkClientSync("deathstar", "ds-out", "localhost:9092", True)
deathstar_generator = user_login_workload_generator()
deathstar_generator = deathstar_workload_generator()
start = timer()

for _ in range(bursts):
198 changes: 99 additions & 99 deletions deathstar_hotel_reservation/test_demo.py
@@ -1,100 +1,100 @@

import os
import sys

# import cascade
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))

from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime
from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime
from deathstar_hotel_reservation.demo import DeathstarDemo, recommend, reserve, search_hotel, user_login
import time
import pytest

@pytest.mark.integration
def test_deathstar_demo():
ds = DeathstarDemo()
ds.init_runtime(FlinkRuntime("deathstardemo-test", "dsd-out"))
ds.runtime.run(run_async=True)
print("Populating, press enter to go to the next step when done")
ds.populate()

client = FlinkClientSync("deathstardemo-test", "dsd-out")
input()
print("testing user login")
event = user_login()
client.send(event)

input()
print("testing reserve")
event = reserve()
client.send(event)

input()
print("testing search")
event = search_hotel()
client.send(event)

input()
print("testing recommend (distance)")
time.sleep(0.5)
event = recommend(req_param="distance")
client.send(event)

input()
print("testing recommend (price)")
time.sleep(0.5)
event = recommend(req_param="price")
client.send(event)

print(client._futures)
input()
print("done!")
print(client._futures)

def test_deathstar_demo_python():
ds = DeathstarDemo()
ds.init_runtime(PythonRuntime())
ds.runtime.run()
print("Populating, press enter to go to the next step when done")
ds.populate()

time.sleep(0.1)

client = PythonClientSync(ds.runtime)
print("testing user login")
event = user_login()
result = client.send(event)
assert result == True
event = user_login(succesfull=False)
result = client.send(event)
assert result == False

print("testing reserve")
event = reserve()
result = client.send(event)
assert result == True

return
print("testing search")
event = search_hotel()
result = client.send(event)
print(result)

print("testing recommend (distance)")
time.sleep(0.5)
event = recommend(req_param="distance")
result = client.send(event)
print(result)

print("testing recommend (price)")
time.sleep(0.5)
event = recommend(req_param="price")
result = client.send(event)
print(result)

print("done!")


if __name__ == "__main__":
test_deathstar_demo()
# import os
# import sys

# # import cascade
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))

# from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime
# from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime
# from deathstar_hotel_reservation.demo import DeathstarDemo, recommend, reserve, search_hotel, user_login
# import time
# import pytest

# @pytest.mark.integration
# def test_deathstar_demo():
# ds = DeathstarDemo()
# ds.init_runtime(FlinkRuntime("deathstardemo-test", "dsd-out"))
# ds.runtime.run(run_async=True)
# print("Populating, press enter to go to the next step when done")
# ds.populate()

# client = FlinkClientSync("deathstardemo-test", "dsd-out")
# input()
# print("testing user login")
# event = user_login()
# client.send(event)

# input()
# print("testing reserve")
# event = reserve()
# client.send(event)

# input()
# print("testing search")
# event = search_hotel()
# client.send(event)

# input()
# print("testing recommend (distance)")
# time.sleep(0.5)
# event = recommend(req_param="distance")
# client.send(event)

# input()
# print("testing recommend (price)")
# time.sleep(0.5)
# event = recommend(req_param="price")
# client.send(event)

# print(client._futures)
# input()
# print("done!")
# print(client._futures)

# def test_deathstar_demo_python():
# ds = DeathstarDemo()
# ds.init_runtime(PythonRuntime())
# ds.runtime.run()
# print("Populating, press enter to go to the next step when done")
# ds.populate()

# time.sleep(0.1)

# client = PythonClientSync(ds.runtime)
# print("testing user login")
# event = user_login()
# result = client.send(event)
# assert result == True
# event = user_login(succesfull=False)
# result = client.send(event)
# assert result == False

# print("testing reserve")
# event = reserve()
# result = client.send(event)
# assert result == True

# return
# print("testing search")
# event = search_hotel()
# result = client.send(event)
# print(result)

# print("testing recommend (distance)")
# time.sleep(0.5)
# event = recommend(req_param="distance")
# result = client.send(event)
# print(result)

# print("testing recommend (price)")
# time.sleep(0.5)
# event = recommend(req_param="price")
# result = client.send(event)
# print(result)

# print("done!")


# if __name__ == "__main__":
# test_deathstar_demo()
83 changes: 23 additions & 60 deletions deathstar_movie_review/demo.py
@@ -1,14 +1,12 @@
from typing import Literal
import cascade
from cascade.dataflow.dataflow import DataflowRef
from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination
from cascade.dataflow.optimization.parallelization import parallelize_until_if
from cascade.runtime.flink_runtime import FlinkRuntime

from .entities.user import user_op
from .entities.compose_review import compose_review_op
from .entities.frontend import frontend_df_parallel, frontend_df_serial, frontend_op, text_op, unique_id_op
from .entities.movie import movie_id_op, movie_info_op, plot_op
from tests.integration.flink.utils import create_topics, init_flink_runtime

import os
from confluent_kafka.admin import AdminClient, NewTopic

KAFKA_BROKER = "localhost:9092"
KAFKA_FLINK_BROKER = "kafka:9093" # If running a flink cluster and kafka inside docker, the broker url might be different
@@ -17,67 +15,32 @@
OUT_TOPIC = "ds-movie-out"
INTERNAL_TOPIC = "ds-movie-internal"

EXPERIMENT: Literal["baseline", "pipelined", "parallel"] = os.getenv("EXPERIMENT", "baseline")

def create_topics(*required_topics):
conf = {
"bootstrap.servers": KAFKA_BROKER
}

admin_client = AdminClient(conf)

# Fetch existing topics
existing_topics = admin_client.list_topics(timeout=5).topics.keys()

# Find missing topics
missing_topics = [topic for topic in required_topics if topic not in existing_topics]

if missing_topics:
print(f"Creating missing topics: {missing_topics}")

# Define new topics (default: 1 partition, replication factor 1)
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in missing_topics]

# Create topics
futures = admin_client.create_topics(new_topics)

# Wait for topic creation to complete
for topic, future in futures.items():
try:
future.result() # Block until the operation is complete
print(f"Topic '{topic}' created successfully")
except Exception as e:
print(f"Failed to create topic '{topic}': {e}")
else:
print("All required topics exist.")
EXPERIMENT: Literal["baseline", "parallel"] = os.getenv("EXPERIMENT", "baseline")


def main():
create_topics(IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC)

runtime = FlinkRuntime(IN_TOPIC, OUT_TOPIC, internal_topic=INTERNAL_TOPIC)
runtime.init(kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10)

if EXPERIMENT == "baseline":
frontend_op.dataflow = frontend_df_serial()
elif EXPERIMENT == "pipelined":
frontend_op.dataflow = frontend_df_serial()
dead_node_elimination([], [frontend_op])
elif EXPERIMENT == "parallel":
frontend_op.dataflow = frontend_df_parallel()

print(frontend_op.dataflow.to_dot())
runtime = init_flink_runtime("deathstar_movie_review.entities.entities", IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC, kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10, thread_mode=True, parallelism=None)

print(f"Creating dataflow [{EXPERIMENT}]")

runtime.add_operator(compose_review_op)
runtime.add_operator(user_op)
runtime.add_operator(movie_info_op)
runtime.add_operator(movie_id_op)
runtime.add_operator(plot_op)
runtime.add_stateless_operator(frontend_op)
runtime.add_stateless_operator(unique_id_op)
runtime.add_stateless_operator(text_op)

# for parallel experiment
df_baseline = cascade.core.dataflows[DataflowRef("Frontend", "compose")]
df_parallel, _ = parallelize_until_if(df_baseline)
df_parallel.name = "compose_parallel"
cascade.core.dataflows[DataflowRef("Frontend", "compose_parallel")] = df_parallel
runtime.add_dataflow(df_parallel)

# for prefetch experiment
df_baseline = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch")]
df_parallel, _ = parallelize_until_if(df_baseline)
df_parallel.name = "upload_movie_prefetch_parallel"
cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")] = df_parallel
runtime.add_dataflow(df_parallel)

print(cascade.core.dataflows.keys())

runtime.run()

if __name__ == "__main__":
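The movie-review demo above selects its variant at import time via the EXPERIMENT environment variable. A hypothetical launch sketch, assuming the file is importable as deathstar_movie_review.demo from the repository root (the exact invocation is not part of this diff):

# Hypothetical launcher. EXPERIMENT is read by os.getenv at module import
# time, so it must be set in the environment before the module is imported.
import os

os.environ["EXPERIMENT"] = "parallel"  # or "baseline"

from deathstar_movie_review import demo  # assumed module path

demo.main()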