Adding a reference counter to the libunit port structure.

The goal is to minimize the number of (pid, id) to port hash lookups which
require a library mutex lock.  The response port is found once per request,
while the read port is initialized at startup.
This commit is contained in:
Max Romanov
2020-08-11 19:20:06 +03:00
parent ec3389b63b
commit bf647588ff
3 changed files with 347 additions and 358 deletions

View File

@@ -15,9 +15,9 @@ static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
nxt_unit_sptr_t *sptr, uint32_t length); nxt_unit_sptr_t *sptr, uint32_t length);
static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port); static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port);
static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size); const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size); void *buf, size_t buf_size, void *oob, size_t oob_size);
static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx); static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx);
@@ -123,19 +123,19 @@ nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
static ssize_t static ssize_t
nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size) const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{ {
return nxt_go_port_send(port_id->pid, port_id->id, return nxt_go_port_send(port->id.pid, port->id.id,
(void *) buf, buf_size, (void *) oob, oob_size); (void *) buf, buf_size, (void *) oob, oob_size);
} }
static ssize_t static ssize_t
nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size) void *buf, size_t buf_size, void *oob, size_t oob_size)
{ {
return nxt_go_port_recv(port_id->pid, port_id->id, return nxt_go_port_recv(port->id.pid, port->id.id,
buf, buf_size, oob, oob_size); buf, buf_size, oob, oob_size);
} }

File diff suppressed because it is too large Load Diff

View File

@@ -91,8 +91,7 @@ struct nxt_unit_request_info_s {
nxt_unit_t *unit; nxt_unit_t *unit;
nxt_unit_ctx_t *ctx; nxt_unit_ctx_t *ctx;
nxt_unit_port_id_t request_port; nxt_unit_port_t *response_port;
nxt_unit_port_id_t response_port;
nxt_unit_request_t *request; nxt_unit_request_t *request;
nxt_unit_buf_t *request_buf; nxt_unit_buf_t *request_buf;
@@ -142,12 +141,12 @@ struct nxt_unit_callbacks_s {
void (*shm_ack_handler)(nxt_unit_ctx_t *); void (*shm_ack_handler)(nxt_unit_ctx_t *);
/* Send data and control to process pid using port id. Optional. */ /* Send data and control to process pid using port id. Optional. */
ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *buf, size_t buf_size,
const void *oob, size_t oob_size); const void *oob, size_t oob_size);
/* Receive data on port id. Optional. */ /* Receive data on port id. Optional. */
ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size); void *buf, size_t buf_size, void *oob, size_t oob_size);
}; };
@@ -195,7 +194,7 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
* from port socket should be initially processed by unit. This function * from port socket should be initially processed by unit. This function
* may invoke other application-defined callback for message processing. * may invoke other application-defined callback for message processing.
*/ */
int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, int nxt_unit_process_msg(nxt_unit_ctx_t *,
void *buf, size_t buf_size, void *oob, size_t oob_size); void *buf, size_t buf_size, void *oob, size_t oob_size);
/* /*
@@ -225,10 +224,6 @@ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
/* Initialize port_id, calculate hash. */ /* Initialize port_id, calculate hash. */
void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
/* Default 'port_recv' implementation. */
ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
void *oob, size_t oob_size);
/* Calculates hash for given field name. */ /* Calculates hash for given field name. */
uint16_t nxt_unit_field_hash(const char* name, size_t name_length); uint16_t nxt_unit_field_hash(const char* name, size_t name_length);