前言
nginx采用多進(jìn)程的模,當(dāng)一個(gè)請(qǐng)求過(guò)來(lái)的時(shí)候,系統(tǒng)會(huì)對(duì)進(jìn)程進(jìn)行加鎖操作,保證只有一個(gè)進(jìn)程來(lái)接受請(qǐng)求。
本文基于Nginx 0.8.55源代碼,并基于epoll機(jī)制分析
1. accept鎖的實(shí)現(xiàn)
1.1 accpet鎖是個(gè)什么東西
提到accept鎖,就不得不提起驚群?jiǎn)栴}。
所謂驚群?jiǎn)栴},就是指的像Nginx這種多進(jìn)程的服務(wù)器,在fork后同時(shí)監(jiān)聽同一個(gè)端口時(shí),如果有一個(gè)外部連接進(jìn)來(lái),會(huì)導(dǎo)致所有休眠的子進(jìn)程被喚醒,而最終只有一個(gè)子進(jìn)程能夠成功處理accept事件,其他進(jìn)程都會(huì)重新進(jìn)入休眠中。這就導(dǎo)致出現(xiàn)了很多不必要的schedule和上下文切換,而這些開銷是完全不必要的。
而在Linux內(nèi)核的較新版本中,accept調(diào)用本身所引起的驚群?jiǎn)栴}已經(jīng)得到了解決,但是在Nginx中,accept是交給epoll機(jī)制來(lái)處理的,epoll的accept帶來(lái)的驚群?jiǎn)栴}并沒有得到解決(應(yīng)該是epoll_wait本身并沒有區(qū)別讀事件是否來(lái)自于一個(gè)Listen套接字的能力,所以所有監(jiān)聽這個(gè)事件的進(jìn)程會(huì)被這個(gè)epoll_wait喚醒。),所以Nginx的accept驚群?jiǎn)栴}仍然需要定制一個(gè)自己的解決方案。
accept鎖就是nginx的解決方案,本質(zhì)上這是一個(gè)跨進(jìn)程的互斥鎖,以這個(gè)互斥鎖來(lái)保證只有一個(gè)進(jìn)程具備監(jiān)聽accept事件的能力。
實(shí)現(xiàn)上accept鎖是一個(gè)跨進(jìn)程鎖,其在Nginx中是一個(gè)全局變量,聲明如下:
ngx_shmtx_t ngx_accept_mutex;
這是一個(gè)在event模塊初始化時(shí)就分配好的鎖,放在一塊進(jìn)程間共享的內(nèi)存中,以保證所有進(jìn)程都能訪問這一個(gè)實(shí)例,其加鎖解鎖是借由linux的原子變量來(lái)做CAS,如果加鎖失敗則立即返回,是一種非阻塞的鎖。加解鎖代碼如下:
static ngx_inline ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}
#define ngx_shmtx_lock(mtx) ngx_spinlock((mtx)->lock, ngx_pid, 1024)
#define ngx_shmtx_unlock(mtx) (void) ngx_atomic_cmp_set((mtx)->lock, ngx_pid, 0)
可以看出,調(diào)用ngx_shmtx_trylock失敗后會(huì)立刻返回而不會(huì)阻塞。
1.2 accept鎖如何保證只有一個(gè)進(jìn)程能夠處理新連接
要解決epoll帶來(lái)的accept鎖的問題也很簡(jiǎn)單,只需要保證同一時(shí)間只有一個(gè)進(jìn)程注冊(cè)了accept的epoll事件即可。
Nginx采用的處理模式也沒什么特別的,大概就是如下的邏輯:
嘗試獲取accept鎖
if 獲取成功:
在epoll中注冊(cè)accept事件
else:
在epoll中注銷accept事件
處理所有事件
釋放accept鎖
當(dāng)然這里忽略了延后事件的處理,這部分我們放到后面討論。
對(duì)于accept鎖的處理和epoll中注冊(cè)注銷accept事件的的處理都是在ngx_trylock_accept_mutex中進(jìn)行的。而這一系列過(guò)程則是在nginx主體循環(huán)中反復(fù)調(diào)用的void ngx_process_events_and_timers(ngx_cycle_t *cycle)中進(jìn)行。
也就是說(shuō),每輪事件的處理都會(huì)首先競(jìng)爭(zhēng)accept鎖,競(jìng)爭(zhēng)成功則在epoll中注冊(cè)accept事件,失敗則注銷accept事件,然后處理完事件之后,釋放accept鎖。由此只有一個(gè)進(jìn)程監(jiān)聽一個(gè)listen套接字,從而避免了驚群?jiǎn)栴}。
1.3 事件處理機(jī)制為不長(zhǎng)時(shí)間占用accept鎖作了哪些努力
accept鎖處理驚群?jiǎn)栴}的方案看起來(lái)似乎很美,但如果完全使用上述邏輯,就會(huì)有一個(gè)問題:如果服務(wù)器非常忙,有非常多事件要處理,那么“處理所有事件這一步”就會(huì)消耗非常長(zhǎng)的時(shí)間,也就是說(shuō),某一個(gè)進(jìn)程長(zhǎng)時(shí)間占用accept鎖,而又無(wú)暇處理新連接;其他進(jìn)程又沒有占用accept鎖,同樣無(wú)法處理新連接——至此,新連接就處于無(wú)人處理的狀態(tài),這對(duì)服務(wù)的實(shí)時(shí)性無(wú)疑是很要命的。
為了解決這個(gè)問題,Nginx采用了將事件處理延后的方式。即在ngx_process_events的處理中,僅僅將事件放入兩個(gè)隊(duì)列中:
ngx_thread_volatile ngx_event_t *ngx_posted_accept_events;
ngx_thread_volatile ngx_event_t *ngx_posted_events;
返回后先處理ngx_posted_accept_events后立刻釋放accept鎖,然后再慢慢處理其他事件。
即ngx_process_events僅對(duì)epoll_wait進(jìn)行處理,事件的消費(fèi)則放到accept鎖釋放之后,來(lái)最大限度地縮短占有accept的時(shí)間,來(lái)讓其他進(jìn)程也有足夠的時(shí)機(jī)處理accept事件。
那么具體是怎么實(shí)現(xiàn)的呢?其實(shí)就是在static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
的flags參數(shù)中傳入一個(gè)NGX_POST_EVENTS的標(biāo)志位,處理事件時(shí)檢查這個(gè)標(biāo)志位即可。
這里只是避免了事件的消費(fèi)對(duì)于accept鎖的長(zhǎng)期占用,那么萬(wàn)一epoll_wait本身占用的時(shí)間很長(zhǎng)呢?這種事情也不是不可能發(fā)生。這方面的處理也很簡(jiǎn)單,epoll_wait本身是有超時(shí)時(shí)間的,限制住它的值就可以了,這個(gè)參數(shù)保存在ngx_accept_mutex_delay這個(gè)全局變量中。
下面放上ngx_process_events_and_timers 的實(shí)現(xiàn)代碼,可以大概一觀相關(guān)的處理:
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
/* 省略一些處理時(shí)間事件的代碼 */
// 這里是處理負(fù)載均衡鎖和accept鎖的時(shí)機(jī)
if (ngx_use_accept_mutex) {
// 如果負(fù)載均衡token的值大于0, 則說(shuō)明負(fù)載已滿,此時(shí)不再處理accept, 同時(shí)把這個(gè)值減一
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
// 嘗試拿到accept鎖
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
// 拿到鎖之后把flag加上post標(biāo)志,讓所有事件的處理都延后
// 以免太長(zhǎng)時(shí)間占用accept鎖
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay; // 最多等ngx_accept_mutex_delay個(gè)毫秒,防止占用太久accept鎖
}
}
}
}
delta = ngx_current_msec;
// 調(diào)用事件處理模塊的process_events,處理一個(gè)epoll_wait的方法
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta; //計(jì)算處理events事件所消耗的時(shí)間
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
// 如果有延后處理的accept事件,那么延后處理這個(gè)事件
if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
// 釋放accept鎖
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
// 處理所有的超時(shí)事件
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);
if (ngx_posted_events) {
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
// 處理所有的延后事件
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}
再來(lái)看看ngx_epoll_process_events的相關(guān)處理:
// 讀事件
if ((revents & EPOLLIN) && rev->active) {
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
rev->posted_ready = 1;
} else {
rev->ready = 1;
}
if (flags & NGX_POST_EVENTS) {
queue = (ngx_event_t **) (rev->accept ?
&ngx_posted_accept_events : &ngx_posted_events);
ngx_locked_post_event(rev, queue);
} else {
rev->handler(rev);
}
}
wev = c->write;
// 寫事件
if ((revents & EPOLLOUT) && wev->active) {
if (flags & NGX_POST_THREAD_EVENTS) {
wev->posted_ready = 1;
} else {
wev->ready = 1;
}
if (flags & NGX_POST_EVENTS) {
ngx_locked_post_event(wev, &ngx_posted_events);
} else {
wev->handler(wev);
}
}
處理也相對(duì)簡(jiǎn)單,如果拿到了accept鎖,就會(huì)有NGX_POST_EVENTS標(biāo)志那么就會(huì)放到相應(yīng)的隊(duì)列中。沒有的話就會(huì)直接處理事件。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。