Protecting context structures with mutex.
By design, Unit context is created for the thread which reads messages from the router. However, Go request handlers are called in a separate goroutine that may be executed in a different thread. To avoid a racing condition, access to lists of free structures in the context should be serialized. This patch should fix random crashes in Go applications under high load. This is related to #253 and #309 issues on GitHub.
This commit is contained in:
@@ -31,7 +31,7 @@ typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
|
|||||||
typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
|
typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
|
||||||
|
|
||||||
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
|
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
|
||||||
static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
|
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
|
||||||
nxt_unit_ctx_impl_t *ctx_impl, void *data);
|
nxt_unit_ctx_impl_t *ctx_impl, void *data);
|
||||||
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
|
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
|
||||||
nxt_unit_mmap_buf_t *mmap_buf);
|
nxt_unit_mmap_buf_t *mmap_buf);
|
||||||
@@ -204,6 +204,8 @@ struct nxt_unit_websocket_frame_impl_s {
|
|||||||
struct nxt_unit_ctx_impl_s {
|
struct nxt_unit_ctx_impl_s {
|
||||||
nxt_unit_ctx_t ctx;
|
nxt_unit_ctx_t ctx;
|
||||||
|
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
|
||||||
nxt_unit_port_id_t read_port_id;
|
nxt_unit_port_id_t read_port_id;
|
||||||
int read_port_fd;
|
int read_port_fd;
|
||||||
|
|
||||||
@@ -402,7 +404,10 @@ nxt_unit_create(nxt_unit_init_t *init)
|
|||||||
|
|
||||||
nxt_queue_init(&lib->contexts);
|
nxt_queue_init(&lib->contexts);
|
||||||
|
|
||||||
nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
|
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
cb = &lib->callbacks;
|
cb = &lib->callbacks;
|
||||||
|
|
||||||
@@ -446,15 +451,24 @@ fail:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static int
|
||||||
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
||||||
void *data)
|
void *data)
|
||||||
{
|
{
|
||||||
|
int rc;
|
||||||
|
|
||||||
ctx_impl->ctx.data = data;
|
ctx_impl->ctx.data = data;
|
||||||
ctx_impl->ctx.unit = &lib->unit;
|
ctx_impl->ctx.unit = &lib->unit;
|
||||||
|
|
||||||
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
|
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
|
||||||
|
|
||||||
|
rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
|
||||||
|
if (nxt_slow_path(rc != 0)) {
|
||||||
|
nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
nxt_queue_init(&ctx_impl->free_req);
|
nxt_queue_init(&ctx_impl->free_req);
|
||||||
nxt_queue_init(&ctx_impl->free_ws);
|
nxt_queue_init(&ctx_impl->free_ws);
|
||||||
nxt_queue_init(&ctx_impl->active_req);
|
nxt_queue_init(&ctx_impl->active_req);
|
||||||
@@ -470,6 +484,8 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
|||||||
|
|
||||||
ctx_impl->read_port_fd = -1;
|
ctx_impl->read_port_fd = -1;
|
||||||
ctx_impl->requests.slot = 0;
|
ctx_impl->requests.slot = 0;
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -1029,7 +1045,11 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
if (nxt_queue_is_empty(&ctx_impl->free_req)) {
|
if (nxt_queue_is_empty(&ctx_impl->free_req)) {
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
|
req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
|
||||||
+ lib->request_data_size);
|
+ lib->request_data_size);
|
||||||
if (nxt_slow_path(req_impl == NULL)) {
|
if (nxt_slow_path(req_impl == NULL)) {
|
||||||
@@ -1041,6 +1061,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
|
|||||||
req_impl->req.unit = ctx->unit;
|
req_impl->req.unit = ctx->unit;
|
||||||
req_impl->req.ctx = ctx;
|
req_impl->req.ctx = ctx;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
lnk = nxt_queue_first(&ctx_impl->free_req);
|
lnk = nxt_queue_first(&ctx_impl->free_req);
|
||||||
nxt_queue_remove(lnk);
|
nxt_queue_remove(lnk);
|
||||||
@@ -1050,6 +1072,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
|
nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
|
req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
|
||||||
|
|
||||||
return req_impl;
|
return req_impl;
|
||||||
@@ -1088,10 +1112,14 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
|
|||||||
nxt_unit_mmap_buf_free(req_impl->incoming_buf);
|
nxt_unit_mmap_buf_free(req_impl->incoming_buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
nxt_queue_remove(&req_impl->link);
|
nxt_queue_remove(&req_impl->link);
|
||||||
|
|
||||||
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
|
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
req_impl->state = NXT_UNIT_RS_RELEASED;
|
req_impl->state = NXT_UNIT_RS_RELEASED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1120,7 +1148,11 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
|
if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
|
ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
|
||||||
if (nxt_slow_path(ws_impl == NULL)) {
|
if (nxt_slow_path(ws_impl == NULL)) {
|
||||||
nxt_unit_warn(ctx, "websocket frame allocation failed");
|
nxt_unit_warn(ctx, "websocket frame allocation failed");
|
||||||
@@ -1132,6 +1164,8 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
|
|||||||
lnk = nxt_queue_first(&ctx_impl->free_ws);
|
lnk = nxt_queue_first(&ctx_impl->free_ws);
|
||||||
nxt_queue_remove(lnk);
|
nxt_queue_remove(lnk);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
|
ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1160,7 +1194,11 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
|
|||||||
ws_impl->retain_buf = NULL;
|
ws_impl->retain_buf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
|
||||||
|
|
||||||
nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
|
nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -1688,16 +1726,24 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
if (ctx_impl->free_buf == NULL) {
|
if (ctx_impl->free_buf == NULL) {
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
|
mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
|
||||||
if (nxt_slow_path(mmap_buf == NULL)) {
|
if (nxt_slow_path(mmap_buf == NULL)) {
|
||||||
nxt_unit_warn(ctx, "failed to allocate buf");
|
nxt_unit_warn(ctx, "failed to allocate buf");
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
mmap_buf = ctx_impl->free_buf;
|
mmap_buf = ctx_impl->free_buf;
|
||||||
|
|
||||||
nxt_unit_mmap_buf_remove(mmap_buf);
|
nxt_unit_mmap_buf_remove(mmap_buf);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
mmap_buf->ctx_impl = ctx_impl;
|
mmap_buf->ctx_impl = ctx_impl;
|
||||||
@@ -1711,7 +1757,11 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
|
|||||||
{
|
{
|
||||||
nxt_unit_mmap_buf_remove(mmap_buf);
|
nxt_unit_mmap_buf_remove(mmap_buf);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
|
||||||
|
|
||||||
nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
|
nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -3298,7 +3348,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
|
|||||||
|
|
||||||
close(fd);
|
close(fd);
|
||||||
|
|
||||||
nxt_unit_ctx_init(lib, new_ctx, data);
|
rc = nxt_unit_ctx_init(lib, new_ctx, data);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
lib->callbacks.remove_port(ctx, &new_port_id);
|
||||||
|
|
||||||
|
free(new_ctx);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
new_ctx->read_port_id = new_port_id;
|
new_ctx->read_port_id = new_port_id;
|
||||||
|
|
||||||
@@ -3350,6 +3407,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
} nxt_queue_loop;
|
} nxt_queue_loop;
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&ctx_impl->mutex);
|
||||||
|
|
||||||
nxt_queue_remove(&ctx_impl->link);
|
nxt_queue_remove(&ctx_impl->link);
|
||||||
|
|
||||||
if (ctx_impl != &lib->main_ctx) {
|
if (ctx_impl != &lib->main_ctx) {
|
||||||
|
|||||||
Reference in New Issue
Block a user