apr-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pque...@apache.org
Subject svn commit: r428931 - in /apr/apr-util/trunk: include/apr_memcache.h memcache/apr_memcache.c
Date Sat, 05 Aug 2006 00:34:25 GMT
Author: pquerna
Date: Fri Aug  4 17:34:25 2006
New Revision: 428931

URL: http://svn.apache.org/viewvc?rev=428931&view=rev
Log:
Add memcache multi-get support to apr_memcache.

* include/apr_memcache.h
    Add _value_t structure for holding an individual value from
    memcached.
    Add new functions: apr_memcache_add_multget_key,
    apr_memcache_multgetp.

* memcache/apr_memcache.c:
    Add a local baton structure for associating severs to queries.
    Add new functions: apr_memcache_add_multget_key,
    mget_conn_result, apr_memcache_multgetp

Submitted By: Rob Emanuele <rob.emanuele ask.com>
              Paul Querna <paul.querna ask.com>

Modified:
    apr/apr-util/trunk/include/apr_memcache.h
    apr/apr-util/trunk/memcache/apr_memcache.c

Modified: apr/apr-util/trunk/include/apr_memcache.h
URL: http://svn.apache.org/viewvc/apr/apr-util/trunk/include/apr_memcache.h?rev=428931&r1=428930&r2=428931&view=diff
==============================================================================
--- apr/apr-util/trunk/include/apr_memcache.h (original)
+++ apr/apr-util/trunk/include/apr_memcache.h Fri Aug  4 17:34:25 2006
@@ -35,6 +35,7 @@
 #include "apr_ring.h"
 #include "apr_buckets.h"
 #include "apr_reslist.h"
+#include "apr_hash.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -85,6 +86,16 @@
     apr_pool_t *p; /** Pool to use for allocations */
 } apr_memcache_t;
 
+/** Returned Data from a multiple get */
+typedef struct
+{
+    apr_status_t status;
+    const char* key;
+    apr_size_t len;
+    char *data;
+    apr_uint16_t flags;
+} apr_memcache_value_t;
+
 /**
  * Creates a crc32 hash used to split keys between servers
  * @param data Data to be hashed
@@ -193,6 +204,36 @@
                                             char **baton,
                                             apr_size_t *len,
                                             apr_uint16_t *flags);
+
+
+/**
+ * Add a key to a hash for a multiget query
+ *  if the hash (*value) is NULL it will be created
+ * @param data_pool pool from where the hash and their items are created from
+ * @param key null terminated string containing the key
+ * @param values hash of keys and values that this key will be added to
+ * @return
+ */
+APR_DECLARE(void) 
+apr_memcache_add_multget_key(apr_pool_t *data_pool,
+                             const char* key,
+                             apr_hash_t **values);
+
+/**
+ * Gets multiple values from the server, allocating the values out of p
+ * @param mc client to use
+ * @param temp_pool Pool used for tempoary allocations. May be cleared inside this
+ *        call.
+ * @param data_pool Pool used to allocate data for the returned values.
+ * @param values hash of apr_memcache_value_t keyed by strings, contains the
+ *        result of the multiget call.
+ * @return
+ */
+APR_DECLARE(apr_status_t)
+apr_memcache_multgetp(apr_memcache_t *mc,
+                      apr_pool_t *temp_pool,
+                      apr_pool_t *data_pool,
+                      apr_hash_t *values);
 
 /**
  * Sets a value by key on the server

Modified: apr/apr-util/trunk/memcache/apr_memcache.c
URL: http://svn.apache.org/viewvc/apr/apr-util/trunk/memcache/apr_memcache.c?rev=428931&r1=428930&r2=428931&view=diff
==============================================================================
--- apr/apr-util/trunk/memcache/apr_memcache.c (original)
+++ apr/apr-util/trunk/memcache/apr_memcache.c Fri Aug  4 17:34:25 2006
@@ -17,6 +17,7 @@
  */
 
 #include "apr_memcache.h"
+#include "apr_poll.h"
 #include "apr_version.h"
 #include <stdlib.h>
 
@@ -38,6 +39,9 @@
 #define MC_EOL "\r\n"
 #define MC_EOL_LEN (sizeof(MC_EOL)-1)
 
+#define MC_WS " "
+#define MC_WS_LEN (sizeof(MC_WS)-1)
+
 #define MC_GET "get "
 #define MC_GET_LEN (sizeof(MC_GET)-1)
 
@@ -97,6 +101,15 @@
 #define MS_END "END"
 #define MS_END_LEN (sizeof(MS_END)-1)
 
+/** Server and Query Structure for a multiple get */
+struct cache_server_query_t {
+    apr_memcache_server_t* ms;
+    apr_memcache_conn_t* conn;
+    struct iovec* query_vec;
+    unsigned int query_vec_count;
+};
+
+#define MULT_GET_TIMEOUT 50000
 
 static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
 {
@@ -1019,6 +1032,334 @@
     ms_release_conn(ms, conn);
     return rv;
 }
