Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/fabric_cli/client/fab_api_semantic_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from argparse import Namespace

from fabric_cli.client import fab_api_client as fabric_api
from fabric_cli.client.fab_api_types import ApiResponse


def refresh_semantic_model(args: Namespace, payload: str) -> ApiResponse:
    """Trigger an on-demand refresh of a semantic model (Power BI dataset).

    https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group

    Args:
        args: Namespace carrying ws_id and item_id; mutated in place to set
            uri/method/audience before delegating to the API client.
        payload: JSON request body for the refresh call.

    Returns:
        The raw ApiResponse from the Power BI API.
    """
    original_wait = getattr(args, "wait", False)

    args.uri = f"groups/{args.ws_id}/datasets/{args.item_id}/refreshes"
    args.method = "post"
    args.audience = "powerbi"
    # Disable automatic long-running operation polling for the HTTP request;
    # refresh progress is tracked separately via the refresh-details endpoint.
    args.wait = False

    try:
        response = fabric_api.do_request(args, data=payload)
    finally:
        # Restore the caller's wait flag even if the request raises.
        args.wait = original_wait

    return response


def get_refresh_execution_details(args: Namespace) -> ApiResponse:
    """Fetch execution details for a single semantic model refresh.

    https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/get-refresh-execution-details-in-group
    """
    # The refresh id may arrive under either attribute name depending on caller.
    refresh_id = getattr(args, "instance_id", None)
    if not refresh_id:
        refresh_id = getattr(args, "refresh_id", None)
    if not refresh_id:
        raise ValueError("args must contain either 'instance_id' or 'refresh_id'")

    args.method = "get"
    args.audience = "powerbi"
    args.wait = False
    args.uri = f"groups/{args.ws_id}/datasets/{args.item_id}/refreshes/{refresh_id}"

    return fabric_api.do_request(args)


def get_refresh_execution_details_by_url(
    args: Namespace, refresh_url: str
) -> ApiResponse:
    """https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/get-refresh-execution-details-in-group"""
    from urllib.parse import urlparse

    parsed = urlparse(refresh_url)

    # The service hands back absolute refresh URLs; strip everything up to and
    # including the "/v1.0/myorg/" prefix to obtain a client-relative uri.
    _, marker, remainder = parsed.path.partition("/v1.0/myorg/")
    uri = remainder if marker else parsed.path.lstrip("/")

    args.uri = uri
    args.method = "get"
    args.audience = "powerbi"
    args.wait = False

    return fabric_api.do_request(args, hostname=parsed.netloc)


def cancel_refresh(args: Namespace, refresh_id: str) -> ApiResponse:
    """Cancel an in-progress semantic model refresh.

    https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/cancel-refresh-in-group
    """
    args.method = "delete"
    args.audience = "powerbi"
    args.uri = f"groups/{args.ws_id}/datasets/{args.item_id}/refreshes/{refresh_id}"

    return fabric_api.do_request(args)
161 changes: 120 additions & 41 deletions src/fabric_cli/commands/jobs/fab_jobs_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,106 @@
from fabric_cli.client import fab_api_jobs as jobs_api
from fabric_cli.core import fab_constant as con
from fabric_cli.core import fab_state_config as config
from fabric_cli.core.fab_types import FabricJobType
from fabric_cli.core.hiearchy.fab_hiearchy import Item
from fabric_cli.utils import fab_cmd_job_utils as utils_job
from fabric_cli.utils import fab_ui


def exec_command(args: Namespace, item: Item) -> None:
    """Run a job on the item, dispatching on its job type."""
    # Semantic model refreshes go through the Power BI refresh API rather
    # than the generic Fabric on-demand job API.
    if item.job_type != FabricJobType.SEMANTIC_MODEL_REFRESH:
        _exec_fabric_job(args, item)
    else:
        _exec_semantic_model_refresh(args, item)


