Skip to content
Open

aa #15

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
195 commits
Select commit Hold shift + click to select a range
9baf1dc
flink 1
confucianzuoyuan Apr 29, 2019
19d68b7
fix
confucianzuoyuan Apr 29, 2019
9e306be
Create WatermarkTest.scala
confucianzuoyuan May 2, 2019
ded2813
xxx
confucianzuoyuan May 2, 2019
90c4e30
proj
confucianzuoyuan May 4, 2019
3ae874a
Merge branch 'master' of github.com:confucianzuoyuan/flink-tutorial
confucianzuoyuan May 4, 2019
5267d9c
fix
confucianzuoyuan May 7, 2019
8301b53
rm
confucianzuoyuan May 7, 2019
42e3075
add
confucianzuoyuan May 7, 2019
66d2106
fix
confucianzuoyuan May 10, 2019
fd1c10e
rm
confucianzuoyuan May 10, 2019
65c0d00
fix
confucianzuoyuan May 10, 2019
e74f48c
fix
confucianzuoyuan May 10, 2019
8e60c02
fix
confucianzuoyuan May 10, 2019
78befad
fix
confucianzuoyuan May 11, 2019
bb5b0be
fix
confucianzuoyuan May 11, 2019
00165e9
fix
confucianzuoyuan May 13, 2019
63c2fd6
add data set
confucianzuoyuan May 13, 2019
4b74a48
fix
confucianzuoyuan May 13, 2019
3e64337
fix
confucianzuoyuan May 13, 2019
7d51992
fix
confucianzuoyuan May 13, 2019
e5259da
fix
confucianzuoyuan May 13, 2019
7f82842
add
confucianzuoyuan May 13, 2019
aba14f4
java to scala
confucianzuoyuan May 16, 2019
475baed
fix
confucianzuoyuan May 17, 2019
cd7694c
fix
confucianzuoyuan May 17, 2019
c367954
fix
confucianzuoyuan May 17, 2019
e0ffff9
fix
confucianzuoyuan May 17, 2019
1ba367e
fix
confucianzuoyuan May 17, 2019
7a82998
rm
confucianzuoyuan May 17, 2019
bd27444
rename
confucianzuoyuan May 17, 2019
c4f7900
add
confucianzuoyuan May 17, 2019
ecc8418
add
confucianzuoyuan May 18, 2019
a9c6d86
fix
confucianzuoyuan May 20, 2019
0dc0cd9
fix
confucianzuoyuan May 21, 2019
671a74a
add
confucianzuoyuan May 22, 2019
f02e81a
fix
confucianzuoyuan May 23, 2019
456f956
fix
confucianzuoyuan May 23, 2019
781e4c3
fix
confucianzuoyuan May 23, 2019
a2ffe04
fix
confucianzuoyuan May 23, 2019
3cfc629
ignore
confucianzuoyuan May 24, 2019
9767a17
add
confucianzuoyuan May 24, 2019
6da3383
add readme
confucianzuoyuan May 24, 2019
ba1970b
refactor
confucianzuoyuan May 24, 2019
1bcd63d
fix
confucianzuoyuan May 24, 2019
6ef3795
fix
confucianzuoyuan May 29, 2019
0104f16
add
confucianzuoyuan May 29, 2019
6a5eac3
fix
confucianzuoyuan May 29, 2019
82dd520
add
confucianzuoyuan May 29, 2019
e7df710
add
confucianzuoyuan Jun 4, 2019
ebc8f79
add
confucianzuoyuan Jun 4, 2019
092ab49
add
confucianzuoyuan Jun 4, 2019
6789119
add
confucianzuoyuan Jun 5, 2019
7430de9
add
confucianzuoyuan Jun 5, 2019
454b27f
add
confucianzuoyuan Jun 5, 2019
f5b4c8c
fix
confucianzuoyuan Jun 5, 2019
216f1a6
fix
confucianzuoyuan Jun 5, 2019
67cb260
fix
confucianzuoyuan Jun 5, 2019
c5f256c
fix
confucianzuoyuan Jun 5, 2019
df297bf
add
confucianzuoyuan Jun 6, 2019
ebcb032
fix
confucianzuoyuan Jun 6, 2019
b7186ed
rewrite
confucianzuoyuan Jun 6, 2019
cfb3e50
fix
confucianzuoyuan Jun 6, 2019
047f526
add
confucianzuoyuan Jun 7, 2019
7b033a7
fix
confucianzuoyuan Jun 8, 2019
fb36899
fix
confucianzuoyuan Jun 8, 2019
b9b9837
fix
confucianzuoyuan Jun 9, 2019
fc693b9
fix
confucianzuoyuan Jun 9, 2019
41ae236
add
confucianzuoyuan Jun 9, 2019
24068ee
add
confucianzuoyuan Jun 10, 2019
333fe58
fix
confucianzuoyuan Jun 10, 2019
76c2fca
fix
confucianzuoyuan Jun 10, 2019
57b6d40
fix
confucianzuoyuan Jun 10, 2019
a9b929e
fix
confucianzuoyuan Jun 10, 2019
da93893
rewrite
confucianzuoyuan Jun 10, 2019
7ed73a8
fix
confucianzuoyuan Jun 10, 2019
76f6d84
fix
confucianzuoyuan Jun 11, 2019
12719f0
fix
confucianzuoyuan Jun 11, 2019
dee456b
add
confucianzuoyuan Jun 11, 2019
8ad6a77
fix
confucianzuoyuan Jun 11, 2019
cc199c0
fix
confucianzuoyuan Jun 11, 2019
41393a9
add
confucianzuoyuan Jun 12, 2019
1ed6b55
fix
confucianzuoyuan Jun 12, 2019
b5296c0
fix
confucianzuoyuan Jun 12, 2019
e39e763
fix
confucianzuoyuan Jun 12, 2019
16d8091
fix
confucianzuoyuan Jun 12, 2019
8451aea
fix
confucianzuoyuan Jun 13, 2019
d05411c
add slide & document of ecommerce project
wushengran Jun 17, 2019
576795e
Merge branch 'master' of github.com:confucianzuoyuan/flink-tutorial
confucianzuoyuan Jun 17, 2019
d2428e8
fix
confucianzuoyuan Jun 17, 2019
69a98ea
fix
confucianzuoyuan Jun 17, 2019
5454326
fix
confucianzuoyuan Jun 17, 2019
3ed47de
fix
confucianzuoyuan Jun 17, 2019
b56ab37
fix
confucianzuoyuan Jun 17, 2019
958ad8b
fix
confucianzuoyuan Jun 17, 2019
0f4c62b
Update README.md
confucianzuoyuan Jun 17, 2019
96737c0
fix
confucianzuoyuan Jun 17, 2019
ec2fc3c
add
confucianzuoyuan Jun 18, 2019
16d191e
fix
confucianzuoyuan Jun 18, 2019
608ff42
fix
confucianzuoyuan Jun 18, 2019
79092aa
add
confucianzuoyuan Jun 18, 2019
3330dc6
add
confucianzuoyuan Jun 20, 2019
5b0300c
fix
confucianzuoyuan Jun 20, 2019
d868e5e
fix
confucianzuoyuan Jun 20, 2019
3d3752c
add es
confucianzuoyuan Jun 20, 2019
5648183
fix
confucianzuoyuan Jun 21, 2019
1776939
fix
confucianzuoyuan Jun 21, 2019
807e20f
fix
confucianzuoyuan Jun 21, 2019
183eb73
fix
confucianzuoyuan Jun 23, 2019
878e8fb
add
confucianzuoyuan Jun 23, 2019
1d0aae3
fix
confucianzuoyuan Jun 23, 2019
cae012d
fix
confucianzuoyuan Jun 23, 2019
1444d0f
fix
confucianzuoyuan Jun 23, 2019
4ef8e0d
fix
confucianzuoyuan Jun 24, 2019
956c8a0
fix
confucianzuoyuan Jun 24, 2019
4059a52
trans
confucianzuoyuan Jun 24, 2019
f87524f
add
confucianzuoyuan Jun 24, 2019
0e59f9b
fix
confucianzuoyuan Jun 25, 2019
951863b
fix
confucianzuoyuan Jun 25, 2019
76bfeb0
change svg to png
confucianzuoyuan Jun 25, 2019
bc8e2cd
fix
confucianzuoyuan Jun 25, 2019
9e07e08
fix
confucianzuoyuan Jun 26, 2019
9205ea8
fix
confucianzuoyuan Jun 26, 2019
cf0a160
fix
confucianzuoyuan Jun 26, 2019
15b760c
fix
confucianzuoyuan Jun 28, 2019
257f3ee
fix
confucianzuoyuan Jun 28, 2019
aea8ba0
fix
confucianzuoyuan Jun 28, 2019
60d6185
fix
confucianzuoyuan Jun 29, 2019
b177165
fix
confucianzuoyuan Jun 29, 2019
285c51e
add
confucianzuoyuan Jun 29, 2019
08671d6
add
confucianzuoyuan Jun 29, 2019
36edd75
rm
confucianzuoyuan Jun 29, 2019
b9885b1
add
confucianzuoyuan Jun 29, 2019
7d80e49
fix
confucianzuoyuan Jun 29, 2019
4412a0b
add
confucianzuoyuan Jul 5, 2019
ff688dd
add
confucianzuoyuan Jul 6, 2019
ae80121
add
confucianzuoyuan Jul 8, 2019
f4788bc
add pdf
confucianzuoyuan Jul 8, 2019
ec39796
fix
confucianzuoyuan Jul 8, 2019
5ec2ff7
fix
confucianzuoyuan Jul 8, 2019
670713b
add chapter 3 translation of Stream Processing with Apache Flink
wushengran Jul 23, 2019
c9ad001
add
confucianzuoyuan Jul 25, 2019
fc3a7a5
fix
confucianzuoyuan Jul 25, 2019
93e7d7e
add chapter 1 translation of Stream Processing with Apache Flink
wushengran Jul 25, 2019
b5d9d4a
add
confucianzuoyuan Jul 26, 2019
844b192
Merge branch 'master' of github.com:confucianzuoyuan/flink-tutorial
confucianzuoyuan Jul 26, 2019
639bfbf
add markdown file
confucianzuoyuan Jul 28, 2019
8316f33
fix
confucianzuoyuan Jul 28, 2019
30b7b4c
fix
confucianzuoyuan Jul 28, 2019
27c9221
fix
confucianzuoyuan Jul 28, 2019
c0241de
fix
confucianzuoyuan Aug 1, 2019
f669f2e
add
confucianzuoyuan Aug 8, 2019
fe04f18
add
confucianzuoyuan Aug 8, 2019
6ef1a87
add content
confucianzuoyuan Aug 9, 2019
2688aeb
add content
confucianzuoyuan Aug 11, 2019
a211728
add
confucianzuoyuan Aug 11, 2019
301d451
add
confucianzuoyuan Aug 11, 2019
7a7c0aa
add
confucianzuoyuan Aug 13, 2019
33b209e
add
confucianzuoyuan Aug 14, 2019
b85fb9c
fix
confucianzuoyuan Aug 16, 2019
f093f8f
fix
confucianzuoyuan Aug 18, 2019
0cd6e6d
add
confucianzuoyuan Aug 20, 2019
7526e8b
add
confucianzuoyuan Aug 20, 2019
7d90b05
fix
confucianzuoyuan Aug 20, 2019
b922c37
add
confucianzuoyuan Aug 20, 2019
3d605fb
add
confucianzuoyuan Aug 20, 2019
7d7709d
add
confucianzuoyuan Aug 20, 2019
fa7ddc1
add
confucianzuoyuan Aug 20, 2019
35352a1
fix
confucianzuoyuan Aug 20, 2019
439b95a
fix
confucianzuoyuan Aug 20, 2019
833dbbe
fix
confucianzuoyuan Sep 9, 2019
83487b0
fix
confucianzuoyuan Oct 11, 2019
8501916
add final docs
wushengran Oct 11, 2019
10eedd9
fix
confucianzuoyuan Nov 9, 2019
6bf30f2
fix
confucianzuoyuan Nov 9, 2019
f7eb157
fix
confucianzuoyuan Nov 9, 2019
900fec2
fix
confucianzuoyuan Dec 4, 2019
1113e77
fix
confucianzuoyuan Dec 21, 2019
1ae9e3a
fix
confucianzuoyuan Dec 25, 2019
dc1318b
fix
confucianzuoyuan Dec 25, 2019
41a3477
add
confucianzuoyuan Dec 25, 2019
00cf84b
fix
confucianzuoyuan Dec 25, 2019
776d9cd
rm deprecated files
confucianzuoyuan Dec 26, 2019
cdc4638
重要清理
confucianzuoyuan Feb 8, 2020
e35d7ca
重要清理
confucianzuoyuan Feb 8, 2020
b5b77f4
重要清理
confucianzuoyuan Feb 8, 2020
2b89e99
重要清理
confucianzuoyuan Feb 8, 2020
846d586
重要清理
confucianzuoyuan Feb 8, 2020
cc167a4
重大重构-1
confucianzuoyuan Feb 15, 2020
253093b
重大重构-2
confucianzuoyuan Feb 17, 2020
4467e63
fix
confucianzuoyuan Feb 17, 2020
491bf69
加了运维部署一章
confucianzuoyuan Feb 17, 2020
27adb85
fix
confucianzuoyuan Feb 18, 2020
379be58
fix
confucianzuoyuan Feb 18, 2020
7d1fb53
fix
confucianzuoyuan Feb 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
.DS_Store
target/
.classpath
.idea/
.project
.settings/
.vscode/
*.iml
.metals/
*.aux
*.log
*.toc
*.synctex.gz
_minted-尚硅谷Flink教程/
*.out
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# Flink教程
[尚硅谷Flink教材链接](https://confucianzuoyuan.github.io/flink-tutorial)
140 changes: 140 additions & 0 deletions docs/chap1.adoc

Large diffs are not rendered by default.

242 changes: 242 additions & 0 deletions docs/chap10.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
== Flink CEP简介

*什么是复杂事件CEP?*

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。

*特征:*

* 目标:从有序的简单事件流中发现一些高阶特征
* 输入:一个或多个由简单事件构成的事件流
* 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
* 输出:满足规则的复杂事件

image::cep1.jpg[]

CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的方式获得通知并阻止一些行为。

CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

看起来很简单,但是它有很多不同的功能:

* 输入的流数据,尽快产生结果
* 在2个event流上,基于时间进行聚合类的计算
* 提供实时/准实时的警告和通知
* 在多样的数据源中产生关联并分析模式
* 高吞吐、低延迟的处理

市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但它们都没有提供专门的library支持,而Flink提供了专门的CEP library。

*Flink CEP*

Flink为CEP提供了专门的Flink CEP library,它包含如下组件:

* Event Stream
* pattern定义
* pattern检测
* 生成Alert

image::cep6.png[]

首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。

为了使用Flink CEP,我们需要导入依赖:

[source,xml]
----
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
----

*Event Streams*

登录事件流

[source,scala]
----
// One login attempt. eventTime is kept as text and parsed when timestamps are assigned.
case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: String)

