欧美性猛交xxxx免费看_牛牛在线视频国产免费_天堂草原电视剧在线观看免费_国产粉嫩高清在线观看_国产欧美日本亚洲精品一5区

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Kafka高性能背后的技術原理

小林coding ? 來源:君哥聊技術 ? 2024-10-23 09:37 ? 次閱讀

以下文章來源于君哥聊技術,作者朱晉君

Kafka 是一款性能非常優(yōu)秀的消息隊列,每秒處理的消息體量可以達到千萬級別。

今天來聊一聊 Kafka 高性能背后的技術原理,也是面試常問的一個知識考點。

1 批量發(fā)送

Kafka 收發(fā)消息都是批量進行處理的。我們看一下 Kafka 生產者發(fā)送消息的代碼:

privateFuturedoSend(ProducerRecordrecord,Callbackcallback){
TopicPartitiontp=null;
try{
//省略前面代碼
CallbackinterceptCallback=newInterceptorCallback<>(callback,this.interceptors,tp);
//把消息追加到之前緩存的這一批消息上
RecordAccumulator.RecordAppendResultresult=accumulator.append(tp,timestamp,serializedKey,
serializedValue,headers,interceptCallback,remainingWaitMs);
//積累到設置的緩存大小,則發(fā)送出去
if(result.batchIsFull||result.newBatchCreated){
log.trace("Wakingupthesendersincetopic{}partition{}iseitherfullorgettinganewbatch",record.topic(),partition);
this.sender.wakeup();
}
returnresult.future;
//handlingexceptionsandrecordtheerrors;
//forAPIexceptionsreturntheminthefuture,
//forotherexceptionsthrowdirectly
}catch/**省略catch代碼*/
}

從代碼中可以看到,生產者調用 doSend 方法后,并不會直接把消息發(fā)送出去,而是把消息緩存起來,緩存消息量達到配置的批量大小后,才會發(fā)送出去。

注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。

Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進行落盤,同時也會把批量消息直接同步給其他副本。

消費者拉取消息,也不會按照單條進行拉取,而是按照批量進行拉取,拉取到一批消息后,再解析成單條消息進行消費。

使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。

2 消息壓縮

如果消息體比較大,Kafka 消息吞吐量要達到千萬級別,網卡支持的網絡傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時,如果增加參數(shù) compression.type,就可以開啟消息壓縮:

publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//開啟消息壓縮
props.put("compression.type","gzip");
Producerproducer=newKafkaProducer<>(props);

ProducerRecordrecord=newProducerRecord<>("my_topic","key1","value1");

producer.send(record,newCallback(){
@Override
publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
if(exception!=null){
logger.error("sendingmessage error:", e);
}else{
logger.info("sendingmessage successful, Offset:", metadata.offset());
}
}
});

producer.close();
}

如果 compression.type 的值設置為 none,則不開啟壓縮。那消息是在什么時候進行壓縮呢?前面提到過,生產者緩存一批消息后才會發(fā)送,在發(fā)送這批消息之前就會進行壓縮,代碼如下:

