Improving port message fragment recognition.
This is required to assemble fragmented messages correctly. Stream identifier is unique only for messages generated within a process, but the (stream, pid) pair should be enough to avoid collisions. Adding reply_port seems redundant because it's enough to add stream to a pid. This closes #199 issue on GitHub. Thanks to 洪志道 (Hong Zhi Dao).
This commit is contained in:
@@ -638,15 +638,24 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t stream;
|
||||||
|
uint32_t pid;
|
||||||
|
} nxt_port_frag_key_t;
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t
|
static nxt_int_t
|
||||||
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
|
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
|
||||||
{
|
{
|
||||||
nxt_port_recv_msg_t *fmsg;
|
nxt_port_recv_msg_t *fmsg;
|
||||||
|
nxt_port_frag_key_t *frag_key;
|
||||||
|
|
||||||
fmsg = data;
|
fmsg = data;
|
||||||
|
frag_key = (nxt_port_frag_key_t *) lhq->key.start;
|
||||||
|
|
||||||
if (lhq->key.length == sizeof(uint32_t)
|
if (lhq->key.length == sizeof(nxt_port_frag_key_t)
|
||||||
&& *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
|
&& frag_key->stream == fmsg->port_msg.stream
|
||||||
|
&& frag_key->pid == (uint32_t) fmsg->port_msg.pid)
|
||||||
{
|
{
|
||||||
return NXT_OK;
|
return NXT_OK;
|
||||||
}
|
}
|
||||||
@@ -684,6 +693,7 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
|
|||||||
nxt_int_t res;
|
nxt_int_t res;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
nxt_port_recv_msg_t *fmsg;
|
nxt_port_recv_msg_t *fmsg;
|
||||||
|
nxt_port_frag_key_t frag_key;
|
||||||
|
|
||||||
nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
|
nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
|
||||||
|
|
||||||
@@ -695,9 +705,12 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
*fmsg = *msg;
|
*fmsg = *msg;
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
|
frag_key.stream = fmsg->port_msg.stream;
|
||||||
lhq.key.length = sizeof(uint32_t);
|
frag_key.pid = fmsg->port_msg.pid;
|
||||||
lhq.key.start = (u_char *) &fmsg->port_msg.stream;
|
|
||||||
|
lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
|
||||||
|
lhq.key.length = sizeof(nxt_port_frag_key_t);
|
||||||
|
lhq.key.start = (u_char *) &frag_key;
|
||||||
lhq.proto = &lvlhsh_frag_proto;
|
lhq.proto = &lvlhsh_frag_proto;
|
||||||
lhq.replace = 0;
|
lhq.replace = 0;
|
||||||
lhq.value = fmsg;
|
lhq.value = fmsg;
|
||||||
@@ -730,17 +743,24 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
|
|
||||||
static nxt_port_recv_msg_t *
|
static nxt_port_recv_msg_t *
|
||||||
nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
|
nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
|
||||||
nxt_bool_t last)
|
|
||||||
{
|
{
|
||||||
nxt_int_t res;
|
nxt_int_t res;
|
||||||
|
nxt_bool_t last;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
nxt_port_frag_key_t frag_key;
|
||||||
|
|
||||||
nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
|
last = msg->port_msg.mf == 0;
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
|
nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
|
||||||
lhq.key.length = sizeof(uint32_t);
|
msg->port_msg.stream);
|
||||||
lhq.key.start = (u_char *) &stream;
|
|
||||||
|
frag_key.stream = msg->port_msg.stream;
|
||||||
|
frag_key.pid = msg->port_msg.pid;
|
||||||
|
|
||||||
|
lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
|
||||||
|
lhq.key.length = sizeof(nxt_port_frag_key_t);
|
||||||
|
lhq.key.start = (u_char *) &frag_key;
|
||||||
lhq.proto = &lvlhsh_frag_proto;
|
lhq.proto = &lvlhsh_frag_proto;
|
||||||
lhq.pool = port->mem_pool;
|
lhq.pool = port->mem_pool;
|
||||||
|
|
||||||
@@ -753,7 +773,8 @@ nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
|
|||||||
return lhq.value;
|
return lhq.value;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream);
|
nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
|
||||||
|
frag_key.stream);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -793,8 +814,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
if (nxt_slow_path(msg->port_msg.nf != 0)) {
|
if (nxt_slow_path(msg->port_msg.nf != 0)) {
|
||||||
|
|
||||||
fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
|
fmsg = nxt_port_frag_find(task, port, msg);
|
||||||
msg->port_msg.mf == 0);
|
|
||||||
|
|
||||||
if (nxt_slow_path(fmsg == NULL)) {
|
if (nxt_slow_path(fmsg == NULL)) {
|
||||||
goto fmsg_failed;
|
goto fmsg_failed;
|
||||||
|
|||||||
Reference in New Issue
Block a user