Scripted change: $ grep -ril recevied src/ | xargs sed -i s/recevied/received/ Reported-by: <https://github.com/jeffdafoe> Closes: <https://github.com/nginx/unit/issues/920> Cc: <https://github.com/meezaan> Cc: Timo Stark <t.stark@nginx.com> Signed-off-by: Alejandro Colomar <alx@nginx.com> Reviewed-by: Andrew Clayton <a.clayton@nginx.com> Signed-off-by: Andrew Clayton <a.clayton@nginx.com>
538 lines
12 KiB
C
538 lines
12 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_main.h>
|
|
|
|
|
|
static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb);
|
|
static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size);
|
|
|
|
|
|
void
|
|
nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
ssize_t ret;
|
|
nxt_buf_t *b;
|
|
nxt_conn_t *c;
|
|
nxt_sendbuf_t sb;
|
|
nxt_event_engine_t *engine;
|
|
|
|
c = obj;
|
|
|
|
nxt_debug(task, "conn write fd:%d er:%d bl:%d",
|
|
c->socket.fd, c->socket.error, c->block_write);
|
|
|
|
if (c->socket.error != 0 || c->block_write) {
|
|
goto error;
|
|
}
|
|
|
|
if (!c->socket.write_ready || c->write == NULL) {
|
|
return;
|
|
}
|
|
|
|
engine = task->thread->engine;
|
|
|
|
c->socket.write_handler = nxt_conn_io_write;
|
|
c->socket.error_handler = c->write_state->error_handler;
|
|
|
|
b = c->write;
|
|
|
|
sb.socket = c->socket.fd;
|
|
sb.error = 0;
|
|
sb.sent = 0;
|
|
sb.size = 0;
|
|
sb.buf = b;
|
|
#if (NXT_TLS)
|
|
sb.tls = c->u.tls;
|
|
#endif
|
|
sb.limit = 10 * 1024 * 1024;
|
|
sb.ready = 1;
|
|
sb.sync = 0;
|
|
|
|
do {
|
|
ret = c->io->sendbuf(task, &sb);
|
|
|
|
c->socket.write_ready = sb.ready;
|
|
c->socket.error = sb.error;
|
|
|
|
if (ret < 0) {
|
|
/* ret == NXT_AGAIN || ret == NXT_ERROR. */
|
|
break;
|
|
}
|
|
|
|
sb.sent += ret;
|
|
sb.limit -= ret;
|
|
|
|
b = nxt_sendbuf_update(b, ret);
|
|
|
|
if (b == NULL) {
|
|
nxt_fd_event_block_write(engine, &c->socket);
|
|
break;
|
|
}
|
|
|
|
sb.buf = b;
|
|
|
|
if (!c->socket.write_ready) {
|
|
ret = NXT_AGAIN;
|
|
break;
|
|
}
|
|
|
|
} while (sb.limit != 0);
|
|
|
|
nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);
|
|
|
|
if (sb.sent != 0) {
|
|
if (c->write_state->timer_autoreset) {
|
|
nxt_timer_disable(engine, &c->write_timer);
|
|
}
|
|
}
|
|
|
|
if (ret != NXT_ERROR) {
|
|
|
|
if (sb.limit == 0) {
|
|
/*
|
|
* Postpone writing until next event poll to allow to
|
|
* process other received events and to get new events.
|
|
*/
|
|
c->write_timer.handler = nxt_conn_write_timer_handler;
|
|
nxt_timer_add(engine, &c->write_timer, 0);
|
|
|
|
} else if (ret == NXT_AGAIN) {
|
|
/*
|
|
* SSL libraries can require to toggle either write or read
|
|
* event if renegotiation occurs during SSL write operation.
|
|
* This case is handled on the c->io->send() level. Timer
|
|
* can be set here because it should be set only for write
|
|
* direction.
|
|
*/
|
|
nxt_conn_timer(engine, c, c->write_state, &c->write_timer);
|
|
|
|
if (nxt_fd_event_is_disabled(c->socket.write)) {
|
|
nxt_fd_event_enable_write(engine, &c->socket);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (ret == 0 || sb.sent != 0) {
|
|
/*
|
|
* ret == 0 means a sync buffer was processed.
|
|
* ret == NXT_ERROR is ignored here if some data was sent,
|
|
* the error will be handled on the next nxt_conn_write() call.
|
|
*/
|
|
c->sent += sb.sent;
|
|
nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
|
|
task, c, data);
|
|
return;
|
|
}
|
|
|
|
if (ret != NXT_ERROR) {
|
|
return;
|
|
}
|
|
|
|
nxt_fd_event_block_write(engine, &c->socket);
|
|
|
|
error:
|
|
|
|
nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
|
|
task, c, data);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_conn_t *c;
|
|
nxt_timer_t *timer;
|
|
|
|
timer = obj;
|
|
|
|
nxt_debug(task, "conn write timer");
|
|
|
|
c = nxt_write_timer_conn(timer);
|
|
c->delayed = 0;
|
|
|
|
c->io->write(task, c, c->socket.data);
|
|
}
|
|
|
|
|
|
ssize_t
|
|
nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
|
|
{
|
|
nxt_uint_t niov;
|
|
struct iovec iov[NXT_IOBUF_MAX];
|
|
|
|
niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
|
|
|
|
if (niov == 0 && sb->sync) {
|
|
return 0;
|
|
}
|
|
|
|
if (niov == 0 && nxt_buf_is_file(sb->buf)) {
|
|
return nxt_conn_io_sendfile(task, sb);
|
|
}
|
|
|
|
return nxt_conn_io_writev(task, sb, iov, niov);
|
|
}
|
|
|
|
|
|
static ssize_t
|
|
nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb)
|
|
{
|
|
size_t size;
|
|
ssize_t n;
|
|
nxt_buf_t *b;
|
|
nxt_err_t err;
|
|
|
|
b = sb->buf;
|
|
|
|
for ( ;; ) {
|
|
size = b->file_end - b->file_pos;
|
|
|
|
n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size);
|
|
|
|
err = (n == -1) ? nxt_errno : 0;
|
|
|
|
nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z",
|
|
b->file->fd, sb->socket, b->file_pos, size, n);
|
|
|
|
if (n > 0) {
|
|
if (n < (ssize_t) size) {
|
|
sb->ready = 0;
|
|
}
|
|
|
|
return n;
|
|
}
|
|
|
|
if (nxt_slow_path(n == 0)) {
|
|
nxt_alert(task, "sendfile() reported that file was truncated at %O",
|
|
b->file_pos);
|
|
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
/* n == -1 */
|
|
|
|
switch (err) {
|
|
|
|
case NXT_EAGAIN:
|
|
sb->ready = 0;
|
|
nxt_debug(task, "sendfile() %E", err);
|
|
|
|
return NXT_AGAIN;
|
|
|
|
case NXT_EINTR:
|
|
nxt_debug(task, "sendfile() %E", err);
|
|
continue;
|
|
|
|
default:
|
|
sb->error = err;
|
|
nxt_log(task, nxt_socket_error_level(err),
|
|
"sendfile(%FD, %d, @%O, %uz) failed %E",
|
|
b->file->fd, sb->socket, b->file_pos, size, err);
|
|
|
|
return NXT_ERROR;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
static ssize_t
|
|
nxt_sendfile(int fd, int s, off_t pos, size_t size)
|
|
{
|
|
ssize_t res;
|
|
|
|
#if (NXT_HAVE_MACOSX_SENDFILE)
|
|
|
|
off_t sent = size;
|
|
|
|
int rc = sendfile(fd, s, pos, &sent, NULL, 0);
|
|
|
|
res = (rc == 0 || sent > 0) ? sent : -1;
|
|
|
|
#elif (NXT_HAVE_FREEBSD_SENDFILE)
|
|
|
|
off_t sent = 0;
|
|
|
|
int rc = sendfile(fd, s, pos, size, NULL, &sent, 0);
|
|
|
|
res = (rc == 0 || sent > 0) ? sent : -1;
|
|
|
|
#elif (NXT_HAVE_LINUX_SENDFILE)
|
|
|
|
res = sendfile(s, fd, &pos, size);
|
|
|
|
#else
|
|
|
|
int err;
|
|
void *map;
|
|
off_t page_off;
|
|
|
|
page_off = pos % nxt_pagesize;
|
|
|
|
map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd,
|
|
pos - page_off);
|
|
if (nxt_slow_path(map == MAP_FAILED)) {
|
|
return -1;
|
|
}
|
|
|
|
res = write(s, nxt_pointer_to(map, page_off), size);
|
|
|
|
/* Backup and restore errno to catch socket errors in the upper level. */
|
|
err = errno;
|
|
nxt_mem_munmap(map, size + page_off);
|
|
errno = err;
|
|
|
|
#endif
|
|
|
|
return res;
|
|
}
|
|
|
|
|
|
ssize_t
|
|
nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
|
|
nxt_uint_t niov)
|
|
{
|
|
ssize_t n;
|
|
nxt_err_t err;
|
|
|
|
if (niov == 1) {
|
|
/* Disposal of surplus kernel iovec copy-in operation. */
|
|
return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
|
|
}
|
|
|
|
for ( ;; ) {
|
|
n = writev(sb->socket, iov, niov);
|
|
|
|
err = (n == -1) ? nxt_socket_errno : 0;
|
|
|
|
nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);
|
|
|
|
if (n > 0) {
|
|
return n;
|
|
}
|
|
|
|
/* n == -1 */
|
|
|
|
switch (err) {
|
|
|
|
case NXT_EAGAIN:
|
|
sb->ready = 0;
|
|
nxt_debug(task, "writev() %E", err);
|
|
|
|
return NXT_AGAIN;
|
|
|
|
case NXT_EINTR:
|
|
nxt_debug(task, "writev() %E", err);
|
|
continue;
|
|
|
|
default:
|
|
sb->error = err;
|
|
nxt_log(task, nxt_socket_error_level(err),
|
|
"writev(%d, %ui) failed %E", sb->socket, niov, err);
|
|
|
|
return NXT_ERROR;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
ssize_t
|
|
nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
|
|
{
|
|
ssize_t n;
|
|
nxt_err_t err;
|
|
|
|
for ( ;; ) {
|
|
n = send(sb->socket, buf, size, 0);
|
|
|
|
err = (n == -1) ? nxt_socket_errno : 0;
|
|
|
|
nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
|
|
|
|
if (n > 0) {
|
|
return n;
|
|
}
|
|
|
|
/* n == -1 */
|
|
|
|
switch (err) {
|
|
|
|
case NXT_EAGAIN:
|
|
sb->ready = 0;
|
|
nxt_debug(task, "send() %E", err);
|
|
|
|
return NXT_AGAIN;
|
|
|
|
case NXT_EINTR:
|
|
nxt_debug(task, "send() %E", err);
|
|
continue;
|
|
|
|
default:
|
|
sb->error = err;
|
|
nxt_log(task, nxt_socket_error_level(err),
|
|
"send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
|
|
|
|
return NXT_ERROR;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/* Obsolete interfaces. */
|
|
|
|
size_t
|
|
nxt_event_conn_write_limit(nxt_conn_t *c)
|
|
{
|
|
ssize_t limit, correction;
|
|
nxt_event_write_rate_t *rate;
|
|
|
|
rate = c->rate;
|
|
|
|
if (rate == NULL) {
|
|
return c->max_chunk;
|
|
}
|
|
|
|
limit = rate->limit;
|
|
correction = limit - (size_t) rate->average;
|
|
|
|
nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
|
|
correction, rate->average);
|
|
|
|
limit += correction;
|
|
|
|
if (limit <= 0) {
|
|
return 0;
|
|
}
|
|
|
|
if (rate->limit_after != 0) {
|
|
limit += rate->limit_after;
|
|
limit = nxt_min((size_t) limit, rate->max_limit);
|
|
}
|
|
|
|
return nxt_min((size_t) limit, c->max_chunk);
|
|
}
|
|
|
|
|
|
nxt_bool_t
|
|
nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
|
|
size_t sent)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
|
|
ssize_t
|
|
nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
|
|
{
|
|
nxt_uint_t niob;
|
|
struct iovec iob[NXT_IOBUF_MAX];
|
|
nxt_sendbuf_coalesce_t sb;
|
|
|
|
sb.buf = b;
|
|
sb.iobuf = iob;
|
|
sb.nmax = NXT_IOBUF_MAX;
|
|
sb.sync = 0;
|
|
sb.size = 0;
|
|
sb.limit = limit;
|
|
|
|
niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
|
|
|
|
if (niob == 0 && sb.sync) {
|
|
return 0;
|
|
}
|
|
|
|
return nxt_event_conn_io_writev(c, iob, niob);
|
|
}
|
|
|
|
|
|
ssize_t
|
|
nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
|
|
{
|
|
ssize_t n;
|
|
nxt_err_t err;
|
|
|
|
if (niob == 1) {
|
|
/* Disposal of surplus kernel iovec copy-in operation. */
|
|
return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
|
|
}
|
|
|
|
for ( ;; ) {
|
|
n = writev(c->socket.fd, iob, niob);
|
|
|
|
err = (n == -1) ? nxt_socket_errno : 0;
|
|
|
|
nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);
|
|
|
|
if (n > 0) {
|
|
return n;
|
|
}
|
|
|
|
/* n == -1 */
|
|
|
|
switch (err) {
|
|
|
|
case NXT_EAGAIN:
|
|
nxt_debug(c->socket.task, "writev() %E", err);
|
|
c->socket.write_ready = 0;
|
|
return NXT_AGAIN;
|
|
|
|
case NXT_EINTR:
|
|
nxt_debug(c->socket.task, "writev() %E", err);
|
|
continue;
|
|
|
|
default:
|
|
c->socket.error = err;
|
|
nxt_log(c->socket.task, nxt_socket_error_level(err),
|
|
"writev(%d, %ui) failed %E", c->socket.fd, niob, err);
|
|
return NXT_ERROR;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
ssize_t
|
|
nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
|
|
{
|
|
ssize_t n;
|
|
nxt_err_t err;
|
|
|
|
for ( ;; ) {
|
|
n = send(c->socket.fd, buf, size, 0);
|
|
|
|
err = (n == -1) ? nxt_socket_errno : 0;
|
|
|
|
nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
|
|
c->socket.fd, buf, size, n);
|
|
|
|
if (n > 0) {
|
|
return n;
|
|
}
|
|
|
|
/* n == -1 */
|
|
|
|
switch (err) {
|
|
|
|
case NXT_EAGAIN:
|
|
nxt_debug(c->socket.task, "send() %E", err);
|
|
c->socket.write_ready = 0;
|
|
return NXT_AGAIN;
|
|
|
|
case NXT_EINTR:
|
|
nxt_debug(c->socket.task, "send() %E", err);
|
|
continue;
|
|
|
|
default:
|
|
c->socket.error = err;
|
|
nxt_log(c->socket.task, nxt_socket_error_level(err),
|
|
"send(%d, %p, %uz) failed %E",
|
|
c->socket.fd, buf, size, err);
|
|
return NXT_ERROR;
|
|
}
|
|
}
|
|
}
|