Implementing the ability to cancel request before worker starts processing it.
This commit is contained in:
@@ -163,7 +163,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
|
||||
b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
|
||||
|
||||
while (p < b->mem.end) {
|
||||
nxt_port_mmap_set_chunk_free(hdr, c);
|
||||
nxt_port_mmap_set_chunk_free(hdr->free_map, c);
|
||||
|
||||
p += PORT_MMAP_CHUNK_SIZE;
|
||||
c++;
|
||||
@@ -253,11 +253,12 @@ fail:
|
||||
|
||||
static nxt_port_mmap_handler_t *
|
||||
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
nxt_port_t *port)
|
||||
nxt_port_t *port, nxt_bool_t tracking)
|
||||
{
|
||||
void *mem;
|
||||
u_char *p, name[64];
|
||||
nxt_fd_t fd;
|
||||
nxt_free_map_t *free_map;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
nxt_port_mmap_handler_t *mmap_handler;
|
||||
@@ -342,6 +343,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
hdr = mmap_handler->hdr;
|
||||
|
||||
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
||||
nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
|
||||
|
||||
hdr->id = process->outgoing.size - 1;
|
||||
hdr->src_pid = nxt_pid;
|
||||
@@ -349,10 +351,13 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
hdr->sent_over = port->id;
|
||||
|
||||
/* Mark first chunk as busy */
|
||||
nxt_port_mmap_set_chunk_busy(hdr, 0);
|
||||
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
|
||||
|
||||
nxt_port_mmap_set_chunk_busy(free_map, 0);
|
||||
|
||||
/* Mark as busy chunk followed the last available chunk. */
|
||||
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
|
||||
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
|
||||
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
|
||||
|
||||
nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
|
||||
|
||||
@@ -376,9 +381,10 @@ remove_fail:
|
||||
|
||||
static nxt_port_mmap_handler_t *
|
||||
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
||||
size_t size)
|
||||
nxt_bool_t tracking)
|
||||
{
|
||||
nxt_process_t *process;
|
||||
nxt_free_map_t *free_map;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_t *end_port_mmap;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
@@ -406,14 +412,16 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nxt_port_mmap_get_free_chunk(hdr, c)) {
|
||||
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
|
||||
|
||||
if (nxt_port_mmap_get_free_chunk(free_map, c)) {
|
||||
goto unlock_return;
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO introduce port_mmap limit and release wait. */
|
||||
|
||||
mmap_handler = nxt_port_new_port_mmap(task, process, port);
|
||||
mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking);
|
||||
|
||||
unlock_return:
|
||||
|
||||
@@ -434,12 +442,15 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mmap_handler = NULL;
|
||||
|
||||
nxt_thread_mutex_lock(&process->incoming.mutex);
|
||||
|
||||
if (nxt_fast_path(process->incoming.size > id)) {
|
||||
mmap_handler = process->incoming.elts[id].mmap_handler;
|
||||
|
||||
} else {
|
||||
mmap_handler = NULL;
|
||||
|
||||
nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&process->incoming.mutex);
|
||||
@@ -448,6 +459,131 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
|
||||
nxt_port_mmap_tracking_t *tracking, uint32_t stream)
|
||||
{
|
||||
nxt_chunk_id_t c;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
nxt_port_mmap_handler_t *mmap_handler;
|
||||
|
||||
nxt_debug(task, "request tracking for stream #%uD", stream);
|
||||
|
||||
mmap_handler = nxt_port_mmap_get(task, port, &c, 1);
|
||||
if (nxt_slow_path(mmap_handler == NULL)) {
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_port_mmap_handler_use(mmap_handler, 1);
|
||||
|
||||
hdr = mmap_handler->hdr;
|
||||
|
||||
tracking->mmap_handler = mmap_handler;
|
||||
tracking->tracking = hdr->tracking + c;
|
||||
|
||||
*tracking->tracking = stream;
|
||||
|
||||
nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
|
||||
hdr->src_pid, hdr->dst_pid, hdr->id, c);
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
|
||||
nxt_bool_t
|
||||
nxt_port_mmap_tracking_cancel(nxt_task_t *task,
|
||||
nxt_port_mmap_tracking_t *tracking, uint32_t stream)
|
||||
{
|
||||
nxt_bool_t res;
|
||||
nxt_chunk_id_t c;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
nxt_port_mmap_handler_t *mmap_handler;
|
||||
|
||||
mmap_handler = tracking->mmap_handler;
|
||||
|
||||
if (nxt_slow_path(mmap_handler == NULL)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
hdr = mmap_handler->hdr;
|
||||
|
||||
res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);
|
||||
|
||||
nxt_debug(task, "%s tracking for stream #%uD",
|
||||
(res ? "cancelled" : "failed to cancel"), stream);
|
||||
|
||||
if (!res) {
|
||||
c = tracking->tracking - hdr->tracking;
|
||||
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
|
||||
}
|
||||
|
||||
nxt_port_mmap_handler_use(mmap_handler, -1);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
|
||||
{
|
||||
nxt_atomic_t *tracking;
|
||||
nxt_port_mmap_handler_t *mmap_handler;
|
||||
|
||||
mmap_handler = t->mmap_handler;
|
||||
tracking = mmap_handler->hdr->tracking;
|
||||
|
||||
nxt_assert(t->tracking >= tracking);
|
||||
nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
|
||||
|
||||
buf[0] = mmap_handler->hdr->id;
|
||||
buf[1] = t->tracking - mmap_handler->hdr->tracking;
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
nxt_bool_t
|
||||
nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_buf_t *b;
|
||||
nxt_bool_t res;
|
||||
nxt_chunk_id_t c;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
nxt_port_mmap_handler_t *mmap_handler;
|
||||
nxt_port_mmap_tracking_msg_t *tracking_msg;
|
||||
|
||||
b = msg->buf;
|
||||
|
||||
if (nxt_buf_used_size(b) < (int)sizeof(nxt_port_mmap_tracking_msg_t)) {
|
||||
nxt_debug(task, "too small message %u", nxt_buf_used_size(b));
|
||||
return 0;
|
||||
}
|
||||
|
||||
tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;
|
||||
|
||||
b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
|
||||
mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
|
||||
tracking_msg->mmap_id);
|
||||
|
||||
if (nxt_slow_path(mmap_handler == NULL)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
hdr = mmap_handler->hdr;
|
||||
|
||||
c = tracking_msg->tracking_id;
|
||||
res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);
|
||||
|
||||
nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
|
||||
(res ? "received" : "already cancelled"));
|
||||
|
||||
if (!res) {
|
||||
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
nxt_buf_t *
|
||||
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
|
||||
{
|
||||
@@ -467,7 +603,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
|
||||
b->completion_handler = nxt_port_mmap_buf_completion;
|
||||
nxt_buf_set_port_mmap(b);
|
||||
|
||||
mmap_handler = nxt_port_mmap_get(task, port, &c, size);
|
||||
mmap_handler = nxt_port_mmap_get(task, port, &c, 0);
|
||||
if (nxt_slow_path(mmap_handler == NULL)) {
|
||||
nxt_mp_release(task->thread->engine->mem_pool, b);
|
||||
return NULL;
|
||||
@@ -499,7 +635,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
|
||||
/* Try to acquire as much chunks as required. */
|
||||
while (nchunks > 0) {
|
||||
|
||||
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
|
||||
if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -552,7 +688,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
|
||||
/* Try to acquire as much chunks as required. */
|
||||
while (nchunks > 0) {
|
||||
|
||||
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
|
||||
if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -565,7 +701,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
|
||||
{
|
||||
c--;
|
||||
while (c >= start) {
|
||||
nxt_port_mmap_set_chunk_free(hdr, c);
|
||||
nxt_port_mmap_set_chunk_free(hdr->free_map, c);
|
||||
c--;
|
||||
}
|
||||
|
||||
@@ -683,40 +819,43 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
||||
|
||||
|
||||
void
|
||||
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
||||
nxt_port_recv_msg_t *msg)
|
||||
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_buf_t *b, **pb;
|
||||
nxt_port_mmap_msg_t *end, *mmap_msg;
|
||||
|
||||
b = msg->buf;
|
||||
|
||||
mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
|
||||
end = (nxt_port_mmap_msg_t *) b->mem.free;
|
||||
|
||||
pb = &msg->buf;
|
||||
msg->size = 0;
|
||||
|
||||
while (mmap_msg < end) {
|
||||
nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
|
||||
mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
|
||||
msg->port_msg.pid);
|
||||
for (b = msg->buf; b != NULL; b = b->next) {
|
||||
|
||||
*pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid,
|
||||
mmap_msg);
|
||||
if (nxt_slow_path(*pb == NULL)) {
|
||||
nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer");
|
||||
mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
|
||||
end = (nxt_port_mmap_msg_t *) b->mem.free;
|
||||
|
||||
break;
|
||||
while (mmap_msg < end) {
|
||||
nxt_assert(mmap_msg + 1 <= end);
|
||||
|
||||
nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
|
||||
mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
|
||||
msg->port_msg.pid);
|
||||
|
||||
*pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
|
||||
msg->port_msg.pid, mmap_msg);
|
||||
if (nxt_slow_path(*pb == NULL)) {
|
||||
nxt_log_error(NXT_LOG_ERR, task->log,
|
||||
"failed to get mmap buffer");
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
msg->size += mmap_msg->size;
|
||||
pb = &(*pb)->next;
|
||||
mmap_msg++;
|
||||
|
||||
/* Mark original buf as complete. */
|
||||
b->mem.pos += sizeof(nxt_port_mmap_msg_t);
|
||||
}
|
||||
|
||||
msg->size += mmap_msg->size;
|
||||
pb = &(*pb)->next;
|
||||
mmap_msg++;
|
||||
}
|
||||
|
||||
/* Mark original buf as complete. */
|
||||
b->mem.pos += nxt_buf_used_size(b);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user