I/O operations refactoring.

This commit is contained in:
Igor Sysoev
2017-02-22 15:09:59 +03:00
parent 059a864289
commit 029942f4eb
49 changed files with 1145 additions and 1216 deletions

View File

@@ -17,7 +17,7 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
nxt_port_t *
nxt_port_alloc(void)
nxt_port_alloc(nxt_task_t *task)
{
nxt_port_t *port;
nxt_mem_pool_t *mp;
@@ -29,6 +29,8 @@ nxt_port_alloc(void)
port = nxt_mem_zalloc(mp, sizeof(nxt_port_t));
port->mem_pool = mp;
port->socket.task = task;
port->pair[0] = -1;
port->pair[1] = -1;
@@ -42,31 +44,31 @@ nxt_port_alloc(void)
nxt_port_t *
nxt_port_create(size_t max_size)
nxt_port_create(nxt_task_t *task, size_t max_size)
{
nxt_int_t sndbuf, rcvbuf, size;
nxt_port_t *port;
nxt_socket_t snd, rcv;
port = nxt_port_alloc();
port = nxt_port_alloc(task);
if (nxt_slow_path(port == NULL)) {
return NULL;
}
if (nxt_slow_path(nxt_socketpair_create(port->pair) != NXT_OK)) {
if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
goto socketpair_fail;
}
snd = port->pair[1];
sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
if (nxt_slow_path(sndbuf < 0)) {
goto getsockopt_fail;
}
rcv = port->pair[0];
rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
if (nxt_slow_path(rcvbuf < 0)) {
goto getsockopt_fail;
}
@@ -83,9 +85,10 @@ nxt_port_create(size_t max_size)
* on send direction and 4K buffer size on receive direction;
* Solaris uses 16K on send direction and 5K on receive direction.
*/
(void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size);
(void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
max_size);
sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
if (nxt_slow_path(sndbuf < 0)) {
goto getsockopt_fail;
}
@@ -93,9 +96,10 @@ nxt_port_create(size_t max_size)
size = sndbuf * 4;
if (rcvbuf < size) {
(void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size);
(void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
size);
rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
if (nxt_slow_path(rcvbuf < 0)) {
goto getsockopt_fail;
}
@@ -109,8 +113,8 @@ nxt_port_create(size_t max_size)
getsockopt_fail:
nxt_socket_close(port->pair[0]);
nxt_socket_close(port->pair[1]);
nxt_socket_close(task, port->pair[0]);
nxt_socket_close(task, port->pair[1]);
socketpair_fail:
@@ -123,7 +127,7 @@ socketpair_fail:
void
nxt_port_destroy(nxt_port_t *port)
{
nxt_socket_close(port->socket.fd);
nxt_socket_close(port->socket.task, port->socket.fd);
nxt_mem_pool_destroy(port->mem_pool);
}
@@ -135,12 +139,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.log = &nxt_main_log;
port->socket.write_ready = 1;
port->task.thread = task->thread;
port->task.log = port->socket.log;
port->task.ident = nxt_task_next_ident();
port->socket.task = &port->task;
port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
port->socket.write_handler = nxt_port_write_handler;
port->socket.error_handler = nxt_port_error_handler;
@@ -150,7 +148,7 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
void
nxt_port_write_close(nxt_port_t *port)
{
nxt_socket_close(port->pair[1]);
nxt_socket_close(port->socket.task, port->pair[1]);
port->pair[1] = -1;
}
@@ -301,12 +299,6 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.fd = port->pair[0];
port->socket.log = &nxt_main_log;
port->task.thread = task->thread;
port->task.log = port->socket.log;
port->task.ident = nxt_task_next_ident();
port->socket.task = &port->task;
port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
port->socket.read_handler = nxt_port_read_handler;
port->socket.error_handler = nxt_port_error_handler;
@@ -318,7 +310,7 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
void
nxt_port_read_close(nxt_port_t *port)
{
nxt_socket_close(port->pair[0]);
nxt_socket_close(port->socket.task, port->pair[0]);
port->pair[0] = -1;
}