def _cancel_fabric_job(args: Namespace, job_id: str) -> None:
    """Request async cancellation of a Fabric job instance and report success."""
    args.instance_id = job_id
    cancel_response = jobs_api.cancel_item_job_instance(args)
    # 202 Accepted means the cancellation request was queued.
    if cancel_response.status_code != 202:
        return
    fab_ui.print_output_format(
        args,
        message=f"Job instance '{job_id}' cancelled (async)",
    )


def _cancel_semantic_model_refresh(args: Namespace, refresh_id: str) -> None:
    """Request cancellation of a semantic model refresh and report success."""
    # Imported lazily so the Power BI client is only loaded when needed.
    from fabric_cli.client import fab_api_semantic_model as semantic_model_api

    cancel_response = semantic_model_api.cancel_refresh(args, refresh_id)
    # The Power BI cancel endpoint returns 200 on success.
    if cancel_response.status_code != 200:
        return
    fab_ui.print_output_format(
        args,
        message=f"Job instance '{refresh_id}' cancelled (async)",
    )


def _wait_for_job_with_timeout(
    args: Namespace,
    job_id: str,
    response,
    is_semantic_model_refresh: bool,
    cancel_func,
) -> None:
    """Poll a job until completion, honoring --timeout and cancel-on-timeout config.

    Args:
        args: Parsed CLI arguments; may carry optional 'timeout' and
            'polling_interval' attributes.
        job_id: Identifier of the job instance (or refresh) being tracked.
        response: HTTP response that created the job, forwarded to the poller.
        is_semantic_model_refresh: True when polling a Power BI semantic model
            refresh instead of a generic Fabric job.
        cancel_func: Callable(args, job_id) used to cancel the job on timeout.
    """
    # Note: dropped the stray trailing quote the previous message carried.
    fab_ui.print_grey(f"∟ Job instance '{job_id}' created")
    timeout = getattr(args, "timeout", None)
    if timeout is not None:
        fab_ui.print_grey(f"∟ Timeout: {timeout} seconds")
    else:
        fab_ui.print_grey("∟ Timeout: no timeout specified")

    try:
        utils_job.wait_for_job_completion(
            args,
            job_id,
            response,
            timeout=timeout,
            custom_polling_interval=getattr(args, "polling_interval", None),
            is_semantic_model_refresh=is_semantic_model_refresh,
        )
    except TimeoutError as e:
        fab_ui.print_warning(str(e))
        # Read the config once; it decides whether a timed-out job is cancelled.
        cancel_on_timeout = (
            config.get_config(con.FAB_JOB_CANCEL_ONTIMEOUT) != "false"
        )
        if not cancel_on_timeout:
            fab_ui.print_grey(
                f"Job still running. To change this behaviour and cancel on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'true'"
            )
        else:
            fab_ui.print_grey(
                f"Cancelling job instance '{job_id}' (timeout). To change this behaviour and continue running on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'false'"
            )
            cancel_func(args, job_id)


def _handle_job_response(
    args: Namespace,
    item: Item,
    response,
    job_id: str,
    is_semantic_model_refresh: bool,
) -> None:
    """Either wait for the newly created job or print how to check it later."""
    if not args.wait:
        # Fire-and-forget: report the id and how to query status afterwards.
        fab_ui.print_output_format(args, message=f"Job instance '{job_id}' created")
        fab_ui.print_grey(
            f"→ To see status run 'job run-status {item.path} --id {job_id}'"
        )
        return

    # Pick the cancellation routine matching the job flavour, then block
    # until completion (or timeout).
    if is_semantic_model_refresh:
        cancel_func = _cancel_semantic_model_refresh
    else:
        cancel_func = _cancel_fabric_job
    _wait_for_job_with_timeout(
        args=args,
        job_id=job_id,
        response=response,
        is_semantic_model_refresh=is_semantic_model_refresh,
        cancel_func=cancel_func,
    )


