Nginx(八): 观进程锁的实现

共 14033字,需浏览 29分钟

 ·

2021-02-27 14:23

走过路过不要错过

点击蓝字关注我们


前面的nginx系列讲解了nginx很多通用概念,流程,以及核心的http模块的一些实现。应该说大体上对nginx已经不再陌生和神秘。

今天我们不看全局,只看一个非常非常小的细节:nginx是多进程并发模型的应用,但为了网络请求的安全性,必须要使用到锁,那么这个进程锁如何实现呢?

1:nginx进程锁的作用

nginx是多进程并发模型应用,直白点就是:有多个worker都在监听网络请求,谁接收某个请求,那么后续的事务就由它来完成。如果没有锁的存在,那么就是这种场景,当一个请求被系统接入后,所以可以监听该端口的进程,就会同时去处理该事务。当然了,系统会避免这种糟糕事情的发生,但也就出现了所谓的惊群。(不知道说得对不对,大概是那么个意思吧)

所以,为了避免出现同一时刻,有许多进程监听,就应该该多个worker间有序地监听socket. 为了让多个worker有序,所以就有了本文要讲的进程锁的出现了,只有抢到锁的进程才可以进行网络请求的接入操作。

即如下过程:

// worker 核心事务框架// ngx_event.cvoidngx_process_events_and_timers(ngx_cycle_t *cycle){    ngx_uint_t  flags;    ngx_msec_t  timer, delta;
if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0;
} else { timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME;
#if (NGX_WIN32)
/* handle signals from master in case of network inactivity */
if (timer == NGX_TIMER_INFINITE || timer > 500) { timer = 500; }
#endif }
if (ngx_use_accept_mutex) { // 为了一定的公平性,避免反复争抢锁 if (ngx_accept_disabled > 0) { ngx_accept_disabled--;
} else { // 只有抢到锁的进程,进行 socket 的 accept() 操作 // 其他worker则处理之前接入的请求,read/write操作 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; }
if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS;
} else { if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } // 其他核心事务处理 if (!ngx_queue_empty(&ngx_posted_next_events)) { ngx_event_move_posted_next(cycle); timer = 0; }
delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta);
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); }
if (delta) { ngx_event_expire_timers(); }
ngx_event_process_posted(cycle, &ngx_posted_events);}// 获取锁,并注册socket accept() 过程如下ngx_int_tngx_trylock_accept_mutex(ngx_cycle_t *cycle){ if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked");
if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; }
if (ngx_enable_accept_events(cycle) == NGX_ERROR) { // 解锁操作 ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; }
ngx_accept_events = 0; ngx_accept_mutex_held = 1;
return NGX_OK; }
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held);
if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; }
ngx_accept_mutex_held = 0; }
return NGX_OK;}

其他的不必多说,核心即抢到锁的worker,才可以进行accept操作。而没有抢到锁的worker, 则要主动释放之前的accept()权力。从而达到,同一时刻,只有一个worker在处理accept事件。


2:入门级锁使用

锁这种东西,一般都是编程语言自己定义好的接口,或者固定用法。

比如 java 中的 synchronized xxx, Lock 相关并发包锁如 CountDownLatch, CyclicBarrier, ReentrantLock, ReentrantReadWriteLock, Semaphore...

比如 python 中的 threading.Lock(), threading.RLock()...

比如 php 中的 flock()...

之所以说是入门级,是因为这都是些接口api, 你只要按照使用规范,调一下就可以了,无需更多知识。但要想用好各细节,则实际不简单。

3:nginx进程锁的实现

nginx因为是使用C语言编写的,所以肯定是更接近底层些的。能够通过它的实现,来看锁如何实现,应该能够让我们更能理解锁的深层次含义。

一般地,锁包含这么几个大方向:锁数据结构定义,上锁逻辑,解锁逻辑,以及一些通知机制,超时机制什么的。下面我们就其中几个方向,看下nginx 实现:


3.1. 锁的数据结构

首先要定义出锁有些什么变量,然后实例化一个值,共享给多进程使用。

// event/ngx_event.c// 全局accept锁变量定义ngx_shmtx_t           ngx_accept_mutex;// 这个锁有一个// atomic 使用 volatile 修饰实现typedef volatile ngx_atomic_uint_t  ngx_atomic_t;typedef struct {#if (NGX_HAVE_ATOMIC_OPS)    // 有使用原子更新变量实现锁,其背后是共享内存区域    ngx_atomic_t  *lock;#if (NGX_HAVE_POSIX_SEM)    ngx_atomic_t  *wait;    ngx_uint_t     semaphore;    sem_t          sem;#endif#else    // 有使用fd实现锁,fd的背后是一个文件实例    ngx_fd_t       fd;    u_char        *name;#endif    ngx_uint_t     spin;} ngx_shmtx_t;// 共享内存数据结构定义typedef struct {    u_char      *addr;    size_t       size;    ngx_str_t    name;    ngx_log_t   *log;    ngx_uint_t   exists;   /* unsigned  exists:1;  */} ngx_shm_t;


