前言
NSQ是Go語言編寫的,開源的分布式消息隊(duì)列中間件,其設(shè)計的目的是用來大規(guī)模地處理每天數(shù)以十億計級別的消息。NSQ 具有分布式和去中心化拓?fù)浣Y(jié)構(gòu),該結(jié)構(gòu)具有無單點(diǎn)故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規(guī)模生成環(huán)境下應(yīng)用的產(chǎn)品。
背景介紹
在服務(wù)器最開始的時候,基本上在一臺主機(jī)上就能解決大部分問題,所以一般架構(gòu)設(shè)計如下:
但是,突然某一天,來了一個新需求,我們服務(wù)器上不只是簡單的儲存一些文本信息,我們需要儲存圖片甚至視頻,顯然直接在一臺主機(jī)上再部署一個文件服務(wù)是不科學(xué)的,因?yàn)樗鼤加么罅康牧髁?,這時候我們就要考慮單獨(dú)使用一個文件服務(wù)器了,所以我們調(diào)整一下架構(gòu):
通過nginx這個高性能代理服務(wù)器,有效的處理不同用戶請求的訴求:
然后接下來某一天,隨著用戶越來越多,你發(fā)現(xiàn)服務(wù)器的負(fù)載越來越高,這個時候,就要考慮去分離業(yè)務(wù)服務(wù)和數(shù)據(jù)庫服務(wù)了,設(shè)計再次修改:
接下來又過了一些日子,新的問題又出現(xiàn)了,你們公司的業(yè)務(wù)越做越大,用戶越來越多,有用戶投訴,收到的響應(yīng)延遲很大甚至出現(xiàn)了無響應(yīng)的情況,你檢查了一下服務(wù)器,發(fā)現(xiàn)每天峰值的時候,有一小部分?jǐn)?shù)據(jù)是經(jīng)常要使用的,頻繁的從數(shù)據(jù)庫讀出這部分?jǐn)?shù)據(jù),占用了大量的數(shù)據(jù)庫資源,然后我們的設(shè)計又要改了:
把一些常用的數(shù)據(jù)儲存到緩存中,遵循國際慣例的二八原則(80%的請求讀取20%的數(shù)據(jù)),這樣一來,數(shù)據(jù)庫的壓力負(fù)擔(dān)自然可以減少很多。
當(dāng)你覺得高枕無憂的時候,突然老板一時興起,我們來搞一個秒殺活動吧,這時候你一定會絞盡腦汁解決不能多買多賣的問題(一瞬間高并發(fā)的常見問題),好吧,終于輪到我們消息隊(duì)列出場了(當(dāng)然消息隊(duì)列不是唯一的解決方案),我們先把設(shè)計貼出來:
好的,所有的請求都先注入消息隊(duì)列,然后立刻給予請求響應(yīng),因?yàn)樽⑷胂㈥?duì)列實(shí)際上向內(nèi)存寫入數(shù)據(jù)的過程,所以響應(yīng)的速度會非常的快,然后后面的業(yè)務(wù)服務(wù)器再根據(jù)消息的隊(duì)列的內(nèi)容逐個讀出(相當(dāng)于異步讀?。?,所以可以大大削減數(shù)據(jù)庫的壓力,避免多買多賣的問題。當(dāng)然后面隨著業(yè)務(wù)的擴(kuò)大,還有很多問題要解決,比如使用負(fù)載均衡搭建多個業(yè)務(wù)服務(wù)器,使用分布式部署數(shù)據(jù)庫,搭載高可用架構(gòu)等,但是今天只是僅僅為了引出消息隊(duì)列它的應(yīng)用場景,所以我們就不往下討論了。
正文
打開 https://nsq.io/deployment/installing.html 下載對應(yīng)的nsq版本,我下載的是linux最新穩(wěn)定版
下載解壓之后,在/usr/下建立一個目錄,接著把解壓文件夾/bin/下面的文件全部拷貝進(jìn)去,最后在/etc/profile添加引用路徑,這樣就可以直接使用命令啟動nsq服務(wù)了,我的配置如下
我們先介紹一下幾個必要服務(wù)的作用
nsqlookupd:
主要負(fù)責(zé)服務(wù)發(fā)現(xiàn) 負(fù)責(zé)nsqd的心跳、狀態(tài)監(jiān)測,給客戶端、nsqadmin提供nsqd地址與狀態(tài),啟動命令如下:
nsqd:
負(fù)責(zé)接收消息,存儲隊(duì)列和將消息發(fā)送給客戶端,nsqd 可以多機(jī)器部署,當(dāng)你使用客戶端向一個topic發(fā)送消息時,可以配置多個nsqd地址,消息會隨機(jī)的分配到各個nsqd上,nsqd優(yōu)先把消息存儲到內(nèi)存channel中,當(dāng)內(nèi)存channel滿了之后,則把消息寫到磁盤文件中。他監(jiān)聽了兩個tcp端口,一個用來服務(wù)客戶端,一個用來提供http的接口 ,啟動命令如下:
nsqadmin:
nsqadmin是一個web管理界面 啟動方式如下:
啟動之后,通過 http://10.10.6.147:4171/ 可以訪問這個管理頁面, 默認(rèn)使用4171端口
我們先來說明一下這個后臺里面的一些內(nèi)容,因?yàn)槲覀兊腘SQ所使用的是經(jīng)典的pub/sub模式(發(fā)布/訂閱,典型的生產(chǎn)者/消費(fèi)者模式),我們可以先發(fā)布一個主題到NSQ,然后所有訂閱的服務(wù)器就會異步的從這里讀取主題的內(nèi)容:
Topic(左上角):發(fā)布的主題名字
NSQd Host:Nsq主機(jī)服務(wù)地址
Channel:消息通道
NSQd Host:Nsq主機(jī)服務(wù)地址
Depth:消息積壓量
In-flight:已經(jīng)投遞但是還未消費(fèi)掉的消息
Deferred:沒有消費(fèi)掉的延時消息
Messages:服務(wù)器啟動之后,總共接收到的消息量
Connections:通道里面客戶端的訂閱數(shù)
TimeOut:超時時間內(nèi)沒有被響應(yīng)的消息數(shù)
Memory + Disk:儲存在內(nèi)存和硬盤中總共的消息數(shù)
----------------------------------------華麗的分割線----------------------------------------
接著,我們講解如何在代碼中發(fā)布主題內(nèi)容,然后通過訂閱某主題去異步讀取消息
使用官方提供的下載地址:
go get github.com/nsqio/go-nsq
先創(chuàng)建一個主題,并且發(fā)布100條消息:
package main
import (
"github.com/nsqio/go-nsq"
"fmt"
)
var (
//nsqd的地址,使用了tcp監(jiān)聽的端口
tcpNsqdAddrr = "10.10.6.147:4150"
)
func main() {
//初始化配置
config := nsq.NewConfig()
for i := 0; i 100; i++ {
//創(chuàng)建100個生產(chǎn)者
tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
if err != nil {
fmt.Println(err)
}
//主題
topic := "Insert"
//主題內(nèi)容
tCommand := "new data!"
//發(fā)布消息
err = tPro.Publish(topic, []byte(tCommand))
if err != nil {
fmt.Println(err)
}
}
}
接下來我們看看admin的顯示內(nèi)容:
我們可以看到Nsqd接收到了100條信息,100條信息都儲存在內(nèi)存中,沒有被消化。
現(xiàn)在沒有任何服務(wù)訂閱了我們的主題,所以主題的消息都沒有被消化,那我們創(chuàng)建一個消費(fèi)者去訂閱我們的主題:
package main
import (
"github.com/nsqio/go-nsq"
"fmt"
"sync"
"time"
)
var (
//nsqd的地址,使用了tcp監(jiān)聽的端口
tcpNsqdAddrr = "10.10.6.147:4150"
)
//聲明一個結(jié)構(gòu)體,實(shí)現(xiàn)HandleMessage接口方法(根據(jù)文檔的要求)
type NsqHandler struct {
//消息數(shù)
msqCount int64
//標(biāo)識ID
nsqHandlerID string
}
//實(shí)現(xiàn)HandleMessage方法
//message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
//沒收到一條消息+1
s.msqCount++
//打印輸出信息和ID
fmt.Println(s.msqCount,s.nsqHandlerID)
//打印消息的一些基本信息
fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s \n", time.Unix(0 , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
return nil
}
func main() {
//初始化配置
config := nsq.NewConfig()
//創(chuàng)造消費(fèi)者,參數(shù)一時訂閱的主題,參數(shù)二是使用的通道
com, err := nsq.NewConsumer("Insert", "channel1", config)
if err != nil {
fmt.Println(err)
}
//添加處理回調(diào)
com.AddHandler(NsqHandler{nsqHandlerID:"One"})
//連接對應(yīng)的nsqd
err = com.ConnectToNSQD(tcpNsqdAddrr)
if err != nil {
fmt.Println(err)
}
//只是為了不結(jié)束此進(jìn)程,這里沒有意義
var wg = sync.WaitGroup{}
wg.Add(1)
wg.Wait()
/*
result:
msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
98 One
msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
99 One
msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
100 One
msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
*/
}
這里可以看到,之前擠壓的100條信息,都被我們的訂閱者消化掉了,也就是讀取了
所以我們的訂閱者(可以有多個)如果提前訂閱主題的話,只要對應(yīng)的主題有發(fā)布新內(nèi)容,就可以馬上異步讀取。
完結(jié)
消息隊(duì)列的應(yīng)用場景還有很多,Nsq這里也只是先介紹了一下入門知識,有興趣的朋友可以繼續(xù)深入了解。
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
您可能感興趣的文章:- golang http 連接超時和傳輸超時的例子
- golang實(shí)現(xiàn)整型和字節(jié)數(shù)組之間的轉(zhuǎn)換操作
- golang-gin-mgo高并發(fā)服務(wù)器搭建教程
- Golang中基礎(chǔ)的命令行模塊urfave/cli的用法說明
- Golang使用第三方包viper讀取yaml配置信息操作
- 聊聊Golang中很好用的viper配置模塊
- golang實(shí)現(xiàn)ftp實(shí)時傳輸文件的案例