+
+
+APR_DECLARE(void) 
+apr_memcache_add_multget_key(apr_pool_t *data_pool,
+                             const char* key,
+                             apr_hash_t **values)
+{
+    apr_memcache_value_t* value;
+    int klen = strlen(key);
+
+    // create the value hash if need be
+    if (!*values) {
+        *values = apr_hash_make(data_pool);
+    }
+
+    // init key and add it to the value hash
+    value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
+
+    value->status = APR_NOTFOUND;
+    value->key = apr_pstrdup(data_pool, key);
+
+    apr_hash_set(*values, value->key, klen, value);
+}
+
+static void mget_conn_result(int up,
+                             apr_status_t rv,
+                             apr_memcache_t *mc,
+                             apr_memcache_server_t *ms,
+                             apr_memcache_conn_t *conn,
+                             struct cache_server_query_t *server_query,
+                             apr_hash_t *values,
+                             apr_hash_t *server_queries)
+{
+    int j;
+    apr_memcache_value_t* value;
+    
+    if (!up) {
+        ms_bad_conn(ms, conn);
+        apr_memcache_disable_server(mc, ms);
+    }
+    
+    for (j = 1; j < server_query->query_vec_count ; j+=2) {
+        if (server_query->query_vec[j].iov_base) {
+            value = apr_hash_get(values, server_query->query_vec[j].iov_base,
+                                 strlen(server_query->query_vec[j].iov_base));
+            
+            if (value->status == APR_NOTFOUND) {
+                value->status = rv;
+            }
+        }
+    }
+
+    ms_release_conn(ms, conn);
+    
+    apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
+}
+
+APR_DECLARE(apr_status_t)
+apr_memcache_multgetp(apr_memcache_t *mc,
+                      apr_pool_t *temp_pool,
+                      apr_pool_t *data_pool,
+                      apr_hash_t *values)
+{
+    apr_status_t rv;
+    apr_memcache_server_t* ms;
+    apr_memcache_conn_t* conn;
+    apr_uint32_t hash;
+    apr_size_t written;
+    int klen;
+
+    apr_memcache_value_t* value;
+    apr_hash_index_t* value_hash_index;
+
+    /* this is a little over aggresive, but beats multiple loops
+     * to figure out how long each vector needs to be per-server.
+     */
+    unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n
*/
+    unsigned int i, j;
+    unsigned int queries_sent;
+    apr_int32_t queries_recvd;
+
+    apr_hash_t * server_queries = apr_hash_make(temp_pool);
+    struct cache_server_query_t* server_query;
+    apr_hash_index_t * query_hash_index;
+
+    apr_pollset_t* pollset;
+    const apr_pollfd_t* activefds;
+    apr_pollfd_t* pollfds;
+
+
+    // build all the queries
+    value_hash_index = apr_hash_first(temp_pool, values);
+    while (value_hash_index) {
+        apr_hash_this(value_hash_index, NULL, NULL, (void**)&value);
+        value_hash_index = apr_hash_next(value_hash_index);
+        klen = strlen(value->key);
+
+        hash = apr_memcache_hash(value->key, klen);
+        ms = apr_memcache_find_server_hash(mc, hash);
+        if (ms == NULL) {
+            continue;
+        }
+
+        server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
+
+        if (!server_query) {
+            rv = ms_find_conn(ms, &conn);
+
+            if (rv != APR_SUCCESS) {
+                apr_memcache_disable_server(mc, ms);
+                value->status = rv;
+                continue;
+            }
+
+            server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
+
+            apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
+
+            server_query->ms = ms;
+            server_query->conn = conn;
+            server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
+
+            // set up the first key
+            server_query->query_vec[0].iov_base = MC_GET;
+            server_query->query_vec[0].iov_len  = MC_GET_LEN;
+
+            server_query->query_vec[1].iov_base = (void*)(value->key);
+            server_query->query_vec[1].iov_len  = klen;
+
+            server_query->query_vec[2].iov_base = MC_EOL;
+            server_query->query_vec[2].iov_len  = MC_EOL_LEN;
+
+            server_query->query_vec_count = 3;
+        }
+        else {
+            j = server_query->query_vec_count - 1;
+
+            server_query->query_vec[j].iov_base = MC_WS;
+            server_query->query_vec[j].iov_len  = MC_WS_LEN;
+            j++;
+
+            server_query->query_vec[j].iov_base = (void*)(value->key);
+            server_query->query_vec[j].iov_len  = klen;
+            j++;
+
+            server_query->query_vec[j].iov_base = MC_EOL;
+            server_query->query_vec[j].iov_len  = MC_EOL_LEN;
+            j++;
+
+           server_query->query_vec_count = j;
+        }
+    }
+
+    // create polling structures
+    pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
+    
+    rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
+
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    // send all the queries
+    queries_sent = 0;
+    query_hash_index = apr_hash_first(temp_pool, server_queries);
+
+    while (query_hash_index) {
+        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
+        query_hash_index = apr_hash_next(query_hash_index);
+
+        conn = server_query->conn;
+        ms = server_query->ms;
+
+        for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += IOV_MAX)
{
+            rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
+                                  veclen-i>IOV_MAX ? IOV_MAX : veclen-i , &written);
+        }
+
+        if (rv != APR_SUCCESS) {
+            mget_conn_result(FALSE, rv, mc, ms, conn,
+                             server_query, values, server_queries);
+            continue;
+        }
+
+        pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
+        pollfds[queries_sent].reqevents = APR_POLLIN;
+        pollfds[queries_sent].p = temp_pool;
+        pollfds[queries_sent].desc.s = conn->sock;
+        pollfds[queries_sent].client_data = (void *)server_query;
+        apr_pollset_add (pollset, &pollfds[queries_sent]);
+
+        queries_sent++;
+    }
+
+    while (queries_sent) {
+        rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
+
+        if (rv != APR_SUCCESS) {
+            // timeout
+            queries_sent = 0;
+            continue;
+        }
+        for (i = 0; i < queries_recvd; i++) {
+            server_query = activefds[i].client_data;
+            conn = server_query->conn;
+            ms = server_query->ms;
+
+           rv = get_server_line(conn);
+
+           if (rv != APR_SUCCESS) {
+               apr_pollset_remove (pollset, &activefds[i]);
+               mget_conn_result(FALSE, rv, mc, ms, conn,
+                                server_query, values, server_queries);
+               queries_sent--;
+               continue;
+           }
+
+           if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
+               char *key;
+               char *flags;
+               char *length;
+               char *start;
+               char *last;
+               char *data;
+               apr_size_t len;
+
+               start = conn->buffer;
+               key = apr_strtok(conn->buffer, " ", &last); // just the VALUE, ignore
+               key = apr_strtok(NULL, " ", &last);
+               flags = apr_strtok(NULL, " ", &last);
+
+
+               length = apr_strtok(NULL, " ", &last);
+               len = atoi(length);
+
+               value = apr_hash_get(values, key, strlen(key));
+
+               
+               if (value) {
+                   if (len > 0)  {
+                       apr_bucket_brigade *bbb;
+                       apr_bucket *e;
+                       
+                       /* eat the trailing \r\n */
+                       rv = apr_brigade_partition(conn->bb, len+2, &e);
+                       
+                       if (rv != APR_SUCCESS) {
+                           apr_pollset_remove (pollset, &activefds[i]);
+                           mget_conn_result(FALSE, rv, mc, ms, conn,
+                                            server_query, values, server_queries);
+                           queries_sent--;
+                           continue;
+                       }
+                       
+                       bbb = apr_brigade_split(conn->bb, e);
+                       
+                       rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
+                       
+                       if (rv != APR_SUCCESS) {
+                           apr_pollset_remove (pollset, &activefds[i]);
+                           mget_conn_result(FALSE, rv, mc, ms, conn,
+                                            server_query, values, server_queries);
+                           queries_sent--;
+                           continue;
+                       }
+                       
+                       rv = apr_brigade_destroy(conn->bb);
+                       if (rv != APR_SUCCESS) {
+                           apr_pollset_remove (pollset, &activefds[i]);
+                           mget_conn_result(FALSE, rv, mc, ms, conn,
+                                            server_query, values, server_queries);
+                           queries_sent--;
+                           continue;
+                       }
+                       
+                       conn->bb = bbb;
+                       
+                       value->len = len - 2;
+                       data[value->len] = '\0';
+                       value->data = data;
+                   }
+                   
+                   value->status = rv;
+                   value->flags = atoi(flags);
+                   
+                   // stay on the server
+                   i--;
+                   
+               }
+               else {
+                   // TODO: Server Sent back a key I didn't ask for or my hash is corrupt
+               }
+           }
+           else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
+               // this connection is done
+               ms_release_conn(ms, conn);
+               apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
+               
+               apr_pollset_remove (pollset, &activefds[i]);
+               queries_sent--;
+           }
+           else {
+               /* unknown reply? */
+               rv = APR_EGENERAL;
+           }
+           
+        } /* /for */
+    } /* /while */
+    
+    query_hash_index = apr_hash_first(temp_pool, server_queries);
+    while (query_hash_index) {
+        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
+        query_hash_index = apr_hash_next(query_hash_index);
+        
+        conn = server_query->conn;
+        ms = server_query->ms;
+        
+        mget_conn_result(TRUE, rv, mc, ms, conn,
+                         server_query, values, server_queries);
+        continue;
+    }
+    
+    apr_pool_clear(temp_pool);
+    apr_pollset_destroy(pollset);
+    return APR_SUCCESS;
+    
+}
+
 
 
 /**



Mime
View raw message