Added bit flags to type parameter of nxt_port_socket_write().

NXT_PORT_MSG_LAST     - mark message as last;
NXT_PORT_MSG_CLOSE_FD - close fd right after send;

Type constants altered to include last flag for single buffer messages.

Last sign is critical for coming port RPC layer. Handlers unregistered on last
message. Create sync buffer is not convenient, extra parameter is better.
This commit is contained in:
Max Romanov
2017-08-02 13:10:48 +03:00
parent a7ef8481fc
commit 3812ffd336
9 changed files with 56 additions and 66 deletions

View File

@@ -189,8 +189,8 @@ nxt_go_ready()
port_msg.stream = atol(go_stream);
port_msg.pid = getpid();
port_msg.reply_port = 0;
port_msg.type = NXT_PORT_MSG_READY;
port_msg.last = 0;
port_msg.type = _NXT_PORT_MSG_READY;
port_msg.last = 1;
port_msg.mmap = 0;
nxt_go_master_send(&port_msg, sizeof(port_msg), NULL, 0);

View File

@@ -152,13 +152,13 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
}
switch (port_msg->type) {
case NXT_PORT_MSG_QUIT:
case _NXT_PORT_MSG_QUIT:
nxt_go_debug("quit");
nxt_go_set_quit();
break;
case NXT_PORT_MSG_NEW_PORT:
case _NXT_PORT_MSG_NEW_PORT:
nxt_go_debug("new port");
new_port_msg = payload;
@@ -166,22 +166,22 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
-1, fd);
break;
case NXT_PORT_MSG_CHANGE_FILE:
case _NXT_PORT_MSG_CHANGE_FILE:
nxt_go_debug("change file");
break;
case NXT_PORT_MSG_MMAP:
case _NXT_PORT_MSG_MMAP:
nxt_go_debug("mmap");
nxt_go_new_incoming_mmap(port_msg->pid, fd);
break;
case NXT_PORT_MSG_DATA:
case _NXT_PORT_MSG_DATA:
nxt_go_debug("data");
return nxt_go_data_handler(port_msg, buf_size);
case NXT_PORT_MSG_REMOVE_PID:
case _NXT_PORT_MSG_REMOVE_PID:
nxt_go_debug("remove pid");
/* TODO remove all ports for this pid in Go */

View File

@@ -112,8 +112,8 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id)
port_msg.stream = 0;
port_msg.pid = getpid();
port_msg.reply_port = 0;
port_msg.type = NXT_PORT_MSG_MMAP;
port_msg.last = 0;
port_msg.type = _NXT_PORT_MSG_MMAP;
port_msg.last = 1;
port_msg.mmap = 0;
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));

View File

@@ -165,7 +165,7 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
ctx->wport_msg.stream = port_msg->stream;
ctx->wport_msg.pid = getpid();
ctx->wport_msg.type = NXT_PORT_MSG_DATA;
ctx->wport_msg.type = _NXT_PORT_MSG_DATA;
ctx->wport_msg.mmap = 1;
ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 );

View File

@@ -862,7 +862,8 @@ nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b);
return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0,
0, b);
}

View File

@@ -9,15 +9,30 @@
typedef enum {
NXT_PORT_MSG_QUIT = 0,
NXT_PORT_MSG_NEW_PORT,
NXT_PORT_MSG_CHANGE_FILE,
NXT_PORT_MSG_MMAP,
NXT_PORT_MSG_DATA,
NXT_PORT_MSG_REMOVE_PID,
NXT_PORT_MSG_READY,
NXT_PORT_MSG_LAST = 0x100,
NXT_PORT_MSG_CLOSE_FD = 0x200,
NXT_PORT_MSG_MASK = 0xFF,
_NXT_PORT_MSG_QUIT = 0,
_NXT_PORT_MSG_NEW_PORT,
_NXT_PORT_MSG_CHANGE_FILE,
_NXT_PORT_MSG_MMAP,
_NXT_PORT_MSG_DATA,
_NXT_PORT_MSG_REMOVE_PID,
_NXT_PORT_MSG_READY,
NXT_PORT_MSG_MAX,
NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
NXT_PORT_MSG_CLOSE_FD,
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;
@@ -27,7 +42,7 @@ typedef struct {
nxt_pid_t pid;
nxt_port_id_t reply_port;
nxt_port_msg_type_t type:8;
uint8_t type;
uint8_t last; /* 1 bit */
/* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
@@ -40,6 +55,7 @@ typedef struct {
nxt_buf_t *buf;
size_t share;
nxt_fd_t fd;
nxt_bool_t close_fd;
nxt_port_msg_t port_msg;
nxt_work_t work;

View File

@@ -198,37 +198,6 @@ fail:
}
static void
nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_fd_t fd;
nxt_buf_t *b;
nxt_mp_t *mp;
if (nxt_buf_ts_handle(task, obj, data)) {
return;
}
b = obj;
mp = b->data;
fd = (nxt_fd_t) (intptr_t) data;
#if (NXT_DEBUG)
if (nxt_slow_path(data != b->parent)) {
nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
data, b->parent);
nxt_abort();
}
#endif
nxt_debug(task, "mmap fd %FD has been sent", fd);
nxt_fd_close(fd);
nxt_mp_release(mp, b);
}
static nxt_port_mmap_header_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
@@ -236,7 +205,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
void *mem;
u_char *p, name[64];
nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
@@ -310,14 +278,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
port_mmap->hdr = mem;
b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
if (nxt_slow_path(b == NULL)) {
goto remove_fail;
}
b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
b->parent = (void *) (intptr_t) fd;
/* Init segment header. */
hdr = port_mmap->hdr;
@@ -336,7 +296,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
port->pid);
/* TODO handle error */
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
hdr->id, nxt_pid, process->pid);

View File

@@ -166,12 +166,17 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
if (msg->port_msg.stream == stream &&
msg->port_msg.reply_port == reply_port) {
nxt_assert(msg->port_msg.last == 0);
/*
* An fd is ignored since a file descriptor
* must be sent only in the first message of a stream.
*/
nxt_buf_chain_add(&msg->buf, b);
msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
return NXT_OK;
}
@@ -187,6 +192,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->buf = b;
msg->fd = fd;
msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
msg->work.next = NULL;
@@ -201,8 +207,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->port_msg.stream = stream;
msg->port_msg.pid = nxt_pid;
msg->port_msg.reply_port = reply_port;
msg->port_msg.type = type;
msg->port_msg.last = 0;
msg->port_msg.type = type & NXT_PORT_MSG_MASK;
msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
nxt_queue_insert_tail(&port->messages, &msg->link);
@@ -276,7 +282,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
m = NXT_PORT_METHOD_PLAIN;
}
msg->port_msg.last = sb.last;
msg->port_msg.last |= sb.last;
n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
@@ -288,6 +294,12 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
goto fail;
}
if (msg->fd != -1 && msg->close_fd != 0) {
nxt_fd_close(msg->fd);
msg->fd = -1;
}
wq = &task->thread->engine->fast_work_queue;
if (msg->buf != plain_buf) {

View File

@@ -575,7 +575,7 @@ nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
b->parent = tmcf->mem_pool;
b->completion_handler = nxt_router_conf_buf_completion;
nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA, -1,
nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA_LAST, -1,
tmcf->stream, 0, b);
}
@@ -1977,7 +1977,8 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1,
sw->stream, 0, b);
}