diff --git a/nbs/API/01_actions.ipynb b/nbs/API/01_actions.ipynb index 6629cb9..ff3f8d2 100644 --- a/nbs/API/01_actions.ipynb +++ b/nbs/API/01_actions.ipynb @@ -92,7 +92,7 @@ "\n", "util.ACTION_REGISTRY: Dict[str, Callable] = {}\n", "\n", - "def register(name: str, description: Optional[str] = None):\n", + "def register(name: str, description: Optional[str] = None, overwrite: bool = False) -> Callable:\n", " \"\"\"\n", " Register your function as an action in the global registry.\n", " \n", @@ -104,9 +104,14 @@ " \\n • `ValueError`: If an action with the same name already exists\n", " \"\"\"\n", " def decorator(fn: Callable) -> Callable:\n", - " if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn:\n", - " raise ValueError(f\"An action named '{name}' is already registered.\")\n", - " \n", + " if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn and not overwrite:\n", + " util.logger.warning(f\"Action '{name}' already exists. Keeping the existing one.\")\n", + " return fn\n", + " if name in util.ACTION_REGISTRY and overwrite:\n", + " util.logger.warning(f\"Overwriting existing action '{name}' with new one.\")\n", + " # Remove the old action\n", + " del util.ACTION_REGISTRY[name]\n", + " \n", " # Add metadata to the function\n", " fn._action_name = name\n", " fn._action_description = description\n", diff --git a/nbs/API/02_jobs.ipynb b/nbs/API/02_jobs.ipynb index 309bb6b..9a06a1a 100644 --- a/nbs/API/02_jobs.ipynb +++ b/nbs/API/02_jobs.ipynb @@ -32,7 +32,17 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-05-14 20:09:06,829 - numexpr.utils - INFO - NumExpr defaulting to 10 threads.\n", + "2025-05-14 20:09:07,052 - triggerkit - INFO - Registered action: Register Views\n", + "2025-05-14 20:09:07,052 - triggerkit - INFO - Registered action: Get DDL and Register Views\n" + ] + } + ], "source": [ "#| export\n", "\n", @@ -103,6 +113,7 @@ " Returns:\n", " Job function\n", " \"\"\"\n", + " # TODO: Add print statements\n", " if isinstance(action_names, str):\n", " action_names = [action_names]\n", " \n", @@ -304,13 +315,15 @@ " scheduler.add_job(\n", " job_func,\n", " trigger=trigger,\n", - " id=name,\n", + " id=view_name,\n", + " name=view_name,\n", " replace_existing=True,\n", " executor=executor,\n", " misfire_grace_time=job_config.get('misfire_grace_time', 60),\n", " max_instances=job_config.get('max_instances', 3)\n", " )\n", - " \n", + " # TODO: Add print statement \n", + "\n", " util.logger.info(f\"Scheduled job '{name}' for view '{view_name}' with actions {actions} to run {schedule_description}\")\n", " util.SCHEDULED_JOBS[name] = {\n", " 'actions': actions,\n", @@ -333,7 +346,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "2025-05-14 16:48:33,788 - triggerkit - INFO - Registered action: Create Job From View\n" + "2025-05-14 20:09:07,088 - triggerkit - INFO - Registered action: Create Job From View\n" ] } ], @@ -393,7 +406,7 @@ " existing_job['view_name'] == view_name and \n", " existing_job['executor'] == executor):\n", " util.logger.info(f\"Job '{name}' already exists with same configuration, skipping\")\n", - " results[view] = \"Job already exists with same configuration\"\n", + " results[view['TABLE_NAME']] = \"Job already exists with same configuration\"\n", " continue\n", "\n", " job_func = create(view_name, actions, name, executor)\n", @@ -406,7 +419,8 @@ " scheduler.add_job(\n", " job_func,\n", " trigger=trigger,\n", - " id=name,\n", + " id=view_name,\n", + " name=view_name,\n", " replace_existing=True,\n", " executor=executor,\n", " misfire_grace_time=view_config.get('misfire_grace_time', 60),\n", @@ -421,13 +435,13 @@ " 'executor': executor,\n", " 'enabled': True\n", " }\n", - " results[view] = f\"Successfully scheduled job '{name}'\"\n", + " results[view_name] = f\"Successfully scheduled job '{name}'\"\n", " except ValueError as e:\n", - " util.logger.warning(f\"Error scheduling job '{name}' from view '{view}': {e}\")\n", - " results[view] = f\"Error scheduling job: {str(e)}\"\n", + " util.logger.warning(f\"Error scheduling job '{name}' from view '{view_name}': {e}\")\n", + " results[view_name] = f\"Error scheduling job: {str(e)}\"\n", " except Exception as e:\n", - " util.logger.error(f\"Error processing view '{view}': {str(e)}\")\n", - " results[view] = f\"Error: {str(e)}\"\n", + " util.logger.error(f\"Error processing view '{view_name}': {str(e)}\")\n", + " results[view_name] = f\"Error: {str(e)}\"\n", " \n", " return results\n" ] diff --git a/nbs/API/database/snowflake.ipynb b/nbs/API/database/snowflake.ipynb index 2edafaa..116befd 100644 --- a/nbs/API/database/snowflake.ipynb +++ b/nbs/API/database/snowflake.ipynb @@ -144,6 +144,7 @@ " database=sf_config.get('database'),\n", " schema=sf_config.get('schema'),\n", " role=sf_config.get('role', 'ACCOUNTADMIN')\n", + " #,session_parameters={'QUERY_TAG': 'TRIGGERKIT'}\n", " )\n", " util.logger.info(\"Successfully connected to Snowflake\")\n", " return connection\n", diff --git a/triggerkit/actions.py b/triggerkit/actions.py index 6d4343d..2ff5f07 100644 --- a/triggerkit/actions.py +++ b/triggerkit/actions.py @@ -22,7 +22,7 @@ def run_sql(data): # %% ../nbs/API/01_actions.ipynb 8 util.ACTION_REGISTRY: Dict[str, Callable] = {} -def register(name: str, description: Optional[str] = None): +def register(name: str, description: Optional[str] = None, overwrite: bool = False) -> Callable: """ Register your function as an action in the global registry. @@ -34,9 +34,14 @@ def register(name: str, description: Optional[str] = None): \n • `ValueError`: If an action with the same name already exists """ def decorator(fn: Callable) -> Callable: - if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn: - raise ValueError(f"An action named '{name}' is already registered.") - + if name in util.ACTION_REGISTRY and util.ACTION_REGISTRY[name] != fn and not overwrite: + util.logger.warning(f"Action '{name}' already exists. Keeping the existing one.") + return fn + if name in util.ACTION_REGISTRY and overwrite: + util.logger.warning(f"Overwriting existing action '{name}' with new one.") + # Remove the old action + del util.ACTION_REGISTRY[name] + # Add metadata to the function fn._action_name = name fn._action_description = description diff --git a/triggerkit/jobs.py b/triggerkit/jobs.py index e63b44b..6adfc75 100644 --- a/triggerkit/jobs.py +++ b/triggerkit/jobs.py @@ -58,6 +58,7 @@ def create(view_name: str, action_names: Union[str, List[str]], job_name: Option Returns: Job function """ + # TODO: Add print statements if isinstance(action_names, str): action_names = [action_names] @@ -244,13 +245,15 @@ def schedule_jobs(config: Dict[str, Any]): scheduler.add_job( job_func, trigger=trigger, - id=name, + id=view_name, + name=view_name, replace_existing=True, executor=executor, misfire_grace_time=job_config.get('misfire_grace_time', 60), max_instances=job_config.get('max_instances', 3) ) - + # TODO: Add print statement + util.logger.info(f"Scheduled job '{name}' for view '{view_name}' with actions {actions} to run {schedule_description}") util.SCHEDULED_JOBS[name] = { 'actions': actions, @@ -317,7 +320,7 @@ def create_job_from_view(data): existing_job['view_name'] == view_name and existing_job['executor'] == executor): util.logger.info(f"Job '{name}' already exists with same configuration, skipping") - results[view] = "Job already exists with same configuration" + results[view['TABLE_NAME']] = "Job already exists with same configuration" continue job_func = create(view_name, actions, name, executor) @@ -330,7 +333,8 @@ def create_job_from_view(data): scheduler.add_job( job_func, trigger=trigger, - id=name, + id=view_name, + name=view_name, replace_existing=True, executor=executor, misfire_grace_time=view_config.get('misfire_grace_time', 60), @@ -345,13 +349,13 @@ def create_job_from_view(data): 'executor': executor, 'enabled': True } - results[view] = f"Successfully scheduled job '{name}'" + results[view_name] = f"Successfully scheduled job '{name}'" except ValueError as e: - util.logger.warning(f"Error scheduling job '{name}' from view '{view}': {e}") - results[view] = f"Error scheduling job: {str(e)}" + util.logger.warning(f"Error scheduling job '{name}' from view '{view_name}': {e}") + results[view_name] = f"Error scheduling job: {str(e)}" except Exception as e: - util.logger.error(f"Error processing view '{view}': {str(e)}") - results[view] = f"Error: {str(e)}" + util.logger.error(f"Error processing view '{view_name}': {str(e)}") + results[view_name] = f"Error: {str(e)}" return results diff --git a/triggerkit/snowflake.py b/triggerkit/snowflake.py index 3f3e585..d4f2a8e 100644 --- a/triggerkit/snowflake.py +++ b/triggerkit/snowflake.py @@ -75,6 +75,7 @@ def connect(sf_config: Dict[str, Any]) -> SnowflakeConnection: database=sf_config.get('database'), schema=sf_config.get('schema'), role=sf_config.get('role', 'ACCOUNTADMIN') + #,session_parameters={'QUERY_TAG': 'TRIGGERKIT'} ) util.logger.info("Successfully connected to Snowflake") return connection