From: Paul Querna
To: dev@apr.apache.org
Date: Thu, 03 Aug 2006 16:25:36 -0700
Subject: [PATCH] APR Memcache Multi-Get Support
Message-ID: <44D285F0.2040408@force-elite.com>

Attached is a patch that adds support for fetching multiple keys in
parallel from a memcache cluster. It can fetch thousands of values from
multiple memcache nodes without slowing down significantly as more nodes
or keys are added.

The main function, apr_memcache_multgetp, is currently pounding away with
thousands of concurrent queries per second without any problems. I
wouldn't call the function itself 'pretty', but I believe the logic
embedded in it is correct.

[[[
Add memcache multi-get support to apr_memcache.

* include/apr_memcache.h:
  Add apr_memcache_value_t structure for holding an individual value
  from memcached.
  Add new functions: apr_memcache_add_multget_key, apr_memcache_multgetp.

* memcache/apr_memcache.c:
  Add a local baton structure for associating servers with queries.
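Usage looks roughly like this (an untested sketch; it assumes mc is an
already-created apr_memcache_t * and that temp_pool and data_pool are
live pools):

    apr_hash_t *values = NULL;
    apr_memcache_value_t *value;
    apr_status_t rv;

    /* queue up the keys; the hash is created on the first call */
    apr_memcache_add_multget_key(data_pool, "foo", &values);
    apr_memcache_add_multget_key(data_pool, "bar", &values);

    /* one round trip per server, all servers queried in parallel */
    rv = apr_memcache_multgetp(mc, temp_pool, data_pool, values);

    /* each entry carries its own status, data, length, and flags */
    value = apr_hash_get(values, "foo", APR_HASH_KEY_STRING);
    if (value && value->status == APR_SUCCESS) {
        /* value->data holds value->len bytes of "foo"'s data */
    }

Splitting the pools lets a caller keep the returned values alive in
data_pool while everything used to drive the query is thrown away with
temp_pool.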
[attachment: multigetp.patch]

Index: memcache/apr_memcache.c
===================================================================
--- memcache/apr_memcache.c	(revision 428526)
+++ memcache/apr_memcache.c	(working copy)
@@ -17,6 +17,7 @@
  */

 #include "apr_memcache.h"
+#include "apr_poll.h"
 #include "apr_version.h"
 #include <stdlib.h>

@@ -38,6 +39,9 @@
 #define MC_EOL "\r\n"
 #define MC_EOL_LEN (sizeof(MC_EOL)-1)

+#define MC_WS " "
+#define MC_WS_LEN (sizeof(MC_WS)-1)
+
 #define MC_GET "get "
 #define MC_GET_LEN (sizeof(MC_GET)-1)

@@ -97,7 +101,16 @@
 #define MS_END "END"
 #define MS_END_LEN (sizeof(MS_END)-1)

+/** Server and query baton for a multiple get */
+struct cache_server_query_t {
+    apr_memcache_server_t* ms;
+    apr_memcache_conn_t* conn;
+    struct iovec* query_vec;
+    unsigned int query_vec_count;
+};
+
+/* poll timeout for multiget replies, in microseconds (50ms) */
+#define MULT_GET_TIMEOUT 50000
+
 static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
 {
 #if APR_HAS_THREADS
@@ -1021,6 +1034,334 @@
 }

+APR_DECLARE(void)
+apr_memcache_add_multget_key(apr_pool_t *data_pool,
+                             const char* key,
+                             apr_hash_t **values)
+{
+    apr_memcache_value_t* value;
+    int klen = strlen(key);
+
+    /* create the value hash if need be */
+    if (!*values) {
+        *values = apr_hash_make(data_pool);
+    }
+
+    /* init the key and add it to the value hash */
+    value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
+
+    value->status = APR_NOTFOUND;
+    value->key = apr_pstrdup(data_pool, key);
+
+    apr_hash_set(*values, value->key, klen, value);
+}
+
+static void mget_conn_result(int up,
+                             apr_status_t rv,
+                             apr_memcache_t *mc,
+                             apr_memcache_server_t *ms,
+                             apr_memcache_conn_t *conn,
+                             struct cache_server_query_t *server_query,
+                             apr_hash_t *values,
+                             apr_hash_t *server_queries)
+{
+    int j;
+    apr_memcache_value_t* value;
+
+    if (!up) {
+        ms_bad_conn(ms, conn);
+        apr_memcache_disable_server(mc, ms);
+    }
+
+    /* keys sit at every second slot of the query vector; mark any
+     * key that never received an answer with this result code */
+    for (j = 1; j < server_query->query_vec_count; j += 2) {
+        if (server_query->query_vec[j].iov_base) {
+            value = apr_hash_get(values, server_query->query_vec[j].iov_base,
+                                 strlen(server_query->query_vec[j].iov_base));
+
+            if (value->status == APR_NOTFOUND) {
+                value->status = rv;
+            }
+        }
+    }
+
+    ms_release_conn(ms, conn);
+
+    apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
+}
+
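+/*
+ * Outline of the multiget below:
+ *   1. hash every requested key to a server and build one
+ *      "get <key> <key>...\r\n" iovec query per server
+ *   2. send all queries, adding each connection to a single pollset
+ *   3. parse "VALUE ..." replies as sockets become readable, and
+ *      retire a connection once its "END" line arrives
+ */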
+APR_DECLARE(apr_status_t)
+apr_memcache_multgetp(apr_memcache_t *mc,
+                      apr_pool_t *temp_pool,
+                      apr_pool_t *data_pool,
+                      apr_hash_t *values)
+{
+    apr_status_t rv;
+    apr_memcache_server_t* ms;
+    apr_memcache_conn_t* conn;
+    apr_uint32_t hash;
+    apr_size_t written;
+    int klen;
+
+    apr_memcache_value_t* value;
+    apr_hash_index_t* value_hash_index;
+
+    /* this is a little over-aggressive, but beats multiple loops to
+     * figure out how long each vector needs to be per-server
+     */
+    unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[ <key>...]\r\n */
+    unsigned int i, j;
+    unsigned int queries_sent;
+    apr_int32_t queries_recvd;
+
+    apr_hash_t* server_queries = apr_hash_make(temp_pool);
+    struct cache_server_query_t* server_query;
+    apr_hash_index_t* query_hash_index;
+
+    apr_pollset_t* pollset;
+    const apr_pollfd_t* activefds;
+    apr_pollfd_t* pollfds;
+
+    /* build all the queries */
+    value_hash_index = apr_hash_first(temp_pool, values);
+    while (value_hash_index) {
+        apr_hash_this(value_hash_index, NULL, NULL, (void**)&value);
+        value_hash_index = apr_hash_next(value_hash_index);
+        klen = strlen(value->key);
+
+        hash = apr_memcache_hash(value->key, klen);
+        ms = apr_memcache_find_server_hash(mc, hash);
+        if (ms == NULL) {
+            continue;
+        }
+
+        server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
+
+        if (!server_query) {
+            rv = ms_find_conn(ms, &conn);
+
+            if (rv != APR_SUCCESS) {
+                apr_memcache_disable_server(mc, ms);
+                value->status = rv;
+                continue;
+            }
+
+            server_query = apr_pcalloc(temp_pool, sizeof(struct cache_server_query_t));
+
+            apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
+
+            server_query->ms = ms;
+            server_query->conn = conn;
+            server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
+
+            /* set up the first key */
+            server_query->query_vec[0].iov_base = MC_GET;
+            server_query->query_vec[0].iov_len = MC_GET_LEN;
+
+            server_query->query_vec[1].iov_base = (void*)(value->key);
+            server_query->query_vec[1].iov_len = klen;
+
+            server_query->query_vec[2].iov_base = MC_EOL;
+            server_query->query_vec[2].iov_len = MC_EOL_LEN;
+
+            server_query->query_vec_count = 3;
+        }
+        else {
+            /* overwrite the trailing \r\n with a space, then append
+             * the new key and re-terminate the command */
+            j = server_query->query_vec_count - 1;
+
+            server_query->query_vec[j].iov_base = MC_WS;
+            server_query->query_vec[j].iov_len = MC_WS_LEN;
+            j++;
+
+            server_query->query_vec[j].iov_base = (void*)(value->key);
+            server_query->query_vec[j].iov_len = klen;
+            j++;
+
+            server_query->query_vec[j].iov_base = MC_EOL;
+            server_query->query_vec[j].iov_len = MC_EOL_LEN;
+            j++;
+
+            server_query->query_vec_count = j;
+        }
+    }
+
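+    /*
+     * From here on there is exactly one connection per participating
+     * server, so the pollset tracks one descriptor per server no
+     * matter how many keys were requested.
+     */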
+    /* create polling structures */
+    pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
+
+    rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
+
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    /* send all the queries */
+    queries_sent = 0;
+    query_hash_index = apr_hash_first(temp_pool, server_queries);
+
+    while (query_hash_index) {
+        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
+        query_hash_index = apr_hash_next(query_hash_index);
+
+        conn = server_query->conn;
+        ms = server_query->ms;
+
+        /* send in IOV_MAX-sized slices, since writev() cannot take
+         * more than IOV_MAX segments in one call */
+        for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += IOV_MAX) {
+            rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
+                                  veclen-i > IOV_MAX ? IOV_MAX : veclen-i, &written);
+        }
+
+        if (rv != APR_SUCCESS) {
+            mget_conn_result(FALSE, rv, mc, ms, conn,
+                             server_query, values, server_queries);
+            continue;
+        }
+
+        pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
+        pollfds[queries_sent].reqevents = APR_POLLIN;
+        pollfds[queries_sent].p = temp_pool;
+        pollfds[queries_sent].desc.s = conn->sock;
+        pollfds[queries_sent].client_data = (void *)server_query;
+        apr_pollset_add(pollset, &pollfds[queries_sent]);
+
+        queries_sent++;
+    }
+
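+    /*
+     * memcached answers each query with zero or more replies of the
+     * form "VALUE <key> <flags> <bytes>\r\n<data>\r\n" followed by a
+     * final "END\r\n"; the loop below consumes both line types.
+     */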
+    while (queries_sent) {
+        rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
+
+        if (rv != APR_SUCCESS) {
+            /* timeout */
+            queries_sent = 0;
+            continue;
+        }
+
+        for (i = 0; i < queries_recvd; i++) {
+            server_query = activefds[i].client_data;
+            conn = server_query->conn;
+            ms = server_query->ms;
+
+            rv = get_server_line(conn);
+
+            if (rv != APR_SUCCESS) {
+                apr_pollset_remove(pollset, &activefds[i]);
+                mget_conn_result(FALSE, rv, mc, ms, conn,
+                                 server_query, values, server_queries);
+                queries_sent--;
+                continue;
+            }
+
+            if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
+                char *key;
+                char *flags;
+                char *length;
+                char *start;
+                char *last;
+                char *data;
+                apr_size_t len;
+
+                start = conn->buffer;
+                key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */
+                key = apr_strtok(NULL, " ", &last);
+                flags = apr_strtok(NULL, " ", &last);
+                length = apr_strtok(NULL, " ", &last);
+                len = atoi(length);
+
+                value = apr_hash_get(values, key, strlen(key));
+
+                if (value) {
+                    if (len > 0) {
+                        apr_bucket_brigade *bbb;
+                        apr_bucket *e;
+
+                        /* eat the trailing \r\n */
+                        rv = apr_brigade_partition(conn->bb, len+2, &e);
+
+                        if (rv != APR_SUCCESS) {
+                            apr_pollset_remove(pollset, &activefds[i]);
+                            mget_conn_result(FALSE, rv, mc, ms, conn,
+                                             server_query, values, server_queries);
+                            queries_sent--;
+                            continue;
+                        }
+
+                        bbb = apr_brigade_split(conn->bb, e);
+
+                        rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
+
+                        if (rv != APR_SUCCESS) {
+                            apr_pollset_remove(pollset, &activefds[i]);
+                            mget_conn_result(FALSE, rv, mc, ms, conn,
+                                             server_query, values, server_queries);
+                            queries_sent--;
+                            continue;
+                        }
+
+                        rv = apr_brigade_destroy(conn->bb);
+                        if (rv != APR_SUCCESS) {
+                            apr_pollset_remove(pollset, &activefds[i]);
+                            mget_conn_result(FALSE, rv, mc, ms, conn,
+                                             server_query, values, server_queries);
+                            queries_sent--;
+                            continue;
+                        }
+
+                        conn->bb = bbb;
+
+                        value->len = len - 2;
+                        data[value->len] = '\0';
+                        value->data = data;
+                    }
+
+                    value->status = rv;
+                    value->flags = atoi(flags);
+
+                    /* stay on this server; more replies may be buffered */
+                    i--;
+                }
+                else {
+                    /* TODO: server sent back a key we didn't ask for,
+                     * or our hash is corrupt */
+                }
+            }
+            else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
+                /* this connection is done */
+                ms_release_conn(ms, conn);
+                apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
+
+                apr_pollset_remove(pollset, &activefds[i]);
+                queries_sent--;
+            }
+            else {
+                /* unknown reply? */
+                rv = APR_EGENERAL;
+            }
+        } /* /for */
+    } /* /while */
+
+    /* any server left in server_queries never sent END; mark its
+     * remaining keys with the last result code */
+    query_hash_index = apr_hash_first(temp_pool, server_queries);
+    while (query_hash_index) {
+        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
+        query_hash_index = apr_hash_next(query_hash_index);
+
+        conn = server_query->conn;
+        ms = server_query->ms;
+
+        mget_conn_result(TRUE, rv, mc, ms, conn,
+                         server_query, values, server_queries);
+    }
+
+    /* destroy the pollset before clearing the pool it was created from */
+    apr_pollset_destroy(pollset);
+    apr_pool_clear(temp_pool);
+
+    return APR_SUCCESS;
+}
+
 /**
  * Define all of the strings for stats
  */
Index: include/apr_memcache.h
===================================================================
--- include/apr_memcache.h	(revision 428526)
+++ include/apr_memcache.h	(working copy)
@@ -35,6 +35,7 @@
 #include "apr_ring.h"
 #include "apr_buckets.h"
 #include "apr_reslist.h"
+#include "apr_hash.h"

 #ifdef __cplusplus
 extern "C" {
@@ -85,6 +86,16 @@
     apr_pool_t *p; /** Pool to use for allocations */
 } apr_memcache_t;

+/** Returned data from a multiple get */
+typedef struct
+{
+    apr_status_t status;
+    const char* key;
+    apr_size_t len;
+    char *data;
+    apr_uint16_t flags;
+} apr_memcache_value_t;
+
 /**
  * Creates a crc32 hash used to split keys between servers
  * @param data Data to be hashed
@@ -194,7 +205,37 @@
                  apr_size_t *len,
                  apr_uint16_t *flags);

+
 /**
+ * Adds a key to a hash for a multiget query.
+ * If the hash (*values) is NULL it will be created.
+ * @param data_pool pool the hash and its entries are allocated from
+ * @param key null terminated string containing the key
+ * @param values hash of keys and values that this key will be added to
+ */
+APR_DECLARE(void)
+apr_memcache_add_multget_key(apr_pool_t *data_pool,
+                             const char* key,
+                             apr_hash_t **values);
+
+/**
+ * Gets multiple values from the server, allocating the values out of
+ * data_pool
+ * @param mc client to use
+ * @param temp_pool pool used for temporary allocations; may be cleared
+ *        inside this call
+ * @param data_pool pool used to allocate data for the returned values
+ * @param values hash of apr_memcache_value_t keyed by strings; contains
+ *        the result of the multiget call
+ * @return APR_SUCCESS once all servers have been processed; per-key
+ *         results are recorded in each apr_memcache_value_t's status
+ */
+APR_DECLARE(apr_status_t)
+apr_memcache_multgetp(apr_memcache_t *mc,
+                      apr_pool_t *temp_pool,
+                      apr_pool_t *data_pool,
+                      apr_hash_t *values);
+
+/**
  * Sets a value by key on the server
  * @param mc client to use
  * @param key null terminated string containing the key