// Single-threaded, event-time execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Sample input: three consecutive failures for user "1", then one success for user "2".
val loginEvents = List(
  LoginEvent("1", "192.168.0.1", "fail", "1558430842"),
  LoginEvent("1", "192.168.0.2", "fail", "1558430843"),
  LoginEvent("1", "192.168.0.3", "fail", "1558430844"),
  LoginEvent("2", "192.168.10.10", "success", "1558430845")
)

// eventTime is multiplied by 1000 to turn seconds into millisecond timestamps.
val loginEventStream = env
  .fromCollection(loginEvents)
  .assignAscendingTimestamps(event => event.eventTime.toLong * 1000)
----

*Pattern API*

每个Pattern都应该包含几个步骤,或者叫做state。从一个state到另一个state,通常我们需要定义一些条件,例如下列的代码:

[source,scala]
----
// Two consecutive "fail" events (strict contiguity via next) that occur within 10 seconds.
val loginFailPattern = Pattern.begin[LoginEvent]("begin")
  .where(_.eventType.equals("fail"))
  .next("next")
  .where(_.eventType.equals("fail"))
  .within(Time.seconds(10))
----

每个state都应该有一个标识:

例如: ``.begin[LoginEvent]("begin")``中的"begin"

每个state都需要有一个唯一的名字,而且需要一个filter来过滤条件,这个过滤条件定义事件需要符合的条件

