httpd-cvs mailing list archives

From pque...@apache.org
Subject svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/
Date Mon, 01 Dec 2008 02:55:15 GMT
Author: pquerna
Date: Sun Nov 30 18:55:14 2008
New Revision: 721952

URL: http://svn.apache.org/viewvc?rev=721952&view=rev
Log:
Add two new modules, originally written at Joost, to handle load balancing across
multiple apache servers within the same datacenter.

mod_heartbeat generates multicast status messages with the current number of 
clients connected, but the format can easily be extended to include other things.

mod_heartmonitor collects these messages into a static file, which other
modules can then use to make load balancing decisions.

Added:
    httpd/httpd/trunk/modules/cluster/   (with props)
    httpd/httpd/trunk/modules/cluster/Makefile.in   (with props)
    httpd/httpd/trunk/modules/cluster/README.heartbeat
    httpd/httpd/trunk/modules/cluster/README.heartmonitor
    httpd/httpd/trunk/modules/cluster/config.m4
    httpd/httpd/trunk/modules/cluster/mod_heartbeat.c   (with props)
    httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c   (with props)
Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/README

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=721952&r1=721951&r2=721952&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Sun Nov 30 18:55:14 2008
@@ -2,6 +2,12 @@
 Changes with Apache 2.3.0
 [ When backported to 2.2.x, remove entry from this file ]
 
+  *) mod_heartmonitor: New module to collect heartbeats, and write out a file
+     so that other modules can load balance traffic as needed. [Paul Querna]
+
+  *) mod_heartbeat: New module to generate multicast heartbeats to know if a
+     server is online. [Paul Querna]
+
   *) core: Error responses set by filters were being coerced into 500 errors,
      sometimes appended to the original error response. Log entry of:
      'Handler for (null) returned invalid result code -3' 

Modified: httpd/httpd/trunk/modules/README
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/README?rev=721952&r1=721951&r2=721952&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/README (original)
+++ httpd/httpd/trunk/modules/README Sun Nov 30 18:55:14 2008
@@ -13,6 +13,9 @@
 database/
   The apache DBD framework manages connections to SQL backends efficiently.
 
+cluster/
+  Modules for working with multiple servers.
+
 dav/
   This directory houses modules that implement WebDAV functionality.
 

Propchange: httpd/httpd/trunk/modules/cluster/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sun Nov 30 18:55:14 2008
@@ -0,0 +1,5 @@
+.libs
+.deps
+*.slo
+Makefile
+modules.mk

Added: httpd/httpd/trunk/modules/cluster/Makefile.in
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/Makefile.in?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/Makefile.in (added)
+++ httpd/httpd/trunk/modules/cluster/Makefile.in Sun Nov 30 18:55:14 2008
@@ -0,0 +1,3 @@
+# a modules Makefile has no explicit targets -- they will be defined by
+# whatever modules are enabled. just grab special.mk to deal with this.
+include $(top_srcdir)/build/special.mk

Propchange: httpd/httpd/trunk/modules/cluster/Makefile.in
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/modules/cluster/Makefile.in
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Added: httpd/httpd/trunk/modules/cluster/README.heartbeat
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/README.heartbeat?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/README.heartbeat (added)
+++ httpd/httpd/trunk/modules/cluster/README.heartbeat Sun Nov 30 18:55:14 2008
@@ -0,0 +1,33 @@
+mod_heartbeat
+
+Broadcasts the current Apache Connection status over multicast.
+
+Example Configuration:
+  HeartbeatAddress 239.0.0.1:27999
+
+Dependencies:
+  mod_status must be either a static module, or if a dynamic module, it must be 
+  loaded before mod_heartbeat.
+
+
+Consuming:
+  Every 1 second, this module generates a single multicast UDP packet,
+  containing the number of busy and idle workers.
+  
+  The packet is a simple ASCII format, similar to GET query parameters, sent over UDP.
+  
+  An Example packet:
+    v=1&ready=75&busy=0
+
+  Consumers should tolerate new variables, separated by '&', being added
+  alongside busy and ready in the future.
+  
+Misc:
+  The interval of 1 second is controlled by the HEARTBEAT_INTERVAL
+  compile time define.  This is not currently tunable at run time. To make this
+  module send the status packet less often, you must add a define to the CFLAGS
+  used to compile the module, for example:
+    -DHEARTBEAT_INTERVAL=3
+  This would cause the broadcasts to be sent every 3 seconds.
+
+
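
For consumers of the packet format described in README.heartbeat, a minimal
parsing sketch in plain C might look like the following. The hb_packet type
and the hb_parse function are hypothetical names used only for illustration;
they are not part of mod_heartbeat. Unknown keys are skipped, in line with the
README's advice that new variables may appear later.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical consumer-side record; not defined by mod_heartbeat. */
typedef struct {
    unsigned version;
    unsigned ready;
    unsigned busy;
} hb_packet;

/*
 * Parse a packet such as "v=1&ready=75&busy=0".
 * Keys other than v, ready and busy are ignored.
 * Returns 0 when all three known keys were found, -1 otherwise.
 */
static int hb_parse(const char *msg, hb_packet *out)
{
    const char *p = msg;
    int seen = 0;

    assert(msg != NULL && out != NULL);

    while (*p) {
        const char *amp = strchr(p, '&');
        size_t plen = amp ? (size_t) (amp - p) : strlen(p);
        const char *eq = memchr(p, '=', plen);

        if (eq) {
            size_t klen = (size_t) (eq - p);
            /* strtoul stops at the first non-digit, e.g. the next '&'. */
            unsigned val = (unsigned) strtoul(eq + 1, NULL, 10);

            if (klen == 1 && memcmp(p, "v", 1) == 0) {
                out->version = val;
                seen |= 1;
            }
            else if (klen == 5 && memcmp(p, "ready", 5) == 0) {
                out->ready = val;
                seen |= 2;
            }
            else if (klen == 4 && memcmp(p, "busy", 4) == 0) {
                out->busy = val;
                seen |= 4;
            }
        }

        p += plen;
        if (*p == '&') {
            p++;
        }
    }

    return seen == 7 ? 0 : -1;
}
```

A caller would pass the NUL-terminated datagram payload to hb_parse and treat
a -1 return as a malformed packet, much as mod_heartmonitor does when v, busy
or ready is missing.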

Added: httpd/httpd/trunk/modules/cluster/README.heartmonitor
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/README.heartmonitor?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/README.heartmonitor (added)
+++ httpd/httpd/trunk/modules/cluster/README.heartmonitor Sun Nov 30 18:55:14 2008
@@ -0,0 +1,30 @@
+mod_heartmonitor
+
+Collects the Apache Connection status data over multicast.
+
+Example Configuration:
+  # Multicast address and port to listen on
+  HeartbeatListen 239.0.0.1:27999
+  # Absolute path, or relative path to ServerRoot
+  HeartbeatStorage logs/hb.dat
+
+Dependencies:
+  Due to a bug in APR's apr_socket_recvfrom, version 1.2.12 or newer must be
+  used:
+    <http://svn.apache.org/viewvc?view=rev&revision=467600>
+
+Consuming:
+  This module atomically writes to the configured path, a list of servers, 
+  along with metadata about them.
+  
+  Included data about each server:
+    - IP Address
+    - Busy Slots
+    - Open Slots
+    - Last Seen
+
+  Every 5 seconds, this file will be updated with the current status of the 
+  cluster.
+
+  
+
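
A module or script that reads the storage file could parse each line roughly
as sketched below. The line layout is taken from the apr_file_printf call in
hm_update_stats in this commit ("%s &ready=%u&busy=%u&lastseen=%u"); the
hm_entry type and hm_parse_line function are hypothetical names, and the
64-byte address buffer is an assumption made for this sketch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical record for one line of the heartbeat storage file. */
typedef struct {
    char ip[64];        /* sender address as text */
    unsigned ready;     /* idle workers */
    unsigned busy;      /* busy workers */
    unsigned lastseen;  /* seconds since the last heartbeat */
} hm_entry;

/*
 * Parse one line such as "10.0.0.5 &ready=75&busy=0&lastseen=2".
 * Returns 0 on success, -1 if the line does not match the layout.
 */
static int hm_parse_line(const char *line, hm_entry *out)
{
    int n;

    assert(line != NULL && out != NULL);

    n = sscanf(line, "%63s &ready=%u&busy=%u&lastseen=%u",
               out->ip, &out->ready, &out->busy, &out->lastseen);

    return n == 4 ? 0 : -1;
}
```

How the parsed values feed into a balancing decision (for example, preferring
the server with the most ready workers) is left to the consumer; this commit
does not define that policy.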

Added: httpd/httpd/trunk/modules/cluster/config.m4
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/config.m4?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/config.m4 (added)
+++ httpd/httpd/trunk/modules/cluster/config.m4 Sun Nov 30 18:55:14 2008
@@ -0,0 +1,7 @@
+
+APACHE_MODPATH_INIT(cluster)
+
+APACHE_MODULE(heartbeat, Generates Heartbeats, , , most)
+APACHE_MODULE(heartmonitor, Collects Heartbeats, , , most)
+
+APACHE_MODPATH_FINISH

Added: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartbeat.c?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/mod_heartbeat.c (added)
+++ httpd/httpd/trunk/modules/cluster/mod_heartbeat.c Sun Nov 30 18:55:14 2008
@@ -0,0 +1,354 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "apr_strings.h"
+
+#include "ap_mpm.h"
+#include "scoreboard.h"
+
+#ifndef HEARTBEAT_INTERVAL
+#define HEARTBEAT_INTERVAL (1)
+#endif
+
+module AP_MODULE_DECLARE_DATA heartbeat_module;
+
+typedef struct hb_ctx_t
+{
+    int active;
+    apr_sockaddr_t *mcast_addr;
+    int server_limit;
+    int thread_limit;
+    int status;
+    int keep_running;
+    apr_proc_mutex_t *mutex;
+    const char *mutex_path;
+    apr_thread_mutex_t *start_mtx;
+    apr_thread_t *thread;
+    apr_file_t *lockf;
+} hb_ctx_t;
+
+static const char *msg_format = "v=%u&ready=%u&busy=%u";
+
+#define MSG_VERSION (1)
+
+static int hb_monitor(hb_ctx_t *ctx, apr_pool_t *p)
+{
+    int i, j;
+    apr_uint32_t ready = 0;
+    apr_uint32_t busy = 0;
+
+    for (i = 0; i < ctx->server_limit; i++) {
+        process_score *ps;
+        ps = ap_get_scoreboard_process(i);
+
+        for (j = 0; j < ctx->thread_limit; j++) {
+            worker_score *ws = NULL;
+
+            ws = &ap_scoreboard_image->servers[i][j];
+
+            int res = ws->status;
+
+            if (res == SERVER_READY && ps->generation == ap_my_generation) {
+                ready++;
+            }
+            else if (res != SERVER_DEAD &&
+                     res != SERVER_STARTING && res != SERVER_IDLE_KILL) {
+                busy++;
+            }
+        }
+    }
+
+    char buf[256];
+    apr_size_t len =
+        apr_snprintf(buf, sizeof(buf), msg_format, MSG_VERSION, ready, busy);
+
+    apr_socket_t *sock = NULL;
+    do {
+        apr_status_t rv;
+        rv = apr_socket_create(&sock, ctx->mcast_addr->family,
+                               SOCK_DGRAM, APR_PROTO_UDP, p);
+        if (rv) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                         NULL, "Heartbeat: apr_socket_create failed");
+            break;
+        }
+
+        rv = apr_mcast_loopback(sock, 1);
+        if (rv) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                         NULL, "Heartbeat: apr_mcast_loopback failed");
+            break;
+        }
+
+        rv = apr_socket_sendto(sock, ctx->mcast_addr, 0, buf, &len);
+        if (rv) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                         NULL, "Heartbeat: apr_socket_sendto failed");
+            break;
+        }
+    } while (0);
+
+    if (sock) {
+        apr_socket_close(sock);
+    }
+
+    return OK;
+}
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) ((x) * 1000)
+#endif
+
+static void *hb_worker(apr_thread_t *thd, void *data)
+{
+    hb_ctx_t *ctx = (hb_ctx_t *) data;
+    apr_status_t rv;
+
+    apr_pool_t *pool = apr_thread_pool_get(thd);
+    apr_pool_tag(pool, "heartbeat_worker");
+    ctx->status = 0;
+    ctx->keep_running = 1;
+    apr_thread_mutex_unlock(ctx->start_mtx);
+
+    while (ctx->keep_running) {
+        rv = apr_proc_mutex_trylock(ctx->mutex);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
+        apr_sleep(apr_time_from_msec(200));
+    }
+
+    while (ctx->keep_running) {
+        int mpm_state = 0;
+        rv = ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state);
+
+        if (rv != APR_SUCCESS) {
+            break;
+        }
+
+        if (mpm_state == AP_MPMQ_STOPPING) {
+            ctx->keep_running = 0;
+            break;
+        }
+
+        apr_pool_t *tpool;
+        apr_pool_create(&tpool, pool);
+        apr_pool_tag(tpool, "heartbeat_worker_temp");
+        hb_monitor(ctx, tpool);
+        apr_pool_destroy(tpool);
+        apr_sleep(apr_time_from_sec(HEARTBEAT_INTERVAL));
+    }
+
+    apr_proc_mutex_unlock(ctx->mutex);
+    apr_thread_exit(ctx->thread, APR_SUCCESS);
+
+    return NULL;
+}
+
+static apr_status_t hb_pool_cleanup(void *baton)
+{
+    apr_status_t rv;
+    hb_ctx_t *ctx = (hb_ctx_t *) baton;
+
+    ctx->keep_running = 0;
+
+    apr_thread_join(&rv, ctx->thread);
+
+    return rv;
+}
+
+static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
+                                 p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartbeat: apr_thread_mutex_create failed");
+        ctx->status = rv;
+        return;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+
+    apr_pool_cleanup_register(p, ctx, hb_pool_cleanup, apr_pool_cleanup_null);
+
+    rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p);
+    if (rv) {
+        apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup);
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartbeat: apr_thread_create failed");
+        ctx->status = rv;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+    apr_thread_mutex_unlock(ctx->start_mtx);
+    apr_thread_mutex_destroy(ctx->start_mtx);
+}
+
+static void hb_child_init(apr_pool_t *p, server_rec *s)
+{
+    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
+
+    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
+
+    ctx->status = -1;
+
+    if (ctx->active) {
+        start_hb_worker(p, ctx);
+        if (ctx->status != 0) {
+            ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
+                         "Heartbeat: Failed to start worker thread.");
+            return;
+        }
+    }
+
+    return;
+}
+
+static int hb_init(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp,
+                   server_rec *s)
+{
+    apr_status_t rv;
+    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
+
+    ap_mpm_query(AP_MPMQ_HARD_LIMIT_THREADS, &ctx->thread_limit);
+    ap_mpm_query(AP_MPMQ_HARD_LIMIT_DAEMONS, &ctx->server_limit);
+
+    rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path,
+#if APR_HAS_FCNTL_SERIALIZE
+                               APR_LOCK_FCNTL,
+#else
+#if APR_HAS_FLOCK_SERIALIZE
+                               APR_LOCK_FLOCK,
+#else
+#error port me to a non crap platform.
+#endif
+#endif
+                               p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
+                     "Heartbeat: mutex failed creation at %s (type=%s)",
+                     ctx->mutex_path, apr_proc_mutex_defname());
+        return !OK;
+    }
+
+    return OK;
+}
+
+static void hb_register_hooks(apr_pool_t *p)
+{
+    ap_hook_post_config(hb_init, NULL, NULL, APR_HOOK_MIDDLE);
+    ap_hook_child_init(hb_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+static void *hb_create_config(apr_pool_t *p, server_rec *s)
+{
+    hb_ctx_t *cfg = (hb_ctx_t *) apr_pcalloc(p, sizeof(hb_ctx_t));
+
+    cfg->active = 0;
+    cfg->thread_limit = 0;
+    cfg->server_limit = 0;
+
+    return cfg;
+}
+
+static const char *cmd_hb_address(cmd_parms *cmd,
+                                  void *dconf, const char *addr)
+{
+    apr_status_t rv;
+    char *host_str;
+    char *scope_id;
+    apr_port_t port = 0;
+    apr_pool_t *p = cmd->pool;
+    hb_ctx_t *ctx =
+        (hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartbeat_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->active = 1;
+
+    rv = apr_parse_addr_port(&host_str, &scope_id, &port, addr, p);
+
+    if (rv) {
+        return "HeartbeatAddress: Unable to parse address.";
+    }
+
+    if (host_str == NULL) {
+        return "HeartbeatAddress: No host provided in address";
+    }
+
+    if (port == 0) {
+        return "HeartbeatAddress: No port provided in address";
+    }
+
+    rv = apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
+                               p);
+
+    if (rv) {
+        return "HeartbeatAddress: apr_sockaddr_info_get failed.";
+    }
+
+    const char *tmpdir = NULL;
+    rv = apr_temp_dir_get(&tmpdir, p);
+    if (rv) {
+        return "HeartbeatAddress: unable to find temp directory.";
+    }
+
+    char *path = apr_pstrcat(p, tmpdir, "/hb-tmp.XXXXXX", NULL);
+
+    rv = apr_file_mktemp(&ctx->lockf, path, 0, p);
+
+    if (rv) {
+        return "HeartbeatAddress: unable to allocate temp file.";
+    }
+
+    rv = apr_file_name_get(&ctx->mutex_path, ctx->lockf);
+
+    if (rv) {
+        return "HeartbeatAddress: unable to get lockf name.";
+    }
+
+    apr_file_close(ctx->lockf);
+
+    return NULL;
+}
+
+static const command_rec hb_cmds[] = {
+    AP_INIT_TAKE1("HeartbeatAddress", cmd_hb_address, NULL, RSRC_CONF,
+                  "Address to send heartbeat requests"),
+    {NULL}
+};
+
+module AP_MODULE_DECLARE_DATA heartbeat_module = {
+    STANDARD20_MODULE_STUFF,
+    NULL,                       /* create per-directory config structure */
+    NULL,                       /* merge per-directory config structures */
+    hb_create_config,           /* create per-server config structure */
+    NULL,                       /* merge per-server config structures */
+    hb_cmds,                    /* command apr_table_t */
+    hb_register_hooks
+};

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c (added)
+++ httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c Sun Nov 30 18:55:14 2008
@@ -0,0 +1,551 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "apr_strings.h"
+#include "apr_hash.h"
+#include "ap_mpm.h"
+#include "scoreboard.h"
+
+module AP_MODULE_DECLARE_DATA heartmonitor_module;
+
+typedef struct hm_server_t
+{
+    const char *ip;
+    int busy;
+    int ready;
+    apr_time_t seen;
+} hm_server_t;
+
+typedef struct hm_ctx_t
+{
+    int active;
+    const char *storage_path;
+    apr_proc_mutex_t *mutex;
+    const char *mutex_path;
+    apr_sockaddr_t *mcast_addr;
+    int status;
+    int keep_running;
+    apr_thread_mutex_t *start_mtx;
+    apr_thread_t *thread;
+    apr_socket_t *sock;
+    apr_pool_t *p;
+    apr_hash_t *servers;
+} hm_ctx_t;
+
+static apr_status_t hm_listen(hm_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_socket_create(&ctx->sock, ctx->mcast_addr->family,
+                           SOCK_DGRAM, APR_PROTO_UDP, ctx->p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to create listening socket.");
+        return rv;
+    }
+
+    rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to set APR_SO_REUSEADDR to 1 on socket.");
+        return rv;
+    }
+
+
+    rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to set APR_SO_NONBLOCK to 1 on socket.");
+        return rv;
+    }
+
+    rv = apr_socket_bind(ctx->sock, ctx->mcast_addr);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to bind on socket.");
+        return rv;
+    }
+
+    rv = apr_mcast_join(ctx->sock, ctx->mcast_addr, NULL, NULL);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to join multicast group");
+        return rv;
+    }
+
+    rv = apr_mcast_loopback(ctx->sock, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to accept localhost multicast on socket.");
+        return rv;
+    }
+
+    ctx->servers = apr_hash_make(ctx->p);
+
+    return APR_SUCCESS;
+}
+
+static void qs_to_table(const char *input, apr_table_t *parms,
+                        apr_pool_t *p)
+{
+    char *key;
+    char *value;
+    char *query_string;
+    char *strtok_state;
+
+    if (input == NULL) {
+        return;
+    }
+
+    query_string = apr_pstrdup(p, input);
+
+    key = apr_strtok(query_string, "&", &strtok_state);
+    while (key) {
+        value = strchr(key, '=');
+        if (value) {
+            *value = '\0';      /* Split the string in two */
+            value++;            /* Skip past the = */
+        }
+        else {
+            value = "1";
+        }
+        ap_unescape_url(key);
+        ap_unescape_url(value);
+        apr_table_set(parms, key, value);
+        /*
+           ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+           "Found query arg: %s = %s", key, value);
+         */
+        key = apr_strtok(NULL, "&", &strtok_state);
+    }
+}
+
+
+#define SEEN_TIMEOUT (30)
+
+static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
+{
+    apr_status_t rv;
+    apr_file_t *fp;
+    char *path = apr_pstrcat(p, ctx->storage_path, ".tmp.XXXXXX", NULL);
+    /* TODO: Update stats file (!) */
+    rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to open tmp file: %s", path);
+        return rv;
+    }
+
+    apr_hash_index_t *hi;
+    apr_time_t now = apr_time_now();
+    for (hi = apr_hash_first(p, ctx->servers);
+         hi != NULL; hi = apr_hash_next(hi)) {
+        hm_server_t *s = NULL;
+        apr_hash_this(hi, NULL, NULL, (void **) &s);
+        apr_uint32_t seen = apr_time_sec(now - s->seen);
+        if (seen > SEEN_TIMEOUT) {
+            /*
+             * Skip this entry from the heartbeat file -- when it comes back,
+             * we will reuse the memory...
+             */
+        }
+        else {
+            apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u\n",
+                            s->ip, s->ready, s->busy, seen);
+        }
+    }
+
+    apr_file_close(fp);
+
+    rv = apr_file_perms_set(path,
+                            APR_FPROT_UREAD | APR_FPROT_GREAD |
+                            APR_FPROT_WREAD);
+    if (rv && rv != APR_INCOMPLETE) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to set file permissions on %s",
+                     path);
+        return rv;
+    }
+
+    rv = apr_file_rename(path, ctx->storage_path, p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to move file: %s -> %s", path,
+                     ctx->storage_path);
+        return rv;
+    }
+
+    return APR_SUCCESS;
+}
+
+static hm_server_t *hm_get_server(hm_ctx_t *ctx, const char *ip)
+{
+    hm_server_t *s;
+
+    s = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
+
+    if (s == NULL) {
+        s = apr_palloc(ctx->p, sizeof(hm_server_t));
+        s->ip = apr_pstrdup(ctx->p, ip);
+        s->ready = 0;
+        s->busy = 0;
+        s->seen = 0;
+        apr_hash_set(ctx->servers, s->ip, APR_HASH_KEY_STRING, s);
+    }
+
+    return s;
+}
+
+#define MAX_MSG_LEN (1000)
+static apr_status_t hm_recv(hm_ctx_t *ctx, apr_pool_t *p)
+{
+    char buf[MAX_MSG_LEN + 1];
+    apr_sockaddr_t from;
+    from.pool = p;
+    apr_size_t len = MAX_MSG_LEN;
+    apr_status_t rv;
+
+    rv = apr_socket_recvfrom(&from, ctx->sock, 0, buf, &len);
+
+    if (APR_STATUS_IS_EAGAIN(rv)) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: would block");
+        return APR_SUCCESS;
+    }
+    else if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: recvfrom failed");
+        return rv;
+    }
+
+    buf[len] = '\0';
+
+    apr_table_t *tbl;
+
+    tbl = apr_table_make(p, 10);
+
+    qs_to_table(buf, tbl, p);
+
+    if (apr_table_get(tbl, "v") != NULL &&
+        apr_table_get(tbl, "busy") != NULL &&
+        apr_table_get(tbl, "ready") != NULL) {
+        char *ip;
+        /* TODO: REMOVE ME BEFORE PRODUCTION (????) */
+        ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, NULL,
+                     "Heartmonitor: %pI busy=%s ready=%s", &from,
+                     apr_table_get(tbl, "busy"), apr_table_get(tbl, "ready"));
+
+        apr_sockaddr_ip_get(&ip, &from);
+
+        hm_server_t *s = hm_get_server(ctx, ip);
+
+        s->busy = atoi(apr_table_get(tbl, "busy"));
+        s->ready = atoi(apr_table_get(tbl, "ready"));
+        s->seen = apr_time_now();
+    }
+    else {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: malformed multicast message from %pI",
+                     &from);
+    }
+
+    return rv;
+}
+
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) ((x) * 1000)
+#endif
+
+static void *hm_worker(apr_thread_t *thd, void *data)
+{
+    hm_ctx_t *ctx = (hm_ctx_t *) data;
+    apr_status_t rv;
+
+    ctx->p = apr_thread_pool_get(thd);
+    ctx->status = 0;
+    ctx->keep_running = 1;
+    apr_thread_mutex_unlock(ctx->start_mtx);
+
+    while (ctx->keep_running) {
+        rv = apr_proc_mutex_trylock(ctx->mutex);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
+        apr_sleep(apr_time_from_msec(200));
+    }
+
+    rv = hm_listen(ctx);
+
+    if (rv) {
+        ctx->status = rv;
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to listen for connections!");
+        apr_proc_mutex_unlock(ctx->mutex);
+        apr_thread_exit(ctx->thread, rv);
+        return NULL;
+    }
+
+
+    apr_time_t last = apr_time_now();
+    while (ctx->keep_running) {
+        int n;
+        apr_pool_t *p;
+        apr_pollfd_t pfd;
+        apr_interval_time_t timeout;
+        apr_pool_create(&p, ctx->p);
+
+        apr_time_t now = apr_time_now();
+
+        if (apr_time_sec((now - last)) > 5) {
+            hm_update_stats(ctx, p);
+            apr_pool_clear(p);
+            last = now;
+        }
+
+        pfd.desc_type = APR_POLL_SOCKET;
+        pfd.desc.s = ctx->sock;
+        pfd.p = p;
+        pfd.reqevents = APR_POLLIN;
+
+        timeout = apr_time_from_sec(1);
+
+        rv = apr_poll(&pfd, 1, &n, timeout);
+
+        if (!ctx->keep_running) {
+            break;
+        }
+
+        if (rv) {
+            apr_pool_destroy(p);
+            continue;
+        }
+
+        if (pfd.rtnevents & APR_POLLIN) {
+            hm_recv(ctx, p);
+        }
+
+        apr_pool_destroy(p);
+    }
+
+    apr_proc_mutex_unlock(ctx->mutex);
+    apr_thread_exit(ctx->thread, APR_SUCCESS);
+
+    return NULL;
+}
+
+static apr_status_t hm_pool_cleanup(void *baton)
+{
+    apr_status_t rv;
+    hm_ctx_t *ctx = (hm_ctx_t *) baton;
+
+    ctx->keep_running = 0;
+
+    apr_thread_join(&rv, ctx->thread);
+
+    return rv;
+}
+
+static void start_hm_worker(apr_pool_t *p, hm_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
+                                 p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: apr_thread_mutex_create failed");
+        ctx->status = rv;
+        return;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+
+    apr_pool_cleanup_register(p, ctx, hm_pool_cleanup, apr_pool_cleanup_null);
+
+    rv = apr_thread_create(&ctx->thread, NULL, hm_worker, ctx, p);
+    if (rv) {
+        apr_pool_cleanup_kill(p, ctx, hm_pool_cleanup);
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: apr_thread_create failed");
+        ctx->status = rv;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+    apr_thread_mutex_unlock(ctx->start_mtx);
+    apr_thread_mutex_destroy(ctx->start_mtx);
+}
+
+static void hm_child_init(apr_pool_t *p, server_rec *s)
+{
+    hm_ctx_t *ctx =
+        ap_get_module_config(s->module_config, &heartmonitor_module);
+
+    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
+
+    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s,
+                 "Heartmonitor: Starting Listener Thread. mcast=%pI",
+                 ctx->mcast_addr);
+
+    ctx->status = -1;
+
+    start_hm_worker(p, ctx);
+
+    if (ctx->status != 0) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
+                     "Heartmonitor: Failed to start listener thread.");
+        return;
+    }
+
+    return;
+}
+
+static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
+                          apr_pool_t *ptemp, server_rec *s)
+{
+    hm_ctx_t *ctx = ap_get_module_config(s->module_config,
+                                         &heartmonitor_module);
+
+    apr_status_t rv = apr_proc_mutex_create(&ctx->mutex,
+                                            ctx->mutex_path,
+#if APR_HAS_FCNTL_SERIALIZE
+
+                                            APR_LOCK_FCNTL,
+#else
+#if APR_HAS_FLOCK_SERIALIZE
+                                            APR_LOCK_FLOCK,
+#else
+#error port me to a non crap platform.
+#endif
+#endif
+                                            p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
+                     "Heartmonitor: Failed to create listener "
+                     "mutex at %s (type=%s)", ctx->mutex_path,
+                     apr_proc_mutex_defname());
+        return !OK;
+    }
+
+    return OK;
+}
+
+static void hm_register_hooks(apr_pool_t *p)
+{
+    ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
+    ap_hook_child_init(hm_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+static void *hm_create_config(apr_pool_t *p, server_rec *s)
+{
+    hm_ctx_t *ctx = (hm_ctx_t *) apr_pcalloc(p, sizeof(hm_ctx_t));
+
+    ctx->active = 0;
+    ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat");
+
+    return ctx;
+}
+
+static const char *cmd_hm_storage(cmd_parms *cmd,
+                                  void *dconf, const char *path)
+{
+    apr_pool_t *p = cmd->pool;
+    hm_ctx_t *ctx =
+        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartmonitor_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->storage_path = ap_server_root_relative(p, path);
+    ctx->mutex_path =
+        ap_server_root_relative(p, apr_pstrcat(p, path, ".hm-lock", NULL));
+
+    return NULL;
+}
+
+static const char *cmd_hm_listen(cmd_parms *cmd,
+                                 void *dconf, const char *mcast_addr)
+{
+    apr_status_t rv;
+    char *host_str;
+    char *scope_id;
+    apr_port_t port = 0;
+    apr_pool_t *p = cmd->pool;
+    hm_ctx_t *ctx =
+        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartmonitor_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->active = 1;
+
+    rv = apr_parse_addr_port(&host_str, &scope_id, &port, mcast_addr, p);
+
+    if (rv) {
+        return "HeartbeatListen: Unable to parse multicast address.";
+    }
+
+    if (host_str == NULL) {
+        return "HeartbeatListen: No host provided in multicast address";
+    }
+
+    if (port == 0) {
+        return "HeartbeatListen: No port provided in multicast address";
+    }
+
+    rv = apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
+                               p);
+
+    if (rv) {
+        return
+            "HeartbeatListen: apr_sockaddr_info_get failed on multicast address";
+    }
+
+    return NULL;
+}
+
+static const command_rec hm_cmds[] = {
+    AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF,
+                  "Address to listen for heartbeat requests"),
+    AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF,
+                  "Path to store heartbeat data."),
+    {NULL}
+};
+
+module AP_MODULE_DECLARE_DATA heartmonitor_module = {
+    STANDARD20_MODULE_STUFF,
+    NULL,                       /* create per-directory config structure */
+    NULL,                       /* merge per-directory config structures */
+    hm_create_config,           /* create per-server config structure */
+    NULL,                       /* merge per-server config structures */
+    hm_cmds,                    /* command apr_table_t */
+    hm_register_hooks
+};

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain
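
The atomic update performed by hm_update_stats (write to a temporary file, set
read-only permissions, then rename over the storage path) can be sketched
outside of APR with plain POSIX calls. This is only an illustration under
stated assumptions: write_atomically is a hypothetical helper, it substitutes
mkstemp, fchmod and rename for the APR functions used in the module, and it
treats a short write as an error rather than retrying.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

/*
 * Write data to path so that readers see either the old file or the
 * complete new one, never a partial write.
 * Returns 0 on success, -1 on failure.
 */
static int write_atomically(const char *path, const char *data)
{
    char tmp[1024];
    size_t len;
    int fd;

    assert(path != NULL && data != NULL);
    len = strlen(data);

    /* Temporary name next to the target, so rename stays on one filesystem. */
    if (snprintf(tmp, sizeof(tmp), "%s.tmp.XXXXXX", path) >= (int) sizeof(tmp)) {
        return -1;
    }

    fd = mkstemp(tmp);
    if (fd < 0) {
        return -1;
    }

    /* World-readable, not writable, mirroring the permissions in the module. */
    if (write(fd, data, len) != (ssize_t) len || fchmod(fd, 0444) != 0) {
        close(fd);
        unlink(tmp);
        return -1;
    }

    if (close(fd) != 0) {
        unlink(tmp);
        return -1;
    }

    /* rename replaces the target in a single step on POSIX systems. */
    if (rename(tmp, path) != 0) {
        unlink(tmp);
        return -1;
    }

    return 0;
}
```

Keeping the temporary file in the same directory as the target is the design
choice that matters here: rename is only atomic within a single filesystem.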


