Skip to content

Commit

Permalink
Olivia/quickstart (#2052)
Browse files Browse the repository at this point in the history
* updated python starter code

* streaming docs

* first draft scripts

* ingest

* fix imports

* fix python/ts distinctions

* remove params

* clean up

* trigger via moose API make python only

* workflows cron update

* pydantic

* quickstart fix

* starter code for ts and python

* update e2e tests

* data passing ts, fix python MV

* chris ingest feedback

* chris feedback + workflow ingest

* workflow ts syntax fix

---------

Co-authored-by: Jonathan Widjaja <[email protected]>
  • Loading branch information
okane16 and DatGuyJonathan authored Feb 21, 2025
1 parent a2ca360 commit 3973558
Show file tree
Hide file tree
Showing 23 changed files with 1,816 additions and 1,025 deletions.
28 changes: 12 additions & 16 deletions apps/framework-cli-e2e/test/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,13 @@ describe("framework-cli", () => {

const eventId = randomUUID();

const response = await fetch("http://localhost:4000/ingest/UserActivity", {
const response = await fetch("http://localhost:4000/ingest/Foo", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
eventId: eventId,
timestamp: "2019-01-01 00:00:01",
userId: "123456",
activity: "Login",
primaryKey: eventId,
timestamp: 1739990000,
optionalText: "Hello world",
}),
});

Expand All @@ -183,20 +182,17 @@ describe("framework-cli", () => {

try {
const result = await client.query({
query: "SELECT * FROM ParsedActivity_0_0",
query: "SELECT * FROM Foo_0_0",
format: "JSONEachRow",
});
const rows: any[] = await result.json();
console.log("ParsedActivity data:", rows);
console.log("Foo data:", rows);

expect(rows).to.have.lengthOf(
1,
"Expected exactly one row in ParsedActivity",
);
expect(rows).to.have.lengthOf(1, "Expected exactly one row in Foo");

expect(rows[0].eventId).to.equal(
expect(rows[0].primaryKey).to.equal(
eventId,
"EventId in ParsedActivity should match the generated UUID",
"PrimaryKey in Foo should match the generated UUID",
);
} catch (error) {
console.error("Error querying ClickHouse:", error);
Expand All @@ -207,16 +203,16 @@ describe("framework-cli", () => {

console.log("Sending consumption request...");
const consumptionResponse = await fetch(
"http://localhost:4000/consumption/dailyActiveUsers?minDailyActiveUsers=1",
"http://localhost:4000/consumption/bar?orderBy=totalRows",
);

if (consumptionResponse.ok) {
console.log("Test request sent successfully");
let json = await consumptionResponse.json();
expect(json).to.deep.equal([
{
date: "2019-01-01",
dailyActiveUsers: "1",
dayOfMonth: 21,
totalRows: "1",
},
]);
} else {
Expand Down
191 changes: 108 additions & 83 deletions apps/framework-cli/src/framework/python/templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,28 @@ pub enum PythonRenderingError {
}

pub static PYTHON_BASE_MODEL_TEMPLATE: &str = r#"
# This file was auto-generated by the framework. You can add data models or change the existing ones
from moose_lib import Key, moose_data_model
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
# This decorator is used to mark the data model as a Moose data model
@moose_data_model
@dataclass
class UserActivity:
eventId: Key[str]
timestamp: str
userId: str
activity: str
class Foo:
primary_key: Key[str]
timestamp: float
optional_text: Optional[str]
@moose_data_model
@dataclass
class ParsedActivity:
eventId: Key[str]
timestamp: datetime
userId: str
activity: str
class Bar:
primary_key: Key[str]
utc_timestamp: datetime
has_text: bool
text_length: int
"#;

pub static SETUP_PY_TEMPLATE: &str = r#"
Expand All @@ -48,21 +49,23 @@ setup(
"#;

pub static PYTHON_BASE_STREAMING_FUNCTION_SAMPLE: &str = r#"
## Example Streaming Function: Transforms Foo data to Bar data
from moose_lib import StreamingFunction
from app.datamodels.models import Foo, Bar # Import your Moose data models to use in the streaming function
from datetime import datetime
from app.datamodels.models import UserActivity, ParsedActivity
def parse_activity(activity: UserActivity) -> ParsedActivity:
return ParsedActivity(
eventId=activity.eventId,
timestamp=datetime.fromisoformat(activity.timestamp),
userId=activity.userId,
activity=activity.activity,
# The 'run' function contains the logic that runs on each new data point in the Foo stream.
# For more details on how Moose streaming functions work, see: https://docs.moosejs.com
def run(foo: Foo) -> Bar:
return Bar(
primary_key=foo.primary_key,
utc_timestamp=datetime.fromtimestamp(foo.timestamp),
has_text=foo.optional_text is not None,
text_length=len(foo.optional_text) if foo.optional_text else 0
)
my_function = StreamingFunction(
run=parse_activity
)
Foo__Bar = StreamingFunction(run=run) # Register the run function as a streaming function.
"#;

pub static PYTHON_BASE_STREAMING_FUNCTION_TEMPLATE: &str = r#"
Expand All @@ -83,35 +86,73 @@ my_function = StreamingFunction(

pub static PYTHON_BASE_CONSUMPTION_TEMPLATE: &str = r#"
# This file is where you can define your API templates for consuming your data
# All query_params are passed in as strings, and are used within the sql tag to parameterize your queries
# Query params are passed in as Pydantic models and are used within the sql tag to parameterize your queries
from moose_lib import MooseClient
from pydantic import BaseModel
class QueryParams(BaseModel):
limit: Optional[int] = 10
def run(client: MooseClient, params):
return client.query("SELECT 1", { })
def run(client: MooseClient, params: QueryParams):
return client.query.execute("SELECT 1", {})
"#;

pub static PYTHON_BASE_API_SAMPLE: &str = r#"
# This file is where you can define your API templates for consuming your data
from moose_lib import MooseClient
from pydantic import BaseModel, Field
from typing import Optional
def run(client: MooseClient, params):
minDailyActiveUsers = int(params.get('minDailyActiveUsers', [0])[0])
limit = int(params.get('limit', [10])[0])
return client.query(
'''SELECT
date,
uniqMerge(dailyActiveUsers) as dailyActiveUsers
FROM DailyActiveUsers
GROUP BY date
HAVING dailyActiveUsers >= {minDailyActiveUsers}
ORDER BY date
LIMIT {limit}''',
{
"minDailyActiveUsers": minDailyActiveUsers,
"limit": limit
}
# Query params are defined as Pydantic models and are validated automatically
class QueryParams(BaseModel):
order_by: Optional[str] = Field(
default="total_rows",
pattern=r"^(total_rows|rows_with_text|max_text_length|total_text_length)$",
description="Must be one of: total_rows, rows_with_text, max_text_length, total_text_length"
)
limit: Optional[int] = Field(
default=5,
gt=0,
le=100,
description="Must be between 1 and 100"
)
start_day: Optional[int] = Field(
default=1,
gt=0,
le=31,
description="Must be between 1 and 31"
)
end_day: Optional[int] = Field(
default=31,
gt=0,
le=31,
description="Must be between 1 and 31"
)
## The run function is where you can define your API logic
def run(client: MooseClient, params: QueryParams):
start_day = params.start_day
end_day = params.end_day
limit = params.limit
order_by = params.order_by
query = f"""
SELECT
day_of_month,
{order_by}
FROM BarAggregated_MV
WHERE day_of_month >= {start_day}
AND day_of_month <= {end_day}
ORDER BY {order_by} DESC
LIMIT {limit}
"""
return client.query.execute(query, {"order_by": order_by, "start_day": start_day, "end_day": end_day, "limit": limit})
"#;

pub static PYTHON_BASE_BLOCKS_TEMPLATE: &str = r#"
Expand Down Expand Up @@ -139,57 +180,41 @@ block = Blocks(teardown=teardown_queries, setup=setup_queries)
"#;

pub static PYTHON_BASE_BLOCKS_SAMPLE: &str = r#"
# Here is a sample aggregation query that calculates the number of daily active users
# based on the number of unique users who complete a sign-in activity each day.
# This block is used to aggregate the data from the Bar table into a materialized view
from moose_lib import (
AggregationCreateOptions,
AggregationDropOptions,
Blocks,
ClickHouseEngines,
TableCreateOptions,
create_aggregation,
drop_aggregation,
Blocks
)
destination_table = "DailyActiveUsers"
materialized_view = "DailyActiveUsers_mv"
select_sql = """
SELECT
toStartOfDay(timestamp) as date,
uniqState(userId) as dailyActiveUsers
FROM ParsedActivity_0_0
WHERE activity = 'Login'
GROUP BY toStartOfDay(timestamp)
MV_NAME = "BarAggregated_MV" # The name of the materialized view
# The query to create the materialized view, which is executed when the block is set up
MV_QUERY = """
CREATE MATERIALIZED VIEW BarAggregated_MV
ENGINE = MergeTree()
ORDER BY day_of_month
POPULATE
AS
SELECT
toDayOfMonth(utc_timestamp) as day_of_month,
count(primary_key) as total_rows,
countIf(has_text) as rows_with_text,
sum(text_length) as total_text_length,
max(text_length) as max_text_length
FROM Bar_0_0
GROUP BY toDayOfMonth(utc_timestamp)
"""
teardown_queries = drop_aggregation(
AggregationDropOptions(materialized_view, destination_table)
)
table_options = TableCreateOptions(
name=destination_table,
columns={"date": "Date", "dailyActiveUsers": "AggregateFunction(uniq, String)"},
engine=ClickHouseEngines.MergeTree,
order_by="date",
)
aggregation_options = AggregationCreateOptions(
table_create_options=table_options,
materialized_view_name=materialized_view,
select=select_sql,
)
setup_queries = create_aggregation(aggregation_options)
# The query to drop the materialized view, which is executed when the block is torn down
DROP_MV_QUERY = f"DROP TABLE IF EXISTS {MV_NAME}"
block = Blocks(teardown=teardown_queries, setup=setup_queries)
# The block to create the materialized view
block = Blocks(teardown=[DROP_MV_QUERY], setup=[MV_QUERY])
"#;

pub static PYTHON_BASE_SCRIPT_TEMPLATE: &str = r#"from moose_lib import task
@task()
def {{name}}(): # The name of your script
def {{name}}(data: dict): # The name of your script
"""
Description of what this script does
"""
Expand Down
Loading

0 comments on commit 3973558

Please sign in to comment.