我們已經知道kafka的集群由n個broker組成,每個broker都是kafka的壹個實例或者壹個叫做Kafka的服務。實際上,控制者也是經紀人,控制者也被稱為領導經紀人。
除了壹般broker的功能,他還負責分區首領的選擇,也就是負責選舉分區的首領副本。
當kafka中的每個代理啟動時,將會實例化壹個KafkaController,並將代理的id註冊到zookeeper。在集群啟動過程中,將通過選舉機制選舉其中壹個代理作為領導者,也就是上面提到的控制器。
包括集群啟動在內,有三種情況會觸發控制器選舉:
1,集群啟動
2.控制器所在的代理出現故障。
3.zookeeper知道heartbeat,控制器和它自己的會話已經過期。
老規矩,先看圖。我們將根據下圖說明集群啟動時的控制器選舉過程。
假設這個集群有三個代理,它們同時啟動。
(1)三個代理從zookeeper獲得控制器臨時節點信息。/controller存儲選出的領導者信息。這是為了確認是否已經有壹個領導者。
(2)如果還沒有選舉領導人,那麽這個節點不存在,返回-1。如果返回的是領袖的json數據而不是-1,那麽領袖已經存在,選舉結束。
(3)三個broker發現返回-1,了解到目前沒有leader,所以都觸發將自己的信息寫入臨時節點/控制器。第壹個寫的人將成為領導者。
(4)假設broker 0最快,他先寫/controller節點,然後他就成了leader。可惜對於broker1和broker2來說,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是說zk告訴他們這個節點已經存在了。
經過以上四步,broker 0成功寫入/controller節點,其他broker寫入失敗,因此broker 0成功當選盟主。
此外,zk中還有壹個controller_epoch節點,存儲領導者的變化次數,初始值為0,以後每次領導者發生變化,這個值都會是+1。對控制器的所有請求都將攜帶該值。如果控制器與自身內存比較,請求值較小,說明kafka集群發生了新的選舉,這個請求過期無效。如果請求值大於控制器內存值,說明已經選舉了新的控制器,他已經退位,請求無效。Kafka通過controller_epoch保證了集群控制器的唯壹性和操作的壹致性。
於是,Kafka控制器的選舉就是看誰先得到/controller節點寫自己的信息。
控制器的初始化實際上是初始化控制器使用的組件和監聽器,準備元數據。
如前所述,每個代理將實例化並啟動壹個KafkaController。KafkaController及其組件關系,以及每個組件的介紹如下:
圖中箭頭表示組件的層次關系,其他組件將在組件下面初始化。可以看出控制器還是有些復雜,主要包括以下幾個部件:
1,ControllerContext,這個對象存儲了控制器需要的所有上下文信息,包括存活的代理,所有的主題和分區分配方案,AR,leader,ISR等每個分區的信息。
2,壹系列的監聽器,通過zookeeper的監控,觸發相應的操作,黃色方框都是監聽器。
3.分區和副本狀態機,管理分區和副本。
4.當前的代理選舉器ZookeeperLeaderElector具有用於高位和低位的相關回調方法。
5.PartitionLeaderSelector
6.主題刪除管理器,TopicDeletetionManager
7.ControllerBrokerRequestBatch用於領導者和代理之間的批量通信。緩存狀態機處理後生成的請求,然後統壹發出。
8.控制器平衡操作的KafkaScheduler僅在broker為leader時有效。
ZK中記錄了Kafka集群的壹些重要信息,如集群的所有代理節點、主題的所有分區、分區的副本信息(副本集、主副本、同步副本集)。每個代理都有壹個控制器。為了管理整個集群,卡夫卡選擇zk選舉模式,為整個集群選舉壹個“中央控制器”或“主控制器”。控制器實際上是壹個代理節點,除了壹般的代理功能之外,還具有選舉分區領導的功能。中央控制器管理所有節點的信息,並通過向ZK註冊各種監控事件來管理整個集群節點和分區的領導者的選舉和再平衡。外部事件會更新ZK的數據,壹旦ZK的數據發生變化,控制器會做不同的響應處理。
故障轉移實際上是首領所在的代理出現故障,首領轉移到另壹個代理。轉移的過程就是重新選舉領導人的過程。
領導者重新當選後,需要為代理註冊相應的權限,調用ZookeeperLeaderElector的onControllerFailover()方法。在這種方法中,壹系列組件被初始化並開始完成領導者的各種操作。具體如下,其實和控制器初始化有很大的相似性。
1.為分區管理註冊相關的偵聽器。
2.登記主題管理的相關監控。
3.註冊代理更改監聽程序
4.重新初始化控制器上下文。
5.啟動ControllerChannelManager,用於控制器和其他代理之間的通信。
6.創建用於刪除主題的TopicDeletionManager對象並啟動它。
7.啟動分區狀態機和副本狀態機。
8.輪詢每個主題,並添加PartitionModificationsListener來監視分區的變化。
9.如果設置了分區平衡計時操作,則會創建壹個分區平衡計時任務,默認情況下會在300秒內檢查並執行該任務。
除了啟動這些組件之外,在onControllerFailover方法中還完成了以下操作:
1,/controller_epoch值+1,並更新為ControllerContext。
2、檢查是否啟動分區重分配,並做相關操作。
3.檢查優先副本需要選擇為領導,並做相關操作。
4.向kafka集群中的所有代理發送更新元數據的請求。
讓我們看壹下取消領導者權限時調用的方法onControllerResignation。
1.在這種方法中,控制器的權限被取消。取消zookeeper中對分區和副本感知監聽器的監控。
2.關閉已啟動的組件。
3.最後,清除ControllerContext中記錄的控制器版本的數值,將當前代理設置為RunnignAsBroker,成為普通代理。
通過學習控制器的啟動過程,我們應該已經了解了卡夫卡的工作原理。核心是監控zookeeper的相關節點,並在節點發生變化時觸發相應的操作。
當壹個新的代理加入集群時,就說代理在線。相反,當代理關閉,集群被推出時,稱為代理脫機。
在線代理:
1.當新代理啟動時,將數據寫入/brokers/ids。
2.BrokerChangeListener監聽更改。呼叫ControllerChannelManager。將Broker()添加到新的在線節點,以完成新的在線代理網絡層的初始化。
3.打電話給KafkaController。onbrokerstartup()進行處理。
3.5恢復因新代理上線而暫停的刪除主題操作的線程。
代理離線:
1.找到脫機節點集。
2.輪詢下線節點並調用ControllerChannelManager。RemoveBroker()關閉每個下線節點的網絡連接。清空下線節點的消息隊列並關閉下線節點請求。
3.輪詢離線節點並調用Kafka控制器。onbrokerfailure進行處理。
4.向群集的所有幸存代理發送updateMetadataRequest。
顧名思義,協調員負責協調工作。本節提到的協調器是用來協調消費者的工作分配的。簡單來說就是消費者啟動後,正常消費前的這個階段的初始化工作。消費者的正常運轉完全取決於協調者。
有兩個主要協調員:
1.消費者協調員
2.小組協調員
卡夫卡對協調者的引入有其歷史過程。事實證明,消費者信息依賴於zookeeper的存儲。當代理商或消費者發生變化時,就會導致消費者平衡。這時候消費者不透明,每個消費者單獨和zookeeper溝通,容易造成羊群效應和腦分裂問題。
為了解決這些問題,卡夫卡引入了協調者。服務器引入壹個GroupCoordinator,消費者引入壹個ConsumerCoordinator。當每個代理啟動時,會創建壹個GroupCoordinator實例來管理壹些消費者組(集群負載均衡)以及該組下每個消費者消費的偏移量。每個消費者實例化時,同時實例化壹個ConsumerCoordinator對象,負責每個消費者與同壹消費者組下的服務器組協調器之間的通信。如下圖所示:
消費者協調器可以看作是消費者做操作的代理類(其實不是),消費者的很多操作都是通過消費者協調器來處理的。
消費者協調員主要負責以下工作:
1.更新消費者緩存的元數據。
2.向小組協調員申請加入小組。
3.消費者入群後的相應待遇。
4.請求離開消費者團體
5.向集團協調員提交補償。
6.通過heartbeat保持組協調器的連接感知。
7.被小組協調員選為組長的消費者協調員負責消費者區域的分配。分配結果被發送到組協調器。
8.對於不是領導者的消費者,結果通過消費者協調器和組協調器同步分發。
消費者協調員主要依賴的組件和描述如下圖所示:
可以看出,這些組成部分可以與消費者協調員承擔的工作相比較。
組協調者負責處理消費者協調者發送的各種請求。它主要提供以下功能:
組協調器在broker啟動時被實例化,每個組協調器負責壹些使用者組的管理。它主要依賴的組件如下所示:
這些組件也可以對應於組協調器的功能。具體內容不詳細。
下圖顯示了消費者開始選擇領導者和加入團體的過程。
消費者加入團體的過程顯示了消費者協調員和團體協調員如何壹起工作。領導消費者會承擔分區分配的工作,所以卡夫卡集群的壓力會小很多。同壹組中的消費者通過組協調器保持同步。消費者和分區之間的對應在卡夫卡那裏是持久的。
消費者在消費的時候,會在本地維護消費的偏移量,也就是偏移量,這樣下次就知道從哪裏開始消費了。如果整個環境沒有改變,這就足夠了。但是壹旦消費者平衡操作或分區發生變化,消費者不再對應原來的分區,每個消費者的偏移量也不會同步到服務器,無法繼續之前的工作。
因此,只有消耗偏移量會定期發送到服務器,由GroupCoordinator集中管理。在分區被重新分配後,每個消費者從GroupCoordinator中讀取其對應分區的偏移量,並在新分區中繼續其前任的工作。
下圖是沒有向服務器提交偏移量的問題:
當初消費者0消費分區0,1。後來由於新的消費者2,分區被重新分配。消費者0不再消費分區2,但消費者2消費分區2,但由於消費者不能相互通信,所有消費者2都不知道從哪裏開始消費。
因此,消費者需要定期向服務器提交自己的消費偏移量,以便在重新分區操作後,每個消費者都能在服務器上找到分配給他的分區所消耗的偏移量,並繼續消費。
由於kafka具有高可用性和橫向擴展的特點,當新的分區出現或者新的消費者進入群組時,需要重新分配消費者對應的分區,所以如果offset提交有問題,就會重復消費或者丟失消息。特別要註意提交抵銷的時機和方式!!
1.自動提交抵銷。
將enable.auto.commit設置為true,設置周期,默認值為5s。消費者每次調用輪詢消息的poll()方法,都會檢查是否超過5s未提交的偏移量,如果是,就會提交上次輪詢返回的偏移量。
這樣很方便,但是會帶來重復消費的問題。如果在最近壹次提交3s的偏移後觸發了重新平衡,並且上次提交的偏移仍然存儲在服務器中,那麽重新平衡後,新的消費者將從上次提交的偏移中拉取消息,3s內消耗的消息將被重復消耗。
2.手動提交偏移
將enable.auto.commit設置為false。在程序中手動調用CommitSync()來提交偏移量。此時,提交poll方法返回的最新偏移量。
CommitSync()是同步提交偏移量,主程序會壹直阻塞,直到偏移量提交成功。這將限制程序的吞吐量。如果降低提交頻率,很容易重復消費。
這裏我們可以使用commitAsync()異步提交偏移量。只需提交,無需等待經紀人返回提交結果。
只要沒有不可恢復的錯誤,CommitSync就會重試,直到成功。CommitAsync不會重試,失敗就是失敗。CommitAsync不會再次嘗試,因為當它再次嘗試提交時,其他更大的偏移量可能已經成功提交了。如果此時嘗試成功提交,較小的偏移量將覆蓋較大的偏移量。然後,如果此時發生再平衡,新的消費者將重復消費信息。