Skip to content

Commit 6add773

Browse files
update
1 parent c83d05d commit 6add773

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.atguigu.day11
2+
3+
import com.atguigu.day2.SensorSource
4+
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
5+
import org.apache.flink.table.api._
6+
import org.apache.flink.api.scala._
7+
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
8+
import org.apache.flink.table.functions.AggregateFunction
9+
import org.apache.flink.types.Row
10+
11+
object AggregateFunctionExample {
12+
def main(args: Array[String]): Unit = {
13+
val env = StreamExecutionEnvironment.getExecutionEnvironment
14+
env.setParallelism(1)
15+
16+
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
17+
18+
val tEnv = StreamTableEnvironment.create(env, settings)
19+
20+
val stream = env.addSource(new SensorSource)
21+
22+
val avgTemp = new AvgTemp()
23+
24+
// sql
25+
tEnv.createTemporaryView("sensor", stream)
26+
tEnv.registerFunction("avgTemp", avgTemp)
27+
val sqlResult = tEnv.sqlQuery("SELECT id, avgTemp(temperature) FROM sensor GROUP BY id")
28+
tEnv.toRetractStream[Row](sqlResult).print()
29+
30+
// table api
31+
val table = tEnv.fromDataStream(stream)
32+
val tableResult = table.groupBy($"id").aggregate(avgTemp($"temperature") as "avgTemp").select($"id", $"avgTemp")
33+
tEnv.toRetractStream[Row](tableResult).print()
34+
35+
env.execute()
36+
}
37+
38+
class AvgTempAcc {
39+
var sum: Double = 0.0
40+
var count: Int = 0
41+
}
42+
43+
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
44+
override def createAccumulator(): AvgTempAcc = new AvgTempAcc
45+
46+
override def getValue(acc: AvgTempAcc): Double = acc.sum / acc.count
47+
48+
def accumulate(acc: AvgTempAcc, in: Double): Unit = {
49+
acc.sum += in
50+
acc.count += 1
51+
}
52+
}
53+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.atguigu.day11
2+
3+
import com.atguigu.day2.SensorSource
4+
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
5+
import org.apache.flink.api.scala._
6+
import org.apache.flink.table.api._
7+
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
8+
import org.apache.flink.table.functions.TableAggregateFunction
9+
import org.apache.flink.types.Row
10+
import org.apache.flink.util.Collector
11+
12+
object TableAggregateFunctionExample {
13+
def main(args: Array[String]): Unit = {
14+
val env = StreamExecutionEnvironment.getExecutionEnvironment
15+
env.setParallelism(1)
16+
17+
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
18+
19+
val tEnv = StreamTableEnvironment.create(env, settings)
20+
21+
val stream = env.addSource(new SensorSource)
22+
23+
val table = tEnv.fromDataStream(stream)
24+
25+
val top2Temp = new Top2Temp()
26+
27+
val resultTable = table
28+
.groupBy($"id")
29+
.flatAggregate(top2Temp($"temperature") as ("temp", "rank"))
30+
.select($"id", $"temp", $"rank")
31+
32+
tEnv.toRetractStream[Row](resultTable).print()
33+
34+
env.execute()
35+
}
36+
37+
class Top2TempAcc {
38+
var highestTemp: Double = Double.MinValue
39+
var secondHighestTemp: Double = Double.MinValue
40+
}
41+
42+
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
43+
override def createAccumulator(): Top2TempAcc = new Top2TempAcc
44+
45+
def accumulate(acc: Top2TempAcc, in: Double): Unit = {
46+
if (in > acc.highestTemp) {
47+
acc.secondHighestTemp = acc.highestTemp
48+
acc.highestTemp = in
49+
} else if (in > acc.secondHighestTemp) {
50+
acc.secondHighestTemp = in
51+
}
52+
}
53+
54+
def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
55+
out.collect(acc.highestTemp, 1)
56+
out.collect(acc.secondHighestTemp, 2)
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)