Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@minotaur.apache.org Received: (qmail 72375 invoked from network); 9 Jun 2009 19:01:22 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 9 Jun 2009 19:01:22 -0000 Received: (qmail 34214 invoked by uid 500); 9 Jun 2009 19:01:34 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 34186 invoked by uid 500); 9 Jun 2009 19:01:34 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 34176 invoked by uid 99); 9 Jun 2009 19:01:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2009 19:01:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2009 19:01:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DEE3C2388892; Tue, 9 Jun 2009 19:01:11 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r783096 - in /hadoop/zookeeper/trunk: CHANGES.txt src/c/src/mt_adaptor.c src/c/src/zk_adaptor.h src/c/src/zookeeper.c src/c/tests/TestClient.cc Date: Tue, 09 Jun 2009 19:01:11 -0000 To: zookeeper-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090609190111.DEE3C2388892@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mahadev Date: Tue Jun 9 19:01:11 2009 New Revision: 783096 URL: http://svn.apache.org/viewvc?rev=783096&view=rev Log: ZOOKEEPER-375. zoo_add_auth only retains most recent auth on re-sync. (mahadev) Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h hadoop/zookeeper/trunk/src/c/src/zookeeper.c hadoop/zookeeper/trunk/src/c/tests/TestClient.cc Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=783096&r1=783095&r2=783096&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jun 9 19:01:11 2009 @@ -115,6 +115,9 @@ ZOOKEEPER-435. allow "super" admin digest based auth to be configurable (phunt via breed) + ZOOKEEPER-375. zoo_add_auth only retains most recent auth on re-sync. +(mahadev) + IMPROVEMENTS: ZOOKEEPER-308. improve the atomic broadcast performance 3x. (breed via mahadev) Modified: hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c?rev=783096&r1=783095&r2=783096&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c (original) +++ hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c Tue Jun 9 19:01:11 2009 @@ -43,11 +43,11 @@ void zoo_lock_auth(zhandle_t *zh) { - pthread_mutex_lock(&zh->auth.lock); + pthread_mutex_lock(&zh->auth_h.lock); } void zoo_unlock_auth(zhandle_t *zh) { - pthread_mutex_unlock(&zh->auth.lock); + pthread_mutex_unlock(&zh->auth_h.lock); } void lock_buffer_list(buffer_head_t *l) { @@ -175,7 +175,7 @@ set_nonblock(adaptor_threads->self_pipe[1]); set_nonblock(adaptor_threads->self_pipe[0]); - pthread_mutex_init(&zh->auth.lock,0); + pthread_mutex_init(&zh->auth_h.lock,0); zh->adaptor_priv = adaptor_threads; pthread_mutex_init(&zh->to_process.lock,0); @@ -237,7 +237,7 @@ pthread_cond_destroy(&zh->completions_to_process.cond); pthread_mutex_destroy(&adaptor->zh_lock); - pthread_mutex_destroy(&zh->auth.lock); + pthread_mutex_destroy(&zh->auth_h.lock); close(adaptor->self_pipe[0]); close(adaptor->self_pipe[1]); Modified: hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=783096&r1=783095&r2=783096&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h (original) +++ hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h Tue Jun 9 19:01:11 2009 @@ -107,9 +107,7 @@ struct buffer auth; void_completion_t completion; const char* data; -#ifdef THREADED - pthread_mutex_t lock; -#endif + struct _auth_info *next; } auth_info; /** @@ -156,7 +154,15 @@ int self_pipe[2]; }; #endif - + +/** the auth list for adding auth */ +typedef struct _auth_list_head { + auth_info *auth; +#ifdef THREADED + pthread_mutex_t lock; +#endif +} auth_list_head_t; + /** * This structure represents the connection to zookeeper. */ @@ -187,7 +193,7 @@ char primer_storage_buffer[40]; /* the true size of primer_storage */ volatile int state; void *context; - struct _auth_info auth; /* authentication data */ + auth_list_head_t auth_h; /* authentication data list */ /* zookeeper_close is not reentrant because it de-allocates the zhandler. * This guard variable is used to defer the destruction of zhandle till * right before top-level API call returns to the caller */ @@ -204,6 +210,7 @@ zk_hashtable* active_child_watchers; }; + int adaptor_init(zhandle_t *zh); void adaptor_finish(zhandle_t *zh); void adaptor_destroy(zhandle_t *zh); Modified: hadoop/zookeeper/trunk/src/c/src/zookeeper.c URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zookeeper.c?rev=783096&r1=783095&r2=783096&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/src/zookeeper.c (original) +++ hadoop/zookeeper/trunk/src/c/src/zookeeper.c Tue Jun 9 19:01:11 2009 @@ -139,6 +139,12 @@ #define COMPLETION_ACLLIST 4 #define COMPLETION_STRING 5 +typedef struct _auth_completion_list { + void_completion_t completion; + const char *auth_data; + struct _auth_completion_list *next; +} auth_completion_list_t; + typedef struct _completion_list { int xid; int completion_type; /* one of the COMPLETION_* values */ @@ -198,22 +204,123 @@ return zh->recv_timeout; } -static void init_auth_info(auth_info *auth) +/** these functions are thread unsafe, so make sure that + zoo_lock_auth is called before you access them **/ +static auth_info* get_last_auth(auth_list_head_t *auth_list) { + auth_info *element; + element = auth_list->auth; + if (element == NULL) { + return NULL; + } + while (element->next != NULL) { + element = element->next; + } + return element; +} + +static void free_auth_completion(auth_completion_list_t *a_list) { + auth_completion_list_t *tmp, *ftmp; + if (a_list == NULL) { + return; + } + tmp = a_list->next; + while (tmp != NULL) { + ftmp = tmp; + tmp = tmp->next; + ftmp->completion = NULL; + ftmp->auth_data = NULL; + free(ftmp); + } + a_list->completion = NULL; + a_list->auth_data = NULL; + a_list->next = NULL; + return; +} + +static void add_auth_completion(auth_completion_list_t* a_list, void_completion_t* completion, + const char *data) { + auth_completion_list_t *element; + auth_completion_list_t *n_element; + element = a_list; + if (a_list->completion == NULL) { + //this is the first element + a_list->completion = *completion; + a_list->next = NULL; + a_list->auth_data = data; + return; + } + while (element->next != NULL) { + element = element->next; + } + n_element = (auth_completion_list_t*) malloc(sizeof(auth_completion_list_t)); + n_element->next = NULL; + n_element->completion = *completion; + n_element->auth_data = data; + element->next = n_element; + return; +} + +static void get_auth_completions(auth_list_head_t *auth_list, auth_completion_list_t *a_list) { + auth_info *element; + element = auth_list->auth; + if (element == NULL) { + return; + } + while (element) { + if (element->completion) { + add_auth_completion(a_list, &element->completion, element->data); + } + element->completion = NULL; + element = element->next; + } + return; +} + +static void add_last_auth(auth_list_head_t *auth_list, auth_info *add_el) { + auth_info *element; + element = auth_list->auth; + if (element == NULL) { + //first element in the list + auth_list->auth = add_el; + return; + } + while (element->next != NULL) { + element = element->next; + } + element->next = add_el; + return; +} + +static void init_auth_info(auth_list_head_t *auth_list) { - auth->scheme=NULL; - auth->auth.buff=NULL; - auth->auth.len=0; - auth->state=0; - auth->completion=0; - auth->data=0; -} - -static void free_auth_info(auth_info *auth) -{ - if(auth->scheme!=NULL) - free(auth->scheme); - deallocate_Buffer(&auth->auth); - init_auth_info(auth); + auth_list->auth = NULL; +} + +static void mark_active_auth(zhandle_t *zh) { + auth_list_head_t auth_h = zh->auth_h; + auth_info *element; + if (auth_h.auth == NULL) { + return; + } + element = auth_h.auth; + while (element->next != NULL) { + element->state = 1; + element = element->next; + } +} + +static void free_auth_info(auth_list_head_t *auth_list) +{ + auth_info *auth = auth_list->auth; + while (auth != NULL) { + if(auth->scheme!=NULL) + free(auth->scheme); + deallocate_Buffer(&auth->auth); + auth_info *old_auth = auth; + auth = auth->next; + free(old_auth); + } + init_auth_info(auth_list); } int is_unrecoverable(zhandle_t *zh) @@ -265,7 +372,7 @@ free(zh->addrs); zh->addrs = NULL; } - free_auth_info(&zh->auth); + free_auth_info(&zh->auth_h); destroy_zk_hashtable(zh->active_node_watchers); destroy_zk_hashtable(zh->active_exist_watchers); destroy_zk_hashtable(zh->active_child_watchers); @@ -507,6 +614,7 @@ zh->state = 0; zh->context = context; zh->recv_timeout = recv_timeout; + init_auth_info(&zh->auth_h); if (watcher) { zh->watcher = watcher; } else { @@ -762,8 +870,8 @@ struct oarchive *oa; struct ReplyHeader h; void_completion_t auth_completion = NULL; - const char *auth_data = NULL; - + auth_completion_list_t a_list, *a_tmp; + lock_completion_list(&zh->sent_requests); tmp_list = zh->sent_requests; zh->sent_requests.head = 0; @@ -802,18 +910,21 @@ } } } - + a_list.completion = NULL; + a_list.next = NULL; zoo_lock_auth(zh); - if (zh->auth.completion) { - auth_completion = zh->auth.completion; - auth_data = zh->auth.data; - zh->auth.completion = 0; - } + get_auth_completions(&zh->auth_h, &a_list); zoo_unlock_auth(zh); - - if (auth_completion) { - auth_completion(reason, auth_data); + a_tmp = &a_list; + // chain call user's completion function + while (a_tmp->completion != NULL) { + auth_completion = a_tmp->completion; + auth_completion(reason, a_tmp->auth_data); + a_tmp = a_tmp->next; + if (a_tmp == NULL) + break; } + free_auth_completion(&a_list); } static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc) @@ -871,8 +982,9 @@ static void auth_completion_func(int rc, zhandle_t* zh) { void_completion_t auth_completion = NULL; - const char *auth_data = NULL; - + auth_completion_list_t a_list; + auth_completion_list_t *a_tmp; + if(zh==NULL) return; @@ -881,61 +993,82 @@ if(rc!=0){ zh->state=ZOO_AUTH_FAILED_STATE; }else{ - zh->auth.state=1; // active - } - - if (zh->auth.completion) { - auth_completion = zh->auth.completion; - auth_data = zh->auth.data; - zh->auth.completion=0; + //change state for all auths + mark_active_auth(zh); } - + a_list.completion = NULL; + a_list.next = NULL; + get_auth_completions(&zh->auth_h, &a_list); zoo_unlock_auth(zh); - if (rc) { LOG_ERROR(("Authentication scheme %s failed. Connection closed.", - zh->auth.scheme)); + zh->auth_h.auth->scheme)); } else { - LOG_INFO(("Authentication scheme %s succeeded", zh->auth.scheme)); + LOG_INFO(("Authentication scheme %s succeeded", zh->auth_h.auth->scheme)); } - + a_tmp = &a_list; // chain call user's completion function - if (auth_completion) { - auth_completion(rc, auth_data); + while (a_tmp->completion != NULL) { + auth_completion = a_tmp->completion; + auth_completion(rc, a_tmp->auth_data); + a_tmp = a_tmp->next; + if (a_tmp == NULL) + break; } + free_auth_completion(&a_list); } -static int send_auth_info(zhandle_t *zh) -{ +static int send_info_packet(zhandle_t *zh, auth_info* auth) { struct oarchive *oa; struct RequestHeader h = { .xid = AUTH_XID, .type = SETAUTH_OP}; struct AuthPacket req; int rc; - - zoo_lock_auth(zh); - - if(zh->auth.scheme==NULL) { - zoo_unlock_auth(zh); - return ZOK; // there is nothing to send - } - oa = create_buffer_oarchive(); rc = serialize_RequestHeader(oa, "header", &h); - req.type=0; // ignored by the server - req.scheme = zh->auth.scheme; - req.auth = zh->auth.auth; + req.scheme = auth->scheme; + req.auth = auth->auth; rc = rc < 0 ? rc : serialize_AuthPacket(oa, "req", &req); - - zoo_unlock_auth(zh); - /* add this buffer to the head of the send queue */ rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); /* We queued the buffer, so don't free it */ close_buffer_oarchive(&oa, 0); - + + return rc; +} + +/** send all auths, not just the last one **/ +static int send_auth_info(zhandle_t *zh) { + int rc; + zoo_lock_auth(zh); + auth_info *auth; + auth = zh->auth_h.auth; + if (auth == NULL) { + zoo_unlock_auth(zh); + return ZOK; + } + while (auth->next != NULL) { + rc = send_info_packet(zh, auth); + auth = auth->next; + } + zoo_unlock_auth(zh); + LOG_DEBUG(("Sending all auth info request to %s", format_current_endpoint_info(zh))); + return (rc <0) ? ZMARSHALLINGERROR:ZOK; +} + +static int send_last_auth_info(zhandle_t *zh) +{ + int rc; + zoo_lock_auth(zh); + auth_info *auth = get_last_auth(&zh->auth_h); + if(auth==NULL) { + zoo_unlock_auth(zh); + return ZOK; // there is nothing to send + } + rc = send_info_packet(zh, auth); + zoo_unlock_auth(zh); LOG_DEBUG(("Sending auth info request to %s",format_current_endpoint_info(zh))); return (rc < 0)?ZMARSHALLINGERROR:ZOK; } @@ -2382,7 +2515,7 @@ int certLen,void_completion_t completion, const void *data) { struct buffer auth; - + auth_info *authinfo; if(scheme==NULL || zh==NULL) return ZBADARGUMENTS; @@ -2402,18 +2535,18 @@ } zoo_lock_auth(zh); - - free_auth_info(&zh->auth); - zh->auth.scheme=strdup(scheme); + authinfo = (auth_info*) malloc(sizeof(auth_info)); + authinfo->scheme=strdup(scheme); if(auth.buff) - zh->auth.auth=auth; - zh->auth.completion=completion; - zh->auth.data=data; - + authinfo->auth=auth; + authinfo->completion=completion; + authinfo->data=data; + authinfo->next = NULL; + add_last_auth(&zh->auth_h, authinfo); zoo_unlock_auth(zh); - + if(zh->state == ZOO_CONNECTED_STATE || zh->state == ZOO_ASSOCIATING_STATE) - return send_auth_info(zh); + return send_last_auth_info(zh); return ZOK; } Modified: hadoop/zookeeper/trunk/src/c/tests/TestClient.cc URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestClient.cc?rev=783096&r1=783095&r2=783096&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/TestClient.cc (original) +++ hadoop/zookeeper/trunk/src/c/tests/TestClient.cc Tue Jun 9 19:01:11 2009 @@ -97,6 +97,8 @@ typedef struct watchCtx { private: list events; + watchCtx(const watchCtx&); + watchCtx& operator=(const watchCtx&); public: bool connected; zhandle_t *zh; @@ -270,7 +272,8 @@ #define COUNT 100 static zhandle_t *async_zk; - + static volatile int count; + static void statCompletion(int rc, const struct Stat *stat, const void *data) { int tmp = (int) (long) data; CPPUNIT_ASSERT_EQUAL(tmp, rc); @@ -289,12 +292,21 @@ free(path); } } - + + static void waitForVoidCompletion(int seconds) { + time_t expires = time(0) + seconds; + while(count == 0 && time(0) < expires) { + sleep(1); + } + count--; + } + static void voidCompletion(int rc, const void *data) { int tmp = (int) (long) data; CPPUNIT_ASSERT_EQUAL(tmp, rc); + count++; } - + static void verifyCreateFails(const char *path, zhandle_t *zk) { CPPUNIT_ASSERT_EQUAL((int)ZBADARGUMENTS, zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0)); @@ -363,47 +375,68 @@ void testAuth() { int rc; - - watchctx_t ctx1; + count = 0; + watchctx_t ctx1, ctx2, ctx3; zhandle_t *zk = createClient(&ctx1); - + rc = zoo_add_auth(0, "", 0, 0, voidCompletion, (void*)-1); CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc); - + rc = zoo_add_auth(zk, 0, 0, 0, voidCompletion, (void*)-1); CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc); - + // auth as pat, create /tauth1, close session rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion, (void*)ZOK); CPPUNIT_ASSERT_EQUAL((int) ZOK, rc); - + waitForVoidCompletion(3); + + CPPUNIT_ASSERT(count == 0); + rc = zoo_create(zk, "/tauth1", "", 0, &ZOO_CREATOR_ALL_ACL, 0, 0, 0); CPPUNIT_ASSERT_EQUAL((int)ZOK, rc); - // auth as pat w/bad pass, access /tauth1, verify failure - watchctx_t ctx2; zk = createClient(&ctx2); rc = zoo_add_auth(zk, "digest", "pat:passwd2", 11, voidCompletion, (void*)ZOK); CPPUNIT_ASSERT_EQUAL((int) ZOK, rc); - + waitForVoidCompletion(3); + CPPUNIT_ASSERT(count == 0); + char buf[1024]; int blen = sizeof(buf); struct Stat stat; rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat); CPPUNIT_ASSERT_EQUAL((int)ZNOAUTH, rc); - // add auth pat w/correct pass verify success rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion, (void*)ZOK); + CPPUNIT_ASSERT_EQUAL((int) ZOK, rc); - + rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat); + CPPUNIT_ASSERT_EQUAL((int)ZOK, rc); + waitForVoidCompletion(3); + CPPUNIT_ASSERT(count == 0); + //create a new client + zk = createClient(&ctx3); + rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion, (void*) ZOK); + waitForVoidCompletion(3); + CPPUNIT_ASSERT(count == 0); + rc = zoo_add_auth(zk, "ip", "none", 4, voidCompletion, (void*)ZOK); + //make the server forget the auths + waitForVoidCompletion(3); + CPPUNIT_ASSERT(count == 0); + + stopServer(); + CPPUNIT_ASSERT(ctx3.waitForDisconnected(zk)); + startServer(); + CPPUNIT_ASSERT(ctx3.waitForConnected(zk)); + // now try getting the data rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat); CPPUNIT_ASSERT_EQUAL((int)ZOK, rc); } - + void testNullData() { watchctx_t ctx; zhandle_t *zk = createClient(&ctx); @@ -697,6 +730,7 @@ } }; +volatile int Zookeeper_simpleSystem::count; zhandle_t *Zookeeper_simpleSystem::async_zk; const char Zookeeper_simpleSystem::hostPorts[] = "127.0.0.1:22181"; CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_simpleSystem);