Legacy upstream code removed.
This commit is contained in:
@@ -149,15 +149,7 @@ typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c);
|
||||
|
||||
#include <nxt_cache.h>
|
||||
|
||||
#include <nxt_source.h>
|
||||
typedef struct nxt_upstream_source_s nxt_upstream_source_t;
|
||||
|
||||
#include <nxt_http_parse.h>
|
||||
#include <nxt_stream_source.h>
|
||||
#include <nxt_upstream.h>
|
||||
#include <nxt_upstream_source.h>
|
||||
#include <nxt_http_source.h>
|
||||
#include <nxt_fastcgi_source.h>
|
||||
#include <nxt_runtime.h>
|
||||
#include <nxt_port_hash.h>
|
||||
|
||||
|
||||
@@ -5,39 +5,3 @@
|
||||
*/
|
||||
|
||||
#include <nxt_main.h>
|
||||
|
||||
|
||||
typedef struct {
|
||||
void (*peer_get)(nxt_upstream_peer_t *up);
|
||||
void (*peer_free)(nxt_upstream_peer_t *up);
|
||||
} nxt_upstream_name_t;
|
||||
|
||||
|
||||
static const nxt_upstream_name_t nxt_upstream_names[] = {
|
||||
|
||||
{ "round_robin", &nxt_upstream_round_robin },
|
||||
};
|
||||
|
||||
|
||||
void
|
||||
nxt_upstream_create(nxt_upstream_peer_t *up)
|
||||
{
|
||||
/* TODO: dynamic balancer add & lvlhsh */
|
||||
nxt_upstream_names[0].create(up);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_upstream_peer(nxt_upstream_peer_t *up)
|
||||
{
|
||||
nxt_upstream_t *u;
|
||||
|
||||
u = up->upstream;
|
||||
|
||||
if (u != NULL) {
|
||||
u->peer_get(up);
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_upstream_create(up);
|
||||
}
|
||||
|
||||
@@ -8,40 +8,4 @@
|
||||
#define _NXT_UPSTREAM_H_INCLUDED_
|
||||
|
||||
|
||||
typedef struct nxt_upstream_peer_s nxt_upstream_peer_t;
|
||||
|
||||
|
||||
struct nxt_upstream_peer_s {
|
||||
/* STUB */
|
||||
void *upstream;
|
||||
void *data;
|
||||
/**/
|
||||
|
||||
nxt_sockaddr_t *sockaddr;
|
||||
nxt_nsec_t delay;
|
||||
|
||||
uint32_t tries;
|
||||
in_port_t port;
|
||||
|
||||
nxt_str_t addr;
|
||||
nxt_mp_t *mem_pool;
|
||||
void (*ready_handler)(nxt_task_t *task, nxt_upstream_peer_t *up);
|
||||
|
||||
void (*protocol_handler)(nxt_upstream_source_t *us);
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
void (*ready_handler)(void *data);
|
||||
nxt_work_handler_t completion_handler;
|
||||
nxt_work_handler_t error_handler;
|
||||
} nxt_upstream_state_t;
|
||||
|
||||
|
||||
/* STUB */
|
||||
NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_task_t *task,
|
||||
nxt_upstream_peer_t *up);
|
||||
/**/
|
||||
|
||||
|
||||
#endif /* _NXT_UPSTREAM_H_INCLUDED_ */
|
||||
|
||||
@@ -4,197 +4,3 @@
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#include <nxt_main.h>
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t weight;
|
||||
int32_t effective_weight;
|
||||
int32_t current_weight;
|
||||
uint32_t down; /* 1 bit */
|
||||
nxt_msec_t last_accessed;
|
||||
nxt_sockaddr_t *sockaddr;
|
||||
} nxt_upstream_round_robin_peer_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_uint_t npeers;
|
||||
nxt_upstream_round_robin_peer_t *peers;
|
||||
nxt_thread_spinlock_t lock;
|
||||
} nxt_upstream_round_robin_t;
|
||||
|
||||
|
||||
static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
|
||||
nxt_upstream_peer_t *up);
|
||||
|
||||
|
||||
void
|
||||
nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
|
||||
{
|
||||
nxt_job_sockaddr_parse_t *jbs;
|
||||
|
||||
if (up->upstream != NULL) {
|
||||
nxt_upstream_round_robin_get_peer(task, up);
|
||||
}
|
||||
|
||||
jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
|
||||
if (nxt_slow_path(jbs == NULL)) {
|
||||
up->ready_handler(task, up);
|
||||
return;
|
||||
}
|
||||
|
||||
jbs->resolve.job.task = task;
|
||||
jbs->resolve.job.data = up;
|
||||
jbs->resolve.port = up->port;
|
||||
jbs->resolve.log_level = NXT_LOG_ERR;
|
||||
jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
|
||||
jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
|
||||
jbs->addr = up->addr;
|
||||
|
||||
nxt_job_sockaddr_parse(jbs);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_uint_t i;
|
||||
nxt_sockaddr_t *sa;
|
||||
nxt_upstream_peer_t *up;
|
||||
nxt_job_sockaddr_parse_t *jbs;
|
||||
nxt_upstream_round_robin_t *urr;
|
||||
nxt_upstream_round_robin_peer_t *peer;
|
||||
|
||||
jbs = obj;
|
||||
up = jbs->resolve.job.data;
|
||||
|
||||
urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
|
||||
if (nxt_slow_path(urr == NULL)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
urr->npeers = jbs->resolve.count;
|
||||
|
||||
peer = nxt_mp_zget(up->mem_pool,
|
||||
urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
|
||||
if (nxt_slow_path(peer == NULL)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
urr->peers = peer;
|
||||
|
||||
for (i = 0; i < urr->npeers; i++) {
|
||||
peer[i].weight = 1;
|
||||
peer[i].effective_weight = 1;
|
||||
|
||||
sa = jbs->resolve.sockaddrs[i];
|
||||
|
||||
/* STUB */
|
||||
sa->type = SOCK_STREAM;
|
||||
|
||||
nxt_sockaddr_text(sa);
|
||||
|
||||
nxt_debug(task, "upstream peer: %*s",
|
||||
(size_t) sa->length, nxt_sockaddr_start(sa));
|
||||
|
||||
/* TODO: memcpy to shared memory pool. */
|
||||
peer[i].sockaddr = sa;
|
||||
}
|
||||
|
||||
up->upstream = urr;
|
||||
|
||||
/* STUB */
|
||||
up->sockaddr = peer[0].sockaddr;
|
||||
|
||||
nxt_job_destroy(task, jbs);
|
||||
up->ready_handler(task, up);
|
||||
|
||||
//nxt_upstream_round_robin_get_peer(up);
|
||||
return;
|
||||
|
||||
fail:
|
||||
|
||||
nxt_job_destroy(task, jbs);
|
||||
|
||||
up->ready_handler(task, up);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_upstream_peer_t *up;
|
||||
nxt_job_sockaddr_parse_t *jbs;
|
||||
|
||||
jbs = obj;
|
||||
up = jbs->resolve.job.data;
|
||||
|
||||
up->ready_handler(task, up);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
|
||||
{
|
||||
int32_t effective_weights;
|
||||
nxt_uint_t i;
|
||||
nxt_msec_t now;
|
||||
nxt_upstream_round_robin_t *urr;
|
||||
nxt_upstream_round_robin_peer_t *peer, *best;
|
||||
|
||||
urr = up->upstream;
|
||||
|
||||
now = task->thread->engine->timers.now;
|
||||
|
||||
nxt_thread_spin_lock(&urr->lock);
|
||||
|
||||
best = NULL;
|
||||
effective_weights = 0;
|
||||
peer = urr->peers;
|
||||
|
||||
for (i = 0; i < urr->npeers; i++) {
|
||||
|
||||
if (peer[i].down) {
|
||||
continue;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
|
||||
good = peer[i].last_accessed + peer[i].fail_timeout;
|
||||
|
||||
if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
peer[i].current_weight += peer[i].effective_weight;
|
||||
effective_weights += peer[i].effective_weight;
|
||||
|
||||
if (peer[i].effective_weight < peer[i].weight) {
|
||||
peer[i].effective_weight++;
|
||||
}
|
||||
|
||||
if (best == NULL || peer[i].current_weight > best->current_weight) {
|
||||
best = &peer[i];
|
||||
}
|
||||
}
|
||||
|
||||
if (best != NULL) {
|
||||
best->current_weight -= effective_weights;
|
||||
best->last_accessed = now;
|
||||
|
||||
up->sockaddr = best->sockaddr;
|
||||
|
||||
} else {
|
||||
up->sockaddr = NULL;
|
||||
}
|
||||
|
||||
nxt_thread_spin_unlock(&urr->lock);
|
||||
|
||||
up->ready_handler(task, up);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user