RabbitMQ詳解
RabbitMQ的優(yōu)點(diǎn):
- 開源, 功能好效, 安定性好
- 提供可靠性消息投遞形式(confirm), 前往形式(return)等
- 與Spring完善整合, API豐厚
- 集群形式豐厚, 支持表達(dá)式設(shè)置, 高可用HA形式, 鏡像行列模子
- 可以確保數(shù)據(jù)不喪失的條件下做到高可靠性, 可用性
RabbitMQ高功能緣故:
- 由Erlang言語(yǔ)開發(fā),承繼其天生的并發(fā)性,安定性和寧?kù)o性有保證
RabbitMQ的協(xié)議:
AMQP(Advanced Message Queuing Protocol)高等消息行列協(xié)議,是一個(gè)異步消息轉(zhuǎn)達(dá)所使用使用層協(xié)議標(biāo)準(zhǔn),為面向消息正中件計(jì)劃,基于此協(xié)議的客戶端與消息正中件可以無(wú)視消息泉源轉(zhuǎn)達(dá)消息,不受客戶端、消息正中件、不同的開發(fā)言語(yǔ)情況等條件的限定。
計(jì)劃看法表明:
- Server : 又稱Broker, 承受客戶端毗連, 完成AMQP實(shí)體辦事
- Connection : 毗連, 使用步驟與Broker的網(wǎng)絡(luò)毗連
- Channel : 網(wǎng)絡(luò)信道, 幾乎一切的利用都在Channel中舉行, Channel是舉行消息讀寫的通道。客戶端可以創(chuàng)建多個(gè)Channel, 每個(gè)Channel代表一個(gè)會(huì)話職責(zé)。
- Message : 消息, 辦事器和使用步驟之間傳送的數(shù)據(jù), 有Properties和Body構(gòu)成。Properties可以抵消息舉行修飾, 好比消息的優(yōu)先級(jí), 延長(zhǎng)等高等特性; Body就是消息體內(nèi)容。
- Virtual Host : 假造地點(diǎn), 用于舉行邏輯斷絕, 最表層的消息路由。一個(gè)Virtual Host內(nèi)里可以有多少個(gè)Exchange和Queue, 同一個(gè)Virtual Host內(nèi)里不克不及有相反稱呼的Exchange或Queue
- Exchange : 互換機(jī), 用于吸收消息, 依據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的行列
- Binding : Exchange和Queue之間的假造毗連, binding中可以包含routing key
- Routing Key : 一個(gè)路由端正, 假造機(jī)可用它來(lái)確定怎樣路由一個(gè)特定消息
- Queue : 也成Message Queue, 消息行列, 用于保存消息并將它們轉(zhuǎn)發(fā)給消耗者
RabbitMQ全體架構(gòu)
RabbitMQ成員簡(jiǎn)介
Binding-綁定
- Exchange和Exchange, Queue之間的毗連干系
- 綁定中可以包含RoutingKey大概參數(shù)
Queue-消息行列
- 消息行列, 實(shí)踐存儲(chǔ)消息數(shù)據(jù)
- Durability : 對(duì)否歷久化
- Auto delete : 如選yes,代表當(dāng)最初一個(gè)監(jiān)聽被移除之后, 該Queue會(huì)主動(dòng)被刪除
Message-消息
- 辦事和使用步驟之間傳送的數(shù)據(jù)
- 實(shí)質(zhì)上就是一段數(shù)據(jù), 由Properties和Payload(Body)構(gòu)成
- 常用屬性 : delivery mode, headers(自界說(shuō)屬性)
- 其他屬性content_type, content_encoding, prioritycorrelation_id : 可以以為是消息的唯一idreplay_to : 重回行列設(shè)定expiration : 消息過(guò)時(shí)時(shí)間message_id : 消息idtimestamp, type, user_id, app_id, cluster_id
Virtual Host-假造主機(jī)
- 假造地點(diǎn), 用于舉行邏輯斷絕, 最表層的消息路由
- 一個(gè)Virtual Host內(nèi)里可以有多少個(gè)Exchange和Queue
- 同一個(gè)Virtual Host內(nèi)里不克不及有相反稱呼的Exchange或Queue
Exchange互換機(jī)
吸收消息,并依據(jù)路由鍵轉(zhuǎn)發(fā)消息到所綁定的行列
注:互換機(jī)不會(huì)存儲(chǔ)消息,假如消息發(fā)送到?jīng)]有綁定消耗行列的互換機(jī),消息則喪失。
互換機(jī)的屬性
- Name : 互換機(jī)稱呼
- Type : 互換機(jī)典范, direct, topic, fanout, headers
- Durability : 對(duì)否必要?dú)v久化, true為歷久化
- Auto Delete : 當(dāng)最初一個(gè)綁定到Exchange上的行列刪除后, 主動(dòng)刪除該Exchange
- Internal : 如今Exchange對(duì)否用于RabbitMQ內(nèi)里使用, 默以為False, 這個(gè)屬性很少會(huì)用到
- Arguments : 擴(kuò)展參數(shù), 用于擴(kuò)展AMQP協(xié)議訂定化使用
互換機(jī)的四品種型
- Direct exchange(直連互換機(jī))是依據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)行列的注意 : Direct形式可以使用RabbitMQ自帶的Exchange(default Exchange), 以是不必要將Exchange舉行任何綁定(binding)利用, 消息轉(zhuǎn)達(dá)時(shí), RoutingKey必需完全婚配才會(huì)被行列吸收, 不然該消息會(huì)被丟棄
- Fanout exchange(扇型互換機(jī))將消息路由給綁定到它身上的一切行列不處理路由鍵, 只必要簡(jiǎn)便的將行列綁定到互換機(jī)上發(fā)送到互換機(jī)的消息都市被轉(zhuǎn)發(fā)到與該互換機(jī)綁定的一切行列上Fanout互換機(jī)轉(zhuǎn)發(fā)消息是最快的
- Topic exchange(主題互換機(jī))行列經(jīng)過(guò)路由鍵綁定到互換機(jī)上,然后,互換機(jī)依據(jù)消息里的路由值,將消息路由給一個(gè)或多個(gè)綁定行列(含糊婚配)“#” : 婚配一個(gè)或多個(gè)詞“*” : 婚配一個(gè)詞
- Headers exchange(頭互換機(jī))相似主題互換機(jī),但是頭互換機(jī)使用多個(gè)消息屬性來(lái)代替路由鍵創(chuàng)建路由端正。經(jīng)過(guò)推斷消息頭的值可否與指定的綁定相婚配來(lái)建立路由端正。
RabbitMQ常用的5種事情形式
1、點(diǎn)對(duì)點(diǎn)(簡(jiǎn)便)的行列
- 不必要互換機(jī)
- 一個(gè)消費(fèi)者,一個(gè)消耗者
2、事情行列(公平性)
- 不必要互換機(jī)
- 一個(gè)消費(fèi)者,多個(gè)消耗者,但是一個(gè)消息只會(huì)發(fā)送給一個(gè)行列(競(jìng)爭(zhēng)的消耗者形式)
- 默許是輪詢,即會(huì)將消息輪替發(fā)給多個(gè)消耗者,但如此抵消耗得比力慢的消耗者不公平
- 可接納公中分派,即能者多勞channel.basicQos(1);// 限定:發(fā)送一條信息給消耗者A,消耗者A未反應(yīng)處理后果之前,不會(huì)再次發(fā)送信息給消耗者Aboolean autoAck = false;// 取消主動(dòng)反應(yīng) channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 吸收信息channel.basicAck(envelope.getDeliveryTag(), false);// 反應(yīng)消息處理終了
3、公布/訂閱
- 一個(gè)消費(fèi)者,多個(gè)消耗者
- 每一個(gè)消耗者都有本人的一個(gè)行列
- 消費(fèi)者沒(méi)有直接發(fā)消息到行列中,而是發(fā)送到互換機(jī)
- 每個(gè)消耗者的行列都綁定到互換機(jī)上
- 消息經(jīng)過(guò)互換機(jī)抵達(dá)每個(gè)消耗者的行列
該形式就是Fanout Exchange(扇型互換機(jī))將消息路由給綁定到它身上的一切行列
4、路由
消費(fèi)者發(fā)送消息到互換機(jī)并指定一個(gè)路由key,消耗者行列綁定到互換機(jī)時(shí)要訂定路由key(key婚配就能承受消息,key不婚配就不克不及承受消息)
該形式接納Direct exchange(直連互換機(jī))
5、主題(通配符)
此形式真實(shí)路由key形式的基本上,使用了通配符來(lái)辦理消耗者吸收消息。消費(fèi)者P發(fā)送消息到互換機(jī)X,互換機(jī)依據(jù)綁定行列的routing key的值舉行通配符婚配
標(biāo)記#:婚配一個(gè)大概多個(gè)詞lazy.# 可以婚配lazy.irs大概lazy.irs.cor
標(biāo)記*:只能婚配一個(gè)詞lazy.* 可以婚配lazy.irs大概lazy.cor
該形式接納Topic exchange(主題互換機(jī))
消息可靠性轉(zhuǎn)達(dá)或回退(消費(fèi)者端)
消費(fèi)者發(fā)送消息出去之后,不曉得畢竟有沒(méi)有發(fā)送到RabbitMQ辦事器, 默許是不曉得的。并且有的時(shí)分我們?cè)诎l(fā)送消息之后,后方的邏輯出成績(jī)了,我們不想要發(fā)送之前的消息了,必要撤回該怎樣做。
AMQP 事件機(jī)制
- txSelect 將如今channel設(shè)置為transaction形式
- txCommit 提交如今事件
- txRollback 事件回滾
Confirm 形式
消息的確認(rèn), 是指消費(fèi)者投遞消息后, 假如Broker收到消息, 則會(huì)給我們產(chǎn)生一個(gè)應(yīng)對(duì)
消費(fèi)者舉行吸收應(yīng)對(duì), 用來(lái)確定這條消息對(duì)否正常發(fā)送到Broker, 這種辦法也是消息的可靠性投遞的中心保證
- 在channel上開啟確認(rèn)形式 : channel.confirmSelect()
- 在channel上添加監(jiān)聽 : addConfirmListener, 監(jiān)聽告捷和失敗的前往后果, 依據(jù)具體的后果抵消息舉行重新發(fā)送, 或紀(jì)錄日志等后續(xù)處理
Return消息機(jī)制
Return Listener用于處理一些不成路由的消息
正常情況下消息消費(fèi)者經(jīng)過(guò)指定一個(gè)Exchange和RoutingKey, 把消息送到某一個(gè)行列中去, 然后消耗者監(jiān)聽行列, 舉行消耗,但在某些情況下, 假如在發(fā)送消息的時(shí)分, 如今的exchange不存在大概指定的路由key路由不到,這個(gè)時(shí)分假如我們必要監(jiān)聽這種不成達(dá)的消息, 就要使用Return Listener。
在基本API中有一個(gè)緊張的設(shè)置項(xiàng)Mandatory : 假如為true, 則監(jiān)聽器會(huì)吸收到路由不成達(dá)的消息, 然后舉行后續(xù)處理(補(bǔ)償或人工處理), 假如為false, 那么broker端主動(dòng)刪除該消息。
怎樣保證消息可靠轉(zhuǎn)達(dá)
- 保證消息的告捷發(fā)射
- 保證MQ節(jié)點(diǎn)的告捷吸收
- 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)的確認(rèn)應(yīng)對(duì)
- 完滿的消息補(bǔ)償機(jī)制
方案:
1、消息落庫(kù), 抵消息形態(tài)舉行標(biāo)志
- step1:消息入庫(kù)
- step2:消息發(fā)送
- step3:消耗端消息確認(rèn)
- step4:更新庫(kù)中消息形態(tài)為已確認(rèn)
- step5:定時(shí)職責(zé)讀取數(shù)據(jù)庫(kù)中未確認(rèn)的消息
- step6:未收到確認(rèn)后果的消息重新發(fā)送
- step7:假如重試多次之后仍舊失敗, 則將消息形態(tài)變動(dòng)為投遞失敗的終態(tài), 后方必要人工到場(chǎng)
2、消息的延長(zhǎng)投遞, 做二次確認(rèn), 回調(diào)反省
- step1 : 第一次消息發(fā)送, 必需業(yè)務(wù)數(shù)據(jù)落庫(kù)之后才干舉行消息發(fā)送
- step2 : 第二次消息延長(zhǎng)發(fā)送, 設(shè)定延長(zhǎng)一段時(shí)間發(fā)送第二次check消息
- step3 : 消耗端監(jiān)聽Broker, 舉行消息消耗
- step4 : 消耗告捷之后, 發(fā)送確認(rèn)消息到確認(rèn)消息行列
- step5 : Callback Service監(jiān)聽step4中的確認(rèn)消息行列, 維護(hù)消息形態(tài), 對(duì)否消耗告捷等形態(tài)
- step6 : Callback Service監(jiān)聽step2發(fā)送的Delay Check的消息行列, 檢測(cè)內(nèi)里的消息形態(tài), 假如消息是發(fā)送告捷形態(tài), 則流程完畢, 假如消息是失敗形態(tài), 大概查不到如今消息形態(tài)時(shí), 會(huì)關(guān)照消費(fèi)者, 舉行消息重發(fā), 重新上述步調(diào)
重試機(jī)制和冪等性保證(消耗者端)
重試機(jī)制
消耗者在消耗消息的時(shí)分,假如消耗者業(yè)務(wù)邏輯顯現(xiàn)步驟特別,會(huì)使用消息重試機(jī)制。
- 情況1: 消耗者獲取到消息后,調(diào)用第三方接口,但接口暫且無(wú)法拜候,對(duì)否必要重試? (必要重試機(jī)制)
- 情況2: 消耗者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換特別,對(duì)否必要重試?(不必要重試機(jī)制)必要公布舉行處理。
關(guān)于情況2,假如消耗者代碼拋出特別是必要公布新版本才干處理的成績(jī),那么不必要重試,重試也于事無(wú)補(bǔ)。應(yīng)該接納日志紀(jì)錄+定時(shí)職責(zé)job康健反省+人工舉行補(bǔ)償
重試機(jī)制的完成
在SpringBoot中,@RabbitListener(queue="")用于消耗者監(jiān)聽行列。底層使用Aop舉行攔阻,假如步驟沒(méi)有拋出特別,則主動(dòng)提交事件。假如拋出特別,該消息會(huì)緩存到RabbitMQ辦事器,主動(dòng)實(shí)行重試機(jī)制,不休到告捷為止。可以設(shè)置重試距離時(shí)間和重試的次數(shù)。
冪等性保證
冪等性:多次實(shí)行, 后果堅(jiān)持一律
網(wǎng)絡(luò)延長(zhǎng)傳輸中,消耗顯現(xiàn)特別大概是消耗延長(zhǎng)消耗,會(huì)形成MQ舉行重試補(bǔ)償,在重試歷程中,約莫會(huì)形成反復(fù)消耗。
處理方案:
- 唯一ID+指紋碼機(jī)制唯一ID + 指紋碼機(jī)制,使用數(shù)據(jù)庫(kù)主鍵去重SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指紋碼利益:完成簡(jiǎn)便壞處:高并發(fā)下多數(shù)據(jù)庫(kù)寫入的功能瓶頸處理方案:跟進(jìn)ID舉行分庫(kù)分表舉行算法路由
- 使用Redis的原子性去完成在吸收到消息后將消息ID作為key實(shí)行 setnx 下令,假如實(shí)行告捷就表現(xiàn)沒(méi)有處理過(guò)這條消息,可以舉行消耗了,實(shí)行失敗表現(xiàn)消息以前被消耗了。
主動(dòng)簽收與手動(dòng)簽收(消耗端)
默許是主動(dòng)簽收
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
channel.basicAck(envelope.getDeliveryTag(), false);
消耗端限流
消息行列中囤積了多量的消息, 大概某些時(shí)候消費(fèi)的消息遠(yuǎn)宏大于消耗者處理才能的時(shí)分, 這個(gè)時(shí)分假如消耗者一次取出多量的消息, 但是客戶端又無(wú)法處理, 就會(huì)顯現(xiàn)成績(jī), 乃至約莫招致辦事崩潰, 以是必要抵消耗端舉行限流
RabbitMQ提供了一種qos(辦事質(zhì)量確保)功效, 即在非主動(dòng)確認(rèn)消息的條件下, 假如一定數(shù)目標(biāo)消息(經(jīng)過(guò)consumer大概channel設(shè)置qos的值)未被確認(rèn)前, 不舉行消耗新的消息
- 主動(dòng)簽收要設(shè)置成false, 發(fā)起實(shí)踐事情中也設(shè)置成false
- void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize : 消息輕重限定, 尋常設(shè)置為0, 消耗端不做限定prefetchCount : 會(huì)報(bào)告RabbitMQ不要同時(shí)給一個(gè)消耗者推送多于N個(gè)消息, 即一旦有N個(gè)消息還沒(méi)有ack, 則該consumer將block(壅閉), 直到有消息ackglobal : true/false 對(duì)否將外表設(shè)置使用于channel, 簡(jiǎn)便來(lái)說(shuō)就是外表的限定是channel級(jí)別的照舊consumer級(jí)別 注意 :
prefetchSize和global這兩項(xiàng),RabbitMQ沒(méi)有完成,臨時(shí)不眷注,prefetchCount在autoAck設(shè)置false的情況下奏效,即在主動(dòng)確認(rèn)的情況下這個(gè)值是不奏效的
限流可完成公平行列。
