1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "mod_proxy.h"
18#include "scoreboard.h"
19#include "ap_mpm.h"
20#include "apr_version.h"
21#include "ap_hooks.h"
22#include "ap_slotmem.h"
23#include "heartbeat.h"
24
#ifndef LBM_HEARTBEAT_MAX_LASTSEEN
/* A backend whose last heartbeat is N or more seconds old is not counted
 * as alive and will not be chosen by find_best_hb().  Can be overridden
 * at compile time.
 */
#define LBM_HEARTBEAT_MAX_LASTSEEN (10)
#endif
31
32module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module;
33
34static int (*ap_proxy_retry_worker_fn)(const char *proxy_function,
35        proxy_worker *worker, server_rec *s) = NULL;
36
37static const ap_slotmem_provider_t *storage = NULL;
38static ap_slotmem_instance_t *hm_serversmem = NULL;
39
40/*
41 * configuration structure
42 * path: path of the file where the heartbeat information is stored.
43 */
44typedef struct lb_hb_ctx_t
45{
46    const char *path;
47} lb_hb_ctx_t;
48
49typedef struct hb_server_t {
50    const char *ip;
51    int busy;
52    int ready;
53    int port;
54    int id;
55    apr_time_t seen;
56    proxy_worker *worker;
57} hb_server_t;
58
59typedef struct ctx_servers {
60    apr_time_t now;
61    apr_hash_t *servers;
62} ctx_servers_t;
63
64static void
65argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
66{
67    char *key;
68    char *value;
69    char *strtok_state;
70
71    key = apr_strtok(str, "&", &strtok_state);
72    while (key) {
73        value = strchr(key, '=');
74        if (value) {
75            *value = '\0';      /* Split the string in two */
76            value++;            /* Skip passed the = */
77        }
78        else {
79            value = "1";
80        }
81        ap_unescape_url(key);
82        ap_unescape_url(value);
83        apr_table_set(parms, key, value);
84        /*
85         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
86         "Found query arg: %s = %s", key, value);
87         */
88        key = apr_strtok(NULL, "&", &strtok_state);
89    }
90}
91
92static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers,
93                                    apr_pool_t *pool)
94{
95    apr_finfo_t fi;
96    apr_status_t rv;
97    apr_file_t *fp;
98
99    if (!path) {
100        return APR_SUCCESS;
101    }
102
103    rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
104                       APR_OS_DEFAULT, pool);
105
106    if (rv) {
107        return rv;
108    }
109
110    rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
111
112    if (rv) {
113        return rv;
114    }
115
116    {
117        char *t;
118        int lineno = 0;
119        apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
120        apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
121        apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
122        apr_table_t *hbt = apr_table_make(pool, 10);
123
124        apr_brigade_insert_file(bb, fp, 0, fi.size, pool);
125
126        do {
127            hb_server_t *server;
128            char buf[4096];
129            apr_size_t bsize = sizeof(buf);
130            const char *ip;
131
132            apr_brigade_cleanup(tmpbb);
133
134            if (APR_BRIGADE_EMPTY(bb)) {
135                break;
136            }
137
138            rv = apr_brigade_split_line(tmpbb, bb,
139                                        APR_BLOCK_READ, sizeof(buf));
140            lineno++;
141
142            if (rv) {
143                return rv;
144            }
145
146            apr_brigade_flatten(tmpbb, buf, &bsize);
147
148            if (bsize == 0) {
149                break;
150            }
151
152            buf[bsize - 1] = 0;
153
154            /* comment */
155            if (buf[0] == '#') {
156                continue;
157            }
158
159            /* line format: <IP> <query_string>\n */
160            t = strchr(buf, ' ');
161            if (!t) {
162                continue;
163            }
164
165            ip = apr_pstrmemdup(pool, buf, t - buf);
166            t++;
167
168            server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING);
169
170            if (server == NULL) {
171                server = apr_pcalloc(pool, sizeof(hb_server_t));
172                server->ip = ip;
173                server->port = 80;
174                server->seen = -1;
175
176                apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
177            }
178
179            apr_table_clear(hbt);
180
181            argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
182
183            if (apr_table_get(hbt, "busy")) {
184                server->busy = atoi(apr_table_get(hbt, "busy"));
185            }
186
187            if (apr_table_get(hbt, "ready")) {
188                server->ready = atoi(apr_table_get(hbt, "ready"));
189            }
190
191            if (apr_table_get(hbt, "lastseen")) {
192                server->seen = atoi(apr_table_get(hbt, "lastseen"));
193            }
194
195            if (apr_table_get(hbt, "port")) {
196                server->port = atoi(apr_table_get(hbt, "port"));
197            }
198
199            if (server->busy == 0 && server->ready != 0) {
200                /* Server has zero threads active, but lots of them ready,
201                 * it likely just started up, so lets /4 the number ready,
202                 * to prevent us from completely flooding it with all new
203                 * requests.
204                 */
205                server->ready = server->ready / 4;
206            }
207
208        } while (1);
209    }
210
211    return APR_SUCCESS;
212}
213
214static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool)
215{
216    hm_slot_server_t *slotserver = (hm_slot_server_t *) mem;
217    ctx_servers_t *ctx = (ctx_servers_t *) data;
218    apr_hash_t *servers = (apr_hash_t *) ctx->servers;
219    hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING);
220    if (server == NULL) {
221        server = apr_pcalloc(pool, sizeof(hb_server_t));
222        server->ip = apr_pstrdup(pool, slotserver->ip);
223        server->seen = -1;
224
225        apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
226
227    }
228    server->busy = slotserver->busy;
229    server->ready = slotserver->ready;
230    server->seen = apr_time_sec(ctx->now - slotserver->seen);
231    server->id = slotserver->id;
232    if (server->busy == 0 && server->ready != 0) {
233        server->ready = server->ready / 4;
234    }
235    return APR_SUCCESS;
236}
237static apr_status_t readslot_heartbeats(ctx_servers_t *ctx,
238                                    apr_pool_t *pool)
239{
240    storage->doall(hm_serversmem, hm_read, ctx, pool);
241    return APR_SUCCESS;
242}
243
244
245static apr_status_t read_heartbeats(const char *path, apr_hash_t *servers,
246                                        apr_pool_t *pool)
247{
248    apr_status_t rv;
249    if (hm_serversmem) {
250        ctx_servers_t ctx;
251        ctx.now = apr_time_now();
252        ctx.servers = servers;
253        rv = readslot_heartbeats(&ctx, pool);
254    } else
255        rv = readfile_heartbeats(path, servers, pool);
256    return rv;
257}
258
259static proxy_worker *find_best_hb(proxy_balancer *balancer,
260                                  request_rec *r)
261{
262    apr_status_t rv;
263    int i;
264    apr_uint32_t openslots = 0;
265    proxy_worker **worker;
266    hb_server_t *server;
267    apr_array_header_t *up_servers;
268    proxy_worker *mycandidate = NULL;
269    apr_pool_t *tpool;
270    apr_hash_t *servers;
271
272    lb_hb_ctx_t *ctx =
273        ap_get_module_config(r->server->module_config,
274                             &lbmethod_heartbeat_module);
275
276    if (!ap_proxy_retry_worker_fn) {
277        ap_proxy_retry_worker_fn =
278                APR_RETRIEVE_OPTIONAL_FN(ap_proxy_retry_worker);
279        if (!ap_proxy_retry_worker_fn) {
280            /* can only happen if mod_proxy isn't loaded */
281            return NULL;
282        }
283    }
284
285    apr_pool_create(&tpool, r->pool);
286
287    servers = apr_hash_make(tpool);
288
289    rv = read_heartbeats(ctx->path, servers, tpool);
290
291    if (rv) {
292        ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01213)
293                      "lb_heartbeat: Unable to read heartbeats at '%s'",
294                      ctx->path);
295        apr_pool_destroy(tpool);
296        return NULL;
297    }
298
299    up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *));
300
301    for (i = 0; i < balancer->workers->nelts; i++) {
302        worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *);
303        server = apr_hash_get(servers, (*worker)->s->hostname, APR_HASH_KEY_STRING);
304
305        if (!server) {
306            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(01214)
307                      "lb_heartbeat: No server for worker %s", (*worker)->s->name);
308            continue;
309        }
310
311        if (!PROXY_WORKER_IS_USABLE(*worker)) {
312            ap_proxy_retry_worker_fn("BALANCER", *worker, r->server);
313        }
314
315        if (PROXY_WORKER_IS_USABLE(*worker)) {
316            server->worker = *worker;
317            if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) {
318                openslots += server->ready;
319                APR_ARRAY_PUSH(up_servers, hb_server_t *) = server;
320            }
321        }
322    }
323
324    if (openslots > 0) {
325        apr_uint32_t c = 0;
326        apr_uint32_t pick = 0;
327
328        pick = ap_random_pick(0, openslots);
329
330        for (i = 0; i < up_servers->nelts; i++) {
331            server = APR_ARRAY_IDX(up_servers, i, hb_server_t *);
332            if (pick >= c && pick <= c + server->ready) {
333                mycandidate = server->worker;
334            }
335
336            c += server->ready;
337        }
338    }
339
340    apr_pool_destroy(tpool);
341
342    return mycandidate;
343}
344
345static apr_status_t reset(proxy_balancer *balancer, server_rec *s) {
346        return APR_SUCCESS;
347}
348
349static apr_status_t age(proxy_balancer *balancer, server_rec *s) {
350        return APR_SUCCESS;
351}
352
353static const proxy_balancer_method heartbeat =
354{
355    "heartbeat",
356    &find_best_hb,
357    NULL,
358    &reset,
359    &age
360};
361
362static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog,
363                          apr_pool_t *ptemp, server_rec *s)
364{
365    apr_size_t size;
366    unsigned int num;
367    lb_hb_ctx_t *ctx = ap_get_module_config(s->module_config,
368                                            &lbmethod_heartbeat_module);
369
370    /* do nothing on first call */
371    if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG)
372        return OK;
373
374    storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm",
375                                 AP_SLOTMEM_PROVIDER_VERSION);
376    if (!storage) {
377        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02281)
378                     "Failed to lookup provider 'shm' for '%s'. Maybe you "
379                     "need to load mod_slotmem_shm?",
380                     AP_SLOTMEM_PROVIDER_GROUP);
381        return OK;
382    }
383
384    /* Try to use a slotmem created by mod_heartmonitor */
385    storage->attach(&hm_serversmem, "mod_heartmonitor", &size, &num, p);
386    if (!hm_serversmem)
387        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02282)
388                     "No slotmem from mod_heartmonitor");
389    else
390        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02283)
391                     "Using slotmem from mod_heartmonitor");
392
393    if (hm_serversmem)
394        ctx->path = "(slotmem)";
395
396    return OK;
397}
398
399static void register_hooks(apr_pool_t *p)
400{
401    static const char * const aszPred[]={ "mod_heartmonitor.c", NULL };
402    ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat);
403    ap_hook_post_config(lb_hb_init, aszPred, NULL, APR_HOOK_MIDDLE);
404}
405
406static void *lb_hb_create_config(apr_pool_t *p, server_rec *s)
407{
408    lb_hb_ctx_t *ctx = (lb_hb_ctx_t *) apr_palloc(p, sizeof(lb_hb_ctx_t));
409
410    ctx->path = ap_runtime_dir_relative(p, DEFAULT_HEARTBEAT_STORAGE);
411
412    return ctx;
413}
414
415static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv)
416{
417    lb_hb_ctx_t *ps = apr_pcalloc(p, sizeof(lb_hb_ctx_t));
418    lb_hb_ctx_t *base = (lb_hb_ctx_t *) basev;
419    lb_hb_ctx_t *overrides = (lb_hb_ctx_t *) overridesv;
420
421    if (overrides->path) {
422        ps->path = apr_pstrdup(p, overrides->path);
423    }
424    else {
425        ps->path = apr_pstrdup(p, base->path);
426    }
427
428    return ps;
429}
430
431static const char *cmd_lb_hb_storage(cmd_parms *cmd,
432                                  void *dconf, const char *path)
433{
434    apr_pool_t *p = cmd->pool;
435    lb_hb_ctx_t *ctx =
436    (lb_hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
437                                         &lbmethod_heartbeat_module);
438
439    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
440
441    if (err != NULL) {
442        return err;
443    }
444
445    ctx->path = ap_runtime_dir_relative(p, path);
446
447    return NULL;
448}
449
450static const command_rec cmds[] = {
451    AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF,
452                  "Path to read heartbeat data."),
453    {NULL}
454};
455
456AP_DECLARE_MODULE(lbmethod_heartbeat) = {
457    STANDARD20_MODULE_STUFF,
458    NULL,                       /* create per-directory config structure */
459    NULL,                       /* merge per-directory config structures */
460    lb_hb_create_config,        /* create per-server config structure */
461    lb_hb_merge_config,         /* merge per-server config structures */
462    cmds,                       /* command apr_table_t */
463    register_hooks              /* register hooks */
464};
465