3.2. 基于fd的上锁/解锁实现

有了锁实例,就可以对其进行上锁解锁了。nginx有两种锁实现,主要是基于平台的差异性决定的:基于文件或者基于共享内在实现。基于fd即基于文件的实现,这个还是有点重的操作。如下:

// ngx_shmtx.cngx_uint_tngx_shmtx_trylock(ngx_shmtx_t *mtx){    ngx_err_t  err;
err = ngx_trylock_fd(mtx->fd);
if (err == 0) { return 1; }
if (err == NGX_EAGAIN) { return 0; }
#if __osf__ /* Tru64 UNIX */
if (err == NGX_EACCES) { return 0; }
#endif
ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);
return 0;}// core/ngx_shmtx.c// 1. 上锁过程ngx_err_tngx_trylock_fd(ngx_fd_t fd){ struct flock fl;
ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; }
return 0;}// os/unix/ngx_file.cngx_err_tngx_lock_fd(ngx_fd_t fd){ struct flock fl;
ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; // 调用系统提供的上锁方法 if (fcntl(fd, F_SETLKW, &fl) == -1) { return ngx_errno; }
return 0;}
// 2. 解锁实现// core/ngx_shmtx.cvoidngx_shmtx_unlock(ngx_shmtx_t *mtx){ ngx_err_t err;
err = ngx_unlock_fd(mtx->fd);
if (err == 0) { return; }
ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name);}// os/unix/ngx_file.cngx_err_tngx_unlock_fd(ngx_fd_t fd){ struct flock fl;
ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_UNLCK; fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; }
return 0;}


重点就是 fcntl() 这个系统api的调用,无他。当然,站在一个旁观者角度来看,实际就是因为多进程对文件的操作是可见的,所以达到进程锁的目的。其中,tryLock 和 lock 存在一定的语义差异,即try时,会得到一些是否成功的标识,而直接进行lock时,则不能得到标识。一般会要求阻塞住请求。


3.3. nginx锁实例的初始化

也许在有些地方,一个锁实例的初始化,就是一个变量的简单赋值而已。但在nginx有些不同。首先,需要保证各worker能看到相同的实例或者相当的实例。因为worker是从master处fork()出来的进程,所以只要在master中实例化好的锁,必然可以保证各worker能拿到一样的值。那么,到底是不是只是这样呢?

// 共享锁的初始化,在ngx master 中进行,后fork()到worker进程// event/ngx_event.cstatic ngx_int_tngx_event_module_init(ngx_cycle_t *cycle){    void              ***cf;    u_char              *shared;    size_t               size, cl;    // 定义一段共享内存    ngx_shm_t            shm;    ngx_time_t          *tp;    ngx_core_conf_t     *ccf;    ngx_event_conf_t    *ecf;
cf = ngx_get_conf(cycle->conf_ctx, ngx_events_module); ecf = (*cf)[ngx_event_core_module.ctx_index];
if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "using the \"%s\" event method", ecf->name); }
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
ngx_timer_resolution = ccf->timer_resolution;
#if !(NGX_WIN32) { ngx_int_t limit; struct rlimit rlmt;
if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "getrlimit(RLIMIT_NOFILE) failed, ignored");
} else { if (ecf->connections > (ngx_uint_t) rlmt.rlim_cur && (ccf->rlimit_nofile == NGX_CONF_UNSET || ecf->connections > (ngx_uint_t) ccf->rlimit_nofile)) { limit = (ccf->rlimit_nofile == NGX_CONF_UNSET) ? (ngx_int_t) rlmt.rlim_cur : ccf->rlimit_nofile;
ngx_log_error(NGX_LOG_WARN, cycle->log, 0, "%ui worker_connections exceed " "open file resource limit: %i", ecf->connections, limit); } } }#endif /* !(NGX_WIN32) */

if (ccf->master == 0) { return NGX_OK; }
if (ngx_accept_mutex_ptr) { return NGX_OK; }

