目錄
- 一、 nginx進程鎖的作用
- 二、入門級鎖使用
- 三、nginx進程鎖的實現(xiàn)
- 3.1、鎖的數(shù)據(jù)結(jié)構(gòu)
- 3.2、基于fd的上鎖/解鎖實現(xiàn)
- 3.3、nginx鎖實例的初始化
- 3.4、基于共享內(nèi)存的上鎖/解鎖實現(xiàn)
- 四、 說到底鎖的含義是什么
一、 nginx進程鎖的作用
nginx是多進程并發(fā)模型應(yīng)用,直白點就是:有多個worker都在監(jiān)聽網(wǎng)絡(luò)請求,誰接收某個請求,那么后續(xù)的事務(wù)就由它來完成。如果沒有鎖的存在,那么就是這種場景,當(dāng)一個請求被系統(tǒng)接入后,所以可以監(jiān)聽該端口的進程,就會同時去處理該事務(wù)。當(dāng)然了,系統(tǒng)會避免這種糟糕事情的發(fā)生,但也就出現(xiàn)了所謂的驚群。(不知道說得對不對,大概是那么個意思吧)
所以,為了避免出現(xiàn)同一時刻,有許多進程監(jiān)聽,就應(yīng)該該多個worker間有序地監(jiān)聽socket. 為了讓多個worker有序,所以就有了本文要講的進程鎖的出現(xiàn)了,只有搶到鎖的進程才可以進行網(wǎng)絡(luò)請求的接入操作。
即如下過程:
// worker 核心事務(wù)框架
// ngx_event.c
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
#if (NGX_WIN32)
/* handle signals from master in case of network inactivity */
if (timer == NGX_TIMER_INFINITE || timer > 500) {
timer = 500;
}
#endif
}
if (ngx_use_accept_mutex) {
// 為了一定的公平性,避免反復(fù)爭搶鎖
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
// 只有搶到鎖的進程,進行 socket 的 accept() 操作
// 其他worker則處理之前接入的請求,read/write操作
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
// 其他核心事務(wù)處理
if (!ngx_queue_empty(&ngx_posted_next_events)) {
ngx_event_move_posted_next(cycle);
timer = 0;
}
delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
if (delta) {
ngx_event_expire_timers();
}
ngx_event_process_posted(cycle, &ngx_posted_events);
}
// 獲取鎖,并注冊socket accept() 過程如下
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex locked");
if (ngx_accept_mutex_held && ngx_accept_events == 0) {
return NGX_OK;
}
if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
// 解鎖操作
ngx_shmtx_unlock(&ngx_accept_mutex);
return NGX_ERROR;
}
ngx_accept_events = 0;
ngx_accept_mutex_held = 1;
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex lock failed: %ui", ngx_accept_mutex_held);
if (ngx_accept_mutex_held) {
if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_accept_mutex_held = 0;
}
return NGX_OK;
}
其他的不必多說,核心即搶到鎖的worker,才可以進行accept操作。而沒有搶到鎖的worker, 則要主動釋放之前的accept()權(quán)力。從而達到,同一時刻,只有一個worker在處理accept事件。
二、入門級鎖使用
鎖這種東西,一般都是編程語言自己定義好的接口,或者固定用法。
比如 java 中的 synchronized xxx, Lock 相關(guān)并發(fā)包鎖如 CountDownLatch, CyclicBarrier, ReentrantLock, ReentrantReadWriteLock, Semaphore...
比如 python 中的 threading.Lock(), threading.RLock()...
比如 php 中的 flock()...
之所以說是入門級,是因為這都是些接口api, 你只要按照使用規(guī)范,調(diào)一下就可以了,無需更多知識。但要想用好各細節(jié),則實際不簡單。
三、nginx進程鎖的實現(xiàn)
nginx因為是使用C語言編寫的,所以肯定是更接近底層些的。能夠通過它的實現(xiàn),來看鎖如何實現(xiàn),應(yīng)該能夠讓我們更能理解鎖的深層次含義。
一般地,鎖包含這么幾個大方向:鎖數(shù)據(jù)結(jié)構(gòu)定義,上鎖邏輯,解鎖邏輯,以及一些通知機制,超時機制什么的。下面我們就其中幾個方向,看下nginx 實現(xiàn):
3.1、鎖的數(shù)據(jù)結(jié)構(gòu)
首先要定義出鎖有些什么變量,然后實例化一個值,共享給多進程使用。
// event/ngx_event.c
// 全局accept鎖變量定義
ngx_shmtx_t ngx_accept_mutex;
// 這個鎖有一個
// atomic 使用 volatile 修飾實現(xiàn)
typedef volatile ngx_atomic_uint_t ngx_atomic_t;
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
// 有使用原子更新變量實現(xiàn)鎖,其背后是共享內(nèi)存區(qū)域
ngx_atomic_t *lock;
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t *wait;
ngx_uint_t semaphore;
sem_t sem;
#endif
#else
// 有使用fd實現(xiàn)鎖,fd的背后是一個文件實例
ngx_fd_t fd;
u_char *name;
#endif
ngx_uint_t spin;
} ngx_shmtx_t;
// 共享內(nèi)存數(shù)據(jù)結(jié)構(gòu)定義
typedef struct {
u_char *addr;
size_t size;
ngx_str_t name;
ngx_log_t *log;
ngx_uint_t exists; /* unsigned exists:1; */
} ngx_shm_t;
3.2、基于fd的上鎖/解鎖實現(xiàn)
有了鎖實例,就可以對其進行上鎖解鎖了。nginx有兩種鎖實現(xiàn),主要是基于平臺的差異性決定的:基于文件或者基于共享內(nèi)在實現(xiàn)。基于fd即基于文件的實現(xiàn),這個還是有點重的操作。如下:
// ngx_shmtx.c
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
ngx_err_t err;
err = ngx_trylock_fd(mtx->fd);
if (err == 0) {
return 1;
}
if (err == NGX_EAGAIN) {
return 0;
}
#if __osf__ /* Tru64 UNIX */
if (err == NGX_EACCES) {
return 0;
}
#endif
ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);
return 0;
}
// core/ngx_shmtx.c
// 1. 上鎖過程
ngx_err_t
ngx_trylock_fd(ngx_fd_t fd)
{
struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) {
return ngx_errno;
}
return 0;
}
// os/unix/ngx_file.c
ngx_err_t
ngx_lock_fd(ngx_fd_t fd)
{
struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
// 調(diào)用系統(tǒng)提供的上鎖方法
if (fcntl(fd, F_SETLKW, &fl) == -1) {
return ngx_errno;
}
return 0;
}
// 2. 解鎖實現(xiàn)
// core/ngx_shmtx.c
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
ngx_err_t err;
err = ngx_unlock_fd(mtx->fd);
if (err == 0) {
return;
}
ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name);
}
// os/unix/ngx_file.c
ngx_err_t
ngx_unlock_fd(ngx_fd_t fd)
{
struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) {
return ngx_errno;
}
return 0;
}
重點就是 fcntl() 這個系統(tǒng)api的調(diào)用,無他。當(dāng)然,站在一個旁觀者角度來看,實際就是因為多進程對文件的操作是可見的,所以達到進程鎖的目的。其中,tryLock 和 lock 存在一定的語義差異,即try時,會得到一些是否成功的標(biāo)識,而直接進行l(wèi)ock時,則不能得到標(biāo)識。一般會要求阻塞住請求
3.3、nginx鎖實例的初始化
也許在有些地方,一個鎖實例的初始化,就是一個變量的簡單賦值而已。但在nginx有些不同。首先,需要保證各worker能看到相同的實例或者相當(dāng)?shù)膶嵗?。因為worker是從master處fork()出來的進程,所以只要在master中實例化好的鎖,必然可以保證各worker能拿到一樣的值。那么,到底是不是只是這樣呢?
// 共享鎖的初始化,在ngx master 中進行,后fork()到worker進程
// event/ngx_event.c
static ngx_int_t
ngx_event_module_init(ngx_cycle_t *cycle)
{
void ***cf;
u_char *shared;
size_t size, cl;
// 定義一段共享內(nèi)存
ngx_shm_t shm;
ngx_time_t *tp;
ngx_core_conf_t *ccf;
ngx_event_conf_t *ecf;
cf = ngx_get_conf(cycle->conf_ctx, ngx_events_module);
ecf = (*cf)[ngx_event_core_module.ctx_index];
if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"using the \"%s\" event method", ecf->name);
}
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
ngx_timer_resolution = ccf->timer_resolution;
#if !(NGX_WIN32)
{
ngx_int_t limit;
struct rlimit rlmt;
if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"getrlimit(RLIMIT_NOFILE) failed, ignored");
} else {
if (ecf->connections > (ngx_uint_t) rlmt.rlim_cur
&& (ccf->rlimit_nofile == NGX_CONF_UNSET
|| ecf->connections > (ngx_uint_t) ccf->rlimit_nofile))
{
limit = (ccf->rlimit_nofile == NGX_CONF_UNSET) ?
(ngx_int_t) rlmt.rlim_cur : ccf->rlimit_nofile;
ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
"%ui worker_connections exceed "
"open file resource limit: %i",
ecf->connections, limit);
}
}
}
#endif /* !(NGX_WIN32) */
if (ccf->master == 0) {
return NGX_OK;
}
if (ngx_accept_mutex_ptr) {
return NGX_OK;
}
/* cl should be equal to or greater than cache line size */
cl = 128;
size = cl /* ngx_accept_mutex */
+ cl /* ngx_connection_counter */
+ cl; /* ngx_temp_number */
#if (NGX_STAT_STUB)
size += cl /* ngx_stat_accepted */
+ cl /* ngx_stat_handled */
+ cl /* ngx_stat_requests */
+ cl /* ngx_stat_active */
+ cl /* ngx_stat_reading */
+ cl /* ngx_stat_writing */
+ cl; /* ngx_stat_waiting */
#endif
shm.size = size;
ngx_str_set(&shm.name, "nginx_shared_zone");
shm.log = cycle->log;
// 分配共享內(nèi)存空間, 使用 mmap 實現(xiàn)
if (ngx_shm_alloc(&shm) != NGX_OK) {
return NGX_ERROR;
}
shared = shm.addr;
ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
ngx_accept_mutex.spin = (ngx_uint_t) -1;
// 基于共享文件或者內(nèi)存賦值進程鎖,從而實現(xiàn)多進程控制
if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
cycle->lock_file.data)
!= NGX_OK)
{
return NGX_ERROR;
}
ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);
(void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"counter: %p, %uA",
ngx_connection_counter, *ngx_connection_counter);
ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl);
tp = ngx_timeofday();
ngx_random_number = (tp->msec << 16) + ngx_pid;
#if (NGX_STAT_STUB)
ngx_stat_accepted = (ngx_atomic_t *) (shared + 3 * cl);
ngx_stat_handled = (ngx_atomic_t *) (shared + 4 * cl);
ngx_stat_requests = (ngx_atomic_t *) (shared + 5 * cl);
ngx_stat_active = (ngx_atomic_t *) (shared + 6 * cl);
ngx_stat_reading = (ngx_atomic_t *) (shared + 7 * cl);
ngx_stat_writing = (ngx_atomic_t *) (shared + 8 * cl);
ngx_stat_waiting = (ngx_atomic_t *) (shared + 9 * cl);
#endif
return NGX_OK;
}
// core/ngx_shmtx.c
// 1. 基于文件進程共享空間, 使用 fd
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
// 由master進程創(chuàng)建,所以是進程安全的操作,各worker直接使用即可
if (mtx->name) {
// 如果已經(jīng)創(chuàng)建好了,則 fd 已被賦值,不能創(chuàng)建了,直接共享fd即可
// fd 的背后是一個文件實例
if (ngx_strcmp(name, mtx->name) == 0) {
mtx->name = name;
return NGX_OK;
}
ngx_shmtx_destroy(mtx);
}
// 使用文件創(chuàng)建的方式鎖共享
mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN,
NGX_FILE_DEFAULT_ACCESS);
if (mtx->fd == NGX_INVALID_FILE) {
ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno,
ngx_open_file_n " \"%s\" failed", name);
return NGX_ERROR;
}
// 創(chuàng)建完成即可刪除,后續(xù)只基于該fd實例做鎖操作
if (ngx_delete_file(name) == NGX_FILE_ERROR) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
ngx_delete_file_n " \"%s\" failed", name);
}
mtx->name = name;
return NGX_OK;
}
// 2. 基于共享內(nèi)存的共享鎖的創(chuàng)建
// ngx_shmtx.c
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
mtx->lock = &addr->lock;
if (mtx->spin == (ngx_uint_t) -1) {
return NGX_OK;
}
mtx->spin = 2048;
#if (NGX_HAVE_POSIX_SEM)
mtx->wait = &addr->wait;
if (sem_init(&mtx->sem, 1, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_init() failed");
} else {
mtx->semaphore = 1;
}
#endif
return NGX_OK;
}
// os/unix/ngx_shmem.c
ngx_int_t
ngx_shm_alloc(ngx_shm_t *shm)
{
shm->addr = (u_char *) mmap(NULL, shm->size,
PROT_READ|PROT_WRITE,
MAP_ANON|MAP_SHARED, -1, 0);
if (shm->addr == MAP_FAILED) {
ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
"mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
return NGX_ERROR;
}
return NGX_OK;
}
基于fd的鎖實現(xiàn),本質(zhì)是基于其背后的文件系統(tǒng)的實現(xiàn),因為文件系統(tǒng)是進程可見的,所以對于相同fd控制,就是對共同的鎖的控制了。
3.4、基于共享內(nèi)存的上鎖/解鎖實現(xiàn)
所謂共享內(nèi)存,實際就是一塊公共的內(nèi)存區(qū)域,它超出了進程的范圍(受操作系統(tǒng)管理)。就是前面我們看到的mmap()的創(chuàng)建,就是一塊共享內(nèi)存。
// ngx_shmtx.c
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
// 直接對共享內(nèi)存區(qū)域的值進行改變
// cas 改變成功即是上鎖成功。
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}
// shm版本的解鎖操作, cas 解析,帶通知
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
if (mtx->spin != (ngx_uint_t) -1) {
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
}
if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
ngx_shmtx_wakeup(mtx);
}
}
// 通知等待進程
static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_uint_t wait;
if (!mtx->semaphore) {
return;
}
for ( ;; ) {
wait = *mtx->wait;
if ((ngx_atomic_int_t) wait <= 0) {
return;
}
if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
break;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
"shmtx wake %uA", wait);
if (sem_post(&mtx->sem) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_post() failed while wake shmtx");
}
#endif
}
共享內(nèi)存版本的鎖的實現(xiàn),基本就是cas的對內(nèi)存變量的設(shè)置。只是這個面向的內(nèi)存,是共享區(qū)域的內(nèi)存。
四、 說到底鎖的含義是什么
見過了許多的鎖,依然過不好這一關(guān)。
鎖到底是什么呢?事實上,鎖就是一個標(biāo)識位。當(dāng)有人看到這個標(biāo)識位后,就主動停止操作,或者進行等等,從而使其看起來起到了鎖的作用。這個標(biāo)識位,可以設(shè)置在某個對象中,也可以為設(shè)置在某個全局值中,還可以借助于各種存在介質(zhì),比如文件,比如redis,比如zk 。 這都沒有差別。因為問題關(guān)鍵不在存放在哪里,而在于如何安全地設(shè)置這個標(biāo)識位。
要實現(xiàn)鎖,一般都需要要一個強有力的底層含義保證,比如cpu層面的cas操作,應(yīng)用級別的隊列串行原子操作。。。
至于什么,內(nèi)存鎖,文件鎖,高級鎖,都是有各自的應(yīng)用場景。而要選好各種鎖,則變成了評價高低地關(guān)鍵。此時此刻,你應(yīng)該能判斷出來的!
以上就是詳解nginx進程鎖的實現(xiàn)的詳細內(nèi)容,更多關(guān)于nginx 進程鎖的資料請關(guān)注腳本之家其它相關(guān)文章!