Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@locus.apache.org Received: (qmail 62035 invoked from network); 24 Jul 2008 21:46:54 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Jul 2008 21:46:54 -0000 Received: (qmail 54134 invoked by uid 500); 24 Jul 2008 21:46:54 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 54112 invoked by uid 500); 24 Jul 2008 21:46:54 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 54101 invoked by uid 99); 24 Jul 2008 21:46:54 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2008 14:46:54 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2008 21:46:07 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C6EF023889C4; Thu, 24 Jul 2008 14:46:32 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r679557 [2/3] - in /hadoop/zookeeper/trunk/src/c: ./ include/ src/ src/hashtable/ tests/ Date: Thu, 24 Jul 2008 21:46:31 -0000 To: zookeeper-commits@hadoop.apache.org From: akornev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080724214632.C6EF023889C4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/zookeeper/trunk/src/c/src/zookeeper.c URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zookeeper.c?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- 
hadoop/zookeeper/trunk/src/c/src/zookeeper.c (original) +++ hadoop/zookeeper/trunk/src/c/src/zookeeper.c Thu Jul 24 14:46:30 2008 @@ -29,6 +29,8 @@ #include #include "zk_adaptor.h" #include "zk_log.h" +#include "zk_hashtable.h" + #include #include #include @@ -55,50 +57,50 @@ const int EPHEMERAL = 1 << 0; const int SEQUENCE = 1 << 1; -const int EXPIRED_SESSION_STATE = -112; -const int AUTH_FAILED_STATE = -113; -const int CONNECTING_STATE = 1; -const int ASSOCIATING_STATE = 2; -const int CONNECTED_STATE = 3; +const int EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF; +const int AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF; +const int CONNECTING_STATE = CONNECTING_STATE_DEF; +const int ASSOCIATING_STATE = ASSOCIATING_STATE_DEF; +const int CONNECTED_STATE = CONNECTED_STATE_DEF; static __attribute__ ((unused)) const char* state2String(int state){ switch(state){ case 0: return "CLOSED_STATE"; - case 1 /*CONNECTING_STATE*/: + case CONNECTING_STATE_DEF: return "CONNECTING_STATE"; - case 2 /*ASSOCIATING_STATE*/: + case ASSOCIATING_STATE_DEF: return "ASSOCIATING_STATE"; - case 3 /*CONNECTED_STATE*/: + case CONNECTED_STATE_DEF: return "CONNECTED_STATE"; - case -112 /*EXPIRED_SESSION_STATE*/: + case EXPIRED_SESSION_STATE_DEF: return "EXPIRED_SESSION_STATE"; - case -113 /*AUTH_FAILED_STATE*/: + case AUTH_FAILED_STATE_DEF: return "AUTH_FAILED_STATE"; } return "INVALID_STATE"; } -const int CREATED_EVENT = 1; -const int DELETED_EVENT = 2; -const int CHANGED_EVENT = 3; -const int CHILD_EVENT = 4; -const int SESSION_EVENT = -1; -const int NOTWATCHING_EVENT = -2; +const int CREATED_EVENT = CREATED_EVENT_DEF; +const int DELETED_EVENT = DELETED_EVENT_DEF; +const int CHANGED_EVENT = CHANGED_EVENT_DEF; +const int CHILD_EVENT = CHILD_EVENT_DEF; +const int SESSION_EVENT = SESSION_EVENT_DEF; +const int NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF; static __attribute__ ((unused)) const char* watcherEvent2String(int ev){ switch(ev){ case 0: return "ERROR_EVENT"; - case 1 /*CREATED_EVENT*/: + 
case CREATED_EVENT_DEF: return "CREATED_EVENT"; - case 2 /*DELETED_EVENT*/: + case DELETED_EVENT_DEF: return "DELETED_EVENT"; - case 3 /*CHANGED_EVENT*/: + case CHANGED_EVENT_DEF: return "CHANGED_EVENT"; - case 4 /*CHILD_EVENT*/: + case CHILD_EVENT_DEF: return "CHILD_EVENT"; - case -1 /*SESSION_EVENT*/: + case SESSION_EVENT_DEF: return "SESSION_EVENT"; - case -2 /*NOTWATCHING_EVENT*/: + case NOTWATCHING_EVENT_DEF: return "NOTWATCHING_EVENT"; } return "INVALID_EVENT"; @@ -126,19 +128,6 @@ #define COMPLETION_ACLLIST 4 #define COMPLETION_STRING 5 -const char*err2string(int err); -static const char* format_endpoint_info(const struct sockaddr* ep); -static const char* format_current_endpoint_info(zhandle_t* zh); -static int add_completion(zhandle_t *zh, int xid, int completion_type, - const void *dc, const void *data, int add_to_front); -static int handle_socket_error_msg(zhandle_t *zh, int line, int rc, - const char* format,...); -static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc); - -static int disable_conn_permute=0; // permute enabled by default - -static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER; - typedef struct _completion_list { int xid; int completion_type; /* one of the COMPLETION_* values */ @@ -153,8 +142,30 @@ const void *data; buffer_list_t *buffer; struct _completion_list *next; + watcher_registration_t* watcher; } completion_list_t; +const char*err2string(int err); +static const char* format_endpoint_info(const struct sockaddr* ep); +static const char* format_current_endpoint_info(zhandle_t* zh); + +/* completion routine forward declarations */ +static int add_completion(zhandle_t *zh, int xid, int completion_type, + const void *dc, const void *data, int add_to_front,watcher_registration_t* wo); +static completion_list_t* create_completion_entry(int xid, int completion_type, + const void *dc, const void *data,watcher_registration_t* wo); +static void destroy_completion_entry(completion_list_t* c); +static void 
queue_completion(completion_head_t *list, completion_list_t *c, + int add_to_front); + +static int handle_socket_error_msg(zhandle_t *zh, int line, int rc, + const char* format,...); +static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc); + +static int disable_conn_permute=0; // permute enabled by default + +static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER; + const void *zoo_get_context(zhandle_t *zh) { return zh->context; @@ -194,6 +205,16 @@ { return (zh->state<0)? ZINVALIDSTATE: ZOK; } + +int exists_result_checker(int rc) +{ + return rc==ZOK ||rc == ZNONODE; +} + +int default_result_checker(int rc) +{ + return rc==ZOK; +} /** * Frees and closes everything associated with a handle, * including the handle itself. @@ -219,6 +240,8 @@ zh->addrs = NULL; } free_auth_info(&zh->auth); + destroy_zk_hashtable(zh->active_node_watchers); + destroy_zk_hashtable(zh->active_child_watchers); } static void setup_random() @@ -359,7 +382,7 @@ return &zh->client_id; } -static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4){} +static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){} watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn) { @@ -412,9 +435,13 @@ zh->last_zxid = 0; zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0; zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0; + zh->active_node_watchers=create_zk_hashtable(); + zh->active_child_watchers=create_zk_hashtable(); + if (adaptor_init(zh) == -1) { goto abort; } + return zh; abort: errnosave=errno; @@ -675,7 +702,7 @@ break; } } - free(cptr); + destroy_completion_entry(cptr); } } @@ -1116,11 +1143,6 @@ fprintf(LOGSTREAM,"end\n"); } -static completion_list_t* create_completion_entry(int xid, int completion_type, - const void *dc, const void *data); -static void queue_completion(completion_head_t *list, completion_list_t *c, - int add_to_front); - #ifdef THREADED // IO thread queues session events to be processed by the completion thread int 
queue_session_event(zhandle_t *zh, int state) @@ -1141,12 +1163,7 @@ close_buffer_oarchive(&oa, 1); goto error; } - if ((cptr=calloc(1,sizeof(*cptr)))==NULL) { - LOG_ERROR(("out of memory")); - close_buffer_oarchive(&oa, 1); - goto error; - } - cptr->xid = WATCHER_EVENT_XID; + cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0); cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa)); cptr->buffer->curr_offset = get_buffer_len(oa); if (!cptr->buffer) { @@ -1180,6 +1197,8 @@ return cptr; } + +/* handles async completion (both single- and multithreaded) */ void process_completions(zhandle_t *zh) { completion_list_t *cptr; @@ -1201,7 +1220,7 @@ /* This is a notification so there aren't any pending requests */ LOG_DEBUG(("Calling a watcher for node [%s], event=%s", (evt.path==NULL?"NULL":evt.path),watcherEvent2String(type))); - zh->watcher(zh, type, state, evt.path); + deliverWatchers(zh,type,state,evt.path); deallocate_WatcherEvent(&evt); } else { int rc = hdr.err; @@ -1271,9 +1290,9 @@ } break; } - free_buffer(cptr->buffer); - free(cptr); + activateWatcher(cptr->watcher,rc); } + destroy_completion_entry(cptr); close_buffer_iarchive(&ia); } } @@ -1331,7 +1350,7 @@ zh->last_zxid = hdr.zxid; if (hdr.xid == WATCHER_EVENT_XID) { - completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0); + completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0); c->buffer = bptr; queue_completion(&zh->completions_to_process, c, 0); } else if(hdr.xid == AUTH_XID){ @@ -1378,10 +1397,10 @@ sc->rc = rc; switch(cptr->completion_type) { case COMPLETION_DATA: + LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc)); if (rc==0) { struct GetDataResponse res; int len; - LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc)); deserialize_GetDataResponse(ia, "reply", &res); if (res.data.len <= sc->u.data.buff_len) { len = res.data.len; @@ -1395,18 +1414,18 @@ } break; case COMPLETION_STAT: + LOG_DEBUG(("Calling 
COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc)); if (rc == 0) { struct SetDataResponse res; - LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc)); deserialize_SetDataResponse(ia, "reply", &res); sc->u.stat = res.stat; deallocate_SetDataResponse(&res); } break; case COMPLETION_STRINGLIST: + LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc)); if (rc == 0) { struct GetChildrenResponse res; - LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc)); deserialize_GetChildrenResponse(ia, "reply", &res); sc->u.strs = res.children; /* We don't deallocate since we are passing it back */ @@ -1414,10 +1433,10 @@ } break; case COMPLETION_STRING: + LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc)); if (rc == 0) { struct CreateResponse res; int len; - LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc)); deserialize_CreateResponse(ia, "reply", &res); if (sc->u.str.str_len > strlen(res.path)) { len = strlen(res.path); @@ -1430,9 +1449,9 @@ } break; case COMPLETION_ACLLIST: + LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc)); if (rc == 0) { struct GetACLResponse res; - LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc)); deserialize_GetACLResponse(ia, "reply", &res); cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data); sc->u.acl.acl = res.acl; @@ -1445,6 +1464,7 @@ LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc)); break; } + activateWatcher(cptr->watcher,rc); notify_sync_completion(sc); free_buffer(bptr); zh->outstanding_sync--; @@ -1468,8 +1488,30 @@ return 0; } +static watcher_registration_t* create_watcher_registration(const char* path, + result_checker_fn checker,watcher_fn watcher,void* ctx, + zk_hashtable* activeMap){ + watcher_registration_t* wo; + if(watcher==0) + return 0; + wo=calloc(1,sizeof(watcher_registration_t)); + wo->path=strdup(path); + wo->watcher=watcher; + wo->context=ctx; + 
wo->checker=checker==0?default_result_checker:checker; + wo->activeMap=activeMap; + return wo; +} + +static void destroy_watcher_registration(watcher_registration_t* wo){ + if(wo!=0){ + free((void*)wo->path); + free(wo); + } +} + static completion_list_t* create_completion_entry(int xid, int completion_type, - const void *dc, const void *data) + const void *dc, const void *data,watcher_registration_t* wo) { completion_list_t *c = calloc(1,sizeof(completion_list_t)); if (!c) { @@ -1499,15 +1541,24 @@ break; } c->xid = xid; - c->next = 0; + c->watcher = wo; return c; } +static void destroy_completion_entry(completion_list_t* c){ + if(c!=0){ + if(c->buffer!=0) + free_buffer(c->buffer); + destroy_watcher_registration(c->watcher); + free(c); + } +} + static void queue_completion(completion_head_t *list, completion_list_t *c, int add_to_front) { - c->next = 0; + c->next = 0; /* appending a new entry to the back of the list */ lock_completion_list(list); if (list->last) { @@ -1530,10 +1581,11 @@ } static int add_completion(zhandle_t *zh, int xid, int completion_type, - const void *dc, const void *data, int add_to_front) + const void *dc, const void *data, int add_to_front, + watcher_registration_t* wo) { completion_list_t *c =create_completion_entry(xid, completion_type, dc, - data); + data,wo); if (!c) return ZSYSTEMERROR; queue_completion(&zh->sent_requests, c, add_to_front); @@ -1544,39 +1596,39 @@ } static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc, - const void *data) + const void *data,watcher_registration_t* wo) { - return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0); + return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0,wo); } static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc, - const void *data) + const void *data,watcher_registration_t* wo) { - return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0); + return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0,wo); } static int 
add_strings_completion(zhandle_t *zh, int xid, - strings_completion_t dc, const void *data) + strings_completion_t dc, const void *data,watcher_registration_t* wo) { - return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0); + return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0,wo); } static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc, const void *data) { - return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0); + return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0,0); } static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc, const void *data) { - return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0); + return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0,0); } static int add_string_completion(zhandle_t *zh, int xid, string_completion_t dc, const void *data) { - return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0); + return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0,0); } int zookeeper_close(zhandle_t *zh) @@ -1625,9 +1677,16 @@ int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc, const void *data) { + return zoo_awget(zh,path,watch?zh->watcher:0,zh->context,dc,data); +} + +int zoo_awget(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + data_completion_t dc, const void *data) +{ struct oarchive *oa; struct RequestHeader h = { .xid = get_xid(), .type = GETDATA_OP}; - struct GetDataRequest req = { (char*)path, watch }; + struct GetDataRequest req = { (char*)path, watcher!=0 }; int rc; if (zh==0 || path==0) @@ -1638,7 +1697,9 @@ rc = serialize_RequestHeader(oa, "header", &h); rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req); enter_critical(zh); - rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data); + rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data, + create_watcher_registration(path,0,watcher,watcherCtx, + zh->active_node_watchers)); rc = rc < 0 ? 
rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); leave_critical(zh); @@ -1673,7 +1734,7 @@ rc = serialize_RequestHeader(oa, "header", &h); rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req); enter_critical(zh); - rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data); + rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0); rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); leave_critical(zh); @@ -1761,11 +1822,18 @@ } int zoo_aexists(zhandle_t *zh, const char *path, int watch, + stat_completion_t sc, const void *data) +{ + return zoo_awexists(zh,path,watch?zh->watcher:0,zh->context,sc,data); +} + +int zoo_awexists(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, stat_completion_t completion, const void *data) { struct oarchive *oa; struct RequestHeader h = { .xid = get_xid(), .type = EXISTS_OP }; - struct ExistsRequest req; + struct ExistsRequest req = {(char*)path, watcher!=0 }; int rc; if (zh==0 || path==0) @@ -1773,12 +1841,12 @@ if (is_unrecoverable(zh)) return ZINVALIDSTATE; oa = create_buffer_oarchive(); - req.path = (char*)path; - req.watch = watch; rc = serialize_RequestHeader(oa, "header", &h); rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req); enter_critical(zh); - rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data); + rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data, + create_watcher_registration(path,exists_result_checker, + watcher,watcherCtx,zh->active_node_watchers)); rc = rc < 0 ? 
rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); leave_critical(zh); @@ -1793,11 +1861,18 @@ } int zoo_aget_children(zhandle_t *zh, const char *path, int watch, - strings_completion_t completion, const void *data) + strings_completion_t dc, const void *data) +{ + return zoo_awget_children(zh,path,watch?zh->watcher:0,zh->context,dc,data); +} + +int zoo_awget_children(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + strings_completion_t dc, const void *data) { struct oarchive *oa; struct RequestHeader h = { .xid = get_xid(), .type = GETCHILDREN_OP}; - struct GetChildrenRequest req; + struct GetChildrenRequest req={(char*)path, watcher!=0 }; int rc; if (zh==0 || path==0) @@ -1805,12 +1880,12 @@ if (is_unrecoverable(zh)) return ZINVALIDSTATE; oa = create_buffer_oarchive(); - req.path = (char*)path; - req.watch = watch; rc = serialize_RequestHeader(oa, "header", &h); rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req); enter_critical(zh); - rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, completion, data); + rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, dc, data, + create_watcher_registration(path,0,watcher,watcherCtx, + zh->active_child_watchers)); rc = rc < 0 ? 
rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); leave_critical(zh); @@ -2131,12 +2206,18 @@ int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat) { + return zoo_wexists(zh,path,watch?zh->watcher:0,zh->context,stat); +} + +int zoo_wexists(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, struct Stat *stat) +{ struct sync_completion *sc = alloc_sync_completion(); int rc; if (!sc) { return ZSYSTEMERROR; } - rc=zoo_aexists(zh, path, watch, SYNCHRONOUS_MARKER, sc); + rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc); if(rc==ZOK){ wait_sync_completion(sc); rc = sc->rc; @@ -2145,12 +2226,20 @@ } } free_sync_completion(sc); - return rc; + return rc; } int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer, int* buffer_len, struct Stat *stat) { + return zoo_wget(zh,path,watch?zh->watcher:0,zh->context, + buffer,buffer_len,stat); +} + +int zoo_wget(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + char *buffer, int* buffer_len, struct Stat *stat) +{ struct sync_completion *sc; int rc=0; @@ -2161,7 +2250,7 @@ sc->u.data.buffer = buffer; sc->u.data.buff_len = *buffer_len; - rc=zoo_aget(zh, path, watch, SYNCHRONOUS_MARKER, sc); + rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc); if(rc==ZOK){ wait_sync_completion(sc); rc = sc->rc; @@ -2195,12 +2284,19 @@ int zoo_get_children(zhandle_t *zh, const char *path, int watch, struct String_vector *strings) { + return zoo_wget_children(zh,path,watch?zh->watcher:0,zh->context,strings); +} + +int zoo_wget_children(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + struct String_vector *strings) +{ struct sync_completion *sc = alloc_sync_completion(); int rc; if (!sc) { return ZSYSTEMERROR; } - rc=zoo_aget_children(zh, path, watch, SYNCHRONOUS_MARKER, sc); + rc=zoo_awget_children(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc); if(rc==ZOK){ 
wait_sync_completion(sc); rc = sc->rc; Added: hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h?rev=679557&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h (added) +++ hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h Thu Jul 24 14:46:30 2008 @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _COLLECTION_UTIL_H_ +#define _COLLECTION_UTIL_H_ + +/** + * \file + * CollectionBuilder and DictionaryBuilder classes and collection utility functions + */ + +namespace Util +{ + +// ********************************************************* +/** A shortcut to use for building collections. + * This class is a wrapper around standard STL collection containers such as vector. 
+ * It allows one to conveniently build collections at the variable initialization time: + * \code + * #include "CollectionUtil.h" + * #include "Vector.h" // for ostream << operator overload for STL vector + * using Util; + * + * int main() + * { + * typedef vector<string> MyVector; + * MyVector myVector=CollectionBuilder<MyVector>()("str1")("str2")("str3"); + * cout<<myVector; + * } + * \endcode + */ +template <class CONT> +class CollectionBuilder +{ +public: + /// Type of the collection container. + typedef CONT CollectionType; + /// Container's value type. + typedef typename CollectionType::value_type value_type; + /// Container's constant iterator type. + typedef typename CollectionType::const_iterator const_iterator; + /// Container's size type. + typedef typename CollectionType::size_type size_type; + + /** Operator function call overload to allow call chaining. + * \param value the value to be inserted into the container + */ + CollectionBuilder& operator()(const value_type& value){ + return push_back(value); + } + /** Same as regular STL push_back() but allows call chaining. + * \param value the value to be inserted into the container + */ + CollectionBuilder& push_back(const value_type& value){ + collection_.push_back(value); + return *this; + } + /// \name Standard STL container interface + /// @{ + const_iterator begin() const{return collection_.begin();} + const_iterator end() const{return collection_.end();} + size_type size() const{return collection_.size();} + void clear() {collection_.clear();} + ///@} + /// Explicit typecast operator. + operator const CollectionType&() const {return collection_;} +private: + /// \cond PRIVATE + CollectionType collection_; + /// \endcond +}; + + +// ********************************************************* +/** A shortcut to use for building dictionaries. + * This class is a wrapper around standard STL associative containers such as map. 
+ * It allows one to conveniently build dictionaries at the variable initialization time: + * \code + * #include "CollectionUtil.h" + * #include "Map.h" // for ostream << operator overload for STL map + * using Util; + * + * int main() + * { + * typedef map<string,int> MyMap; + * MyMap myMap=DictionaryBuilder<MyMap>()("str1",1)("str2",2)("str3",3); + * cout<<myMap; + * } + * \endcode + */ +template <class CONT> +class DictionaryBuilder +{ +public: + /// The type of the associative container + typedef CONT DictionaryType; + /// Container's element type (usually a pair) + typedef typename DictionaryType::value_type value_type; + /// Container's key type + typedef typename DictionaryType::key_type key_type; + /// Container's value type + typedef typename DictionaryType::mapped_type mapped_type; + /// Container's constant iterator type + typedef typename DictionaryType::const_iterator const_iterator; + /// Container's writable iterator type + typedef typename DictionaryType::iterator iterator; + /// Container's size type + typedef typename DictionaryType::size_type size_type; + + /** Operator function call overload to allow call chaining. + * \param key the value key to be inserted + * \param value the value to be inserted into the container + * \return a non-const reference to self + */ + DictionaryBuilder& operator()(const key_type& key,const mapped_type& value){ + dict_.insert(value_type(key,value)); + return *this; + } + /** Lookup value by key. + * \param key the key associated with the value. + * \return a non-const iterator pointing to the element whose key matched the \a key parameter + */ + iterator find(const key_type& key){ + return dict_.find(key); + } + /** Lookup value by key. + * \param key the key associated with the value. 
+ * \return a const iterator pointing to the element whose key matched the \a key parameter + */ + const_iterator find(const key_type& key) const{ + return dict_.find(key); + } + + /// \name Standard STL container interface + /// @{ + const_iterator begin() const{return dict_.begin();} + const_iterator end() const{return dict_.end();} + size_type size() const{return dict_.size();} + void clear() {dict_.clear();} + ///@} + /// Explicit typecast operator. + operator const DictionaryType&() const {return dict_;} +private: + DictionaryType dict_; +}; + + +// *********************************************************** +/** Deletes all dynamically allocated elements of a collection. + * C::value_type is expected to be a pointer to a dynamically allocated object, or it won't compile. + * The function will iterate over all container elements and call delete for each of them. + * \param c a collection (vector,set) whose elements are being deleted. + */ +template <class C> +void clearCollection(C& c){ + for(typename C::const_iterator it=c.begin();it!=c.end();++it) + delete *it; + c.clear(); +} + +/** Deletes all dynamically allocated values of the associative container. + * The function expects the M::value_type to be a pair<..., ptr_to_type>, or it won't compile. + * It first deletes the objects pointed to by ptr_to_type + * and then clears (calls m.clear()) the container. + * \param m an associative container (map,hash_map) whose elements are being deleted. 
+ */ +template <class M> +void clearMap(M& m){ + for(typename M::const_iterator it=m.begin();it!=m.end();++it) + delete it->second; + m.clear(); +} + +} // namespace Util + + +#endif // _COLLECTION_UTIL_H_ Propchange: hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc?rev=679557&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc (added) +++ hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc Thu Jul 24 14:46:30 2008 @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <cppunit/extensions/HelperMacros.h> +#include "CppAssertHelper.h" + + +#include "CollectionUtil.h" +using namespace Util; + +#include "Vector.h" +using namespace std; + +#include "src/zk_hashtable.h" + +class Zookeeper_hashtable : public CPPUNIT_NS::TestFixture +{ + CPPUNIT_TEST_SUITE(Zookeeper_hashtable); + CPPUNIT_TEST(testInsertElement1); + CPPUNIT_TEST(testInsertElement2); + CPPUNIT_TEST(testInsertElement3); + CPPUNIT_TEST(testContainsWatcher1); + CPPUNIT_TEST(testContainsWatcher2); + CPPUNIT_TEST(testCombineHashtable1); + CPPUNIT_TEST(testMoveMergeWatchers1); + CPPUNIT_TEST(testDeliverSessionEvent1); + CPPUNIT_TEST(testDeliverZnodeEvent1); + CPPUNIT_TEST_SUITE_END(); + + static void watcher(zhandle_t *, int, int, const char *,void*){} + zk_hashtable *ht; + +public: + + void setUp() + { + ht=create_zk_hashtable(); + } + + void tearDown() + { + destroy_zk_hashtable(ht); + } + + static vector<int> getWatcherCtxAsVector(zk_hashtable* ht,const char* path){ + watcher_object_t* wo=getFirstWatcher(ht,path); + vector<int> res; + while(wo!=0){ + res.push_back((int)wo->context); + wo=wo->next; + } + return res; + } + + // insert 2 watchers for different paths + // verify that hashtable size is 2 + void testInsertElement1() + { + CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht)); + int res=insert_watcher_object(ht,"path1", + create_watcher_object(watcher,(void*)1)); + CPPUNIT_ASSERT_EQUAL(1,res); + res=insert_watcher_object(ht,"path2", + create_watcher_object(watcher,(void*)1)); + CPPUNIT_ASSERT_EQUAL(1,res); + CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht)); + vector<int> expWatchers=CollectionBuilder<vector<int> >().push_back(1); + CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1")); + CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path2")); + clean_zk_hashtable(ht); + CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht)); + } + + // insert 2 different watchers for the same path; + // verify: hashtable element count is 1, and the watcher count for the path + // is 2 + void testInsertElement2() + { + 
int res=insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)1)); + CPPUNIT_ASSERT_EQUAL(1,res); + res=insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2)); + CPPUNIT_ASSERT_EQUAL(1,res); + CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht)); + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1")); + vector expWatchers=CollectionBuilder >(). + push_back(2).push_back(1); + CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1")); + } + + // insert 2 identical watchers for the same path; + // verify: hashtable element count is 1, the watcher count for the path is 1 + void testInsertElement3() + { + watcher_object_t wobject; + wobject.watcher=watcher; + wobject.context=(void*)1; + + int res=insert_watcher_object(ht,"path1",clone_watcher_object(&wobject)); + CPPUNIT_ASSERT_EQUAL(1,res); + watcher_object_t* wo=clone_watcher_object(&wobject); + res=insert_watcher_object(ht,"path1",wo); + CPPUNIT_ASSERT_EQUAL(0,res); + CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht)); + CPPUNIT_ASSERT_EQUAL(1,get_watcher_count(ht,"path1")); + vector expWatchers=CollectionBuilder >().push_back(1); + CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1")); + // must delete the object that wasn't inserted! 
+ free(wo); + } + + // verify: the watcher is found in the table + void testContainsWatcher1() + { + watcher_object_t expected; + expected.watcher=watcher; + expected.context=(void*)1; + + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2)); + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4)); + insert_watcher_object(ht,"path2",clone_watcher_object(&expected)); + + CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht)); + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1")); + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2")); + + int res=contains_watcher(ht,&expected); + CPPUNIT_ASSERT(res==1); + } + + // verify: the watcher is not found + void testContainsWatcher2() + { + watcher_object_t expected; + expected.watcher=watcher; + expected.context=(void*)1; + + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2)); + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5)); + + CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht)); + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1")); + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2")); + + int res=contains_watcher(ht,&expected); + CPPUNIT_ASSERT(res==0); + } + + void testCombineHashtable1() + { + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2)); + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5)); + + zk_hashtable* ht2=create_zk_hashtable(); + + insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)2)); + insert_watcher_object(ht2,"path2",create_watcher_object(watcher,(void*)6)); + 
insert_watcher_object(ht2,"path3",create_watcher_object(watcher,(void*)2)); + + zk_hashtable* res=combine_hashtables(ht,ht2); + + CPPUNIT_ASSERT_EQUAL(3,get_element_count(res)); + // path1 --> 2,3 + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(res,"path1")); + vector expWatchers1=CollectionBuilder >(). + push_back(2).push_back(3); + CPPUNIT_ASSERT_EQUAL(expWatchers1,getWatcherCtxAsVector(res,"path1")); + // path2 --> 4,5,6 + CPPUNIT_ASSERT_EQUAL(3,get_watcher_count(res,"path2")); + vector expWatchers2=CollectionBuilder >(). + push_back(6).push_back(4).push_back(5); + CPPUNIT_ASSERT_EQUAL(expWatchers2,getWatcherCtxAsVector(res,"path2")); + // path3 --> 2 + CPPUNIT_ASSERT_EQUAL(1,get_watcher_count(res,"path3")); + vector expWatchers3=CollectionBuilder >().push_back(2); + CPPUNIT_ASSERT_EQUAL(expWatchers3,getWatcherCtxAsVector(res,"path3")); + + destroy_zk_hashtable(ht2); + destroy_zk_hashtable(res); + } + + void testMoveMergeWatchers1() + { + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2)); + insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4)); + insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5)); + + zk_hashtable* ht2=create_zk_hashtable(); + + insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)2)); + insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)6)); + + zk_hashtable* res=move_merge_watchers(ht,ht2,"path1"); + + CPPUNIT_ASSERT_EQUAL(1,get_element_count(res)); + CPPUNIT_ASSERT_EQUAL(3,get_watcher_count(res,"path1")); + vector expWatchers=CollectionBuilder >(). 
+ push_back(6).push_back(2).push_back(3); + CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(res,"path1")); + + // make sure the path entry has been deleted from the source hashtables + CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht)); + CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2")); + CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht2)); + + destroy_zk_hashtable(ht2); + destroy_zk_hashtable(res); + } + + static void iterWatcher(zhandle_t *zh, int type, int state, + const char* path,void* ctx){ + vector* res=reinterpret_cast*>(zh); + res->push_back((int)ctx); + } + + void testDeliverSessionEvent1(){ + insert_watcher_object(ht,"path1",create_watcher_object(iterWatcher,(void*)2)); + insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)3)); + insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)4)); + insert_watcher_object(ht,"path3",create_watcher_object(iterWatcher,(void*)5)); + + vector res; + deliver_session_event(ht,(zhandle_t*)&res,10,20); + vector expWatchers=CollectionBuilder >(). + push_back(4).push_back(3).push_back(5).push_back(2); + CPPUNIT_ASSERT_EQUAL(expWatchers,res); + } + + void testDeliverZnodeEvent1(){ + insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)3)); + insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)4)); + + vector res; + deliver_znode_event(ht,(zhandle_t*)&res,"path2",10,20); + vector expWatchers=CollectionBuilder >(). 
+ push_back(4).push_back(3); + CPPUNIT_ASSERT_EQUAL(expWatchers,res); + expWatchers.clear(); + res.clear(); + // non-existent path + deliver_znode_event(ht,(zhandle_t*)&res,"path100",10,20); + CPPUNIT_ASSERT_EQUAL(expWatchers,res); + // make sure the path entry has been deleted from the source hashtable + CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht)); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_hashtable); Propchange: hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc (original) +++ hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc Thu Jul 24 14:46:30 2008 @@ -41,7 +41,7 @@ CPPUNIT_TEST_SUITE_END(); zhandle_t *zh; - static void watcher(zhandle_t *, int, int, const char *){} + static void watcher(zhandle_t *, int, int, const char *,void*){} public: void setUp() { @@ -256,7 +256,7 @@ zkServer.addRecvResponse(new PingResponse); rc=zookeeper_interest(zh,&fd,&interest,&tv); CPPUNIT_ASSERT_EQUAL(ZOK,rc); - // sleep for a short while (10 ms) + // pseudo-sleep for a short while (10 ms) timeMock.millitick(10); rc=zookeeper_process(zh,interest); CPPUNIT_ASSERT_EQUAL(ZOK,rc); @@ -575,12 +575,26 @@ changed_=true; if(path!=0) path_=path; } + // this predicate checks if CHANGE_EVENT event type was triggered, unlike + // the isWatcherTriggered() that returns true whenever a watcher is triggered + // regardless of the event type SyncedBoolCondition isNodeChangedTriggered() const{ return SyncedBoolCondition(changed_,mx_); } bool changed_; string path_; }; + + class AsyncWatcherCompletion: public AsyncCompletion{ + public: + 
AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){} + virtual void statCompl(int rc, const Stat *stat){ + // we received a server response, now enqueue a watcher event + // to trigger the watcher + zkServer_.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z")); + } + ZookeeperServer& zkServer_; + }; // verify that async watcher is called for znode events (CREATED, DELETED etc.) void testAsyncWatcher1(){ Mock_gettimeofday timeMock; @@ -596,9 +610,14 @@ CPPUNIT_ASSERT(zh!=0); // make sure the client has connected CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); - - // trigger the watcher - zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z")); + + // set the watcher + AsyncWatcherCompletion completion(zkServer); + // prepare a response for the zoo_aexists() request + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000); CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_); } Added: hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc?rev=679557&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc (added) +++ hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc Thu Jul 24 14:46:30 2008 @@ -0,0 +1,745 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "CppAssertHelper.h" + +#include "ZKMocks.h" +#include "CollectionUtil.h" + +class Zookeeper_watchers : public CPPUNIT_NS::TestFixture +{ + CPPUNIT_TEST_SUITE(Zookeeper_watchers); + CPPUNIT_TEST(testDefaultSessionWatcher1); + CPPUNIT_TEST(testDefaultSessionWatcher2); + CPPUNIT_TEST(testObjectSessionWatcher1); + CPPUNIT_TEST(testObjectSessionWatcher2); + CPPUNIT_TEST(testNodeWatcher1); + CPPUNIT_TEST(testChildWatcher1); + CPPUNIT_TEST(testChildWatcher2); + CPPUNIT_TEST_SUITE_END(); + + static void watcher(zhandle_t *, int, int, const char *,void*){} + zhandle_t *zh; + +public: + + void setUp() + { + zoo_set_debug_level((ZooLogLevel)0); // disable logging + zoo_deterministic_conn_order(0); + zh=0; + } + + void tearDown() + { + zookeeper_close(zh); + } + + class ConnectionWatcher: public WatcherAction{ + public: + ConnectionWatcher():connected_(false),counter_(0){} + virtual void onConnectionEstablished(zhandle_t*){ + synchronized(mx_); + counter_++; + connected_=true; + } + SyncedBoolCondition isConnectionEstablished() const{ + return SyncedBoolCondition(connected_,mx_); + } + bool connected_; + int counter_; + }; + + class DisconnectWatcher: public WatcherAction{ + public: + DisconnectWatcher():disconnected_(false),counter_(0){} + virtual void onConnectionLost(zhandle_t*){ + synchronized(mx_); + counter_++; + disconnected_=true; + } + SyncedBoolCondition isDisconnected() const{ + return SyncedBoolCondition(disconnected_,mx_); + } + bool disconnected_; + int counter_; + }; + + class CountingDataWatcher: public WatcherAction{ + 
public: + CountingDataWatcher():disconnected_(false),counter_(0){} + virtual void onNodeValueChanged(zhandle_t*,const char* path){ + synchronized(mx_); + counter_++; + } + virtual void onConnectionLost(zhandle_t*){ + synchronized(mx_); + counter_++; + disconnected_=true; + } + bool disconnected_; + int counter_; + }; + + class DeletionCountingDataWatcher: public WatcherAction{ + public: + DeletionCountingDataWatcher():counter_(0){} + virtual void onNodeDeleted(zhandle_t*,const char* path){ + synchronized(mx_); + counter_++; + } + int counter_; + }; + + class ChildEventCountingWatcher: public WatcherAction{ + public: + ChildEventCountingWatcher():counter_(0){} + virtual void onChildChanged(zhandle_t*,const char* path){ + synchronized(mx_); + counter_++; + } + int counter_; + }; + +#ifndef THREADED + + // verify: the default watcher is called once for a session event + void testDefaultSessionWatcher1(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + ConnectionWatcher watcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &watcher,0); + CPPUNIT_ASSERT(zh!=0); + + int fd=0; + int interest=0; + timeval tv; + // open the socket + int rc=zookeeper_interest(zh,&fd,&interest,&tv); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + CPPUNIT_ASSERT_EQUAL(CONNECTING_STATE,zoo_state(zh)); + // send the handshake packet to the server + rc=zookeeper_process(zh,interest); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + CPPUNIT_ASSERT_EQUAL(ASSOCIATING_STATE,zoo_state(zh)); + // receive the server handshake response + rc=zookeeper_process(zh,interest); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // verify connected + CPPUNIT_ASSERT_EQUAL(CONNECTED_STATE,zoo_state(zh)); + CPPUNIT_ASSERT(watcher.connected_); + CPPUNIT_ASSERT_EQUAL(1,watcher.counter_); + } + + // test case: connect to server, set a default watcher, disconnect from the server + // verify: the default watcher is called once + 
void testDefaultSessionWatcher2(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + DisconnectWatcher watcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &watcher,0); + CPPUNIT_ASSERT(zh!=0); + // simulate connected state + forceConnected(zh); + + // first operation + AsyncCompletion ignored; + zkServer.addOperationResponse(new ZooGetResponse("1",1)); + int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // this will process the response and activate the watcher + rc=zookeeper_process(zh,ZOOKEEPER_READ); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // now, disconnect + zkServer.setConnectionLost(); + rc=zookeeper_process(zh,ZOOKEEPER_READ); + CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc); + // verify disconnected + CPPUNIT_ASSERT(watcher.disconnected_); + CPPUNIT_ASSERT_EQUAL(1,watcher.counter_); + } + + // testcase: connect to the server, set a watcher object on a node, + // disconnect from the server + // verify: the watcher object as well as the default watcher are called + void testObjectSessionWatcher1(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + DisconnectWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // simulate connected state + forceConnected(zh); + + AsyncCompletion ignored; + CountingDataWatcher wobject; + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_awexists(zh,"/x/y/1",activeWatcher,&wobject, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // this will process the response and activate the watcher + rc=zookeeper_process(zh,ZOOKEEPER_READ); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // now, disconnect + zkServer.setConnectionLost(); + 
rc=zookeeper_process(zh,ZOOKEEPER_READ); + CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc); + + // verify the default watcher has been triggered + CPPUNIT_ASSERT(defWatcher.disconnected_); + // and triggered only once + CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_); + + // the path-specific watcher has been triggered as well + CPPUNIT_ASSERT(wobject.disconnected_); + // only once! + CPPUNIT_ASSERT_EQUAL(1,wobject.counter_); + } + + // testcase: connect to the server, set a watcher object on a node, + // set a def watcher on another node,disconnect from the server + // verify: the watcher object as well as the default watcher are called + void testObjectSessionWatcher2(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + DisconnectWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // simulate connected state + forceConnected(zh); + + // set the default watcher + AsyncCompletion ignored; + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + CountingDataWatcher wobject; + zkServer.addOperationResponse(new ZooStatResponse); + rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // this will process the response and activate the watcher + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + // disconnect now + zkServer.setConnectionLost(); + rc=zookeeper_process(zh,ZOOKEEPER_READ); + CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc); + + // verify the default watcher has been triggered + CPPUNIT_ASSERT(defWatcher.disconnected_); + // and triggered only once + CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_); + + // the path-specific watcher has been triggered as well + 
CPPUNIT_ASSERT(wobject.disconnected_); + // only once! + CPPUNIT_ASSERT_EQUAL(1,wobject.counter_); + } + + // testcase: register 2 node watches for different paths, trigger the watches + // verify: the data watchers are processed, the default watcher is not called + void testNodeWatcher1(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + DisconnectWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // simulate connected state + forceConnected(zh); + + AsyncCompletion ignored; + CountingDataWatcher wobject1; + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + CountingDataWatcher wobject2; + zkServer.addOperationResponse(new ZooStatResponse); + rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // this will process the response and activate the watcher + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + // we are all set now; let's trigger the watches + zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/a/b/c")); + zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z")); + // make sure all watchers have been processed + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_); + CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_); + CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_); + } + + // testcase: set up both a children and a data watchers on the node /a, then + // delete the node by sending a DELETE_EVENT event + // verify: both watchers are triggered + void testChildWatcher1(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call 
zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + DeletionCountingDataWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // simulate connected state + forceConnected(zh); + + AsyncCompletion ignored; + DeletionCountingDataWatcher wobject1; + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + typedef ZooGetChildrenResponse::StringVector ZooVector; + zkServer.addOperationResponse(new ZooGetChildrenResponse( + Util::CollectionBuilder()("/a/1")("/a/2") + )); + DeletionCountingDataWatcher wobject2; + rc=zoo_awget_children(zh,"/a",activeWatcher, + &wobject2,asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // this will process the response and activate the watcher + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + // we are all set now; let's trigger the watches + zkServer.addRecvResponse(new ZNodeEvent(DELETED_EVENT,"/a")); + // make sure the watchers have been processed + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_); + CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_); + CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_); + } + + // testcase: create both a child and data watch on the node /a, send a CHILD_EVENT + // verify: only the child watch triggered + void testChildWatcher2(){ + Mock_gettimeofday timeMock; + ZookeeperServer zkServer; + // must call zookeeper_close() while all the mocks are in scope + CloseFinally guard(&zh); + + ChildEventCountingWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // simulate connected state + forceConnected(zh); + + AsyncCompletion ignored; + ChildEventCountingWatcher 
wobject1; + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + typedef ZooGetChildrenResponse::StringVector ZooVector; + zkServer.addOperationResponse(new ZooGetChildrenResponse( + Util::CollectionBuilder()("/a/1")("/a/2") + )); + ChildEventCountingWatcher wobject2; + rc=zoo_awget_children(zh,"/a",activeWatcher, + &wobject2,asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // this will process the response and activate the watcher + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + // we are all set now; let's trigger the watches + zkServer.addRecvResponse(new ZNodeEvent(CHILD_EVENT,"/a")); + // make sure the watchers have been processed + while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK); + CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc); + + CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_); + CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_); + CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_); + } + +#else + // verify: the default watcher is called once for a session event + void testDefaultSessionWatcher1(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTED_STATE); + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! 
+ CloseFinally guard(&zh); + + ConnectionWatcher watcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &watcher,0); + CPPUNIT_ASSERT(zh!=0); + // wait till watcher proccessing has completed (the connection + // established event) + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.isWatcherProcessingCompleted(),1000)<1000); + + // verify the watcher has been triggered + CPPUNIT_ASSERT(ensureCondition(watcher.isConnectionEstablished(),1000)<1000); + // triggered only once + CPPUNIT_ASSERT_EQUAL(1,watcher.counter_); + } + + // test case: connect to server, set a default watcher, disconnect from the server + // verify: the default watcher is called once + void testDefaultSessionWatcher2(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! + CloseFinally guard(&zh); + + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE); + DisconnectWatcher watcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &watcher,0); + CPPUNIT_ASSERT(zh!=0); + // make sure the client has connected + CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); + // set a default watch + AsyncCompletion ignored; + // a successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // now, initiate a disconnect + zkServer.setConnectionLost(); + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.isWatcherProcessingCompleted(),1000)<1000); + + // verify the watcher has been triggered + CPPUNIT_ASSERT(watcher.disconnected_); + // triggered only once + CPPUNIT_ASSERT_EQUAL(1,watcher.counter_); + } + + // testcase: connect to the server, set a watcher object on a node, + // 
disconnect from the server + // verify: the watcher object as well as the default watcher are called + void testObjectSessionWatcher1(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! + CloseFinally guard(&zh); + + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE); + DisconnectWatcher defWatcher; + // use the tracker to find out when the watcher has been activated + WatcherActivationTracker activationTracker; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // make sure the client has connected + CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); + + AsyncCompletion ignored; + // this successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + CountingDataWatcher wobject; + activationTracker.track(&wobject); + // set a path-specific watcher + int rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // make sure the watcher gets activated before we continue + CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000); + + // now, initiate a disconnect + zkServer.setConnectionLost(); + // make sure all watchers have been processed + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.isWatcherProcessingCompleted(),1000)<1000); + + // verify the default watcher has been triggered + CPPUNIT_ASSERT(defWatcher.disconnected_); + // and triggered only once + CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_); + + // the path-specific watcher has been triggered as well + CPPUNIT_ASSERT(wobject.disconnected_); + // only once! 
+ CPPUNIT_ASSERT_EQUAL(1,wobject.counter_); + } + + // testcase: connect to the server, set a watcher object on a node, + // set a def watcher on another node,disconnect from the server + // verify: the watcher object as well as the default watcher are called + void testObjectSessionWatcher2(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! + CloseFinally guard(&zh); + + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE); + DisconnectWatcher defWatcher; + // use the tracker to find out when the watcher has been activated + WatcherActivationTracker activationTracker; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // make sure the client has connected + CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); + + // set a default watch + AsyncCompletion ignored; + // a successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + activationTracker.track(&defWatcher); + int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // make sure the watcher gets activated before we continue + CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000); + + // this successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + CountingDataWatcher wobject; + activationTracker.track(&wobject); + // set a path-specific watcher + rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // make sure the watcher gets activated before we continue + CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000); + + // now, initiate a disconnect + 
zkServer.setConnectionLost(); + // make sure all watchers have been processed + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.isWatcherProcessingCompleted(),1000)<1000); + + // verify the default watcher has been triggered + CPPUNIT_ASSERT(defWatcher.disconnected_); + // and triggered only once + CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_); + + // the path-specific watcher has been triggered as well + CPPUNIT_ASSERT(wobject.disconnected_); + // only once! + CPPUNIT_ASSERT_EQUAL(1,wobject.counter_); + } + + // testcase: register 2 node watches for different paths, trigger the watches + // verify: the data watchers are processed, the default watcher is not called + void testNodeWatcher1(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! + CloseFinally guard(&zh); + + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(CHANGED_EVENT,0,false); + CountingDataWatcher defWatcher; + // use the tracker to find out when the watcher has been activated + WatcherActivationTracker activationTracker; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // make sure the client has connected + CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); + + // don't care about completions + AsyncCompletion ignored; + // set a one-shot watch + // a successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + CountingDataWatcher wobject1; + activationTracker.track(&wobject1); + int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // make sure the watcher gets activated before we continue + CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000); + + // this successful server 
response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + CountingDataWatcher wobject2; + activationTracker.track(&wobject2); + // set a path-specific watcher + rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2, + asyncCompletion,&ignored); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + // make sure the watcher gets activated before we continue + CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000); + + // we are all set now; let's trigger the watches + zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/a/b/c")); + zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z")); + // make sure all watchers have been processed + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.deliveryCounterEquals(2),1000)<1000); + + CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_); + CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_); + CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_); + } + + // testcase: set up both a children and a data watchers on the node /a, then + // delete the node (that is, send a DELETE_EVENT) + // verify: both watchers are triggered + void testChildWatcher1(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! 
+ CloseFinally guard(&zh); + + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(DELETED_EVENT,0); + DeletionCountingDataWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // make sure the client has connected + CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); + + // a successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + DeletionCountingDataWatcher wobject1; + Stat stat; + // add a node watch + int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + typedef ZooGetChildrenResponse::StringVector ZooVector; + zkServer.addOperationResponse(new ZooGetChildrenResponse( + Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2") + )); + DeletionCountingDataWatcher wobject2; + String_vector children; + rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children); + deallocate_String_vector(&children); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // we are all set now; let's trigger the watches + zkServer.addRecvResponse(new ZNodeEvent(DELETED_EVENT,"/a")); + // make sure the watchers have been processed + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.isWatcherProcessingCompleted(),1000)<1000); + + CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_); + CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_); + CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_); + } + + // testcase: create both a child and data watch on the node /a, send a CHILD_EVENT + // verify: only the child watch triggered + void testChildWatcher2(){ + Mock_gettimeofday timeMock; + // zookeeper simulator + ZookeeperServer zkServer; + Mock_poll pollMock(&zkServer,ZookeeperServer::FD); + // must call zookeeper_close() while all the mocks are in the scope! 
+ CloseFinally guard(&zh); + + // detects when all watchers have been delivered + WatcherDeliveryTracker deliveryTracker(CHILD_EVENT,0); + ChildEventCountingWatcher defWatcher; + zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID, + &defWatcher,0); + CPPUNIT_ASSERT(zh!=0); + // make sure the client has connected + CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000); + + // a successful server response will activate the watcher + zkServer.addOperationResponse(new ZooStatResponse); + ChildEventCountingWatcher wobject1; + Stat stat; + // add a node watch + int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + typedef ZooGetChildrenResponse::StringVector ZooVector; + zkServer.addOperationResponse(new ZooGetChildrenResponse( + Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2") + )); + ChildEventCountingWatcher wobject2; + String_vector children; + rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children); + deallocate_String_vector(&children); + CPPUNIT_ASSERT_EQUAL(ZOK,rc); + + // we are all set now; let's trigger the watches + zkServer.addRecvResponse(new ZNodeEvent(CHILD_EVENT,"/a")); + // make sure the watchers have been processed + CPPUNIT_ASSERT(ensureCondition( + deliveryTracker.isWatcherProcessingCompleted(),1000)<1000); + + CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_); + CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_); + CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_); + } + +#endif //THREADED +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_watchers); Propchange: hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- 
hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc (original) +++ hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc Thu Jul 24 14:46:30 2008 @@ -38,7 +38,7 @@ CPPUNIT_TEST(testCloseFromWatcher1); CPPUNIT_TEST_SUITE_END(); zhandle_t *zh; - static void watcher(zhandle_t *, int, int, const char *){} + static void watcher(zhandle_t *, int, int, const char *,void*){} public: void setUp() { @@ -59,7 +59,7 @@ virtual void onSessionExpired(zhandle_t* zh){ memcpy(&lzh,zh,sizeof(lzh)); if(callClose_) - rc=zookeeper_close(zh); + rc=zookeeper_close(zh); } zhandle_t lzh; bool callClose_; @@ -88,7 +88,7 @@ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh)); CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname)); CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs)); - CPPUNIT_ASSERT_EQUAL(3,freeMock.callCounter); + CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter); } void testCloseUnconnected1() { @@ -236,7 +236,7 @@ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname)); CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs)); CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor)); - CPPUNIT_ASSERT_EQUAL(4,freeMock.callCounter); + CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter); // threads CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->io)); CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->completion)); @@ -410,6 +410,8 @@ CPPUNIT_ASSERT(zh!=0); CPPUNIT_ASSERT(ensureCondition(SessionExpired(zh),1000)<1000); CPPUNIT_ASSERT(ensureCondition(IOThreadStopped(zh),1000)<1000); + // make sure the watcher has been processed + CPPUNIT_ASSERT(ensureCondition(closeAction.isWatcherTriggered(),1000)<1000); // make sure the threads have not been destroyed yet adaptor_threads* adaptor=(adaptor_threads*)zh->adaptor_priv; CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getDestroyCounter(adaptor->io)); Modified: hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc (original) +++ hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc Thu Jul 24 14:46:30 2008 @@ -49,7 +49,7 @@ CPPUNIT_TEST_SUITE_END(); zhandle_t *zh; MockPthreadsNull* pthreadMock; - static void watcher(zhandle_t *, int , int , const char *){} + static void watcher(zhandle_t *, int , int , const char *,void*){} public: Zookeeper_init():zh(0),pthreadMock(0){} Added: hadoop/zookeeper/trunk/src/c/tests/Vector.h URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/Vector.h?rev=679557&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/Vector.h (added) +++ hadoop/zookeeper/trunk/src/c/tests/Vector.h Thu Jul 24 14:46:30 2008 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _VECTOR_UTIL_H +#define _VECTOR_UTIL_H + +#include <vector> + +// function to conveniently stream vectors +template <class U> +std::ostream& operator<<(std::ostream& os,const std::vector<U>& c){ + typedef std::vector<U> V; + os<<"["; + if(c.size()>0){ + for(typename V::const_iterator it=c.begin();it!=c.end();++it) + os<<*it<<","; + os.seekp(-1,std::ios::cur); + } + os<<"]"; + return os; +} + +#endif // _VECTOR_UTIL_H Propchange: hadoop/zookeeper/trunk/src/c/tests/Vector.h ------------------------------------------------------------------------------ svn:eol-style = native