主頁(yè) > 知識(shí)庫(kù) > 關(guān)于redigo中PubSub的一點(diǎn)小坑分析

關(guān)于redigo中PubSub的一點(diǎn)小坑分析

熱門(mén)標(biāo)簽:百度商家地圖標(biāo)注怎么做 地圖標(biāo)注如何即時(shí)生效 地圖標(biāo)注費(fèi)用 最簡(jiǎn)單的百度地圖標(biāo)注 太原營(yíng)銷(xiāo)外呼系統(tǒng) 玄武湖地圖標(biāo)注 西藏教育智能外呼系統(tǒng)價(jià)格 小紅書(shū)怎么地圖標(biāo)注店 竹間科技AI電銷(xiāo)機(jī)器人

前言

最近在用 golang 做一些 redis 相關(guān)的操作,選用了 redigo 這個(gè)第三方庫(kù)。然后在使用 Pub/Sub 的時(shí)候,卻發(fā)現(xiàn)了一個(gè)小坑……

Redis Client

首先,我們來(lái)初始化一個(gè)帶連接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := redis.Pool{
		MaxIdle:  10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t)  time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我們可以簡(jiǎn)單的實(shí)現(xiàn)一個(gè) publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下來(lái)就是一個(gè)稍微復(fù)雜點(diǎn)的帶有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done - fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done - err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done - nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case -ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := -done:
			return err
		case -tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

最后,我們寫(xiě)一個(gè)簡(jiǎn)單地 main 函數(shù)來(lái)調(diào)用 publish subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done - fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done - err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done - nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case -ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := -done:
			return err
		case -tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}


咋一看之下,好像并沒(méi)有什么異常?然而,如果我們這時(shí)候去看 redis 的 tcp 連接,就可以發(fā)現(xiàn)一些貓膩:

$sudo netstat -antp | grep redis
tcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一個(gè)連接,而 connection pool 似乎沒(méi)有什么作用。

更進(jìn)一步地調(diào)試,我們發(fā)現(xiàn)在 defer psc.Close() 的時(shí)候就卡住了,也就是上面的 10 個(gè) goroutine 其實(shí)并沒(méi)有正常退出。

Concurrent

排查許久之后,終于定位到了問(wèn)題!引用 redigo 的說(shuō)明:

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是說(shuō),雖然一個(gè)連接可以在不同的 goroutine 并發(fā)調(diào)用 Receive() 和 Subscribe()(subscribe調(diào)用了send和flush) ,但是卻不能再有其他并發(fā)操作(比如 Close())。

其他相似的問(wèn)題還可以參考 issue

Fix

知道了上面的原因之后,我們稍微修改一下 defer psc.Close() 的位置即可解決問(wèn)題:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

您可能感興趣的文章:
  • go實(shí)現(xiàn)redigo的簡(jiǎn)單操作

標(biāo)簽:廣東 香港 唐山 贛州 澳門(mén) 林芝 揚(yáng)州 景德鎮(zhèn)

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《關(guān)于redigo中PubSub的一點(diǎn)小坑分析》,本文關(guān)鍵詞  關(guān)于,redigo,中,PubSub,的,一點(diǎn),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《關(guān)于redigo中PubSub的一點(diǎn)小坑分析》相關(guān)的同類(lèi)信息!
  • 本頁(yè)收集關(guān)于關(guān)于redigo中PubSub的一點(diǎn)小坑分析的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章