例如: ``.where(_.eventType.equals("fail"))``

我们也可以通过subtype来限制event的子类型:

[source,scala]
----
// Scala obtains the Class object with classOf[...]; `SubEvent.class` is Java syntax.
start.subtype(classOf[SubEvent]).where(...)
----

事实上,你可以多次调用subtype和where方法;而且如果where条件是不相关的,你可以通过or来指定一个单独的filter函数:

[source,scala]
----
pattern.where(...).or(...);
----

之后,我们可以在此条件基础上,通过next或者followedBy方法切换到下一个state,next的意思是说上一步符合条件的元素之后紧挨着的元素;而followedBy并不要求一定是挨着的元素。这两者分别称为严格近邻和非严格近邻。

[source,scala]
----
val strictNext = start.next("middle")
val nonStrictNext = start.followedBy("middle")
----

最后,我们可以将所有的Pattern的条件限定在一定的时间范围内:

[source,scala]
----
next.within(Time.seconds(10))
----

这个时间可以是Processing Time,也可以是Event Time。

*Pattern 检测*

通过一个input DataStream以及刚刚我们定义的Pattern,我们可以创建一个PatternStream:

[source,scala]
----
val input = ...
val pattern = ...

val patternStream = CEP.pattern(input, pattern)
----

[source,scala]
----
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
----

