Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions nbs/API/01_actions.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"\n",
"util.ACTION_REGISTRY: Dict[str, Callable] = {}\n",
"\n",
"def register(name: str, description: Optional[str] = None):\n",
"def register(name: str, description: Optional[str] = None, overwrite: bool = False) -> Callable:\n",
" \"\"\"\n",
" Register your function as an action in the global registry.\n",
" \n",
Expand All @@ -104,9 +104,14 @@
" \\n • `ValueError`: If an action with the same name already exists\n",
" \"\"\"\n",
" def decorator(fn: Callable) -> Callable:\n",
" if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn:\n",
" raise ValueError(f\"An action named '{name}' is already registered.\")\n",
" \n",
" if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn and not overwrite:\n",
" util.logger.warning(f\"Action '{name}' already exists. Keeping the existing one.\")\n",
" return fn\n",
" if name in util.ACTION_REGISTRY and overwrite:\n",
" util.logger.warning(f\"Overwriting existing action '{name}' with new one.\")\n",
" # Remove the old action\n",
" del util.ACTION_REGISTRY[name]\n",
" \n",
" # Add metadata to the function\n",
" fn._action_name = name\n",
" fn._action_description = description\n",
Expand Down
36 changes: 25 additions & 11 deletions nbs/API/02_jobs.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,17 @@
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2025-05-14 20:09:06,829 - numexpr.utils - INFO - NumExpr defaulting to 10 threads.\n",
"2025-05-14 20:09:07,052 - triggerkit - INFO - Registered action: Register Views\n",
"2025-05-14 20:09:07,052 - triggerkit - INFO - Registered action: Get DDL and Register Views\n"
]
}
],
"source": [
"#| export\n",
"\n",
Expand Down Expand Up @@ -103,6 +113,7 @@
" Returns:\n",
" Job function\n",
" \"\"\"\n",
" # TODO: Add print statements\n",
" if isinstance(action_names, str):\n",
" action_names = [action_names]\n",
" \n",
Expand Down Expand Up @@ -304,13 +315,15 @@
" scheduler.add_job(\n",
" job_func,\n",
" trigger=trigger,\n",
" id=name,\n",
" id=view_name,\n",
" name=view_name,\n",
" replace_existing=True,\n",
" executor=executor,\n",
" misfire_grace_time=job_config.get('misfire_grace_time', 60),\n",
" max_instances=job_config.get('max_instances', 3)\n",
" )\n",
" \n",
" # TODO: Add print statement \n",
"\n",
" util.logger.info(f\"Scheduled job '{name}' for view '{view_name}' with actions {actions} to run {schedule_description}\")\n",
" util.SCHEDULED_JOBS[name] = {\n",
" 'actions': actions,\n",
Expand All @@ -333,7 +346,7 @@
"name": "stderr",
"output_type": "stream",
"text": [
"2025-05-14 16:48:33,788 - triggerkit - INFO - Registered action: Create Job From View\n"
"2025-05-14 20:09:07,088 - triggerkit - INFO - Registered action: Create Job From View\n"
]
}
],
Expand Down Expand Up @@ -393,7 +406,7 @@
" existing_job['view_name'] == view_name and \n",
" existing_job['executor'] == executor):\n",
" util.logger.info(f\"Job '{name}' already exists with same configuration, skipping\")\n",
" results[view] = \"Job already exists with same configuration\"\n",
" results[view['TABLE_NAME']] = \"Job already exists with same configuration\"\n",
" continue\n",
"\n",
" job_func = create(view_name, actions, name, executor)\n",
Expand All @@ -406,7 +419,8 @@
" scheduler.add_job(\n",
" job_func,\n",
" trigger=trigger,\n",
" id=name,\n",
" id=view_name,\n",
" name=view_name,\n",
" replace_existing=True,\n",
" executor=executor,\n",
" misfire_grace_time=view_config.get('misfire_grace_time', 60),\n",
Expand All @@ -421,13 +435,13 @@
" 'executor': executor,\n",
" 'enabled': True\n",
" }\n",
" results[view] = f\"Successfully scheduled job '{name}'\"\n",
" results[view_name] = f\"Successfully scheduled job '{name}'\"\n",
" except ValueError as e:\n",
" util.logger.warning(f\"Error scheduling job '{name}' from view '{view}': {e}\")\n",
" results[view] = f\"Error scheduling job: {str(e)}\"\n",
" util.logger.warning(f\"Error scheduling job '{name}' from view '{view_name}': {e}\")\n",
" results[view_name] = f\"Error scheduling job: {str(e)}\"\n",
" except Exception as e:\n",
" util.logger.error(f\"Error processing view '{view}': {str(e)}\")\n",
" results[view] = f\"Error: {str(e)}\"\n",
" util.logger.error(f\"Error processing view '{view_name}': {str(e)}\")\n",
" results[view_name] = f\"Error: {str(e)}\"\n",
" \n",
" return results\n"
]
Expand Down
1 change: 1 addition & 0 deletions nbs/API/database/snowflake.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
" database=sf_config.get('database'),\n",
" schema=sf_config.get('schema'),\n",
" role=sf_config.get('role', 'ACCOUNTADMIN')\n",
" #,session_parameters={'QUERY_TAG': 'TRIGGERKIT'}\n",
" )\n",
" util.logger.info(\"Successfully connected to Snowflake\")\n",
" return connection\n",
Expand Down
13 changes: 9 additions & 4 deletions triggerkit/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def run_sql(data):
# %% ../nbs/API/01_actions.ipynb 8
util.ACTION_REGISTRY: Dict[str, Callable] = {}

