以下文章來源于君哥聊技術,作者朱晉君
Kafka 是一款性能非常優秀的消息隊列,每秒處理的消息體量可以達到千萬級別。
今天來聊一聊 Kafka 高性能背后的技術原理,也是面試常問的一個知識考點。
1 批量發送
Kafka 收發消息都是批量進行處理的。我們看一下 Kafka 生產者發送消息的代碼:
privateFuturedoSend(ProducerRecord record,Callbackcallback){ TopicPartitiontp=null; try{ //省略前面代碼 CallbackinterceptCallback=newInterceptorCallback<>(callback,this.interceptors,tp); //把消息追加到之前緩存的這一批消息上 RecordAccumulator.RecordAppendResultresult=accumulator.append(tp,timestamp,serializedKey, serializedValue,headers,interceptCallback,remainingWaitMs); //積累到設置的緩存大小,則發送出去 if(result.batchIsFull||result.newBatchCreated){ log.trace("Wakingupthesendersincetopic{}partition{}iseitherfullorgettinganewbatch",record.topic(),partition); this.sender.wakeup(); } returnresult.future; //handlingexceptionsandrecordtheerrors; //forAPIexceptionsreturntheminthefuture, //forotherexceptionsthrowdirectly }catch/**省略catch代碼*/ }
從代碼中可以看到,生產者調用 doSend 方法后,并不會直接把消息發送出去,而是把消息緩存起來,緩存消息量達到配置的批量大小后,才會發送出去。
注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。
Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進行落盤,同時也會把批量消息直接同步給其他副本。
消費者拉取消息,也不會按照單條進行拉取,而是按照批量進行拉取,拉取到一批消息后,再解析成單條消息進行消費。
使用批量收發消息,減輕了客戶端和 Broker 的交互次數,提升了 Broker 處理能力。
2 消息壓縮
如果消息體比較大,Kafka 消息吞吐量要達到千萬級別,網卡支持的網絡傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發送消息時,如果增加參數 compression.type,就可以開啟消息壓縮:
publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","localhost:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //開啟消息壓縮 props.put("compression.type","gzip"); Producerproducer=newKafkaProducer<>(props); ProducerRecord record=newProducerRecord<>("my_topic","key1","value1"); producer.send(record,newCallback(){ @Override publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){ if(exception!=null){ logger.error("sendingmessage error:", e); }else{ logger.info("sendingmessage successful, Offset:", metadata.offset()); } } }); producer.close(); }
如果 compression.type 的值設置為 none,則不開啟壓縮。那消息是在什么時候進行壓縮呢?前面提到過,生產者緩存一批消息后才會發送,在發送這批消息之前就會進行壓縮,代碼如下:
publicRecordAppendResultappend(TopicPartitiontp, longtimestamp, byte[]key, byte[]value, Header[]headers, Callbackcallback, longmaxTimeToBlock)throwsInterruptedException{ //... try{ //... buffer=free.allocate(size,maxTimeToBlock); synchronized(dq){ //... RecordAppendResultappendResult=tryAppend(timestamp,key,value,headers,callback,dq); if(appendResult!=null){ //Somebodyelsefoundusabatch,returntheonewewaitedfor!Hopefullythisdoesn'thappenoften... returnappendResult; } //這批消息緩存已滿,這里進行壓縮 MemoryRecordsBuilderrecordsBuilder=recordsBuilder(buffer,maxUsableMagic); ProducerBatchbatch=newProducerBatch(tp,recordsBuilder,time.milliseconds()); FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,headers,callback,time.milliseconds())); dq.addLast(batch); incomplete.add(batch); //Don'tdeallocatethisbufferinthefinallyblockasit'sbeingusedintherecordbatch buffer=null; returnnewRecordAppendResult(future,dq.size()>1||batch.isFull(),true); } }finally{ if(buffer!=null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } }
上面的 recordsBuilder 方法最終調用了下面 MemoryRecordsBuilder 的構造方法。
publicMemoryRecordsBuilder(ByteBufferOutputStreambufferStream, bytemagic, CompressionTypecompressionType, TimestampTypetimestampType, longbaseOffset, longlogAppendTime, longproducerId, shortproducerEpoch, intbaseSequence, booleanisTransactional, booleanisControlBatch, intpartitionLeaderEpoch, intwriteLimit){ //省略其他代碼 this.appendStream=newDataOutputStream(compressionType.wrapForOutput(this.bufferStream,magic)); }
上面的 wrapForOutput 方法會根據配置的壓縮算法進行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。
在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進行解壓,然后進行消息處理。
因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產者和消費者的 CPU 資源情況。
有了消息批量收集和壓縮,kafka 生產者發送消息的過程如下圖:
3 磁盤順序讀寫
順序讀寫省去了尋址的時間,只要一次尋址,就可以連續讀寫。
在固態硬盤上,順序讀寫的性能是隨機讀寫的好幾倍。而在機械硬盤上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬盤的順序讀寫性能是隨機讀寫的幾十倍。
Kafka 的 Broker 在寫消息數據時,首先為每個 Partition 創建一個文件,然后把數據順序地追加到該文件對應的磁盤空間中,如果這個文件寫滿了,就再創建一個新文件繼續追加寫。這樣大大減少了尋址時間,提高了讀寫性能。
4 PageCache
在 Linux 系統中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內存中建立的緩存。當應用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。
應用程序寫文件時,都先會把數據寫入 PageCache,然后操作系統定期地將 PageCache 的數據寫到磁盤上。如下圖:
而應用程序在讀取文件數據時,首先會判斷數據是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數據緩存到 PageCache。
Kafka 充分利用了 PageCache 的優勢,當生產者生產消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。
5 零拷貝
Kafka Broker 將消息發送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數據先復制到應用程序的內存空間,然后從應用程序的內存空間復制到 Socket 緩存區,將數據發送出去。如下圖:
Kafka 采用了零拷貝技術把數據直接從 PageCache 復制到 Socket 緩沖區中,這樣數據不用復制到用戶態的內存空間,同時 DMA 控制器直接完成數據復制,不需要 CPU 參與。如下圖:
Java 零拷貝技術采用 FileChannel.transferTo() 方法,底層調用了 sendfile 方法。
6 mmap
Kafka 的日志文件分為數據文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內存映射,將索引文件映射到進程的內存空間,這樣讀取索引文件就不需要從磁盤進行讀取。如下圖:
7 總結
本文介紹了 Kafka 實現高性能用到的關鍵技術,這些技術可以為我們學習和工作提供參考。
-
代碼
+關注
關注
30文章
4857瀏覽量
69527 -
消息隊列
+關注
關注
0文章
33瀏覽量
3024 -
kafka
+關注
關注
0文章
52瀏覽量
5269
原文標題:面試官:你說說 Kafka 為什么是高性能的?
文章出處:【微信號:小林coding,微信公眾號:小林coding】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
泰克30+GHz高性能示波器的關鍵技術
基于閃存存儲的Apache Kafka性能提升方法
Kafka集群環境的搭建
大數據開發最火技術Kafka背后的“黑科技”

Kafka的概念及Kafka的宕機

Kafka如何做到那么高的性能
Kafka 的簡介

物通博聯5G-kafka工業網關實現kafka協議對接到云平臺
從Kafka中學習高性能系統如何設計

Kafka架構技術:Kafka的架構和客戶端API設計

golang中使用kafka的綜合指南
華為云 FlexusX 實例下的 Kafka 集群部署實踐與性能優化

評論