Skip to content

Commit dee302d

Browse files
非常非常关键的改变!
1 parent d5f33c0 commit dee302d

23 files changed

+9864
-11805
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package com.atguigu.course
2+
3+
import org.apache.flink.streaming.api.TimeCharacteristic
4+
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
5+
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
6+
import org.apache.flink.streaming.api.scala._
7+
import org.apache.flink.streaming.api.windowing.time.Time
8+
import org.apache.flink.util.Collector
9+
import org.joda.time.DateTime
10+
import org.joda.time.format.DateTimeFormat
11+
12+
/**
 * Interval-join demo.
 *
 * Requirement: join each user's click with that same user's browse events
 * from the 10 minutes leading up to the click.
 *
 * clickStream - a user clicked a page at some point in time, e.g.
 * {"userID": "user_2", "eventTime": "2019-11-16 17:30:02", "eventType": "click", "pageID": "page_1"}
 *
 * browseStream - a user browsed a product at some point in time, together
 * with the product's price, e.g.
 * {"userID": "user_2", "eventTime": "2019-11-16 17:30:01", "eventType": "browse", "productID": "product_1", "productPrice": 10}
 */
object IntervalJoinExample {

  /** One click event; `eventTime` is a "yyyy-MM-dd HH:mm:ss" string. */
  case class UserClickLog(userID: String,
                          eventTime: String,
                          eventType: String,
                          pageID: String)

  /** One browse event; `eventTime` is a "yyyy-MM-dd HH:mm:ss" string. */
  case class UserBrowseLog(userID: String,
                           eventTime: String,
                           eventType: String,
                           productID: String,
                           productPrice: String)

  // Built once and shared by both timestamp extractors instead of being
  // re-created for every record. Joda-Time formatters are immutable and
  // thread-safe, so sharing one instance is safe.
  private val eventTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

  /** Parses an `eventTime` string into epoch milliseconds. */
  private def toEpochMillis(eventTime: String): Long =
    DateTime.parse(eventTime, eventTimeFormat).getMillis

  /**
   * Builds the two in-memory streams, assigns event-time timestamps with a
   * zero out-of-orderness bound, interval-joins them per user and prints
   * every joined pair.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val clickStream = env
      .fromElements(
        UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
      )
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0)) {
          override def extractTimestamp(t: UserClickLog): Long =
            toEpochMillis(t.eventTime)
        }
      )

    val browseStream = env
      .fromElements(
        UserBrowseLog("user_2", "2019-11-16 17:19:00", "browse", "product_1", "10"),
        UserBrowseLog("user_2", "2019-11-16 17:20:00", "browse", "product_1", "10"),
        UserBrowseLog("user_2", "2019-11-16 17:22:00", "browse", "product_1", "10"),
        UserBrowseLog("user_2", "2019-11-16 17:26:00", "browse", "product_1", "10"),
        UserBrowseLog("user_2", "2019-11-16 17:30:00", "browse", "product_1", "10"),
        UserBrowseLog("user_2", "2019-11-16 17:31:00", "browse", "product_1", "10")
      )
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[UserBrowseLog](Time.seconds(0)) {
          override def extractTimestamp(t: UserBrowseLog): Long =
            toEpochMillis(t.eventTime)
        }
      )

    // Window relative to each click: [click - 10 min, click + 0 s].
    // For the sample click at 17:30:00 that covers the browses stamped
    // 17:20:00 through 17:30:00; 17:19:00 and 17:31:00 fall outside it.
    clickStream
      .keyBy("userID")
      .intervalJoin(browseStream.keyBy("userID"))
      .between(Time.minutes(-10), Time.seconds(0))
      .process(new MyIntervalJoin)
      .print()

    env.execute()
  }

  /** Emits one line of text for every (click, browse) pair the join produces. */
  class MyIntervalJoin extends ProcessJoinFunction[UserClickLog, UserBrowseLog, String] {
    override def processElement(left: UserClickLog,
                                right: UserBrowseLog,
                                context: ProcessJoinFunction[UserClickLog, UserBrowseLog, String]#Context,
                                out: Collector[String]): Unit = {
      out.collect(left + " =Interval Join=> " + right)
    }
  }
}
199 Bytes
Binary file not shown.

