Skip to content

Commit 9debe41

Browse files
authored
Merge pull request #1585 from suhothayan/master
Fix Siddhi sending events to wrong stream when stopped and started.
2 parents 0ced86d + a51eae1 commit 9debe41

File tree

6 files changed

+158
-20
lines changed

6 files changed

+158
-20
lines changed

modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/LogStreamProcessor.java

+20-19
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.siddhi.annotation.ParameterOverload;
2525
import io.siddhi.annotation.util.DataType;
2626
import io.siddhi.core.config.SiddhiQueryContext;
27-
import io.siddhi.core.event.ComplexEvent;
2827
import io.siddhi.core.event.ComplexEventChunk;
2928
import io.siddhi.core.event.stream.MetaStreamEvent;
3029
import io.siddhi.core.event.stream.StreamEvent;
@@ -221,52 +220,54 @@ protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processo
221220
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
222221
State state) {
223222
while (streamEventChunk.hasNext()) {
224-
ComplexEvent complexEvent = streamEventChunk.next();
223+
StreamEvent streamEvent = streamEventChunk.next();
225224
switch (attributeExpressionLength) {
226225
case 0:
227-
log.info(logPrefix + complexEvent);
226+
log.info(logPrefix + streamEvent.toString(1));
228227
break;
229228
case 1:
230229
if (isLogEventExpressionExecutor != null) {
231-
if ((Boolean) isLogEventExpressionExecutor.execute(complexEvent)) {
232-
log.info(logPrefix + complexEvent);
230+
if ((Boolean) isLogEventExpressionExecutor.execute(streamEvent)) {
231+
log.info(logPrefix + streamEvent.toString(1));
233232
} else {
234233
log.info(logPrefix + "Event Arrived");
235234
}
236235
} else {
237-
log.info(logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " + complexEvent);
236+
log.info(logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " +
237+
streamEvent.toString(1));
238238
}
239239
break;
240240
case 2:
241241
if (isLogEventExpressionExecutor != null) {
242-
if ((Boolean) isLogEventExpressionExecutor.execute(complexEvent)) {
243-
log.info(logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " +
244-
complexEvent);
242+
if ((Boolean) isLogEventExpressionExecutor.execute(streamEvent)) {
243+
log.info(logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " +
244+
streamEvent.toString(1));
245245
} else {
246-
log.info(logPrefix + logMessageExpressionExecutor.execute(complexEvent));
246+
log.info(logPrefix + logMessageExpressionExecutor.execute(streamEvent));
247247
}
248248
} else {
249249
LogPriority tempLogPriority = logPriority;
250250
if (logPriorityExpressionExecutor != null) {
251-
tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor.execute
252-
(complexEvent));
251+
tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor.
252+
execute(streamEvent));
253253
}
254-
String message = logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " +
255-
complexEvent;
254+
String message = logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " +
255+
streamEvent.toString(1);
256256
logMessage(tempLogPriority, message);
257257
}
258258
break;
259259
default:
260260
String message;
261-
if ((Boolean) isLogEventExpressionExecutor.execute(complexEvent)) {
262-
message = logPrefix + logMessageExpressionExecutor.execute(complexEvent) + ", " + complexEvent;
261+
if ((Boolean) isLogEventExpressionExecutor.execute(streamEvent)) {
262+
message = logPrefix + logMessageExpressionExecutor.execute(streamEvent) + ", " +
263+
streamEvent.toString(1);
263264
} else {
264-
message = logPrefix + logMessageExpressionExecutor.execute(complexEvent);
265+
message = logPrefix + logMessageExpressionExecutor.execute(streamEvent);
265266
}
266267
LogPriority tempLogPriority = logPriority;
267268
if (logPriorityExpressionExecutor != null) {
268-
tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor.execute
269-
(complexEvent));
269+
tempLogPriority = LogPriority.valueOf((String) logPriorityExpressionExecutor.
270+
execute(streamEvent));
270271
}
271272
logMessage(tempLogPriority, message);
272273
}

modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputDistributor.java

+4
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,8 @@ public void addInputProcessor(InputProcessor inputProcessor) {
5353
inputProcessors.add(inputProcessor);
5454

5555
}
56+
57+
public void clear() {
58+
inputProcessors.clear();
59+
}
5660
}

modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public synchronized void disconnect() {
7373
for (InputHandler inputHandler : inputHandlerMap.values()) {
7474
inputHandler.disconnect();
7575
}
76+
inputDistributor.clear();
7677
inputHandlerMap.clear();
7778
this.isConnected = false;
7879
}

modules/siddhi-core/src/test/java/io/siddhi/core/managment/LogTestCase.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
8888
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockStream");
8989
siddhiAppRuntime.start();
9090

91-
inputHandler.send(new Object[]{"IBM", 75.6f, 100});
91+
inputHandler.send(new Event[]{
92+
new Event(System.currentTimeMillis(), new Object[]{"IBM", 75.6f, 100}),
93+
new Event(System.currentTimeMillis(), new Object[]{"GOOG", 70.6f, 100})});
9294
Thread.sleep(100);
9395

9496
siddhiAppRuntime.shutdown();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3+
*
4+
* WSO2 Inc. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"); you may not use this file except
6+
* in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
19+
package io.siddhi.core.managment;
20+
21+
import io.siddhi.core.SiddhiAppRuntime;
22+
import io.siddhi.core.SiddhiManager;
23+
import io.siddhi.core.event.Event;
24+
import io.siddhi.core.stream.input.InputHandler;
25+
import io.siddhi.core.stream.output.StreamCallback;
26+
import io.siddhi.core.util.EventPrinter;
27+
import org.apache.log4j.Logger;
28+
import org.testng.Assert;
29+
import org.testng.AssertJUnit;
30+
import org.testng.annotations.BeforeMethod;
31+
import org.testng.annotations.Test;
32+
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
public class StartStopTestCase {
36+
private static final Logger log = Logger.getLogger(StartStopTestCase.class);
37+
private AtomicInteger count;
38+
private boolean eventArrived;
39+
40+
@BeforeMethod
41+
public void init() {
42+
count = new AtomicInteger();
43+
eventArrived = false;
44+
}
45+
46+
@Test(expectedExceptions = InterruptedException.class)
47+
public void startStopTest1() throws InterruptedException {
48+
log.info("startStop test 1");
49+
50+
SiddhiManager siddhiManager = new SiddhiManager();
51+
52+
String siddhiApp = "" +
53+
"define stream cseEventStream (symbol string, price float, volume int);" +
54+
"define stream cseEventStream2 (symbol string, price float, volume int);" +
55+
"" +
56+
"@info(name = 'query1') " +
57+
"from cseEventStream " +
58+
"select 1 as eventFrom " +
59+
"insert into outputStream ;" +
60+
"" +
61+
"@info(name = 'query2') " +
62+
"from cseEventStream2 " +
63+
"select 2 as eventFrom " +
64+
"insert into outputStream ;";
65+
66+
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
67+
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
68+
69+
@Override
70+
public void receive(Event[] events) {
71+
EventPrinter.print(events);
72+
}
73+
74+
});
75+
76+
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream2");
77+
siddhiAppRuntime.start();
78+
siddhiAppRuntime.shutdown();
79+
siddhiAppRuntime.start();
80+
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
81+
siddhiAppRuntime.shutdown();
82+
}
83+
84+
@Test()
85+
public void startStopTest2() throws InterruptedException {
86+
log.info("startStop test 2");
87+
88+
SiddhiManager siddhiManager = new SiddhiManager();
89+
90+
String siddhiApp = "" +
91+
"define stream cseEventStream (symbol string, price float, volume int);" +
92+
"define stream cseEventStream2 (symbol string, price float, volume int);" +
93+
"" +
94+
"@info(name = 'query1') " +
95+
"from cseEventStream " +
96+
"select 1 as eventFrom " +
97+
"insert into outputStream ;" +
98+
"" +
99+
"@info(name = 'query2') " +
100+
"from cseEventStream2 " +
101+
"select 2 as eventFrom " +
102+
"insert into outputStream ;";
103+
104+
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
105+
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
106+
107+
@Override
108+
public void receive(Event[] events) {
109+
EventPrinter.print(events);
110+
Assert.assertEquals(events[0].getData(0), 1);
111+
eventArrived = true;
112+
count.incrementAndGet();
113+
}
114+
115+
});
116+
117+
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream2");
118+
inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
119+
siddhiAppRuntime.start();
120+
siddhiAppRuntime.shutdown();
121+
siddhiAppRuntime.start();
122+
inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
123+
inputHandler.send(new Object[]{"WSO2", 55.6f, 100});
124+
siddhiAppRuntime.shutdown();
125+
AssertJUnit.assertTrue(eventArrived);
126+
AssertJUnit.assertEquals(1, count.get());
127+
}
128+
129+
}

modules/siddhi-core/src/test/resources/testng.xml

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
<class name="io.siddhi.core.managment.AsyncTestCase"/>
3535
<class name="io.siddhi.core.managment.EventTestCase"/>
36+
<class name="io.siddhi.core.managment.StartStopTestCase"/>
3637
<class name="io.siddhi.core.managment.QuerySyncTestCase"/>
3738
<class name="io.siddhi.core.managment.ValidateTestCase"/>
3839
<class name="io.siddhi.core.managment.PlaybackTestCase"/>

0 commit comments

Comments
 (0)