Controller: improved handling of unix domain control socket.

One of the ways to detect Unit's startup and subsequent readiness to accept
commands relies on waiting for the control socket file to be created.
Earlier, it was unreliable due to a race condition between the client's
connect() and the daemon's listen() calls after the socket's bind() call.

Now, unix domain listening sockets are created with a nxt_listen_socket_create()
call as follows:

   s = socket();
   unlink("path/to/socket.tmp")
   bind(s, "path/to/socket.tmp");
   listen(s);
   rename("path/to/socket.tmp", "path/to/socket");

This eliminates a time-lapse when the socket file is already created but nobody
is listening on it yet, which therefore prevents the condition described above.

Also, it allows reliably detecting whether the socket is being used or simply
wasn't cleaned after the daemon stopped abruptly.  A successful connection to
the socket file means the daemon has been started; otherwise, the file can be
overwritten.
This commit is contained in:
Valentin Bartenev
2020-04-08 15:15:24 +03:00
parent a6d9efcee1
commit c7f5c1c664
5 changed files with 90 additions and 18 deletions

View File

@@ -431,7 +431,7 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
#endif
ls->handler = nxt_controller_conn_init;
if (nxt_listen_socket_create(task, ls) != NXT_OK) {
if (nxt_listen_socket_create(task, rt->mem_pool, ls) != NXT_OK) {
return NXT_ERROR;
}

View File

@@ -27,13 +27,23 @@ nxt_listen_socket(nxt_task_t *task, nxt_socket_t s, int backlog)
nxt_int_t
nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls)
nxt_listen_socket_create(nxt_task_t *task, nxt_mp_t *mp,
nxt_listen_socket_t *ls)
{
nxt_log_t log, *old;
nxt_uint_t family;
nxt_socket_t s;
nxt_thread_t *thr;
nxt_sockaddr_t *sa;
#if (NXT_HAVE_UNIX_DOMAIN)
int ret;
u_char *p;
nxt_err_t err;
nxt_socket_t ts;
nxt_sockaddr_t *orig_sa;
nxt_file_name_t *name, *tmp;
nxt_file_access_t access;
#endif
sa = ls->sockaddr;
@@ -80,6 +90,36 @@ nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls)
nxt_socket_defer_accept(task, s, sa);
}
#if (NXT_HAVE_UNIX_DOMAIN)
if (family == AF_UNIX
&& sa->type == SOCK_STREAM
&& sa->u.sockaddr_un.sun_path[0] != '\0')
{
orig_sa = sa;
sa = nxt_sockaddr_alloc(mp, sa->socklen + 4, sa->length + 4);
if (sa == NULL) {
goto fail;
}
sa->type = SOCK_STREAM;
sa->u.sockaddr_un.sun_family = AF_UNIX;
p = nxt_cpystr((u_char *) sa->u.sockaddr_un.sun_path,
(u_char *) orig_sa->u.sockaddr_un.sun_path);
nxt_memcpy(p, ".tmp", 4);
nxt_sockaddr_text(sa);
(void) unlink(sa->u.sockaddr_un.sun_path);
} else {
orig_sa = NULL;
}
#endif
if (nxt_socket_bind(task, s, sa) != NXT_OK) {
goto fail;
}
@@ -87,9 +127,6 @@ nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls)
#if (NXT_HAVE_UNIX_DOMAIN)
if (family == AF_UNIX) {
nxt_file_name_t *name;
nxt_file_access_t access;
name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
access = (S_IRUSR | S_IWUSR);
@@ -109,6 +146,46 @@ nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls)
goto listen_fail;
}
#if (NXT_HAVE_UNIX_DOMAIN)
if (orig_sa != NULL) {
ts = nxt_socket_create(task, AF_UNIX, SOCK_STREAM, 0, 0);
if (ts == -1) {
goto listen_fail;
}
ret = connect(ts, &orig_sa->u.sockaddr, orig_sa->socklen);
err = nxt_socket_errno;
nxt_socket_close(task, ts);
if (ret == 0) {
nxt_alert(task, "connect(%d, %*s) succeed, address already in use",
ts, (size_t) orig_sa->length,
nxt_sockaddr_start(orig_sa));
goto listen_fail;
}
if (err != NXT_ENOENT && err != NXT_ECONNREFUSED) {
nxt_alert(task, "connect(%d, %*s) failed %E",
ts, (size_t) orig_sa->length,
nxt_sockaddr_start(orig_sa), err);
goto listen_fail;
}
tmp = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
name = (nxt_file_name_t *) orig_sa->u.sockaddr_un.sun_path;
if (nxt_file_rename(tmp, name) != NXT_OK) {
goto listen_fail;
}
}
#endif
ls->socket = s;
thr->log = old;
@@ -119,8 +196,6 @@ listen_fail:
#if (NXT_HAVE_UNIX_DOMAIN)
if (family == AF_UNIX) {
nxt_file_name_t *name;
name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
(void) nxt_file_delete(name);

View File

@@ -54,7 +54,7 @@ typedef struct {
NXT_EXPORT nxt_int_t nxt_listen_socket(nxt_task_t *task, nxt_socket_t s,
int backlog);
NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task,
NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task, nxt_mp_t *mp,
nxt_listen_socket_t *ls);
NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_task_t *task,
nxt_listen_socket_t *ls, nxt_listen_socket_t *prev);

View File

@@ -1205,7 +1205,7 @@ nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
}
}
if (nxt_listen_socket_create(task, &curr[c]) != NXT_OK) {
if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
return NXT_ERROR;
}

View File

@@ -185,10 +185,7 @@ class TestUnit(unittest.TestCase):
atexit.register(self.stop)
# Due to race between connect() and listen() after the socket binding
# tests waits for unit.pid file which is created after listen().
if not self.waitforfiles(self.testdir + '/unit.pid'):
if not self.waitforfiles(self.testdir + '/control.unit.sock'):
exit("Could not start unit")
self.skip_alerts = [