worker進(jìn)程啟動(dòng)后,其首先會(huì)初始化自身運(yùn)行所需要的環(huán)境,然后會(huì)進(jìn)入一個(gè)循環(huán),在該循環(huán)中不斷檢查是否有需要執(zhí)行的事件,然后處理事件。在這個(gè)過(guò)程中,worker進(jìn)程也是需要與master進(jìn)程交互的,更有甚者,worker進(jìn)程作為一個(gè)子進(jìn)程,也是可以接收命令行指令(比如kill等)以進(jìn)行相應(yīng)邏輯的處理的。那么worker進(jìn)程是如何與master或者命令行指令進(jìn)行交互的呢?本文首先會(huì)對(duì)worker進(jìn)程與master進(jìn)程交互方式,以及worker進(jìn)程如何處理命令行指令的流程進(jìn)行講解,然后會(huì)從源碼上對(duì)worker進(jìn)程交互的整個(gè)工作流程進(jìn)行介紹。
1. worker與master進(jìn)程交互方式
這里首先需要說(shuō)明的是,無(wú)論是master還是外部命令的方式,nginx都是通過(guò)標(biāo)志位的方式來(lái)處理相應(yīng)的指令的,也即在接收到一個(gè)指令(無(wú)論是master還是外部命令)的時(shí)候,worker會(huì)在其回調(diào)方法中設(shè)置與該指令相對(duì)應(yīng)的標(biāo)志位,然后在worker進(jìn)程在其自身的循環(huán)中處理完事件之后會(huì)依次檢查這些標(biāo)志位是否為真,是則根據(jù)該標(biāo)志位的作用執(zhí)行相應(yīng)的邏輯。
對(duì)于worker進(jìn)程與master進(jìn)程的交互,其是通過(guò)socket管道的方式進(jìn)行的。在ngx_process.h文件中聲明了一個(gè)ngx_process_t結(jié)構(gòu)體,這里我們主要關(guān)注其channel屬性:
typedef struct {
// 其余屬性...
ngx_socket_t channel[2];
} ngx_process_t;
這里的ngx_process_t結(jié)構(gòu)體的作用是存儲(chǔ)某個(gè)進(jìn)程相關(guān)的信息的,比如pid、channel、status等。每個(gè)進(jìn)程中都有一個(gè)ngx_processes數(shù)組,數(shù)組元素就是這里的ngx_process_t結(jié)構(gòu)體,也就是說(shuō)每個(gè)進(jìn)程都會(huì)通過(guò)ngx_processes數(shù)組保存其余進(jìn)程的基本信息。其聲明如下:
// 存儲(chǔ)了nginx中所有的子進(jìn)程數(shù)組,每個(gè)子進(jìn)程都有一個(gè)對(duì)應(yīng)的ngx_process_t結(jié)構(gòu)體進(jìn)行標(biāo)記
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
這里我們就可以看出,每個(gè)進(jìn)程都會(huì)一個(gè)與之對(duì)應(yīng)的channel數(shù)組,這個(gè)數(shù)組的長(zhǎng)度為2,其是與master進(jìn)程進(jìn)行交互的管道流。在master進(jìn)程創(chuàng)建每一個(gè)子進(jìn)程的之前,都會(huì)創(chuàng)建一個(gè)channel數(shù)組,該數(shù)組的創(chuàng)建方法為:
int socketpair(int domain, int type, int protocol, int sv[2]);
這個(gè)方法的主要作用是創(chuàng)建一對(duì)匿名的已經(jīng)連接的套接字,也就是說(shuō),如果在一個(gè)套接字中寫入數(shù)據(jù),那么在另一個(gè)套接字中就可以接收到寫入的數(shù)據(jù)。通過(guò)這種方式,如果在父進(jìn)程中往管道的一邊寫入數(shù)據(jù),那么在子進(jìn)程就可以在另一邊接收到數(shù)據(jù),這樣就可以實(shí)現(xiàn)父子進(jìn)程的數(shù)據(jù)通信了。
在master進(jìn)程啟動(dòng)完子進(jìn)程之后,子進(jìn)程會(huì)保有master進(jìn)程中相應(yīng)的數(shù)據(jù),也包括這里的channel數(shù)組。如此,master進(jìn)程就可以通過(guò)channel數(shù)組實(shí)現(xiàn)與子進(jìn)程的通信了。
2. worker處理外部命令
對(duì)于外部命令,其本質(zhì)上是通過(guò)signals數(shù)組中定義的各個(gè)信號(hào)以及回調(diào)方法進(jìn)行處理的。在master進(jìn)程初始化基本環(huán)境的時(shí)候,會(huì)將signals數(shù)組中指定的信號(hào)回調(diào)方法設(shè)置到對(duì)應(yīng)的信號(hào)中。由于worker進(jìn)程會(huì)繼承master進(jìn)程的基本環(huán)境,因而worker進(jìn)程在接收到這里設(shè)置的信號(hào)之后,也會(huì)調(diào)用對(duì)應(yīng)的回調(diào)方法。而該回調(diào)方法的主要邏輯也僅僅只是設(shè)置相應(yīng)的標(biāo)志位的值。關(guān)于nginx接收到信號(hào)之后如何設(shè)置對(duì)應(yīng)的標(biāo)志位,可以參照本人前面的文章(nginx master工作循環(huán) 超鏈接),這里不再贅述。
3. 源碼講解
master進(jìn)程是通過(guò)ngx_start_worker_processes()方法啟動(dòng)各個(gè)子進(jìn)程的,如下是該方法源碼:
/**
* 啟動(dòng)n個(gè)worker子進(jìn)程,并設(shè)置好每個(gè)子進(jìn)程與master父進(jìn)程之間使用socketpair
* 系統(tǒng)調(diào)用建立起來(lái)的socket句柄通信機(jī)制
*/
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
ngx_int_t i;
ngx_channel_t ch;
ngx_memzero(&ch, sizeof(ngx_channel_t));
ch.command = NGX_CMD_OPEN_CHANNEL;
for (i = 0; i < n; i++) {
// spawn是產(chǎn)卵的意思,這里就是生成一個(gè)子進(jìn)程的意思,而該子進(jìn)程所進(jìn)行的事件循環(huán)就是
// ngx_worker_process_cycle()方法,這里的ngx_worker_process_cycle是worker進(jìn)程處理事件的循環(huán),
// worker進(jìn)程在一個(gè)無(wú)限for循環(huán)中,不斷的檢查相應(yīng)的事件模型中是否存在對(duì)應(yīng)的事件,
// 然后將accept事件和read、write事件分開放入兩個(gè)隊(duì)列中,最后在事件循環(huán)中不斷的處理事件
ngx_spawn_process(cycle, ngx_worker_process_cycle,
(void *) (intptr_t) i, "worker process", type);
// 下面的這段代碼的主要作用是將新建進(jìn)程這個(gè)事件通知到其他的進(jìn)程,上面的
// ch.command = NGX_CMD_OPEN_CHANNEL;中NGX_CMD_OPEN_CHANNEL表示的就是當(dāng)前是新建了一個(gè)進(jìn)程,
// 而ngx_process_slot存儲(chǔ)的就是該新建進(jìn)程所存放的數(shù)組位置,這里需要進(jìn)行廣播的原因在于,
// 每個(gè)子進(jìn)程被創(chuàng)建后,其內(nèi)存數(shù)據(jù)都是復(fù)制的父進(jìn)程的,但是ngx_processes數(shù)組是每個(gè)進(jìn)程都有一份的,
// 因而數(shù)組中先創(chuàng)建的子進(jìn)程是沒有后創(chuàng)建的子進(jìn)程的數(shù)據(jù)的,但是master進(jìn)程是有所有子進(jìn)程的數(shù)據(jù)的,
// 因而這里master進(jìn)程創(chuàng)建子進(jìn)程之后,其就會(huì)向ngx_processes數(shù)組的每個(gè)進(jìn)程的channel[0]上
// 寫入當(dāng)前廣播的事件,也即這里的ch,通過(guò)這種方式,每個(gè)子進(jìn)程接收到這個(gè)事件之后,
// 都會(huì)嘗試更新其所保存的ngx_processes數(shù)據(jù)信息
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
// 廣播事件
ngx_pass_open_channel(cycle, &ch);
}
}
這里我們主要需要關(guān)注上面的啟動(dòng)子進(jìn)程的方法調(diào)用,也即這里的ngx_spawn_process()方法,該方法的第二個(gè)參數(shù)是一個(gè)方法,在啟動(dòng)子進(jìn)程之后,子進(jìn)程就會(huì)進(jìn)入該方法所指定的循環(huán)中。而在ngx_spawn_process()方法中,master進(jìn)程會(huì)為當(dāng)前新創(chuàng)建的子進(jìn)程創(chuàng)建一個(gè)channel數(shù)組,以用于與當(dāng)前子進(jìn)程進(jìn)行通信。如下是ngx_spawn_process()方法的源碼:
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
u_long on;
ngx_pid_t pid;
ngx_int_t s;
if (respawn >= 0) {
s = respawn;
} else {
// 在ngx_processes數(shù)組中存儲(chǔ)了當(dāng)前創(chuàng)建的所有進(jìn)程,而ngx_last_process則是當(dāng)前當(dāng)前記錄的最后一個(gè)
// process在ngx_processes中的下一個(gè)位置的索引,只不過(guò)ngx_processes中記錄的進(jìn)程有可能有部分
// 已經(jīng)失效了。當(dāng)前循環(huán)就是從頭開始查找是否有某個(gè)進(jìn)程已經(jīng)失效了,如果已經(jīng)失效了,則復(fù)用該進(jìn)程位置,
// 否則直接使用ngx_last_process所指向的位置
for (s = 0; s < ngx_last_process; s++) {
if (ngx_processes[s].pid == -1) {
break;
}
}
// 這里說(shuō)明所創(chuàng)建的進(jìn)程數(shù)達(dá)到了最大限度
if (s == NGX_MAX_PROCESSES) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"no more than %d processes can be spawned",
NGX_MAX_PROCESSES);
return NGX_INVALID_PID;
}
}
// NGX_PROCESS_DETACHED標(biāo)志表示當(dāng)前fork出來(lái)的進(jìn)程與原來(lái)的父進(jìn)程沒有任何關(guān)系,比如進(jìn)行nginx升級(jí)時(shí),
// 新生成的master進(jìn)程就與原先的master進(jìn)程沒有關(guān)系
if (respawn != NGX_PROCESS_DETACHED) {
/* Solaris 9 still has no AF_LOCAL */
// 這里的socketpair()方法的主要作用是生成一對(duì)套接字流,用于主進(jìn)程和子進(jìn)程的通信,這一對(duì)套接字會(huì)
// 存儲(chǔ)在ngx_processes[s].channel中,本質(zhì)上這個(gè)字段是一個(gè)長(zhǎng)度為2的整型數(shù)組。在主進(jìn)程和子進(jìn)程
// 進(jìn)行通信的之前,主進(jìn)程會(huì)關(guān)閉其中一個(gè),而子進(jìn)程會(huì)關(guān)閉另一個(gè),然后相互之間往未關(guān)閉的另一個(gè)文件描述符中
// 寫入或讀取數(shù)據(jù)即可實(shí)現(xiàn)通信。
// AF_UNIX表示當(dāng)前使用的是UNIX文件形式的socket地址族
// SOCK_STREAM指定了當(dāng)前套接字建立的通信方式是管道流,并且這個(gè)管道流是雙向的,
// 即管道雙方都可以進(jìn)行讀寫操作
// 第三個(gè)參數(shù)protocol必須為0
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}
ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"channel %d:%d",
ngx_processes[s].channel[0],
ngx_processes[s].channel[1]);
// 將ngx_processes[s].channel[0]設(shè)置為非阻塞模式
if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n
" failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 將ngx_processes[s].channel[1]設(shè)置為非阻塞模式
if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n
" failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
on = 1;
// 將ngx_processes[s].channel[0]套接字管道設(shè)置為異步模式
if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"ioctl(FIOASYNC) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 當(dāng)前還處于主進(jìn)程中,這里的ngx_pid指向了主進(jìn)程的進(jìn)程id,當(dāng)前方法的作用主要是將
// ngx_processes[s].channel[0]的操作權(quán)限設(shè)置給主進(jìn)程,也就是說(shuō)主進(jìn)程通過(guò)向
// ngx_processes[s].channel[0]寫入和讀取數(shù)據(jù)來(lái)與子進(jìn)程進(jìn)行通信
if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(F_SETOWN) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// FD_CLOEXEC表示當(dāng)前指定的套接字管道在子進(jìn)程中可以使用,但是在execl()執(zhí)行的程序中不可使用
if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// FD_CLOEXEC表示當(dāng)前指定的套接字管道在子進(jìn)程中可以使用,但是在execl()執(zhí)行的程序中不可使用
if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// ngx_processes[s].channel[1]是用于給子進(jìn)程監(jiān)聽相關(guān)事件使用的,當(dāng)父進(jìn)程向
// ngx_processes[s].channel[0]發(fā)布事件之后,ngx_processes[s].channel[1]中就會(huì)接收到
// 對(duì)應(yīng)的事件,從而進(jìn)行相應(yīng)的處理
ngx_channel = ngx_processes[s].channel[1];
} else {
// 如果是NGX_PROCESS_DETACHED模式,則表示當(dāng)前是另外新起的一個(gè)master進(jìn)程,因而將其管道值都置為-1
ngx_processes[s].channel[0] = -1;
ngx_processes[s].channel[1] = -1;
}
ngx_process_slot = s;
// fork()方法將產(chǎn)生一個(gè)新的進(jìn)程,這個(gè)進(jìn)程與父進(jìn)程的關(guān)系是子進(jìn)程的內(nèi)存數(shù)據(jù)將完全復(fù)制父進(jìn)程的。
// 還需要注意的是,fork()出來(lái)的子進(jìn)程執(zhí)行的代碼是從fork()之后開始執(zhí)行的,而對(duì)于父進(jìn)程而言,
// 該方法的返回值為父進(jìn)程id,而對(duì)于子進(jìn)程而言,該方法返回值為0,因而通過(guò)if-else語(yǔ)句就可以讓父進(jìn)程
// 和子進(jìn)程分別調(diào)用后續(xù)不同的代碼片段
pid = fork();
switch (pid) {
case -1:
// fork出錯(cuò)
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
case 0:
// 子進(jìn)程執(zhí)行的分支,這里的proc()方法是外部傳進(jìn)來(lái)的,也就是說(shuō),當(dāng)前方法只是創(chuàng)建一個(gè)新的進(jìn)程,
// 具體的進(jìn)程處理邏輯,將交由外部代碼塊進(jìn)行定義ngx_getpid()方法獲取的就是當(dāng)前新創(chuàng)建的子進(jìn)程的進(jìn)程id
ngx_pid = ngx_getpid();
proc(cycle, data);
break;
default:
// 父進(jìn)程會(huì)走到這里
break;
}
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);
// 父進(jìn)程會(huì)走到這里,當(dāng)前的pid是fork()之后父進(jìn)程得到的新創(chuàng)建的子進(jìn)程的pid
ngx_processes[s].pid = pid;
ngx_processes[s].exited = 0;
if (respawn >= 0) {
return pid;
}
// 設(shè)置當(dāng)前進(jìn)程的各個(gè)屬性,并且存儲(chǔ)到ngx_processes數(shù)組中的對(duì)應(yīng)位置
ngx_processes[s].proc = proc;
ngx_processes[s].data = data;
ngx_processes[s].name = name;
ngx_processes[s].exiting = 0;
switch (respawn) {
case NGX_PROCESS_NORESPAWN:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_spawn = 0;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_JUST_SPAWN:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_spawn = 1;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_spawn = 0;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_JUST_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_spawn = 1;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_DETACHED:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_spawn = 0;
ngx_processes[s].detached = 1;
break;
}
if (s == ngx_last_process) {
ngx_last_process++;
}
return pid;
}
ngx_spawn_process()方法最后會(huì)fork()一個(gè)子進(jìn)程以執(zhí)行其第二個(gè)參數(shù)所指定的回調(diào)方法。但是在這之前,我們需要說(shuō)明的是,其通過(guò)socketpair()方法調(diào)用會(huì)創(chuàng)建一對(duì)匿名的socket,然后將其存儲(chǔ)在當(dāng)前進(jìn)程的channel數(shù)組中,如此就完成了channel數(shù)組的創(chuàng)建。
worker進(jìn)程啟動(dòng)之后會(huì)執(zhí)行ngx_worker_process_cycle()方法,該方法首先會(huì)對(duì)worker進(jìn)程進(jìn)行初始化,其中就包括對(duì)繼承而來(lái)的channel數(shù)組的處理。由于master進(jìn)程和worker進(jìn)程都保有channel數(shù)組所指代的socket描述符,而本質(zhì)上master進(jìn)程和各個(gè)worker進(jìn)程只需要保有該數(shù)組的某一邊的描述符即可。因而這里worker進(jìn)程在初始化過(guò)程中,會(huì)關(guān)閉其所保存的另一邊的描述符。在nginx中,master進(jìn)程統(tǒng)一的會(huì)保留channel數(shù)組的0號(hào)位的socket描述符,關(guān)閉1號(hào)位的socket描述符,而worker進(jìn)程則會(huì)關(guān)閉0號(hào)位的socket描述符,保留1號(hào)位的描述符。這樣master進(jìn)程需要與worker進(jìn)程通信時(shí),就只需要往channel[0]中寫入數(shù)據(jù),而worker進(jìn)程則會(huì)監(jiān)聽channel[1],從而接收到master進(jìn)程的數(shù)據(jù)寫入。這里我們首先看一下worker進(jìn)程的初始化方法ngx_worker_process_init()的源碼:
/**
* 這里主要是對(duì)當(dāng)前進(jìn)程進(jìn)行初始化,為其設(shè)置優(yōu)先級(jí)和打開的文件限制等參數(shù)。
* 最后會(huì)為當(dāng)前進(jìn)程添加一個(gè)監(jiān)聽channel[1]的連接,以不斷讀取master進(jìn)程的消息,從而進(jìn)行相應(yīng)的處理
*/
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {
sigset_t set;
ngx_int_t n;
ngx_time_t *tp;
ngx_uint_t i;
ngx_cpuset_t *cpu_affinity;
struct rlimit rlmt;
ngx_core_conf_t *ccf;
ngx_listening_t *ls;
// 設(shè)置時(shí)區(qū)相關(guān)的信息
if (ngx_set_environment(cycle, NULL) == NULL) {
/* fatal */
exit(2);
}
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
// 設(shè)置當(dāng)前進(jìn)程的優(yōu)先級(jí)
if (worker >= 0 && ccf->priority != 0) {
if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setpriority(%d) failed", ccf->priority);
}
}
// 設(shè)置當(dāng)前進(jìn)程能夠打開的文件句柄數(shù)
if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;
if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setrlimit(RLIMIT_NOFILE, %i) failed",
ccf->rlimit_nofile);
}
}
// Changes the limit on the largest size of a core file(RLIMIT_CORE) for worker processes.
// 簡(jiǎn)而言之就是設(shè)置核心文件能夠使用的最大大小
if (ccf->rlimit_core != NGX_CONF_UNSET) {
rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
rlmt.rlim_max = (rlim_t) ccf->rlimit_core;
if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setrlimit(RLIMIT_CORE, %O) failed",
ccf->rlimit_core);
}
}
// geteuid()返回執(zhí)行當(dāng)前程序的用戶id,這里的0表示是否為root用戶
if (geteuid() == 0) {
// setgid()方法的作用是更改組的id
if (setgid(ccf->group) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"setgid(%d) failed", ccf->group);
/* fatal */
exit(2);
}
// initgroups()是更改附加組的id
if (initgroups(ccf->username, ccf->group) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"initgroups(%s, %d) failed",
ccf->username, ccf->group);
}
// 更改用戶的id
if (setuid(ccf->user) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"setuid(%d) failed", ccf->user);
/* fatal */
exit(2);
}
}
// 需要注意的是,對(duì)于cache manager和cache loader進(jìn)程,這里的worker傳入的是-1,
// 表示這兩個(gè)進(jìn)程不需要設(shè)置親核性
if (worker >= 0) {
// 獲取當(dāng)前worker的CPU親核性
cpu_affinity = ngx_get_cpu_affinity(worker);
if (cpu_affinity) {
// 設(shè)置worker的親核心
ngx_setaffinity(cpu_affinity, cycle->log);
}
}
#if (NGX_HAVE_PR_SET_DUMPABLE)
if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"prctl(PR_SET_DUMPABLE) failed");
}
#endif
if (ccf->working_directory.len) {
// chdir()的作用是將當(dāng)前的工作目錄更改為其參數(shù)所傳入的路徑
if (chdir((char *) ccf->working_directory.data) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"chdir(\"%s\") failed", ccf->working_directory.data);
/* fatal */
exit(2);
}
}
// 初始化空的set指令集合
sigemptyset(&set);
// ◆ SIG_BLOCK:將 set 參數(shù)指向信號(hào)集中的信號(hào)加入到信號(hào)掩碼中。
// ◆ SIG_UNBLOCK:將 set 參數(shù)指向的信號(hào)集中的信號(hào)從信號(hào)掩碼中刪除。
// ◆ SIG_SETMASK:將 set 參數(shù)指向信號(hào)集設(shè)置為信號(hào)掩碼。
// 這里就是直接初始化要阻塞的信號(hào)集,默認(rèn)為空集
if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
tp = ngx_timeofday();
srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec);
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
ls[i].previous = NULL;
}
// 這里調(diào)用各個(gè)模塊的init_process()方法進(jìn)行進(jìn)程模塊的初始化
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->init_process) {
if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
/* fatal */
exit(2);
}
}
}
// 這里主要是關(guān)閉當(dāng)前進(jìn)程中其他各個(gè)進(jìn)程的channel[1]管道句柄
for (n = 0; n < ngx_last_process; n++) {
if (ngx_processes[n].pid == -1) {
continue;
}
if (n == ngx_process_slot) {
continue;
}
if (ngx_processes[n].channel[1] == -1) {
continue;
}
if (close(ngx_processes[n].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
}
// 關(guān)閉當(dāng)前進(jìn)程的channel[0]管道句柄
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
#if 0
ngx_last_process = 0;
#endif
// ngx_channel指向的是當(dāng)前進(jìn)程的channel[1]句柄,也即監(jiān)聽master進(jìn)程發(fā)送消息的句柄。
// 當(dāng)前方法中,首先會(huì)為當(dāng)前的句柄創(chuàng)建一個(gè)connection對(duì)象,并且將其封裝為一個(gè)事件,然后將該事件添加到
// 對(duì)應(yīng)的事件模型隊(duì)列中以監(jiān)聽當(dāng)前句柄的事件,事件的處理邏輯則主要有這里的ngx_channel_handler()
// 方法進(jìn)行。這里的ngx_channel_handler的主要處理邏輯是,根據(jù)當(dāng)前收到的消息設(shè)置當(dāng)前進(jìn)程的一些標(biāo)志位,
// 或者更新某些緩存數(shù)據(jù),如此,在當(dāng)前進(jìn)行的事件循環(huán)中,通過(guò)不斷檢查這些標(biāo)志位,從而實(shí)現(xiàn)在事件進(jìn)程中
// 處理真正的邏輯。因而這里的ngx_channel_handler的處理效率是非常高的
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler)
== NGX_ERROR) {
/* fatal */
exit(2);
}
}
該方法主要是對(duì)worker進(jìn)程進(jìn)行初始化,這里我們主要需要關(guān)注最后會(huì)遍歷ngx_processes數(shù)組,這個(gè)數(shù)組中保存了當(dāng)前nginx中各個(gè)進(jìn)程的相關(guān)信息。在遍歷過(guò)程中,會(huì)關(guān)閉當(dāng)前進(jìn)程保有的其余進(jìn)程的channel[1]句柄,而保留有channel[0]句柄,這樣當(dāng)前進(jìn)程如果需要與其他進(jìn)程通信,也只需要往目標(biāo)進(jìn)程的channel[0]中寫入數(shù)據(jù)即可。在遍歷完成之后,當(dāng)前進(jìn)程就會(huì)關(guān)閉自身的channel[0]句柄,而保留channel[1]句柄。最后,會(huì)通過(guò)ngx_add_channel_event()方法為當(dāng)前進(jìn)程添加對(duì)channel[1]的監(jiān)聽事件,這里在調(diào)用ngx_add_channel_event()方法時(shí)傳入的第二個(gè)參數(shù)是ngx_channel,該參數(shù)是在前面的ngx_spawn_process()方法中賦值的,指向的就是當(dāng)前進(jìn)程的channel[1]的socket句柄。
關(guān)于ngx_add_channel_event()方法,其本質(zhì)就是創(chuàng)建一個(gè)ngx_event_t結(jié)構(gòu)體的事件,然后將其添加到當(dāng)前所使用的事件模型(比如epoll)句柄中。這里不再贅述該方法的實(shí)現(xiàn)源碼,不過(guò)我們需要關(guān)注的是該事件觸發(fā)時(shí)的回調(diào)方法,即調(diào)用ngx_add_channel_event()方法時(shí)傳入的第三個(gè)參數(shù)ngx_channel_handler()方法。如下是該方法的源碼:
static void ngx_channel_handler(ngx_event_t *ev) {
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;
if (ev->timedout) {
ev->timedout = 0;
return;
}
c = ev->data;
for (;;) {
// 在無(wú)限for循環(huán)中不斷讀取master進(jìn)程發(fā)過(guò)來(lái)的消息
n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);
// 如果讀取消息出錯(cuò),說(shuō)明當(dāng)前的句柄可能失效了,就需要關(guān)閉當(dāng)前連接
if (n == NGX_ERROR) {
if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
ngx_del_conn(c, 0);
}
ngx_close_connection(c);
return;
}
if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return;
}
}
if (n == NGX_AGAIN) {
return;
}
// 對(duì)發(fā)送過(guò)來(lái)的消息進(jìn)行處理
switch (ch.command) {
// 如果是quit消息,則設(shè)置quit標(biāo)志位
case NGX_CMD_QUIT:
ngx_quit = 1;
break;
// 如果terminate消息,則設(shè)置terminate標(biāo)志位
case NGX_CMD_TERMINATE:
ngx_terminate = 1;
break;
// 如果是reopen消息,則設(shè)置reopen標(biāo)志位
case NGX_CMD_REOPEN:
ngx_reopen = 1;
break;
// 如果是新建進(jìn)程消息,則更新當(dāng)前ngx_processes數(shù)組對(duì)應(yīng)位置的數(shù)據(jù)
case NGX_CMD_OPEN_CHANNEL:
ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;
// 如果是關(guān)閉channel的消息,則關(guān)閉ngx_processes數(shù)組對(duì)應(yīng)位置的句柄
case NGX_CMD_CLOSE_CHANNEL:
if (close(ngx_processes[ch.slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"close() channel failed");
}
ngx_processes[ch.slot].channel[0] = -1;
break;
}
}
}
在ngx_channel_handler()方法中,主要是讀取所監(jiān)聽的socket句柄中的數(shù)據(jù),而數(shù)據(jù)是以一個(gè)ngx_channel_t結(jié)構(gòu)體所承載的,這個(gè)ngx_channel_t是nginx所統(tǒng)一使用的master與worker進(jìn)程進(jìn)行通信的結(jié)構(gòu)體,其會(huì)指定當(dāng)前發(fā)生的事件類型,以及發(fā)生該事件的進(jìn)程信息。如下是ngx_channel_t結(jié)構(gòu)體的聲明:
typedef struct {
// 當(dāng)前發(fā)生的事件類型
ngx_uint_t command;
// 發(fā)生事件的pid
ngx_pid_t pid;
// 發(fā)生事件的進(jìn)程在ngx_processes數(shù)組中的下標(biāo)
ngx_int_t slot;
// 發(fā)生事件的進(jìn)程的channel[0]描述符的值
ngx_fd_t fd;
} ngx_channel_t;
在從當(dāng)前進(jìn)程的channel[1]中讀取了ngx_channel_t結(jié)構(gòu)體的數(shù)據(jù)之后,ngx_channel_handler()方法會(huì)根據(jù)發(fā)生的事件類型更新相應(yīng)的標(biāo)志位的狀態(tài),并且會(huì)更新當(dāng)前進(jìn)程的ngx_processes數(shù)組中對(duì)應(yīng)的發(fā)生事件的進(jìn)程的狀態(tài)信息。
在處理了master進(jìn)程所發(fā)送的事件之后,worker進(jìn)程就會(huì)繼續(xù)其循環(huán),在該循環(huán)中會(huì)檢查其所關(guān)注的標(biāo)志位的狀態(tài),然后會(huì)根據(jù)這些狀態(tài)執(zhí)行對(duì)應(yīng)的邏輯。如下是worker進(jìn)程工作的循環(huán)的源碼:
/**
* 進(jìn)入worker進(jìn)程工作的循環(huán)
*/
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {
ngx_int_t worker = (intptr_t) data;
ngx_process = NGX_PROCESS_WORKER;
ngx_worker = worker;
// 初始化worker進(jìn)程,前面對(duì)該方法的源碼進(jìn)行了講解
ngx_worker_process_init(cycle, worker);
ngx_setproctitle("worker process");
for (;;) {
if (ngx_exiting) {
// 這里主要是檢查有沒有事件是非cancelable狀態(tài)的,也就是說(shuō)是否所有的事件都已經(jīng)取消了,如果取消了,
// 就會(huì)返回NGX_OK。這里的邏輯可以理解為,如果被標(biāo)記為了ngx_exiting,那么此時(shí),如果還有未取消的
// 事件存在,則會(huì)走到下面的ngx_process_events_and_timers()方法,如此就會(huì)處理未完成的事件,
// 然后在循環(huán)中再次走到這個(gè)位置,最終if條件為true,從而執(zhí)行退出worker進(jìn)程的工作
if (ngx_event_no_timers_left() == NGX_OK) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
// 這里通過(guò)檢查相應(yīng)的事件模型中是否存在對(duì)應(yīng)的事件,然后將其放入隊(duì)列中進(jìn)行處理,
// 這里是worker進(jìn)程處理事件的核心方法
ngx_process_events_and_timers(cycle);
// 這里ngx_terminate是強(qiáng)制關(guān)閉nginx的選項(xiàng),如果向nginx發(fā)送了強(qiáng)制關(guān)閉nginx命令,則當(dāng)前進(jìn)程會(huì)直接退出
if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
// 這里ngx_quit是優(yōu)雅退出的選項(xiàng)。這里主要是將ngx_exiting置為1,用于表征當(dāng)前進(jìn)程需要退出,
// 然后會(huì)執(zhí)行如下三個(gè)工作:
// 1. 往事件隊(duì)列中添加一個(gè)事件,用于處理當(dāng)前處于活躍狀態(tài)的連接,將其close標(biāo)志位置為1,并且執(zhí)行該連接
// 當(dāng)前的處理方法,以盡快完成連接事件;
// 2. 關(guān)閉當(dāng)前cycle中監(jiān)聽的socket句柄;
// 3. 將當(dāng)前所有處于空閑狀態(tài)的連接的close狀態(tài)標(biāo)記為1,然后調(diào)用其連接處理方法.
if (ngx_quit) {
ngx_quit = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down");
ngx_setproctitle("worker process is shutting down");
if (!ngx_exiting) {
ngx_exiting = 1;
ngx_set_shutdown_timer(cycle);
ngx_close_listening_sockets(cycle);
ngx_close_idle_connections(cycle);
}
}
// ngx_reopen主要是重新打開nginx的所有文件,比如切換nginx的日志文件等等
if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, -1);
}
}
}
可以看到,worker進(jìn)程主要處理了nginx是否退出相關(guān)的標(biāo)志位,還處理了nginx是否重新讀取了配置文件的標(biāo)志位。
4. 小結(jié)
本文首先對(duì)master-worker進(jìn)程交互的基本原理進(jìn)行了講解,然后深入到源碼中講解了nginx是如何實(shí)現(xiàn)master和worker進(jìn)程的相互通信的。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。