diff --git a/.changes/unreleased/fixed-20260204-020639.yaml b/.changes/unreleased/fixed-20260204-020639.yaml new file mode 100644 index 00000000..34a467fc --- /dev/null +++ b/.changes/unreleased/fixed-20260204-020639.yaml @@ -0,0 +1,8 @@ +kind: fixed +body: Handle ItemDisplayNameAlreadyInUse error during retries under API throttling race condition +time: 2026-02-04T02:06:39Z +custom: + Author: slavatrofimov + AuthorLink: https://github.com/slavatrofimov + Issue: "791" + IssueLink: https://github.com/microsoft/fabric-cicd/issues/791 diff --git a/src/fabric_cicd/_common/_fabric_endpoint.py b/src/fabric_cicd/_common/_fabric_endpoint.py index f27aa4c2..7e498f13 100644 --- a/src/fabric_cicd/_common/_fabric_endpoint.py +++ b/src/fabric_cicd/_common/_fabric_endpoint.py @@ -293,7 +293,7 @@ def _handle_response( msg = f"The executing identity is not authorized to call {method} on '{url}'." raise Exception(msg) - # Handle item name conflicts + # Handle item name conflicts (temporarily reserved) elif ( response.status_code == 400 and response.headers.get("x-ms-public-api-error-code") == "ItemDisplayNameNotAvailableYet" @@ -307,6 +307,19 @@ def _handle_response( prepend_message="Item name is reserved.", ) + # Handle item name already exists (permanent conflict); message keeps the error code for caller recovery + elif ( + response.status_code == 400 + and response.headers.get("x-ms-public-api-error-code") == "ItemDisplayNameAlreadyInUse" + ): + response_json = response.json() if response.text else {} + item_name = response_json.get("message", "").replace("Requested '", "").split("'")[0] if response_json else "" + msg = ( + f"Item '{item_name}' already exists in the workspace but was not found during initial scan " + "(ItemDisplayNameAlreadyInUse)." + ) + raise Exception(msg) + # Handle scenario where library removed from environment before being removed from repo elif response.status_code == 400 and "is not present in the environment." 
in response.json().get( "message", "No message provided" diff --git a/src/fabric_cicd/fabric_workspace.py b/src/fabric_cicd/fabric_workspace.py index 600d5031..e30e712c 100644 --- a/src/fabric_cicd/fabric_workspace.py +++ b/src/fabric_cicd/fabric_workspace.py @@ -656,14 +656,58 @@ def _publish_item( # Create a new item if it does not exist # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/create-item - item_create_response = self.endpoint.invoke( - method="POST", url=f"{self.base_api_url}/items", body=combined_body - ) - api_response = item_create_response - item_guid = item_create_response["body"]["id"] - self.repository_items[item_type][item_name].guid = item_guid + try: + item_create_response = self.endpoint.invoke( + method="POST", url=f"{self.base_api_url}/items", body=combined_body + ) + api_response = item_create_response + item_guid = item_create_response["body"]["id"] + self.repository_items[item_type][item_name].guid = item_guid + except Exception as e: + # Handle race condition: item may have been created during a throttled retry + # or exists due to stale cache from API throttling delays during deployment. + # Check for both the error message and the specific API error code. + error_str = str(e).lower() + if "already in use" in error_str or "itemdisplaynamealreadyinuse" in error_str: + logger.warning( + f"Item '{item_name}' already exists (possible throttling race condition). " + "Attempting to recover by fetching current state." 
+ ) + # Re-fetch the item's GUID from the workspace using existing lookup function + try: + item_guid = self._lookup_item_attribute(self.workspace_id, item_type, item_name, "id") + except InputError: + item_guid = None + if item_guid: + self.repository_items[item_type][item_name].guid = item_guid + is_deployed = True + # Update deployed_items cache to ensure folder move logic works correctly + if item_type not in self.deployed_items: + self.deployed_items[item_type] = {} + self.deployed_items[item_type][item_name] = Item( + type=item_type, + name=item_name, + description=item.description, + guid=item_guid, + folder_id="", # Unknown at this point, folder move logic will handle if needed + logical_id=item.logical_id, + ) + # Set api_response for response tracking to indicate recovery occurred + api_response = { + "recovered": True, + "body": {"id": item_guid, "displayName": item_name, "type": item_type}, + "status_code": 200, + "header": {}, + } + logger.info( + f"{constants.INDENT}Recovered item GUID: {item_guid}. Will update instead of create." + ) + else: + raise # Re-raise if we couldn't recover + else: + raise # Re-raise for other errors - elif is_deployed and not shell_only_publish: + if is_deployed and not shell_only_publish: # Update the item's definition if full publish is required # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/update-item-definition update_response = self.endpoint.invoke( diff --git a/tests/test__fabric_endpoint.py b/tests/test__fabric_endpoint.py index 6e129320..2f969b0c 100644 --- a/tests/test__fabric_endpoint.py +++ b/tests/test__fabric_endpoint.py @@ -476,6 +476,31 @@ def test_handle_response_item_display_name_already_in_use(setup_mocks, monkeypat assert dl.messages == [expected] +def test_handle_response_item_display_name_already_in_use_permanent(setup_mocks): + """ + Test _handle_response raises an exception when item display name is already in use (permanent conflict). 
+ + This error code indicates the item already exists in the workspace, not a temporary reservation. + """ + _, _mock_requests = setup_mocks + response = Mock( + status_code=400, + headers={"x-ms-public-api-error-code": "ItemDisplayNameAlreadyInUse", "Content-Type": "application/json"}, + text='{"message": "Requested \'Test Item\' is already in use."}', + ) + response.json.return_value = {"message": "Requested 'Test Item' is already in use."} + + with pytest.raises(Exception, match="already exists in the workspace"): + _handle_response( + response=response, + method="POST", + url="http://example.com/items", + body="{}", + long_running=False, + iteration_count=1, + ) + + def test_handle_response_environment_libraries_not_found(setup_mocks): """Test _handle_response exits loop when environment libraries are not found (404).""" _, _mock_requests = setup_mocks diff --git a/tests/test_item_creation_recovery.py b/tests/test_item_creation_recovery.py new file mode 100644 index 00000000..ca76cf1a --- /dev/null +++ b/tests/test_item_creation_recovery.py @@ -0,0 +1,310 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for item creation recovery logic when 'already in use' error occurs during API throttling.""" + +import json +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from fabric_cicd.fabric_workspace import FabricWorkspace + +# Error messages used in tests +ALREADY_IN_USE_ERROR_GENERIC = ( + "Unhandled error occurred calling POST on " + "'https://api.fabric.microsoft.com/v1/workspaces/test/items'. " + "Message: Requested 'TestNotebook' is already in use." +) +ALREADY_IN_USE_ERROR_WITH_CODE = "Item 'TestNotebook' already exists (ItemDisplayNameAlreadyInUse)" +ALREADY_IN_USE_ERROR_SIMPLE = "Message: Requested 'TestNotebook' is already in use." 
+UNRELATED_ERROR = "Some other unrelated error occurred" + + +@pytest.fixture +def mock_endpoint(): + """Mock FabricEndpoint for testing recovery scenarios.""" + mock = MagicMock() + mock.upn_auth = True + return mock + + +@pytest.fixture +def test_workspace_for_recovery(mock_endpoint): + """Create a test workspace for recovery testing.""" + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create a notebook item + notebook_dir = temp_path / "TestNotebook.Notebook" + notebook_dir.mkdir(parents=True, exist_ok=True) + + platform_file = notebook_dir / ".platform" + platform_file.write_text( + json.dumps({ + "metadata": { + "kernel_info": {"name": "synapse_pyspark"}, + "language_info": {"name": "python"}, + } + }) + ) + + notebook_file = notebook_dir / "notebook-content.py" + notebook_file.write_text("# Test notebook content\nprint('Hello World')") + + with ( + patch("fabric_cicd.fabric_workspace.FabricEndpoint", return_value=mock_endpoint), + patch.object( + FabricWorkspace, "_refresh_deployed_items", new=lambda self: setattr(self, "deployed_items", {}) + ), + patch.object( + FabricWorkspace, "_refresh_deployed_folders", new=lambda self: setattr(self, "deployed_folders", {}) + ), + patch.object(FabricWorkspace, "_refresh_repository_items", new=lambda _: None), + patch.object(FabricWorkspace, "_refresh_repository_folders", new=lambda _: None), + ): + workspace = FabricWorkspace( + workspace_id="12345678-1234-5678-abcd-1234567890ab", + repository_directory=str(temp_path), + item_type_in_scope=["Notebook"], + ) + # Manually set up repository items + workspace.repository_items = { + "Notebook": { + "TestNotebook": MagicMock( + guid=None, + folder_id="", + logical_id="test-notebook-logical-id", + description="Test notebook description", + item_files=[ + MagicMock( + relative_path="notebook-content.py", + type="text", + file_path=notebook_file, + contents="# Test notebook content\nprint('Hello World')", + base64_payload={"path": 
"notebook-content.py", "payloadType": "InlineBase64"}, + ) + ], + skip_publish=False, + path=notebook_dir, + ) + } + } + workspace.deployed_items = {} + workspace.parameter_data = {} + workspace.parameter_file_path = None + yield workspace, mock_endpoint + + +def _create_recovery_mock_invoke(recovered_guid, error_message, url_check=None): + """Create a mock invoke function for recovery testing.""" + call_count = 0 + update_called = False + + def mock_invoke(method, url, body=None, **kwargs): # noqa: ARG001 + nonlocal call_count, update_called + call_count += 1 + + if call_count == 1: + raise Exception(error_message) + if call_count == 2: + return { + "body": { + "value": [{"id": recovered_guid, "displayName": "TestNotebook", "type": "Notebook"}], + "continuationToken": None, + } + } + if call_count == 3: + if url_check and url_check in url: + update_called = True + return {"body": {"id": recovered_guid, "message": "Updated"}, "status_code": 200, "header": {}} + return {"body": {}} + + return mock_invoke, lambda: call_count, lambda: update_called + + +def _create_recovery_not_found_mock_invoke(error_message): + """Create a mock invoke function where recovery lookup returns empty results.""" + call_count = 0 + + def mock_invoke(method, url, body=None, **kwargs): # noqa: ARG001 + nonlocal call_count + call_count += 1 + + if call_count == 1: + raise Exception(error_message) + if call_count == 2: + return {"body": {"value": [], "continuationToken": None}} + return {"body": {}} + + return mock_invoke + + +class TestItemCreationRecovery: + """Tests for the item creation recovery logic when 'already in use' error occurs.""" + + def test_recovery_on_already_in_use_error_from_generic_handler(self, test_workspace_for_recovery): + """ + Test recovery when item creation fails with 'already in use' error from generic error handler. + + This simulates the actual error format: "Message: Requested 'X' is already in use." 
+ """ + workspace, mock_endpoint = test_workspace_for_recovery + recovered_guid = "recovered-item-guid-456" + + mock_invoke, get_call_count, _ = _create_recovery_mock_invoke(recovered_guid, ALREADY_IN_USE_ERROR_GENERIC) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + ): + workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + # Verify the item GUID was recovered and stored + assert workspace.repository_items["Notebook"]["TestNotebook"].guid == recovered_guid + + # Verify deployed_items cache was updated + assert "Notebook" in workspace.deployed_items + assert "TestNotebook" in workspace.deployed_items["Notebook"] + assert workspace.deployed_items["Notebook"]["TestNotebook"].guid == recovered_guid + assert get_call_count() == 3 + + def test_recovery_on_itemdisplaynamealreadyinuse_error_code(self, test_workspace_for_recovery): + """ + Test recovery when error contains 'ItemDisplayNameAlreadyInUse' error code. + + This simulates the error format from the explicit handler in _fabric_endpoint.py. 
+ """ + workspace, mock_endpoint = test_workspace_for_recovery + recovered_guid = "recovered-via-error-code" + + mock_invoke, _, _ = _create_recovery_mock_invoke(recovered_guid, ALREADY_IN_USE_ERROR_WITH_CODE) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + ): + workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + assert workspace.repository_items["Notebook"]["TestNotebook"].guid == recovered_guid + + def test_recovery_preserves_logical_id(self, test_workspace_for_recovery): + """Test that recovery logic correctly preserves the logical_id from the repository item.""" + workspace, mock_endpoint = test_workspace_for_recovery + recovered_guid = "recovered-item-guid-789" + + mock_invoke, _, _ = _create_recovery_mock_invoke(recovered_guid, ALREADY_IN_USE_ERROR_SIMPLE) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + ): + workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + # Verify the deployed_items entry has correct logical_id + deployed_item = workspace.deployed_items["Notebook"]["TestNotebook"] + assert deployed_item.logical_id == "test-notebook-logical-id" + + def test_recovery_sets_api_response_for_tracking(self, test_workspace_for_recovery): + """Test that recovery logic sets a synthetic api_response for response tracking.""" + workspace, mock_endpoint = test_workspace_for_recovery + recovered_guid = "recovered-item-guid-tracking" + + # Enable response collection + workspace.responses = {"Notebook": {}} + + 
mock_invoke, _, _ = _create_recovery_mock_invoke(recovered_guid, ALREADY_IN_USE_ERROR_SIMPLE) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + ): + workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + # Response should be stored + assert "TestNotebook" in workspace.responses["Notebook"] + + def test_recovery_fails_when_item_not_found(self, test_workspace_for_recovery): + """Test that when recovery fails to find the item, the original error is re-raised.""" + workspace, mock_endpoint = test_workspace_for_recovery + + mock_invoke = _create_recovery_not_found_mock_invoke(ALREADY_IN_USE_ERROR_SIMPLE) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + pytest.raises(Exception, match="already in use"), + ): + workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + def test_non_already_in_use_errors_are_reraised(self, test_workspace_for_recovery): + """Test that errors other than 'already in use' are re-raised without recovery attempt.""" + workspace, mock_endpoint = test_workspace_for_recovery + + mock_endpoint.invoke.side_effect = Exception(UNRELATED_ERROR) + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + pytest.raises(Exception, match="Some other unrelated error"), + ): + 
workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + # Verify invoke was only called once (no recovery attempt) + assert mock_endpoint.invoke.call_count == 1 + + def test_recovery_proceeds_with_update_after_guid_recovery(self, test_workspace_for_recovery): + """Test that after recovering the GUID, the publish proceeds with an UPDATE operation.""" + workspace, mock_endpoint = test_workspace_for_recovery + recovered_guid = "recovered-for-update" + + mock_invoke, get_call_count, get_update_called = _create_recovery_mock_invoke( + recovered_guid, ALREADY_IN_USE_ERROR_SIMPLE, url_check="updateDefinition" + ) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + ): + workspace._publish_item(item_name="TestNotebook", item_type="Notebook") + + # Verify that 3 invoke calls were made and update was called + assert get_call_count() == 3 + assert get_update_called() + + def test_recovery_initializes_deployed_items_dict_if_missing(self, test_workspace_for_recovery): + """Test that recovery correctly initializes the item_type dict in deployed_items if missing.""" + workspace, mock_endpoint = test_workspace_for_recovery + recovered_guid = "recovered-init-dict" + + # Ensure deployed_items doesn't have Notebook key + workspace.deployed_items = {} + + mock_invoke, _, _ = _create_recovery_mock_invoke(recovered_guid, ALREADY_IN_USE_ERROR_SIMPLE) + mock_endpoint.invoke.side_effect = mock_invoke + + with ( + patch.object(workspace, "_replace_logical_ids", side_effect=lambda x: x), + patch.object(workspace, "_replace_parameters", side_effect=lambda file, _: file.contents), + patch.object(workspace, "_replace_workspace_ids", side_effect=lambda x: x), + ): + workspace._publish_item(item_name="TestNotebook", 
item_type="Notebook") + + # Verify the Notebook dict was created + assert "Notebook" in workspace.deployed_items + assert "TestNotebook" in workspace.deployed_items["Notebook"]