nginext has been renamed to unit.
This commit is contained in:
15
src/go/unit/cbytes-1.6.go
Normal file
15
src/go/unit/cbytes-1.6.go
Normal file
@@ -0,0 +1,15 @@
|
||||
// +build !go1.7
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
package unit
|
||||
|
||||
import "C"
|
||||
import "unsafe"
|
||||
|
||||
func getCBytes(p []byte) unsafe.Pointer {
|
||||
return unsafe.Pointer(C.CString(string(p))) // go <= 1.6
|
||||
}
|
||||
15
src/go/unit/cbytes-1.7.go
Normal file
15
src/go/unit/cbytes-1.7.go
Normal file
@@ -0,0 +1,15 @@
|
||||
// +build go1.7
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
package unit
|
||||
|
||||
import "C"
|
||||
import "unsafe"
|
||||
|
||||
func getCBytes(p []byte) unsafe.Pointer {
|
||||
return C.CBytes(p) // go >= 1.7
|
||||
}
|
||||
66
src/go/unit/nxt_go_array.c
Normal file
66
src/go/unit/nxt_go_array.c
Normal file
@@ -0,0 +1,66 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef NXT_CONFIGURE
|
||||
|
||||
#include <stdint.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <nxt_main.h>
|
||||
|
||||
#include "nxt_go_array.h"
|
||||
|
||||
void
|
||||
nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size)
|
||||
{
|
||||
array->elts = malloc(n * size);
|
||||
|
||||
if (nxt_slow_path(n != 0 && array->elts == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
array->nelts = 0;
|
||||
array->size = size;
|
||||
array->nalloc = n;
|
||||
array->mem_pool = NULL;
|
||||
}
|
||||
|
||||
void *
|
||||
nxt_go_array_add(nxt_array_t *array)
|
||||
{
|
||||
void *p;
|
||||
uint32_t nalloc, new_alloc;
|
||||
|
||||
nalloc = array->nalloc;
|
||||
|
||||
if (array->nelts == nalloc) {
|
||||
|
||||
if (nalloc < 16) {
|
||||
/* Allocate new array twice larger than current. */
|
||||
new_alloc = nalloc * 2;
|
||||
|
||||
} else {
|
||||
/* Allocate new array 1.5 times larger than current. */
|
||||
new_alloc = nalloc + nalloc / 2;
|
||||
}
|
||||
|
||||
p = realloc(array->elts, array->size * new_alloc);
|
||||
|
||||
if (nxt_slow_path(p == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
array->elts = p;
|
||||
array->nalloc = new_alloc;
|
||||
}
|
||||
|
||||
p = nxt_pointer_to(array->elts, array->size * array->nelts);
|
||||
array->nelts++;
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
#endif /* NXT_CONFIGURE */
|
||||
36
src/go/unit/nxt_go_array.h
Normal file
36
src/go/unit/nxt_go_array.h
Normal file
@@ -0,0 +1,36 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_ARRAY_H_INCLUDED_
|
||||
#define _NXT_GO_ARRAY_H_INCLUDED_
|
||||
|
||||
|
||||
#include <nxt_array.h>
|
||||
|
||||
void nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size);
|
||||
|
||||
void *nxt_go_array_add(nxt_array_t *array);
|
||||
|
||||
nxt_inline void *
|
||||
nxt_go_array_zero_add(nxt_array_t *array)
|
||||
{
|
||||
void *p;
|
||||
|
||||
p = nxt_go_array_add(array);
|
||||
|
||||
if (nxt_fast_path(p != NULL)) {
|
||||
nxt_memzero(p, array->size);
|
||||
}
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
#define \
|
||||
nxt_go_array_at(array, n) \
|
||||
nxt_pointer_to((array)->elts, (array)->size * (n))
|
||||
|
||||
|
||||
#endif /* _NXT_GO_ARRAY_H_INCLUDED_ */
|
||||
193
src/go/unit/nxt_go_lib.c
Normal file
193
src/go/unit/nxt_go_lib.c
Normal file
@@ -0,0 +1,193 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifdef NXT_CONFIGURE
|
||||
|
||||
#include <stdio.h>
|
||||
#include "nxt_go_lib.h"
|
||||
|
||||
// Stubs to compile during configure process.
|
||||
int
|
||||
nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int
|
||||
nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int
|
||||
nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
|
||||
void *src, size_t src_len)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int
|
||||
nxt_go_request_close(nxt_go_request_t r)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
int
|
||||
nxt_go_request_done(nxt_go_request_t r)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
void
|
||||
nxt_go_ready()
|
||||
{
|
||||
}
|
||||
|
||||
nxt_go_request_t
|
||||
nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
#include "nxt_go_run_ctx.h"
|
||||
#include "nxt_go_log.h"
|
||||
#include "nxt_go_port.h"
|
||||
|
||||
#include <nxt_main.h>
|
||||
#include <nxt_go_gen.h>
|
||||
|
||||
int
|
||||
nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
|
||||
{
|
||||
nxt_int_t rc;
|
||||
nxt_go_run_ctx_t *ctx;
|
||||
|
||||
if (nxt_slow_path(r == 0)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
nxt_go_debug("write: %d", (int) len);
|
||||
|
||||
ctx = (nxt_go_run_ctx_t *) r;
|
||||
rc = nxt_go_ctx_write(ctx, buf, len);
|
||||
|
||||
return rc == NXT_OK ? len : -1;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
|
||||
{
|
||||
size_t res;
|
||||
nxt_go_run_ctx_t *ctx;
|
||||
|
||||
if (nxt_slow_path(r == 0)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ctx = (nxt_go_run_ctx_t *) r;
|
||||
|
||||
dst_len = nxt_min(dst_len, ctx->r.body.preread_size);
|
||||
|
||||
res = nxt_go_ctx_read_raw(ctx, dst, dst_len);
|
||||
|
||||
ctx->r.body.preread_size -= res;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
|
||||
void *src, size_t src_len)
|
||||
{
|
||||
nxt_go_run_ctx_t *ctx;
|
||||
|
||||
if (nxt_slow_path(r == 0)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ctx = (nxt_go_run_ctx_t *) r;
|
||||
|
||||
nxt_go_ctx_add_msg(ctx, src, src_len);
|
||||
|
||||
return nxt_go_request_read(r, dst, dst_len);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_go_request_close(nxt_go_request_t r)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_go_request_done(nxt_go_request_t r)
|
||||
{
|
||||
nxt_int_t res;
|
||||
nxt_go_run_ctx_t *ctx;
|
||||
nxt_go_msg_t *msg, *b;
|
||||
|
||||
if (nxt_slow_path(r == 0)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ctx = (nxt_go_run_ctx_t *) r;
|
||||
|
||||
res = nxt_go_ctx_flush(ctx, 1);
|
||||
|
||||
nxt_go_ctx_release_msg(ctx, &ctx->msg);
|
||||
|
||||
msg = ctx->msg.next;
|
||||
while (msg != NULL) {
|
||||
nxt_go_ctx_release_msg(ctx, msg);
|
||||
|
||||
b = msg;
|
||||
msg = b->next;
|
||||
|
||||
free(b);
|
||||
}
|
||||
|
||||
free(ctx);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_go_ready()
|
||||
{
|
||||
char *go_stream;
|
||||
nxt_port_msg_t port_msg;
|
||||
|
||||
go_stream = getenv("NXT_GO_STREAM");
|
||||
|
||||
if (go_stream == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
port_msg.stream = atol(go_stream);
|
||||
port_msg.pid = getpid();
|
||||
port_msg.reply_port = 0;
|
||||
port_msg.type = _NXT_PORT_MSG_READY;
|
||||
port_msg.last = 1;
|
||||
port_msg.mmap = 0;
|
||||
|
||||
nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0);
|
||||
}
|
||||
|
||||
|
||||
nxt_go_request_t
|
||||
nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len)
|
||||
{
|
||||
return nxt_go_port_on_read(buf, buf_len, oob, oob_len);
|
||||
}
|
||||
|
||||
|
||||
#endif /* NXT_CONFIGURE */
|
||||
39
src/go/unit/nxt_go_lib.h
Normal file
39
src/go/unit/nxt_go_lib.h
Normal file
@@ -0,0 +1,39 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_LIB_H_INCLUDED_
|
||||
#define _NXT_GO_LIB_H_INCLUDED_
|
||||
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
typedef struct {
|
||||
int length;
|
||||
char *start;
|
||||
} nxt_go_str_t;
|
||||
|
||||
typedef uintptr_t nxt_go_request_t;
|
||||
|
||||
int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len);
|
||||
|
||||
int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len);
|
||||
|
||||
int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
|
||||
void *src, size_t src_len);
|
||||
|
||||
int nxt_go_request_close(nxt_go_request_t r);
|
||||
|
||||
int nxt_go_request_done(nxt_go_request_t r);
|
||||
|
||||
void nxt_go_ready();
|
||||
|
||||
nxt_go_request_t nxt_go_process_port_msg(void *buf, size_t buf_len,
|
||||
void *oob, size_t oob_len);
|
||||
|
||||
|
||||
#endif /* _NXT_GO_LIB_H_INCLUDED_ */
|
||||
34
src/go/unit/nxt_go_log.h
Normal file
34
src/go/unit/nxt_go_log.h
Normal file
@@ -0,0 +1,34 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_LOG_H_INCLUDED_
|
||||
#define _NXT_GO_LOG_H_INCLUDED_
|
||||
|
||||
|
||||
#include <stdio.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <nxt_auto_config.h>
|
||||
|
||||
#if (NXT_DEBUG)
|
||||
|
||||
#define nxt_go_debug(fmt, ARGS...) \
|
||||
fprintf(stderr, "[go debug] " fmt "\n", ##ARGS)
|
||||
|
||||
#else
|
||||
|
||||
#define nxt_go_debug(fmt, ARGS...)
|
||||
|
||||
#endif
|
||||
|
||||
#define nxt_go_warn(fmt, ARGS...) \
|
||||
fprintf(stderr, "[go warn] " fmt "\n", ##ARGS)
|
||||
|
||||
#define nxt_go_error(fmt, ARGS...) \
|
||||
fprintf(stderr, "[go error] " fmt "\n", ##ARGS)
|
||||
|
||||
|
||||
#endif /* _NXT_GO_LOG_H_INCLUDED_ */
|
||||
21
src/go/unit/nxt_go_mutex.h
Normal file
21
src/go/unit/nxt_go_mutex.h
Normal file
@@ -0,0 +1,21 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_MUTEX_H_INCLUDED_
|
||||
#define _NXT_GO_MUTEX_H_INCLUDED_
|
||||
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
typedef pthread_mutex_t nxt_go_mutex_t;
|
||||
|
||||
#define nxt_go_mutex_create(mutex) pthread_mutex_init(mutex, NULL)
|
||||
#define nxt_go_mutex_destroy(mutex) pthread_mutex_destroy(mutex)
|
||||
#define nxt_go_mutex_lock(mutex) pthread_mutex_lock(mutex)
|
||||
#define nxt_go_mutex_unlock(mutex) pthread_mutex_unlock(mutex)
|
||||
|
||||
|
||||
#endif /* _NXT_GO_MUTEX_H_INCLUDED_ */
|
||||
210
src/go/unit/nxt_go_port.c
Normal file
210
src/go/unit/nxt_go_port.c
Normal file
@@ -0,0 +1,210 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef NXT_CONFIGURE
|
||||
|
||||
|
||||
#include "nxt_go_port.h"
|
||||
#include "nxt_go_log.h"
|
||||
#include "nxt_go_process.h"
|
||||
#include "nxt_go_run_ctx.h"
|
||||
|
||||
#include <nxt_main.h>
|
||||
#include <nxt_go_gen.h>
|
||||
|
||||
|
||||
#define nxt_go_str(p) ((nxt_go_str_t *)(p))
|
||||
|
||||
static nxt_go_request_t
|
||||
nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
|
||||
{
|
||||
size_t s;
|
||||
nxt_str_t n, v;
|
||||
nxt_int_t rc;
|
||||
nxt_uint_t i;
|
||||
nxt_go_run_ctx_t *ctx;
|
||||
nxt_go_request_t r;
|
||||
nxt_app_request_header_t *h;
|
||||
|
||||
r = nxt_go_find_request(port_msg->stream);
|
||||
if (r != 0) {
|
||||
return r;
|
||||
}
|
||||
|
||||
ctx = malloc(sizeof(nxt_go_run_ctx_t));
|
||||
nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t));
|
||||
|
||||
r = (nxt_go_request_t)(ctx);
|
||||
h = &ctx->r.header;
|
||||
|
||||
nxt_go_ctx_read_str(ctx, &h->method);
|
||||
nxt_go_ctx_read_str(ctx, &h->target);
|
||||
nxt_go_ctx_read_str(ctx, &h->path);
|
||||
|
||||
nxt_go_ctx_read_size(ctx, &s);
|
||||
if (s > 0) {
|
||||
s--;
|
||||
h->query.start = h->target.start + s;
|
||||
h->query.length = h->target.length - s;
|
||||
|
||||
if (h->path.start == NULL) {
|
||||
h->path.start = h->target.start;
|
||||
h->path.length = s - 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (h->path.start == NULL) {
|
||||
h->path = h->target;
|
||||
}
|
||||
|
||||
nxt_go_new_request(r, port_msg->stream, nxt_go_str(&h->method),
|
||||
nxt_go_str(&h->target));
|
||||
|
||||
nxt_go_ctx_read_str(ctx, &h->version);
|
||||
|
||||
nxt_go_request_set_proto(r, nxt_go_str(&h->version),
|
||||
h->version.start[5] - '0',
|
||||
h->version.start[7] - '0');
|
||||
|
||||
nxt_go_ctx_read_str(ctx, &ctx->r.remote);
|
||||
if (ctx->r.remote.start != NULL) {
|
||||
nxt_go_request_set_remote_addr(r, nxt_go_str(&ctx->r.remote));
|
||||
}
|
||||
|
||||
nxt_go_ctx_read_str(ctx, &h->host);
|
||||
nxt_go_ctx_read_str(ctx, &h->cookie);
|
||||
nxt_go_ctx_read_str(ctx, &h->content_type);
|
||||
nxt_go_ctx_read_str(ctx, &h->content_length);
|
||||
|
||||
if (h->host.start != NULL) {
|
||||
nxt_go_request_set_host(r, nxt_go_str(&h->host));
|
||||
}
|
||||
|
||||
nxt_go_ctx_read_size(ctx, &s);
|
||||
h->parsed_content_length = s;
|
||||
|
||||
do {
|
||||
rc = nxt_go_ctx_read_str(ctx, &n);
|
||||
|
||||
if (n.length == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
rc = nxt_go_ctx_read_str(ctx, &v);
|
||||
nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v));
|
||||
} while(1);
|
||||
|
||||
nxt_go_ctx_read_size(ctx, &s);
|
||||
ctx->r.body.preread_size = s;
|
||||
|
||||
if (h->parsed_content_length > 0) {
|
||||
nxt_go_request_set_content_length(r, h->parsed_content_length);
|
||||
}
|
||||
|
||||
if (ctx->r.body.preread_size < h->parsed_content_length) {
|
||||
nxt_go_request_create_channel(r);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
nxt_go_request_t
|
||||
nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
|
||||
{
|
||||
void *buf_end;
|
||||
void *payload;
|
||||
size_t payload_size;
|
||||
nxt_fd_t fd;
|
||||
struct cmsghdr *cm;
|
||||
nxt_port_msg_t *port_msg;
|
||||
nxt_port_msg_new_port_t *new_port_msg;
|
||||
|
||||
fd = -1;
|
||||
nxt_go_debug("on read: %d (%d)", (int)buf_size, (int)oob_size);
|
||||
|
||||
cm = oob;
|
||||
if (oob_size >= CMSG_SPACE(sizeof(int))
|
||||
&& cm->cmsg_len == CMSG_LEN(sizeof(int))
|
||||
&& cm->cmsg_level == SOL_SOCKET
|
||||
&& cm->cmsg_type == SCM_RIGHTS) {
|
||||
|
||||
nxt_memcpy(&fd, CMSG_DATA(cm), sizeof(int));
|
||||
nxt_go_debug("fd = %d", fd);
|
||||
}
|
||||
|
||||
port_msg = buf;
|
||||
if (buf_size < sizeof(nxt_port_msg_t)) {
|
||||
nxt_go_warn("message too small (%d bytes)", (int)buf_size);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
buf_end = ((char *)buf) + buf_size;
|
||||
|
||||
payload = port_msg + 1;
|
||||
payload_size = buf_size - sizeof(nxt_port_msg_t);
|
||||
|
||||
if (port_msg->mmap) {
|
||||
nxt_go_debug("using data in shared memory");
|
||||
}
|
||||
|
||||
if (port_msg->type >= NXT_PORT_MSG_MAX) {
|
||||
nxt_go_warn("unknown message type (%d)", (int)port_msg->type);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
switch (port_msg->type) {
|
||||
case _NXT_PORT_MSG_QUIT:
|
||||
nxt_go_debug("quit");
|
||||
|
||||
nxt_go_set_quit();
|
||||
break;
|
||||
|
||||
case _NXT_PORT_MSG_NEW_PORT:
|
||||
nxt_go_debug("new port");
|
||||
new_port_msg = payload;
|
||||
|
||||
nxt_go_new_port(new_port_msg->pid, new_port_msg->id, new_port_msg->type,
|
||||
-1, fd);
|
||||
break;
|
||||
|
||||
case _NXT_PORT_MSG_CHANGE_FILE:
|
||||
nxt_go_debug("change file");
|
||||
break;
|
||||
|
||||
case _NXT_PORT_MSG_MMAP:
|
||||
nxt_go_debug("mmap");
|
||||
|
||||
nxt_go_new_incoming_mmap(port_msg->pid, fd);
|
||||
break;
|
||||
|
||||
case _NXT_PORT_MSG_DATA:
|
||||
nxt_go_debug("data");
|
||||
|
||||
return nxt_go_data_handler(port_msg, buf_size);
|
||||
|
||||
case _NXT_PORT_MSG_REMOVE_PID:
|
||||
nxt_go_debug("remove pid");
|
||||
|
||||
/* TODO remove all ports for this pid in Go */
|
||||
/* TODO remove incoming & outgoing mmaps for this pid */
|
||||
break;
|
||||
|
||||
default:
|
||||
goto fail;
|
||||
}
|
||||
|
||||
|
||||
fail:
|
||||
|
||||
if (fd != -1) {
|
||||
close(fd);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
#endif /* NXT_CONFIGURE */
|
||||
18
src/go/unit/nxt_go_port.h
Normal file
18
src/go/unit/nxt_go_port.h
Normal file
@@ -0,0 +1,18 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_PORT_H_INCLUDED_
|
||||
#define _NXT_GO_PORT_H_INCLUDED_
|
||||
|
||||
|
||||
#include <sys/types.h>
|
||||
#include "nxt_go_lib.h"
|
||||
|
||||
nxt_go_request_t
|
||||
nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size);
|
||||
|
||||
|
||||
#endif /* _NXT_GO_PORT_H_INCLUDED_ */
|
||||
192
src/go/unit/nxt_go_port_memory.c
Normal file
192
src/go/unit/nxt_go_port_memory.c
Normal file
@@ -0,0 +1,192 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef NXT_CONFIGURE
|
||||
|
||||
|
||||
#include "nxt_go_port_memory.h"
|
||||
#include "nxt_go_process.h"
|
||||
#include "nxt_go_array.h"
|
||||
#include "nxt_go_log.h"
|
||||
|
||||
#include <nxt_go_gen.h>
|
||||
#include <nxt_main.h>
|
||||
|
||||
#if (NXT_HAVE_MEMFD_CREATE)
|
||||
|
||||
#include <linux/memfd.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/syscall.h>
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
static nxt_port_mmap_header_t *
|
||||
nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id)
|
||||
{
|
||||
int name_len, rc;
|
||||
void *mem;
|
||||
char name[64];
|
||||
nxt_fd_t fd;
|
||||
nxt_port_msg_t port_msg;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
|
||||
fd = -1;
|
||||
|
||||
union {
|
||||
struct cmsghdr cm;
|
||||
char space[CMSG_SPACE(sizeof(int))];
|
||||
} cmsg;
|
||||
|
||||
port_mmap = nxt_go_array_zero_add(&process->outgoing);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_go_warn("failed to add port mmap to outgoing array");
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
name_len = snprintf(name, sizeof(name) - 1, "/unit.go.%p", name);
|
||||
|
||||
#if (NXT_HAVE_MEMFD_CREATE)
|
||||
fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
|
||||
|
||||
if (nxt_slow_path(fd == -1)) {
|
||||
nxt_go_warn("memfd_create(%s) failed %d", name, errno);
|
||||
|
||||
goto remove_fail;
|
||||
}
|
||||
|
||||
nxt_go_debug("memfd_create(%s): %d", name, fd);
|
||||
|
||||
#elif (NXT_HAVE_SHM_OPEN)
|
||||
shm_unlink((char *) name); // just in case
|
||||
|
||||
fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
|
||||
|
||||
nxt_go_debug("shm_open(%s): %d", name, fd);
|
||||
|
||||
if (nxt_slow_path(fd == -1)) {
|
||||
nxt_go_warn("shm_open(%s) failed %d", name, errno);
|
||||
|
||||
goto remove_fail;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
|
||||
nxt_go_warn("shm_unlink(%s) failed %d", name, errno);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
|
||||
nxt_go_warn("ftruncate() failed %d", errno);
|
||||
|
||||
goto remove_fail;
|
||||
}
|
||||
|
||||
mem = mmap(NULL, PORT_MMAP_SIZE,
|
||||
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
|
||||
if (nxt_slow_path(mem == MAP_FAILED)) {
|
||||
goto remove_fail;
|
||||
}
|
||||
|
||||
port_mmap->hdr = mem;
|
||||
|
||||
/* Init segment header. */
|
||||
hdr = port_mmap->hdr;
|
||||
|
||||
memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
||||
|
||||
hdr->id = process->outgoing.nelts - 1;
|
||||
hdr->pid = process->pid;
|
||||
|
||||
/* Mark first chunk as busy */
|
||||
nxt_port_mmap_set_chunk_busy(hdr, 0);
|
||||
|
||||
/* Mark as busy chunk followed the last available chunk. */
|
||||
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
|
||||
|
||||
port_msg.stream = 0;
|
||||
port_msg.pid = getpid();
|
||||
port_msg.reply_port = 0;
|
||||
port_msg.type = _NXT_PORT_MSG_MMAP;
|
||||
port_msg.last = 1;
|
||||
port_msg.mmap = 0;
|
||||
|
||||
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
|
||||
cmsg.cm.cmsg_level = SOL_SOCKET;
|
||||
cmsg.cm.cmsg_type = SCM_RIGHTS;
|
||||
|
||||
/*
|
||||
* nxt_memcpy() is used instead of simple
|
||||
* *(int *) CMSG_DATA(&cmsg.cm) = fd;
|
||||
* because GCC 4.4 with -O2/3/s optimization may issue a warning:
|
||||
* dereferencing type-punned pointer will break strict-aliasing rules
|
||||
*
|
||||
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
|
||||
* in the same simple assignment as in the code above.
|
||||
*/
|
||||
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
|
||||
|
||||
rc = nxt_go_port_send(hdr->pid, id, &port_msg, sizeof(port_msg),
|
||||
&cmsg, sizeof(cmsg));
|
||||
|
||||
nxt_go_debug("new mmap #%d created for %d -> %d",
|
||||
(int)hdr->id, (int)getpid(), (int)process->pid);
|
||||
|
||||
close(fd);
|
||||
|
||||
return hdr;
|
||||
|
||||
remove_fail:
|
||||
|
||||
if (fd != -1) {
|
||||
close(fd);
|
||||
}
|
||||
|
||||
process->outgoing.nelts--;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nxt_port_mmap_header_t *
|
||||
nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id,
|
||||
nxt_chunk_id_t *c)
|
||||
{
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_t *end_port_mmap;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
|
||||
port_mmap = NULL;
|
||||
hdr = NULL;
|
||||
|
||||
nxt_go_mutex_lock(&process->outgoing_mutex);
|
||||
|
||||
port_mmap = process->outgoing.elts;
|
||||
end_port_mmap = port_mmap + process->outgoing.nelts;
|
||||
|
||||
while (port_mmap < end_port_mmap) {
|
||||
|
||||
if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
|
||||
hdr = port_mmap->hdr;
|
||||
|
||||
goto unlock_return;
|
||||
}
|
||||
|
||||
port_mmap++;
|
||||
}
|
||||
|
||||
hdr = nxt_go_new_port_mmap(process, port_id);
|
||||
|
||||
unlock_return:
|
||||
|
||||
nxt_go_mutex_unlock(&process->outgoing_mutex);
|
||||
|
||||
return hdr;
|
||||
}
|
||||
|
||||
|
||||
#endif /* NXT_CONFIGURE */
|
||||
24
src/go/unit/nxt_go_port_memory.h
Normal file
24
src/go/unit/nxt_go_port_memory.h
Normal file
@@ -0,0 +1,24 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_PORT_MEMORY_H_INCLUDED_
|
||||
#define _NXT_GO_PORT_MEMORY_H_INCLUDED_
|
||||
|
||||
|
||||
#include <nxt_main.h>
|
||||
#include <nxt_port_memory_int.h>
|
||||
|
||||
#ifndef _NXT_GO_PROCESS_T_DEFINED_
|
||||
#define _NXT_GO_PROCESS_T_DEFINED_
|
||||
typedef struct nxt_go_process_s nxt_go_process_t;
|
||||
#endif
|
||||
|
||||
struct nxt_port_mmap_header_s *
|
||||
nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id,
|
||||
nxt_chunk_id_t *c);
|
||||
|
||||
|
||||
#endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */
|
||||
150
src/go/unit/nxt_go_process.c
Normal file
150
src/go/unit/nxt_go_process.c
Normal file
@@ -0,0 +1,150 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef NXT_CONFIGURE
|
||||
|
||||
|
||||
#include "nxt_go_process.h"
|
||||
#include "nxt_go_array.h"
|
||||
#include "nxt_go_mutex.h"
|
||||
#include "nxt_go_log.h"
|
||||
|
||||
#include <nxt_port_memory_int.h>
|
||||
|
||||
|
||||
static nxt_array_t processes; /* of nxt_go_process_t */
|
||||
|
||||
static nxt_go_process_t *
|
||||
nxt_go_find_process(nxt_pid_t pid, uint32_t *pos)
|
||||
{
|
||||
uint32_t l, r, i;
|
||||
nxt_go_process_t *process;
|
||||
|
||||
if (nxt_slow_path(processes.size == 0)) {
|
||||
nxt_go_array_init(&processes, 1, sizeof(nxt_go_process_t));
|
||||
}
|
||||
|
||||
l = 0;
|
||||
r = processes.nelts;
|
||||
i = (l + r) / 2;
|
||||
|
||||
while (r > l) {
|
||||
process = nxt_go_array_at(&processes, i);
|
||||
|
||||
nxt_go_debug("compare process #%d (%p) at %d",
|
||||
(int)process->pid, process, (int)i);
|
||||
|
||||
if (pid == process->pid) {
|
||||
nxt_go_debug("found process %d at %d", (int)pid, (int)i);
|
||||
|
||||
if (pos != NULL) {
|
||||
*pos = i;
|
||||
}
|
||||
|
||||
return process;
|
||||
}
|
||||
|
||||
if (pid < process->pid) {
|
||||
r = i;
|
||||
} else {
|
||||
l = i + 1;
|
||||
}
|
||||
|
||||
i = (l + r) / 2;
|
||||
}
|
||||
|
||||
if (pos != NULL) {
|
||||
*pos = i;
|
||||
}
|
||||
|
||||
nxt_go_debug("process %d not found, best pos %d", (int)pid, (int)i);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
nxt_go_process_t *
|
||||
nxt_go_get_process(nxt_pid_t pid)
|
||||
{
|
||||
uint32_t pos;
|
||||
nxt_go_process_t *process;
|
||||
|
||||
process = nxt_go_find_process(pid, &pos);
|
||||
|
||||
if (process == NULL) {
|
||||
nxt_go_array_add(&processes);
|
||||
process = nxt_go_array_at(&processes, pos);
|
||||
|
||||
nxt_go_debug("init process #%d (%p) at %d", (int)pid, process,
|
||||
(int)pos);
|
||||
|
||||
if (pos < processes.nelts - 1) {
|
||||
memmove(process + 1, process,
|
||||
processes.size * (processes.nelts - 1 - pos));
|
||||
}
|
||||
|
||||
process->pid = pid;
|
||||
nxt_go_mutex_create(&process->incoming_mutex);
|
||||
nxt_go_array_init(&process->incoming, 1, sizeof(nxt_port_mmap_t));
|
||||
nxt_go_mutex_create(&process->outgoing_mutex);
|
||||
nxt_go_array_init(&process->outgoing, 1, sizeof(nxt_port_mmap_t));
|
||||
}
|
||||
|
||||
return process;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd)
|
||||
{
|
||||
void *mem;
|
||||
struct stat mmap_stat;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_go_process_t *process;
|
||||
|
||||
process = nxt_go_get_process(pid);
|
||||
|
||||
nxt_go_debug("got new mmap fd #%d from process %d",
|
||||
(int)fd, (int)pid);
|
||||
|
||||
if (fstat(fd, &mmap_stat) == -1) {
|
||||
nxt_go_warn("fstat(%d) failed %d", (int)fd, errno);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_go_mutex_lock(&process->incoming_mutex);
|
||||
|
||||
port_mmap = nxt_go_array_zero_add(&process->incoming);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_go_warn("failed to add mmap to incoming array");
|
||||
|
||||
goto fail;
|
||||
}
|
||||
|
||||
mem = mmap(NULL, mmap_stat.st_size,
|
||||
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
|
||||
if (nxt_slow_path(mem == MAP_FAILED)) {
|
||||
nxt_go_warn("mmap() failed %d", errno);
|
||||
|
||||
goto fail;
|
||||
}
|
||||
|
||||
port_mmap->hdr = mem;
|
||||
|
||||
if (nxt_slow_path(port_mmap->hdr->id != process->incoming.nelts - 1)) {
|
||||
nxt_go_warn("port mmap id mismatch (%d != %d)",
|
||||
port_mmap->hdr->id, process->incoming.nelts - 1);
|
||||
}
|
||||
|
||||
fail:
|
||||
|
||||
nxt_go_mutex_unlock(&process->incoming_mutex);
|
||||
}
|
||||
|
||||
|
||||
#endif /* NXT_CONFIGURE */
|
||||
33
src/go/unit/nxt_go_process.h
Normal file
33
src/go/unit/nxt_go_process.h
Normal file
@@ -0,0 +1,33 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_PROCESS_H_INCLUDED_
|
||||
#define _NXT_GO_PROCESS_H_INCLUDED_
|
||||
|
||||
|
||||
#include <nxt_main.h>
|
||||
#include "nxt_go_mutex.h"
|
||||
|
||||
#ifndef _NXT_GO_PROCESS_T_DEFINED_
|
||||
#define _NXT_GO_PROCESS_T_DEFINED_
|
||||
typedef struct nxt_go_process_s nxt_go_process_t;
|
||||
#endif
|
||||
|
||||
struct nxt_go_process_s {
|
||||
nxt_pid_t pid;
|
||||
nxt_go_mutex_t incoming_mutex;
|
||||
nxt_array_t incoming; /* of nxt_port_mmap_t */
|
||||
nxt_go_mutex_t outgoing_mutex;
|
||||
nxt_array_t outgoing; /* of nxt_port_mmap_t */
|
||||
};
|
||||
|
||||
nxt_go_process_t *nxt_go_get_process(nxt_pid_t pid);
|
||||
|
||||
void nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd);
|
||||
|
||||
|
||||
#endif /* _NXT_GO_PROCESS_H_INCLUDED_ */
|
||||
|
||||
531
src/go/unit/nxt_go_run_ctx.c
Normal file
531
src/go/unit/nxt_go_run_ctx.c
Normal file
@@ -0,0 +1,531 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef NXT_CONFIGURE
|
||||
|
||||
|
||||
#include "nxt_go_run_ctx.h"
|
||||
#include "nxt_go_log.h"
|
||||
#include "nxt_go_process.h"
|
||||
#include "nxt_go_array.h"
|
||||
#include "nxt_go_mutex.h"
|
||||
#include "nxt_go_port_memory.h"
|
||||
|
||||
#include <nxt_port_memory_int.h>
|
||||
#include <nxt_main.h>
|
||||
#include <nxt_go_gen.h>
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf,
|
||||
uint32_t n)
|
||||
{
|
||||
size_t nchunks;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_msg_t *mmap_msg;
|
||||
|
||||
if (nxt_slow_path(msg->mmap_msg == NULL)) {
|
||||
if (n > 0) {
|
||||
nxt_go_warn("failed to get plain buf #%d", (int)n);
|
||||
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
buf->mem.start = (u_char *) (msg->port_msg + 1);
|
||||
buf->mem.pos = buf->mem.start;
|
||||
buf->mem.end = buf->mem.start + msg->raw_size;
|
||||
buf->mem.free = buf->mem.end;
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
mmap_msg = msg->mmap_msg + n;
|
||||
if (nxt_slow_path(mmap_msg >= msg->end)) {
|
||||
nxt_go_warn("no more data in shm #%d", (int)n);
|
||||
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) {
|
||||
nxt_go_warn("incoming shared memory segment #%d not found "
|
||||
"for process %d", (int)mmap_msg->mmap_id,
|
||||
(int)msg->port_msg->pid);
|
||||
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_go_mutex_lock(&ctx->process->incoming_mutex);
|
||||
|
||||
port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id);
|
||||
buf->mem.start = nxt_port_mmap_chunk_start(port_mmap->hdr,
|
||||
mmap_msg->chunk_id);
|
||||
buf->mem.pos = buf->mem.start;
|
||||
buf->mem.free = buf->mem.start + mmap_msg->size;
|
||||
|
||||
nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
|
||||
|
||||
nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
|
||||
if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
|
||||
nchunks++;
|
||||
}
|
||||
|
||||
buf->mem.end = buf->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
static nxt_int_t
|
||||
nxt_go_ctx_init_rbuf(nxt_go_run_ctx_t *ctx)
|
||||
{
|
||||
return nxt_go_ctx_msg_rbuf(ctx, &ctx->msg, &ctx->rbuf, ctx->nrbuf);
|
||||
}
|
||||
|
||||
static void
|
||||
nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg,
|
||||
size_t payload_size)
|
||||
{
|
||||
nxt_port_mmap_msg_t *mmap_msg;
|
||||
|
||||
memset(msg, 0, sizeof(nxt_go_msg_t));
|
||||
|
||||
msg->port_msg = port_msg;
|
||||
msg->raw_size = payload_size;
|
||||
|
||||
if (nxt_fast_path(port_msg->mmap != 0)) {
|
||||
msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1);
|
||||
msg->end = nxt_pointer_to(msg->mmap_msg, payload_size);
|
||||
|
||||
mmap_msg = msg->mmap_msg;
|
||||
while(mmap_msg < msg->end) {
|
||||
msg->data_size += mmap_msg->size;
|
||||
mmap_msg += 1;
|
||||
}
|
||||
} else {
|
||||
msg->mmap_msg = NULL;
|
||||
msg->end = NULL;
|
||||
msg->data_size = payload_size;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg)
|
||||
{
|
||||
u_char *b, *e;
|
||||
nxt_chunk_id_t c;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_msg_t *mmap_msg, *end;
|
||||
|
||||
if (nxt_slow_path(msg->mmap_msg == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
mmap_msg = msg->mmap_msg;
|
||||
end = msg->end;
|
||||
|
||||
nxt_go_mutex_lock(&ctx->process->incoming_mutex);
|
||||
|
||||
for (; mmap_msg < end; mmap_msg++ ) {
|
||||
port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id);
|
||||
|
||||
c = mmap_msg->chunk_id;
|
||||
b = nxt_port_mmap_chunk_start(port_mmap->hdr, c);
|
||||
e = b + mmap_msg->size;
|
||||
|
||||
while (b < e) {
|
||||
nxt_port_mmap_set_chunk_free(port_mmap->hdr, c);
|
||||
|
||||
b += PORT_MMAP_CHUNK_SIZE;
|
||||
c++;
|
||||
}
|
||||
}
|
||||
|
||||
nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
|
||||
size_t payload_size)
|
||||
{
|
||||
memset(ctx, 0, sizeof(nxt_go_run_ctx_t));
|
||||
|
||||
ctx->process = nxt_go_get_process(port_msg->pid);
|
||||
if (nxt_slow_path(ctx->process == NULL)) {
|
||||
nxt_go_warn("failed to get process %d", port_msg->pid);
|
||||
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size);
|
||||
|
||||
ctx->msg_last = &ctx->msg;
|
||||
|
||||
ctx->wport_msg.stream = port_msg->stream;
|
||||
ctx->wport_msg.pid = getpid();
|
||||
ctx->wport_msg.type = _NXT_PORT_MSG_DATA;
|
||||
ctx->wport_msg.mmap = 1;
|
||||
|
||||
ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 );
|
||||
|
||||
return nxt_go_ctx_init_rbuf(ctx);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
|
||||
{
|
||||
nxt_go_msg_t *msg;
|
||||
|
||||
msg = malloc(sizeof(nxt_go_msg_t));
|
||||
|
||||
nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t));
|
||||
|
||||
msg->start_offset = ctx->msg_last->start_offset;
|
||||
|
||||
if (ctx->msg_last == &ctx->msg) {
|
||||
msg->start_offset += ctx->r.body.preread_size;
|
||||
} else {
|
||||
msg->start_offset += ctx->msg_last->data_size;
|
||||
}
|
||||
|
||||
ctx->msg_last->next = msg;
|
||||
ctx->msg_last = msg;
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
|
||||
{
|
||||
int i;
|
||||
nxt_int_t rc;
|
||||
|
||||
if (last != 0) {
|
||||
ctx->wport_msg.last = 1;
|
||||
}
|
||||
|
||||
nxt_go_debug("flush buffers (%d)", last);
|
||||
|
||||
for (i = 0; i < ctx->nwbuf; i++) {
|
||||
nxt_port_mmap_msg_t *m = ctx->wmmap_msg + i;
|
||||
|
||||
nxt_go_debug(" mmap_msg[%d]={%d, %d, %d}", i,
|
||||
m->mmap_id, m->chunk_id, m->size);
|
||||
}
|
||||
|
||||
rc = nxt_go_port_send(ctx->msg.port_msg->pid, ctx->msg.port_msg->reply_port,
|
||||
&ctx->wport_msg, sizeof(nxt_port_msg_t) +
|
||||
ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0);
|
||||
|
||||
ctx->nwbuf = 0;
|
||||
|
||||
memset(&ctx->wbuf, 0, sizeof(ctx->wbuf));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
nxt_buf_t *
|
||||
nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
|
||||
{
|
||||
size_t nchunks;
|
||||
nxt_buf_t *buf;
|
||||
nxt_chunk_id_t c;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_msg_t *mmap_msg;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
|
||||
c = 0;
|
||||
|
||||
buf = &ctx->wbuf;
|
||||
|
||||
hdr = nxt_go_port_mmap_get(ctx->process,
|
||||
ctx->msg.port_msg->reply_port, &c);
|
||||
if (nxt_slow_path(hdr == NULL)) {
|
||||
nxt_go_warn("failed to get port_mmap");
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
|
||||
buf->mem.pos = buf->mem.start;
|
||||
buf->mem.free = buf->mem.start;
|
||||
buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
|
||||
|
||||
buf->parent = hdr;
|
||||
|
||||
mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
|
||||
mmap_msg->mmap_id = hdr->id;
|
||||
mmap_msg->chunk_id = c;
|
||||
mmap_msg->size = 0;
|
||||
|
||||
nchunks = size / PORT_MMAP_CHUNK_SIZE;
|
||||
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
|
||||
nchunks++;
|
||||
}
|
||||
|
||||
c++;
|
||||
nchunks--;
|
||||
|
||||
/* Try to acquire as much chunks as required. */
|
||||
while (nchunks > 0) {
|
||||
|
||||
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
buf->mem.end += PORT_MMAP_CHUNK_SIZE;
|
||||
c++;
|
||||
nchunks--;
|
||||
}
|
||||
|
||||
ctx->nwbuf++;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
|
||||
{
|
||||
size_t nchunks, free_size;
|
||||
nxt_chunk_id_t c, start;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
|
||||
free_size = nxt_buf_mem_free_size(&b->mem);
|
||||
|
||||
if (nxt_slow_path(size <= free_size)) {
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
hdr = b->parent;
|
||||
|
||||
start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
|
||||
|
||||
size -= free_size;
|
||||
|
||||
nchunks = size / PORT_MMAP_CHUNK_SIZE;
|
||||
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
|
||||
nchunks++;
|
||||
}
|
||||
|
||||
c = start;
|
||||
|
||||
/* Try to acquire as much chunks as required. */
|
||||
while (nchunks > 0) {
|
||||
|
||||
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
c++;
|
||||
nchunks--;
|
||||
}
|
||||
|
||||
if (nchunks != 0 &&
|
||||
min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
|
||||
|
||||
c--;
|
||||
while (c >= start) {
|
||||
nxt_port_mmap_set_chunk_free(hdr, c);
|
||||
c--;
|
||||
}
|
||||
|
||||
return NXT_ERROR;
|
||||
} else {
|
||||
b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
|
||||
{
|
||||
size_t free_size, copy_size;
|
||||
nxt_buf_t *buf;
|
||||
nxt_port_mmap_msg_t *mmap_msg;
|
||||
|
||||
buf = &ctx->wbuf;
|
||||
|
||||
while (len > 0) {
|
||||
if (ctx->nwbuf == 0) {
|
||||
buf = nxt_go_port_mmap_get_buf(ctx, len);
|
||||
|
||||
if (nxt_slow_path(buf == NULL)) {
|
||||
return NXT_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
free_size = nxt_buf_mem_free_size(&buf->mem);
|
||||
|
||||
if (free_size > 0) {
|
||||
copy_size = nxt_min(free_size, len);
|
||||
|
||||
buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size);
|
||||
|
||||
mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
|
||||
mmap_msg->size += copy_size;
|
||||
|
||||
len -= copy_size;
|
||||
data = nxt_pointer_to(data, copy_size);
|
||||
|
||||
if (len == 0) {
|
||||
return NXT_OK;
|
||||
}
|
||||
}
|
||||
} while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK);
|
||||
|
||||
if (ctx->nwbuf >= 8) {
|
||||
nxt_go_ctx_flush(ctx, 0);
|
||||
}
|
||||
|
||||
buf = nxt_go_port_mmap_get_buf(ctx, len);
|
||||
|
||||
if (nxt_slow_path(buf == NULL)) {
|
||||
return NXT_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size)
|
||||
{
|
||||
nxt_buf_t *buf;
|
||||
nxt_int_t rc;
|
||||
|
||||
do {
|
||||
buf = &ctx->rbuf;
|
||||
|
||||
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
|
||||
if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
|
||||
|
||||
ctx->nrbuf++;
|
||||
rc = nxt_go_ctx_init_rbuf(ctx);
|
||||
if (nxt_slow_path(rc != NXT_OK)) {
|
||||
nxt_go_warn("read size: init rbuf failed");
|
||||
return rc;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
nxt_go_warn("read size: used size is not 0");
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
if (buf->mem.pos[0] >= 128) {
|
||||
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
|
||||
nxt_go_warn("read size: used size < 4");
|
||||
return NXT_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
} while (1);
|
||||
|
||||
buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
nxt_int_t
|
||||
nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size)
|
||||
{
|
||||
nxt_int_t rc;
|
||||
|
||||
rc = nxt_go_ctx_read_size_(ctx, size);
|
||||
|
||||
if (nxt_fast_path(rc == NXT_OK)) {
|
||||
nxt_go_debug("read_size: %d", (int)*size);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
nxt_int_t
|
||||
nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
|
||||
{
|
||||
size_t length;
|
||||
nxt_int_t rc;
|
||||
nxt_buf_t *buf;
|
||||
|
||||
rc = nxt_go_ctx_read_size_(ctx, &length);
|
||||
if (nxt_slow_path(rc != NXT_OK)) {
|
||||
nxt_go_warn("read str: read size failed");
|
||||
return rc;
|
||||
}
|
||||
|
||||
buf = &ctx->rbuf;
|
||||
|
||||
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) {
|
||||
nxt_go_warn("read str: used size too small %d < %d",
|
||||
(int)nxt_buf_mem_used_size(&buf->mem), (int)length);
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
if (length > 0) {
|
||||
str->start = buf->mem.pos;
|
||||
str->length = length - 1;
|
||||
|
||||
buf->mem.pos += length;
|
||||
|
||||
nxt_go_debug("read_str: %d %.*s", (int)length - 1, (int)length - 1,
|
||||
str->start);
|
||||
} else {
|
||||
str->start = NULL;
|
||||
str->length = 0;
|
||||
|
||||
nxt_go_debug("read_str: NULL");
|
||||
}
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
|
||||
size_t
|
||||
nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size)
|
||||
{
|
||||
size_t res, read_size;
|
||||
nxt_int_t rc;
|
||||
nxt_buf_t *buf;
|
||||
|
||||
res = 0;
|
||||
|
||||
while (size > 0) {
|
||||
buf = &ctx->rbuf;
|
||||
|
||||
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
|
||||
ctx->nrbuf++;
|
||||
rc = nxt_go_ctx_init_rbuf(ctx);
|
||||
if (nxt_slow_path(rc != NXT_OK)) {
|
||||
nxt_go_warn("read raw: init rbuf failed");
|
||||
return res;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
read_size = nxt_buf_mem_used_size(&buf->mem);
|
||||
read_size = nxt_min(read_size, size);
|
||||
|
||||
dst = nxt_cpymem(dst, buf->mem.pos, read_size);
|
||||
|
||||
size -= read_size;
|
||||
buf->mem.pos += read_size;
|
||||
res += read_size;
|
||||
}
|
||||
|
||||
nxt_go_debug("read_raw: %d", (int) res);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
#endif /* NXT_CONFIGURE */
|
||||
75
src/go/unit/nxt_go_run_ctx.h
Normal file
75
src/go/unit/nxt_go_run_ctx.h
Normal file
@@ -0,0 +1,75 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_GO_RUN_CTX_H_INCLUDED_
|
||||
#define _NXT_GO_RUN_CTX_H_INCLUDED_
|
||||
|
||||
|
||||
#include <nxt_main.h>
|
||||
#include <nxt_application.h>
|
||||
#include <nxt_port_memory_int.h>
|
||||
|
||||
#ifndef _NXT_GO_PROCESS_T_DEFINED_
|
||||
#define _NXT_GO_PROCESS_T_DEFINED_
|
||||
typedef struct nxt_go_process_s nxt_go_process_t;
|
||||
#endif
|
||||
|
||||
typedef struct nxt_go_msg_s nxt_go_msg_t;
|
||||
|
||||
struct nxt_go_msg_s {
|
||||
off_t start_offset;
|
||||
|
||||
nxt_port_msg_t *port_msg;
|
||||
size_t raw_size;
|
||||
size_t data_size;
|
||||
|
||||
nxt_port_mmap_msg_t *mmap_msg;
|
||||
nxt_port_mmap_msg_t *end;
|
||||
|
||||
nxt_go_msg_t *next;
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_go_msg_t msg;
|
||||
|
||||
nxt_go_process_t *process;
|
||||
nxt_port_mmap_msg_t *wmmap_msg;
|
||||
|
||||
uint32_t nrbuf;
|
||||
nxt_buf_t rbuf;
|
||||
|
||||
uint32_t nwbuf;
|
||||
nxt_buf_t wbuf;
|
||||
nxt_port_msg_t wport_msg;
|
||||
char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ];
|
||||
|
||||
nxt_app_request_t r;
|
||||
|
||||
nxt_go_msg_t *msg_last;
|
||||
} nxt_go_run_ctx_t;
|
||||
|
||||
|
||||
void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg);
|
||||
|
||||
nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
|
||||
size_t payload_size);
|
||||
|
||||
void nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
|
||||
size_t size);
|
||||
|
||||
nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last);
|
||||
|
||||
nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len);
|
||||
|
||||
nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size);
|
||||
|
||||
nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str);
|
||||
|
||||
size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size);
|
||||
|
||||
|
||||
#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */
|
||||
219
src/go/unit/port.go
Normal file
219
src/go/unit/port.go
Normal file
@@ -0,0 +1,219 @@
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
package unit
|
||||
|
||||
/*
|
||||
#include "nxt_go_lib.h"
|
||||
#include "nxt_process_type.h"
|
||||
*/
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type port_key struct {
|
||||
pid int
|
||||
id int
|
||||
}
|
||||
|
||||
type port struct {
|
||||
key port_key
|
||||
t int
|
||||
rcv *net.UnixConn
|
||||
snd *net.UnixConn
|
||||
}
|
||||
|
||||
type port_registry struct {
|
||||
sync.RWMutex
|
||||
m map[port_key]*port
|
||||
t [C.NXT_PROCESS_MAX]*port
|
||||
}
|
||||
|
||||
var port_registry_ port_registry
|
||||
|
||||
func find_port(key port_key) *port {
|
||||
port_registry_.RLock()
|
||||
res := port_registry_.m[key]
|
||||
port_registry_.RUnlock()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func remove_by_pid(pid int) {
|
||||
port_registry_.Lock()
|
||||
if port_registry_.m != nil {
|
||||
for k, p := range port_registry_.m {
|
||||
if k.pid == pid {
|
||||
if port_registry_.t[p.t] == p {
|
||||
port_registry_.t[p.t] = nil
|
||||
}
|
||||
|
||||
delete(port_registry_.m, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
port_registry_.Unlock()
|
||||
}
|
||||
|
||||
func main_port() *port {
|
||||
port_registry_.RLock()
|
||||
res := port_registry_.t[C.NXT_PROCESS_MAIN]
|
||||
port_registry_.RUnlock()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func add_port(p *port) {
|
||||
port_registry_.Lock()
|
||||
if port_registry_.m == nil {
|
||||
port_registry_.m = make(map[port_key]*port)
|
||||
}
|
||||
|
||||
port_registry_.m[p.key] = p
|
||||
port_registry_.t[p.t] = p
|
||||
|
||||
port_registry_.Unlock()
|
||||
}
|
||||
|
||||
func (p *port) Close() {
|
||||
if p.rcv != nil {
|
||||
p.rcv.Close()
|
||||
}
|
||||
|
||||
if p.snd != nil {
|
||||
p.snd.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func getUnixConn(fd int) *net.UnixConn {
|
||||
if fd < 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
f := os.NewFile(uintptr(fd), "sock")
|
||||
defer f.Close()
|
||||
|
||||
c, err := net.FileConn(f)
|
||||
if err != nil {
|
||||
fmt.Printf("FileConn error %s\n", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
uc, ok := c.(*net.UnixConn)
|
||||
if !ok {
|
||||
fmt.Printf("Not a Unix-domain socket %d\n", fd)
|
||||
return nil
|
||||
}
|
||||
|
||||
return uc
|
||||
}
|
||||
|
||||
//export nxt_go_new_port
|
||||
func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) {
|
||||
new_port(int(pid), int(id), int(t), int(rcv), int(snd))
|
||||
}
|
||||
|
||||
//export nxt_go_port_send
|
||||
func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int {
|
||||
key := port_key{
|
||||
pid: int(pid),
|
||||
id: int(id),
|
||||
}
|
||||
|
||||
p := find_port(key)
|
||||
|
||||
if p != nil {
|
||||
n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("write result %d (%d), %s\n", n, oobn, err)
|
||||
}
|
||||
|
||||
return C.int(n)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
//export nxt_go_main_send
|
||||
func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int {
|
||||
p := main_port()
|
||||
|
||||
if p != nil {
|
||||
n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("write result %d (%d), %s\n", n, oobn, err)
|
||||
}
|
||||
|
||||
return C.int(n)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func new_port(pid int, id int, t int, rcv int, snd int) *port {
|
||||
p := &port{
|
||||
key: port_key{
|
||||
pid: pid,
|
||||
id: id,
|
||||
},
|
||||
t: t,
|
||||
rcv: getUnixConn(rcv),
|
||||
snd: getUnixConn(snd),
|
||||
}
|
||||
|
||||
add_port(p)
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *port) read(handler http.Handler) error {
|
||||
var buf [16384]byte
|
||||
var oob [1024]byte
|
||||
|
||||
n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:])
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m := new_cmsg(buf[:n], oob[:oobn])
|
||||
|
||||
c_req := C.nxt_go_process_port_msg(m.buf.b, m.buf.s, m.oob.b, m.oob.s)
|
||||
|
||||
if c_req == 0 {
|
||||
m.Close()
|
||||
} else {
|
||||
r := find_request(c_req)
|
||||
|
||||
go func(r *request) {
|
||||
if handler == nil {
|
||||
handler = http.DefaultServeMux
|
||||
}
|
||||
|
||||
handler.ServeHTTP(r.response(), &r.req)
|
||||
r.done()
|
||||
}(r)
|
||||
|
||||
if len(r.msgs) == 0 {
|
||||
r.push(m)
|
||||
} else if r.ch != nil {
|
||||
r.ch <- m
|
||||
} else {
|
||||
m.Close()
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
211
src/go/unit/request.go
Normal file
211
src/go/unit/request.go
Normal file
@@ -0,0 +1,211 @@
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
package unit
|
||||
|
||||
/*
|
||||
#include "nxt_go_lib.h"
|
||||
*/
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type request struct {
|
||||
req http.Request
|
||||
resp *response
|
||||
c_req C.nxt_go_request_t
|
||||
id C.uint32_t
|
||||
msgs []*cmsg
|
||||
ch chan *cmsg
|
||||
}
|
||||
|
||||
func (r *request) Read(p []byte) (n int, err error) {
|
||||
c := C.size_t(cap(p))
|
||||
b := C.malloc(c)
|
||||
res := C.nxt_go_request_read(r.c_req, b, c)
|
||||
|
||||
if res == -2 /* NXT_AGAIN */ {
|
||||
m := <-r.ch
|
||||
|
||||
res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s)
|
||||
r.push(m)
|
||||
}
|
||||
|
||||
if res > 0 {
|
||||
copy(p, C.GoBytes(b, res))
|
||||
}
|
||||
|
||||
C.free(b)
|
||||
return int(res), nil
|
||||
}
|
||||
|
||||
func (r *request) Close() error {
|
||||
C.nxt_go_request_close(r.c_req)
|
||||
return nil
|
||||
}
|
||||
|
||||
type request_registry struct {
|
||||
sync.RWMutex
|
||||
m map[C.nxt_go_request_t]*request
|
||||
id map[C.uint32_t]*request
|
||||
}
|
||||
|
||||
var request_registry_ request_registry
|
||||
|
||||
func find_request(c_req C.nxt_go_request_t) *request {
|
||||
request_registry_.RLock()
|
||||
res := request_registry_.m[c_req]
|
||||
request_registry_.RUnlock()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func find_request_by_id(id C.uint32_t) *request {
|
||||
request_registry_.RLock()
|
||||
res := request_registry_.id[id]
|
||||
request_registry_.RUnlock()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func add_request(r *request) {
|
||||
request_registry_.Lock()
|
||||
if request_registry_.m == nil {
|
||||
request_registry_.m = make(map[C.nxt_go_request_t]*request)
|
||||
request_registry_.id = make(map[C.uint32_t]*request)
|
||||
}
|
||||
|
||||
request_registry_.m[r.c_req] = r
|
||||
request_registry_.id[r.id] = r
|
||||
|
||||
request_registry_.Unlock()
|
||||
}
|
||||
|
||||
func remove_request(r *request) {
|
||||
request_registry_.Lock()
|
||||
if request_registry_.m != nil {
|
||||
delete(request_registry_.m, r.c_req)
|
||||
delete(request_registry_.id, r.id)
|
||||
}
|
||||
|
||||
request_registry_.Unlock()
|
||||
}
|
||||
|
||||
func (r *request) response() *response {
|
||||
if r.resp == nil {
|
||||
r.resp = new_response(r.c_req, &r.req)
|
||||
}
|
||||
|
||||
return r.resp
|
||||
}
|
||||
|
||||
func (r *request) done() {
|
||||
C.nxt_go_request_done(r.c_req)
|
||||
|
||||
remove_request(r)
|
||||
|
||||
for _, m := range r.msgs {
|
||||
m.Close()
|
||||
}
|
||||
|
||||
if r.ch != nil {
|
||||
close(r.ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) push(m *cmsg) {
|
||||
r.msgs = append(r.msgs, m)
|
||||
}
|
||||
|
||||
//export nxt_go_new_request
|
||||
func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) {
|
||||
uri := C.GoStringN(c_uri.start, c_uri.length)
|
||||
|
||||
var URL *url.URL
|
||||
var err error
|
||||
if URL, err = url.ParseRequestURI(uri); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
r := &request{
|
||||
req: http.Request{
|
||||
Method: C.GoStringN(c_method.start, c_method.length),
|
||||
URL: URL,
|
||||
Header: http.Header{},
|
||||
Body: nil,
|
||||
RequestURI: uri,
|
||||
},
|
||||
c_req: c_req,
|
||||
id: id,
|
||||
msgs: make([]*cmsg, 0, 1),
|
||||
}
|
||||
r.req.Body = r
|
||||
|
||||
add_request(r)
|
||||
}
|
||||
|
||||
//export nxt_go_find_request
|
||||
func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t {
|
||||
r := find_request_by_id(id)
|
||||
|
||||
if r != nil {
|
||||
return r.c_req
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
//export nxt_go_request_set_proto
|
||||
func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, maj C.int, min C.int) {
|
||||
r := find_request(c_req)
|
||||
r.req.Proto = C.GoStringN(proto.start, proto.length)
|
||||
r.req.ProtoMajor = int(maj)
|
||||
r.req.ProtoMinor = int(min)
|
||||
}
|
||||
|
||||
//export nxt_go_request_add_header
|
||||
func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, value *C.nxt_go_str_t) {
|
||||
r := find_request(c_req)
|
||||
r.req.Header.Add(C.GoStringN(name.start, name.length), C.GoStringN(value.start, value.length))
|
||||
}
|
||||
|
||||
//export nxt_go_request_set_content_length
|
||||
func nxt_go_request_set_content_length(c_req C.nxt_go_request_t, l C.int64_t) {
|
||||
find_request(c_req).req.ContentLength = int64(l)
|
||||
}
|
||||
|
||||
//export nxt_go_request_create_channel
|
||||
func nxt_go_request_create_channel(c_req C.nxt_go_request_t) {
|
||||
find_request(c_req).ch = make(chan *cmsg)
|
||||
}
|
||||
|
||||
//export nxt_go_request_set_host
|
||||
func nxt_go_request_set_host(c_req C.nxt_go_request_t, host *C.nxt_go_str_t) {
|
||||
find_request(c_req).req.Host = C.GoStringN(host.start, host.length)
|
||||
}
|
||||
|
||||
//export nxt_go_request_set_url
|
||||
func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) {
|
||||
find_request(c_req).req.URL.Scheme = C.GoString(scheme)
|
||||
}
|
||||
|
||||
//export nxt_go_request_set_remote_addr
|
||||
func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, addr *C.nxt_go_str_t) {
|
||||
find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
|
||||
}
|
||||
|
||||
//export nxt_go_request_serve
|
||||
func nxt_go_request_serve(c_req C.nxt_go_request_t) {
|
||||
r := find_request(c_req)
|
||||
|
||||
go func(r *request) {
|
||||
http.DefaultServeMux.ServeHTTP(r.response(), &r.req)
|
||||
r.done()
|
||||
}(r)
|
||||
}
|
||||
69
src/go/unit/response.go
Normal file
69
src/go/unit/response.go
Normal file
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
package unit
|
||||
|
||||
/*
|
||||
#include "nxt_go_lib.h"
|
||||
*/
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
)
|
||||
|
||||
type response struct {
|
||||
header http.Header
|
||||
headerSent bool
|
||||
req *http.Request
|
||||
c_req C.nxt_go_request_t
|
||||
}
|
||||
|
||||
func new_response(c_req C.nxt_go_request_t, req *http.Request) *response {
|
||||
resp := &response{
|
||||
header: http.Header{},
|
||||
req: req,
|
||||
c_req: c_req,
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func (r *response) Header() http.Header {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *response) Write(p []byte) (n int, err error) {
|
||||
if !r.headerSent {
|
||||
r.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
l := C.size_t(len(p))
|
||||
b := getCBytes(p)
|
||||
res := C.nxt_go_response_write(r.c_req, b, l)
|
||||
C.free(b)
|
||||
return int(res), nil
|
||||
}
|
||||
|
||||
func (r *response) WriteHeader(code int) {
|
||||
if r.headerSent {
|
||||
// Note: explicitly using Stderr, as Stdout is our HTTP output.
|
||||
fmt.Fprintf(os.Stderr, "CGI attempted to write header twice")
|
||||
return
|
||||
}
|
||||
r.headerSent = true
|
||||
fmt.Fprintf(r, "%s %d %s\r\n", r.req.Proto, code, http.StatusText(code))
|
||||
|
||||
// Set a default Content-Type
|
||||
if _, hasType := r.header["Content-Type"]; !hasType {
|
||||
r.header.Add("Content-Type", "text/html; charset=utf-8")
|
||||
}
|
||||
|
||||
r.header.Write(r)
|
||||
|
||||
r.Write([]byte("\r\n"))
|
||||
}
|
||||
134
src/go/unit/unit.go
Normal file
134
src/go/unit/unit.go
Normal file
@@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
package unit
|
||||
|
||||
/*
|
||||
#include "nxt_go_lib.h"
|
||||
*/
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type cbuf struct {
|
||||
b unsafe.Pointer
|
||||
s C.size_t
|
||||
f bool
|
||||
}
|
||||
|
||||
func new_cbuf(buf []byte) *cbuf {
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &cbuf{
|
||||
getCBytes(buf), C.size_t(len(buf)), true,
|
||||
}
|
||||
}
|
||||
|
||||
func (buf *cbuf) Close() {
|
||||
if buf == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if buf.f && buf.s > 0 {
|
||||
C.free(buf.b)
|
||||
buf.f = false
|
||||
buf.b = nil
|
||||
buf.s = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (buf *cbuf) GoBytes() []byte {
|
||||
if buf == nil {
|
||||
var b [0]byte
|
||||
return b[:0]
|
||||
}
|
||||
|
||||
return C.GoBytes(buf.b, C.int(buf.s))
|
||||
}
|
||||
|
||||
type cmsg struct {
|
||||
buf cbuf
|
||||
oob cbuf
|
||||
}
|
||||
|
||||
func new_cmsg(buf []byte, oob []byte) *cmsg {
|
||||
return &cmsg{
|
||||
buf: cbuf{getCBytes(buf), C.size_t(len(buf)), true},
|
||||
oob: cbuf{getCBytes(oob), C.size_t(len(oob)), true},
|
||||
}
|
||||
}
|
||||
|
||||
func (msg *cmsg) Close() {
|
||||
msg.buf.Close()
|
||||
msg.oob.Close()
|
||||
}
|
||||
|
||||
var nxt_go_quit bool = false
|
||||
|
||||
//export nxt_go_set_quit
|
||||
func nxt_go_set_quit() {
|
||||
nxt_go_quit = true
|
||||
}
|
||||
|
||||
func ListenAndServe(addr string, handler http.Handler) error {
|
||||
var read_port *port
|
||||
|
||||
go_ports_env := os.Getenv("NXT_GO_PORTS")
|
||||
|
||||
ports := strings.Split(go_ports_env, ";")
|
||||
pid := os.Getpid()
|
||||
|
||||
for _, port_str := range ports {
|
||||
if len(port_str) <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
attrs := strings.Split(port_str, ",")
|
||||
|
||||
var attrsN [5]int
|
||||
var err error
|
||||
for i, attr := range attrs {
|
||||
attrsN[i], err = strconv.Atoi(attr)
|
||||
if err != nil {
|
||||
fmt.Printf("err %s\n", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4])
|
||||
|
||||
if attrsN[0] == pid {
|
||||
read_port = p
|
||||
}
|
||||
}
|
||||
|
||||
if read_port != nil {
|
||||
C.nxt_go_ready()
|
||||
|
||||
for !nxt_go_quit {
|
||||
err := read_port.read(handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return http.ListenAndServe(addr, handler)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user