publicRecordAppendResultappend(TopicPartitiontp,
longtimestamp,
byte[]key,
byte[]value,
Header[]headers,
Callbackcallback,
longmaxTimeToBlock)throwsInterruptedException{
//...
try{
//...
buffer=free.allocate(size,maxTimeToBlock);
synchronized(dq){
//...
RecordAppendResultappendResult=tryAppend(timestamp,key,value,headers,callback,dq);
if(appendResult!=null){
//Somebodyelsefoundusabatch,returntheonewewaitedfor!Hopefullythisdoesn'thappenoften...
returnappendResult;
}
//這批消息緩存已滿,這里進行壓縮
MemoryRecordsBuilderrecordsBuilder=recordsBuilder(buffer,maxUsableMagic);
ProducerBatchbatch=newProducerBatch(tp,recordsBuilder,time.milliseconds());
FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,headers,callback,time.milliseconds()));

dq.addLast(batch);
incomplete.add(batch);

//Don'tdeallocatethisbufferinthefinallyblockasit'sbeingusedintherecordbatch
buffer=null;

returnnewRecordAppendResult(future,dq.size()>1||batch.isFull(),true);
}
}finally{
if(buffer!=null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}

上面的 recordsBuilder 方法最終調用了下面 MemoryRecordsBuilder 的構造方法。

publicMemoryRecordsBuilder(ByteBufferOutputStreambufferStream,
bytemagic,
CompressionTypecompressionType,
TimestampTypetimestampType,
longbaseOffset,
longlogAppendTime,
longproducerId,
shortproducerEpoch,
intbaseSequence,
booleanisTransactional,
booleanisControlBatch,
intpartitionLeaderEpoch,
intwriteLimit){
//省略其他代碼
this.appendStream=newDataOutputStream(compressionType.wrapForOutput(this.bufferStream,magic));
}

上面的 wrapForOutput 方法會根據(jù)配置的壓縮算法進行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。

在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進行解壓,然后進行消息處理。

因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產者和消費者的 CPU 資源情況。

有了消息批量收集和壓縮,kafka 生產者發(fā)送消息的過程如下圖:

33f1d514-90dd-11ef-a511-92fbcf53809c.png

3 磁盤順序讀寫

順序讀寫省去了尋址的時間,只要一次尋址,就可以連續(xù)讀寫。

在固態(tài)硬盤上,順序讀寫的性能是隨機讀寫的好幾倍。而在機械硬盤上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬盤的順序讀寫性能是隨機讀寫的幾十倍。

Kafka 的 Broker 在寫消息數(shù)據(jù)時,首先為每個 Partition 創(chuàng)建一個文件,然后把數(shù)據(jù)順序地追加到該文件對應的磁盤空間中,如果這個文件寫滿了,就再創(chuàng)建一個新文件繼續(xù)追加寫。這樣大大減少了尋址時間,提高了讀寫性能。

4 PageCache

Linux 系統(tǒng)中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內存中建立的緩存。當應用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。

33fa2b24-90dd-11ef-a511-92fbcf53809c.png

應用程序寫文件時,都先會把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:

34021e6a-90dd-11ef-a511-92fbcf53809c.png

而應用程序在讀取文件數(shù)據(jù)時,首先會判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。

3409b486-90dd-11ef-a511-92fbcf53809c.png

Kafka 充分利用了 PageCache 的優(yōu)勢,當生產者生產消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。

5 零拷貝

Kafka Broker 將消息發(fā)送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復制到應用程序的內存空間,然后從應用程序的內存空間復制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:

341c0708-90dd-11ef-a511-92fbcf53809c.png

Kafka 采用了零拷貝技術把數(shù)據(jù)直接從 PageCache 復制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復制到用戶態(tài)的內存空間,同時 DMA 控制器直接完成數(shù)據(jù)復制,不需要 CPU 參與。如下圖:

34240c00-90dd-11ef-a511-92fbcf53809c.png

Java 零拷貝技術采用 FileChannel.transferTo() 方法,底層調用了 sendfile 方法。

6 mmap

Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內存映射,將索引文件映射到進程的內存空間,這樣讀取索引文件就不需要從磁盤進行讀取。如下圖:

3440d9d4-90dd-11ef-a511-92fbcf53809c.png

7 總結

本文介紹了 Kafka 實現(xiàn)高性能用到的關鍵技術,這些技術可以為我們學習和工作提供參考。

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 代碼
    +關注

    關注

    30

    文章

    4830

    瀏覽量

    69110
  • 消息隊列
    +關注

    關注

    0

    文章

    33

    瀏覽量

    3017
  • kafka
    +關注

    關注

    0

    文章

    52

    瀏覽量

    5245

原文標題:面試官:你說說 Kafka 為什么是高性能的?

文章出處:【微信號:小林coding,微信公眾號:小林coding】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    泰克30+GHz高性能示波器的關鍵技術

    泰克公司最近宣布首款經驗證采用 IBM 8HP 硅鍺 (SiGe) BiCMOS 特殊工藝技術設計的新型示波器平臺ASIC各項技術指標優(yōu)于規(guī)定要求,實現(xiàn)了新型高性能示波器的設計目標,使多通道帶寬達
    發(fā)表于 07-24 07:47

    基于閃存存儲的Apache Kafka性能提升方法

    作者:Dennis Lattka我是美光科技的首席存儲解決方案工程師Dennis Lattka。這個頭銜的真正含義是,我要致力于確定如何利用閃存存儲改善工作負載應用的性能和結果。為此,我決定對大數(shù)
    發(fā)表于 07-24 06:58

    基于發(fā)布與訂閱的消息系統(tǒng)Kafka

    Kafka權威指南》——初識 Kafka
    發(fā)表于 03-05 13:46

    Kafka基礎入門文檔

    kafka系統(tǒng)入門教程(原理、配置、集群搭建、Java應用、Kafka-manager)
    發(fā)表于 03-12 07:22

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    Kafka文件存儲機制分析

    機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。 《br》 下面將從Kafka文件存儲機制和物理結
    發(fā)表于 09-28 15:40 ?0次下載

    大數(shù)據(jù)開發(fā)最火技術Kafka背后的“黑科技”

    、低延遲等方面有很突出的表現(xiàn)。這篇文章不同于其他介紹Kafka使用或實現(xiàn)的文章,只是談談Kafka用了什么“黑科技”使他在性能方面有這么突出的表現(xiàn)。消息順序寫入磁盤磁盤大多數(shù)都還是機械結構(SSD不在
    的頭像 發(fā)表于 10-22 17:53 ?1024次閱讀
    大數(shù)據(jù)開發(fā)最火<b class='flag-5'>技術</b><b class='flag-5'>Kafka</b><b class='flag-5'>背后</b>的“黑科技”

    Kafka的概念及Kafka的宕機

    問題要從一次Kafka的宕機開始說起。 筆者所在的是一家金融科技公司,但公司內部并沒有采用在金融支付領域更為流行的 RabbitMQ ,而是采用了設計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?2171次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機

    Kafka如何做到那么高的性能

    有人說:他曾在一臺配置較好的機子上對 Kafka 進行性能壓測,壓測結果是 Kafka 單個節(jié)點的極限處理能力接近每秒 2000萬 條消息,吞吐量達到每秒 600MB。
    的頭像 發(fā)表于 09-14 17:03 ?1107次閱讀

    Kafka 的簡介

    ,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能 高吞吐率。即使在非常廉價的機器上也能做到單機支持每秒100K條消息的傳輸 支持Kafka Server間的消息分區(qū),及分布式消費,同時保證每個
    的頭像 發(fā)表于 07-03 11:10 ?675次閱讀
    <b class='flag-5'>Kafka</b> 的簡介

    物通博聯(lián)5G-kafka工業(yè)網關實現(xiàn)kafka協(xié)議對接到云平臺

    Kafka協(xié)議是一種基于TCP層的網絡協(xié)議,用于在分布式消息傳遞系統(tǒng)Apache Kafka中發(fā)送和接收消息。Kafka協(xié)議定義了客戶端和服務器之間的通信方式和數(shù)據(jù)格式,允許客戶端發(fā)送消息到K
    的頭像 發(fā)表于 07-11 10:44 ?550次閱讀

    Kafka中學習高性能系統(tǒng)如何設計

    相信各位小伙伴之前或多或少接觸過消息隊列,比較知名的包含 Rocket MQ 和 Kafka,在京東內部使用的是自研的消息中間件 JMQ,從 JMQ2 升級到 JMQ4 的也是帶來了性能上的明顯提升,并且 JMQ4 的底層也是參考 Ka
    的頭像 發(fā)表于 07-17 11:25 ?622次閱讀
    從<b class='flag-5'>Kafka</b>中學習<b class='flag-5'>高性能</b>系統(tǒng)如何設計

    Kafka架構技術Kafka的架構和客戶端API設計

    Kafka 給自己的定位是事件流平臺(event stream platform)。因此在消息隊列中經常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?2450次閱讀
    <b class='flag-5'>Kafka</b>架構<b class='flag-5'>技術</b>:<b class='flag-5'>Kafka</b>的架構和客戶端API設計

    golang中使用kafka的綜合指南

    kafka是一個比較流行的分布式、可拓展、高性能、可靠的流處理平臺。在處理kafka的數(shù)據(jù)時,這里有確保處理效率和可靠性的多種最佳實踐。本文將介紹這幾種實踐方式,并通過sarama實現(xiàn)他們。
    的頭像 發(fā)表于 11-30 11:18 ?640次閱讀

    華為云 FlexusX 實例下的 Kafka 集群部署實踐與性能優(yōu)化

    前言 華為云 FlexusX 實例,以創(chuàng)新的柔性算力技術,為 Kafka 集群部署帶來前所未有的性能飛躍。其靈活的 CPU 與內存配比,結合智能調度與加速技術,讓
    的頭像 發(fā)表于 01-07 17:23 ?192次閱讀
    華為云 FlexusX 實例下的 <b class='flag-5'>Kafka</b> 集群部署實踐與<b class='flag-5'>性能</b>優(yōu)化