Go: removing request registry.

Passing unsafe.Pointers (void *) from Go to C is complicated by an attempt
to make such pointers less unsafe.

A straightforward optimization is to replace 'unsafe.Pointer' with 'uintptr'
(thanks to Xin Huang for the idea: https://stackoverflow.com/a/44826533 ).

As a result, request registry with mutex is gone.
This commit is contained in:
Max Romanov
2017-12-06 12:16:02 +03:00
parent 0db4d25316
commit d14c0774c7
11 changed files with 96 additions and 244 deletions

View File

@@ -1,15 +0,0 @@
// +build !go1.7
/*
* Copyright (C) Max Romanov
* Copyright (C) NGINX, Inc.
*/
package unit
import "C"
import "unsafe"
func getCBytes(p []byte) unsafe.Pointer {
return unsafe.Pointer(C.CString(string(p))) // go <= 1.6
}

View File

@@ -1,15 +0,0 @@
// +build go1.7
/*
* Copyright (C) Max Romanov
* Copyright (C) NGINX, Inc.
*/
package unit
import "C"
import "unsafe"
func getCBytes(p []byte) unsafe.Pointer {
return C.CBytes(p) // go >= 1.7
}

View File

@@ -13,7 +13,7 @@
#include <nxt_main.h>
int
nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len)
{
nxt_int_t rc;
nxt_go_run_ctx_t *ctx;
@@ -25,14 +25,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
nxt_go_debug("write: %d", (int) len);
ctx = (nxt_go_run_ctx_t *) r;
rc = nxt_go_ctx_write(ctx, buf, len);
rc = nxt_go_ctx_write(ctx, (void *) buf, len);
return rc == NXT_OK ? len : -1;
}
int
nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len)
{
size_t res;
nxt_go_run_ctx_t *ctx;
@@ -43,19 +43,19 @@ nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
ctx = (nxt_go_run_ctx_t *) r;
dst_len = nxt_min(dst_len, ctx->r.body.preread_size);
dst_len = nxt_min(dst_len, ctx->request.body.preread_size);
res = nxt_go_ctx_read_raw(ctx, dst, dst_len);
res = nxt_go_ctx_read_raw(ctx, (void *) dst, dst_len);
ctx->r.body.preread_size -= res;
ctx->request.body.preread_size -= res;
return res;
}
int
nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
void *src, size_t src_len)
nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len,
uintptr_t src, size_t src_len)
{
nxt_go_run_ctx_t *ctx;
@@ -65,7 +65,7 @@ nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
ctx = (nxt_go_run_ctx_t *) r;
nxt_go_ctx_add_msg(ctx, src, src_len);
nxt_go_ctx_add_msg(ctx, (void *) src, src_len);
return nxt_go_request_read(r, dst, dst_len);
}
@@ -137,7 +137,7 @@ nxt_go_ready()
nxt_go_request_t
nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len)
nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, uintptr_t oob, size_t oob_len)
{
return nxt_go_port_on_read(buf, buf_len, oob, oob_len);
return nxt_go_port_on_read((void *) buf, buf_len, (void *) oob, oob_len);
}

View File

@@ -19,12 +19,12 @@ typedef struct {
typedef uintptr_t nxt_go_request_t;
int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len);
int nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len);
int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len);
int nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len);
int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
void *src, size_t src_len);
int nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len,
uintptr_t src, size_t src_len);
int nxt_go_request_close(nxt_go_request_t r);
@@ -32,8 +32,8 @@ int nxt_go_request_done(nxt_go_request_t r);
void nxt_go_ready();
nxt_go_request_t nxt_go_process_port_msg(void *buf, size_t buf_len,
void *oob, size_t oob_len);
nxt_go_request_t nxt_go_process_port_msg(uintptr_t buf, size_t buf_len,
uintptr_t oob, size_t oob_len);
#endif /* _NXT_GO_LIB_H_INCLUDED_ */

View File

