Go: removing C proxy functions and re-using goroutines.

This commit is contained in:
Max Romanov
2020-11-18 22:33:53 +03:00
parent d26afcb481
commit 8132e1f700
8 changed files with 238 additions and 316 deletions

View File

@@ -10,16 +10,10 @@
#include <nxt_unit_request.h>
static void nxt_cgo_request_handler(nxt_unit_request_info_t *req);
static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
nxt_unit_sptr_t *sptr, uint32_t length);
static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port);
static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size);
static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx);
int
nxt_cgo_run(uintptr_t handler)
@@ -30,12 +24,12 @@ nxt_cgo_run(uintptr_t handler)
memset(&init, 0, sizeof(init));
init.callbacks.request_handler = nxt_cgo_request_handler;
init.callbacks.add_port = nxt_cgo_add_port;
init.callbacks.remove_port = nxt_cgo_remove_port;
init.callbacks.request_handler = nxt_go_request_handler;
init.callbacks.add_port = nxt_go_add_port;
init.callbacks.remove_port = nxt_go_remove_port;
init.callbacks.port_send = nxt_cgo_port_send;
init.callbacks.port_recv = nxt_cgo_port_recv;
init.callbacks.shm_ack_handler = nxt_cgo_shm_ack_handler;
init.callbacks.shm_ack_handler = nxt_go_shm_ack_handler;
init.data = (void *) handler;
@@ -52,76 +46,6 @@ nxt_cgo_run(uintptr_t handler)
}
static void
nxt_cgo_request_handler(nxt_unit_request_info_t *req)
{
uint32_t i;
uintptr_t go_req;
nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr;
nxt_unit_field_t *f;
nxt_unit_request_t *r;
r = req->request;
go_req = nxt_go_request_create((uintptr_t) req,
nxt_cgo_str_init(&method, &r->method, r->method_length),
nxt_cgo_str_init(&uri, &r->target, r->target_length));
nxt_go_request_set_proto(go_req,
nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1);
for (i = 0; i < r->fields_count; i++) {
f = &r->fields[i];
nxt_go_request_add_header(go_req,
nxt_cgo_str_init(&name, &f->name, f->name_length),
nxt_cgo_str_init(&value, &f->value, f->value_length));
}
nxt_go_request_set_content_length(go_req, r->content_length);
nxt_go_request_set_host(go_req,
nxt_cgo_str_init(&host, &r->server_name, r->server_name_length));
nxt_go_request_set_remote_addr(go_req,
nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length));
if (r->tls) {
nxt_go_request_set_tls(go_req);
}
nxt_go_request_handler(go_req, (uintptr_t) req->unit->data);
}
static nxt_cgo_str_t *
nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length)
{
dst->length = length;
dst->start = nxt_unit_sptr_get(sptr);
return dst;
}
static int
nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
nxt_go_add_port((uintptr_t) ctx, port->id.pid, port->id.id,
port->in_fd, port->out_fd);
port->in_fd = -1;
port->out_fd = -1;
return NXT_UNIT_OK;
}
static void
nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
{
nxt_go_remove_port(port->id.pid, port->id.id);
}
static ssize_t
nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
@@ -140,78 +64,31 @@ nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
}
static void
nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx)
ssize_t
nxt_cgo_response_write(nxt_unit_request_info_t *req, uintptr_t start,
uint32_t len)
{
return nxt_go_shm_ack_handler();
}
int
nxt_cgo_response_create(uintptr_t req, int status, int fields,
uint32_t fields_size)
{
return nxt_unit_response_init((nxt_unit_request_info_t *) req,
status, fields, fields_size);
}
int
nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
uintptr_t value, uint32_t value_len)
{
return nxt_unit_response_add_field((nxt_unit_request_info_t *) req,
(char *) name, name_len,
(char *) value, value_len);
}
int
nxt_cgo_response_send(uintptr_t req)
{
return nxt_unit_response_send((nxt_unit_request_info_t *) req);
return nxt_unit_response_write_nb(req, (void *) start, len, 0);
}
ssize_t
nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len)
nxt_cgo_request_read(nxt_unit_request_info_t *req, uintptr_t dst,
uint32_t dst_len)
{
return nxt_unit_response_write_nb((nxt_unit_request_info_t *) req,
(void *) start, len, 0);
}
ssize_t
nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len)
{
return nxt_unit_request_read((nxt_unit_request_info_t *) req,
(void *) dst, dst_len);
}
int
nxt_cgo_request_close(uintptr_t req)
{
return 0;
return nxt_unit_request_read(req, (void *) dst, dst_len);
}
void
nxt_cgo_request_done(uintptr_t req, int res)
{
nxt_unit_request_done((nxt_unit_request_info_t *) req, res);
}
void
nxt_cgo_unit_run_shared(uintptr_t ctx)
{
nxt_unit_run_shared((nxt_unit_ctx_t *) ctx);
}
void
nxt_cgo_warn(uintptr_t msg, uint32_t msg_len)
nxt_cgo_warn(const char *msg, uint32_t msg_len)
{
nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg);
}
void
nxt_cgo_alert(const char *msg, uint32_t msg_len)
{
nxt_unit_alert(NULL, "%.*s", (int) msg_len, (char *) msg);
}

