前言
Redis鏈表經(jīng)常會被用于消息隊列的服務(wù),以完成多程序之間的消息交換。個人認(rèn)為redis消息隊列有一個好處,就是可以實現(xiàn)分布式和共享,就和memcache作為mysql的緩存和mysql自帶的緩存一樣。
鏈表實現(xiàn)消息隊列
Redis鏈表支持前后插入以及前后取出,所以如果往尾部插入元素,往頭部取出元素,這就是一種消息隊列,也可以說是消費者/生產(chǎn)者模型??梢岳胠push和rpop來實現(xiàn)。但是有一個問題,如果鏈表中沒有數(shù)據(jù),那么消費者將要在while循環(huán)中調(diào)用rpop,這樣以來就浪費cpu資源,好在Redis提供一種阻塞版pop命令brpop或者blpop,用法為brpop/blpop list timeout
, 當(dāng)鏈表為空的時候,brpop/blpop將阻塞,直到設(shè)置超時時間到或者list插入一個元素。
用法如下:
charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
//阻塞在這里
/* ---------------------------------------------------- */
//當(dāng)我在另一個客戶端lpush一個元素之后,客戶端輸出為
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s)//阻塞的時間
當(dāng)鏈表為空的時候,brpop是阻塞的,等待超時時間到或者另一個客戶端lpush一個元素。接下來,看下源碼是如何實現(xiàn)阻塞brpop命令的。要實現(xiàn)客戶端阻塞,只需要服務(wù)器不給客戶端發(fā)送消息,那么客戶端就會阻塞在read調(diào)用中,等待消息到達(dá)。這是很好實現(xiàn)的,關(guān)鍵是如何判斷這個客戶端阻塞的鏈表有數(shù)據(jù)到達(dá)以及通知客戶端解除阻塞?Redis的做法是,將阻塞的鍵以及阻塞在這個鍵上的客戶端鏈表存儲在一個字典中,然后每當(dāng)向數(shù)據(jù)庫插入一個鏈表時,就判斷這個新插入的鏈表是否有客戶端阻塞,有的話,就解除這個阻塞的客戶端,并且發(fā)送剛插入鏈表元素給客戶端,客戶端就這樣解除阻塞。
先看下有關(guān)數(shù)據(jù)結(jié)構(gòu),以及server和client有關(guān)屬性
//阻塞狀態(tài)
typedef struct blockingState {
/* Generic fields. */
mstime_t timeout; /* 超時時間 */
/* REDIS_BLOCK_LIST */
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
/* REDIS_BLOCK_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
} blockingState;
//繼續(xù)列表
typedef struct readyList {
redisDb *db;//就緒鍵所在的數(shù)據(jù)庫
robj *key;//就緒鍵
} readyList;
//客戶端有關(guān)屬性
typedef struct redisClient {
int btype; /* Type of blocking op if REDIS_BLOCKED. */
blockingState bpop; /* blocking state */
}
//服務(wù)器有關(guān)屬性
struct redisServer {
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP co */
}
//數(shù)據(jù)庫有關(guān)屬性
typedef struct redisDb {
//keys->redisCLient映射
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
}redisDB
必須對上述的數(shù)據(jù)結(jié)構(gòu)足夠了解,否則很難看懂下面的代碼,因為這些代碼需要操作上述的數(shù)據(jù)結(jié)構(gòu)。先從brpop命令執(zhí)行函數(shù)開始分析,brpop命令執(zhí)行函數(shù)為
void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
robj *o;
mstime_t timeout;
int j;
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],timeout,UNIT_SECONDS)
!= REDIS_OK) return;//將超時時間保存在timeout中
for (j = 1; j c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);//在數(shù)據(jù)庫中查找操作的鏈表
if (o != NULL) {//如果不為空
if (o->type != REDIS_LIST) {//不是鏈表類型
addReply(c,shared.wrongtypeerr);//報錯
return;
} else {
if (listTypeLength(o) != 0) {//鏈表不為空
/* Non empty list, this is like a non normal [LR]POP. */
char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);//從鏈表中pop出一個元素
redisAssert(value != NULL);
//給客戶端發(fā)送pop出來的元素信息
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {//如果鏈表為空,從數(shù)據(jù)庫刪除鏈表
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
/* 省略一部分 */
}
}
}
}
/* 如果鏈表為空,則阻塞客戶端 */
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
從源碼可以看出,brpop可以操作多個鏈表變量,例如brpop list1 list2 0
,但是只能輸出第一個有元素的鏈表。如果list1沒有元素,而list2有元素,則輸出list2的元素;如果兩個都有元素,則輸出list1的元素;如果都沒有元素,則等待其中某個鏈表插入一個元素,之后在2返回。最后調(diào)用blockForyKeys阻塞
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
c->bpop.timeout = timeout;//超時時間賦值給客戶端blockingState屬性
c->bpop.target = target;//這屬性適用于brpoplpush命令的輸入對象,如果是brpop, //則target為空
if (target != NULL) incrRefCount(target);//不為空,增加引用計數(shù)
for (j = 0; j numkeys; j++) {
/* 將阻塞的key存入c.bpop.keys字典中 */
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
//將阻塞的key和客戶端添加進(jìn)c->db->blocking_keys
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);//添加到阻塞鍵的客戶點鏈表中
}
blockClient(c,REDIS_BLOCKED_LIST);//設(shè)置客戶端阻塞標(biāo)志
}
blockClient函數(shù)只是簡單的設(shè)置客戶端屬性,如下
void blockClient(redisClient *c, int btype) {
c->flags |= REDIS_BLOCKED;//設(shè)置標(biāo)志
c->btype = btype;//阻塞操作類型
server.bpop_blocked_clients++;
}
由于這個函數(shù)之后,brpop命令執(zhí)行函數(shù)就結(jié)束了,由于沒有給客戶端發(fā)送消息,所以客戶端就阻塞在read調(diào)用中。那么如何解開客戶端的阻塞了?
插入一個元素解阻塞
任何指令的執(zhí)行函數(shù)都是在processCommand函數(shù)中調(diào)用call函數(shù),然后在call函數(shù)中調(diào)用命令執(zhí)行函數(shù),lpush也一樣。當(dāng)執(zhí)行完lpush之后,此時鏈表不為空,回到processCommand調(diào)用中,執(zhí)行以下語句
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
這兩行代碼是先檢查server.ready_keys是否為空,如果不為空,說明已經(jīng)有一些就緒的鏈表,此時可以判斷是否有客戶端阻塞在這個鍵值上,如果有,則喚醒;現(xiàn)在問題又來了,這個server.ready_keys在哪更新鏈表了?
原來是在dbAdd函數(shù)中,當(dāng)往數(shù)據(jù)庫中添加的值類型為REDIS-LIST時,這時就要調(diào)用signalListAsReady函數(shù)將鏈表指針添加進(jìn)server.ready_keys:
//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);//將數(shù)據(jù)添加進(jìn)數(shù)據(jù)庫
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
//判斷是否為鏈表類型,如果是,調(diào)用有鏈表已經(jīng)ready函數(shù)
if (val->type == REDIS_LIST) signalListAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
}
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
readyList *rl;
/* 沒有客戶端阻塞在這個鍵上,則直接返回. */
if (dictFind(db->blocking_keys,key) == NULL) return;
/* 這個鍵已近被喚醒了,所以沒必要重新入隊 */
if (dictFind(db->ready_keys,key) != NULL) return;
/* Ok, 除了上述兩情況,把這個鍵放入server.ready_keys */
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);//添加鏈表末尾
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
incrRefCount(key);
//同時將這個阻塞鍵放入db->ready_keys
redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
OK,這時server.ready_keys上已經(jīng)有就緒鍵了,這時就調(diào)用processCommand函數(shù)中的handleClientsBlockedOnLists()
函數(shù)來處理阻塞客戶端,在這個函數(shù)中,
void handleClientsBlockedOnLists(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
/* 將server.ready_keys賦給一個新的list,再將server.ready_keys清空 */
l = server.ready_keys;
server.ready_keys = listCreate();
/* 迭代每一個就緒的每一個readyList */
while(listLength(l) != 0) {
listNode *ln = listFirst(l);//獲取第一個就緒readyList
readyList *rl = ln->value;
/* 從rl所屬的數(shù)據(jù)庫中刪除rl */
dictDelete(rl->db->ready_keys,rl->key);
/* 查詢rl所屬的數(shù)據(jù)庫查找rl->key ,給阻塞客戶端回復(fù)rl->key鏈表中的第一個元素*/
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL o->type == REDIS_LIST) {
dictEntry *de;
/* 在rl->db->blocking_keys查找阻塞在rl->key的客戶端鏈表 */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);//轉(zhuǎn)換為客戶端鏈表
int numclients = listLength(clients);
while(numclients--) {//給每個客戶端發(fā)送消息
listNode *clientnode = listFirst(clients);
redisClient *receiver = clientnode->value;//阻塞的客戶端
robj *dstkey = receiver->bpop.target;//brpoplpush命令目的鏈表
int where = (receiver->lastcmd
receiver->lastcmd->proc == blpopCommand) ?
REDIS_HEAD : REDIS_TAIL;//獲取取出的方向
robj *value = listTypePop(o,where);//取出就緒鏈表的元素
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);//設(shè)置客戶端為非阻塞狀態(tài)
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == REDIS_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}//給客戶端回復(fù)鏈表中的元素內(nèi)容
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
break;
}
}
}
//如果鏈表為空,則從數(shù)據(jù)庫中刪除
if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
/* 回收rl */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}
從這個源碼可知,如果有兩個客戶端,同時阻塞在一個鏈表上面,那么如果鏈表插入一個元素之后,只有先阻塞的那個客戶端收到消息,后面阻塞的那個客戶端繼續(xù)阻塞,這也是先阻塞先服務(wù)的思想。handleClientsBlockedOnLists函數(shù)調(diào)用了unblockClient(receiver)
,該函數(shù)功能為接觸客戶端阻塞標(biāo)志,以及找到db阻塞在key上的客戶端鏈表,并將接觸阻塞的客戶端從鏈表刪除。然后調(diào)用serveClientBlockOnList給客戶端回復(fù)剛在鏈表插入的元素。
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
robj *argv[3];
if (dstkey == NULL) {
/* Propagate the [LR]POP operation. */
argv[0] = (where == REDIS_HEAD) ? shared.lpop :
shared.rpop;
argv[1] = key;
propagate((where == REDIS_HEAD) ?
server.lpopCommand : server.rpopCommand,
db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
/* BRPOP/BLPOP */
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,value);
} else {
/* BRPOPLPUSH */
/* 省略 */
}
}
propagate函數(shù)主要是將命令信息發(fā)送給aof和slave。函數(shù)中省略部分是brpoplpush list list1 0
命令的目的鏈表list1非空時,將從list鏈表pop出來的元素插入list1中。當(dāng)給客戶端發(fā)送消息之后,客戶端就從read函數(shù)調(diào)用中返回,變?yōu)椴蛔枞?/p>
通過超時時間解阻塞
如果鏈表一直沒有數(shù)據(jù)插入,那么客戶端將會一直阻塞下去,這肯定是不行的,所以brpop還支持超時阻塞,即阻塞時間超過一定值之后,服務(wù)器返回一個空值,這樣客戶端就解脫阻塞了。
對于時間超時,都放在了100ms執(zhí)行一次的時間事件中;超時解脫阻塞函數(shù)也在serverCron中;在serverCron->clientsCron->clientsCronHandleTimeout
int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
time_t now = now_ms/1000;
//..........
else if (c->flags REDIS_BLOCKED) {
/* Blocked OPS timeout is handled with milliseconds resolution.
* However note that the actual resolution is limited by
* server.hz. */
if (c->bpop.timeout != 0 c->bpop.timeout now_ms) {
/* Handle blocking operation specific timeout. */
replyToBlockedClientTimedOut(c);
unblockClient(c);
}
}
//.............
把這個函數(shù)不相干的代碼刪除,主要部分先判斷這個客戶端是否阻塞,如果是,超時時間是否到期,如果是,則調(diào)用replyToBlockedClientTimedOut給客戶端回復(fù)一個空回復(fù),以及接觸客戶端阻塞。
總結(jié)
鏈表消息隊列實現(xiàn)暫時分析到這了,大家都學(xué)會了嗎?希望這篇文章給大家能帶來一定的幫助,如果有疑問可以留言交流。
您可能感興趣的文章:- SpringBoot利用redis集成消息隊列的方法
- PHP使用php-resque庫配合Redis實現(xiàn)MQ消息隊列的教程
- Java利用Redis實現(xiàn)消息隊列的示例代碼
- phpredis提高消息隊列的實時性方法(推薦)
- PHP基于Redis消息隊列實現(xiàn)發(fā)布微博的方法
- php+redis消息隊列實現(xiàn)搶購功能
- 深入理解redis分布式鎖和消息隊列
- 詳解redis是如何實現(xiàn)隊列消息的ack
- PHP+Redis 消息隊列 實現(xiàn)高并發(fā)下注冊人數(shù)統(tǒng)計的實例
- redis中隊列消息實現(xiàn)應(yīng)用解耦的方法