以下文章來源于君哥聊技術,作者朱晉君
Kafka 是一款性能非常優(yōu)秀的消息隊列,每秒處理的消息體量可以達到千萬級別。
今天來聊一聊 Kafka 高性能背后的技術原理,也是面試常問的一個知識考點。
1 批量發(fā)送
Kafka 收發(fā)消息都是批量進行處理的。我們看一下 Kafka 生產者發(fā)送消息的代碼:
privateFuturedoSend(ProducerRecord record,Callbackcallback){ TopicPartitiontp=null; try{ //省略前面代碼 CallbackinterceptCallback=newInterceptorCallback<>(callback,this.interceptors,tp); //把消息追加到之前緩存的這一批消息上 RecordAccumulator.RecordAppendResultresult=accumulator.append(tp,timestamp,serializedKey, serializedValue,headers,interceptCallback,remainingWaitMs); //積累到設置的緩存大小,則發(fā)送出去 if(result.batchIsFull||result.newBatchCreated){ log.trace("Wakingupthesendersincetopic{}partition{}iseitherfullorgettinganewbatch",record.topic(),partition); this.sender.wakeup(); } returnresult.future; //handlingexceptionsandrecordtheerrors; //forAPIexceptionsreturntheminthefuture, //forotherexceptionsthrowdirectly }catch/**省略catch代碼*/ }
從代碼中可以看到,生產者調用 doSend 方法后,并不會直接把消息發(fā)送出去,而是把消息緩存起來,緩存消息量達到配置的批量大小后,才會發(fā)送出去。
注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。
Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進行落盤,同時也會把批量消息直接同步給其他副本。
消費者拉取消息,也不會按照單條進行拉取,而是按照批量進行拉取,拉取到一批消息后,再解析成單條消息進行消費。
使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。
2 消息壓縮
如果消息體比較大,Kafka 消息吞吐量要達到千萬級別,網卡支持的網絡傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時,如果增加參數(shù) compression.type,就可以開啟消息壓縮:
publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","localhost:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //開啟消息壓縮 props.put("compression.type","gzip"); Producerproducer=newKafkaProducer<>(props); ProducerRecord record=newProducerRecord<>("my_topic","key1","value1"); producer.send(record,newCallback(){ @Override publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){ if(exception!=null){ logger.error("sendingmessage error:", e); }else{ logger.info("sendingmessage successful, Offset:", metadata.offset()); } } }); producer.close(); }
如果 compression.type 的值設置為 none,則不開啟壓縮。那消息是在什么時候進行壓縮呢?前面提到過,生產者緩存一批消息后才會發(fā)送,在發(fā)送這批消息之前就會進行壓縮,代碼如下:
publicRecordAppendResultappend(TopicPartitiontp, longtimestamp, byte[]key, byte[]value, Header[]headers, Callbackcallback, longmaxTimeToBlock)throwsInterruptedException{ //... try{ //... buffer=free.allocate(size,maxTimeToBlock); synchronized(dq){ //... RecordAppendResultappendResult=tryAppend(timestamp,key,value,headers,callback,dq); if(appendResult!=null){ //Somebodyelsefoundusabatch,returntheonewewaitedfor!Hopefullythisdoesn'thappenoften... returnappendResult; } //這批消息緩存已滿,這里進行壓縮 MemoryRecordsBuilderrecordsBuilder=recordsBuilder(buffer,maxUsableMagic); ProducerBatchbatch=newProducerBatch(tp,recordsBuilder,time.milliseconds()); FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,headers,callback,time.milliseconds())); dq.addLast(batch); incomplete.add(batch); //Don'tdeallocatethisbufferinthefinallyblockasit'sbeingusedintherecordbatch buffer=null; returnnewRecordAppendResult(future,dq.size()>1||batch.isFull(),true); } }finally{ if(buffer!=null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } }
上面的 recordsBuilder 方法最終調用了下面 MemoryRecordsBuilder 的構造方法。
publicMemoryRecordsBuilder(ByteBufferOutputStreambufferStream, bytemagic, CompressionTypecompressionType, TimestampTypetimestampType, longbaseOffset, longlogAppendTime, longproducerId, shortproducerEpoch, intbaseSequence, booleanisTransactional, booleanisControlBatch, intpartitionLeaderEpoch, intwriteLimit){ //省略其他代碼 this.appendStream=newDataOutputStream(compressionType.wrapForOutput(this.bufferStream,magic)); }
上面的 wrapForOutput 方法會根據(jù)配置的壓縮算法進行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。
在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進行解壓,然后進行消息處理。
因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產者和消費者的 CPU 資源情況。
有了消息批量收集和壓縮,kafka 生產者發(fā)送消息的過程如下圖:
3 磁盤順序讀寫
順序讀寫省去了尋址的時間,只要一次尋址,就可以連續(xù)讀寫。
在固態(tài)硬盤上,順序讀寫的性能是隨機讀寫的好幾倍。而在機械硬盤上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬盤的順序讀寫性能是隨機讀寫的幾十倍。
Kafka 的 Broker 在寫消息數(shù)據(jù)時,首先為每個 Partition 創(chuàng)建一個文件,然后把數(shù)據(jù)順序地追加到該文件對應的磁盤空間中,如果這個文件寫滿了,就再創(chuàng)建一個新文件繼續(xù)追加寫。這樣大大減少了尋址時間,提高了讀寫性能。
4 PageCache
在 Linux 系統(tǒng)中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內存中建立的緩存。當應用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。
應用程序寫文件時,都先會把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:
而應用程序在讀取文件數(shù)據(jù)時,首先會判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。
Kafka 充分利用了 PageCache 的優(yōu)勢,當生產者生產消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。
5 零拷貝
Kafka Broker 將消息發(fā)送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復制到應用程序的內存空間,然后從應用程序的內存空間復制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:
Kafka 采用了零拷貝技術把數(shù)據(jù)直接從 PageCache 復制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復制到用戶態(tài)的內存空間,同時 DMA 控制器直接完成數(shù)據(jù)復制,不需要 CPU 參與。如下圖:
Java 零拷貝技術采用 FileChannel.transferTo() 方法,底層調用了 sendfile 方法。
6 mmap
Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內存映射,將索引文件映射到進程的內存空間,這樣讀取索引文件就不需要從磁盤進行讀取。如下圖:
7 總結
本文介紹了 Kafka 實現(xiàn)高性能用到的關鍵技術,這些技術可以為我們學習和工作提供參考。
-
代碼
+關注
關注
30文章
4830瀏覽量
69110 -
消息隊列
+關注
關注
0文章
33瀏覽量
3017 -
kafka
+關注
關注
0文章
52瀏覽量
5245
原文標題:面試官:你說說 Kafka 為什么是高性能的?
文章出處:【微信號:小林coding,微信公眾號:小林coding】歡迎添加關注!文章轉載請注明出處。
發(fā)布評論請先 登錄
相關推薦
泰克30+GHz高性能示波器的關鍵技術
基于閃存存儲的Apache Kafka性能提升方法
Kafka集群環(huán)境的搭建
大數(shù)據(jù)開發(fā)最火技術Kafka背后的“黑科技”
![大數(shù)據(jù)開發(fā)最火<b class='flag-5'>技術</b><b class='flag-5'>Kafka</b><b class='flag-5'>背后</b>的“黑科技”](https://file.elecfans.com/web1/M00/AB/02/pIYBAF2uenCASzeQAADsdiZ-meA534.png)
Kafka的概念及Kafka的宕機
![<b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機](https://file.elecfans.com/web2/M00/12/67/pYYBAGEoW1-ATinwAAAh17nhnk8790.png)
Kafka如何做到那么高的性能
Kafka 的簡介
![<b class='flag-5'>Kafka</b> 的簡介](https://file1.elecfans.com/web2/M00/8B/E5/wKgaomSiPCWAZKnkAAAq8nWadM0221.png)
物通博聯(lián)5G-kafka工業(yè)網關實現(xiàn)kafka協(xié)議對接到云平臺
從Kafka中學習高性能系統(tǒng)如何設計
![從<b class='flag-5'>Kafka</b>中學習<b class='flag-5'>高性能</b>系統(tǒng)如何設計](https://file1.elecfans.com/web2/M00/8C/E3/wKgZomS0tdWALYEeAABW6ObHER4842.png)
Kafka架構技術:Kafka的架構和客戶端API設計
![<b class='flag-5'>Kafka</b>架構<b class='flag-5'>技術</b>:<b class='flag-5'>Kafka</b>的架構和客戶端API設計](https://file1.elecfans.com/web2/M00/A7/91/wKgaomUlAWiALB-jAABOA4mbRQk433.png)
golang中使用kafka的綜合指南
華為云 FlexusX 實例下的 Kafka 集群部署實踐與性能優(yōu)化
![華為云 FlexusX 實例下的 <b class='flag-5'>Kafka</b> 集群部署實踐與<b class='flag-5'>性能</b>優(yōu)化](https://file1.elecfans.com//web3/M00/05/20/wKgZPGd88qmAOKzdAAEA43MSbXE480.png)
評論