Added basic port error handler.
This commit is contained in:
@@ -223,7 +223,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
|
|||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
port = nxt_port_new(0, nxt_pid, NXT_PROCESS_MASTER);
|
port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MASTER);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
@@ -404,7 +404,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
|
|||||||
|
|
||||||
process->init = init;
|
process->init = init;
|
||||||
|
|
||||||
port = nxt_port_new(0, 0, init->type);
|
port = nxt_port_new(task, 0, 0, init->type);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
nxt_runtime_process_remove(rt, process);
|
nxt_runtime_process_remove(rt, process);
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
|
|||||||
@@ -952,7 +952,9 @@ nxt_mp_retain(nxt_mp_t *mp, size_t size)
|
|||||||
uint32_t
|
uint32_t
|
||||||
nxt_mp_release(nxt_mp_t *mp, void *p)
|
nxt_mp_release(nxt_mp_t *mp, void *p)
|
||||||
{
|
{
|
||||||
|
if (nxt_fast_path(p != NULL)) {
|
||||||
nxt_mp_free(mp, p);
|
nxt_mp_free(mp, p);
|
||||||
|
}
|
||||||
|
|
||||||
mp->retain--;
|
mp->retain--;
|
||||||
|
|
||||||
|
|||||||
@@ -15,8 +15,32 @@ static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
|||||||
static nxt_atomic_uint_t nxt_port_last_id = 1;
|
static nxt_atomic_uint_t nxt_port_last_id = 1;
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_mp_t *mp;
|
||||||
|
nxt_port_t *port;
|
||||||
|
|
||||||
|
port = obj;
|
||||||
|
mp = data;
|
||||||
|
|
||||||
|
nxt_assert(port->pair[0] == -1);
|
||||||
|
nxt_assert(port->pair[1] == -1);
|
||||||
|
|
||||||
|
nxt_assert(port->app_req_id == 0);
|
||||||
|
nxt_assert(port->app_link.next == NULL);
|
||||||
|
|
||||||
|
nxt_assert(nxt_queue_is_empty(&port->messages));
|
||||||
|
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
|
||||||
|
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
|
||||||
|
|
||||||
|
nxt_mp_free(mp, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_port_t *
|
nxt_port_t *
|
||||||
nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type)
|
nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
|
||||||
|
nxt_process_type_t type)
|
||||||
{
|
{
|
||||||
nxt_mp_t *mp;
|
nxt_mp_t *mp;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
@@ -36,6 +60,8 @@ nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type)
|
|||||||
port->mem_pool = mp;
|
port->mem_pool = mp;
|
||||||
port->next_stream = 1;
|
port->next_stream = 1;
|
||||||
|
|
||||||
|
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
|
||||||
|
|
||||||
nxt_queue_init(&port->messages);
|
nxt_queue_init(&port->messages);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
@@ -74,7 +100,7 @@ nxt_port_release(nxt_port_t *port)
|
|||||||
nxt_process_port_remove(port);
|
nxt_process_port_remove(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_mp_release(port->mem_pool, port);
|
nxt_mp_release(port->mem_pool, NULL);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@@ -222,7 +248,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
port = nxt_port_new(new_port_msg->id, new_port_msg->pid,
|
port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
|
||||||
new_port_msg->type);
|
new_port_msg->type);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -118,6 +118,9 @@ struct nxt_port_s {
|
|||||||
|
|
||||||
nxt_process_type_t type;
|
nxt_process_type_t type;
|
||||||
nxt_work_t work;
|
nxt_work_t work;
|
||||||
|
|
||||||
|
struct iovec *iov;
|
||||||
|
void *mmsg_buf;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@@ -140,7 +143,7 @@ typedef union {
|
|||||||
} nxt_port_data_t;
|
} nxt_port_data_t;
|
||||||
|
|
||||||
|
|
||||||
nxt_port_t *nxt_port_new(nxt_port_id_t id, nxt_pid_t pid,
|
nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
|
||||||
nxt_process_type_t type);
|
nxt_process_type_t type);
|
||||||
nxt_bool_t nxt_port_release(nxt_port_t *port);
|
nxt_bool_t nxt_port_release(nxt_port_t *port);
|
||||||
|
|
||||||
|
|||||||
@@ -568,7 +568,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
|||||||
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
|
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
|
||||||
{
|
{
|
||||||
size_t bsize;
|
size_t bsize;
|
||||||
nxt_buf_t *b, *bmem;
|
nxt_buf_t *bmem;
|
||||||
nxt_uint_t i;
|
nxt_uint_t i;
|
||||||
nxt_port_mmap_msg_t *mmap_msg;
|
nxt_port_mmap_msg_t *mmap_msg;
|
||||||
nxt_port_mmap_header_t *hdr;
|
nxt_port_mmap_header_t *hdr;
|
||||||
@@ -577,13 +577,8 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
|||||||
"via shared memory", sb->size, port->pid);
|
"via shared memory", sb->size, port->pid);
|
||||||
|
|
||||||
bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
|
bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
|
||||||
|
mmap_msg = port->mmsg_buf;
|
||||||
|
|
||||||
b = nxt_buf_mem_ts_alloc(task, port->mem_pool, bsize);
|
|
||||||
if (nxt_slow_path(b == NULL)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
mmap_msg = (nxt_port_mmap_msg_t *) b->mem.start;
|
|
||||||
bmem = msg->buf;
|
bmem = msg->buf;
|
||||||
|
|
||||||
for (i = 0; i < sb->niov; i++, mmap_msg++) {
|
for (i = 0; i < sb->niov; i++, mmap_msg++) {
|
||||||
@@ -611,10 +606,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
|||||||
port->pid);
|
port->pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->buf = b;
|
sb->iobuf[0].iov_base = port->mmsg_buf;
|
||||||
b->mem.free += bsize;
|
|
||||||
|
|
||||||
sb->iobuf[0].iov_base = b->mem.pos;
|
|
||||||
sb->iobuf[0].iov_len = bsize;
|
sb->iobuf[0].iov_len = bsize;
|
||||||
sb->niov = 1;
|
sb->niov = 1;
|
||||||
sb->size = bsize;
|
sb->size = bsize;
|
||||||
|
|||||||
@@ -38,11 +38,6 @@ void
|
|||||||
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb);
|
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb);
|
||||||
|
|
||||||
nxt_inline void
|
|
||||||
nxt_port_mmap_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) {
|
|
||||||
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_recv_msg_t *msg);
|
nxt_port_recv_msg_t *msg);
|
||||||
|
|||||||
@@ -114,6 +114,13 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
|
|||||||
port->socket.write_work_queue = &port->engine->fast_work_queue;
|
port->socket.write_work_queue = &port->engine->fast_work_queue;
|
||||||
port->socket.write_handler = nxt_port_write_handler;
|
port->socket.write_handler = nxt_port_write_handler;
|
||||||
port->socket.error_handler = nxt_port_error_handler;
|
port->socket.error_handler = nxt_port_error_handler;
|
||||||
|
|
||||||
|
if (port->iov == NULL) {
|
||||||
|
port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) *
|
||||||
|
NXT_IOBUF_MAX * 10);
|
||||||
|
port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 *
|
||||||
|
NXT_IOBUF_MAX * 10);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -224,20 +231,20 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
|
|||||||
static void
|
static void
|
||||||
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
|
size_t plain_size;
|
||||||
ssize_t n;
|
ssize_t n;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
struct iovec iov[NXT_IOBUF_MAX * 10];
|
struct iovec *iov;
|
||||||
nxt_work_queue_t *wq;
|
nxt_work_queue_t *wq;
|
||||||
nxt_queue_link_t *link;
|
nxt_queue_link_t *link;
|
||||||
nxt_port_method_t m;
|
nxt_port_method_t m;
|
||||||
nxt_port_send_msg_t *msg;
|
nxt_port_send_msg_t *msg;
|
||||||
nxt_sendbuf_coalesce_t sb;
|
nxt_sendbuf_coalesce_t sb;
|
||||||
|
|
||||||
size_t plain_size;
|
|
||||||
nxt_buf_t *plain_buf;
|
|
||||||
|
|
||||||
port = nxt_container_of(obj, nxt_port_t, socket);
|
port = nxt_container_of(obj, nxt_port_t, socket);
|
||||||
|
|
||||||
|
iov = port->iov;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
link = nxt_queue_first(&port->messages);
|
link = nxt_queue_first(&port->messages);
|
||||||
|
|
||||||
@@ -269,7 +276,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
nxt_sendbuf_mem_coalesce(task, &sb);
|
nxt_sendbuf_mem_coalesce(task, &sb);
|
||||||
|
|
||||||
plain_size = sb.size;
|
plain_size = sb.size;
|
||||||
plain_buf = msg->buf;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Send through mmap enabled only when payload
|
* Send through mmap enabled only when payload
|
||||||
@@ -302,16 +308,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
wq = &task->thread->engine->fast_work_queue;
|
wq = &task->thread->engine->fast_work_queue;
|
||||||
|
|
||||||
if (msg->buf != plain_buf) {
|
|
||||||
/*
|
|
||||||
* Complete crafted mmap_msgs buf and restore msg->buf
|
|
||||||
* for regular completion call.
|
|
||||||
*/
|
|
||||||
nxt_port_mmap_completion(task, wq, msg->buf);
|
|
||||||
|
|
||||||
msg->buf = plain_buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
|
msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
|
||||||
|
|
||||||
if (msg->buf != NULL) {
|
if (msg->buf != NULL) {
|
||||||
@@ -330,7 +326,8 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
} else {
|
} else {
|
||||||
nxt_queue_remove(link);
|
nxt_queue_remove(link);
|
||||||
nxt_port_release_send_msg(task, msg, msg->engine);
|
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
|
||||||
|
msg->engine);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (nxt_slow_path(n == NXT_ERROR)) {
|
} else if (nxt_slow_path(n == NXT_ERROR)) {
|
||||||
@@ -536,6 +533,31 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
|
|||||||
static void
|
static void
|
||||||
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
|
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
|
nxt_buf_t *b;
|
||||||
|
nxt_port_t *port;
|
||||||
|
nxt_work_queue_t *wq;
|
||||||
|
nxt_port_send_msg_t *msg;
|
||||||
|
|
||||||
nxt_debug(task, "port error handler %p", obj);
|
nxt_debug(task, "port error handler %p", obj);
|
||||||
/* TODO */
|
/* TODO */
|
||||||
|
|
||||||
|
port = nxt_container_of(obj, nxt_port_t, socket);
|
||||||
|
|
||||||
|
nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
|
||||||
|
|
||||||
|
wq = &task->thread->engine->fast_work_queue;
|
||||||
|
|
||||||
|
for(b = msg->buf; b != NULL; b = b->next) {
|
||||||
|
if (nxt_buf_is_sync(b)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_queue_remove(&msg->link);
|
||||||
|
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
|
||||||
|
msg->engine);
|
||||||
|
|
||||||
|
} nxt_queue_loop;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1386,7 +1386,8 @@ nxt_router_thread_start(void *data)
|
|||||||
|
|
||||||
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
|
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
|
||||||
|
|
||||||
port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER);
|
port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
|
||||||
|
NXT_PROCESS_ROUTER);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user