欧美日韩国产一区二区三区不卡,欧洲一区二区三区精品,日韩一区不卡,成人国产二区

rabbitmq(RabbitMQ詳解)

時(shí)間:2023-10-09 12:53:34 閱讀:6

RabbitMQ詳解

RabbitMQ的優(yōu)點(diǎn):

  • 開源, 功能好效, 安定性好
  • 提供可靠性消息投遞形式(confirm), 前往形式(return)等
  • 與Spring完善整合, API豐厚
  • 集群形式豐厚, 支持表達(dá)式設(shè)置, 高可用HA形式, 鏡像行列模子
  • 可以確保數(shù)據(jù)不喪失的條件下做到高可靠性, 可用性

RabbitMQ高功能緣故:

  • 由Erlang言語(yǔ)開發(fā),承繼其天生的并發(fā)性,安定性和寧?kù)o性有保證

RabbitMQ的協(xié)議:

AMQP(Advanced Message Queuing Protocol)高等消息行列協(xié)議,是一個(gè)異步消息轉(zhuǎn)達(dá)所使用使用層協(xié)議標(biāo)準(zhǔn),為面向消息正中件計(jì)劃,基于此協(xié)議的客戶端與消息正中件可以無(wú)視消息泉源轉(zhuǎn)達(dá)消息,不受客戶端、消息正中件、不同的開發(fā)言語(yǔ)情況等條件的限定。

計(jì)劃看法表明:

  • Server : 又稱Broker, 承受客戶端毗連, 完成AMQP實(shí)體辦事
  • Connection : 毗連, 使用步驟與Broker的網(wǎng)絡(luò)毗連
  • Channel : 網(wǎng)絡(luò)信道, 幾乎一切的利用都在Channel中舉行, Channel是舉行消息讀寫的通道。客戶端可以創(chuàng)建多個(gè)Channel, 每個(gè)Channel代表一個(gè)會(huì)話職責(zé)。
  • Message : 消息, 辦事器和使用步驟之間傳送的數(shù)據(jù), 有Properties和Body構(gòu)成。Properties可以抵消息舉行修飾, 好比消息的優(yōu)先級(jí), 延長(zhǎng)等高等特性; Body就是消息體內(nèi)容。
  • Virtual Host : 假造地點(diǎn), 用于舉行邏輯斷絕, 最表層的消息路由。一個(gè)Virtual Host內(nèi)里可以有多少個(gè)Exchange和Queue, 同一個(gè)Virtual Host內(nèi)里不克不及有相反稱呼的Exchange或Queue
  • Exchange : 互換機(jī), 用于吸收消息, 依據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的行列
  • Binding : Exchange和Queue之間的假造毗連, binding中可以包含routing key
  • Routing Key : 一個(gè)路由端正, 假造機(jī)可用它來(lái)確定怎樣路由一個(gè)特定消息
  • Queue : 也成Message Queue, 消息行列, 用于保存消息并將它們轉(zhuǎn)發(fā)給消耗者

RabbitMQ全體架構(gòu)

RabbitMQ成員簡(jiǎn)介

Binding-綁定

  • Exchange和Exchange, Queue之間的毗連干系
  • 綁定中可以包含RoutingKey大概參數(shù)

Queue-消息行列

  • 消息行列, 實(shí)踐存儲(chǔ)消息數(shù)據(jù)
  • Durability : 對(duì)否歷久化
  • Auto delete : 如選yes,代表當(dāng)最初一個(gè)監(jiān)聽被移除之后, 該Queue會(huì)主動(dòng)被刪除

Message-消息

  • 辦事和使用步驟之間傳送的數(shù)據(jù)
  • 實(shí)質(zhì)上就是一段數(shù)據(jù), 由Properties和Payload(Body)構(gòu)成
  • 常用屬性 : delivery mode, headers(自界說(shuō)屬性)
  • 其他屬性content_type, content_encoding, prioritycorrelation_id : 可以以為是消息的唯一idreplay_to : 重回行列設(shè)定expiration : 消息過(guò)時(shí)時(shí)間message_id : 消息idtimestamp, type, user_id, app_id, cluster_id

