File tree Expand file tree Collapse file tree
src/main/java/com/monitory/data Expand file tree Collapse file tree Original file line number Diff line number Diff line change 11package com .monitory .data ;
22
33import com .monitory .data .sources .MqttSource ;
4+ import com .monitory .data .transformations .TimeStampAssigner ;
45import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
5- import org .apache .flink .api .common .functions .MapFunction ;
66import org .apache .flink .configuration .Configuration ;
77import org .apache .flink .streaming .api .datastream .DataStream ;
88import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
@@ -16,16 +16,9 @@ public static void main (String [] args) throws Exception {
1616 // 2. 데이터 소스
1717 DataStream <String > sourceStream = env .fromSource (new MqttSource (), WatermarkStrategy .noWatermarks (), "MQTT-Source" );
1818
19- // 3. 데이터 처리: 단순하게 문자열을 대문자로 변환하는 예시
19+ // 3. 데이터 처리: Time Stamp 출력과 Anomaly 감지
2020 DataStream <String > transformedStream = sourceStream
21- .map (new MapFunction <String , String >() {
22- @ Override
23- public String map (String value ) throws Exception {
24- // Thread.sleep(2000000);
25- System .out .println ("💡 received: " + value );
26- return value .toUpperCase ();
27- }
28- });
21+ .map (new TimeStampAssigner ());
2922
3023 // 4. 데이터 싱크: 콘솔에 출력
3124 transformedStream .print ();
Original file line number Diff line number Diff line change 1+ package com .monitory .data .transformations ;
2+
3+ import org .apache .flink .api .common .functions .MapFunction ;
4+ import com .fasterxml .jackson .databind .ObjectMapper ;
5+ import com .fasterxml .jackson .databind .node .ObjectNode ;
6+
7+ import java .time .Instant ;
8+ import java .time .LocalDateTime ;
9+
10+ public class TimeStampAssigner implements MapFunction <String , String > {
11+
12+ private static final ObjectMapper mapper = new ObjectMapper ();
13+
14+ @ Override
15+ public String map (String value ) throws Exception {
16+ ObjectNode jsonNode = (ObjectNode ) mapper .readTree (value );
17+ jsonNode .put ("time" , LocalDateTime .now ().toString ());
18+ return mapper .writeValueAsString (jsonNode );
19+ }
20+ }
You can’t perform that action at this time.
0 commit comments