View File

@@ -11,32 +11,22 @@
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <nxt_unit.h>
#include <nxt_unit_request.h>
typedef struct {
int length;
char *start;
} nxt_cgo_str_t;
enum {
NXT_FIELDS_OFFSET = offsetof(nxt_unit_request_t, fields)
};
int nxt_cgo_run(uintptr_t handler);
int nxt_cgo_response_create(uintptr_t req, int code, int fields,
uint32_t fields_size);
ssize_t nxt_cgo_response_write(nxt_unit_request_info_t *req,
uintptr_t src, uint32_t len);
int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
uintptr_t value, uint32_t value_len);
ssize_t nxt_cgo_request_read(nxt_unit_request_info_t *req,
uintptr_t dst, uint32_t dst_len);
int nxt_cgo_response_send(uintptr_t req);
ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len);
ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len);
int nxt_cgo_request_close(uintptr_t req);
void nxt_cgo_request_done(uintptr_t req, int res);
void nxt_cgo_unit_run_shared(uintptr_t ctx);
void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len);
void nxt_cgo_warn(const char *msg, uint32_t msg_len);
void nxt_cgo_alert(const char *msg, uint32_t msg_len);
#endif /* _NXT_CGO_LIB_H_INCLUDED_ */

View File

@@ -11,6 +11,7 @@ package unit
import "C"
import (
"io"
"net"
"os"
"sync"
@@ -79,13 +80,13 @@ func getUnixConn(fd int) *net.UnixConn {
c, err := net.FileConn(f)
if err != nil {
nxt_go_warn("FileConn error %s", err)
nxt_go_alert("FileConn error %s", err)
return nil
}
uc, ok := c.(*net.UnixConn)
if !ok {
nxt_go_warn("Not a Unix-domain socket %d", fd)
nxt_go_alert("Not a Unix-domain socket %d", fd)
return nil
}
@@ -93,30 +94,37 @@ func getUnixConn(fd int) *net.UnixConn {
}
//export nxt_go_add_port
func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) {
p := &port{
func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int {
new_port := &port{
key: port_key{
pid: int(pid),
id: int(id),
pid: int(p.id.pid),
id: int(p.id.id),
},
rcv: getUnixConn(int(rcv)),
snd: getUnixConn(int(snd)),
rcv: getUnixConn(int(p.in_fd)),
snd: getUnixConn(int(p.out_fd)),
}
add_port(p)
add_port(new_port)
if id == 65535 {
go func(ctx C.uintptr_t) {
C.nxt_cgo_unit_run_shared(ctx);
p.in_fd = -1
p.out_fd = -1
if new_port.key.id == 65535 {
go func(ctx *C.nxt_unit_ctx_t) {
C.nxt_unit_run_shared(ctx);
}(ctx)
}
return C.NXT_UNIT_OK
}
//export nxt_go_remove_port
func nxt_go_remove_port(pid C.int, id C.int) {
func nxt_go_remove_port(unit *C.nxt_unit_t, p *C.nxt_unit_port_t) {
key := port_key{
pid: int(pid),
id: int(id),
pid: int(p.id.pid),
id: int(p.id.id),
}
port_registry_.Lock()
@@ -139,7 +147,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
p := find_port(key)
if p == nil {
nxt_go_warn("port %d:%d not found", pid, id)
nxt_go_alert("port %d:%d not found", pid, id)
return 0
}
@@ -167,7 +175,7 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
p := find_port(key)
if p == nil {
nxt_go_warn("port %d:%d not found", pid, id)
nxt_go_alert("port %d:%d not found", pid, id)
return 0
}
@@ -175,6 +183,12 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
GoBytes(oob, oob_size))
if err != nil {
if nerr, ok := err.(*net.OpError); ok {
if nerr.Err == io.EOF {
return 0
}
}
nxt_go_warn("read result %d (%d), %s", n, oobn, err)
n = -1

View File

@@ -20,8 +20,8 @@ import (
type request struct {
req http.Request
resp *response
c_req C.uintptr_t
resp response
c_req *C.nxt_unit_request_info_t
}
func (r *request) Read(p []byte) (n int, err error) {
@@ -35,110 +35,102 @@ func (r *request) Read(p []byte) (n int, err error) {
}
func (r *request) Close() error {
C.nxt_cgo_request_close(r.c_req)
return nil
}
func (r *request) response() *response {
if r.resp == nil {
r.resp = new_response(r.c_req, &r.req)
func new_request(c_req *C.nxt_unit_request_info_t) (r *request, err error) {
req := c_req.request
uri := GoStringN(&req.target, C.int(req.target_length))
URL, err := url.ParseRequestURI(uri)
if err != nil {
return nil, err
}
return r.resp
}
proto := GoStringN(&req.version, C.int(req.version_length))
func (r *request) done() {
resp := r.response()
if !resp.headerSent {
resp.WriteHeader(http.StatusOK)
}
C.nxt_cgo_request_done(r.c_req, 0)
}
func get_request(go_req uintptr) *request {
return (*request)(unsafe.Pointer(go_req))
}
//export nxt_go_request_create
func nxt_go_request_create(c_req C.uintptr_t,
c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr {
uri := C.GoStringN(c_uri.start, c_uri.length)
var URL *url.URL
var err error
if URL, err = url.ParseRequestURI(uri); err != nil {
return 0
}
r := &request{
r = &request{
req: http.Request {
Method: C.GoStringN(c_method.start, c_method.length),
URL: URL,
Header: http.Header{},
Body: nil,
RequestURI: uri,
Method: GoStringN(&req.method, C.int(req.method_length)),
Proto: proto,
ProtoMajor: 1,
ProtoMinor: int(proto[7] - '0'),
ContentLength: int64(req.content_length),
Host: GoStringN(&req.server_name, C.int(req.server_name_length)),
RemoteAddr: GoStringN(&req.remote, C.int(req.remote_length)),
},
resp: response{header: http.Header{}, c_req: c_req},
c_req: c_req,
}
r.req.Body = r
return uintptr(unsafe.Pointer(r))
if req.tls != 0 {
r.req.TLS = &tls.ConnectionState{ }
r.req.URL.Scheme = "https"
} else {
r.req.URL.Scheme = "http"
}
//export nxt_go_request_set_proto
func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t,
maj C.int, min C.int) {
fields := get_fields(req)
r := get_request(go_req)
r.req.Proto = C.GoStringN(proto.start, proto.length)
r.req.ProtoMajor = int(maj)
r.req.ProtoMinor = int(min)
for i := 0; i < len(fields); i++ {
f := &fields[i]
n := GoStringN(&f.name, C.int(f.name_length))
v := GoStringN(&f.value, C.int(f.value_length))
r.req.Header.Add(n, v)
}
//export nxt_go_request_add_header
func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t,
value *C.nxt_cgo_str_t) {
r := get_request(go_req)
r.req.Header.Add(C.GoStringN(name.start, name.length),
C.GoStringN(value.start, value.length))
return r, nil
}
//export nxt_go_request_set_content_length
func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) {
get_request(go_req).req.ContentLength = int64(l)
func get_fields(req *C.nxt_unit_request_t) []C.nxt_unit_field_t {
f := uintptr(unsafe.Pointer(req)) + uintptr(C.NXT_FIELDS_OFFSET)
h := &slice_header{
Data: unsafe.Pointer(f),
Len: int(req.fields_count),
Cap: int(req.fields_count),
}
//export nxt_go_request_set_host
func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) {
get_request(go_req).req.Host = C.GoStringN(host.start, host.length)
}
//export nxt_go_request_set_url
func nxt_go_request_set_url(go_req uintptr, scheme *C.char) {
get_request(go_req).req.URL.Scheme = C.GoString(scheme)
}
//export nxt_go_request_set_remote_addr
func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) {
get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
}
//export nxt_go_request_set_tls
func nxt_go_request_set_tls(go_req uintptr) {
get_request(go_req).req.TLS = &tls.ConnectionState{ }
return *(*[]C.nxt_unit_field_t)(unsafe.Pointer(h))
}
//export nxt_go_request_handler
func nxt_go_request_handler(go_req uintptr, h uintptr) {
r := get_request(go_req)
handler := get_handler(h)
func nxt_go_request_handler(c_req *C.nxt_unit_request_info_t) {
go func(r *request) {
handler.ServeHTTP(r.response(), &r.req)
r.done()
}(r)
go func(c_req *C.nxt_unit_request_info_t, handler http.Handler) {
ctx := c_req.ctx
for {
r, err := new_request(c_req)
if err == nil {
handler.ServeHTTP(&r.resp, &r.req)
if !r.resp.header_sent {
r.resp.WriteHeader(http.StatusOK)
}
C.nxt_unit_request_done(c_req, C.NXT_UNIT_OK)
} else {
C.nxt_unit_request_done(c_req, C.NXT_UNIT_ERROR)
}
c_req = C.nxt_unit_dequeue_request(ctx)
if c_req == nil {
break
}
}
}(c_req, get_handler(uintptr(c_req.unit.data)))
}

View File

@@ -16,28 +16,17 @@ import (
type response struct {
header http.Header
headerSent bool
req *http.Request
c_req C.uintptr_t
header_sent bool
c_req *C.nxt_unit_request_info_t
ch chan int
}
func new_response(c_req C.uintptr_t, req *http.Request) *response {
resp := &response{
header: http.Header{},
req: req,
c_req: c_req,
}
return resp
}
func (r *response) Header() http.Header {
return r.header
}
func (r *response) Write(p []byte) (n int, err error) {
if !r.headerSent {
if !r.header_sent {
r.WriteHeader(http.StatusOK)
}
@@ -64,12 +53,11 @@ func (r *response) Write(p []byte) (n int, err error) {
}
func (r *response) WriteHeader(code int) {
if r.headerSent {
// Note: explicitly using Stderr, as Stdout is our HTTP output.
if r.header_sent {
nxt_go_warn("multiple response.WriteHeader calls")
return
}
r.headerSent = true
r.header_sent = true
// Set a default Content-Type
if _, hasType := r.header["Content-Type"]; !hasType {
@@ -86,21 +74,21 @@ func (r *response) WriteHeader(code int) {
}
}
C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields),
C.nxt_unit_response_init(r.c_req, C.uint16_t(code), C.uint32_t(fields),
C.uint32_t(fields_size))
for k, vv := range r.header {
for _, v := range vv {
C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)),
C.nxt_unit_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)),
str_ref(v), C.uint32_t(len(v)))
}
}
C.nxt_cgo_response_send(r.c_req)
C.nxt_unit_response_send(r.c_req)
}
func (r *response) Flush() {
if !r.headerSent {
if !r.header_sent {
r.WriteHeader(http.StatusOK)
}
}
@@ -114,6 +102,6 @@ func wait_shm_ack(c chan int) {
}
//export nxt_go_shm_ack_handler
func nxt_go_shm_ack_handler() {
func nxt_go_shm_ack_handler(ctx *C.nxt_unit_ctx_t) {
observer_registry_.notify(1)
}

View File

@@ -30,15 +30,15 @@ func buf_ref(buf []byte) C.uintptr_t {
return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0])))
}
type StringHeader struct {
type string_header struct {
Data unsafe.Pointer
Len int
}
func str_ref(s string) C.uintptr_t {
header := (*StringHeader)(unsafe.Pointer(&s))
func str_ref(s string) *C.char {
header := (*string_header)(unsafe.Pointer(&s))
return C.uintptr_t(uintptr(unsafe.Pointer(header.Data)))
return (*C.char)(header.Data)
}
func (buf *cbuf) init_bytes(b []byte) {
@@ -46,12 +46,7 @@ func (buf *cbuf) init_bytes(b []byte) {
buf.s = C.size_t(len(b))
}
func (buf *cbuf) init_string(s string) {
buf.b = str_ref(s)
buf.s = C.size_t(len(s))
}
type SliceHeader struct {
type slice_header struct {
Data unsafe.Pointer
Len int
Cap int
@@ -63,17 +58,17 @@ func (buf *cbuf) GoBytes() []byte {
return b[:0]
}
bytesHeader := &SliceHeader{
header := &slice_header{
Data: unsafe.Pointer(uintptr(buf.b)),
Len: int(buf.s),
Cap: int(buf.s),
}
return *(*[]byte)(unsafe.Pointer(bytesHeader))
return *(*[]byte)(unsafe.Pointer(header))
}
func GoBytes(buf unsafe.Pointer, size C.int) []byte {
bytesHeader := &SliceHeader{
bytesHeader := &slice_header{
Data: buf,
Len: int(size),
Cap: int(size),
@@ -82,12 +77,25 @@ func GoBytes(buf unsafe.Pointer, size C.int) []byte {
return *(*[]byte)(unsafe.Pointer(bytesHeader))
}
func GoStringN(sptr *C.nxt_unit_sptr_t, l C.int) string {
p := unsafe.Pointer(sptr)
b := uintptr(p) + uintptr(*(*C.uint32_t)(p))
return C.GoStringN((*C.char)(unsafe.Pointer(b)), l)
}
func nxt_go_warn(format string, args ...interface{}) {
str := fmt.Sprintf("[go] " + format, args...)
C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str)))
}
func nxt_go_alert(format string, args ...interface{}) {
str := fmt.Sprintf("[go] " + format, args...)
C.nxt_cgo_alert(str_ref(str), C.uint32_t(len(str)))
}
type handler_registry struct {
sync.RWMutex
next uintptr

View File

@@ -54,12 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
int queue_fd);
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
nxt_unit_request_info_t **preq);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
@@ -904,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
static int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
nxt_unit_request_info_t **preq)
{
int rc;
pid_t pid;
@@ -1040,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break;
case _NXT_PORT_MSG_REQ_HEADERS:
rc = nxt_unit_process_req_headers(ctx, &recv_msg);
rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
break;
case _NXT_PORT_MSG_REQ_BODY:
@@ -1213,7 +1215,8 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
nxt_unit_request_info_t **preq)
{
int res;
nxt_unit_impl_t *lib;
@@ -1329,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
}
if (preq == NULL) {
lib->callbacks.request_handler(req);
} else {
*preq = req;
}
}
return NXT_UNIT_OK;
@@ -2179,7 +2187,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req,
resp = req->response;
if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
nxt_unit_req_warn(req, "add_field: too many response fields");
nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
(int) resp->fields_count);
return NXT_UNIT_ERROR;
}
@@ -2356,6 +2365,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
return NULL;
}
@@ -4537,7 +4548,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
return rc;
}
rc = nxt_unit_process_msg(ctx, rbuf);
rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
@@ -4686,7 +4697,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
} else {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4793,7 +4804,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
goto retry;
}
rc = nxt_unit_process_msg(ctx, rbuf);
rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4902,7 +4913,7 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
break;
}
rc = nxt_unit_process_msg(ctx, rbuf);
rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4921,6 +4932,46 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
}
nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_t *req;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
req = NULL;
if (nxt_slow_path(!ctx_impl->online)) {
goto done;
}
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
goto done;
}
rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
if (rc != NXT_UNIT_OK) {
goto done;
}
(void) nxt_unit_process_msg(ctx, rbuf, &req);
done:
nxt_unit_ctx_release(ctx);
return req;
}
int
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
{
@@ -4977,7 +5028,7 @@ retry:
return rc;
}
rc = nxt_unit_process_msg(ctx, rbuf);
rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}

View File

@@ -213,6 +213,8 @@ int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx);
int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx);
/*