Introducing port messages to notify about out of shared memory.

- OOSM (out of shared memory).  Sent by application process to router
  when application reaches the limit of allocated shared memory and
  needs more.
- SHM_ACK.  Sent by router to application when the application's shared
  memory is released and the OOSM flag is enabled for the segment.

This implements blocking mode (the library waits for SHM_ACK in case of
out of shared memory condition and retries allocating the required memory
amount) and non-blocking mode (the library notifies the application that
it's out of shared memory and returns control to the application module
that sets up the output queue and puts SHM_ACK in the main message loop).
This commit is contained in:
Max Romanov
2019-12-24 18:04:13 +03:00
parent 64f649f990
commit df7caf4650
6 changed files with 568 additions and 69 deletions

View File

@@ -44,6 +44,9 @@ struct nxt_port_handlers_s {
/* Various data. */
nxt_port_handler_t data;
nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
};
@@ -82,6 +85,9 @@ typedef enum {
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
/ sizeof(nxt_port_handler_t),
@@ -114,6 +120,9 @@ typedef enum {
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_OOSM = _NXT_PORT_MSG_OOSM | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_SHM_ACK = _NXT_PORT_MSG_SHM_ACK | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;

View File

@@ -112,6 +112,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
u_char *p;
nxt_mp_t *mp;
nxt_buf_t *b;
nxt_port_t *port;
nxt_process_t *process;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
@@ -163,6 +165,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
c++;
}
if (hdr->dst_pid == nxt_pid
&& nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
{
process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
if (process != NULL && !nxt_queue_is_empty(&process->ports)) {
port = nxt_process_port_first(process);
if (port->type == NXT_PROCESS_WORKER) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
-1, 0, 0, NULL);
}
}
}
release_buf:
nxt_port_mmap_handler_use(mmap_handler, -1);
@@ -454,6 +471,8 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
goto unlock_return;
}
}
hdr->oosm = 1;
}
/* TODO introduce port_mmap limit and release wait. */

View File

@@ -51,6 +51,7 @@ struct nxt_port_mmap_header_s {
nxt_pid_t src_pid; /* For sanity check. */
nxt_pid_t dst_pid; /* For sanity check. */
nxt_port_id_t sent_over;
nxt_atomic_t oosm;
nxt_free_map_t free_map[MAX_FREE_IDX];
nxt_free_map_t free_map_padding;
nxt_free_map_t free_tracking_map[MAX_FREE_IDX];

View File

@@ -248,6 +248,7 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -276,6 +277,7 @@ nxt_port_handlers_t nxt_router_process_port_handlers = {
.access_log = nxt_router_access_log_reopen_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
.oosm = nxt_router_oosm_handler,
};
@@ -2748,6 +2750,7 @@ static nxt_port_handlers_t nxt_router_app_port_handlers = {
.rpc_error = nxt_port_rpc_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_rpc_handler,
.oosm = nxt_router_oosm_handler,
};
@@ -5241,3 +5244,56 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
}
static void
nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
size_t mi;
uint32_t i;
nxt_bool_t ack;
nxt_process_t *process;
nxt_free_map_t *m;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
process = nxt_runtime_process_find(task->thread->runtime,
msg->port_msg.pid);
if (nxt_slow_path(process == NULL)) {
return;
}
ack = 0;
/*
* To mitigate possible racing condition (when OOSM message received
* after some of the memory was already freed), need to try to find
* first free segment in shared memory and send ACK if found.
*/
nxt_thread_mutex_lock(&process->incoming.mutex);
for (i = 0; i < process->incoming.size; i++) {
hdr = process->incoming.elts[i].mmap_handler->hdr;
m = hdr->free_map;
for (mi = 0; mi < MAX_FREE_IDX; mi++) {
if (m[mi] != 0) {
ack = 1;
nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
i, mi, m[mi]);
break;
}
}
}
nxt_thread_mutex_unlock(&process->incoming.mutex);
if (ack) {
(void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
-1, 0, 0, NULL);
}
}

View File

