diff --git a/src/fabric_cli/client/fab_api_semantic_model.py b/src/fabric_cli/client/fab_api_semantic_model.py
new file mode 100644
index 00000000..eed1bd5b
--- /dev/null
+++ b/src/fabric_cli/client/fab_api_semantic_model.py
@@ -0,0 +1,75 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from argparse import Namespace
+
+from fabric_cli.client import fab_api_client as fabric_api
+from fabric_cli.client.fab_api_types import ApiResponse
+
+
+def refresh_semantic_model(args: Namespace, payload: str) -> ApiResponse:
+    """https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group"""
+    original_wait = getattr(args, "wait", False)
+
+    args.uri = f"groups/{args.ws_id}/datasets/{args.item_id}/refreshes"
+    args.method = "post"
+    args.audience = "powerbi"
+    args.wait = (
+        False  # Disable automatic long-running operation polling for the HTTP request
+    )
+
+    response = fabric_api.do_request(args, data=payload)
+
+    args.wait = original_wait
+
+    return response
+
+
+def get_refresh_execution_details(args: Namespace) -> ApiResponse:
+    """
+    https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/get-refresh-execution-details-in-group
+    """
+    refresh_id = getattr(args, "instance_id", None) or getattr(args, "refresh_id", None)
+
+    if not refresh_id:
+        raise ValueError("args must contain either 'instance_id' or 'refresh_id'")
+
+    args.uri = f"groups/{args.ws_id}/datasets/{args.item_id}/refreshes/{refresh_id}"
+    args.method = "get"
+    args.audience = "powerbi"
+    args.wait = False
+
+    return fabric_api.do_request(args)
+
+
+def get_refresh_execution_details_by_url(
+    args: Namespace, refresh_url: str
+) -> ApiResponse:
+    """https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/get-refresh-execution-details-in-group"""
+    from urllib.parse import urlparse
+
+    parsed_url = urlparse(refresh_url)
+
+    path_parts = parsed_url.path.split("/v1.0/myorg/", 1)
+    if len(path_parts) == 2:
+        uri = path_parts[1]
+    else:
+        uri = parsed_url.path.lstrip("/")
+
+    hostname = parsed_url.netloc
+
+    args.uri = uri
+    args.method = "get"
+    args.audience = "powerbi"
+    args.wait = False
+
+    return fabric_api.do_request(args, hostname=hostname)
+
+
+def cancel_refresh(args: Namespace, refresh_id: str) -> ApiResponse:
+    """https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/cancel-refresh-in-group"""
+    args.uri = f"groups/{args.ws_id}/datasets/{args.item_id}/refreshes/{refresh_id}"
+    args.method = "delete"
+    args.audience = "powerbi"
+
+    return fabric_api.do_request(args)
diff --git a/src/fabric_cli/commands/jobs/fab_jobs_run.py b/src/fabric_cli/commands/jobs/fab_jobs_run.py
index 9569ede7..5a9e1b8a 100644
--- a/src/fabric_cli/commands/jobs/fab_jobs_run.py
+++ b/src/fabric_cli/commands/jobs/fab_jobs_run.py
@@ -7,12 +7,106 @@
 from fabric_cli.client import fab_api_jobs as jobs_api
 from fabric_cli.core import fab_constant as con
 from fabric_cli.core import fab_state_config as config
+from fabric_cli.core.fab_types import FabricJobType
 from fabric_cli.core.hiearchy.fab_hiearchy import Item
 from fabric_cli.utils import fab_cmd_job_utils as utils_job
 from fabric_cli.utils import fab_ui
 
 
 def exec_command(args: Namespace, item: Item) -> None:
+    if item.job_type == FabricJobType.SEMANTIC_MODEL_REFRESH:
+        _exec_semantic_model_refresh(args, item)
+    else:
+        _exec_fabric_job(args, item)
+
+
+def _cancel_fabric_job(args: Namespace, job_id: str) -> None:
+    args.instance_id = job_id
+    response = jobs_api.cancel_item_job_instance(args)
+    if response.status_code == 202:
+        fab_ui.print_output_format(
+            args,
+            message=f"Job instance '{job_id}' cancelled (async)",
+        )
+
+
+def _cancel_semantic_model_refresh(args: Namespace, refresh_id: str) -> None:
+    from fabric_cli.client import fab_api_semantic_model as semantic_model_api
+
+    response = semantic_model_api.cancel_refresh(args, refresh_id)
+    if response.status_code == 200:
+        fab_ui.print_output_format(
+            args,
+            message=f"Job instance '{refresh_id}' cancelled (async)",
+        )
+
+
+def _wait_for_job_with_timeout(
+    args: Namespace,
+    job_id: str,
+    response,
+    is_semantic_model_refresh: bool,
+    cancel_func,
+) -> None:
+    fab_ui.print_grey(f"∟ Job instance '{job_id}' created")
+    timeout = getattr(args, "timeout", None)
+    if timeout is not None:
+        fab_ui.print_grey(f"∟ Timeout: {timeout} seconds")
+    else:
+        fab_ui.print_grey("∟ Timeout: no timeout specified")
+
+    try:
+        utils_job.wait_for_job_completion(
+            args,
+            job_id,
+            response,
+            timeout=timeout,
+            custom_polling_interval=getattr(args, "polling_interval", None),
+            is_semantic_model_refresh=is_semantic_model_refresh,
+        )
+    except TimeoutError as e:
+        fab_ui.print_warning(str(e))
+        if config.get_config(con.FAB_JOB_CANCEL_ONTIMEOUT) == "false":
+            fab_ui.print_grey(
+                f"Job still running. To change this behaviour and cancel on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'true'"
+            )
+        else:
+            fab_ui.print_grey(
+                f"Cancelling job instance '{job_id}' (timeout). To change this behaviour and continue running on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'false'"
+            )
+
+        if config.get_config(con.FAB_JOB_CANCEL_ONTIMEOUT) != "false":
+            cancel_func(args, job_id)
+
+
+def _handle_job_response(
+    args: Namespace,
+    item: Item,
+    response,
+    job_id: str,
+    is_semantic_model_refresh: bool,
+) -> None:
+    if args.wait:
+        cancel_func = (
+            _cancel_semantic_model_refresh
+            if is_semantic_model_refresh
+            else _cancel_fabric_job
+        )
+        _wait_for_job_with_timeout(
+            args=args,
+            job_id=job_id,
+            response=response,
+            is_semantic_model_refresh=is_semantic_model_refresh,
+            cancel_func=cancel_func,
+        )
+    else:
+        fab_ui.print_output_format(args, message=f"Job instance '{job_id}' created")
+        fab_ui.print_grey(
+            f"→ To see status run 'job run-status {item.path} --id {job_id}'"
+        )
+
+
+def _exec_fabric_job(args: Namespace, item: Item) -> None:
     if getattr(args, "configuration", None) is not None:
         payload = json.dumps({"executionData": json.loads(args.configuration)})
     else:
@@ -21,45 +115,30 @@ def exec_command(args: Namespace, item: Item) -> None:
     (response, job_instance_id) = jobs_api.run_on_demand_item_job(args, payload)
 
     if response.status_code == 202:
-        if args.wait:
-            fab_ui.print_grey(f"∟ Job instance '{job_instance_id}' created")
-            timeout = getattr(args, "timeout", None)
-            if timeout is not None:
-                fab_ui.print_grey(f"∟ Timeout: {timeout} seconds")
-            else:
-                fab_ui.print_grey("∟ Timeout: no timeout specified")
-
-            try:
-                utils_job.wait_for_job_completion(
-                    args,
-                    job_instance_id,
-                    response,
-                    timeout=timeout,
-                    custom_polling_interval=getattr(args, "polling_interval", None),
-                )
-            except TimeoutError as e:
-                fab_ui.print_warning(str(e))
-                # Get the configuration to check if we should cancel the job
-                if config.get_config(con.FAB_JOB_CANCEL_ONTIMEOUT) == "false":
-                    fab_ui.print_grey(
-                        f"Job still running. To change this behaviour and cancel on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'true'"
-                    )
-                else:
-                    fab_ui.print_grey(
-                        f"Cancelling job instance '{job_instance_id}' (timeout). To change this behaviour and continue running on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'false'"
-                    )
-                    args.instance_id = job_instance_id
-                    response = jobs_api.cancel_item_job_instance(args)
-                    if response.status_code == 202:
-                        fab_ui.print_output_format(
-                            args,
-                            message=f"Job instance '{args.instance_id}' cancelled (async)",
-                        )
+        _handle_job_response(
+            args=args,
+            item=item,
+            response=response,
+            job_id=job_instance_id,
+            is_semantic_model_refresh=False,
+        )
 
-        else:
-            fab_ui.print_output_format(
-                args, message=f"Job instance '{job_instance_id}' created"
-            )
-            fab_ui.print_grey(
-                f"→ To see status run 'job run-status {item.path} --id {job_instance_id}'"
-            )
+
+def _exec_semantic_model_refresh(args: Namespace, item: Item) -> None:
+    from fabric_cli.client import fab_api_semantic_model as semantic_model_api
+    from fabric_cli.utils import fab_cmd_semantic_model_utils as sm_utils
+
+    payload = json.dumps({"retryCount": 0})
+
+    response = semantic_model_api.refresh_semantic_model(args, payload)
+
+    if response.status_code == 202:
+        refresh_id = sm_utils.extract_semantic_model_refresh_id(response)
+
+        _handle_job_response(
+            args=args,
+            item=item,
+            response=response,
+            job_id=refresh_id,
+            is_semantic_model_refresh=True,
+        )
diff --git a/src/fabric_cli/commands/jobs/fab_jobs_run_status.py b/src/fabric_cli/commands/jobs/fab_jobs_run_status.py
index 087010ce..fbf39d6c 100644
--- a/src/fabric_cli/commands/jobs/fab_jobs_run_status.py
+++ b/src/fabric_cli/commands/jobs/fab_jobs_run_status.py
@@ -5,11 +5,20 @@
 from argparse import Namespace
+from typing import Optional
 
 from fabric_cli.client import fab_api_jobs as jobs_api
+from fabric_cli.core.fab_types import FabricJobType
 from fabric_cli.core.hiearchy.fab_hiearchy import Item
 from fabric_cli.utils import fab_ui
 
 
 def exec_command(args: Namespace, context: Item) -> None:
+    if context.job_type == FabricJobType.SEMANTIC_MODEL_REFRESH:
+        _exec_semantic_model_status(args, context)
+    else:
+        _exec_fabric_job_status(args, context)
+
+
+def _exec_fabric_job_status(args: Namespace, context: Item) -> None:
     if args.schedule:
         args.schedule_id = args.id
         response = jobs_api.get_item_schedule(args)
@@ -20,3 +29,69 @@ def exec_command(args: Namespace, context: Item) -> None:
     if response.status_code == 200:
         content = json.loads(response.text)
         fab_ui.print_output_format(args, data=content, show_headers=True)
+
+
+def _exec_semantic_model_status(args: Namespace, context: Item) -> None:
+    from fabric_cli.client import fab_api_semantic_model as semantic_model_api
+
+    if args.schedule:
+        fab_ui.print_warning(
+            "Schedule status not supported for semantic models via this command. "
+            "Use Power BI portal or API to manage refresh schedules."
+        )
+        return
+
+    args.ws_id = context.workspace.id
+    args.item_id = context.id
+    args.instance_id = args.id
+
+    response = semantic_model_api.get_refresh_execution_details(args)
+
+    if response.status_code == 200:
+        content = json.loads(response.text)
+        transformed = _transform_to_job_instance_format(content, args.id, context.id)
+        fab_ui.print_output_format(args, data=transformed, show_headers=True)
+    # TODO: get_refresh_execution_details can return 202 (Accepted); handle it explicitly
+    # elif response.status_code == 202:
+    #     # For 202 (Accepted) responses, log the entire response as-is
+    #     content = json.loads(response.text)
+    #     fab_ui.print_output_format(args, data=content, show_headers=True)
+
+
+def _transform_to_job_instance_format(
+    content: dict, refresh_id: str, item_id: str
+) -> dict:
+    status = content.get("extendedStatus") or content.get("status")
+
+    transformed = {
+        "id": refresh_id,
+        "itemId": item_id,
+        "jobType": "RefreshSemanticModel",
+        "invokeType": content.get("currentRefreshType", "Unknown"),
+        "status": status,
+        "startTimeUtc": content.get("startTime"),
+        "endTimeUtc": content.get("endTime"),
+    }
+
+    if status and status.lower() not in ["completed", "success"]:
+        failure_reason = _extract_failure_reason(content.get("messages", []))
+        if failure_reason:
+            transformed["failureReason"] = failure_reason
+
+    return transformed
+
+
+def _extract_failure_reason(messages: list) -> Optional[dict]:
+    if not messages:
+        return None
+
+    error_messages = [
+        {"code": msg.get("code"), "message": msg.get("message")}
+        for msg in messages
+        if msg.get("type") == "Error"
+    ]
+
+    if not error_messages:
+        return None
+
+    return {"errors": error_messages}
diff --git a/src/fabric_cli/core/fab_config/command_support.yaml b/src/fabric_cli/core/fab_config/command_support.yaml
index 822aa980..c2e3893d 100644
--- a/src/fabric_cli/core/fab_config/command_support.yaml
+++ b/src/fabric_cli/core/fab_config/command_support.yaml
@@ -45,13 +45,24 @@ commands:
       - notebook
       - data_pipeline
       - lakehouse
+      - semantic_model
     subcommands:
       run:
       run_cancel:
+        unsupported_items:
+          - semantic_model
       run_list:
+        unsupported_items:
+          - semantic_model
       run_update:
+        unsupported_items:
+          - semantic_model
       run_sch:
+        unsupported_items:
+          - semantic_model
      run-rm:
+        unsupported_items:
+          - semantic_model
       run_status:
       run_wait:
diff --git a/src/fabric_cli/core/fab_types.py b/src/fabric_cli/core/fab_types.py
index 2888f625..995ceed5 100644
--- a/src/fabric_cli/core/fab_types.py
+++ b/src/fabric_cli/core/fab_types.py
@@ -326,6 +326,7 @@ class FabricJobType(Enum):
     RUN_NOTEBOOK = "RunNotebook"
     PIPELINE = "Pipeline"
     TABLE_MAINTENANCE = "TableMaintenance"
+    SEMANTIC_MODEL_REFRESH = "SemanticModelRefresh"
 
 
 ITJobMap: dict[ItemType, FabricJobType] = {
@@ -337,6 +338,8 @@ class FabricJobType(Enum):
     ItemType.DATA_PIPELINE: FabricJobType.PIPELINE,
     # {"tableName": "orders", "optimizeSettings": {"vOrder": true, "zOrderBy": ["account_id"]}, "vacuumSettings": {"retentionPeriod": "7.01:00:00"}}
     ItemType.LAKEHOUSE: FabricJobType.TABLE_MAINTENANCE,
+    # {"applyRefreshPolicy": true, "commitMode": "Transactional", "retryCount": 3}
+    ItemType.SEMANTIC_MODEL: FabricJobType.SEMANTIC_MODEL_REFRESH,
 }
 
 ###################################
diff --git a/src/fabric_cli/errors/__init__.py b/src/fabric_cli/errors/__init__.py
index d4c61be5..aa3f2857 100644
--- a/src/fabric_cli/errors/__init__.py
+++ b/src/fabric_cli/errors/__init__.py
@@ -9,6 +9,7 @@
 from .cp import CpErrors
 from .export import ExportErrors
 from .hierarchy import HierarchyErrors
+from .job import JobErrors
 from .labels import LabelsErrors
 from .mkdir import MkdirErrors
 from .mv import MvErrors
@@ -25,6 +26,7 @@ class ErrorMessages:
     Cp = CpErrors
     Export = ExportErrors
     Hierarchy = HierarchyErrors
+    Job = JobErrors
     Labels = LabelsErrors
     Mkdir = MkdirErrors
     Mv = MvErrors
diff --git a/src/fabric_cli/errors/job.py b/src/fabric_cli/errors/job.py
new file mode 100644
index 00000000..8b76b5ec
--- /dev/null
+++ b/src/fabric_cli/errors/job.py
@@ -0,0 +1,15 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+
+class JobErrors:
+    @staticmethod
+    def refresh_id_not_found() -> str:
+        return (
+            "Failed to extract refresh ID from API response. "
+            "Neither Location header nor RequestId header present in response."
+        )
+
+    @staticmethod
+    def semantic_model_refresh_params_not_supported() -> str:
+        return "Semantic model refresh does not support -P/--params or -i/--input parameters."
diff --git a/src/fabric_cli/utils/fab_cmd_job_utils.py b/src/fabric_cli/utils/fab_cmd_job_utils.py
index 1e537cd3..7d3c1883 100644
--- a/src/fabric_cli/utils/fab_cmd_job_utils.py
+++ b/src/fabric_cli/utils/fab_cmd_job_utils.py
@@ -8,6 +8,7 @@
 from typing import Any, Optional
 
 from requests.structures import CaseInsensitiveDict
+
 from fabric_cli.client import fab_api_jobs as jobs_api
 from fabric_cli.client.fab_api_types import ApiResponse
 from fabric_cli.core import fab_constant, fab_logger
@@ -15,8 +16,8 @@
 from fabric_cli.core.fab_types import FabricJobType
 from fabric_cli.core.hiearchy.fab_hiearchy import Item
 from fabric_cli.errors import ErrorMessages
-from fabric_cli.utils.fab_http_polling_utils import get_polling_interval
 from fabric_cli.utils import fab_ui
+from fabric_cli.utils.fab_http_polling_utils import get_polling_interval
 
 
 def add_item_props_to_args(args: Namespace, context: Item) -> None:
@@ -31,6 +32,16 @@ def add_item_props_to_args(args: Namespace, context: Item) -> None:
 def build_config_from_args(
     args: Namespace, item: Item, schedule: Optional[bool] = False
 ):
+    # Semantic model refresh does not support -P/--params or -i/--input parameters
+    if item.job_type == FabricJobType.SEMANTIC_MODEL_REFRESH:
+        if getattr(args, "params", None) or getattr(args, "input", None):
+            raise FabricCLIError(
+                ErrorMessages.Job.semantic_model_refresh_params_not_supported(),
+                fab_constant.ERROR_NOT_RUNNABLE,
+            )
+        # No configuration needed for semantic model refresh
+        return
+
     # Input and Params cannot be used at the same time because they affect the same property
     if getattr(args, "input", None) and (
         getattr(args, "params", None) or getattr(args, "config", None)
@@ -50,10 +61,11 @@ def wait_for_job_completion(
     job_args: Namespace,
-    job_ins_id,
+    job_ins_id: str,
     job_response: ApiResponse,
     timeout: Optional[int] = None,
-    custom_polling_interval: Optional[int] = None
+    custom_polling_interval: Optional[int] = None,
+    is_semantic_model_refresh: bool = False,
 ) -> None:
     args = Namespace(
         ws_id=getattr(job_args, "ws_id", None),
         item_id=getattr(job_args, "item_id", None),
         instance_id=getattr(
@@ -66,12 +78,22 @@ def wait_for_job_completion(
             job_args, "instance_id", getattr(job_args, "job_instance_id", None)
         ),
         output_format=getattr(job_args, "output_format", None),
     )
+
+    if is_semantic_model_refresh:
+        from fabric_cli.utils import fab_cmd_semantic_model_utils as sm_utils
+
+        get_status_func = sm_utils.create_refresh_status_function(job_response, args)
+    else:
+        get_status_func = jobs_api.get_item_job_instance
+
     attempts = 0
     status = "NotStarted"
     content = None
-    initial_interval = get_polling_interval(job_response.headers, custom_polling_interval)
-    if (timeout is not None and timeout < initial_interval):
+    initial_interval = get_polling_interval(
+        job_response.headers, custom_polling_interval
+    )
+    if timeout is not None and timeout < initial_interval:
         initial_interval = timeout
     time.sleep(initial_interval)
     total_wait_time = initial_interval
@@ -79,22 +101,31 @@ def wait_for_job_completion(
 
     # Retry until the job is completed or the timeout is reached
     while timeout is None or total_wait_time < timeout:
         _t1 = time.time()
-        response = jobs_api.get_item_job_instance(args)
+        response = get_status_func(args)
         api_call_time = int(time.time() - _t1)
-        
+
         # Add API call time to total wait time
         total_wait_time += api_call_time
 
-        if response.status_code == 200:
+        if response.status_code == 200 or (
+            is_semantic_model_refresh and response.status_code == 202
+        ):
             content = json.loads(response.text)
-            status = content["status"]
-            # Available statuses are: NotStarted, InProgress, Completed, Deduped, Failed, Cancelled
-            if status in ["Completed", "Cancelled", "Deduped"]:
+            if is_semantic_model_refresh:
+                status = content.get(
+                    "extendedStatus", content.get("status", "Unavailable")
+                )
+            else:
+                status = content.get("status", "Unknown")
+
+            # Available statuses: NotStarted, InProgress, Completed, Deduped, Failed, Cancelled, TimedOut, Disabled
+            if status in ["Completed", "Cancelled", "Deduped", "TimedOut", "Disabled"]:
                 fab_ui.print_progress(f"Job instance status: {status}")
                 if status == "Completed":
                     fab_ui.print_output_format(
-                        args, message=f"Job instance '{job_ins_id}' completed"
+                        args,
+                        message=f"Job instance '{job_ins_id}' completed successfully",
                     )
                 else:
                     fab_logger.log_warning(
@@ -103,15 +134,20 @@ def wait_for_job_completion(
                 return
             elif status == "Failed":
                 fab_ui.print_entries_unix_style([content], content.keys(), header=True)
+
                 raise FabricCLIError(
-                ErrorMessages.Common.job_instance_failed(job_ins_id),
-                fab_constant.ERROR_JOB_FAILED,
-            )
-            elif status in ["NotStarted", "InProgress"]:
+                    ErrorMessages.Common.job_instance_failed(job_ins_id),
+                    fab_constant.ERROR_JOB_FAILED,
+                )
+            elif status in ["NotStarted", "InProgress"] or (
+                status == "Unknown" and is_semantic_model_refresh
+            ):
                 fab_ui.print_progress(f"Job instance status: {status}")
-
-                interval = get_polling_interval(response.headers, custom_polling_interval)
-
+
+                interval = get_polling_interval(
+                    response.headers, custom_polling_interval
+                )
+
                 time.sleep(interval)
                 total_wait_time += interval
 
@@ -128,8 +164,6 @@ def wait_for_job_completion(
         )
 
 
-
-
 def _extract_times(times: str) -> list[str]:
     # Extract the times from the input string
     time_list = times.strip("[]").split(",")
@@ -405,7 +439,7 @@ def _process_params(args: Namespace, item: Item) -> None:
         )
 
     if getattr(args, "configuration", None):
-        # Merge the configuration properties with the existing parameters
+        # Merge with existing configuration from -C/--config (notebooks only)
         _config_str: str = str(args.configuration)
         current_config = json.loads(_config_str)
         current_config["parameters"] = params
@@ -420,9 +454,13 @@ def validate_timeout_polling_interval(args: Namespace) -> None:
     """Validate that polling interval is not greater than or equal to timeout."""
     timeout = getattr(args, "timeout", None)
     polling_interval = getattr(args, "polling_interval", None)
-    
-    if timeout is not None and polling_interval is not None and polling_interval >= timeout:
+
+    if (
+        timeout is not None
+        and polling_interval is not None
+        and polling_interval >= timeout
+    ):
         raise FabricCLIError(
             f"Custom polling interval ({polling_interval}s) cannot be greater than or equal to timeout ({timeout}s)",
             fab_constant.ERROR_INVALID_INPUT,
-        )
\ No newline at end of file
+        )
diff --git a/src/fabric_cli/utils/fab_cmd_semantic_model_utils.py b/src/fabric_cli/utils/fab_cmd_semantic_model_utils.py
new file mode 100644
index 00000000..4a77990b
--- /dev/null
+++ b/src/fabric_cli/utils/fab_cmd_semantic_model_utils.py
@@ -0,0 +1,82 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from argparse import Namespace
+from typing import Callable
+
+from fabric_cli.client.fab_api_types import ApiResponse
+from fabric_cli.core import fab_constant
+from fabric_cli.core.fab_exceptions import FabricCLIError
+from fabric_cli.errors import ErrorMessages
+
+
+def extract_semantic_model_refresh_id(response: ApiResponse) -> str:
+    """
+    Extract refresh ID from the POST semantic model refreshes response.
+
+    Tries to get the refreshId from the Location header first. If not present,
+    falls back to the RequestId header.
+
+    Args:
+        response: ApiResponse from refresh trigger request (202 status code)
+
+    Returns:
+        Refresh ID string suitable for display and polling
+
+    Raises:
+        FabricCLIError: If neither Location header nor RequestId are present
+    """
+    refresh_location_url = response.headers.get("Location", "")
+
+    if refresh_location_url:
+        return refresh_location_url.split("/")[-1]
+
+    request_id = response.headers.get("RequestId", "")
+    if not request_id:
+        raise FabricCLIError(
+            ErrorMessages.Job.refresh_id_not_found(),
+            fab_constant.ERROR_API_FAILED,
+        )
+
+    return request_id
+
+
+def create_refresh_status_function(
+    job_response: ApiResponse, args: Namespace
+) -> Callable[[Namespace], ApiResponse]:
+    """
+    Create a status polling function for semantic model refresh.
+
+    This function determines the appropriate API method to use based on the
+    response headers from the POST semantic model refresh trigger operation.
+
+    Args:
+        job_response: Initial API response from semantic model refresh trigger
+        args: Namespace containing ws_id and item_id
+
+    Returns:
+        A callable function that takes Namespace args and returns ApiResponse
+        for polling refresh execution status
+    """
+    from fabric_cli.client import fab_api_semantic_model as sm_api
+
+    refresh_location_url = job_response.headers.get("Location", "")
+
+    if refresh_location_url:
+        # Use the Location URL directly for polling
+        return lambda a: sm_api.get_refresh_execution_details_by_url(
+            a, refresh_url=refresh_location_url
+        )
+
+    # Fallback: use extract_semantic_model_refresh_id to get the refresh ID
+    # and call get_refresh_execution_details (IDs extracted from args)
+    refresh_id = extract_semantic_model_refresh_id(job_response)
+
+    def polling_func(a: Namespace) -> ApiResponse:
+        # Ensure args has the required attributes for get_refresh_execution_details
+        a.ws_id = args.ws_id
+        a.item_id = args.item_id
+        a.refresh_id = refresh_id
+        return sm_api.get_refresh_execution_details(a)
+
+    return polling_func
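
Illustrative usage, not part of the patch: a minimal sketch of the Location-header / RequestId fallback implemented by `extract_semantic_model_refresh_id`. The `FakeResponse` stub and the header values are hypothetical stand-ins for a real `ApiResponse`.

```python
# Minimal sketch of the refresh-ID extraction fallback (hypothetical test doubles).
from requests.structures import CaseInsensitiveDict

from fabric_cli.utils import fab_cmd_semantic_model_utils as sm_utils


class FakeResponse:
    def __init__(self, headers: dict) -> None:
        # Real HTTP response headers are case-insensitive, so mimic that here
        self.headers = CaseInsensitiveDict(headers)


# Location header present: the last path segment is used as the refresh ID
located = FakeResponse(
    {"Location": "https://api.powerbi.com/v1.0/myorg/groups/ws/datasets/ds/refreshes/abc-123"}
)
assert sm_utils.extract_semantic_model_refresh_id(located) == "abc-123"

# No Location header: falls back to the RequestId header
fallback = FakeResponse({"RequestId": "req-456"})
assert sm_utils.extract_semantic_model_refresh_id(fallback) == "req-456"
```

Per the docstring, either value is intended to be usable for display and for the status-polling path; when the Location header is missing, `create_refresh_status_function` falls back to the same extraction before polling `get_refresh_execution_details`.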