11package com .atguigu .proj
22
3+ import java .sql .Timestamp
4+
35import org .apache .flink .api .common .functions .AggregateFunction
6+ import org .apache .flink .api .common .state .ListStateDescriptor
7+ import org .apache .flink .api .scala .typeutils .Types
48import org .apache .flink .streaming .api .TimeCharacteristic
9+ import org .apache .flink .streaming .api .functions .KeyedProcessFunction
510import org .apache .flink .streaming .api .scala ._
611import org .apache .flink .streaming .api .scala .function .ProcessWindowFunction
712import org .apache .flink .streaming .api .windowing .time .Time
813import org .apache .flink .streaming .api .windowing .windows .TimeWindow
914import org .apache .flink .util .Collector
1015
16+ import scala .collection .mutable .ListBuffer
17+
1118object UserBehaviourAnalysis {
1219
1320 case class UserBehaviour (userId : Long ,
@@ -30,6 +37,7 @@ object UserBehaviourAnalysis {
3037 .readTextFile(" /Users/yuanzuo/Desktop/flink-tutorial/Flink0105/src/main/resources/UserBehavior.csv" )
3138 .map(line => {
3239 val arr = line.split(" ," )
40+ // 注意,时间戳单位必须是毫秒
3341 UserBehaviour (arr(0 ).toLong, arr(1 ).toLong, arr(2 ).toInt, arr(3 ), arr(4 ).toLong * 1000L )
3442 })
3543 .filter(_.behaviour.equals(" pv" )) // 过滤出pv事件
@@ -38,13 +46,74 @@ object UserBehaviourAnalysis {
3846 .timeWindow(Time .hours(1 ), Time .minutes(5 )) // 每隔5分钟,最近一个小时
3947 // 增量聚合和全窗口聚合结合使用
4048 // 聚合结果ItemViewCount是每个窗口中每个商品被浏览的次数
41- .aggregate(new CountAgg , new WindowResult )
49+ .aggregate(new CountAgg , new WindowResult ) // => DataStream[ItemViewCount]
50+ // 对DataStream[ItemViewCount]使用窗口结束时间进行分流
51+ // 每一条支流里面的元素都属于同一个窗口,元素是ItemViewCount
52+ // 所以只需要对支流里面的元素按照count字段进行排序就可以了
53+ // 支流里的元素是有限的,因为都属于同一个窗口
54+ .keyBy(_.windowEnd) // => KeyedStream
55+ .process(new TopN (3 ))
4256
4357 stream.print()
4458
4559 env.execute()
4660 }
4761
class TopN(n: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

  // Offset (ms) added to the window end when registering the event-time timer.
  // The same offset is subtracted again in onTimer to recover the window end,
  // so both uses must stay in sync — hence a single named constant.
  private val timerDelayMs = 100L

  // Keyed list state buffering every ItemViewCount received for the current key.
  // NOTE(review): the descriptor name is reproduced exactly as it appears in the
  // source (including the leading space) — confirm that is intended.
  lazy val itemState = getRuntimeContext.getListState(
    new ListStateDescriptor[ItemViewCount](" item-state", Types.of[ItemViewCount])
  )

  /**
    * Called once per incoming ItemViewCount: buffers the element in list state and
    * registers an event-time timer shortly after the element's window end.
    *
    * @param value the incoming per-item view count
    * @param ctx   context giving access to the timer service
    * @param out   collector (unused here; output is produced in onTimer)
    */
  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    itemState.add(value)
    // The stream is keyed by windowEnd, so every element of a key requests the
    // same timestamp and only one timer ends up registered.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + timerDelayMs)
  }

  /**
    * Fires at `windowEnd + timerDelayMs`: drains the buffered elements, sorts them by
    * `count` in descending order, keeps the first `n`, and emits one formatted report.
    *
    * @param timestamp the timestamp the timer was registered for
    * @param ctx       timer context (unused)
    * @param out       collector receiving the formatted report string
    */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    // List state cannot sort, so snapshot its contents into an immutable List
    // BEFORE clearing the state.
    val bufferedItems = itemState.get.asScala.toList
    // The buffered elements are no longer needed once copied out.
    itemState.clear()

    // Descending by count; keep the first n elements.
    val topItems = bufferedItems.sortBy(-_.count).take(n)

    // NOTE(review): string literals below are reproduced exactly as they appear in
    // the source, including padding spaces — confirm they are intended.
    val report = new StringBuilder
    report
      .append(" ======================================\n ")
      .append(" 窗口结束时间是:")
      // Undo the timer offset to recover the original window end.
      .append(new Timestamp(timestamp - timerDelayMs))
      .append(" \n ")

    for ((item, idx) <- topItems.zipWithIndex) {
      report
        .append(" 第")
        .append(idx + 1) // ranks are 1-based
        .append(" 名的商品ID是:")
        .append(item.itemId)
        .append(" ,浏览量是:")
        .append(item.count)
        .append(" \n ")
    }

    report.append(" ======================================\n\n\n ")

    out.collect(report.toString())
  }
}
116+
48117 class CountAgg extends AggregateFunction [UserBehaviour , Long , Long ] {
49118 override def createAccumulator (): Long = 0L
50119
0 commit comments