一旦获得PatternStream,我们就可以通过select或flatSelect,从一个Map序列找到我们需要的告警信息。

*select*

select方法需要实现一个PatternSelectFunction,通过select方法来输出需要的警告。它接受一个Map,包含string/event键值对,其中key为state的名字,event则为真实的Event。

[source,scala]
----
// Turns every match into a (userId, ip, eventType) tuple describing the second failure.
val loginFailDataStream = patternStream
  .select((pattern: Map[String, Iterable[LoginEvent]]) => {
    // The map is keyed by the state names used in the pattern ("begin" and "next").
    // Only the event bound to "next" is reported; pattern("begin").head would give the first one.
    // Direct lookup avoids a null default that would surface as a NullPointerException.
    val second = pattern("next").head

    (second.userId, second.ip, second.eventType)
  })
----

其返回值仅为1条记录。

*flatSelect*

通过实现PatternFlatSelectFunction,实现与select相似的功能。唯一的区别就是flatSelect方法可以返回多条记录。

*超时事件的处理*

通过within方法,我们的pattern规则限定在一定的窗口范围内。当部分匹配的事件序列超过窗口时间仍未完成匹配时,我们可以通过在select或flatSelect中,实现PatternTimeoutFunction/PatternFlatTimeoutFunction来处理这种情况。

