Java: request processing in multiple threads.
This closes #458 issue on GitHub.
This commit is contained in:
@@ -77,6 +77,8 @@ typedef struct {
|
|||||||
char *webapp;
|
char *webapp;
|
||||||
nxt_conf_value_t *options;
|
nxt_conf_value_t *options;
|
||||||
char *unit_jars;
|
char *unit_jars;
|
||||||
|
uint32_t threads;
|
||||||
|
uint32_t thread_stack_size;
|
||||||
} nxt_java_app_conf_t;
|
} nxt_java_app_conf_t;
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -639,6 +639,14 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
|
|||||||
}, {
|
}, {
|
||||||
.name = nxt_string("unit_jars"),
|
.name = nxt_string("unit_jars"),
|
||||||
.type = NXT_CONF_VLDT_STRING,
|
.type = NXT_CONF_VLDT_STRING,
|
||||||
|
}, {
|
||||||
|
.name = nxt_string("threads"),
|
||||||
|
.type = NXT_CONF_VLDT_INTEGER,
|
||||||
|
.validator = nxt_conf_vldt_threads,
|
||||||
|
}, {
|
||||||
|
.name = nxt_string("thread_stack_size"),
|
||||||
|
.type = NXT_CONF_VLDT_INTEGER,
|
||||||
|
.validator = nxt_conf_vldt_thread_stack_size,
|
||||||
},
|
},
|
||||||
|
|
||||||
NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members)
|
NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members)
|
||||||
|
|||||||
199
src/nxt_java.c
199
src/nxt_java.c
@@ -36,6 +36,11 @@ static nxt_int_t nxt_java_start(nxt_task_t *task,
|
|||||||
static void nxt_java_request_handler(nxt_unit_request_info_t *req);
|
static void nxt_java_request_handler(nxt_unit_request_info_t *req);
|
||||||
static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws);
|
static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws);
|
||||||
static void nxt_java_close_handler(nxt_unit_request_info_t *req);
|
static void nxt_java_close_handler(nxt_unit_request_info_t *req);
|
||||||
|
static int nxt_java_ready_handler(nxt_unit_ctx_t *ctx);
|
||||||
|
static void *nxt_java_thread_func(void *main_ctx);
|
||||||
|
static int nxt_java_init_threads(nxt_java_app_conf_t *c);
|
||||||
|
static void nxt_java_join_threads(nxt_unit_ctx_t *ctx,
|
||||||
|
nxt_java_app_conf_t *c);
|
||||||
|
|
||||||
static uint32_t compat[] = {
|
static uint32_t compat[] = {
|
||||||
NXT_VERNUM, NXT_DEBUG,
|
NXT_VERNUM, NXT_DEBUG,
|
||||||
@@ -43,6 +48,9 @@ static uint32_t compat[] = {
|
|||||||
|
|
||||||
char *nxt_java_modules;
|
char *nxt_java_modules;
|
||||||
|
|
||||||
|
static pthread_t *nxt_java_threads;
|
||||||
|
static pthread_attr_t *nxt_java_thread_attr;
|
||||||
|
|
||||||
|
|
||||||
#define NXT_STRING(x) _NXT_STRING(x)
|
#define NXT_STRING(x) _NXT_STRING(x)
|
||||||
#define _NXT_STRING(x) #x
|
#define _NXT_STRING(x) #x
|
||||||
@@ -59,8 +67,10 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
JNIEnv *env;
|
JavaVM *jvm;
|
||||||
jobject ctx;
|
jobject cl;
|
||||||
|
jobject ctx;
|
||||||
|
nxt_java_app_conf_t *conf;
|
||||||
} nxt_java_data_t;
|
} nxt_java_data_t;
|
||||||
|
|
||||||
|
|
||||||
@@ -402,8 +412,10 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
|
|||||||
goto env_failed;
|
goto env_failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
java_data.env = env;
|
java_data.jvm = jvm;
|
||||||
|
java_data.cl = cl;
|
||||||
java_data.ctx = nxt_java_startContext(env, c->webapp, classpath);
|
java_data.ctx = nxt_java_startContext(env, c->webapp, classpath);
|
||||||
|
java_data.conf = c;
|
||||||
|
|
||||||
if ((*env)->ExceptionCheck(env)) {
|
if ((*env)->ExceptionCheck(env)) {
|
||||||
nxt_alert(task, "Unhandled exception in application start");
|
nxt_alert(task, "Unhandled exception in application start");
|
||||||
@@ -411,13 +423,20 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
|
|||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rc = nxt_java_init_threads(c);
|
||||||
|
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
|
||||||
|
return NXT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
nxt_unit_default_init(task, &java_init);
|
nxt_unit_default_init(task, &java_init);
|
||||||
|
|
||||||
java_init.callbacks.request_handler = nxt_java_request_handler;
|
java_init.callbacks.request_handler = nxt_java_request_handler;
|
||||||
java_init.callbacks.websocket_handler = nxt_java_websocket_handler;
|
java_init.callbacks.websocket_handler = nxt_java_websocket_handler;
|
||||||
java_init.callbacks.close_handler = nxt_java_close_handler;
|
java_init.callbacks.close_handler = nxt_java_close_handler;
|
||||||
|
java_init.callbacks.ready_handler = nxt_java_ready_handler;
|
||||||
java_init.request_data_size = sizeof(nxt_java_request_data_t);
|
java_init.request_data_size = sizeof(nxt_java_request_data_t);
|
||||||
java_init.data = &java_data;
|
java_init.data = &java_data;
|
||||||
|
java_init.ctx_data = env;
|
||||||
java_init.shm_limit = app_conf->shm_limit;
|
java_init.shm_limit = app_conf->shm_limit;
|
||||||
|
|
||||||
ctx = nxt_unit_init(&java_init);
|
ctx = nxt_unit_init(&java_init);
|
||||||
@@ -427,9 +446,8 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
rc = nxt_unit_run(ctx);
|
rc = nxt_unit_run(ctx);
|
||||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
|
||||||
/* TODO report error */
|
nxt_java_join_threads(ctx, c);
|
||||||
}
|
|
||||||
|
|
||||||
nxt_java_stopContext(env, java_data.ctx);
|
nxt_java_stopContext(env, java_data.ctx);
|
||||||
|
|
||||||
@@ -441,7 +459,7 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
|
|||||||
|
|
||||||
(*jvm)->DestroyJavaVM(jvm);
|
(*jvm)->DestroyJavaVM(jvm);
|
||||||
|
|
||||||
exit(0);
|
exit(rc);
|
||||||
|
|
||||||
return NXT_OK;
|
return NXT_OK;
|
||||||
|
|
||||||
@@ -464,7 +482,7 @@ nxt_java_request_handler(nxt_unit_request_info_t *req)
|
|||||||
nxt_java_request_data_t *data;
|
nxt_java_request_data_t *data;
|
||||||
|
|
||||||
java_data = req->unit->data;
|
java_data = req->unit->data;
|
||||||
env = java_data->env;
|
env = req->ctx->data;
|
||||||
data = req->data;
|
data = req->data;
|
||||||
|
|
||||||
jreq = nxt_java_newRequest(env, java_data->ctx, req);
|
jreq = nxt_java_newRequest(env, java_data->ctx, req);
|
||||||
@@ -543,11 +561,9 @@ nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws)
|
|||||||
void *b;
|
void *b;
|
||||||
JNIEnv *env;
|
JNIEnv *env;
|
||||||
jobject jbuf;
|
jobject jbuf;
|
||||||
nxt_java_data_t *java_data;
|
|
||||||
nxt_java_request_data_t *data;
|
nxt_java_request_data_t *data;
|
||||||
|
|
||||||
java_data = ws->req->unit->data;
|
env = ws->req->ctx->data;
|
||||||
env = java_data->env;
|
|
||||||
data = ws->req->data;
|
data = ws->req->data;
|
||||||
|
|
||||||
b = malloc(ws->payload_len);
|
b = malloc(ws->payload_len);
|
||||||
@@ -578,11 +594,9 @@ static void
|
|||||||
nxt_java_close_handler(nxt_unit_request_info_t *req)
|
nxt_java_close_handler(nxt_unit_request_info_t *req)
|
||||||
{
|
{
|
||||||
JNIEnv *env;
|
JNIEnv *env;
|
||||||
nxt_java_data_t *java_data;
|
|
||||||
nxt_java_request_data_t *data;
|
nxt_java_request_data_t *data;
|
||||||
|
|
||||||
java_data = req->unit->data;
|
env = req->ctx->data;
|
||||||
env = java_data->env;
|
|
||||||
data = req->data;
|
data = req->data;
|
||||||
|
|
||||||
nxt_java_Request_close(env, data->jreq);
|
nxt_java_Request_close(env, data->jreq);
|
||||||
@@ -593,3 +607,160 @@ nxt_java_close_handler(nxt_unit_request_info_t *req)
|
|||||||
nxt_unit_request_done(req, NXT_UNIT_OK);
|
nxt_unit_request_done(req, NXT_UNIT_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
nxt_java_ready_handler(nxt_unit_ctx_t *ctx)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
uint32_t i;
|
||||||
|
nxt_java_data_t *java_data;
|
||||||
|
nxt_java_app_conf_t *c;
|
||||||
|
|
||||||
|
/* Worker thread context. */
|
||||||
|
if (!nxt_unit_is_main_ctx(ctx)) {
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
java_data = ctx->unit->data;
|
||||||
|
c = java_data->conf;
|
||||||
|
|
||||||
|
if (c->threads <= 1) {
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 0; i < c->threads - 1; i++) {
|
||||||
|
res = pthread_create(&nxt_java_threads[i], nxt_java_thread_attr,
|
||||||
|
nxt_java_thread_func, ctx);
|
||||||
|
|
||||||
|
if (nxt_fast_path(res == 0)) {
|
||||||
|
nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)",
|
||||||
|
(int) (i + 1), strerror(res), res);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void *
|
||||||
|
nxt_java_thread_func(void *data)
|
||||||
|
{
|
||||||
|
int rc;
|
||||||
|
JavaVM *jvm;
|
||||||
|
JNIEnv *env;
|
||||||
|
nxt_unit_ctx_t *main_ctx, *ctx;
|
||||||
|
nxt_java_data_t *java_data;
|
||||||
|
|
||||||
|
main_ctx = data;
|
||||||
|
|
||||||
|
nxt_unit_debug(main_ctx, "worker thread start");
|
||||||
|
|
||||||
|
java_data = main_ctx->unit->data;
|
||||||
|
jvm = java_data->jvm;
|
||||||
|
|
||||||
|
rc = (*jvm)->AttachCurrentThread(jvm, (void **) &env, NULL);
|
||||||
|
if (rc != JNI_OK) {
|
||||||
|
nxt_unit_alert(main_ctx, "failed to attach Java VM: %d", (int) rc);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_java_setContextClassLoader(env, java_data->cl);
|
||||||
|
|
||||||
|
ctx = nxt_unit_ctx_alloc(main_ctx, env);
|
||||||
|
if (nxt_slow_path(ctx == NULL)) {
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void) nxt_unit_run(ctx);
|
||||||
|
|
||||||
|
nxt_unit_done(ctx);
|
||||||
|
|
||||||
|
fail:
|
||||||
|
|
||||||
|
(*jvm)->DetachCurrentThread(jvm);
|
||||||
|
|
||||||
|
nxt_unit_debug(NULL, "worker thread end");
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
nxt_java_init_threads(nxt_java_app_conf_t *c)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
static pthread_attr_t attr;
|
||||||
|
|
||||||
|
if (c->threads <= 1) {
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c->thread_stack_size > 0) {
|
||||||
|
res = pthread_attr_init(&attr);
|
||||||
|
if (nxt_slow_path(res != 0)) {
|
||||||
|
nxt_unit_alert(NULL, "thread attr init failed: %s (%d)",
|
||||||
|
strerror(res), res);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
res = pthread_attr_setstacksize(&attr, c->thread_stack_size);
|
||||||
|
if (nxt_slow_path(res != 0)) {
|
||||||
|
nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)",
|
||||||
|
strerror(res), res);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_java_thread_attr = &attr;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_java_threads = nxt_unit_malloc(NULL,
|
||||||
|
sizeof(pthread_t) * (c->threads - 1));
|
||||||
|
if (nxt_slow_path(nxt_java_threads == NULL)) {
|
||||||
|
nxt_unit_alert(NULL, "Failed to allocate thread id array");
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(nxt_java_threads, 0, sizeof(pthread_t) * (c->threads - 1));
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_java_join_threads(nxt_unit_ctx_t *ctx, nxt_java_app_conf_t *c)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
uint32_t i;
|
||||||
|
|
||||||
|
if (nxt_java_threads == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 0; i < c->threads - 1; i++) {
|
||||||
|
if ((uintptr_t) nxt_java_threads[i] == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
res = pthread_join(nxt_java_threads[i], NULL);
|
||||||
|
|
||||||
|
if (nxt_fast_path(res == 0)) {
|
||||||
|
nxt_unit_debug(ctx, "thread #%d joined", (int) i);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)",
|
||||||
|
(int) i, strerror(res), res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_unit_free(ctx, nxt_java_threads);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -266,6 +266,16 @@ static nxt_conf_map_t nxt_java_app_conf[] = {
|
|||||||
NXT_CONF_MAP_CSTRZ,
|
NXT_CONF_MAP_CSTRZ,
|
||||||
offsetof(nxt_common_app_conf_t, u.java.unit_jars),
|
offsetof(nxt_common_app_conf_t, u.java.unit_jars),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
nxt_string("threads"),
|
||||||
|
NXT_CONF_MAP_INT32,
|
||||||
|
offsetof(nxt_common_app_conf_t, u.java.threads),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
nxt_string("thread_stack_size"),
|
||||||
|
NXT_CONF_MAP_INT32,
|
||||||
|
offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
|
||||||
|
},
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user