def _exec_fabric_job(args: Namespace, item: Item) -> None:
if getattr(args, "configuration", None) is not None:
payload = json.dumps({"executionData": json.loads(args.configuration)})
else:
Expand All @@ -21,45 +115,30 @@ def exec_command(args: Namespace, item: Item) -> None:
(response, job_instance_id) = jobs_api.run_on_demand_item_job(args, payload)

if response.status_code == 202:
if args.wait:
fab_ui.print_grey(f"∟ Job instance '{job_instance_id}' created")
timeout = getattr(args, "timeout", None)
if timeout is not None:
fab_ui.print_grey(f"∟ Timeout: {timeout} seconds")
else:
fab_ui.print_grey("∟ Timeout: no timeout specified")

try:
utils_job.wait_for_job_completion(
args,
job_instance_id,
response,
timeout=timeout,
custom_polling_interval=getattr(args, "polling_interval", None),
)
except TimeoutError as e:
fab_ui.print_warning(str(e))
# Get the configuration to check if we should cancel the job
if config.get_config(con.FAB_JOB_CANCEL_ONTIMEOUT) == "false":
fab_ui.print_grey(
f"Job still running. To change this behaviour and cancel on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'true'"
)
else:
fab_ui.print_grey(
f"Cancelling job instance '{job_instance_id}' (timeout). To change this behaviour and continue running on timeout, set {con.FAB_JOB_CANCEL_ONTIMEOUT} config property to 'false'"
)
args.instance_id = job_instance_id
response = jobs_api.cancel_item_job_instance(args)
if response.status_code == 202:
fab_ui.print_output_format(
args,
message=f"Job instance '{args.instance_id}' cancelled (async)",
)
_handle_job_response(
args=args,
item=item,
response=response,
job_id=job_instance_id,
is_semantic_model_refresh=False,
)

else:
fab_ui.print_output_format(
args, message=f"Job instance '{job_instance_id}' created"
)
fab_ui.print_grey(
f"→ To see status run 'job run-status {item.path} --id {job_instance_id}'"
)

def _exec_semantic_model_refresh(args: Namespace, item: Item) -> None:
    """Trigger an on-demand semantic model refresh and track it like a job run."""
    # Imported lazily so the Power BI client is only loaded for this job type.
    from fabric_cli.client import fab_api_semantic_model as semantic_model_api
    from fabric_cli.utils import fab_cmd_semantic_model_utils as sm_utils

    payload = json.dumps({"retryCount": 0})
    response = semantic_model_api.refresh_semantic_model(args, payload)

    # Only proceed when the refresh was accepted (202).
    if response.status_code != 202:
        return

    refresh_id = sm_utils.extract_semantic_model_refresh_id(response)
    _handle_job_response(
        args=args,
        item=item,
        response=response,
        job_id=refresh_id,
        is_semantic_model_refresh=True,
    )
74 changes: 74 additions & 0 deletions src/fabric_cli/commands/jobs/fab_jobs_run_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
from argparse import Namespace

from fabric_cli.client import fab_api_jobs as jobs_api
from fabric_cli.core.fab_types import FabricJobType
from fabric_cli.core.hiearchy.fab_hiearchy import Item
from fabric_cli.utils import fab_ui


def exec_command(args: Namespace, context: Item) -> None:
    """Show run status for the item, dispatching on its job type."""
    # Semantic model refresh status comes from the Power BI refresh API.
    if context.job_type != FabricJobType.SEMANTIC_MODEL_REFRESH:
        _exec_fabric_job_status(args, context)
    else:
        _exec_semantic_model_status(args, context)


def _exec_fabric_job_status(args: Namespace, context: Item) -> None:
if args.schedule:
args.schedule_id = args.id
response = jobs_api.get_item_schedule(args)
Expand All @@ -20,3 +28,69 @@ def exec_command(args: Namespace, context: Item) -> None:
if response.status_code == 200:
content = json.loads(response.text)
fab_ui.print_output_format(args, data=content, show_headers=True)