Virtual Host-假造主機(jī)

  • 假造地點(diǎn), 用于舉行邏輯斷絕, 最表層的消息路由
  • 一個(gè)Virtual Host內(nèi)里可以有多少個(gè)Exchange和Queue
  • 同一個(gè)Virtual Host內(nèi)里不克不及有相反稱呼的Exchange或Queue

Exchange互換機(jī)

吸收消息,并依據(jù)路由鍵轉(zhuǎn)發(fā)消息到所綁定的行列

注:互換機(jī)不會(huì)存儲(chǔ)消息,假如消息發(fā)送到?jīng)]有綁定消耗行列的互換機(jī),消息則喪失。

互換機(jī)的屬性

  • Name : 互換機(jī)稱呼
  • Type : 互換機(jī)典范, direct, topic, fanout, headers
  • Durability : 對(duì)否必要?dú)v久化, true為歷久化
  • Auto Delete : 當(dāng)最初一個(gè)綁定到Exchange上的行列刪除后, 主動(dòng)刪除該Exchange
  • Internal : 如今Exchange對(duì)否用于RabbitMQ內(nèi)里使用, 默以為False, 這個(gè)屬性很少會(huì)用到
  • Arguments : 擴(kuò)展參數(shù), 用于擴(kuò)展AMQP協(xié)議訂定化使用

互換機(jī)的四品種型

  • Direct exchange(直連互換機(jī))是依據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)行列的注意 : Direct形式可以使用RabbitMQ自帶的Exchange(default Exchange), 以是不必要將Exchange舉行任何綁定(binding)利用, 消息轉(zhuǎn)達(dá)時(shí), RoutingKey必需完全婚配才會(huì)被行列吸收, 不然該消息會(huì)被丟棄

  • Fanout exchange(扇型互換機(jī))將消息路由給綁定到它身上的一切行列不處理路由鍵, 只必要簡(jiǎn)便的將行列綁定到互換機(jī)上發(fā)送到互換機(jī)的消息都市被轉(zhuǎn)發(fā)到與該互換機(jī)綁定的一切行列上Fanout互換機(jī)轉(zhuǎn)發(fā)消息是最快的

  • Topic exchange(主題互換機(jī))行列經(jīng)過(guò)路由鍵綁定到互換機(jī)上,然后,互換機(jī)依據(jù)消息里的路由值,將消息路由給一個(gè)或多個(gè)綁定行列(含糊婚配)“#” : 婚配一個(gè)或多個(gè)詞“*” : 婚配一個(gè)詞

  • Headers exchange(頭互換機(jī))相似主題互換機(jī),但是頭互換機(jī)使用多個(gè)消息屬性來(lái)代替路由鍵創(chuàng)建路由端正。經(jīng)過(guò)推斷消息頭的值可否與指定的綁定相婚配來(lái)建立路由端正。

RabbitMQ常用的5種事情形式

1、點(diǎn)對(duì)點(diǎn)(簡(jiǎn)便)的行列

  • 不必要互換機(jī)
  • 一個(gè)消費(fèi)者,一個(gè)消耗者

2、事情行列(公平性)

  • 不必要互換機(jī)
  • 一個(gè)消費(fèi)者,多個(gè)消耗者,但是一個(gè)消息只會(huì)發(fā)送給一個(gè)行列(競(jìng)爭(zhēng)的消耗者形式)
  • 默許是輪詢,即會(huì)將消息輪替發(fā)給多個(gè)消耗者,但如此抵消耗得比力慢的消耗者不公平
  • 可接納公中分派,即能者多勞channel.basicQos(1);// 限定:發(fā)送一條信息給消耗者A,消耗者A未反應(yīng)處理后果之前,不會(huì)再次發(fā)送信息給消耗者Aboolean autoAck = false;// 取消主動(dòng)反應(yīng) channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 吸收信息channel.basicAck(envelope.getDeliveryTag(), false);// 反應(yīng)消息處理終了