[source,scala]
----
// select with an output tag takes two functions: the first handles timed-out partial
// matches (sent to the side output), the second handles complete matches (main output).
val complexResult = patternStream.select(orderTimeoutOutput) {
  (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
    // Build the timeout record from the event bound to the "begin" state.
    val createOrder = pattern("begin").head
    OrderTimeoutEvent(createOrder.orderId, "timeout")
  }
} {
  (pattern: Map[String, Iterable[OrderEvent]]) => {
    // Build the success record from the event bound to the "next" state.
    val payOrder = pattern("next").head
    OrderTimeoutEvent(payOrder.orderId, "success")
  }
}

// Timed-out records are read back from the side output identified by the same tag.
val timeoutResult = complexResult.getSideOutput(orderTimeoutOutput)

complexResult.print()
timeoutResult.print()
----

完整例子:

[source,scala]
----
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.Map

/** Detects two consecutive failed logins by the same user within 10 seconds using Flink CEP. */
object ScalaFlinkLoginFail {

  /**
   * Builds the job over a small in-memory sample, prints every detected match and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // eventTime holds epoch seconds as text; Flink timestamps are in milliseconds,
    // hence the * 1000 (previously missing in this listing).
    val loginEventStream = env.fromCollection(List(
      LoginEvent("1", "192.168.0.1", "fail", "1558430842"),
      LoginEvent("1", "192.168.0.2", "fail", "1558430843"),
      LoginEvent("1", "192.168.0.3", "fail", "1558430844"),
      LoginEvent("2", "192.168.10.10", "success", "1558430845")
    )).assignAscendingTimestamps(_.eventTime.toLong * 1000)

    // Two directly adjacent "fail" events within a 10 second window.
    val loginFailPattern = Pattern.begin[LoginEvent]("begin")
      .where(_.eventType.equals("fail"))
      .next("next")
      .where(_.eventType.equals("fail"))
      .within(Time.seconds(10))

    // Key by user so that failures of different users are never combined into one match.
    val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    val loginFailDataStream = patternStream
      .select((pattern: Map[String, Iterable[LoginEvent]]) => {
        // The map is keyed by state name; only the event bound to "next" is reported.
        // Direct lookup avoids a null default that would surface as a NullPointerException.
        val second = pattern("next").head

        (second.userId, second.ip, second.eventType)
      })

    loginFailDataStream.print()

    env.execute()
  }

}

/**
 * A single login attempt.
 *
 * @param userId    identifier of the user who tried to log in
 * @param ip        IP address the attempt came from
 * @param eventType outcome of the attempt; the sample data uses "fail" and "success"
 * @param eventTime time of the attempt as epoch seconds, stored as text
 */
final case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: String)
----

Loading