1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "mod_proxy.h" 18#include "scoreboard.h" 19#include "ap_mpm.h" 20#include "apr_version.h" 21#include "ap_hooks.h" 22#include "ap_slotmem.h" 23#include "heartbeat.h" 24 25#ifndef LBM_HEARTBEAT_MAX_LASTSEEN 26/* If we haven't seen a heartbeat in the last N seconds, don't count this IP 27 * as allive. 28 */ 29#define LBM_HEARTBEAT_MAX_LASTSEEN (10) 30#endif 31 32module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module; 33 34static int (*ap_proxy_retry_worker_fn)(const char *proxy_function, 35 proxy_worker *worker, server_rec *s) = NULL; 36 37static const ap_slotmem_provider_t *storage = NULL; 38static ap_slotmem_instance_t *hm_serversmem = NULL; 39 40/* 41 * configuration structure 42 * path: path of the file where the heartbeat information is stored. 43 */ 44typedef struct lb_hb_ctx_t 45{ 46 const char *path; 47} lb_hb_ctx_t; 48 49typedef struct hb_server_t { 50 const char *ip; 51 int busy; 52 int ready; 53 int port; 54 int id; 55 apr_time_t seen; 56 proxy_worker *worker; 57} hb_server_t; 58 59typedef struct ctx_servers { 60 apr_time_t now; 61 apr_hash_t *servers; 62} ctx_servers_t; 63 64static void 65argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms) 66{ 67 char *key; 68 char *value; 69 char *strtok_state; 70 71 key = apr_strtok(str, "&", &strtok_state); 72 while (key) { 73 value = strchr(key, '='); 74 if (value) { 75 *value = '\0'; /* Split the string in two */ 76 value++; /* Skip passed the = */ 77 } 78 else { 79 value = "1"; 80 } 81 ap_unescape_url(key); 82 ap_unescape_url(value); 83 apr_table_set(parms, key, value); 84 /* 85 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 86 "Found query arg: %s = %s", key, value); 87 */ 88 key = apr_strtok(NULL, "&", &strtok_state); 89 } 90} 91 92static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers, 93 apr_pool_t *pool) 94{ 95 apr_finfo_t fi; 96 apr_status_t rv; 97 apr_file_t *fp; 98 99 if (!path) { 100 return APR_SUCCESS; 101 } 102 103 rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED, 104 APR_OS_DEFAULT, pool); 105 106 if (rv) { 107 return rv; 108 } 109 110 rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp); 111 112 if (rv) { 113 return rv; 114 } 115 116 { 117 char *t; 118 int lineno = 0; 119 apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool); 120 apr_bucket_brigade *bb = apr_brigade_create(pool, ba); 121 apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba); 122 apr_table_t *hbt = apr_table_make(pool, 10); 123 124 apr_brigade_insert_file(bb, fp, 0, fi.size, pool); 125 126 do { 127 hb_server_t *server; 128 char buf[4096]; 129 apr_size_t bsize = sizeof(buf); 130 const char *ip; 131 132 apr_brigade_cleanup(tmpbb); 133 134 if (APR_BRIGADE_EMPTY(bb)) { 135 break; 136 } 137 138 rv = apr_brigade_split_line(tmpbb, bb, 139 APR_BLOCK_READ, sizeof(buf)); 140 lineno++; 141 142 if (rv) { 143 return rv; 144 } 145 146 apr_brigade_flatten(tmpbb, buf, &bsize); 147 148 if (bsize == 0) { 149 break; 150 } 151 152 buf[bsize - 1] = 0; 153 154 /* comment */ 155 if (buf[0] == '#') { 156 continue; 157 } 158 159 /* line format: <IP> <query_string>\n */ 160 t = strchr(buf, ' '); 161 if (!t) { 162 continue; 163 } 164 165 ip = apr_pstrmemdup(pool, buf, t - buf); 166 t++; 167 168 server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING); 169 170 if (server == NULL) { 171 server = apr_pcalloc(pool, sizeof(hb_server_t)); 172 server->ip = ip; 173 server->port = 80; 174 server->seen = -1; 175 176 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server); 177 } 178 179 apr_table_clear(hbt); 180 181 argstr_to_table(pool, apr_pstrdup(pool, t), hbt); 182 183 if (apr_table_get(hbt, "busy")) { 184 server->busy = atoi(apr_table_get(hbt, "busy")); 185 } 186 187 if (apr_table_get(hbt, "ready")) { 188 server->ready = atoi(apr_table_get(hbt, "ready")); 189 } 190 191 if (apr_table_get(hbt, "lastseen")) { 192 server->seen = atoi(apr_table_get(hbt, "lastseen")); 193 } 194 195 if (apr_table_get(hbt, "port")) { 196 server->port = atoi(apr_table_get(hbt, "port")); 197 } 198 199 if (server->busy == 0 && server->ready != 0) { 200 /* Server has zero threads active, but lots of them ready, 201 * it likely just started up, so lets /4 the number ready, 202 * to prevent us from completely flooding it with all new 203 * requests. 204 */ 205 server->ready = server->ready / 4; 206 } 207 208 } while (1); 209 } 210 211 return APR_SUCCESS; 212} 213 214static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool) 215{ 216 hm_slot_server_t *slotserver = (hm_slot_server_t *) mem; 217 ctx_servers_t *ctx = (ctx_servers_t *) data; 218 apr_hash_t *servers = (apr_hash_t *) ctx->servers; 219 hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING); 220 if (server == NULL) { 221 server = apr_pcalloc(pool, sizeof(hb_server_t)); 222 server->ip = apr_pstrdup(pool, slotserver->ip); 223 server->seen = -1; 224 225 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server); 226 227 } 228 server->busy = slotserver->busy; 229 server->ready = slotserver->ready; 230 server->seen = apr_time_sec(ctx->now - slotserver->seen); 231 server->id = slotserver->id; 232 if (server->busy == 0 && server->ready != 0) { 233 server->ready = server->ready / 4; 234 } 235 return APR_SUCCESS; 236} 237static apr_status_t readslot_heartbeats(ctx_servers_t *ctx, 238 apr_pool_t *pool) 239{ 240 storage->doall(hm_serversmem, hm_read, ctx, pool); 241 return APR_SUCCESS; 242} 243 244 245static apr_status_t read_heartbeats(const char *path, apr_hash_t *servers, 246 apr_pool_t *pool) 247{ 248 apr_status_t rv; 249 if (hm_serversmem) { 250 ctx_servers_t ctx; 251 ctx.now = apr_time_now(); 252 ctx.servers = servers; 253 rv = readslot_heartbeats(&ctx, pool); 254 } else 255 rv = readfile_heartbeats(path, servers, pool); 256 return rv; 257} 258 259static proxy_worker *find_best_hb(proxy_balancer *balancer, 260 request_rec *r) 261{ 262 apr_status_t rv; 263 int i; 264 apr_uint32_t openslots = 0; 265 proxy_worker **worker; 266 hb_server_t *server; 267 apr_array_header_t *up_servers; 268 proxy_worker *mycandidate = NULL; 269 apr_pool_t *tpool; 270 apr_hash_t *servers; 271 272 lb_hb_ctx_t *ctx = 273 ap_get_module_config(r->server->module_config, 274 &lbmethod_heartbeat_module); 275 276 if (!ap_proxy_retry_worker_fn) { 277 ap_proxy_retry_worker_fn = 278 APR_RETRIEVE_OPTIONAL_FN(ap_proxy_retry_worker); 279 if (!ap_proxy_retry_worker_fn) { 280 /* can only happen if mod_proxy isn't loaded */ 281 return NULL; 282 } 283 } 284 285 apr_pool_create(&tpool, r->pool); 286 287 servers = apr_hash_make(tpool); 288 289 rv = read_heartbeats(ctx->path, servers, tpool); 290 291 if (rv) { 292 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01213) 293 "lb_heartbeat: Unable to read heartbeats at '%s'", 294 ctx->path); 295 apr_pool_destroy(tpool); 296 return NULL; 297 } 298 299 up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *)); 300 301 for (i = 0; i < balancer->workers->nelts; i++) { 302 worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *); 303 server = apr_hash_get(servers, (*worker)->s->hostname, APR_HASH_KEY_STRING); 304 305 if (!server) { 306 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(01214) 307 "lb_heartbeat: No server for worker %s", (*worker)->s->name); 308 continue; 309 } 310 311 if (!PROXY_WORKER_IS_USABLE(*worker)) { 312 ap_proxy_retry_worker_fn("BALANCER", *worker, r->server); 313 } 314 315 if (PROXY_WORKER_IS_USABLE(*worker)) { 316 server->worker = *worker; 317 if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) { 318 openslots += server->ready; 319 APR_ARRAY_PUSH(up_servers, hb_server_t *) = server; 320 } 321 } 322 } 323 324 if (openslots > 0) { 325 apr_uint32_t c = 0; 326 apr_uint32_t pick = 0; 327 328 pick = ap_random_pick(0, openslots); 329 330 for (i = 0; i < up_servers->nelts; i++) { 331 server = APR_ARRAY_IDX(up_servers, i, hb_server_t *); 332 if (pick >= c && pick <= c + server->ready) { 333 mycandidate = server->worker; 334 } 335 336 c += server->ready; 337 } 338 } 339 340 apr_pool_destroy(tpool); 341 342 return mycandidate; 343} 344 345static apr_status_t reset(proxy_balancer *balancer, server_rec *s) { 346 return APR_SUCCESS; 347} 348 349static apr_status_t age(proxy_balancer *balancer, server_rec *s) { 350 return APR_SUCCESS; 351} 352 353static const proxy_balancer_method heartbeat = 354{ 355 "heartbeat", 356 &find_best_hb, 357 NULL, 358 &reset, 359 &age 360}; 361 362static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog, 363 apr_pool_t *ptemp, server_rec *s) 364{ 365 apr_size_t size; 366 unsigned int num; 367 lb_hb_ctx_t *ctx = ap_get_module_config(s->module_config, 368 &lbmethod_heartbeat_module); 369 370 /* do nothing on first call */ 371 if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG) 372 return OK; 373 374 storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm", 375 AP_SLOTMEM_PROVIDER_VERSION); 376 if (!storage) { 377 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02281) 378 "Failed to lookup provider 'shm' for '%s'. Maybe you " 379 "need to load mod_slotmem_shm?", 380 AP_SLOTMEM_PROVIDER_GROUP); 381 return OK; 382 } 383 384 /* Try to use a slotmem created by mod_heartmonitor */ 385 storage->attach(&hm_serversmem, "mod_heartmonitor", &size, &num, p); 386 if (!hm_serversmem) 387 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02282) 388 "No slotmem from mod_heartmonitor"); 389 else 390 ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02283) 391 "Using slotmem from mod_heartmonitor"); 392 393 if (hm_serversmem) 394 ctx->path = "(slotmem)"; 395 396 return OK; 397} 398 399static void register_hooks(apr_pool_t *p) 400{ 401 static const char * const aszPred[]={ "mod_heartmonitor.c", NULL }; 402 ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat); 403 ap_hook_post_config(lb_hb_init, aszPred, NULL, APR_HOOK_MIDDLE); 404} 405 406static void *lb_hb_create_config(apr_pool_t *p, server_rec *s) 407{ 408 lb_hb_ctx_t *ctx = (lb_hb_ctx_t *) apr_palloc(p, sizeof(lb_hb_ctx_t)); 409 410 ctx->path = ap_runtime_dir_relative(p, DEFAULT_HEARTBEAT_STORAGE); 411 412 return ctx; 413} 414 415static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv) 416{ 417 lb_hb_ctx_t *ps = apr_pcalloc(p, sizeof(lb_hb_ctx_t)); 418 lb_hb_ctx_t *base = (lb_hb_ctx_t *) basev; 419 lb_hb_ctx_t *overrides = (lb_hb_ctx_t *) overridesv; 420 421 if (overrides->path) { 422 ps->path = apr_pstrdup(p, overrides->path); 423 } 424 else { 425 ps->path = apr_pstrdup(p, base->path); 426 } 427 428 return ps; 429} 430 431static const char *cmd_lb_hb_storage(cmd_parms *cmd, 432 void *dconf, const char *path) 433{ 434 apr_pool_t *p = cmd->pool; 435 lb_hb_ctx_t *ctx = 436 (lb_hb_ctx_t *) ap_get_module_config(cmd->server->module_config, 437 &lbmethod_heartbeat_module); 438 439 const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY); 440 441 if (err != NULL) { 442 return err; 443 } 444 445 ctx->path = ap_runtime_dir_relative(p, path); 446 447 return NULL; 448} 449 450static const command_rec cmds[] = { 451 AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF, 452 "Path to read heartbeat data."), 453 {NULL} 454}; 455 456AP_DECLARE_MODULE(lbmethod_heartbeat) = { 457 STANDARD20_MODULE_STUFF, 458 NULL, /* create per-directory config structure */ 459 NULL, /* merge per-directory config structures */ 460 lb_hb_create_config, /* create per-server config structure */ 461 lb_hb_merge_config, /* merge per-server config structures */ 462 cmds, /* command apr_table_t */ 463 register_hooks /* register hooks */ 464}; 465