3、公布/訂閱

  • 一個(gè)消費(fèi)者,多個(gè)消耗者
  • 每一個(gè)消耗者都有本人的一個(gè)行列
  • 消費(fèi)者沒(méi)有直接發(fā)消息到行列中,而是發(fā)送到互換機(jī)
  • 每個(gè)消耗者的行列都綁定到互換機(jī)上
  • 消息經(jīng)過(guò)互換機(jī)抵達(dá)每個(gè)消耗者的行列

該形式就是Fanout Exchange(扇型互換機(jī))將消息路由給綁定到它身上的一切行列

4、路由

消費(fèi)者發(fā)送消息到互換機(jī)并指定一個(gè)路由key,消耗者行列綁定到互換機(jī)時(shí)要訂定路由key(key婚配就能承受消息,key不婚配就不克不及承受消息)

該形式接納Direct exchange(直連互換機(jī))

5、主題(通配符)

此形式真實(shí)路由key形式的基本上,使用了通配符來(lái)辦理消耗者吸收消息。消費(fèi)者P發(fā)送消息到互換機(jī)X,互換機(jī)依據(jù)綁定行列的routing key的值舉行通配符婚配

標(biāo)記#:婚配一個(gè)大概多個(gè)詞lazy.# 可以婚配lazy.irs大概lazy.irs.cor

標(biāo)記*:只能婚配一個(gè)詞lazy.* 可以婚配lazy.irs大概lazy.cor

該形式接納Topic exchange(主題互換機(jī))

消息可靠性轉(zhuǎn)達(dá)或回退(消費(fèi)者端)

消費(fèi)者發(fā)送消息出去之后,不曉得畢竟有沒(méi)有發(fā)送到RabbitMQ辦事器, 默許是不曉得的。并且有的時(shí)分我們?cè)诎l(fā)送消息之后,后方的邏輯出成績(jī)了,我們不想要發(fā)送之前的消息了,必要撤回該怎樣做。

AMQP 事件機(jī)制

  • txSelect 將如今channel設(shè)置為transaction形式
  • txCommit 提交如今事件
  • txRollback 事件回滾

Confirm 形式

消息的確認(rèn), 是指消費(fèi)者投遞消息后, 假如Broker收到消息, 則會(huì)給我們產(chǎn)生一個(gè)應(yīng)對(duì)

消費(fèi)者舉行吸收應(yīng)對(duì), 用來(lái)確定這條消息對(duì)否正常發(fā)送到Broker, 這種辦法也是消息的可靠性投遞的中心保證

  • 在channel上開啟確認(rèn)形式 : channel.confirmSelect()
  • 在channel上添加監(jiān)聽 : addConfirmListener, 監(jiān)聽告捷和失敗的前往后果, 依據(jù)具體的后果抵消息舉行重新發(fā)送, 或紀(jì)錄日志等后續(xù)處理

Return消息機(jī)制

Return Listener用于處理一些不成路由的消息

正常情況下消息消費(fèi)者經(jīng)過(guò)指定一個(gè)Exchange和RoutingKey, 把消息送到某一個(gè)行列中去, 然后消耗者監(jiān)聽行列, 舉行消耗,但在某些情況下, 假如在發(fā)送消息的時(shí)分, 如今的exchange不存在大概指定的路由key路由不到,這個(gè)時(shí)分假如我們必要監(jiān)聽這種不成達(dá)的消息, 就要使用Return Listener。

在基本API中有一個(gè)緊張的設(shè)置項(xiàng)Mandatory : 假如為true, 則監(jiān)聽器會(huì)吸收到路由不成達(dá)的消息, 然后舉行后續(xù)處理(補(bǔ)償或人工處理), 假如為false, 那么broker端主動(dòng)刪除該消息。

