前言
最近在寫項(xiàng)目,需要用到信號(hào)量等待一些資源完成,但是最多等待N毫秒。在看本文的正文之前,我們先來看下C語言里的實(shí)現(xiàn)方法。
在C語言里,有如下的API來實(shí)現(xiàn)帶超時(shí)的信號(hào)量等待:
SYNOPSIS
#include pthread.h>
int
pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
然后在查看golang的document后,發(fā)現(xiàn)golang里并沒有實(shí)現(xiàn)帶超時(shí)的信號(hào)量,官方文檔在這里。
原理
我的業(yè)務(wù)場(chǎng)景是這樣的:我有一個(gè)緩存字典,當(dāng)多個(gè)用戶請(qǐng)求1個(gè)不存在的key時(shí),只有1個(gè)請(qǐng)求會(huì)穿透到后端,而所有用戶都要排隊(duì)等這個(gè)請(qǐng)求完成,或者超時(shí)返回。
怎么實(shí)現(xiàn)呢?其實(shí)稍微想一想cond的原理,就能模擬一個(gè)帶超時(shí)的cond出來。
在golang里,要同時(shí)實(shí)現(xiàn)”掛起等待”和”超時(shí)返回”,一般得用select case語法,一個(gè)case等待阻塞的資源,一個(gè)case等待一個(gè)timer,這一點(diǎn)是非常確定的。
原本阻塞的資源應(yīng)該通過條件變量的機(jī)制來實(shí)現(xiàn)完成通知,既然這里決定用select case,那么自然想到用channel來代替這個(gè)完成通知。
接下來的問題就是,很多請(qǐng)求者并發(fā)來獲取這個(gè)資源,但是資源還沒有準(zhǔn)備好,所以大家都要排隊(duì)并掛起,等待資源完成,并且當(dāng)資源完成后通知大家。
所以,這里很自然要為這個(gè)資源做一個(gè)隊(duì)列,每個(gè)請(qǐng)求者創(chuàng)建一個(gè)chan,并將chan放到隊(duì)列里,接著select case等待這個(gè)chan的通知。而另一端,資源完成后遍歷隊(duì)列,通知每個(gè)chan即可。
最后一個(gè)問題是,只有第一個(gè)請(qǐng)求者才能穿透請(qǐng)求到后端,而后續(xù)請(qǐng)求者不應(yīng)該穿透重復(fù)的請(qǐng)求,這可以通過判斷緩存里是否有這個(gè)key作為判定首次的條件,而標(biāo)記位init來判斷請(qǐng)求者是否應(yīng)該排隊(duì)。
我的場(chǎng)景
上面是思路,下面是我的業(yè)務(wù)場(chǎng)景實(shí)現(xiàn)。
func (cache *Cache) Get(key string, keyType int) *string {
if keyType == KEY_TYPE_DOMAIN {
key = "#" + key
} else {
key = "=" + key
}
cache.mutex.Lock()
item, existed := cache.dict[key]
if !existed {
item = cacheItem{}
item.key = key
item.waitQueue = list.New()
cache.dict[key] = item
}
cache.mutex.Unlock()
conf := config.GetConfig()
lastGet := getCurMs()
item.mutex.Lock()
item.lastGet = lastGet
if item.init { // 已存在并且初始化
defer item.mutex.Unlock()
return item.value
}
// 未初始化,排隊(duì)等待結(jié)果
wait := waitItem{}
wait.wait_chan = make(chan *string, 1)
item.waitQueue.PushBack(wait)
item.mutex.Unlock()
// 新增key, 啟動(dòng)goroutine獲取初始值
if !existed {
go cache.initCacheItem(item, keyType)
}
timer := time.NewTimer(time.Duration(conf.Cache_waitTime) * time.Millisecond)
var retval *string = nil
// 等待初始化完成
select {
case retval = - wait.wait_chan:
case - timer.C:
}
return retval
}
簡述一下整個(gè)過程:
- 首先鎖字典,如果key不存在,說明我是第一個(gè)請(qǐng)求者,我會(huì)創(chuàng)建這個(gè)key對(duì)應(yīng)的value,只不過init=false表示它正在初始化。最后,釋放字典鎖。
- 接下來,鎖住這個(gè)key,判斷它已經(jīng)初始化完成,那么直接返回value。否則,創(chuàng)建一個(gè)chan放入waitQueue等待隊(duì)列。最后,釋放key鎖。
- 接著,如果當(dāng)前是第一個(gè)請(qǐng)求者,那么會(huì)穿透請(qǐng)求到后端(在一個(gè)獨(dú)立的協(xié)程里去發(fā)起網(wǎng)絡(luò)調(diào)用)。
- 現(xiàn)在,創(chuàng)建一個(gè)用于超時(shí)的定時(shí)器。
- 最后,無論當(dāng)前是否是key的第一個(gè)請(qǐng)求者,還是初始化期間的并發(fā)請(qǐng)求者,它們都通過select case超時(shí)的等待結(jié)果完成。
在initCacheItem函數(shù)里,數(shù)據(jù)已獲取成功
// 一旦標(biāo)記為init, 后續(xù)請(qǐng)求將不再操作waitQueue
item.mutex.Lock()
item.value = newValue
item.init = true
item.expire = expire
item.mutex.Unlock()
// 喚醒所有排隊(duì)者
waitQueue := item.waitQueue
for elem := waitQueue.Front(); elem != nil; elem = waitQueue.Front() {
wait := elem.Value.(*waitItem)
wait.wait_chan - newValue
waitQueue.Remove(elem)
}
- 首先,鎖住key,標(biāo)記init=true,并賦值value,并釋放鎖。此后的請(qǐng)求,都可以立即返回,無需排隊(duì)。
- 之后,因?yàn)閕nit=true已被標(biāo)記,此刻再也有沒有請(qǐng)求會(huì)修改waitQueue,所以無需加鎖,直接遍歷隊(duì)列,通知其中的每個(gè)chan。
最后
這樣就實(shí)現(xiàn)了帶超時(shí)的條件變量效果,實(shí)際上我的場(chǎng)景是一個(gè)broadcast的cond例子,大家可以參照思路實(shí)現(xiàn)自己想要的效果,活學(xué)活用。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
您可能感興趣的文章:- golang監(jiān)聽文件變化的實(shí)例
- golang使用信號(hào)量熱更新的實(shí)現(xiàn)示例
- Golang信號(hào)處理及如何實(shí)現(xiàn)進(jìn)程的優(yōu)雅退出詳解
- golang 監(jiān)聽服務(wù)的信號(hào),實(shí)現(xiàn)平滑啟動(dòng),linux信號(hào)說明詳解