Skip to content

Commit bfca7cd

Browse files
feat: Add generic TransformStep with pluggable operation system (#536)
Implements a flexible data transformation primitive supporting multiple operation types: - Comparison operations (compare_results, verify_consensus) for multi-model validation - Aggregation operations (sum, avg, count, min, max, group_by, concat) - Filtering operations (equals, contains, greater_than, less_than, regex) - Mapping operations (extract, project, compute, transform) Key Features: - Pluggable operation registry for extensibility - 48 comprehensive tests (all passing) - Clean architecture with separation of concerns - Example workflow for map-reduce model comparison - Compact, consolidated documentation (58% reduction in README) Architecture: - BaseOperation abstract class with validation - OperationRegistry for dynamic operation registration - TransformStep orchestrator (137 lines) - 5 operation modules with 8+ total operations Files: - Added: transform_step.py, operations/*.py (6 files) - Added: test_transform_operations.py (30 tests) - Modified: test_transform_step.py (18 tests) - Updated: workflows/README.md (consolidated) - Added: model_comparison_mapreduce.yaml example 🤖 Generated with Claude Code Co-authored-by: Claude <[email protected]>
1 parent a2e70dd commit bfca7cd

File tree

13 files changed

+2103
-766
lines changed

13 files changed

+2103
-766
lines changed

plugins/automation/tests/test_transform_operations.py

Lines changed: 401 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
"""Tests for TransformStep implementation."""

import pytest

from plugins.automation.workflows import WorkflowContext
from plugins.automation.workflows.definition import StepDefinition
from plugins.automation.workflows.steps.transform_step import TransformStep


class TestTransformStep:
    """Test suite for TransformStep."""

    def test_init(self):
        """Test TransformStep initialization."""
        step_def = StepDefinition(
            id="test_transform",
            type="transform",
            config={"operation": "compare_results"},
            inputs={"model_1_result": "test1", "model_2_result": "test2"},
        )
        step = TransformStep(step_def)

        assert step.step_id == "test_transform"
        assert step.definition.type == "transform"
        assert step.definition.config["operation"] == "compare_results"

    def test_validate_compare_results_success(self):
        """Test validation passes with sufficient model results."""
        step_def = StepDefinition(
            id="test_transform",
            type="transform",
            config={"operation": "compare_results"},
            inputs={"model_1_result": "test1", "model_2_result": "test2"},
        )
        # Should not raise during initialization
        step = TransformStep(step_def)
        assert step.step_id == "test_transform"

    def test_validate_compare_results_insufficient_inputs(self):
        """Test validation fails with insufficient model results."""
        step_def = StepDefinition(
            id="test_transform",
            type="transform",
            config={"operation": "compare_results"},
            inputs={"model_1_result": "test1"},  # Only one model
        )

        # Should raise during initialization
        with pytest.raises(ValueError, match="at least 2"):
            TransformStep(step_def)

    def test_validate_verify_consensus_missing_comparison(self):
        """Test validation fails when comparison input is missing."""
        step_def = StepDefinition(
            id="test_transform",
            type="transform",
            config={"operation": "verify_consensus"},
            inputs={"threshold": 0.75},  # Missing comparison
        )

        # Should raise during initialization
        with pytest.raises(ValueError, match="requires 'comparison' input"):
            TransformStep(step_def)

    @pytest.mark.asyncio
    async def test_aggregate_operation(self):
        """Test aggregate operation through TransformStep."""
        step_def = StepDefinition(
            id="aggregate",
            type="transform",
            config={"operation": "aggregate", "function": "sum"},
            inputs={"items": [1, 2, 3, 4, 5]},
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["function"] == "sum"
        assert result["result"] == 15

    @pytest.mark.asyncio
    async def test_filter_operation(self):
        """Test filter operation through TransformStep."""
        step_def = StepDefinition(
            id="filter",
            type="transform",
            config={
                "operation": "filter",
                "condition": "greater_than",
                "value": "5",
            },
            inputs={"items": [3, 7, 4, 9, 2]},
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["kept_count"] == 2
        assert 7 in result["filtered_items"]
        assert 9 in result["filtered_items"]

    @pytest.mark.asyncio
    async def test_map_operation(self):
        """Test map operation through TransformStep."""
        step_def = StepDefinition(
            id="map",
            type="transform",
            config={"operation": "map", "function": "extract", "fields": "name"},
            inputs={
                "items": [
                    {"name": "Alice", "age": 30},
                    {"name": "Bob", "age": 25},
                ]
            },
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["mapped_items"] == ["Alice", "Bob"]

    @pytest.mark.asyncio
    async def test_compare_results_two_models(self):
        """Test comparing results from two models."""
        step_def = StepDefinition(
            id="compare",
            type="transform",
            config={"operation": "compare_results"},
            inputs={
                "model_1_result": "Async/await is a way to write asynchronous code in Python",
                "model_2_result": "Async and await are keywords for asynchronous programming in Python",
                "threshold": 0.75,
            },
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert "similarity_scores" in result
        assert "model_1_vs_model_2" in result["similarity_scores"]
        assert result["model_count"] == 2
        assert "average_similarity" in result
        assert "all_similar" in result
        assert "consensus" in result

    @pytest.mark.asyncio
    async def test_compare_results_three_models(self):
        """Test comparing results from three models."""
        step_def = StepDefinition(
            id="compare",
            type="transform",
            config={"operation": "compare_results"},
            inputs={
                "model_1_result": "Redis is an in-memory data store",
                "model_2_result": "Redis is a fast in-memory database",
                "model_3_result": "Redis stores data in memory for speed",
                "threshold": 0.70,
            },
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["model_count"] == 3
        assert "model_1_vs_model_2" in result["similarity_scores"]
        assert "model_1_vs_model_3" in result["similarity_scores"]
        assert "model_2_vs_model_3" in result["similarity_scores"]
        assert len(result["similarity_scores"]) == 3  # 3 pairs for 3 models

    @pytest.mark.asyncio
    async def test_compare_results_high_similarity(self):
        """Test comparison with high similarity."""
        step_def = StepDefinition(
            id="compare",
            type="transform",
            config={"operation": "compare_results"},
            inputs={
                "model_1_result": "The capital of France is Paris",
                "model_2_result": "Paris is the capital of France",
                "threshold": 0.75,
            },
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["average_similarity"] > 0.75
        assert result["all_similar"] is True
        assert "Strong consensus" in result["consensus"] or "Moderate consensus" in result["consensus"]

    @pytest.mark.asyncio
    async def test_compare_results_low_similarity(self):
        """Test comparison with low similarity."""
        step_def = StepDefinition(
            id="compare",
            type="transform",
            config={"operation": "compare_results"},
            inputs={
                "model_1_result": "Machine learning is a subset of artificial intelligence focused on learning from data",
                "model_2_result": "Quantum computing uses quantum mechanics for computation",
                "threshold": 0.75,
            },
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["average_similarity"] < 0.75
        assert result["all_similar"] is False
        assert "No consensus" in result["consensus"] or "Weak consensus" in result["consensus"]

    @pytest.mark.asyncio
    async def test_compare_results_includes_differences(self):
        """Test that comparison includes difference analysis."""
        step_def = StepDefinition(
            id="compare",
            type="transform",
            config={"operation": "compare_results"},
            inputs={
                "model_1_result": "Docker containers are lightweight virtualization",
                "model_2_result": "Kubernetes orchestrates containerized applications",
                "threshold": 0.70,
            },
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert "differences" in result
        assert "unique_words_per_model" in result["differences"]
        assert "text_lengths" in result["differences"]
        assert "length_variance" in result["differences"]

    @pytest.mark.asyncio
    async def test_verify_consensus_passes(self):
        """Test consensus verification when threshold is met."""
        comparison_data = {
            "all_similar": True,
            "average_similarity": 0.85,
            "consensus": "Strong consensus: All models produced highly similar results",
        }

        step_def = StepDefinition(
            id="verify",
            type="transform",
            config={"operation": "verify_consensus"},
            inputs={"comparison": comparison_data, "threshold": 0.75},
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["verified"] is True
        assert result["passed_threshold"] is True
        assert "reliable" in result["recommendation"].lower()

    @pytest.mark.asyncio
    async def test_verify_consensus_fails(self):
        """Test consensus verification when threshold is not met."""
        comparison_data = {
            "all_similar": False,
            "average_similarity": 0.55,
            "consensus": "Weak consensus: Models show some agreement but notable differences",
        }

        step_def = StepDefinition(
            id="verify",
            type="transform",
            config={"operation": "verify_consensus"},
            inputs={"comparison": comparison_data, "threshold": 0.75},
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["verified"] is False
        assert result["passed_threshold"] is False
        assert "disagree" in result["recommendation"].lower() or "review" in result["recommendation"].lower()

    @pytest.mark.asyncio
    async def test_verify_consensus_no_data(self):
        """Test consensus verification with no comparison data."""
        step_def = StepDefinition(
            id="verify",
            type="transform",
            config={"operation": "verify_consensus"},
            inputs={"comparison": None, "threshold": 0.75},
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["verified"] is False
        assert "No comparison data available" in result["reason"]

    @pytest.mark.asyncio
    async def test_custom_transform(self):
        """Test custom transform operation (pass-through for unknown operations)."""
        step_def = StepDefinition(
            id="custom",
            type="transform",
            config={"operation": "transform"},
            inputs={"data": "test_data", "value": 123},
        )
        step = TransformStep(step_def)
        context = WorkflowContext(inputs={})

        step_result = await step.execute(context)
        result = step_result.result

        assert result["transformed"] is True
        assert result["inputs"]["data"] == "test_data"
        assert result["inputs"]["value"] == 123

    def test_unknown_operation_raises_error(self):
        """Test that unknown operations raise an error."""
        step_def = StepDefinition(
            id="unknown",
            type="transform",
            config={"operation": "unknown_operation"},
            inputs={},
        )

        with pytest.raises(ValueError, match="Unknown operation"):
            TransformStep(step_def)

    @pytest.mark.asyncio
    async def test_chained_operations(self):
        """Test using multiple transform steps in sequence."""
        # First step: aggregate
        aggregate_def = StepDefinition(
            id="aggregate",
            type="transform",
            config={"operation": "aggregate", "function": "count"},
            inputs={"items": ["a", "b", "c", "d", "e"]},
        )
        aggregate_step = TransformStep(aggregate_def)
        context = WorkflowContext(inputs={})

        aggregate_result = await aggregate_step.execute(context)
        context.set_step_result("aggregate", aggregate_result)

        # Second step: use aggregated result
        # In a real workflow, this would use template resolution
        # For now, just verify the result is available
        assert aggregate_result.result["result"] == 5

0 commit comments

Comments
 (0)