怎樣保證消息可靠轉(zhuǎn)達(dá)

  • 保證消息的告捷發(fā)射
  • 保證MQ節(jié)點(diǎn)的告捷吸收
  • 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)的確認(rèn)應(yīng)對(duì)
  • 完滿的消息補(bǔ)償機(jī)制

方案:

1、消息落庫(kù), 抵消息形態(tài)舉行標(biāo)志

  • step1:消息入庫(kù)
  • step2:消息發(fā)送
  • step3:消耗端消息確認(rèn)
  • step4:更新庫(kù)中消息形態(tài)為已確認(rèn)
  • step5:定時(shí)職責(zé)讀取數(shù)據(jù)庫(kù)中未確認(rèn)的消息
  • step6:未收到確認(rèn)后果的消息重新發(fā)送
  • step7:假如重試多次之后仍舊失敗, 則將消息形態(tài)變動(dòng)為投遞失敗的終態(tài), 后方必要人工到場(chǎng)

2、消息的延長(zhǎng)投遞, 做二次確認(rèn), 回調(diào)反省

  • step1 : 第一次消息發(fā)送, 必需業(yè)務(wù)數(shù)據(jù)落庫(kù)之后才干舉行消息發(fā)送
  • step2 : 第二次消息延長(zhǎng)發(fā)送, 設(shè)定延長(zhǎng)一段時(shí)間發(fā)送第二次check消息
  • step3 : 消耗端監(jiān)聽Broker, 舉行消息消耗
  • step4 : 消耗告捷之后, 發(fā)送確認(rèn)消息到確認(rèn)消息行列
  • step5 : Callback Service監(jiān)聽step4中的確認(rèn)消息行列, 維護(hù)消息形態(tài), 對(duì)否消耗告捷等形態(tài)
  • step6 : Callback Service監(jiān)聽step2發(fā)送的Delay Check的消息行列, 檢測(cè)內(nèi)里的消息形態(tài), 假如消息是發(fā)送告捷形態(tài), 則流程完畢, 假如消息是失敗形態(tài), 大概查不到如今消息形態(tài)時(shí), 會(huì)關(guān)照消費(fèi)者, 舉行消息重發(fā), 重新上述步調(diào)

重試機(jī)制和冪等性保證(消耗者端)

重試機(jī)制

消耗者在消耗消息的時(shí)分,假如消耗者業(yè)務(wù)邏輯顯現(xiàn)步驟特別,會(huì)使用消息重試機(jī)制。

  • 情況1: 消耗者獲取到消息后,調(diào)用第三方接口,但接口暫且無(wú)法拜候,對(duì)否必要重試? (必要重試機(jī)制)
  • 情況2: 消耗者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換特別,對(duì)否必要重試?(不必要重試機(jī)制)必要公布舉行處理。

關(guān)于情況2,假如消耗者代碼拋出特別是必要公布新版本才干處理的成績(jī),那么不必要重試,重試也于事無(wú)補(bǔ)。應(yīng)該接納日志紀(jì)錄+定時(shí)職責(zé)job康健反省+人工舉行補(bǔ)償

重試機(jī)制的完成

在SpringBoot中,@RabbitListener(queue="")用于消耗者監(jiān)聽行列。底層使用Aop舉行攔阻,假如步驟沒(méi)有拋出特別,則主動(dòng)提交事件。假如拋出特別,該消息會(huì)緩存到RabbitMQ辦事器,主動(dòng)實(shí)行重試機(jī)制,不休到告捷為止。可以設(shè)置重試距離時(shí)間和重試的次數(shù)。

冪等性保證

冪等性:多次實(shí)行, 后果堅(jiān)持一律

網(wǎng)絡(luò)延長(zhǎng)傳輸中,消耗顯現(xiàn)特別大概是消耗延長(zhǎng)消耗,會(huì)形成MQ舉行重試補(bǔ)償,在重試歷程中,約莫會(huì)形成反復(fù)消耗。

