206 lines
5.1 KiB
C
206 lines
5.1 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <math.h>
|
|
#include <nxt_router.h>
|
|
#include <nxt_http.h>
|
|
#include <nxt_upstream.h>
|
|
|
|
|
|
struct nxt_upstream_round_robin_server_s {
|
|
nxt_sockaddr_t *sockaddr;
|
|
|
|
int32_t current_weight;
|
|
int32_t effective_weight;
|
|
int32_t weight;
|
|
|
|
uint8_t protocol;
|
|
};
|
|
|
|
|
|
struct nxt_upstream_round_robin_s {
|
|
uint32_t items;
|
|
nxt_upstream_round_robin_server_t server[0];
|
|
};
|
|
|
|
|
|
static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
|
|
nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
|
|
static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
|
|
nxt_upstream_server_t *us);
|
|
|
|
|
|
static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
|
|
.joint_create = nxt_upstream_round_robin_joint_create,
|
|
.get = nxt_upstream_round_robin_server_get,
|
|
};
|
|
|
|
|
|
nxt_int_t
|
|
nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
|
nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
|
|
{
|
|
double total, k, w;
|
|
size_t size;
|
|
uint32_t i, n, next, wt;
|
|
nxt_mp_t *mp;
|
|
nxt_str_t name;
|
|
nxt_sockaddr_t *sa;
|
|
nxt_conf_value_t *servers_conf, *srvcf, *wtcf;
|
|
nxt_upstream_round_robin_t *urr;
|
|
|
|
static nxt_str_t servers = nxt_string("servers");
|
|
static nxt_str_t weight = nxt_string("weight");
|
|
|
|
mp = tmcf->router_conf->mem_pool;
|
|
|
|
servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
|
|
n = nxt_conf_object_members_count(servers_conf);
|
|
|
|
total = 0.0;
|
|
next = 0;
|
|
|
|
for (i = 0; i < n; i++) {
|
|
srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
|
|
wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
|
|
w = (wtcf != NULL) ? nxt_conf_get_number(wtcf) : 1;
|
|
total += w;
|
|
}
|
|
|
|
/*
|
|
* This prevents overflow of int32_t
|
|
* in nxt_upstream_round_robin_server_get().
|
|
*/
|
|
k = (total == 0) ? 0 : (NXT_INT32_T_MAX / 2) / total;
|
|
|
|
if (isinf(k)) {
|
|
k = 1;
|
|
}
|
|
|
|
size = sizeof(nxt_upstream_round_robin_t)
|
|
+ n * sizeof(nxt_upstream_round_robin_server_t);
|
|
|
|
urr = nxt_mp_zalloc(mp, size);
|
|
if (nxt_slow_path(urr == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
urr->items = n;
|
|
next = 0;
|
|
|
|
for (i = 0; i < n; i++) {
|
|
srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
|
|
|
|
sa = nxt_sockaddr_parse(mp, &name);
|
|
if (nxt_slow_path(sa == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
sa->type = SOCK_STREAM;
|
|
|
|
urr->server[i].sockaddr = sa;
|
|
urr->server[i].protocol = NXT_HTTP_PROTO_H1;
|
|
|
|
wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
|
|
w = (wtcf != NULL) ? k * nxt_conf_get_number(wtcf) : k;
|
|
wt = (w > 1 || w == 0) ? round(w) : 1;
|
|
|
|
urr->server[i].weight = wt;
|
|
urr->server[i].effective_weight = wt;
|
|
}
|
|
|
|
upstream->proto = &nxt_upstream_round_robin_proto;
|
|
upstream->type.round_robin = urr;
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_upstream_t *
|
|
nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
|
|
nxt_upstream_t *upstream)
|
|
{
|
|
size_t size;
|
|
uint32_t i, n;
|
|
nxt_mp_t *mp;
|
|
nxt_upstream_t *u;
|
|
nxt_upstream_round_robin_t *urr, *urrcf;
|
|
|
|
mp = tmcf->router_conf->mem_pool;
|
|
|
|
u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
|
|
if (nxt_slow_path(u == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
*u = *upstream;
|
|
|
|
urrcf = upstream->type.round_robin;
|
|
|
|
size = sizeof(nxt_upstream_round_robin_t)
|
|
+ urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
|
|
|
|
urr = nxt_mp_alloc(mp, size);
|
|
if (nxt_slow_path(urr == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
u->type.round_robin = urr;
|
|
|
|
n = urrcf->items;
|
|
urr->items = n;
|
|
|
|
for (i = 0; i < n; i++) {
|
|
urr->server[i] = urrcf->server[i];
|
|
}
|
|
|
|
return u;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
|
|
{
|
|
int32_t total;
|
|
uint32_t i, n;
|
|
nxt_upstream_round_robin_t *round_robin;
|
|
nxt_upstream_round_robin_server_t *s, *best;
|
|
|
|
best = NULL;
|
|
total = 0;
|
|
|
|
round_robin = us->upstream->type.round_robin;
|
|
|
|
s = round_robin->server;
|
|
n = round_robin->items;
|
|
|
|
for (i = 0; i < n; i++) {
|
|
|
|
s[i].current_weight += s[i].effective_weight;
|
|
total += s[i].effective_weight;
|
|
|
|
if (s[i].effective_weight < s[i].weight) {
|
|
s[i].effective_weight++;
|
|
}
|
|
|
|
if (best == NULL || s[i].current_weight > best->current_weight) {
|
|
best = &s[i];
|
|
}
|
|
}
|
|
|
|
if (best == NULL || total == 0) {
|
|
us->state->error(task, us);
|
|
return;
|
|
}
|
|
|
|
best->current_weight -= total;
|
|
us->sockaddr = best->sockaddr;
|
|
us->protocol = best->protocol;
|
|
us->server.round_robin = best;
|
|
|
|
us->state->ready(task, us);
|
|
}
|