@@ -27,16 +27,15 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
nxt_go_request_t r;
nxt_app_request_header_t *h;
r = nxt_go_find_request(port_msg->stream);
if (r != 0) {
return r;
}
ctx = malloc(sizeof(nxt_go_run_ctx_t) + size);
memcpy(ctx + 1, port_msg, size);
port_msg = (nxt_port_msg_t *) (ctx + 1);
ctx = malloc(sizeof(nxt_go_run_ctx_t));
nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t));
r = (nxt_go_request_t)(ctx);
h = &ctx->r.header;
h = &ctx->request.header;
nxt_go_ctx_read_str(ctx, &h->method);
nxt_go_ctx_read_str(ctx, &h->target);
@@ -58,18 +57,20 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
h->path = h->target;
}
nxt_go_new_request(r, port_msg->stream, nxt_go_str(&h->method),
nxt_go_str(&h->target));
ctx->go_request = nxt_go_new_request(r, port_msg->stream,
nxt_go_str(&h->method),
nxt_go_str(&h->target));
nxt_go_ctx_read_str(ctx, &h->version);
nxt_go_request_set_proto(r, nxt_go_str(&h->version),
nxt_go_request_set_proto(ctx->go_request, nxt_go_str(&h->version),
h->version.start[5] - '0',
h->version.start[7] - '0');
nxt_go_ctx_read_str(ctx, &ctx->r.remote);
if (ctx->r.remote.start != NULL) {
nxt_go_request_set_remote_addr(r, nxt_go_str(&ctx->r.remote));
nxt_go_ctx_read_str(ctx, &ctx->request.remote);
if (ctx->request.remote.start != NULL) {
nxt_go_request_set_remote_addr(ctx->go_request,
nxt_go_str(&ctx->request.remote));
}
nxt_go_ctx_read_str(ctx, &h->host);
@@ -78,7 +79,7 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
nxt_go_ctx_read_str(ctx, &h->content_length);
if (h->host.start != NULL) {
nxt_go_request_set_host(r, nxt_go_str(&h->host));
nxt_go_request_set_host(ctx->go_request, nxt_go_str(&h->host));
}
nxt_go_ctx_read_size(ctx, &s);
@@ -92,21 +93,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
}
rc = nxt_go_ctx_read_str(ctx, &v);
nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v));
nxt_go_request_add_header(ctx->go_request, nxt_go_str(&n),
nxt_go_str(&v));
} while(1);
nxt_go_ctx_read_size(ctx, &s);
ctx->r.body.preread_size = s;
ctx->request.body.preread_size = s;
if (h->parsed_content_length > 0) {
nxt_go_request_set_content_length(r, h->parsed_content_length);
nxt_go_request_set_content_length(ctx->go_request,
h->parsed_content_length);
}
if (ctx->r.body.preread_size < h->parsed_content_length) {
nxt_go_request_create_channel(r);
if (ctx->request.body.preread_size < h->parsed_content_length) {
nxt_go_warn("preread_size < content_length");
}
return r;
return ctx->go_request;
}
nxt_go_request_t

View File

@@ -185,7 +185,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
msg->start_offset = ctx->msg_last->start_offset;
if (ctx->msg_last == &ctx->msg) {
msg->start_offset += ctx->r.body.preread_size;
msg->start_offset += ctx->request.body.preread_size;
} else {
msg->start_offset += ctx->msg_last->data_size;
@@ -219,6 +219,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
&ctx->wport_msg, sizeof(nxt_port_msg_t) +
ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0);
nxt_go_debug(" port send res = %d", rc);
ctx->nwbuf = 0;
memset(&ctx->wbuf, 0, sizeof(ctx->wbuf));

View File

@@ -47,7 +47,8 @@ typedef struct {
nxt_port_msg_t wport_msg;
char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ];
nxt_app_request_t r;
nxt_app_request_t request;
uintptr_t go_request;
nxt_go_msg_t *msg_last;
} nxt_go_run_ctx_t;

View File

