Fixing application process infinite loop.
Main process exiting before app process init may have caused hanging.
This commit is contained in:
@@ -138,6 +138,8 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nxt_go_warn("write result %d (%d), %s", n, oobn, err)
|
nxt_go_warn("write result %d (%d), %s", n, oobn, err)
|
||||||
|
|
||||||
|
n = -1
|
||||||
}
|
}
|
||||||
|
|
||||||
return C.ssize_t(n)
|
return C.ssize_t(n)
|
||||||
@@ -164,6 +166,8 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nxt_go_warn("read result %d (%d), %s", n, oobn, err)
|
nxt_go_warn("read result %d (%d), %s", n, oobn, err)
|
||||||
|
|
||||||
|
n = -1
|
||||||
}
|
}
|
||||||
|
|
||||||
return C.ssize_t(n)
|
return C.ssize_t(n)
|
||||||
|
|||||||
@@ -2635,7 +2635,6 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
|
|||||||
void
|
void
|
||||||
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
|
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
|
||||||
{
|
{
|
||||||
ssize_t res;
|
|
||||||
uint32_t size;
|
uint32_t size;
|
||||||
nxt_port_msg_t msg;
|
nxt_port_msg_t msg;
|
||||||
nxt_unit_impl_t *lib;
|
nxt_unit_impl_t *lib;
|
||||||
@@ -2690,12 +2689,8 @@ skip_response_send:
|
|||||||
msg.mf = 0;
|
msg.mf = 0;
|
||||||
msg.tracking = 0;
|
msg.tracking = 0;
|
||||||
|
|
||||||
res = lib->callbacks.port_send(req->ctx, &req->response_port,
|
(void) lib->callbacks.port_send(req->ctx, &req->response_port,
|
||||||
&msg, sizeof(msg), NULL, 0);
|
&msg, sizeof(msg), NULL, 0);
|
||||||
if (nxt_slow_path(res != sizeof(msg))) {
|
|
||||||
nxt_unit_req_alert(req, "last message send failed: %s (%d)",
|
|
||||||
strerror(errno), errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_unit_request_info_release(req);
|
nxt_unit_request_info_release(req);
|
||||||
}
|
}
|
||||||
@@ -3013,9 +3008,6 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
|
|||||||
|
|
||||||
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
|
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
|
||||||
if (nxt_slow_path(res != sizeof(msg))) {
|
if (nxt_slow_path(res != sizeof(msg))) {
|
||||||
nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
|
|
||||||
(int) port_id->pid, strerror(errno), errno);
|
|
||||||
|
|
||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3039,6 +3031,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
nxt_unit_read_buf(ctx, rbuf);
|
nxt_unit_read_buf(ctx, rbuf);
|
||||||
|
|
||||||
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
|
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
|
||||||
nxt_unit_read_buf_release(ctx, rbuf);
|
nxt_unit_read_buf_release(ctx, rbuf);
|
||||||
|
|
||||||
@@ -3294,9 +3287,6 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
|
|||||||
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
|
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
|
||||||
&cmsg, sizeof(cmsg));
|
&cmsg, sizeof(cmsg));
|
||||||
if (nxt_slow_path(res != sizeof(msg))) {
|
if (nxt_slow_path(res != sizeof(msg))) {
|
||||||
nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
|
|
||||||
(int) port_id->pid, strerror(errno), errno);
|
|
||||||
|
|
||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3739,9 +3729,6 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
|
|||||||
|
|
||||||
res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
|
res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
|
||||||
if (nxt_slow_path(res != sizeof(msg))) {
|
if (nxt_slow_path(res != sizeof(msg))) {
|
||||||
nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
|
|
||||||
(int) port_id.pid, strerror(errno), errno);
|
|
||||||
|
|
||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3894,6 +3881,10 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
while (nxt_fast_path(lib->online)) {
|
while (nxt_fast_path(lib->online)) {
|
||||||
rc = nxt_unit_run_once(ctx);
|
rc = nxt_unit_run_once(ctx);
|
||||||
|
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
@@ -4552,14 +4543,24 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
|
|||||||
msg.msg_control = (void *) oob;
|
msg.msg_control = (void *) oob;
|
||||||
msg.msg_controllen = oob_size;
|
msg.msg_controllen = oob_size;
|
||||||
|
|
||||||
|
retry:
|
||||||
|
|
||||||
res = sendmsg(fd, &msg, 0);
|
res = sendmsg(fd, &msg, 0);
|
||||||
|
|
||||||
if (nxt_slow_path(res == -1)) {
|
if (nxt_slow_path(res == -1)) {
|
||||||
nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
|
if (errno == EINTR) {
|
||||||
|
goto retry;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FIXME: This should be "alert" after router graceful shutdown
|
||||||
|
* implementation.
|
||||||
|
*/
|
||||||
|
nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
|
||||||
fd, (int) buf_size, strerror(errno), errno);
|
fd, (int) buf_size, strerror(errno), errno);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
|
nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
|
||||||
(int) res);
|
(int) res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -4629,14 +4630,20 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
|
|||||||
msg.msg_control = oob;
|
msg.msg_control = oob;
|
||||||
msg.msg_controllen = oob_size;
|
msg.msg_controllen = oob_size;
|
||||||
|
|
||||||
|
retry:
|
||||||
|
|
||||||
res = recvmsg(fd, &msg, 0);
|
res = recvmsg(fd, &msg, 0);
|
||||||
|
|
||||||
if (nxt_slow_path(res == -1)) {
|
if (nxt_slow_path(res == -1)) {
|
||||||
nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
|
if (errno == EINTR) {
|
||||||
fd, strerror(errno), errno);
|
goto retry;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
|
||||||
|
fd, strerror(errno), errno);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
|
nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ class TestNodeWebsockets(TestApplicationNode):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.skip_alerts.extend(
|
self.skip_alerts.extend(
|
||||||
[r'last message send failed', r'socket close\(\d+\) failed']
|
[r'socket close\(\d+\) failed']
|
||||||
)
|
)
|
||||||
|
|
||||||
def close_connection(self, sock):
|
def close_connection(self, sock):
|
||||||
|
|||||||
@@ -193,7 +193,6 @@ class TestUnit(unittest.TestCase):
|
|||||||
|
|
||||||
self.skip_alerts = [
|
self.skip_alerts = [
|
||||||
r'read signalfd\(4\) failed',
|
r'read signalfd\(4\) failed',
|
||||||
r'last message send failed',
|
|
||||||
r'sendmsg.+failed',
|
r'sendmsg.+failed',
|
||||||
r'recvmsg.+failed',
|
r'recvmsg.+failed',
|
||||||
]
|
]
|
||||||
|
|||||||
Reference in New Issue
Block a user