Part 1: Manual failover
Redis Cluster supports manual failover: sending the "CLUSTER FAILOVER" command to a slave makes it start a failover even though its master is not failed; the slave is promoted to master, and the old master is demoted to a slave of it.
To avoid losing data, after a slave receives "CLUSTER FAILOVER" the flow is as follows:
a. On receiving the command, the slave sends a CLUSTERMSG_TYPE_MFSTART packet to its master;
b. On receiving that packet, the master pauses all of its clients, i.e. it stops processing client commands for 10 seconds, and sets the CLUSTERMSG_FLAG0_PAUSED flag in the heartbeat packets it sends;
c. When the slave receives a heartbeat from its master carrying the CLUSTERMSG_FLAG0_PAUSED flag, it reads the master's current replication offset from the packet. Only once its own replication offset has caught up with that value does it start the failover proper: request votes, count them, win the election, promote itself to master, and update the configuration;
The "CLUSTER FAILOVER" command supports two options, FORCE and TAKEOVER, which change the flow above.
With FORCE, the slave does not negotiate with its master at all, and the master does not pause its clients; the slave immediately starts the failover: request votes, count them, win the election, promote itself to master, and update the configuration.
TAKEOVER is even more blunt: the slave skips the election entirely, promotes itself to master directly, takes over its master's slots, bumps its own configEpoch, and updates the configuration.
Therefore, with FORCE or TAKEOVER the master may already be down, whereas a plain "CLUSTER FAILOVER" with no option requires the master to be online.
In the clusterCommand function, the code handling "CLUSTER FAILOVER" is as follows:
else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
         (c->argc == 2 || c->argc == 3))
{
    /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
    int force = 0, takeover = 0;

    if (c->argc == 3) {
        if (!strcasecmp(c->argv[2]->ptr,"force")) {
            force = 1;
        } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
            takeover = 1;
            force = 1; /* Takeover also implies force. */
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* Check preconditions. */
    if (nodeIsMaster(myself)) {
        addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
        return;
    } else if (myself->slaveof == NULL) {
        addReplyError(c,"I'm a slave but my master is unknown to me");
        return;
    } else if (!force &&
               (nodeFailed(myself->slaveof) ||
                myself->slaveof->link == NULL))
    {
        addReplyError(c,"Master is down or failed, "
                        "please use CLUSTER FAILOVER FORCE");
        return;
    }
    resetManualFailover();
    server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;

    if (takeover) {
        /* A takeover does not perform any initial check. It just
         * generates a new configuration epoch for this node without
         * consensus, claims the master's slots, and broadcast the new
         * configuration. */
        redisLog(REDIS_WARNING,"Taking over the master (user request).");
        clusterBumpConfigEpochWithoutConsensus();
        clusterFailoverReplaceYourMaster();
    } else if (force) {
        /* If this is a forced failover, we don't need to talk with our
         * master to agree about the offset. We just failover taking over
         * it without coordination. */
        redisLog(REDIS_WARNING,"Forced failover user request accepted.");
        server.cluster->mf_can_start = 1;
    } else {
        redisLog(REDIS_WARNING,"Manual failover user request accepted.");
        clusterSendMFStart(myself->slaveof);
    }
    addReply(c,shared.ok);
}
首先檢查命令的最后一個參數(shù)是否是FORCE或TAKEOVER;
如果當前節(jié)點是主節(jié)點;或者當前節(jié)點是從節(jié)點,但沒有主節(jié)點;或者當前從節(jié)點的主節(jié)點已經(jīng)下線或者斷鏈,并且命令中沒有FORCE或TAKEOVER參數(shù),則直接回復客戶端錯誤信息后返回;
然后調(diào)用resetManualFailover,重置手動強制故障轉移的狀態(tài);
置mf_end為當前時間加5秒,該屬性表示手動強制故障轉移流程的超時時間,也用來表示當前是否正在進行手動強制故障轉移;
如果命令最后一個參數(shù)為TAKEOVER,這表示收到命令的從節(jié)點無需經(jīng)過選舉的過程,直接接手其主節(jié)點的槽位,并成為新的主節(jié)點。因此首先調(diào)用函數(shù)clusterBumpConfigEpochWithoutConsensus,產(chǎn)生新的configEpoch,以便后續(xù)更新配置;然后調(diào)用clusterFailoverReplaceYourMaster函數(shù),轉變成為新的主節(jié)點,并將這種轉變廣播給集群中所有節(jié)點;
如果命令最后一個參數(shù)是FORCE,這表示收到命令的從節(jié)點可以直接開始選舉過程,而無需達到主節(jié)點的復制偏移量之后才開始選舉過程。因此置mf_can_start為1,這樣在函數(shù)clusterHandleSlaveFailover中,即使在主節(jié)點未下線或者當前從節(jié)點的復制數(shù)據(jù)比較舊的情況下,也可以開始故障轉移流程;
如果最后一個參數(shù)不是FORCE或TAKEOVER,這表示收到命令的從節(jié)點,首先需要向主節(jié)點發(fā)送CLUSTERMSG_TYPE_MFSTART包,因此調(diào)用clusterSendMFStart函數(shù),向其主節(jié)點發(fā)送該包;
主節(jié)點收到CLUSTERMSG_TYPE_MFSTART包后,在clusterProcessPacket函數(shù)中,是這樣處理的:
else if (type == CLUSTERMSG_TYPE_MFSTART) {
    /* This message is acceptable only if I'm a master and the sender
     * is one of my slaves. */
    if (!sender || sender->slaveof != myself) return 1;
    /* Manual failover requested from slaves. Initialize the state
     * accordingly. */
    resetManualFailover();
    server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
    server.cluster->mf_slave = sender;
    pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
    redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
        sender->name);
}
If the sending node cannot be found in the nodes dictionary, or its master is not the current node, the packet is ignored;
resetManualFailover is called to reset any manual failover state;
mf_end is then set to the current time plus 5 seconds; this field holds the manual failover deadline and also serves as the flag that a manual failover is in progress;
mf_slave is set to sender, the slave performing the manual failover;
pauseClients is then called, blocking all clients for the next 10 seconds;
When the master builds the header of an outgoing heartbeat packet, it adds the CLUSTERMSG_FLAG0_PAUSED flag if a manual failover is currently in progress:
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    ...
    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
    ...
}
The slave processes incoming packets in clusterProcessPacket; as soon as it sees a packet from its master carrying the CLUSTERMSG_FLAG0_PAUSED flag, it records the master's replication offset in server.cluster->mf_master_offset:
int clusterProcessPacket(clusterLink *link) {
    ...
    /* Check if the sender is a known node. */
    sender = clusterLookupNode(hdr->sender);
    if (sender && !nodeInHandshake(sender)) {
        ...
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = mstime();
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            redisLog(REDIS_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }
}
In the cluster cron function clusterCron, the slave calls clusterHandleManualFailover; once the slave's replication offset has reached server.cluster->mf_master_offset, it sets server.cluster->mf_can_start to 1, and the subsequent call to clusterHandleSlaveFailover starts the failover immediately.
The code of clusterHandleManualFailover:
void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        redisLog(REDIS_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
    }
}
Both slaves and masters call manualFailoverCheckTimeout from the cluster cron function clusterCron; once the manual failover deadline has passed, the manual failover state is reset, aborting the procedure. The code of manualFailoverCheckTimeout:
/* If a manual failover timed out, abort it. */
void manualFailoverCheckTimeout(void) {
    if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
        redisLog(REDIS_WARNING,"Manual failover timed out.");
        resetManualFailover();
    }
}
Part 2: Slave migration
To improve availability, each master in a Redis cluster is normally given several slaves. But if these master-slave relationships were fixed, over time the cluster could develop orphaned masters: masters left with no slave available for failover. If such a master goes down, the whole cluster becomes unavailable.
Redis Cluster therefore supports slave migration. In short: once an orphaned master appears in the cluster, some slave A automatically becomes a slave of that orphaned master. Slave A is chosen as follows: A's master is the master with the most attached slaves, and among those slaves A has the smallest node ID ("The acting slave is the slave among the masters with the maximum number of attached slaves, that is not in FAIL state and has the smallest node ID").
This feature is implemented in the cluster cron function clusterCron. The relevant code:
void clusterCron(void) {
    ...
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, have no working slaves, but used to have at least one
             * slave. */
            if (okslaves == 0 && node->numslots > 0 && node->numslaves)
                orphaned_masters++;
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }
        ...
    }
    ...
    if (nodeIsSlave(myself)) {
        ...
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }
    ...
}
The loop iterates over the dictionary server.cluster->nodes; every node that is not the current node, is not flagged REDIS_NODE_NOADDR, and is not in handshake state is processed as follows:
If the current node is a slave, and node is a master that is not flagged as failed, clusterCountNonFailingSlaves is first called to count node's non-failing slaves, okslaves. If okslaves is 0 while the master serves a non-zero number of slots, it is an orphaned master, so orphaned_masters is incremented. If okslaves exceeds max_slaves, max_slaves is updated, so that after the loop max_slaves holds the largest number of non-failing slaves owned by any single master. If the current node happens to be one of node's slaves, okslaves is recorded in this_slaves. All of this prepares for the migration decision below;
After the loop, if there is at least one orphaned master, max_slaves is at least 2, and the current node is one of the slaves of the master with the most non-failing slaves, clusterHandleSlaveMigration is called; if its conditions are met, the current slave migrates, i.e. becomes a slave of one of the orphaned masters.
The code of clusterHandleSlaveMigration:
void clusterHandleSlaveMigration(int max_slaves) {
    int j, okslaves = 0;
    clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
    dictIterator *di;
    dictEntry *de;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != REDIS_CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     * 'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++)
        if (!nodeFailed(mymaster->slaves[j]) &&
            !nodeTimedOut(mymaster->slaves[j])) okslaves++;
    if (okslaves <= server.cluster_migration_barrier) return;

    /* Step 3: Identify a candidate for migration, and check if among the
     * masters with the greatest number of ok slaves, I'm the one with the
     * smaller node ID.
     *
     * Note that this means that eventually a replica migration will occur
     * since slaves that are reachable again always have their FAIL flag
     * cleared. At the same time this does not mean that there are no
     * race conditions possible (two slaves migrating at the same time), but
     * this is extremely unlikely to happen, and harmless. */
    candidate = myself;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int okslaves;

        /* Only iterate over working masters. */
        if (nodeIsSlave(node) || nodeFailed(node)) continue;
        /* If this master never had slaves so far, don't migrate. We want
         * to migrate to a master that remained orphaned, not masters that
         * were never configured to have slaves. */
        if (node->numslaves == 0) continue;
        okslaves = clusterCountNonFailingSlaves(node);

        if (okslaves == 0 && target == NULL && node->numslots > 0)
            target = node;

        if (okslaves == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                if (memcmp(node->slaves[j]->name,
                           candidate->name,
                           REDIS_CLUSTER_NAMELEN) < 0)
                {
                    candidate = node->slaves[j];
                }
            }
        }
    }
    dictReleaseIterator(di);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate. */
    if (target && candidate == myself) {
        redisLog(REDIS_WARNING,"Migrating to orphaned master %.40s",
            target->name);
        clusterSetMaster(target);
    }
}
If the cluster state is not REDIS_CLUSTER_OK, return immediately; likewise if the current slave has no master;
Next, count okslaves, the number of non-failing slaves of the current slave's master; if okslaves is less than or equal to the migration threshold server.cluster_migration_barrier, return;
Then iterate over the dictionary server.cluster->nodes; for each node:
If node is a slave, or is flagged as failed, skip it; likewise skip it if it has no slaves configured at all;
Call clusterCountNonFailingSlaves to count node's non-failing slaves, okslaves. If okslaves is 0 while node->numslots is greater than 0, the master used to have slaves but they have all failed, so an orphaned master target has been found;
If okslaves equals max_slaves, node is a master with the most non-failing slaves, so the current candidate's node ID is compared against each of node's slaves; whenever a slave has a smaller name, candidate is updated to that slave (in fact, the function could return early once candidate is no longer the current node);
After the loop, if an orphaned master was found and the current node holds the smallest node ID, clusterSetMaster is called to make target the current node's master and start replicating from it.
Part 3: The configEpoch collision problem
Masters responsible for different slots may share the same configEpoch without harm, but human intervention or bugs can leave masters with the same configEpoch both claiming the same slots, which is fatal in a distributed system. Redis therefore requires every node in the cluster to have a distinct configEpoch.
When a slave is promoted to master, it obtains a new configEpoch greater than that of every other node, so promotions cannot produce duplicates (two slaves cannot both win the same election). However, at the end of an administrator-driven resharding, the node importing slots bumps its own configEpoch without the agreement of the other nodes, and a manual failover likewise lets a slave bump its configEpoch unilaterally; either can leave several masters with the same configEpoch.
An algorithm is therefore needed to keep every configEpoch in the cluster unique. It works like this: when a master receives a heartbeat from another master whose configEpoch equals its own, it calls clusterHandleConfigEpochCollision to resolve the conflict.
The code of clusterHandleConfigEpochCollision:
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    redisLog(REDIS_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
If the sender's configEpoch differs from the current node's, or either of them is not a master, return immediately;
If the sender's node ID is smaller than (or equal to) the current node's, return immediately;
Thus the node with the smaller name is the one that reacts: it increments its currentEpoch and assigns that value to its configEpoch, so the smaller-named node ends up with the larger configEpoch.
In this way, even if several nodes share a configEpoch, eventually only the node with the largest node ID keeps its configEpoch unchanged; every other node bumps its own, each by a different amount, and the node with the smallest node ID ends up with the largest configEpoch.
Summary
That concludes this walkthrough of the Redis Cluster source for manual failover, slave migration, and configEpoch collision handling. Corrections and comments are welcome.