Skip to content

Commit f4965f9

Browse files
committed
fix(api): fix ReflectionPropertyAccessor should convert value to expected type (#59)
Additional changes: - AppendFilter throws a FilterException if expression value for target field return null Resolves: GH-59
1 parent 5cea1e2 commit f4965f9

File tree

5 files changed

+111
-3
lines changed

5 files changed

+111
-3
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/accessor/ReflectivePropertyAccessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
package io.streamthoughts.kafka.connect.filepulse.expression.accessor;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;
22+
import io.streamthoughts.kafka.connect.filepulse.expression.converter.Converters;
2223

2324
import java.lang.reflect.Method;
2425
import java.util.Objects;
2526

2627
public class ReflectivePropertyAccessor implements PropertyAccessor {
2728

28-
private static final String GETTER_PREFIX = "of";
29+
private static final String GETTER_PREFIX = "get";
2930
private static final String SETTER_PREFIX = "set";
3031

3132
/**
@@ -83,7 +84,9 @@ public void write(final EvaluationContext context,
8384
Method method = findSetterMethodForProperty(type, name);
8485
if (method != null ) {
8586
method.setAccessible(true);
86-
method.invoke(target, newValue);
87+
Class<?> expectedSetterType = method.getParameterTypes()[0];
88+
Object converted = Converters.converts(context.getPropertyConverter(), newValue, expectedSetterType);
89+
method.invoke(target, converted);
8790
return;
8891
}
8992
} catch (Exception e) {

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/regex/RegexExpressionParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public Expression parseExpression(final String expression,
101101
}
102102
}
103103
} else {
104-
compiledExpression = matchers.matches(expression, defaultRootObject, substitution);
104+
compiledExpression = matchers.matches(expression, defaultRootObject, false);
105105
}
106106

107107
if (compiledExpression == null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2019-2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.streamthoughts.kafka.connect.filepulse.expression.accessor;
21+
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
23+
import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext;
24+
import org.junit.Assert;
25+
import org.junit.Test;
26+
27+
public class ReflectivePropertyAccessorTest {
28+
29+
private StandardEvaluationContext context = new StandardEvaluationContext(new Object());
30+
31+
32+
@Test(expected = AccessException.class)
33+
public void should_thrown_when_writing_invalid_property_using_getter_method_given_pojo() {
34+
ReflectivePropertyAccessor accessor = new ReflectivePropertyAccessor();
35+
accessor.write(context, new DummyObject("foo"), "unknown", "");
36+
}
37+
38+
@Test(expected = AccessException.class)
39+
public void should_thrown_when_reading_invalid_property_using_getter_method_given_pojo() {
40+
ReflectivePropertyAccessor accessor = new ReflectivePropertyAccessor();
41+
accessor.read(context, new DummyObject("foo"), "unknown");
42+
}
43+
44+
@Test
45+
public void should_write_property_using_setter_method_given_pojo_and_expected_parameter() {
46+
ReflectivePropertyAccessor accessor = new ReflectivePropertyAccessor();
47+
DummyObject object = new DummyObject(null);
48+
accessor.write(context, object, "field", "foo");
49+
Assert.assertEquals("foo", object.field);
50+
}
51+
52+
@Test
53+
public void should_write_null_property_using_setter_method_given_pojo_and_expected_parameter() {
54+
ReflectivePropertyAccessor accessor = new ReflectivePropertyAccessor();
55+
DummyObject object = new DummyObject("foo");
56+
accessor.write(context, object, "field", null);
57+
Assert.assertEquals(null, object.field);
58+
}
59+
60+
@Test
61+
public void should_write_property_using_setter_method_given_pojo() {
62+
ReflectivePropertyAccessor accessor = new ReflectivePropertyAccessor();
63+
DummyObject object = new DummyObject(null);
64+
accessor.write(context, object, "field", TypedValue.string("foo"));
65+
Assert.assertEquals("foo", object.field);
66+
}
67+
68+
@Test
69+
public void should_read_property_using_getter_method_given_pojo() {
70+
ReflectivePropertyAccessor accessor = new ReflectivePropertyAccessor();
71+
Object object = accessor.read(context, new DummyObject("foo"), "field");
72+
Assert.assertEquals("foo", object);
73+
}
74+
75+
public static class DummyObject {
76+
77+
private String field;
78+
79+
DummyObject(String myField) {
80+
this.field = myField;
81+
}
82+
83+
public String getField() {
84+
return field;
85+
}
86+
87+
public void setField(String field) {
88+
this.field = field;
89+
}
90+
}
91+
92+
}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ protected RecordsIterable<TypedStruct> apply(final FilterContext context,
113113
private Expression evaluateWriteExpression(final StandardEvaluationContext evaluationContext) {
114114
if (mustEvaluateWriteExpression) {
115115
final String evaluated = fieldExpression.readValue(evaluationContext, String.class);
116+
if (evaluated == null) {
117+
throw new FilterException("Invalid value for property 'field'. Evaluation of expression '"
118+
+ fieldExpression.originalExpression() + " 'returns 'null'.");
119+
}
116120
return parser.parseExpression(evaluated, DEFAULT_ROOT_OBJECT, false);
117121
}
118122
return fieldExpression;

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilterTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,13 @@ public void shouldSupportSubstitutionExpressionForFieldConfig() {
8787
TypedStruct result = output.collect().get(0);
8888
Assert.assertEquals("foo-bar", result.getString("foo"));
8989
}
90+
91+
@Test
92+
public void shouldSupportPropertyExpressionWithScopeForFieldConfig() {
93+
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$topic");
94+
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "my-topic-{{ extract_array(values,0) }}");
95+
filter.configure(configs);
96+
filter.apply(context, STRUCT);
97+
Assert.assertEquals("my-topic-foo", context.topic());
98+
}
9099
}

0 commit comments

Comments
 (0)