1+ package com .atguigu .proj
2+
3+ import org .apache .flink .streaming .api .TimeCharacteristic
4+ import org .apache .flink .streaming .api .scala ._
5+ import org .apache .flink .table .api .EnvironmentSettings
6+ import org .apache .flink .table .api .scala ._
7+ import org .apache .flink .types .Row
8+
9+ // 和底层api的实现有什么区别呢?
10+ // sql没办法实现增量聚合和全窗口聚合结合使用
11+ // 所以很占存储空间
12+ object UserBehaviourAnalysisBySQL {
13+
14+ case class UserBehaviour (userId : Long ,
15+ itemId : Long ,
16+ categoryId : Int ,
17+ behaviour : String ,
18+ timestamp : Long )
19+
20+ def main (args : Array [String ]): Unit = {
21+ val env = StreamExecutionEnvironment .getExecutionEnvironment
22+ env.setParallelism(1 )
23+ // 为了时间旅行,必须使用事件时间
24+ env.setStreamTimeCharacteristic(TimeCharacteristic .EventTime )
25+
26+ val stream = env
27+ .readTextFile(" /Users/yuanzuo/Desktop/flink-tutorial/Flink0105/src/main/resources/UserBehavior.csv" )
28+ .map(line => {
29+ val arr = line.split(" ," )
30+ // 注意,时间戳单位必须是毫秒
31+ UserBehaviour (arr(0 ).toLong, arr(1 ).toLong, arr(2 ).toInt, arr(3 ), arr(4 ).toLong * 1000L )
32+ })
33+ .filter(_.behaviour.equals(" pv" )) // 过滤出pv事件
34+ .assignAscendingTimestamps(_.timestamp) // 分配升序时间戳
35+
36+ val settings = EnvironmentSettings
37+ .newInstance()
38+ .useBlinkPlanner()
39+ .inStreamingMode()
40+ .build()
41+
42+ val tEnv = StreamTableEnvironment .create(env, settings)
43+
44+ tEnv.createTemporaryView(" t" , stream, ' itemId , ' timestamp .rowtime as ' ts )
45+
46+ // HOP_END是关键字,用来获取窗口结束时间
47+ // 最内层的子查询相当于stream.keyBy(_.itemId).timeWindow(滑动窗口).aggregate()
48+ // 倒数第二层的子查询相当于.keyBy(_.windowEnd).process(排序)
49+ // 最外层查询相当于.take(3)
50+ tEnv
51+ .sqlQuery(
52+ """
53+ |SELECT *
54+ |FROM (
55+ | SELECT *,
56+ | ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY itemCount DESC) as row_num
57+ | FROM (
58+ | SELECT itemId, count(itemId) as itemCount,
59+ | HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as windowEnd
60+ | FROM t GROUP BY itemId, HOP(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
61+ | )
62+ |)
63+ |WHERE row_num <= 3""" .stripMargin)
64+ .toRetractStream[Row ] // 每个窗口的前三名是不断变化的,所以用撤回流
65+ .filter(_._1 == true ) // 把更新的数据过滤出来
66+ .print()
67+
68+ env.execute()
69+ }
70+ }
0 commit comments