apr-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Paul Querna <c...@force-elite.com>
Subject Re: [PATCH] APR Memcache Multi-Get Support
Date Sat, 05 Aug 2006 00:35:34 GMT
Since no one seemed to object, I have committed this patch to trunk in
r428931.

-Paul

Paul Querna wrote:
> Attached is a patch to add support for fetching multiple keys in
> parallel from a memcache cluster.  It enables fetching thousands of
> values from multiple memcache nodes without any significant slowdown
> when adding more nodes or keys.
> 
> The basic logic behind the main function, apr_memcache_multgetp, is
> currently pounding away with thousands of concurrent queries per second,
> without any problem.  I wouldn't call the function itself 'pretty', but
> I believe the logic embedded in it is correct.
> 
> [[[
> Add memcache multi-get support to apr_memcache.
> 
> * include/apr_memcache.h
>     Add _value_t structure for holding an individual value from
>     memcached.
>     Add new functions: apr_memcache_add_multget_key,
>     apr_memcache_multgetp.
> 
> * memcache/apr_memcache.c:
>     Add a local baton structure for associating servers to queries.
>     Add new functions: apr_memcache_add_multget_key,
>     mget_conn_result, apr_memcache_multgetp
> 
> Submitted By: Rob Emanuele <rob.emanuele ask.com>, Paul Querna
> <paul.querna ask.com>
> ]]]
> 
> 
> ------------------------------------------------------------------------
> 
> Index: memcache/apr_memcache.c
> ===================================================================
> --- memcache/apr_memcache.c	(revision 428526)
> +++ memcache/apr_memcache.c	(working copy)
> @@ -17,6 +17,7 @@
>   */
>  
>  #include "apr_memcache.h"
> +#include "apr_poll.h"
>  #include "apr_version.h"
>  #include <stdlib.h>
>  
> @@ -38,6 +39,9 @@
>  #define MC_EOL "\r\n"
>  #define MC_EOL_LEN (sizeof(MC_EOL)-1)
>  
> +#define MC_WS " "
> +#define MC_WS_LEN (sizeof(MC_WS)-1)
> +
>  #define MC_GET "get "
>  #define MC_GET_LEN (sizeof(MC_GET)-1)
>  
> @@ -97,7 +101,16 @@
>  #define MS_END "END"
>  #define MS_END_LEN (sizeof(MS_END)-1)
>  
> +/** Server and Query Structure for a multiple get */
> +struct cache_server_query_t {
> +    apr_memcache_server_t* ms;
> +    apr_memcache_conn_t* conn;
> +    struct iovec* query_vec;
> +    unsigned int query_vec_count;
> +};
>  
> +#define MULT_GET_TIMEOUT 50000
> +
>  static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
>  {
>  #if APR_HAS_THREADS
> @@ -1021,6 +1034,334 @@
>  }
>  
>  
> +APR_DECLARE(void) 
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> +                             const char* key,
> +                             apr_hash_t **values)
> +{
> +    apr_memcache_value_t* value;
> +    int klen = strlen(key);
> +
> +    // create the value hash if need be
> +    if (!*values) {
> +        *values = apr_hash_make(data_pool);
> +    }
> +
> +    // init key and add it to the value hash
> +    value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
> +
> +    value->status = APR_NOTFOUND;
> +    value->key = apr_pstrdup(data_pool, key);
> +
> +    apr_hash_set(*values, value->key, klen, value);
> +}
> +
> +static void mget_conn_result(int up,
> +                             apr_status_t rv,
> +                             apr_memcache_t *mc,
> +                             apr_memcache_server_t *ms,
> +                             apr_memcache_conn_t *conn,
> +                             struct cache_server_query_t *server_query,
> +                             apr_hash_t *values,
> +                             apr_hash_t *server_queries)
> +{
> +    int j;
> +    apr_memcache_value_t* value;
> +    
> +    if (!up) {
> +        ms_bad_conn(ms, conn);
> +        apr_memcache_disable_server(mc, ms);
> +    }
> +    
> +    for (j = 1; j < server_query->query_vec_count ; j+=2) {
> +        if (server_query->query_vec[j].iov_base) {
> +            value = apr_hash_get(values, server_query->query_vec[j].iov_base,
> +                                 strlen(server_query->query_vec[j].iov_base));
> +            
> +            if (value->status == APR_NOTFOUND) {
> +                value->status = rv;
> +            }
> +        }
> +    }
> +
> +    ms_release_conn(ms, conn);
> +    
> +    apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +}
> +
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> +                      apr_pool_t *temp_pool,
> +                      apr_pool_t *data_pool,
> +                      apr_hash_t *values)
> +{
> +    apr_status_t rv;
> +    apr_memcache_server_t* ms;
> +    apr_memcache_conn_t* conn;
> +    apr_uint32_t hash;
> +    apr_size_t written;
> +    int klen;
> +
> +    apr_memcache_value_t* value;
> +    apr_hash_index_t* value_hash_index;
> +
> +    /* this is a little overly aggressive, but beats multiple loops
> +     * to figure out how long each vector needs to be per-server.
> +     */
> +    unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
> +    unsigned int i, j;
> +    unsigned int queries_sent;
> +    apr_int32_t queries_recvd;
> +
> +    apr_hash_t * server_queries = apr_hash_make(temp_pool);
> +    struct cache_server_query_t* server_query;
> +    apr_hash_index_t * query_hash_index;
> +
> +    apr_pollset_t* pollset;
> +    const apr_pollfd_t* activefds;
> +    apr_pollfd_t* pollfds;
> +
> +
> +    // build all the queries
> +    value_hash_index = apr_hash_first(temp_pool, values);
> +    while (value_hash_index) {
> +        apr_hash_this(value_hash_index, NULL, NULL, (void**)&value);
> +        value_hash_index = apr_hash_next(value_hash_index);
> +        klen = strlen(value->key);
> +
> +        hash = apr_memcache_hash(value->key, klen);
> +        ms = apr_memcache_find_server_hash(mc, hash);
> +        if (ms == NULL) {
> +            continue;
> +        }
> +
> +        server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
> +
> +        if (!server_query) {
> +            rv = ms_find_conn(ms, &conn);
> +
> +            if (rv != APR_SUCCESS) {
> +                apr_memcache_disable_server(mc, ms);
> +                value->status = rv;
> +                continue;
> +            }
> +
> +            server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
> +
> +            apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
> +
> +            server_query->ms = ms;
> +            server_query->conn = conn;
> +            server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
> +
> +            // set up the first key
> +            server_query->query_vec[0].iov_base = MC_GET;
> +            server_query->query_vec[0].iov_len  = MC_GET_LEN;
> +
> +            server_query->query_vec[1].iov_base = (void*)(value->key);
> +            server_query->query_vec[1].iov_len  = klen;
> +
> +            server_query->query_vec[2].iov_base = MC_EOL;
> +            server_query->query_vec[2].iov_len  = MC_EOL_LEN;
> +
> +            server_query->query_vec_count = 3;
> +        }
> +        else {
> +            j = server_query->query_vec_count - 1;
> +
> +            server_query->query_vec[j].iov_base = MC_WS;
> +            server_query->query_vec[j].iov_len  = MC_WS_LEN;
> +            j++;
> +
> +            server_query->query_vec[j].iov_base = (void*)(value->key);
> +            server_query->query_vec[j].iov_len  = klen;
> +            j++;
> +
> +            server_query->query_vec[j].iov_base = MC_EOL;
> +            server_query->query_vec[j].iov_len  = MC_EOL_LEN;
> +            j++;
> +
> +           server_query->query_vec_count = j;
> +        }
> +    }
> +
> +    // create polling structures
> +    pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
> +    
> +    rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
> +
> +    if (rv != APR_SUCCESS) {
> +        return rv;
> +    }
> +
> +    // send all the queries
> +    queries_sent = 0;
> +    query_hash_index = apr_hash_first(temp_pool, server_queries);
> +
> +    while (query_hash_index) {
> +        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
> +        query_hash_index = apr_hash_next(query_hash_index);
> +
> +        conn = server_query->conn;
> +        ms = server_query->ms;
> +
> +        for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += IOV_MAX) {
> +            rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
> +                                  veclen-i>IOV_MAX ? IOV_MAX : veclen-i , &written);
> +        }
> +
> +        if (rv != APR_SUCCESS) {
> +            mget_conn_result(FALSE, rv, mc, ms, conn,
> +                             server_query, values, server_queries);
> +            continue;
> +        }
> +
> +        pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
> +        pollfds[queries_sent].reqevents = APR_POLLIN;
> +        pollfds[queries_sent].p = temp_pool;
> +        pollfds[queries_sent].desc.s = conn->sock;
> +        pollfds[queries_sent].client_data = (void *)server_query;
> +        apr_pollset_add (pollset, &pollfds[queries_sent]);
> +
> +        queries_sent++;
> +    }
> +
> +    while (queries_sent) {
> +        rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
> +
> +        if (rv != APR_SUCCESS) {
> +            // timeout
> +            queries_sent = 0;
> +            continue;
> +        }
> +        for (i = 0; i < queries_recvd; i++) {
> +            server_query = activefds[i].client_data;
> +            conn = server_query->conn;
> +            ms = server_query->ms;
> +
> +           rv = get_server_line(conn);
> +
> +           if (rv != APR_SUCCESS) {
> +               apr_pollset_remove (pollset, &activefds[i]);
> +               mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                server_query, values, server_queries);
> +               queries_sent--;
> +               continue;
> +           }
> +
> +           if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
> +               char *key;
> +               char *flags;
> +               char *length;
> +               char *start;
> +               char *last;
> +               char *data;
> +               apr_size_t len;
> +
> +               start = conn->buffer;
> +               key = apr_strtok(conn->buffer, " ", &last); // just the VALUE, ignore
> +               key = apr_strtok(NULL, " ", &last);
> +               flags = apr_strtok(NULL, " ", &last);
> +
> +
> +               length = apr_strtok(NULL, " ", &last);
> +               len = atoi(length);
> +
> +               value = apr_hash_get(values, key, strlen(key));
> +
> +               
> +               if (value) {
> +                   if (len > 0)  {
> +                       apr_bucket_brigade *bbb;
> +                       apr_bucket *e;
> +                       
> +                       /* eat the trailing \r\n */
> +                       rv = apr_brigade_partition(conn->bb, len+2, &e);
> +                       
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds[i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values, server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }
> +                       
> +                       bbb = apr_brigade_split(conn->bb, e);
> +                       
> +                       rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
> +                       
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds[i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values, server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }
> +                       
> +                       rv = apr_brigade_destroy(conn->bb);
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds[i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values, server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }
> +                       
> +                       conn->bb = bbb;
> +                       
> +                       value->len = len - 2;
> +                       data[value->len] = '\0';
> +                       value->data = data;
> +                   }
> +                   
> +                   value->status = rv;
> +                   value->flags = atoi(flags);
> +                   
> +                   // stay on the server
> +                   i--;
> +                   
> +               }
> +               else {
> +                   // TODO: Server Sent back a key I didn't ask for or my hash is corrupt
> +               }
> +           }
> +           else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
> +               // this connection is done
> +               ms_release_conn(ms, conn);
> +               apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +               
> +               apr_pollset_remove (pollset, &activefds[i]);
> +               queries_sent--;
> +           }
> +           else {
> +               /* unknown reply? */
> +               rv = APR_EGENERAL;
> +           }
> +           
> +        } /* /for */
> +    } /* /while */
> +    
> +    query_hash_index = apr_hash_first(temp_pool, server_queries);
> +    while (query_hash_index) {
> +        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
> +        query_hash_index = apr_hash_next(query_hash_index);
> +        
> +        conn = server_query->conn;
> +        ms = server_query->ms;
> +        
> +        mget_conn_result(TRUE, rv, mc, ms, conn,
> +                         server_query, values, server_queries);
> +        continue;
> +    }
> +    
> +    apr_pool_clear(temp_pool);
> +    apr_pollset_destroy(pollset);
> +    return APR_SUCCESS;
> +    
> +}
> +
> +
> +
>  /**
>   * Define all of the strings for stats
>   */
> Index: include/apr_memcache.h
> ===================================================================
> --- include/apr_memcache.h	(revision 428526)
> +++ include/apr_memcache.h	(working copy)
> @@ -35,6 +35,7 @@
>  #include "apr_ring.h"
>  #include "apr_buckets.h"
>  #include "apr_reslist.h"
> +#include "apr_hash.h"
>  
>  #ifdef __cplusplus
>  extern "C" {
> @@ -85,6 +86,16 @@
>      apr_pool_t *p; /** Pool to use for allocations */
>  } apr_memcache_t;
>  
> +/** Returned Data from a multiple get */
> +typedef struct
> +{
> +    apr_status_t status;
> +    const char* key;
> +    apr_size_t len;
> +    char *data;
> +    apr_uint16_t flags;
> +} apr_memcache_value_t;
> +
>  /**
>   * Creates a crc32 hash used to split keys between servers
>   * @param data Data to be hashed
> @@ -194,7 +205,37 @@
>                                              apr_size_t *len,
>                                              apr_uint16_t *flags);
>  
> +
>  /**
> + * Add a key to a hash for a multiget query
> + *  if the hash (*values) is NULL it will be created
> + * @param data_pool pool from which the hash and its items are allocated
> + * @param key null terminated string containing the key
> + * @param values hash of keys and values that this key will be added to
> + * @return
> + */
> +APR_DECLARE(void) 
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> +                             const char* key,
> +                             apr_hash_t **values);
> +
> +/**
> + * Gets multiple values from the server, allocating the values out of data_pool
> + * @param mc client to use
> + * @param temp_pool Pool used for temporary allocations. May be cleared inside this
> + *        call.
> + * @param data_pool Pool used to allocate data for the returned values.
> + * @param values hash of apr_memcache_value_t keyed by strings, contains the
> + *        result of the multiget call.
> + * @return
> + */
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> +                      apr_pool_t *temp_pool,
> +                      apr_pool_t *data_pool,
> +                      apr_hash_t *values);
> +
> +/**
>   * Sets a value by key on the server
>   * @param mc client to use
>   * @param key   null terminated string containing the key


Mime
View raw message