前言
說起Spark,大家就會自然而然地想到Flink,而且會不自覺地將這兩種主流的大數(shù)據(jù)實(shí)時(shí)處理技術(shù)進(jìn)行比較。然后最終得出結(jié)論:Flink實(shí)時(shí)性大于Spark。
的確,F(xiàn)link中的數(shù)據(jù)計(jì)算是以事件為驅(qū)動的,所以來一條數(shù)據(jù)就會觸發(fā)一次計(jì)算,而Spark基于數(shù)據(jù)集RDD計(jì)算,RDD最小生成間隔就是50毫秒,所以Spark就被定義為亞實(shí)時(shí)計(jì)算。
窗口Window
這里的RDD就是“天然的窗口”,將RDD生成的時(shí)間間隔設(shè)置成1min,那么這個(gè)RDD就可以理解為“1min窗口”。所以如果想要窗口計(jì)算,首選Spark。
但當(dāng)需要對即臨近時(shí)間窗口進(jìn)行計(jì)算時(shí),必須借助滑動窗口的算子來實(shí)現(xiàn)。
臨近時(shí)間如何理解
例如“3分鐘內(nèi)”這種時(shí)間范圍描述。這種時(shí)間范圍的計(jì)算,需要計(jì)算歷史的數(shù)據(jù)。例如1 ~ 3是3min,2 ~ 4也是3min,這里就重復(fù)使用了2和3的數(shù)據(jù),依次類推,3 ~ 5也是3min,同樣也重復(fù)使用了3和4。
如果使用普通窗口,就無法滿足“最近3分鐘內(nèi)”這種時(shí)間概念。
很多窗口都丟失了臨近時(shí)間,例如第3個(gè)RDD的臨近時(shí)間其實(shí)是第二個(gè)RDD,但是他們就沒法在一起計(jì)算,這就是為什么不用普通窗口的原因。
滑動窗口
滑動窗口三要素:RDD的生成時(shí)間、窗口的長度、滑動的步長。
我在本次實(shí)踐中,將RDD的時(shí)間間隔設(shè)置為10s,窗口長度為30s、滑動步長為10s。也就是說每10s就會生成一個(gè)窗口,計(jì)算最近30s內(nèi)的數(shù)據(jù),每個(gè)窗口由3個(gè)RDD組成。
數(shù)據(jù)源構(gòu)建
1. 數(shù)據(jù)規(guī)范
假設(shè)我們采集了設(shè)備的指標(biāo)信息,這里我們只關(guān)注吞吐量和響應(yīng)時(shí)間,在采集之前定義數(shù)據(jù)字段和規(guī)范[throughput, response_time],這里都定義成int類型,響應(yīng)時(shí)間單位這里定義成毫秒ms。
實(shí)際情況中,我們不可能只采集一臺設(shè)備,如果我們想要得出每臺或者每個(gè)種類設(shè)備的指標(biāo)監(jiān)控,就要在采集數(shù)據(jù)的時(shí)候?qū)γ總€(gè)設(shè)備加上唯一ID或者TypeID。
我這里的想法是對每臺設(shè)備的指標(biāo)進(jìn)行分析,所以我給每個(gè)設(shè)備都增加了一個(gè)唯一ID,最終字段[id, throughput, response_time],所以我們就按照這個(gè)數(shù)據(jù)格式,在SparkStreaming中構(gòu)建數(shù)據(jù)源讀取部分。
2. 讀取kafka
代碼語言:scala
復(fù)制
val conf = new SparkConf().setAppName("aqi").setMaster("local[1]") val ssc = new StreamingContext(conf, Seconds(10)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "121.91.168.193:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "aqi", "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("evt_monitor") val stream: DStream[String] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ).map(_.value)
這里我們將一個(gè)RDD時(shí)間間隔設(shè)置為10S,因?yàn)槭褂玫氖枪P記本跑,所以這里要將Master設(shè)置為local,表示本地運(yùn)行模式,1代表使用1個(gè)線程。
我們使用Kafka作為數(shù)據(jù)源,在讀取時(shí)就要構(gòu)建Consumer的config,像bootstrap.servers這些基本配置沒有什么好說的,關(guān)鍵的是auto.offset.reset和enable.auto.commit,
這兩個(gè)參數(shù)分表控制讀取topic消費(fèi)策略和是否提交offset。這里的earliest會從topic中現(xiàn)存最早的數(shù)據(jù)開始消費(fèi),latest是最新的位置開始消費(fèi)。
當(dāng)重啟程序時(shí),這兩種消費(fèi)模式又被enable.auto.commit控制,設(shè)置true提交offset時(shí),earliest和latest不再生效,都是從消費(fèi)組記錄的offset進(jìn)行消費(fèi)。設(shè)置為false不提交offset,offset不被提交記錄earliest還是從topic中現(xiàn)存最早的數(shù)據(jù)開始消費(fèi),latest還是從最新的數(shù)據(jù)消費(fèi)。
最后就是設(shè)置要讀取的topic和創(chuàng)建Kafka的DStream數(shù)據(jù)流。至此,整個(gè)數(shù)據(jù)源的讀取就已經(jīng)完成了,下面就是對數(shù)據(jù)處理邏輯的開發(fā)。
3. 指標(biāo)聚合計(jì)算
代碼語言:scala
復(fù)制
stream.map(x => { val s = x.split(",") (s(0), (s(2).toInt, 1)) }).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) .reduceByKeyAndWindow((x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2), Seconds(30), Seconds(10)) .foreachRDD(rdd => { rdd.foreach(x => { val id = x._1 val responseTimes = x._2._1 val num = x._2._2 val responseTime_avg = responseTimes / num println(id, responseTime_avg) }) })
我們從自身需求出發(fā),來構(gòu)思程序邏輯的開發(fā)。從需求看,關(guān)鍵字無非是最近一段時(shí)間內(nèi)、平均值。想要取一段時(shí)間內(nèi)的數(shù)據(jù),就要使用滑動窗口,以當(dāng)前時(shí)間為基準(zhǔn),向前圈定時(shí)間范圍。
而平均值,無非就是將時(shí)間范圍內(nèi),即窗口所有的響應(yīng)時(shí)間加起來,然后除以數(shù)據(jù)條數(shù)即可。想要把所有的響應(yīng)時(shí)間加起來,這里使用reduceByKey() 將窗口內(nèi)相同ID的設(shè)備時(shí)間相加,將數(shù)據(jù)條數(shù)進(jìn)行相加。
所以我在第一步切分?jǐn)?shù)據(jù)的時(shí)候,就將數(shù)據(jù)切分成KV的元組形式,V有兩個(gè)字段,第一個(gè)是響應(yīng)時(shí)間,第二個(gè)1表示一條數(shù)據(jù)。reduceByKey一共分為兩步,第一是RDD內(nèi)的reduceByKey,這也算是數(shù)據(jù)的預(yù)處理,RDD的數(shù)據(jù)只會計(jì)算一次,當(dāng)這個(gè)RDD被多個(gè)窗口使用,就不會重復(fù)計(jì)算了。第二步是基于窗口的reduceByKey,將窗口所有RDD的數(shù)據(jù)再一次聚合,最后在foreachRDD中獲取輸出
4. 驗(yàn)證結(jié)果
我們向kafka的evt_monitor這個(gè)topic中寫入數(shù)據(jù)。
備注:(最后11那個(gè)id是終端顯示問題,其實(shí)是1),然后可以輸出平均值。
驗(yàn)證結(jié)果是沒有問題的,換個(gè)角度,我們也可以從DAG來看。
這個(gè)窗口一共計(jì)算了3個(gè)RDD,其中左側(cè)的兩個(gè)是灰色的,上面是skipped標(biāo)識,代表著這兩個(gè)RDD在上一個(gè)窗口已經(jīng)計(jì)算完成了,在這個(gè)窗口只需要計(jì)算當(dāng)前的RDD,然后再一起對RDD的結(jié)果數(shù)據(jù)進(jìn)行窗口計(jì)算。
結(jié)語
本篇文章主要是利用Spark的滑動窗口,做了一個(gè)計(jì)算平均響應(yīng)時(shí)長的應(yīng)用場景,以Kafka作為數(shù)據(jù)源、通過滑動窗口和reduceByKey算子得以實(shí)現(xiàn)。同時(shí),開發(fā)Spark還是強(qiáng)烈推薦scala,整個(gè)程序看起來沒有任何多余的部分。
最后對于Spark和Flink的選型看法,Spark的確是在實(shí)時(shí)性上比Flink差一些,但是Spark對于窗口計(jì)算還是有優(yōu)勢的。所以對于每種技術(shù),也不用人云亦云,適合自己的才是最好的。
審核編輯 黃宇
-
RDD
+關(guān)注
關(guān)注
0文章
7瀏覽量
7997 -
實(shí)時(shí)監(jiān)控
+關(guān)注
關(guān)注
1文章
93瀏覽量
13678 -
SPARK
+關(guān)注
關(guān)注
1文章
105瀏覽量
19983
發(fā)布評論請先 登錄
相關(guān)推薦
輸電線路防外破防異物實(shí)時(shí)監(jiān)控預(yù)警裝置|場景模型真實(shí)還原|測距誤差在0.25米內(nèi)
spark為什么比mapreduce快?
監(jiān)控系統(tǒng)原理揭秘-數(shù)據(jù)運(yùn)算篇
![<b class='flag-5'>監(jiān)控</b>系統(tǒng)原理揭秘-數(shù)據(jù)運(yùn)算篇](https://file1.elecfans.com//web2/M00/01/A4/wKgaomaxiq6Ac8QHAABo6GSpJIA631.png)
30元如何實(shí)現(xiàn)車輛防后撞
![<b class='flag-5'>30</b>元如何<b class='flag-5'>實(shí)現(xiàn)</b>車輛防后撞](https://file1.elecfans.com/web2/M00/FF/F5/wKgaomam89OAeScPAACkNKyNpNA514.png)
spark運(yùn)行的基本流程
![<b class='flag-5'>spark</b>運(yùn)行的基本流程](https://file1.elecfans.com//web2/M00/F7/E3/wKgaomaDZoeAbeQsAAGiJ2LbwGw750.png)
Spark基于DPU的Native引擎算子卸載方案
![<b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸載方案](https://file1.elecfans.com/web2/M00/F5/20/wKgZomZ-fKiAAsUcAAFEwmESLqQ755.png)
上位監(jiān)控程序如何實(shí)現(xiàn)
存內(nèi)計(jì)算原理分類——數(shù)字存內(nèi)計(jì)算與模擬存內(nèi)計(jì)算
![存<b class='flag-5'>內(nèi)</b><b class='flag-5'>計(jì)算</b>原理分類——數(shù)字存<b class='flag-5'>內(nèi)</b><b class='flag-5'>計(jì)算</b>與模擬存<b class='flag-5'>內(nèi)</b><b class='flag-5'>計(jì)算</b>](https://file1.elecfans.com/web2/M00/E6/66/wKgZomZG_NWAIoJ9ABRwhzlRrNg851.png)
從MRAM的演進(jìn)看存內(nèi)計(jì)算的發(fā)展
![<b class='flag-5'>從</b>MRAM的演進(jìn)看存<b class='flag-5'>內(nèi)</b><b class='flag-5'>計(jì)算</b>的發(fā)展](https://file1.elecfans.com/web2/M00/E6/12/wKgZomZF7M6AHAXwAAUS9x_2Gho313.png)
淺談存內(nèi)計(jì)算生態(tài)環(huán)境搭建以及軟件開發(fā)
存內(nèi)計(jì)算WTM2101編譯工具鏈 資料
Spark基于DPU Snappy壓縮算法的異構(gòu)加速方案
![<b class='flag-5'>Spark</b>基于DPU Snappy壓縮算法的異構(gòu)加速方案](https://file1.elecfans.com/web2/M00/C5/B1/wKgZomYBSXOAHfkiAAL8llhFqoo690.png)
RDMA技術(shù)在Apache Spark中的應(yīng)用
![RDMA技術(shù)在Apache <b class='flag-5'>Spark</b>中的應(yīng)用](https://file1.elecfans.com/web2/M00/C3/6A/wKgaomXlNumAD4qkAAKXD6F2opA173.png)
基于DPU和HADOS-RACE加速Spark 3.x
![基于DPU和HADOS-RACE加速<b class='flag-5'>Spark</b> 3.x](https://file1.elecfans.com//web2/M00/C1/12/wKgZomXcZN2AYm7rAAIEZvTT08A684.png)
評論