@@ -19,6 +19,10 @@
#include <linux/memfd.h>
#endif
#define NXT_UNIT_MAX_PLAIN_SIZE 1024
#define NXT_UNIT_LOCAL_BUF_SIZE \
(NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
#define NXT_UNIT_MAX_PLAIN_SIZE 1024
#define NXT_UNIT_LOCAL_BUF_SIZE \
(NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
@@ -29,6 +33,7 @@ typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
@@ -53,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
@@ -69,11 +75,18 @@ static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
nxt_chunk_id_t *c, int n);
nxt_chunk_id_t *c, int *n, int min_n);
static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
@@ -81,7 +94,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
@@ -94,14 +107,18 @@ static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
uint32_t size);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd);
@@ -144,6 +161,7 @@ struct nxt_unit_mmap_buf_s {
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_process_t *process;
char *free_ptr;
char *plain_ptr;
};
@@ -206,6 +224,14 @@ struct nxt_unit_websocket_frame_impl_s {
};
struct nxt_unit_read_buf_s {
nxt_unit_read_buf_t *next;
ssize_t size;
char buf[16384];
char oob[256];
};
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
@@ -230,7 +256,12 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
nxt_unit_read_buf_t *pending_read_head;
nxt_unit_read_buf_t **pending_read_tail;
nxt_unit_read_buf_t *free_read_buf;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
nxt_unit_request_info_impl_t req;
};
@@ -277,6 +308,7 @@ struct nxt_unit_mmaps_s {
pthread_mutex_t mutex;
uint32_t size;
uint32_t cap;
nxt_atomic_t allocated_chunks;
nxt_unit_mmap_t *elts;
};
@@ -495,6 +527,11 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
ctx_impl->pending_read_head = NULL;
ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
ctx_impl->ctx_read_buf.next = NULL;
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
@@ -772,6 +809,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
rc = NXT_UNIT_OK;
break;
case _NXT_PORT_MSG_SHM_ACK:
rc = nxt_unit_process_shm_ack(ctx);
break;
default:
nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
@@ -1052,6 +1093,23 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
nxt_unit_callbacks_t *cb;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
cb = &lib->callbacks;
if (cb->shm_ack_handler != NULL) {
cb->shm_ack_handler(ctx);
}
return NXT_UNIT_OK;
}
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
@@ -1705,7 +1763,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, size, mmap_buf, NULL);
&req->response_port, size, size, mmap_buf,
NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1947,6 +2006,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_port_mmap_msg_t mmap_msg;
} m;
int rc;
u_char *last_used, *first_free;
ssize_t res;
nxt_chunk_id_t first_free_chunk;
@@ -1971,6 +2031,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.msg.mf = 0;
m.msg.tracking = 0;
rc = NXT_UNIT_ERROR;
if (m.msg.mmap) {
m.mmap_msg.mmap_id = hdr->id;
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
@@ -1985,13 +2047,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
return NXT_UNIT_ERROR;
goto free_buf;
}
if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
last_used = (u_char *) buf->free - 1;
last_used = (u_char *) buf->free - 1;
first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
buf->start = (char *) first_free;
@@ -2009,6 +2071,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
mmap_buf->hdr = NULL;
}
nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
(int) m.mmap_msg.chunk_id - (int) first_free_chunk);
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
mmap_buf->process->pid,
mmap_buf->process->outgoing.allocated_chunks);
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
|| mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
@@ -2016,7 +2085,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
": no space reserved for message header", stream);
return NXT_UNIT_ERROR;
goto free_buf;
}
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
@@ -2030,11 +2099,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.mmap_msg.size + sizeof(m.msg),
NULL, 0);
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
return NXT_UNIT_ERROR;
goto free_buf;
}
}
return NXT_UNIT_OK;
rc = NXT_UNIT_OK;
free_buf:
nxt_unit_free_outgoing_buf(mmap_buf);
return rc;
}
@@ -2058,12 +2133,73 @@ static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
if (mmap_buf->hdr != NULL) {
nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
mmap_buf->process,
mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
} else if (mmap_buf->free_ptr != NULL) {
free(mmap_buf->free_ptr);
mmap_buf->hdr = NULL;
return;
}
if (mmap_buf->free_ptr != NULL) {
free(mmap_buf->free_ptr);
mmap_buf->free_ptr = NULL;
}
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
return nxt_unit_read_buf_get_impl(ctx_impl);
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
nxt_unit_read_buf_t *rbuf;
if (ctx_impl->free_read_buf != NULL) {
rbuf = ctx_impl->free_read_buf;
ctx_impl->free_read_buf = rbuf->next;
pthread_mutex_unlock(&ctx_impl->mutex);
return rbuf;
}
pthread_mutex_unlock(&ctx_impl->mutex);
rbuf = malloc(sizeof(nxt_unit_read_buf_t));
return rbuf;
}
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf)
{
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
rbuf->next = ctx_impl->free_read_buf;
ctx_impl->free_read_buf = rbuf;
pthread_mutex_unlock(&ctx_impl->mutex);
}
@@ -2099,9 +2235,22 @@ nxt_unit_buf_min(void)
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
size_t size)
{
ssize_t res;
res = nxt_unit_response_write_nb(req, start, size, size);
return res < 0 ? -res : NXT_UNIT_OK;
}
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
size_t size, size_t min_size)
{
int rc;
uint32_t part_size;
ssize_t sent;
uint32_t part_size, min_part_size, buf_size;
const char *part_start;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -2110,58 +2259,70 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
sent = 0;
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "write: response not initialized yet");
return NXT_UNIT_ERROR;
return -NXT_UNIT_ERROR;
}
/* Check if response is not send yet. */
if (nxt_slow_path(req->response_buf)) {
if (nxt_slow_path(req->response_buf != NULL)) {
part_size = req->response_buf->end - req->response_buf->free;
part_size = nxt_min(size, part_size);
rc = nxt_unit_response_add_content(req, part_start, part_size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
return -rc;
}
rc = nxt_unit_response_send(req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
return -rc;
}
size -= part_size;
part_start += part_size;
sent += part_size;
min_size -= nxt_min(min_size, part_size);
}
while (size > 0) {
part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
min_part_size = nxt_min(min_size, part_size);
min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, part_size,
&mmap_buf, local_buf);
min_part_size, &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
return -rc;
}
buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
if (nxt_slow_path(buf_size == 0)) {
return sent;
}
part_size = nxt_min(buf_size, part_size);
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
nxt_unit_free_outgoing_buf(&mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
return -rc;
}
size -= part_size;
part_start += part_size;
sent += part_size;
min_size -= nxt_min(min_size, part_size);
}
return NXT_UNIT_OK;
return sent;
}
@@ -2171,6 +2332,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
{
int rc;
ssize_t n;
uint32_t buf_size;
nxt_unit_buf_t *buf;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -2224,10 +2386,11 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
read_info->buf_size);
buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
nxt_min(read_info->buf_size,
PORT_MMAP_DATA_SIZE),
buf_size, buf_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2249,9 +2412,6 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
}
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
nxt_unit_free_outgoing_buf(&mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
@@ -2398,7 +2558,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
{
int i, rc;
size_t l, copy;
uint32_t payload_len, buf_size;
uint32_t payload_len, buf_size, alloc_size;
const uint8_t *b;
nxt_unit_buf_t *buf;
nxt_unit_mmap_buf_t mmap_buf;
@@ -2415,10 +2575,11 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
}
buf_size = 10 + payload_len;
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
nxt_min(buf_size, PORT_MMAP_DATA_SIZE),
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2454,17 +2615,16 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
&mmap_buf, 0);
nxt_unit_free_outgoing_buf(&mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
}
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
nxt_min(buf_size,
PORT_MMAP_DATA_SIZE),
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2478,8 +2638,6 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
if (buf->free > buf->start) {
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
&mmap_buf, 0);
nxt_unit_free_outgoing_buf(&mmap_buf);
}
return rc;
@@ -2553,15 +2711,23 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
{
int res, nchunks, i;
uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&process->outgoing.mutex);
mm_end = process->outgoing.elts + process->outgoing.size;
retry:
outgoing_size = process->outgoing.size;
mm_end = process->outgoing.elts + outgoing_size;
for (mm = process->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
@@ -2575,11 +2741,17 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
nchunks = 1;
while (nchunks < n) {
while (nchunks < *n) {
res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
*c + nchunks);
if (res == 0) {
if (nchunks >= min_n) {
*n = nchunks;
goto unlock;
}
for (i = 0; i < nchunks; i++) {
nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
}
@@ -2592,23 +2764,155 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nchunks++;
}
if (nchunks == n) {
if (nchunks >= min_n) {
*n = nchunks;
goto unlock;
}
}
hdr->oosm = 1;
}
if (outgoing_size >= lib->shm_mmap_limit) {
/* Cannot allocate more shared memory. */
pthread_mutex_unlock(&process->outgoing.mutex);
if (min_n == 0) {
*n = 0;
}
if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
>= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
{
/* Memory allocated by application, but not send to router. */
return NULL;
}
/* Notify router about OOSM condition. */
res = nxt_unit_send_oosm(ctx, port_id);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return NULL;
}
/* Return if caller can handle OOSM condition. Non-blocking mode. */
if (min_n == 0) {
return NULL;
}
nxt_unit_debug(ctx, "oosm: waiting for ACK");
res = nxt_unit_wait_shm_ack(ctx);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return NULL;
}
nxt_unit_debug(ctx, "oosm: retry");
pthread_mutex_lock(&process->outgoing.mutex);
goto retry;
}
*c = 0;
hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
unlock:
nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
process->pid,
process->outgoing.allocated_chunks);
pthread_mutex_unlock(&process->outgoing.mutex);
return hdr;
}
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = _NXT_PORT_MSG_OOSM;
msg.last = 0;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
(int) port_id->pid, strerror(errno), errno);
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
}
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
nxt_port_msg_t *port_msg;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
while (1) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
nxt_unit_read_buf(ctx, rbuf);
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf);
return NXT_UNIT_ERROR;
}
port_msg = (nxt_port_msg_t *) rbuf->buf;
if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
nxt_unit_read_buf_release(ctx, rbuf);
break;
}
pthread_mutex_lock(&ctx_impl->mutex);
*ctx_impl->pending_read_tail = rbuf;
ctx_impl->pending_read_tail = &rbuf->next;
rbuf->next = NULL;
pthread_mutex_unlock(&ctx_impl->mutex);
if (port_msg->type == _NXT_PORT_MSG_QUIT) {
nxt_unit_debug(ctx, "oosm: quit received");
return NXT_UNIT_ERROR;
}
}
return NXT_UNIT_OK;
}
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
@@ -2843,10 +3147,10 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, uint32_t size,
nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
uint32_t nchunks;
int nchunks, min_nchunks;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
@@ -2869,6 +3173,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
mmap_buf->port_id = *port_id;
mmap_buf->process = process;
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
@@ -2877,9 +3182,20 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
}
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
if (nxt_slow_path(hdr == NULL)) {
if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
mmap_buf->hdr = NULL;
mmap_buf->buf.start = NULL;
mmap_buf->buf.free = NULL;
mmap_buf->buf.end = NULL;
mmap_buf->free_ptr = NULL;
return NXT_UNIT_OK;
}
return NXT_UNIT_ERROR;
}
@@ -2888,6 +3204,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
mmap_buf->port_id = *port_id;
mmap_buf->process = process;
mmap_buf->free_ptr = NULL;
nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
@@ -2991,6 +3308,7 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
mmaps->size = 0;
mmaps->cap = 0;
mmaps->elts = NULL;
mmaps->allocated_chunks = 0;
}
@@ -3174,6 +3492,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
b->process = process;
b = b->next;
@@ -3191,23 +3510,79 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
static int
nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
void *start, uint32_t size)
{
u_char *p, *end;
nxt_chunk_id_t c;
int freed_chunks;
u_char *p, *end;
nxt_chunk_id_t c;
nxt_unit_impl_t *lib;
memset(start, 0xA5, size);
p = start;
end = p + size;
c = nxt_port_mmap_chunk_id(hdr, p);
freed_chunks = 0;
while (p < end) {
nxt_port_mmap_set_chunk_free(hdr->free_map, c);
p += PORT_MMAP_CHUNK_SIZE;
c++;
freed_chunks++;
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (hdr->src_pid == lib->pid && freed_chunks != 0) {
nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
-freed_chunks);
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
process->pid,
process->outgoing.allocated_chunks);
}
if (hdr->dst_pid == lib->pid
&& freed_chunks != 0
&& nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
{
nxt_unit_send_shm_ack(ctx, hdr->src_pid);
}
}
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
nxt_unit_port_id_t port_id;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_port_id_init(&port_id, pid, 0);
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = _NXT_PORT_MSG_SHM_ACK;
msg.last = 0;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
(int) port_id.pid, strerror(errno), errno);
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
@@ -3369,43 +3744,76 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
char buf[4096];
char oob[256];
ssize_t rsize;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
memset(oob, 0, sizeof(struct cmsghdr));
pthread_mutex_lock(&ctx_impl->mutex);
if (ctx_impl->pending_read_head != NULL) {
rbuf = ctx_impl->pending_read_head;
ctx_impl->pending_read_head = rbuf->next;
if (ctx_impl->pending_read_tail == &rbuf->next) {
ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
}
pthread_mutex_unlock(&ctx_impl->mutex);
if (ctx_impl->read_port_fd != -1) {
rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
buf, sizeof(buf),
oob, sizeof(oob));
} else {
rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
buf, sizeof(buf),
oob, sizeof(oob));
rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
nxt_unit_read_buf(ctx, rbuf);
}
if (nxt_fast_path(rsize > 0)) {
rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
oob, sizeof(oob));
if (nxt_fast_path(rbuf->size > 0)) {
rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
rbuf->buf, rbuf->size,
rbuf->oob, sizeof(rbuf->oob));
#if (NXT_DEBUG)
memset(buf, 0xAC, rsize);
memset(rbuf->buf, 0xAC, rbuf->size);
#endif
} else {
rc = NXT_UNIT_ERROR;
}
nxt_unit_read_buf_release(ctx, rbuf);
return rc;
}
static void
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
if (ctx_impl->read_port_fd != -1) {
rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
} else {
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
}
}
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{

View File

@@ -137,6 +137,9 @@ struct nxt_unit_callbacks_s {
/* Gracefully quit the application. Optional. */
void (*quit)(nxt_unit_ctx_t *);
/* Shared memory release acknowledgement. */
void (*shm_ack_handler)(nxt_unit_ctx_t *);
/* Send data and control to process pid using port id. Optional. */
ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
const void *buf, size_t buf_size,
@@ -323,6 +326,9 @@ uint32_t nxt_unit_buf_min(void);
int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
size_t size);
ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req,
const void *start, size_t size, size_t min_size);
int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_read_info_t *read_info);