bin/preproc

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
Concatenates a number of input files into a single output file, while
5+
performing the following regex substitutions:
6+
7+
[[pagebreak]]
8+
9+
[nobr[s]] # Substitute spaces with `\ ` to mark as nonbreaking
10+
# Doesn't work inside code markdown, alas
11+
12+
[nh[x]] \hyphenation{x} # no hyphen, no underscores allowed
13+
[ix[x]] \index{x} # index straight up
14+
[ixtt[x]] \index{x@\texttt{x}} # index tt
15+
16+
fl = footnote link
17+
flx = footnote link to example https://beej.us/guide/bgnet/examples/file
18+
flr = footnote link to redirect https://beej.us/guide/url/id
19+
20+
[fl[link|url]] [link](url)^[url]
21+
[flx[link|file]] [link](https://beej.us/guide/bgnet/examples/file)^[https://beej.us/guide/bgnet/examples/file]
22+
[flr[link|id]] [link](https://beej.us/guide/url/id)^[https://beej.us/guide/url/id]
23+
[flrfc[link|num]] [link](https://tools.ietf.org/html/rfcnum)^[https://tools.ietf.org/html/rfcnum]
24+
25+
Also puts a blank line between files.
26+
27+
"""
28+
29+
import sys
30+
import re
31+
import preproc_config
32+
33+
# At least one input file plus the output file are required.
if len(sys.argv) < 3:
    # A usage error is a diagnostic, so it goes to stderr rather than being
    # mixed into stdout.
    print("usage: preproc infile [infile ... ] outputfile", file=sys.stderr)
    sys.exit(1)

# Every argument except the last is an input file; the last is the output.
infiles = sys.argv[1:-1]
outfile = sys.argv[-1]

# Collects the raw text of each input file, in command-line order.
filedata = []
41+
42+
def nobr_replace(mo):
    """Return group 1 of match `mo` with each whitespace character replaced
    by a backslash followed by a space (the non-breaking marker)."""
    inner_text = mo.group(1)
    return re.sub(r'\s', lambda _ws: '\\ ', inner_text)
44+
45+
# Read every input file in command-line order. The context manager closes
# each handle even if read() raises.
for infile in infiles:
    with open(infile) as fin:
        filedata.append(fin.read())

# Concatenate the files with a newline between consecutive ones.
filedata = '\n'.join(filedata)
51+
52+
# (pattern, replacement) pairs, applied to the whole text in this order.
# NOTE(review): the tab replacement is reproduced exactly as it appears in
# this view; a longer run of spaces may have been collapsed - confirm
# against the repository.
_substitutions = (
    (r'\t', " "),
    (r'\[nobr\[(.+?)\]\]', nobr_replace),
    (r'\[\[pagebreak\]\]', r'\\newpage'),
    (r'\[nh\[(.+?)\]\]', r'\\hyphenation{\1}'),
    (r'\[ix\[(.+?)\]\]', r'\\index{\1}'),
    (r'\[ixtt\[(.+?)\]\]', r'\\index{\1@\\texttt{\1}}'),
    (r'\[fl\[(.+?)\|(.+?)\]\]', r'[\1](\2)^[\2]'),
    (r'\[flx\[(.+?)\|(.+?)\]\]',
     r'[\1](' + preproc_config.EXAMPLE_URL + r'\2)^[' + preproc_config.EXAMPLE_URL + r'\2]'),
    (r'\[flr\[(.+?)\|(.+?)\]\]',
     r'[\1](https://beej.us/guide/url/\2)^[https://beej.us/guide/url/\2]'),
    (r'\[flrfc\[(.+?)\|(.+?)\]\]',
     r'[\1](https://tools.ietf.org/html/rfc\2)^[https://tools.ietf.org/html/rfc\2]'),
)

# DOTALL lets a bracketed argument span line breaks.
for _pattern, _replacement in _substitutions:
    filedata = re.sub(_pattern, _replacement, filedata, flags=re.DOTALL)
62+
63+
in_fence = False         # currently between an opening and a closing ``` line
this_line_fence = False  # the current line is itself a ``` fence line
number_lines = False     # the most recent fence line mentioned "numberlines"

# Go through a line at a time, indenting lines that sit inside an unnumbered
# fenced code block. The context manager closes the output file even if a
# write fails part-way through.
with open(outfile, "w") as fout:
    for line in filedata.splitlines(True):  # True keeps the line endings
        this_line_fence = line.strip()[:3] == '```'
        if this_line_fence:
            number_lines = "numberlines" in line.lower()
            in_fence = not in_fence

        if in_fence and not this_line_fence and not number_lines:
            # NOTE(review): the indent string is reproduced exactly as it
            # appears in this view; a longer run of spaces may have been
            # collapsed - confirm against the repository.
            fout.write(" ")  # indent

        fout.write(line)
83+

bin/preproc_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Base URL that bin/preproc prepends to the file name when expanding
# [flx[link|file]] markers.
# NOTE(review): this points at another guide's examples directory - confirm
# it is the intended target for this project.
EXAMPLE_URL = "https://beej.us/guide/bgnet/examples/"

docs/Makefile

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
# Document metadata passed to pandoc as template variables (see COMMON_OPTS).
TITLE="尚硅谷Flink教程"
SUBTITLE="Flink理论与项目实践"
AUTHOR='尚硅谷大数据教学组'
VERSION_DATE="v3.1.2, Copyright © November 13, 2019"

# Base name for the source Markdown file and every generated file.
GUIDE_ID=flinktutorial

# Fonts handed to pandoc for PDF output (see PDF_OPTS).
PDF_MAINFONT="Liberation Serif"
PDF_SANSFONT="Liberation Sans"
PDF_MONOFONT="Liberation Mono"
#PDF_MAINFONT="DejaVu Serif"
#PDF_SANSFONT="DejaVu Sans"
#PDF_MONOFONT="DejaVu Sans Mono"

# PDF output names by paper size (usl/a4), colour (c/bw) and sidedness (1/2).
USLETTER_COLOR=$(GUIDE_ID)_usl_c_1.pdf $(GUIDE_ID)_usl_c_2.pdf
USLETTER_BW=$(GUIDE_ID)_usl_bw_1.pdf $(GUIDE_ID)_usl_bw_2.pdf
A4_COLOR=$(GUIDE_ID)_a4_c_1.pdf $(GUIDE_ID)_a4_c_2.pdf
A4_BW=$(GUIDE_ID)_a4_bw_1.pdf $(GUIDE_ID)_a4_bw_2.pdf
# PDFs built by `all` and removed by `pristine`; only the A4 colour pair.
BOOKS=$(A4_COLOR)

HTML=$(GUIDE_ID).html

# Preprocessor script and the temporary Markdown file it writes.
PREPROC=../bin/preproc
PREPROC_MD=$(GUIDE_ID)_temp_preproc.md

# pandoc options shared by the HTML and PDF builds.
COMMON_OPTS= \
	--variable title:$(TITLE) \
	--variable subtitle:$(SUBTITLE) \
	--variable author:$(AUTHOR) \
	--variable date:$(VERSION_DATE) \
	--toc

# pandoc options for the PDF builds; output goes to $(GUIDE_ID)_temp.tex.
PDF_OPTS= \
	-H latex/header_index.latex \
	-A latex/after_index.latex \
	--pdf-engine=xelatex \
	--variable mainfont=$(PDF_MAINFONT) \
	--variable sansfont=$(PDF_SANSFONT) \
	--variable monofont=$(PDF_MONOFONT) \
	--variable geometry:"top=1in,bottom=1in" \
	-V documentclass=ctexbook \
	-o $(GUIDE_ID)_temp.tex \
	$(COMMON_OPTS)

# pandoc options for the HTML build.
HTML_OPTS=$(COMMON_OPTS) \
	--metadata title:$(TITLE)

# Per-variant pandoc switches combined by the PDF rules.
ONESIDE=--variable classoption:oneside
TWOSIDE=--variable classoption:twoside
USLETTER=--variable papersize:letter
A4=--variable papersize:a4
CROWNQUARTO=--variable geometry:"paperwidth=7.444in,paperheight=9.681in,top=1in,bottom=1in,left=1in,right=1.5in" # Lulu press
CROWNQUARTO_AMAZON=--variable geometry:"paperwidth=7.444in,paperheight=9.681in,top=1in,bottom=1in,left=1.25in,right=1.25in" # Amazon
#SIZE_75x925_AMAZON=--variable geometry:"paperwidth=7.5in,paperheight=9.25in,top=1in,bottom=1in,left=1.125in,right=1.375in" # Amazon 7.5" x 9.25", margins too far inside
SIZE_75x925_AMAZON=--variable geometry:"paperwidth=7.5in,paperheight=9.25in,top=1in,bottom=1in,left=1.25in,right=1.25in" # Amazon 7.5" x 9.25"
BLANKLAST=-A latex/after_blank.latex # add a blank last page
BW=--no-highlight # black and white options
59+
# Default target: the HTML page plus every PDF listed in BOOKS.
all: $(HTML) $(BOOKS)

# Build the HTML version: preprocess the Markdown, convert it with pandoc,
# then point image references at .svg files instead of .pdf files.
$(GUIDE_ID).html: $(GUIDE_ID).md bg-css.html
	$(PREPROC) $< $(PREPROC_MD)
	pandoc $(HTML_OPTS) -s $(PREPROC_MD) -o $(GUIDE_ID).html -H bg-css.html
	# [^"]* keeps each match inside a single src attribute, so every
	# src="...pdf" on a line is rewritten, not just the last one.
	sed 's/src="\([^"]*\)\.pdf"/src="\1.svg"/g' $(GUIDE_ID).html > $(GUIDE_ID)_temp.html # use svg images
	mv $(GUIDE_ID)_temp.html $(GUIDE_ID).html
	rm -f $(GUIDE_ID)_temp*
67+
68+
# Canned recipe shared by every PDF target. It preprocesses the Markdown
# prerequisite, has pandoc write $(GUIDE_ID)_temp.tex (the -o in PDF_OPTS),
# runs xelatex once, makeindex, then xelatex twice more, moves the resulting
# PDF to the target name and removes the temporary files.
#   $(1) - extra pandoc options selecting this variant (paper size,
#          sidedness, black-and-white, ...)
# NOTE: every variant uses the same $(GUIDE_ID)_temp* scratch files, so
# these targets must not be built in parallel.
define build_pdf
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(1) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*
endef

# US Letter, colour
$(GUIDE_ID)_usl_c_1.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(USLETTER) $(ONESIDE))

$(GUIDE_ID)_usl_c_2.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(USLETTER) $(TWOSIDE))

# A4, colour
$(GUIDE_ID)_a4_c_1.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(A4) $(ONESIDE))

$(GUIDE_ID)_a4_c_2.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(A4) $(TWOSIDE))

# US Letter, black and white
$(GUIDE_ID)_usl_bw_1.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(USLETTER) $(ONESIDE) $(BW))

$(GUIDE_ID)_usl_bw_2.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(USLETTER) $(TWOSIDE) $(BW))

# A4, black and white
$(GUIDE_ID)_a4_bw_1.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(A4) $(ONESIDE) $(BW))

$(GUIDE_ID)_a4_bw_2.pdf: $(GUIDE_ID).md
	$(call build_pdf,$(A4) $(TWOSIDE) $(BW))

# Print-on-demand editions, each built from its own Markdown source
$(GUIDE_ID)_lulu.pdf: $(GUIDE_ID)_lulu.md
	$(call build_pdf,$(TWOSIDE) $(CROWNQUARTO) $(BLANKLAST))

$(GUIDE_ID)_amazon.pdf: $(GUIDE_ID)_amazon.md
	$(call build_pdf,$(TWOSIDE) $(SIZE_75x925_AMAZON) $(BLANKLAST))
167+
168+
# Remove intermediate build files.
clean:
	rm -f $(GUIDE_ID)_temp*

# Remove intermediates plus the generated HTML and PDF outputs.
pristine: clean
	rm -f $(HTML) $(BOOKS)

# .PHONY prerequisites are separated by whitespace. With commas, make
# treats "all," and "clean," as the phony names and the real targets are
# left non-phony.
.PHONY: all html clean pristine

0 commit comments

Comments
 (0)