def _exec_semantic_model_status(args: Namespace, context: Item) -> None:
    """Print the status of a semantic model refresh in job-instance format.

    Schedule status (--schedule) is not supported for semantic models via
    this command; a warning is printed and the call returns early.
    """
    from fabric_cli.client import fab_api_semantic_model as semantic_model_api

    if args.schedule:
        fab_ui.print_warning(
            "Schedule status not supported for semantic models via this command. "
            "Use Power BI portal or API to manage refresh schedules."
        )
        return

    # Resolve workspace/item ids from the CLI context; args.id carries the
    # refresh (instance) id supplied by the user.
    args.ws_id = context.workspace.id
    args.item_id = context.id
    args.instance_id = args.id

    response = semantic_model_api.get_refresh_execution_details(args)

    if response.status_code == 200:
        content = json.loads(response.text)
        # Reshape the Power BI refresh payload into the generic job-instance
        # format so output matches other 'job run-status' invocations.
        transformed = _transform_to_job_instance_format(content, args.id, context.id)
        fab_ui.print_output_format(args, data=transformed, show_headers=True)
    # TODO(review): get-refresh-execution-details can also return 202 while
    # the refresh is still in progress; that case is currently not handled,
    # so nothing is printed for an in-flight refresh.


def _transform_to_job_instance_format(
content: dict, refresh_id: str, item_id: str
) -> dict:
status = content.get("extendedStatus") or content.get("status")

transformed = {
"id": refresh_id,
"itemId": item_id,
"jobType": "RefreshSemanticModel",
"invokeType": content.get("currentRefreshType", "Unknown"),
"status": status,
"startTimeUtc": content.get("startTime"),
"endTimeUtc": content.get("endTime"),
}

if status and status.lower() not in ["completed", "success"]:
failure_reason = _extract_failure_reason(content.get("messages", []))
if failure_reason:
transformed["failureReason"] = failure_reason

return transformed


def _extract_failure_reason(messages: list) -> dict:
if not messages:
return None

error_messages = [
{"code": msg.get("code"), "message": msg.get("message")}
for msg in messages
if msg.get("type") == "Error"
]

if not error_messages:
return None

return {"errors": error_messages}
11 changes: 11 additions & 0 deletions src/fabric_cli/core/fab_config/command_support.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,24 @@ commands:
- notebook
- data_pipeline
- lakehouse
- semantic_model
subcommands:
run:
run_cancel:
unsupported_items:
- semantic_model
run_list:
unsupported_items:
- semantic_model
run_update:
unsupported_items:
- semantic_model
run_sch:
unsupported_items:
- semantic_model
run-rm:
unsupported_items:
- semantic_model
run_status:
run_wait:

Expand Down
3 changes: 3 additions & 0 deletions src/fabric_cli/core/fab_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ class FabricJobType(Enum):
RUN_NOTEBOOK = "RunNotebook"
PIPELINE = "Pipeline"
TABLE_MAINTENANCE = "TableMaintenance"
SEMANTIC_MODEL_REFRESH = "SemanticModelRefresh"


ITJobMap: dict[ItemType, FabricJobType] = {
Expand All @@ -337,6 +338,8 @@ class FabricJobType(Enum):
ItemType.DATA_PIPELINE: FabricJobType.PIPELINE,
# {"tableName": "orders", "optimizeSettings": {"vOrder": true, "zOrderBy": ["account_id"]}, "vacuumSettings": {"retentionPeriod": "7.01:00:00"}}
ItemType.LAKEHOUSE: FabricJobType.TABLE_MAINTENANCE,
# {"applyRefreshPolicy": true, "commitMode": "Transactional", "retryCount": 3}
ItemType.SEMANTIC_MODEL: FabricJobType.SEMANTIC_MODEL_REFRESH,
}

###################################
Expand Down
Loading
Loading