處理方案:

  • 唯一ID+指紋碼機(jī)制唯一ID + 指紋碼機(jī)制,使用數(shù)據(jù)庫(kù)主鍵去重SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指紋碼利益:完成簡(jiǎn)便壞處:高并發(fā)下多數(shù)據(jù)庫(kù)寫入的功能瓶頸處理方案:跟進(jìn)ID舉行分庫(kù)分表舉行算法路由
  • 使用Redis的原子性去完成在吸收到消息后將消息ID作為key實(shí)行 setnx 下令,假如實(shí)行告捷就表現(xiàn)沒(méi)有處理過(guò)這條消息,可以舉行消耗了,實(shí)行失敗表現(xiàn)消息以前被消耗了。

主動(dòng)簽收與手動(dòng)簽收(消耗端)

默許是主動(dòng)簽收

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);//關(guān)閉主動(dòng)簽收,變?yōu)槭謩?dòng)簽收

channel.basicAck(envelope.getDeliveryTag(), false);// 手工簽收, 第二個(gè)參數(shù)表現(xiàn)對(duì)否批量簽收

消耗端限流

消息行列中囤積了多量的消息, 大概某些時(shí)候消費(fèi)的消息遠(yuǎn)宏大于消耗者處理才能的時(shí)分, 這個(gè)時(shí)分假如消耗者一次取出多量的消息, 但是客戶端又無(wú)法處理, 就會(huì)顯現(xiàn)成績(jī), 乃至約莫招致辦事崩潰, 以是必要抵消耗端舉行限流

RabbitMQ提供了一種qos(辦事質(zhì)量確保)功效, 即在非主動(dòng)確認(rèn)消息的條件下, 假如一定數(shù)目標(biāo)消息(經(jīng)過(guò)consumer大概channel設(shè)置qos的值)未被確認(rèn)前, 不舉行消耗新的消息

  • 主動(dòng)簽收要設(shè)置成false, 發(fā)起實(shí)踐事情中也設(shè)置成false
  • void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize : 消息輕重限定, 尋常設(shè)置為0, 消耗端不做限定prefetchCount : 會(huì)報(bào)告RabbitMQ不要同時(shí)給一個(gè)消耗者推送多于N個(gè)消息, 即一旦有N個(gè)消息還沒(méi)有ack, 則該consumer將block(壅閉), 直到有消息ackglobal : true/false 對(duì)否將外表設(shè)置使用于channel, 簡(jiǎn)便來(lái)說(shuō)就是外表的限定是channel級(jí)別的照舊consumer級(jí)別 注意 :

prefetchSize和global這兩項(xiàng),RabbitMQ沒(méi)有完成,臨時(shí)不眷注,prefetchCount在autoAck設(shè)置false的情況下奏效,即在主動(dòng)確認(rèn)的情況下這個(gè)值是不奏效的

限流可完成公平行列。

版權(quán)聲明:本文來(lái)自互聯(lián)網(wǎng)整理發(fā)布,如有侵權(quán),聯(lián)系刪除

原文鏈接:http://www.freetextsend.comhttp://www.freetextsend.com/wangluozixun/37429.html


Copyright ? 2021-2022 All Rights Reserved 備案編號(hào):閩ICP備2023009674號(hào) 網(wǎng)站地圖 聯(lián)系:dhh0407@outlook.com

主站蜘蛛池模板: 天长市| 文化| 新邵县| 曲周县| 体育| 吴忠市| 德钦县| 武陟县| 元氏县| 望江县| 长治市| 武城县| 普兰店市| 阜南县| 广州市| 屯门区| 明光市| 大冶市| 高邮市| 南陵县| 乡城县| 南江县| 满洲里市| 石河子市| 明水县| 安图县| 若羌县| 彭泽县| 南木林县| 健康| 山丹县| 云南省| 壤塘县| 嘉义县| 柳河县| 济宁市| 米脂县| 福海县| 莫力| 遵义县| 灌南县|