988 lines
26 KiB
C
988 lines
26 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_main.h>
|
|
|
|
|
|
static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
|
|
nxt_conn_t *source, nxt_conn_t *sink);
|
|
static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
|
|
static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
|
|
nxt_conn_t *sink, nxt_conn_t *source);
|
|
static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
|
|
static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
|
|
static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
|
|
nxt_conn_t *source, nxt_conn_t *sink);
|
|
static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
|
|
static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_wait_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_read_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_read_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_write_state;
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_write_state;
|
|
|
|
|
|
nxt_conn_proxy_t *
|
|
nxt_conn_proxy_create(nxt_conn_t *client)
|
|
{
|
|
nxt_conn_t *peer;
|
|
nxt_thread_t *thr;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
p = nxt_mp_zget(client->mem_pool, sizeof(nxt_conn_proxy_t));
|
|
if (nxt_slow_path(p == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
peer = nxt_conn_create(client->mem_pool, client->socket.task);
|
|
if (nxt_slow_path(peer == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
thr = nxt_thread();
|
|
|
|
client->read_work_queue = &thr->engine->read_work_queue;
|
|
client->write_work_queue = &thr->engine->write_work_queue;
|
|
client->socket.read_work_queue = &thr->engine->read_work_queue;
|
|
client->socket.write_work_queue = &thr->engine->write_work_queue;
|
|
peer->socket.read_work_queue = &thr->engine->read_work_queue;
|
|
peer->socket.write_work_queue = &thr->engine->write_work_queue;
|
|
|
|
peer->socket.data = client->socket.data;
|
|
|
|
peer->read_work_queue = client->read_work_queue;
|
|
peer->write_work_queue = client->write_work_queue;
|
|
peer->read_timer.work_queue = client->read_work_queue;
|
|
peer->write_timer.work_queue = client->write_work_queue;
|
|
|
|
p->client = client;
|
|
p->peer = peer;
|
|
|
|
return p;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
|
|
{
|
|
nxt_conn_t *peer;
|
|
|
|
/*
|
|
* Peer read event: not connected, disabled.
|
|
* Peer write event: not connected, disabled.
|
|
*/
|
|
|
|
if (p->client_wait_timeout == 0) {
|
|
/*
|
|
* Peer write event: waiting for connection
|
|
* to be established with connect_timeout.
|
|
*/
|
|
peer = p->peer;
|
|
peer->write_state = &nxt_conn_proxy_peer_connect_state;
|
|
|
|
nxt_conn_connect(task->thread->engine, peer);
|
|
}
|
|
|
|
/*
|
|
* Client read event: waiting for client data with
|
|
* client_wait_timeout before buffer allocation.
|
|
*/
|
|
p->client->read_state = &nxt_conn_proxy_client_wait_state;
|
|
|
|
nxt_conn_wait(p->client);
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_wait_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_client_buffer_alloc,
|
|
.close_handler = nxt_conn_proxy_close,
|
|
.error_handler = nxt_conn_proxy_error,
|
|
|
|
.timer_handler = nxt_conn_proxy_read_timeout,
|
|
.timer_value = nxt_conn_proxy_timeout_value,
|
|
.timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_conn_t *client;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
client = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
|
|
|
|
b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, 0);
|
|
if (nxt_slow_path(b == NULL)) {
|
|
/* An error completion. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
p->client_buffer = b;
|
|
client->read = b;
|
|
|
|
if (p->peer->socket.fd != -1) {
|
|
/*
|
|
* Client read event: waiting, no timeout.
|
|
* Client write event: blocked.
|
|
* Peer read event: disabled.
|
|
* Peer write event: waiting for connection to be established
|
|
* or blocked after the connection has established.
|
|
*/
|
|
client->read_state = &nxt_conn_proxy_client_read_state;
|
|
|
|
} else {
|
|
/*
|
|
* Client read event: waiting for data with client_wait_timeout
|
|
* before connecting to a peer.
|
|
* Client write event: blocked.
|
|
* Peer read event: not connected, disabled.
|
|
* Peer write event: not connected, disabled.
|
|
*/
|
|
client->read_state = &nxt_conn_proxy_client_first_read_state;
|
|
}
|
|
|
|
nxt_conn_read(task->thread->engine, client);
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_peer_connect,
|
|
.close_handler = nxt_conn_proxy_close,
|
|
.error_handler = nxt_conn_proxy_error,
|
|
|
|
.timer_handler = nxt_conn_proxy_read_timeout,
|
|
.timer_value = nxt_conn_proxy_timeout_value,
|
|
.timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
|
|
.timer_autoreset = 1,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *client;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
client = obj;
|
|
p = data;
|
|
|
|
/*
|
|
* Client read event: waiting, no timeout.
|
|
* Client write event: blocked.
|
|
* Peer read event: disabled.
|
|
* Peer write event: waiting for connection to be established
|
|
* with connect_timeout.
|
|
*/
|
|
client->read_state = &nxt_conn_proxy_client_read_state;
|
|
|
|
p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
|
|
|
|
nxt_conn_connect(task->thread->engine, p->peer);
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_connected,
|
|
.close_handler = nxt_conn_proxy_refused,
|
|
.error_handler = nxt_conn_proxy_error,
|
|
|
|
.timer_handler = nxt_conn_proxy_write_timeout,
|
|
.timer_value = nxt_conn_proxy_timeout_value,
|
|
.timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
|
|
.timer_autoreset = 1,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *client, *peer;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
peer = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
|
|
|
|
p->connected = 1;
|
|
|
|
nxt_conn_tcp_nodelay_on(task, peer);
|
|
nxt_conn_tcp_nodelay_on(task, p->client);
|
|
|
|
/* Peer read event: waiting with peer_wait_timeout. */
|
|
|
|
peer->read_state = &nxt_conn_proxy_peer_wait_state;
|
|
peer->write_state = &nxt_conn_proxy_peer_write_state;
|
|
|
|
nxt_conn_wait(peer);
|
|
|
|
if (p->client_buffer != NULL) {
|
|
client = p->client;
|
|
|
|
client->read_state = &nxt_conn_proxy_client_read_state;
|
|
client->write_state = &nxt_conn_proxy_client_write_state;
|
|
/*
|
|
* Send a client read data to the connected peer.
|
|
* Client write event: blocked.
|
|
*/
|
|
nxt_conn_proxy_read_process(task, p, client, peer);
|
|
}
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_peer_read,
|
|
.close_handler = nxt_conn_proxy_close,
|
|
.error_handler = nxt_conn_proxy_error,
|
|
|
|
.timer_handler = nxt_conn_proxy_read_timeout,
|
|
.timer_value = nxt_conn_proxy_timeout_value,
|
|
.timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_conn_t *peer;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
peer = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
|
|
|
|
b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, 0);
|
|
if (nxt_slow_path(b == NULL)) {
|
|
/* An error completion. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
p->peer_buffer = b;
|
|
peer->read = b;
|
|
|
|
p->client->write_state = &nxt_conn_proxy_client_write_state;
|
|
peer->read_state = &nxt_conn_proxy_peer_read_state;
|
|
peer->write_state = &nxt_conn_proxy_peer_write_state;
|
|
|
|
/*
|
|
* Client read event: waiting, no timeout.
|
|
* Client write event: blocked.
|
|
* Peer read event: waiting with possible peer_wait_timeout.
|
|
* Peer write event: blocked.
|
|
*/
|
|
nxt_conn_read(task->thread->engine, peer);
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_read_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_client_read_ready,
|
|
.close_handler = nxt_conn_proxy_close,
|
|
.error_handler = nxt_conn_proxy_read_error,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *client;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
client = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
|
|
|
|
nxt_conn_proxy_read_process(task, p, client, p->peer);
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_read_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_peer_read_ready,
|
|
.close_handler = nxt_conn_proxy_close,
|
|
.error_handler = nxt_conn_proxy_read_error,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *peer;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
peer = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
|
|
|
|
nxt_conn_proxy_read_process(task, p, peer, p->client);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
|
|
nxt_conn_t *source, nxt_conn_t *sink)
|
|
{
|
|
nxt_buf_t *rb, *wb;
|
|
|
|
if (sink->socket.error != 0) {
|
|
nxt_debug(task, "conn proxy sink fd:%d error:%d",
|
|
sink->socket.fd, sink->socket.error);
|
|
|
|
nxt_conn_proxy_write_error(task, sink, sink->socket.data);
|
|
return;
|
|
}
|
|
|
|
while (source->read != NULL) {
|
|
|
|
rb = source->read;
|
|
|
|
if (rb->mem.pos != rb->mem.free) {
|
|
|
|
/* Add a read part to a write chain. */
|
|
|
|
wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
|
|
if (wb == NULL) {
|
|
/* An error completion. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
wb->mem.pos = rb->mem.pos;
|
|
wb->mem.free = rb->mem.free;
|
|
wb->mem.start = rb->mem.pos;
|
|
wb->mem.end = rb->mem.free;
|
|
|
|
rb->mem.pos = rb->mem.free;
|
|
rb->mem.start = rb->mem.free;
|
|
|
|
nxt_conn_proxy_write_add(sink, wb);
|
|
}
|
|
|
|
if (rb->mem.start != rb->mem.end) {
|
|
nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
|
|
task, source, source->socket.data);
|
|
break;
|
|
}
|
|
|
|
source->read = rb->next;
|
|
nxt_buf_free(source->mem_pool, rb);
|
|
}
|
|
|
|
if (p->connected) {
|
|
nxt_conn_write(task->thread->engine, sink);
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
|
|
{
|
|
nxt_buf_t *first, *second, *prev;
|
|
|
|
first = c->write;
|
|
|
|
if (first == NULL) {
|
|
c->write = b;
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* A event conn proxy maintains a buffer per each direction.
|
|
* The buffer is divided by read and write parts. These parts are
|
|
* linked in buffer chains. There can be no more than two buffers
|
|
* in write chain at any time, because an added buffer is coalesced
|
|
* with the last buffer if possible.
|
|
*/
|
|
|
|
second = first->next;
|
|
|
|
if (second == NULL) {
|
|
|
|
if (first->mem.end != b->mem.start) {
|
|
first->next = b;
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* The first buffer is just before the added buffer, so
|
|
* expand the first buffer to the end of the added buffer.
|
|
*/
|
|
prev = first;
|
|
|
|
} else {
|
|
if (second->mem.end != b->mem.start) {
|
|
nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
|
|
"is not equal to added buffer start:%p",
|
|
second->mem.end, b->mem.start);
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* "second->mem.end == b->mem.start" must be always true here,
|
|
* that is the second buffer is just before the added buffer,
|
|
* so expand the second buffer to the end of added buffer.
|
|
*/
|
|
prev = second;
|
|
}
|
|
|
|
prev->mem.free = b->mem.end;
|
|
prev->mem.end = b->mem.end;
|
|
|
|
nxt_buf_free(c->mem_pool, b);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *source, *sink;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
source = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
|
|
|
|
if (!source->socket.closed) {
|
|
sink = (source == p->client) ? p->peer : p->client;
|
|
|
|
if (sink->socket.error == 0) {
|
|
nxt_conn_read(task->thread->engine, source);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_client_write_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_client_write_ready,
|
|
.error_handler = nxt_conn_proxy_write_error,
|
|
|
|
.timer_handler = nxt_conn_proxy_write_timeout,
|
|
.timer_value = nxt_conn_proxy_timeout_value,
|
|
.timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
|
|
.timer_autoreset = 1,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *client;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
client = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
|
|
|
|
nxt_conn_proxy_write_process(task, p, client, p->peer);
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_peer_write_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_peer_write_ready,
|
|
.error_handler = nxt_conn_proxy_write_error,
|
|
|
|
.timer_handler = nxt_conn_proxy_write_timeout,
|
|
.timer_value = nxt_conn_proxy_timeout_value,
|
|
.timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
|
|
.timer_autoreset = 1,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *peer;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
peer = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
|
|
|
|
nxt_conn_proxy_write_process(task, p, peer, p->client);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
|
|
nxt_conn_t *sink, nxt_conn_t *source)
|
|
{
|
|
nxt_buf_t *rb, *wb;
|
|
|
|
while (sink->write != NULL) {
|
|
|
|
wb = sink->write;
|
|
|
|
if (nxt_buf_is_sync(wb)) {
|
|
|
|
/* A sync buffer marks the end of stream. */
|
|
|
|
sink->write = NULL;
|
|
nxt_buf_free(sink->mem_pool, wb);
|
|
nxt_conn_proxy_shutdown(task, p, source, sink);
|
|
return;
|
|
}
|
|
|
|
if (wb->mem.start != wb->mem.pos) {
|
|
|
|
/* Add a written part to a read chain. */
|
|
|
|
rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
|
|
if (rb == NULL) {
|
|
/* An error completion. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
rb->mem.pos = wb->mem.start;
|
|
rb->mem.free = wb->mem.start;
|
|
rb->mem.start = wb->mem.start;
|
|
rb->mem.end = wb->mem.pos;
|
|
|
|
wb->mem.start = wb->mem.pos;
|
|
|
|
nxt_conn_proxy_read_add(source, rb);
|
|
}
|
|
|
|
if (wb->mem.pos != wb->mem.free) {
|
|
nxt_conn_write(task->thread->engine, sink);
|
|
|
|
break;
|
|
}
|
|
|
|
sink->write = wb->next;
|
|
nxt_buf_free(sink->mem_pool, wb);
|
|
}
|
|
|
|
nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
|
|
task, source, source->socket.data);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
|
|
{
|
|
nxt_buf_t *first, *second;
|
|
|
|
first = c->read;
|
|
|
|
if (first == NULL) {
|
|
c->read = b;
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* A event conn proxy maintains a buffer per each direction.
|
|
* The buffer is divided by read and write parts. These parts are
|
|
* linked in buffer chains. There can be no more than two buffers
|
|
* in read chain at any time, because an added buffer is coalesced
|
|
* with the last buffer if possible. The first and the second
|
|
* buffers are also coalesced if possible.
|
|
*/
|
|
|
|
second = first->next;
|
|
|
|
if (second == NULL) {
|
|
|
|
if (first->mem.start == b->mem.end) {
|
|
/*
|
|
* The added buffer is just before the first buffer, so expand
|
|
* the first buffer to the beginning of the added buffer.
|
|
*/
|
|
first->mem.pos = b->mem.start;
|
|
first->mem.free = b->mem.start;
|
|
first->mem.start = b->mem.start;
|
|
|
|
} else if (first->mem.end == b->mem.start) {
|
|
/*
|
|
* The added buffer is just after the first buffer, so
|
|
* expand the first buffer to the end of the added buffer.
|
|
*/
|
|
first->mem.end = b->mem.end;
|
|
|
|
} else {
|
|
first->next = b;
|
|
return;
|
|
}
|
|
|
|
} else {
|
|
if (second->mem.end != b->mem.start) {
|
|
nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
|
|
"is not equal to added buffer start:%p",
|
|
second->mem.end, b->mem.start);
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* The added buffer is just after the second buffer, so
|
|
* expand the second buffer to the end of the added buffer.
|
|
*/
|
|
second->mem.end = b->mem.end;
|
|
|
|
if (first->mem.start == second->mem.end) {
|
|
/*
|
|
* The second buffer is just before the first buffer, so expand
|
|
* the first buffer to the beginning of the second buffer.
|
|
*/
|
|
first->mem.pos = second->mem.start;
|
|
first->mem.free = second->mem.start;
|
|
first->mem.start = second->mem.start;
|
|
first->next = NULL;
|
|
|
|
nxt_buf_free(c->mem_pool, second);
|
|
}
|
|
}
|
|
|
|
nxt_buf_free(c->mem_pool, b);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_conn_t *source, *sink;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
source = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
|
|
|
|
sink = (source == p->client) ? p->peer : p->client;
|
|
|
|
if (sink->write == NULL) {
|
|
nxt_conn_proxy_shutdown(task, p, source, sink);
|
|
return;
|
|
}
|
|
|
|
b = nxt_buf_sync_alloc(source->mem_pool, 0);
|
|
if (b == NULL) {
|
|
/* An error completion. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
nxt_buf_chain_add(&sink->write, b);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *c;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
c = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
|
|
|
|
nxt_conn_proxy_close(task, c, p);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *c;
|
|
nxt_timer_t *timer;
|
|
|
|
timer = obj;
|
|
|
|
c = nxt_read_timer_conn(timer);
|
|
c->socket.timedout = 1;
|
|
c->socket.closed = 1;
|
|
|
|
nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
|
|
|
|
nxt_conn_proxy_close(task, c, c->socket.data);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *c;
|
|
nxt_timer_t *timer;
|
|
|
|
timer = obj;
|
|
|
|
c = nxt_write_timer_conn(timer);
|
|
c->socket.timedout = 1;
|
|
c->socket.closed = 1;
|
|
|
|
nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
|
|
|
|
nxt_conn_proxy_close(task, c, c->socket.data);
|
|
}
|
|
|
|
|
|
static nxt_msec_t
|
|
nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
|
|
{
|
|
return nxt_value_at(nxt_msec_t, c->socket.data, data);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *peer;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
peer = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
|
|
|
|
if (p->retries == 0) {
|
|
/* An error completion. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
p->retries--;
|
|
|
|
nxt_socket_close(task, peer->socket.fd);
|
|
peer->socket.fd = -1;
|
|
peer->socket.error = 0;
|
|
|
|
p->delayed = 1;
|
|
|
|
peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
|
|
nxt_timer_add(task->thread->engine, &peer->write_timer,
|
|
p->reconnect_timeout);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *peer;
|
|
nxt_timer_t *timer;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
timer = obj;
|
|
|
|
nxt_debug(task, "conn proxy reconnect timer");
|
|
|
|
peer = nxt_write_timer_conn(timer);
|
|
p = peer->socket.data;
|
|
|
|
if (p->client->socket.closed) {
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
p->delayed = 0;
|
|
|
|
peer->write_state = &nxt_conn_proxy_peer_connect_state;
|
|
/*
|
|
* Peer read event: disabled.
|
|
* Peer write event: waiting for connection with connect_timeout.
|
|
*/
|
|
nxt_conn_connect(task->thread->engine, peer);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
|
|
nxt_conn_t *source, nxt_conn_t *sink)
|
|
{
|
|
nxt_buf_t *b;
|
|
|
|
nxt_debug(source->socket.task,
|
|
"conn proxy shutdown source fd:%d cl:%d err:%d",
|
|
source->socket.fd, source->socket.closed, source->socket.error);
|
|
|
|
nxt_debug(sink->socket.task,
|
|
"conn proxy shutdown sink fd:%d cl:%d err:%d",
|
|
sink->socket.fd, sink->socket.closed, sink->socket.error);
|
|
|
|
if (!p->connected || p->delayed) {
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
if (sink->socket.error == 0 && !sink->socket.closed) {
|
|
sink->socket.shutdown = 1;
|
|
nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
|
|
}
|
|
|
|
if (sink->socket.error != 0
|
|
|| (sink->socket.closed && source->write == NULL))
|
|
{
|
|
/* The opposite direction also has been already closed. */
|
|
nxt_conn_proxy_complete(task, p);
|
|
return;
|
|
}
|
|
|
|
nxt_debug(source->socket.task, "free source buffer");
|
|
|
|
/* Free the direction's buffer. */
|
|
b = (source == p->client) ? p->client_buffer : p->peer_buffer;
|
|
nxt_mp_free(source->mem_pool, b);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *c;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
c = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
|
|
|
|
nxt_conn_proxy_close(task, c, p);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *source, *sink;
|
|
nxt_conn_proxy_t *p;
|
|
|
|
sink = obj;
|
|
p = data;
|
|
|
|
nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
|
|
|
|
/* Clear data for the direction sink. */
|
|
sink->write = NULL;
|
|
|
|
/* Block the direction source. */
|
|
source = (sink == p->client) ? p->peer : p->client;
|
|
nxt_fd_event_block_read(task->thread->engine, &source->socket);
|
|
|
|
if (source->write == NULL) {
|
|
/*
|
|
* There is no data for the opposite direction and
|
|
* the next read from the sink will most probably fail.
|
|
*/
|
|
nxt_conn_proxy_complete(task, p);
|
|
}
|
|
}
|
|
|
|
|
|
static const nxt_conn_state_t nxt_conn_proxy_close_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_conn_proxy_completion,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
|
|
{
|
|
nxt_event_engine_t *engine;
|
|
|
|
engine = task->thread->engine;
|
|
|
|
nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
|
|
p->client->socket.fd, p->peer->socket.fd);
|
|
|
|
if (p->delayed) {
|
|
p->delayed = 0;
|
|
nxt_queue_remove(&p->peer->link);
|
|
}
|
|
|
|
if (p->client->socket.fd != -1) {
|
|
p->retain = 1;
|
|
p->client->write_state = &nxt_conn_proxy_close_state;
|
|
nxt_conn_close(engine, p->client);
|
|
}
|
|
|
|
if (p->peer->socket.fd != -1) {
|
|
p->retain++;
|
|
p->peer->write_state = &nxt_conn_proxy_close_state;
|
|
nxt_conn_close(engine, p->peer);
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_proxy_t *p;
|
|
|
|
p = data;
|
|
|
|
nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
|
|
p->retain, p->client->socket.fd, p->peer->socket.fd);
|
|
|
|
p->retain--;
|
|
|
|
if (p->retain == 0) {
|
|
nxt_mp_free(p->client->mem_pool, p->client_buffer);
|
|
nxt_mp_free(p->client->mem_pool, p->peer_buffer);
|
|
|
|
p->completion_handler(task, p, NULL);
|
|
}
|
|
}
|