Skip to content

Commit e5a89c0

Browse files
committed
Update pipeline import to create a new version instead of patching
1 parent 0dce623 commit e5a89c0

File tree

3 files changed

+66
-58
lines changed

3 files changed

+66
-58
lines changed

deepset_cloud_sdk/_service/pipeline_service.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -374,29 +374,32 @@ async def _create_index(self, name: str, pipeline_yaml: str) -> Response:
374374
)
375375

376376
async def _overwrite_pipeline(self, name: str, pipeline_yaml: str) -> Response:
377-
"""Overwrite a pipeline in deepset AI Platform.
377+
"""Overwrite a pipeline in deepset AI Platform by creating a new version.
378+
If creating a new version fails (e.g. pipeline doesn't exist), create the
379+
pipeline instead.
378380
379381
:param name: Name of the pipeline.
380382
:param pipeline_yaml: Generated pipeline YAML string.
381383
"""
382-
# First get the (last) version id if available
383-
version_response = await self._api.get(
384-
workspace_name=self._workspace_name, endpoint=f"pipelines/{name}/versions"
384+
# First try to create a new version of the existing pipeline
385+
version_response = await self._api.post(
386+
workspace_name=self._workspace_name,
387+
endpoint=f"pipelines/{name}/versions",
388+
json={"config_yaml": pipeline_yaml},
385389
)
386390

387-
# If pipeline doesn't exist (404), create it instead
388-
if version_response.status_code == HTTPStatus.NOT_FOUND:
389-
logger.debug(f"Pipeline {name} not found, creating new pipeline.")
390-
response = await self._create_pipeline(name=name, pipeline_yaml=pipeline_yaml)
391-
else:
392-
version_body = version_response.json()
393-
version_id = version_body["data"][0]["version_id"]
394-
response = await self._api.patch(
395-
workspace_name=self._workspace_name,
396-
endpoint=f"pipelines/{name}/versions/{version_id}",
397-
json={"config_yaml": pipeline_yaml},
398-
)
391+
if version_response.status_code == HTTPStatus.CREATED:
392+
logger.debug("Created new version for pipeline %s.", name)
393+
return version_response
394+
# If creating a version fails, assume the pipeline doesn't exist and create it
395+
logger.debug(
396+
"Failed to create new version for pipeline %s (status %s). "
397+
"Assuming pipeline does not exist and creating it instead.",
398+
name,
399+
version_response.status_code,
400+
)
399401

402+
response = await self._create_pipeline(name=name, pipeline_yaml=pipeline_yaml)
400403
return response
401404

402405
async def _create_pipeline(self, name: str, pipeline_yaml: str) -> Response:

tests/integration/workflows/test_integration_pipeline_client.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -488,10 +488,10 @@ async def test_import_pipeline_with_overwrite_fallback_to_create_async(
488488
return_value=Response(status_code=HTTPStatus.NO_CONTENT)
489489
)
490490

491-
# Mock 404 response for GET (resource not found)
492-
version_check_route = respx.get(
491+
# Mock failed version creation (non-201 for POST /pipelines/{name}/versions)
492+
version_create_route = respx.post(
493493
"https://test-api-url.com/workspaces/test-workspace/pipelines/test-pipeline-fallback/versions"
494-
).mock(return_value=Response(status_code=HTTPStatus.NOT_FOUND))
494+
).mock(return_value=Response(status_code=HTTPStatus.BAD_REQUEST))
495495

496496
# Mock successful creation
497497
create_route = respx.post("https://test-api-url.com/workspaces/test-workspace/pipelines").mock(
@@ -508,20 +508,24 @@ async def test_import_pipeline_with_overwrite_fallback_to_create_async(
508508

509509
await test_async_client.import_into_deepset(sample_pipeline, pipeline_config)
510510

511-
# Verify all three endpoints were called in sequence
511+
# Verify all three endpoints were called
512512
assert validation_route.called
513-
assert version_check_route.called
513+
assert version_create_route.called
514514
assert create_route.called
515515

516516
# Check validation request
517517
validation_request = validation_route.calls[0].request
518518
assert validation_request.headers["Authorization"] == "Bearer test-api-key"
519519
validation_body = json.loads(validation_request.content)
520520
assert "query_yaml" in validation_body
521-
522-
# Check GET attempt
523-
version_check_request = version_check_route.calls[0].request
524-
assert version_check_request.headers["Authorization"] == "Bearer test-api-key"
521+
# When overwrite=True, name should be excluded from validation payload (if your code does that)
522+
# assert "name" not in validation_body
523+
524+
# Check attempted version creation request
525+
version_create_request = version_create_route.calls[0].request
526+
assert version_create_request.headers["Authorization"] == "Bearer test-api-key"
527+
version_body = json.loads(version_create_request.content)
528+
assert "config_yaml" in version_body
525529

526530
# Check fallback creation
527531
create_request = create_route.calls[0].request

tests/unit/service/test_pipeline_service.py

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ async def test_import_index_with_overwrite_fallback_to_create(
587587
async def test_import_pipeline_with_overwrite_true(
588588
self, pipeline_service: PipelineService, index_pipeline: Pipeline, mock_api: AsyncMock
589589
) -> None:
590-
"""Test importing a pipeline with overwrite=True uses PUT endpoint."""
590+
"""Test importing a pipeline with overwrite=True creates a new version via POST endpoint."""
591591
config = PipelineConfig(
592592
name="test_pipeline_overwrite",
593593
inputs=PipelineInputs(query=["retriever.query"]),
@@ -600,25 +600,21 @@ async def test_import_pipeline_with_overwrite_true(
600600
validation_response = Mock(spec=Response)
601601
validation_response.status_code = HTTPStatus.NO_CONTENT.value
602602

603-
# Mock successful versions response
604-
versions_response = Mock(status_code=HTTPStatus.OK.value)
605-
versions_response.json.return_value = {
606-
"data": [{"version_id": "42abcd"}],
607-
}
608-
609-
# Mock successful overwrite response
603+
# Mock successful "create new version" response
610604
overwrite_response = Mock(spec=Response)
611-
overwrite_response.status_code = HTTPStatus.OK.value
605+
overwrite_response.status_code = HTTPStatus.CREATED.value
612606

613-
mock_api.post.return_value = validation_response
614-
mock_api.get.return_value = versions_response
615-
mock_api.put.return_value = overwrite_response
607+
# First POST is validation, second POST is "create new version"
608+
mock_api.post.side_effect = [validation_response, overwrite_response]
616609

617610
await pipeline_service.import_async(index_pipeline, config)
618611

619-
# Should call validation endpoint first, then overwrite endpoint
620-
assert mock_api.post.call_count == 1
621-
assert mock_api.patch.call_count == 1
612+
# Should call validation endpoint first, then create-version endpoint
613+
assert mock_api.post.call_count == 2
614+
# No GET/PATCH/PUT calls in the overwrite path anymore
615+
assert mock_api.get.call_count == 0
616+
assert mock_api.patch.call_count == 0
617+
assert mock_api.put.call_count == 0
622618

623619
# Check validation call
624620
validation_call = mock_api.post.call_args_list[0]
@@ -627,16 +623,16 @@ async def test_import_pipeline_with_overwrite_true(
627623
# When overwrite=True, name should be excluded from validation payload
628624
assert "name" not in validation_call.kwargs["json"]
629625

630-
# Check overwrite call
631-
overwrite_call = mock_api.patch.call_args_list[0]
632-
assert overwrite_call.kwargs["endpoint"] == "pipelines/test_pipeline_overwrite/versions/42abcd"
626+
# Check create-version call
627+
overwrite_call = mock_api.post.call_args_list[1]
628+
assert overwrite_call.kwargs["endpoint"] == "pipelines/test_pipeline_overwrite/versions"
633629
assert "config_yaml" in overwrite_call.kwargs["json"]
634630

635631
@pytest.mark.asyncio
636632
async def test_import_pipeline_with_overwrite_fallback_to_create(
637633
self, pipeline_service: PipelineService, index_pipeline: Pipeline, mock_api: AsyncMock
638634
) -> None:
639-
"""Test importing a pipeline with overwrite=True that falls back to create when resource doesn't exist."""
635+
"""Test importing a pipeline with overwrite=True that falls back to create when version creation fails."""
640636

641637
config = PipelineConfig(
642638
name="test_pipeline_fallback",
@@ -650,22 +646,26 @@ async def test_import_pipeline_with_overwrite_fallback_to_create(
650646
validation_response = Mock(spec=Response)
651647
validation_response.status_code = HTTPStatus.NO_CONTENT.value
652648

653-
# Mock 404 response for GET (resource not found)
654-
not_found_response = Mock(spec=Response)
655-
not_found_response.status_code = HTTPStatus.NOT_FOUND.value
649+
# Mock non-201 response for POST /pipelines/{name}/versions (version creation fails)
650+
version_fail_response = Mock(spec=Response)
651+
version_fail_response.status_code = HTTPStatus.BAD_REQUEST.value
656652

657-
# Mock successful creation response
653+
# Mock successful creation response for POST /pipelines
658654
create_response = Mock(spec=Response)
659655
create_response.status_code = HTTPStatus.CREATED.value
660656

661-
mock_api.post.side_effect = [validation_response, create_response]
662-
mock_api.get.return_value = not_found_response
657+
# POST calls: validation, create-version (fails), create-pipeline (fallback)
658+
mock_api.post.side_effect = [validation_response, version_fail_response, create_response]
663659

664660
await pipeline_service.import_async(index_pipeline, config)
665661

666-
# Should call validation endpoint, then GET (which returns 404), then POST to create
667-
assert mock_api.post.call_count == 2
668-
assert mock_api.get.call_count == 1
662+
# Should call validation endpoint, then POST to create new version (fails),
663+
# then POST to create the pipeline
664+
assert mock_api.post.call_count == 3
665+
# No GET anymore; overwrite logic doesn't fetch versions
666+
assert mock_api.get.call_count == 0
667+
assert mock_api.patch.call_count == 0
668+
assert mock_api.put.call_count == 0
669669

670670
# Check validation call
671671
validation_call = mock_api.post.call_args_list[0]
@@ -674,12 +674,13 @@ async def test_import_pipeline_with_overwrite_fallback_to_create(
674674
# When overwrite=True, name should be excluded from validation payload
675675
assert "name" not in validation_call.kwargs["json"]
676676

677-
# Check GET versions attempt
678-
get_call = mock_api.get.call_args_list[0]
679-
assert get_call.kwargs["endpoint"] == "pipelines/test_pipeline_fallback/versions"
677+
# Check attempted version creation call
678+
version_call = mock_api.post.call_args_list[1]
679+
assert version_call.kwargs["endpoint"] == "pipelines/test_pipeline_fallback/versions"
680+
assert "config_yaml" in version_call.kwargs["json"]
680681

681-
# Check fallback POST call
682-
create_call = mock_api.post.call_args_list[1]
682+
# Check fallback create-pipeline call
683+
create_call = mock_api.post.call_args_list[2]
683684
assert create_call.kwargs["endpoint"] == "pipelines"
684685
assert create_call.kwargs["json"]["name"] == "test_pipeline_fallback"
685686
assert "query_yaml" in create_call.kwargs["json"]

0 commit comments

Comments
 (0)