def register(name: str, description: Optional[str] = None):
def register(name: str, description: Optional[str] = None, overwrite: bool = False) -> Callable:
"""
Register your function as an action in the global registry.

Expand All @@ -34,9 +34,14 @@ def register(name: str, description: Optional[str] = None):
\n • `ValueError`: If an action with the same name already exists
"""
def decorator(fn: Callable) -> Callable:
if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn:
raise ValueError(f"An action named '{name}' is already registered.")

if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn and not overwrite:
util.logger.warning(f"Action '{name}' already exists. Keeping the existing one.")
return fn
if name in util.ACTION_REGISTRY and overwrite:
util.logger.warning(f"Overwriting existing action '{name}' with new one.")
# Remove the old action
del util.ACTION_REGISTRY[name]

# Add metadata to the function
fn._action_name = name
fn._action_description = description
Expand Down
22 changes: 13 additions & 9 deletions triggerkit/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def create(view_name: str, action_names: Union[str, List[str]], job_name: Option
Returns:
Job function
"""
# TODO: Add print statements
if isinstance(action_names, str):
action_names = [action_names]

Expand Down Expand Up @@ -244,13 +245,15 @@ def schedule_jobs(config: Dict[str, Any]):
scheduler.add_job(
job_func,
trigger=trigger,
id=name,
id=view_name,
name=view_name,
replace_existing=True,
executor=executor,
misfire_grace_time=job_config.get('misfire_grace_time', 60),
max_instances=job_config.get('max_instances', 3)
)

# TODO: Add print statement

util.logger.info(f"Scheduled job '{name}' for view '{view_name}' with actions {actions} to run {schedule_description}")
util.SCHEDULED_JOBS[name] = {
'actions': actions,
Expand Down Expand Up @@ -317,7 +320,7 @@ def create_job_from_view(data):
existing_job['view_name'] == view_name and
existing_job['executor'] == executor):
util.logger.info(f"Job '{name}' already exists with same configuration, skipping")
results[view] = "Job already exists with same configuration"
results[view['TABLE_NAME']] = "Job already exists with same configuration"
continue

job_func = create(view_name, actions, name, executor)
Expand All @@ -330,7 +333,8 @@ def create_job_from_view(data):
scheduler.add_job(
job_func,
trigger=trigger,
id=name,
id=view_name,
name=view_name,
replace_existing=True,
executor=executor,
misfire_grace_time=view_config.get('misfire_grace_time', 60),
Expand All @@ -345,13 +349,13 @@ def create_job_from_view(data):
'executor': executor,
'enabled': True
}
results[view] = f"Successfully scheduled job '{name}'"
results[view_name] = f"Successfully scheduled job '{name}'"
except ValueError as e:
util.logger.warning(f"Error scheduling job '{name}' from view '{view}': {e}")
results[view] = f"Error scheduling job: {str(e)}"
util.logger.warning(f"Error scheduling job '{name}' from view '{view_name}': {e}")
results[view_name] = f"Error scheduling job: {str(e)}"
except Exception as e:
util.logger.error(f"Error processing view '{view}': {str(e)}")
results[view] = f"Error: {str(e)}"
util.logger.error(f"Error processing view '{view_name}': {str(e)}")
results[view_name] = f"Error: {str(e)}"

return results

Expand Down
1 change: 1 addition & 0 deletions triggerkit/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def connect(sf_config: Dict[str, Any]) -> SnowflakeConnection:
database=sf_config.get('database'),
schema=sf_config.get('schema'),
role=sf_config.get('role', 'ACCOUNTADMIN')
#,session_parameters={'QUERY_TAG': 'TRIGGERKIT'}
)
util.logger.info("Successfully connected to Snowflake")
return connection
Expand Down