@@ -12,7 +12,6 @@ package unit
import "C"
import (
"fmt"
"net"
"net/http"
"os"
@@ -74,6 +73,9 @@ func main_port() *port {
}
func add_port(p *port) {
nxt_go_debug("add_port: %d:%d", p.key.pid, p.key.id);
port_registry_.Lock()
if port_registry_.m == nil {
port_registry_.m = make(map[port_key]*port)
@@ -105,13 +107,13 @@ func getUnixConn(fd int) *net.UnixConn {
c, err := net.FileConn(f)
if err != nil {
fmt.Printf("FileConn error %s\n", err)
nxt_go_warn("FileConn error %s", err)
return nil
}
uc, ok := c.(*net.UnixConn)
if !ok {
fmt.Printf("Not a Unix-domain socket %d\n", fd)
nxt_go_warn("Not a Unix-domain socket %d", fd)
return nil
}
@@ -135,6 +137,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
p := find_port(key)
if p == nil {
nxt_go_warn("port %d:%d not found", pid, id)
return 0
}
@@ -142,7 +145,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
C.GoBytes(oob, oob_size), nil)
if err != nil {
fmt.Printf("write result %d (%d), %s\n", n, oobn, err)
nxt_go_warn("write result %d (%d), %s", n, oobn, err)
}
return C.int(n)
@@ -163,7 +166,7 @@ func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer,
C.GoBytes(oob, oob_size), nil)
if err != nil {
fmt.Printf("write result %d (%d), %s\n", n, oobn, err)
nxt_go_warn("write result %d (%d), %s", n, oobn, err)
}
return C.int(n)
@@ -188,6 +191,7 @@ func new_port(pid int, id int, t int, rcv int, snd int) *port {
func (p *port) read(handler http.Handler) error {
var buf [16384]byte
var oob [1024]byte
var c_buf, c_oob cbuf
n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:])
@@ -195,24 +199,16 @@ func (p *port) read(handler http.Handler) error {
return err
}
m := new_cmsg(buf[:n], oob[:oobn])
c_buf.init(buf[:n])
c_oob.init(oob[:oobn])
c_req := C.nxt_go_process_port_msg(m.buf.b, m.buf.s, m.oob.b, m.oob.s)
go_req := C.nxt_go_process_port_msg(c_buf.b, c_buf.s, c_oob.b, c_oob.s)
if c_req == 0 {
m.Close()
if go_req == 0 {
return nil
}
r := find_request(c_req)
if len(r.msgs) == 0 {
r.push(m)
} else if r.ch != nil {
r.ch <- m
} else {
m.Close()
}
r := get_request(go_req)
go func(r *request) {
if handler == nil {

View File

@@ -13,7 +13,7 @@ import "C"
import (
"net/http"
"net/url"
"sync"
"unsafe"
)
type request struct {
@@ -21,28 +21,14 @@ type request struct {
resp *response
c_req C.nxt_go_request_t
id C.uint32_t
msgs []*cmsg
ch chan *cmsg
}
func (r *request) Read(p []byte) (n int, err error) {
c := C.size_t(cap(p))
b := C.malloc(c)
b := C.uintptr_t(uintptr(unsafe.Pointer(&p[0])))
res := C.nxt_go_request_read(r.c_req, b, c)
if res == -2 /* NXT_AGAIN */ {
m := <-r.ch
res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b,
m.buf.s)
r.push(m)
}
if res > 0 {
copy(p, C.GoBytes(b, res))
}
C.free(b)
return int(res), nil
}
@@ -51,53 +37,6 @@ func (r *request) Close() error {
return nil
}
type request_registry struct {
sync.RWMutex
m map[C.nxt_go_request_t]*request
id map[C.uint32_t]*request
}
var request_registry_ request_registry
func find_request(c_req C.nxt_go_request_t) *request {
request_registry_.RLock()
res := request_registry_.m[c_req]
request_registry_.RUnlock()
return res
}
func find_request_by_id(id C.uint32_t) *request {
request_registry_.RLock()
res := request_registry_.id[id]
request_registry_.RUnlock()
return res
}
func add_request(r *request) {
request_registry_.Lock()
if request_registry_.m == nil {
request_registry_.m = make(map[C.nxt_go_request_t]*request)
request_registry_.id = make(map[C.uint32_t]*request)
}
request_registry_.m[r.c_req] = r
request_registry_.id[r.id] = r
request_registry_.Unlock()
}
func remove_request(r *request) {
request_registry_.Lock()
if request_registry_.m != nil {
delete(request_registry_.m, r.c_req)
delete(request_registry_.id, r.id)
}
request_registry_.Unlock()
}
func (r *request) response() *response {
if r.resp == nil {
r.resp = new_response(r.c_req, &r.req)
@@ -107,33 +46,23 @@ func (r *request) response() *response {
}
func (r *request) done() {
remove_request(r)
C.nxt_go_request_done(r.c_req)
for _, m := range r.msgs {
m.Close()
}
if r.ch != nil {
close(r.ch)
}
}
func (r *request) push(m *cmsg) {
r.msgs = append(r.msgs, m)
func get_request(go_req C.nxt_go_request_t) *request {
return (*request)(unsafe.Pointer(uintptr(go_req)))
}
//export nxt_go_new_request
func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t,
c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) {
c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) uintptr {
uri := C.GoStringN(c_uri.start, c_uri.length)
var URL *url.URL
var err error
if URL, err = url.ParseRequestURI(uri); err != nil {
return
return 0
}
r := &request{
@@ -146,76 +75,49 @@ func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t,
},
c_req: c_req,
id: id,
msgs: make([]*cmsg, 0, 1),
}
r.req.Body = r
add_request(r)
}
//export nxt_go_find_request
func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t {
r := find_request_by_id(id)
if r != nil {
return r.c_req
}
return 0
return uintptr(unsafe.Pointer(r))
}
//export nxt_go_request_set_proto
func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t,
func nxt_go_request_set_proto(go_req C.nxt_go_request_t, proto *C.nxt_go_str_t,
maj C.int, min C.int) {
r := find_request(c_req)
r := get_request(go_req)
r.req.Proto = C.GoStringN(proto.start, proto.length)
r.req.ProtoMajor = int(maj)
r.req.ProtoMinor = int(min)
}
//export nxt_go_request_add_header
func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t,
func nxt_go_request_add_header(go_req C.nxt_go_request_t, name *C.nxt_go_str_t,
value *C.nxt_go_str_t) {
r := find_request(c_req)
r := get_request(go_req)
r.req.Header.Add(C.GoStringN(name.start, name.length),
C.GoStringN(value.start, value.length))
}
//export nxt_go_request_set_content_length
func nxt_go_request_set_content_length(c_req C.nxt_go_request_t, l C.int64_t) {
find_request(c_req).req.ContentLength = int64(l)
}
//export nxt_go_request_create_channel
func nxt_go_request_create_channel(c_req C.nxt_go_request_t) {
find_request(c_req).ch = make(chan *cmsg)
func nxt_go_request_set_content_length(go_req C.nxt_go_request_t, l C.int64_t) {
get_request(go_req).req.ContentLength = int64(l)
}
//export nxt_go_request_set_host
func nxt_go_request_set_host(c_req C.nxt_go_request_t, host *C.nxt_go_str_t) {
find_request(c_req).req.Host = C.GoStringN(host.start, host.length)
func nxt_go_request_set_host(go_req C.nxt_go_request_t, host *C.nxt_go_str_t) {
get_request(go_req).req.Host = C.GoStringN(host.start, host.length)
}
//export nxt_go_request_set_url
func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) {
find_request(c_req).req.URL.Scheme = C.GoString(scheme)
func nxt_go_request_set_url(go_req C.nxt_go_request_t, scheme *C.char) {
get_request(go_req).req.URL.Scheme = C.GoString(scheme)
}
//export nxt_go_request_set_remote_addr
func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t,
func nxt_go_request_set_remote_addr(go_req C.nxt_go_request_t,
addr *C.nxt_go_str_t) {
find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
}
//export nxt_go_request_serve
func nxt_go_request_serve(c_req C.nxt_go_request_t) {
r := find_request(c_req)
go func(r *request) {
http.DefaultServeMux.ServeHTTP(r.response(), &r.req)
r.done()
}(r)
get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
}

View File

@@ -13,7 +13,6 @@ import "C"
import (
"fmt"
"net/http"
"os"
)
type response struct {
@@ -43,16 +42,15 @@ func (r *response) Write(p []byte) (n int, err error) {
}
l := C.size_t(len(p))
b := getCBytes(p)
b := buf_ref(p)
res := C.nxt_go_response_write(r.c_req, b, l)
C.free(b)
return int(res), nil
}
func (r *response) WriteHeader(code int) {
if r.headerSent {
// Note: explicitly using Stderr, as Stdout is our HTTP output.
fmt.Fprintf(os.Stderr, "CGI attempted to write header twice")
nxt_go_warn("multiple response.WriteHeader calls")
return
}
r.headerSent = true

View File

@@ -20,32 +20,21 @@ import (
)
type cbuf struct {
b unsafe.Pointer
b C.uintptr_t
s C.size_t
f bool
}
func new_cbuf(buf []byte) *cbuf {
func buf_ref(buf []byte) C.uintptr_t {
if len(buf) == 0 {
return nil
return 0
}
return &cbuf{
getCBytes(buf), C.size_t(len(buf)), true,
}
return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0])))
}
func (buf *cbuf) Close() {
if buf == nil {
return
}
if buf.f && buf.s > 0 {
C.free(buf.b)
buf.f = false
buf.b = nil
buf.s = 0
}
func (buf *cbuf) init(b []byte) {
buf.b = buf_ref(b)
buf.s = C.size_t(len(b))
}
func (buf *cbuf) GoBytes() []byte {
@@ -54,24 +43,7 @@ func (buf *cbuf) GoBytes() []byte {
return b[:0]
}
return C.GoBytes(buf.b, C.int(buf.s))
}
type cmsg struct {
buf cbuf
oob cbuf
}
func new_cmsg(buf []byte, oob []byte) *cmsg {
return &cmsg{
buf: cbuf{getCBytes(buf), C.size_t(len(buf)), true},
oob: cbuf{getCBytes(oob), C.size_t(len(oob)), true},
}
}
func (msg *cmsg) Close() {
msg.buf.Close()
msg.oob.Close()
return C.GoBytes(unsafe.Pointer(uintptr(buf.b)), C.int(buf.s))
}
var nxt_go_quit bool = false
@@ -81,6 +53,14 @@ func nxt_go_set_quit() {
nxt_go_quit = true
}
func nxt_go_warn(format string, args ...interface{}) {
fmt.Fprintf(os.Stderr, "[go warn] " + format + "\n", args...)
}
func nxt_go_debug(format string, args ...interface{}) {
// fmt.Fprintf(os.Stderr, "[go debug] " + format + "\n", args...)
}
func ListenAndServe(addr string, handler http.Handler) error {
var read_port *port