/* cl should be equal to or greater than cache line size */
cl = 128;
size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */
#if (NGX_STAT_STUB)
size += cl /* ngx_stat_accepted */ + cl /* ngx_stat_handled */ + cl /* ngx_stat_requests */ + cl /* ngx_stat_active */ + cl /* ngx_stat_reading */ + cl /* ngx_stat_writing */ + cl; /* ngx_stat_waiting */
#endif
shm.size = size; ngx_str_set(&shm.name, "nginx_shared_zone"); shm.log = cycle->log; // 分配共享内存空间, 使用 mmap 实现 if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; }
shared = shm.addr;
ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; ngx_accept_mutex.spin = (ngx_uint_t) -1; // 基于共享文件或者内存赋值进程锁,从而实现多进程控制 if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; }
ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);
(void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "counter: %p, %uA", ngx_connection_counter, *ngx_connection_counter);
ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl);
tp = ngx_timeofday();
ngx_random_number = (tp->msec << 16) + ngx_pid;
#if (NGX_STAT_STUB)
ngx_stat_accepted = (ngx_atomic_t *) (shared + 3 * cl); ngx_stat_handled = (ngx_atomic_t *) (shared + 4 * cl); ngx_stat_requests = (ngx_atomic_t *) (shared + 5 * cl); ngx_stat_active = (ngx_atomic_t *) (shared + 6 * cl); ngx_stat_reading = (ngx_atomic_t *) (shared + 7 * cl); ngx_stat_writing = (ngx_atomic_t *) (shared + 8 * cl); ngx_stat_waiting = (ngx_atomic_t *) (shared + 9 * cl);
#endif
return NGX_OK;}// core/ngx_shmtx.c// 1. 基于文件进程共享空间, 使用 fdngx_int_tngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name){ // 由master进程创建,所以是进程安全的操作,各worker直接使用即可 if (mtx->name) { // 如果已经创建好了,则 fd 已被赋值,不能创建了,直接共享fd即可 // fd 的背后是一个文件实例 if (ngx_strcmp(name, mtx->name) == 0) { mtx->name = name; return NGX_OK; }
ngx_shmtx_destroy(mtx); } // 使用文件创建的方式锁共享 mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN, NGX_FILE_DEFAULT_ACCESS);
if (mtx->fd == NGX_INVALID_FILE) { ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno, ngx_open_file_n " \"%s\" failed", name); return NGX_ERROR; } // 创建完成即可删除,后续只基于该fd实例做锁操作 if (ngx_delete_file(name) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_delete_file_n " \"%s\" failed", name); }
mtx->name = name;
return NGX_OK;}
// 2. 基于共享内存的共享锁的创建// ngx_shmtx.cngx_int_tngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name){ mtx->lock = &addr->lock;
if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; }
mtx->spin = 2048;
#if (NGX_HAVE_POSIX_SEM)
mtx->wait = &addr->wait;
if (sem_init(&mtx->sem, 1, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed"); } else { mtx->semaphore = 1; }
#endif
return NGX_OK;}// os/unix/ngx_shmem.cngx_int_tngx_shm_alloc(ngx_shm_t *shm){ shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
if (shm->addr == MAP_FAILED) { ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size); return NGX_ERROR; }
return NGX_OK;}

基于fd的锁实现,本质是基于其背后的文件系统的实现,因为文件系统是进程可见的,所以对于相同fd控制,就是对共同的锁的控制了。 


3.4. 基于共享内存的上锁/解锁实现

所谓共享内存,实际就是一块公共的内存区域,它超出了进程的范围(受操作系统管理)。就是前面我们看到的mmap()的创建,就是一块共享内存。

// ngx_shmtx.cngx_uint_tngx_shmtx_trylock(ngx_shmtx_t *mtx){    // 直接对共享内存区域的值进行改变    // cas 改变成功即是上锁成功。    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));}
// shm版本的解锁操作, cas 解析,带通知voidngx_shmtx_unlock(ngx_shmtx_t *mtx){ if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); }
if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); }}// 通知等待进程static voidngx_shmtx_wakeup(ngx_shmtx_t *mtx){#if (NGX_HAVE_POSIX_SEM) ngx_atomic_uint_t wait;
if (!mtx->semaphore) { return; }
for ( ;; ) {
wait = *mtx->wait;
if ((ngx_atomic_int_t) wait <= 0) { return; }
if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) { break; } }
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wake %uA", wait);
if (sem_post(&mtx->sem) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_post() failed while wake shmtx"); }
#endif}

共享内存版本的锁的实现,基本就是cas的对内存变量的设置。只是这个面向的内存,是共享区域的内存。

4:说到底锁的含义是什么

见过了许多的锁,依然过不好这一关。

锁到底是什么呢?事实上,锁就是一个标识位。当有人看到这个标识位后,就主动停止操作,或者进行等等,从而使其看起来起到了锁的作用。这个标识位,可以设置在某个对象中,也可以为设置在某个全局值中,还可以借助于各种存在介质,比如文件,比如redis,比如zk 。这都没有差别。因为问题关键不在存放在哪里,而在于如何安全地设置这个标识位。

要实现锁,一般都需要要一个强有力的底层含义保证,比如cpu层面的cas操作,应用级别的队列串行原子操作。。。

到于什么,内存锁,文件锁,高级锁,都是有各自的应用场景。而要选好各种锁,则变成了评价高低地关键。此时此刻,你应该能判断出来的!



往期精彩推荐



腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)

面试:史上最全多线程面试题 !

最新阿里内推Java后端面试题

JVM难学?那是因为你没认真看完这篇文章


END


关注作者微信公众号 —《JAVA烂猪皮》


了解更多java后端架构知识以及最新面试宝典


你点的每个好看,我都认真当成了


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力


作者:等你归去来

出处:https://www.cnblogs.com/yougewe/p/14445915.html

浏览 21
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报