zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [02/20] zookeeper git commit: ZOOKEEPER-3031: 3.4: MAVEN MIGRATION - move client dir
Date Thu, 23 Aug 2018 14:11:54 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5694559d/zookeeper-client/zookeeper-client-c/tests/TestOperations.cc
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestOperations.cc b/zookeeper-client/zookeeper-client-c/tests/TestOperations.cc
new file mode 100644
index 0000000..27d9270
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/tests/TestOperations.cc
@@ -0,0 +1,710 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include "ZKMocks.h"
+#include <proto.h>
+
+using namespace std;
+
+class Zookeeper_operations : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_operations);
+#ifndef THREADED
+    CPPUNIT_TEST(testPing);
+    CPPUNIT_TEST(testUnsolicitedPing);
+    CPPUNIT_TEST(testTimeoutCausedByWatches1);
+    CPPUNIT_TEST(testTimeoutCausedByWatches2);
+#else    
+    CPPUNIT_TEST(testAsyncWatcher1);
+    CPPUNIT_TEST(testAsyncGetOperation);
+#endif
+    CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
+    CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
+    CPPUNIT_TEST(testConcurrentOperations1);
+    CPPUNIT_TEST_SUITE_END();
+    zhandle_t *zh;
+    FILE *logfile;
+
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
+public: 
+    Zookeeper_operations() {
+      logfile = openlogfile("Zookeeper_operations");
+    }
+
+    ~Zookeeper_operations() {
+      if (logfile) {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp()
+    {
+        zoo_set_log_stream(logfile);
+
+        zoo_deterministic_conn_order(0);
+        zh=0;
+    }
+    
+    void tearDown()
+    {
+        zookeeper_close(zh);
+    }
+
+    class AsyncGetOperationCompletion: public AsyncCompletion{
+    public:
+        AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
+        virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
+            synchronized(mx_);
+            called_=true;
+            rc_=rc;
+            value_.erase();
+            if(rc!=ZOK) return;
+            value_.assign(value,len);
+            if(stat)
+                stat_=*stat;
+        }
+        bool operator()()const{
+            synchronized(mx_);
+            return called_;
+        }
+        mutable Mutex mx_;
+        bool called_;
+        int rc_;
+        string value_;
+        NodeStat stat_;
+    };
+#ifndef THREADED
+    // send two get data requests; verify that the corresponding completions called
+    void testConcurrentOperations1()
+    {
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+        
+        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // first operation
+        AsyncGetOperationCompletion res1;
+        zkServer.addOperationResponse(new ZooGetResponse("1",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // second operation
+        AsyncGetOperationCompletion res2;
+        zkServer.addOperationResponse(new ZooGetResponse("2",1));
+        rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // process the send queue
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        while((rc=zookeeper_process(zh,interest))==ZOK) {
+          millisleep(100);
+          //printf("%d\n", rc);
+        }
+        //printf("RC = %d", rc);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
+        CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
+        CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
+    }
+    // send two getData requests and disconnect while the second request is
+    // outstanding;
+    // verify the completions are called
+    void testOperationsAndDisconnectConcurrently1()
+    {
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+        
+        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // first operation
+        AsyncGetOperationCompletion res1;
+        zkServer.addOperationResponse(new ZooGetResponse("1",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // second operation
+        AsyncGetOperationCompletion res2;
+        zkServer.addOperationResponse(new ZooGetResponse("2",1));
+        rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // process the send queue
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // simulate a disconnect
+        zkServer.setConnectionLost();
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
+        CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
+        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
+        CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
+    }
+    // send two getData requests and simulate timeout while the both request
+    // are pending;
+    // verify the completions are called
+    void testOperationsAndDisconnectConcurrently2()
+    {
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+        
+        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // first operation
+        AsyncGetOperationCompletion res1;
+        zkServer.addOperationResponse(new ZooGetResponse("1",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // second operation
+        AsyncGetOperationCompletion res2;
+        zkServer.addOperationResponse(new ZooGetResponse("2",1));
+        rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // simulate timeout
+        timeMock.tick(+10); // advance system time by 10 secs
+        // the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
+        // make sure the completions have been called
+        CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
+        CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
+    }
+
+    class PingCountingServer: public ZookeeperServer{
+    public:
+        PingCountingServer():pingCount_(0){}
+        // called when a client request is received
+        virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
+           if(rh.type==ZOO_PING_OP){
+               pingCount_++;
+           }
+        }
+        int pingCount_;
+    };
+
+    // establish a connection; idle for a while
+    // verify ping was sent at least once
+    void testPing()
+    {
+        const int TIMEOUT=9; // timeout in secs
+        Mock_gettimeofday timeMock;
+        PingCountingServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+        
+        // receive timeout is in milliseconds
+        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // Round 1.
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // simulate waiting for the select() call to timeout; 
+        // advance the system clock accordingly
+        timeMock.tick(tv);  
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        // verify no ping sent
+        CPPUNIT_ASSERT(zkServer.pingCount_==0);
+        
+        // Round 2.
+        // the client should have the idle threshold exceeded, by now
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // assume the socket is writable, so no idling here; move on to 
+        // zookeeper_process immediately
+        rc=zookeeper_process(zh,interest);
+        // ZNOTHING means the client hasn't received a ping response yet
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        // verify a ping is sent
+        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
+        
+        // Round 3.
+        // we're going to receive a server PING response and make sure
+        // that the client has updated its last_recv timestamp 
+        zkServer.addRecvResponse(new PingResponse);
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // pseudo-sleep for a short while (10 ms)
+        timeMock.millitick(10);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // only one ping so far?
+        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
+        CPPUNIT_ASSERT(timeMock==zh->last_recv);
+
+        // Round 4
+        // make sure that a ping is not sent if something is outstanding
+        AsyncGetOperationCompletion res1;
+        rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        timeMock.tick(tv);  
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // pseudo-sleep for a short while (10 ms)
+        timeMock.millitick(10);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        // only one ping so far?
+        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
+    }
+
+    // ZOOKEEPER-2253: Permit unsolicited pings
+    void testUnsolicitedPing()
+    {
+        const int TIMEOUT=9; // timeout in secs
+        Mock_gettimeofday timeMock;
+        PingCountingServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        // receive timeout is in milliseconds
+        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+
+        int fd=0;
+        int interest=0;
+        timeval tv;
+
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        // verify no ping sent
+        CPPUNIT_ASSERT(zkServer.pingCount_==0);
+
+        // we're going to receive a unsolicited PING response; ensure
+        // that the client has updated its last_recv timestamp
+        timeMock.tick(tv);
+        zkServer.addRecvResponse(new PingResponse);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        CPPUNIT_ASSERT(timeMock==zh->last_recv);
+    }
+
+    // simulate a watch arriving right before a ping is due
+    // assert the ping is sent nevertheless
+    void testTimeoutCausedByWatches1()
+    {
+        const int TIMEOUT=9; // timeout in secs
+        Mock_gettimeofday timeMock;
+        PingCountingServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+        
+        // receive timeout is in milliseconds
+        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // Round 1.
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // simulate waiting for the select() call to timeout; 
+        // advance the system clock accordingly
+        timeMock.tick(tv);
+        timeMock.tick(-1); // set the clock to a millisecond before a ping is due
+        // trigger a watch now
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // arrival of a watch sets the last_recv to the current time
+        CPPUNIT_ASSERT(timeMock==zh->last_recv);
+        // spend 1 millisecond by processing the watch
+        timeMock.tick(1);
+        
+        // Round 2.
+        // a ping is due; zookeeper_interest() must send it now
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // no delay here -- as if the socket is immediately writable
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        // verify a ping is sent
+        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);        
+    }
+
+    // similar to testTimeoutCausedByWatches1, but this time the watch is 
+    // triggered while the client has an outstanding request
+    // assert the ping is sent on time
+    void testTimeoutCausedByWatches2()
+    {
+        const int TIMEOUT=9; // timeout in secs
+        Mock_gettimeofday now;
+        PingCountingServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+        
+        // receive timeout is in milliseconds
+        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        // queue up a request; keep it pending (as if the server is busy or has died)
+        AsyncGetOperationCompletion res1;
+        zkServer.addOperationResponse(new ZooGetResponse("2",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
+
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // Round 1.
+        // send the queued up zoo_aget() request
+        Mock_gettimeofday beginningOfTimes(now); // remember when we started
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // no delay -- the socket is writable
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); 
+        
+        // Round 2.
+        // what's next?
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // no response from the server yet -- waiting in the select() call
+        now.tick(tv);
+        // a watch has arrived, thus preventing the connection from timing out 
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));        
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
+        CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
+        
+        //Round 3.
+        // now is the time to send a ping; make sure it's actually sent
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        // verify a ping is sent
+        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
+        // make sure only 1/3 of the timeout has passed
+        CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
+    }
+
+#else   
+    class TestGetDataJob: public TestJob{
+    public:
+        TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
+            :svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
+        virtual void run(){
+            int i;
+            for(i=0;i<reps_;i++){
+                char buf;
+                int size=sizeof(buf);
+
+                if (i % 10 == 0) {
+                    // We need to pause every once in a while so we don't
+                    // get too far ahead and finish before the disconnect
+	            millisleep(1);
+                }
+                svr_->addOperationResponse(new ZooGetResponse("1",1));
+                rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
+                if(rc_!=ZOK){
+                    break;
+                }
+            }
+        }
+        ZookeeperServer* svr_;
+        zhandle_t* zh_;
+        int rc_;
+        int reps_;
+    };
+    class TestConcurrentOpJob: public TestGetDataJob{
+    public:
+        static const int REPS=500;
+        TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
+            TestGetDataJob(svr,zh,REPS){}
+        virtual TestJob* clone() const {
+            return new TestConcurrentOpJob(svr_,zh_);
+        }
+        virtual void validate(const char* file, int line) const{
+            CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
+        }
+    };
+    void testConcurrentOperations1()
+    {
+        for(int counter=0; counter<50; counter++){
+            // frozen time -- no timeouts and no pings
+            Mock_gettimeofday timeMock;
+            
+            ZookeeperServer zkServer;
+            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+            // must call zookeeper_close() while all the mocks are in the scope!
+            CloseFinally guard(&zh);
+            
+            zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+            CPPUNIT_ASSERT(zh!=0);
+            // make sure the client has connected
+            CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+            
+            TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
+            jmgr.startAllJobs();
+            jmgr.wait();
+            // validate test results
+            VALIDATE_JOBS(jmgr);
+        }
+    }
+    class ZKGetJob: public TestJob{
+    public:
+        static const int REPS=1000;
+        ZKGetJob(zhandle_t* zh)
+            :zh_(zh),rc_(ZAPIERROR){}
+        virtual TestJob* clone() const {
+            return new ZKGetJob(zh_);
+        }
+        virtual void run(){
+            int i;
+            for(i=0;i<REPS;i++){
+                char buf;
+                int size=sizeof(buf);                
+                rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
+                if(rc_!=ZOK){
+                    break;
+                }
+            }
+            //TEST_TRACE(("Finished %d iterations",i));
+        }
+        virtual void validate(const char* file, int line) const{
+            CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
+        }
+        zhandle_t* zh_;
+        int rc_;
+    };
+
+    // this test connects to a real ZK server and creates the /xyz node and sends
+    // lots of zoo_get requests.
+    // to run this test use the following command:
+    // zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
+    // where the second parameter is the server host and port
+    void testOperationsAndDisconnectConcurrently2()
+    {
+        if(globalTestConfig.getTestName().find(__func__)==string::npos || 
+                globalTestConfig.getExtraOptCount()==0)
+        {
+            // only run this test when specifically asked so
+            return;
+        }
+        string host(*(globalTestConfig.getExtraOptBegin()));
+        zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
+        CPPUNIT_ASSERT(lzh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
+                ensureCondition(ClientConnected(zh),5000)<5000);
+        
+        char realpath[1024];
+        int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
+        CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
+        zookeeper_close(lzh); 
+  
+        for(int counter=0; counter<200; counter++){
+            TEST_TRACE(("Loop count %d",counter));
+            
+            CloseFinally guard(&zh);
+
+            zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
+            CPPUNIT_ASSERT(zh!=0);
+            // make sure the client has connected
+            CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
+                    ensureCondition(ClientConnected(zh),5000)<5000);
+            
+            TestJobManager jmgr(ZKGetJob(zh),10);
+            jmgr.startJobsImmediately();
+            jmgr.wait();
+            VALIDATE_JOBS(jmgr);
+            TEST_TRACE(("run %d finished",counter));
+        }
+
+    }
+
+    class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
+    public:
+        static const int REPS=1000;
+        TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
+            TestGetDataJob(svr,zh,REPS){}
+        virtual TestJob* clone() const {
+            return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
+        }
+        virtual void validate(const char* file, int line) const{
+            CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
+        }
+    };
+
+    // this test is not 100% accurate in a sense it may not detect all error cases.
+    // TODO: I can't think of a test that is 100% accurate and doesn't interfere
+    //       with the code being tested (in terms of introducing additional 
+    //       implicit synchronization points)
+    void testOperationsAndDisconnectConcurrently1()
+    {
+        for(int counter=0; counter<50; counter++){
+            //TEST_TRACE(("Loop count %d",counter));
+            // frozen time -- no timeouts and no pings
+            Mock_gettimeofday timeMock;
+            
+            ZookeeperServer zkServer;
+            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+            // must call zookeeper_close() while all the mocks are in the scope!
+            CloseFinally guard(&zh);
+            
+            zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+            CPPUNIT_ASSERT(zh!=0);
+            // make sure the client has connected
+            CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+            
+            TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
+            jmgr.startJobsImmediately();
+            // let everything startup before we shutdown the server
+            millisleep(4);
+            // reconnect attempts will start failing immediately 
+            zkServer.setServerDown(0);
+            // next recv call will return 0
+            zkServer.setConnectionLost();
+            jmgr.wait();
+            VALIDATE_JOBS(jmgr);
+        }
+        
+    }
+    // call zoo_aget() in the multithreaded mode
+    void testAsyncGetOperation()
+    {
+        Mock_gettimeofday timeMock;
+        
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        AsyncGetOperationCompletion res1;
+        zkServer.addOperationResponse(new ZooGetResponse("1",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
+        CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);        
+    }
+    class ChangeNodeWatcher: public WatcherAction{
+    public:
+        ChangeNodeWatcher():changed_(false){}
+        virtual void onNodeValueChanged(zhandle_t*,const char* path){
+            synchronized(mx_);
+            changed_=true;
+            if(path!=0) path_=path;
+        }
+        // this predicate checks if CHANGE_EVENT event type was triggered, unlike
+        // the isWatcherTriggered() that returns true whenever a watcher is triggered
+        // regardless of the event type
+        SyncedBoolCondition isNodeChangedTriggered() const{
+            return SyncedBoolCondition(changed_,mx_);
+        }
+        bool changed_;
+        string path_;
+    };
+    
+    class AsyncWatcherCompletion: public AsyncCompletion{
+    public:
+        AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
+        virtual void statCompl(int rc, const Stat *stat){
+            // we received a server response, now enqueue a watcher event
+            // to trigger the watcher
+            zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
+        }
+        ZookeeperServer& zkServer_;
+    };
+    // verify that async watcher is called for znode events (CREATED, DELETED etc.)
+    void testAsyncWatcher1(){
+        Mock_gettimeofday timeMock;
+        
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        ChangeNodeWatcher action;        
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,
+                TEST_CLIENT_ID,&action,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        
+        // set the watcher
+        AsyncWatcherCompletion completion(zkServer);
+        // prepare a response for the zoo_aexists() request
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
+        CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);                
+    }
+#endif
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5694559d/zookeeper-client/zookeeper-client-c/tests/TestWatchers.cc
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestWatchers.cc b/zookeeper-client/zookeeper-client-c/tests/TestWatchers.cc
new file mode 100644
index 0000000..0a26934
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/tests/TestWatchers.cc
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include "ZKMocks.h"
+#include "CollectionUtil.h"
+#include "Util.h"
+
+class Zookeeper_watchers : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_watchers);
+    CPPUNIT_TEST(testDefaultSessionWatcher1);
+    CPPUNIT_TEST(testDefaultSessionWatcher2);
+    CPPUNIT_TEST(testObjectSessionWatcher1);
+    CPPUNIT_TEST(testObjectSessionWatcher2);
+    CPPUNIT_TEST(testNodeWatcher1);
+    CPPUNIT_TEST(testChildWatcher1);
+    CPPUNIT_TEST(testChildWatcher2);
+    CPPUNIT_TEST_SUITE_END();
+
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
+    zhandle_t *zh;
+    FILE *logfile;
+    
+public:
+
+    Zookeeper_watchers() {
+      logfile = openlogfile("Zookeeper_watchers");
+    }
+
+    ~Zookeeper_watchers() {
+      if (logfile) {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp()
+    {
+        zoo_set_log_stream(logfile);
+
+        zoo_deterministic_conn_order(0);
+        zh=0;
+    }
+    
+    void tearDown()
+    {
+        zookeeper_close(zh);
+    }
+    
+    class ConnectionWatcher: public WatcherAction{
+    public:
+        ConnectionWatcher():connected_(false),counter_(0){}
+        virtual void onConnectionEstablished(zhandle_t*){
+            synchronized(mx_);
+            counter_++;
+            connected_=true;
+        }
+        SyncedBoolCondition isConnectionEstablished() const{
+            return SyncedBoolCondition(connected_,mx_);
+        }
+        bool connected_;
+        int counter_;
+    };
+
+    class DisconnectWatcher: public WatcherAction{
+    public:
+        DisconnectWatcher():disconnected_(false),counter_(0){}
+        virtual void onConnectionLost(zhandle_t*){
+            synchronized(mx_);
+            counter_++;
+            disconnected_=true;
+        }
+        SyncedBoolCondition isDisconnected() const{
+            return SyncedBoolCondition(disconnected_,mx_);
+        }
+        bool disconnected_;
+        int counter_;
+    };
+
+    class CountingDataWatcher: public WatcherAction{
+    public:
+        CountingDataWatcher():disconnected_(false),counter_(0){}
+        virtual void onNodeValueChanged(zhandle_t*,const char* path){
+            synchronized(mx_);
+            counter_++;
+        }
+        virtual void onConnectionLost(zhandle_t*){
+            synchronized(mx_);
+            counter_++;
+            disconnected_=true;
+        }
+        bool disconnected_;
+        int counter_;
+    };
+
+    class DeletionCountingDataWatcher: public WatcherAction{
+    public:
+        DeletionCountingDataWatcher():counter_(0){}
+        virtual void onNodeDeleted(zhandle_t*,const char* path){
+            synchronized(mx_);
+            counter_++;
+        }
+        int counter_;
+    };
+
+    class ChildEventCountingWatcher: public WatcherAction{
+    public:
+        ChildEventCountingWatcher():counter_(0){}
+        virtual void onChildChanged(zhandle_t*,const char* path){
+            synchronized(mx_);
+            counter_++;
+        }
+        int counter_;
+    };
+
+#ifndef THREADED
+    
+    // verify: the default watcher is called once for a session event
+    void testDefaultSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        ConnectionWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // open the socket
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);        
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTING_STATE,zoo_state(zh));
+        // send the handshake packet to the server
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_ASSOCIATING_STATE,zoo_state(zh));
+        // receive the server handshake response
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // verify connected
+        CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE,zoo_state(zh));
+        CPPUNIT_ASSERT(watcher.connected_);
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+    
+    // test case: connect to server, set a default watcher, disconnect from the server
+    // verify: the default watcher is called once
+    void testDefaultSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        // first operation
+        AsyncCompletion ignored;
+        zkServer.addOperationResponse(new ZooGetResponse("1",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // this will process the response and activate the watcher
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        // now, disconnect
+        zkServer.setConnectionLost();
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
+        // verify disconnected
+        CPPUNIT_ASSERT(watcher.disconnected_);
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+    
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        CountingDataWatcher wobject;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/x/y/1",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // this will process the response and activate the watcher
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        // now, disconnect
+        zkServer.setConnectionLost();
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
+
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           set a def watcher on another node,disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        // set the default watcher
+        AsyncCompletion ignored;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        CountingDataWatcher wobject;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        // disconnect now
+        zkServer.setConnectionLost();
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
+
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: register 2 node watches for different paths, trigger the watches
+    // verify: the data watchers are processed, the default watcher is not called
+    void testNodeWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        CountingDataWatcher wobject1;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        CountingDataWatcher wobject2;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/a/b/c"));
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
+        // make sure all watchers have been processed
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+        
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+    // testcase: set up both a children and a data watchers on the node /a, then
+    //           delete the node by sending a DELETE_EVENT event
+    // verify: both watchers are triggered
+    void testChildWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DeletionCountingDataWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        DeletionCountingDataWatcher wobject1;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        DeletionCountingDataWatcher wobject2;
+        rc=zoo_awget_children(zh,"/a",activeWatcher,
+                &wobject2,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_DELETED_EVENT,"/a"));
+        // make sure the watchers have been processed
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+    // testcase: create both a child and data watch on the node /a, send a ZOO_CHILD_EVENT
+    // verify: only the child watch triggered
+    void testChildWatcher2(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        ChildEventCountingWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        ChildEventCountingWatcher wobject1;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        ChildEventCountingWatcher wobject2;
+        rc=zoo_awget_children(zh,"/a",activeWatcher,
+                &wobject2,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHILD_EVENT,"/a"));
+        // make sure the watchers have been processed
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
+          millisleep(100);
+        }
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
+
+        CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+#else
+    // verify: the default watcher is called once for a session event
+    void testDefaultSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTED_STATE);
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        ConnectionWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // wait till watcher proccessing has completed (the connection 
+        // established event)
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the watcher has been triggered
+        CPPUNIT_ASSERT(ensureCondition(watcher.isConnectionEstablished(),1000)<1000);
+        // triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+
+    // test case: connect to server, set a default watcher, disconnect from the server
+    // verify: the default watcher is called once
+    void testDefaultSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTING_STATE);
+        DisconnectWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        // set a default watch
+        AsyncCompletion ignored;
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        // now, initiate a disconnect
+        zkServer.setConnectionLost();
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the watcher has been triggered
+        CPPUNIT_ASSERT(watcher.disconnected_);
+        // triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTING_STATE);
+        DisconnectWatcher defWatcher;
+        // use the tracker to find out when the watcher has been activated
+        WatcherActivationTracker activationTracker;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        AsyncCompletion ignored;
+        // this successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject;
+        activationTracker.track(&wobject);
+        // set a path-specific watcher
+        int rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+        
+        // now, initiate a disconnect
+        zkServer.setConnectionLost();
+        // make sure all watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           set a def watcher on another node,disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTING_STATE);
+        DisconnectWatcher defWatcher;
+        // use the tracker to find out when the watcher has been activated
+        WatcherActivationTracker activationTracker;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        // set a default watch
+        AsyncCompletion ignored;
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        activationTracker.track(&defWatcher);
+        int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+        // this successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject;
+        activationTracker.track(&wobject);
+        // set a path-specific watcher
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+        
+        // now, initiate a disconnect
+        zkServer.setConnectionLost();
+        // make sure all watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: register 2 node watches for different paths, trigger the watches
+    // verify: the data watchers are processed, the default watcher is not called
+    void testNodeWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_CHANGED_EVENT,0,false);
+        CountingDataWatcher defWatcher;
+        // use the tracker to find out when the watcher has been activated
+        WatcherActivationTracker activationTracker;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        // don't care about completions
+        AsyncCompletion ignored;
+        // set a one-shot watch
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject1;
+        activationTracker.track(&wobject1);
+        int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+        // this successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject2;
+        activationTracker.track(&wobject2);
+        // set a path-specific watcher
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/a/b/c"));
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
+        // make sure all watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.deliveryCounterEquals(2),1000)<1000);
+        
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+    // testcase: set up both a children and a data watchers on the node /a, then
+    //           delete the node (that is, send a DELETE_EVENT)
+    // verify: both watchers are triggered
+    void testChildWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_DELETED_EVENT,0);
+        DeletionCountingDataWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        DeletionCountingDataWatcher wobject1;
+        Stat stat;
+        // add a node watch
+        int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        DeletionCountingDataWatcher wobject2;
+        String_vector children;
+        rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
+        deallocate_String_vector(&children);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_DELETED_EVENT,"/a"));
+        // make sure the watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+    
+    // testcase: create both a child and data watch on the node /a, send a ZOO_CHILD_EVENT
+    // verify: only the child watch triggered
+    void testChildWatcher2(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(ZOO_CHILD_EVENT,0);
+        ChildEventCountingWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        ChildEventCountingWatcher wobject1;
+        Stat stat;
+        // add a node watch
+        int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        ChildEventCountingWatcher wobject2;
+        String_vector children;
+        rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
+        deallocate_String_vector(&children);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHILD_EVENT,"/a"));
+        // make sure the watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+        CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+#endif //THREADED
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_watchers);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5694559d/zookeeper-client/zookeeper-client-c/tests/TestZookeeperClose.cc
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestZookeeperClose.cc b/zookeeper-client/zookeeper-client-c/tests/TestZookeeperClose.cc
new file mode 100644
index 0000000..edefa66
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/tests/TestZookeeperClose.cc
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "ZKMocks.h"
+
+#ifdef THREADED
+#include "PthreadMocks.h"
+#endif
+
+using namespace std;
+
+class Zookeeper_close : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_close);
+#ifdef THREADED
+    CPPUNIT_TEST(testIOThreadStoppedOnExpire);
+#endif
+    CPPUNIT_TEST(testCloseUnconnected);
+    CPPUNIT_TEST(testCloseUnconnected1);
+    CPPUNIT_TEST(testCloseConnected1);
+    CPPUNIT_TEST(testCloseFromWatcher1);
+    CPPUNIT_TEST_SUITE_END();
+    zhandle_t *zh;
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
+    FILE *logfile;
+public: 
+
+    Zookeeper_close() {
+      logfile = openlogfile("Zookeeper_close");
+    }
+
+    ~Zookeeper_close() {
+      if (logfile) {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp()
+    {
+        zoo_set_log_stream(logfile);
+
+        zoo_deterministic_conn_order(0);
+        zh=0;
+    }
+    
+    void tearDown()
+    {
+        zookeeper_close(zh);
+    }
+
+    class CloseOnSessionExpired: public WatcherAction{
+    public:
+        CloseOnSessionExpired(bool callClose=true):
+            callClose_(callClose),rc(ZOK){}
+        virtual void onSessionExpired(zhandle_t* zh){
+            memcpy(&lzh,zh,sizeof(lzh));
+            if(callClose_)
+                rc=zookeeper_close(zh);
+        }
+        zhandle_t lzh;
+        bool callClose_;
+        int rc;
+    };
+    
+#ifndef THREADED
+    void testCloseUnconnected()
+    {       
+        zh=zookeeper_init("localhost:2121",watcher,10000,0,0,0);       
+        CPPUNIT_ASSERT(zh!=0);
+        
+        // do not actually free the memory while in zookeeper_close()
+        Mock_free_noop freeMock;
+        // make a copy of zhandle before close() overwrites some of 
+        // it members with NULLs
+        zhandle_t lzh;
+        memcpy(&lzh,zh,sizeof(lzh));
+        int rc=zookeeper_close(zh);
+        zhandle_t* savezh=zh; zh=0;
+        freeMock.disable(); // disable mock's fake free()- use libc's free() instead
+        
+        // verify that zookeeper_close has done its job
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // memory
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        // This cannot be maintained properly CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter);
+    }
+    void testCloseUnconnected1()
+    {
+        zh=zookeeper_init("localhost:2121",watcher,10000,0,0,0);       
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state 
+        zh->fd=ZookeeperServer::FD;
+        zh->state=ZOO_CONNECTED_STATE;
+        Mock_flush_send_queue zkMock;
+        // do not actually free the memory while in zookeeper_close()
+        Mock_free_noop freeMock;
+        // make a copy of zhandle before close() overwrites some of 
+        // it members with NULLs
+        zhandle_t lzh;
+        memcpy(&lzh,zh,sizeof(lzh));
+        int rc=zookeeper_close(zh);
+        zhandle_t* savezh=zh; zh=0;
+        freeMock.disable(); // disable mock's fake free()- use libc's free() instead
+
+        // verify that zookeeper_close has done its job
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // memory
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        // the close request sent?
+        CPPUNIT_ASSERT_EQUAL(1,zkMock.counter);
+    }
+    void testCloseConnected1()
+    {
+        ZookeeperServer zkServer;
+        // poll() will called from zookeeper_close()
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+
+        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
+        CPPUNIT_ASSERT(zh!=0);
+
+        Mock_gettimeofday timeMock;
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTING_STATE,zoo_state(zh));
+        CPPUNIT_ASSERT_EQUAL(ZOOKEEPER_READ|ZOOKEEPER_WRITE,interest);
+        
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_ASSOCIATING_STATE,zoo_state(zh));
+        
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE,zoo_state(zh));
+        // do not actually free the memory while in zookeeper_close()
+        Mock_free_noop freeMock;
+        // make a copy of zhandle before close() overwrites some of 
+        // it members with NULLs
+        zhandle_t lzh;
+        memcpy(&lzh,zh,sizeof(lzh));
+        zookeeper_close(zh);
+        zhandle_t* savezh=zh; zh=0;
+        freeMock.disable(); // disable mock's fake free()- use libc's free() instead
+        // memory
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        // the close request sent?
+        CPPUNIT_ASSERT_EQUAL(1,(int)zkServer.closeSent);
+    }
+    void testCloseFromWatcher1()
+    {
+        Mock_gettimeofday timeMock;
+
+        ZookeeperServer zkServer;
+        // make the server return a non-matching session id
+        zkServer.returnSessionExpired();
+        // poll() will called from zookeeper_close()
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+
+        CloseOnSessionExpired closeAction;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,
+                TEST_CLIENT_ID,&closeAction,0);
+        CPPUNIT_ASSERT(zh!=0);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // initiate connection
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);        
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTING_STATE,zoo_state(zh));
+        CPPUNIT_ASSERT_EQUAL(ZOOKEEPER_READ|ZOOKEEPER_WRITE,interest);
+        rc=zookeeper_process(zh,interest);
+        // make sure the handshake in progress 
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ZOO_ASSOCIATING_STATE,zoo_state(zh));
+        rc=zookeeper_interest(zh,&fd,&interest,&tv);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        
+        // do not actually free the memory while in zookeeper_close()
+        Mock_free_noop freeMock;
+        // should call the watcher with ZOO_EXPIRED_SESSION_STATE state
+        rc=zookeeper_process(zh,interest);
+        zhandle_t* savezh=zh; zh=0;
+        freeMock.disable(); // disable mock's fake free()- use libc's free() instead
+        
+        CPPUNIT_ASSERT_EQUAL(ZOO_EXPIRED_SESSION_STATE,zoo_state(savezh));
+        // memory
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+        // make sure the close request NOT sent
+        CPPUNIT_ASSERT_EQUAL(0,(int)zkServer.closeSent);
+    }
+#else
+    void testCloseUnconnected()
+    {
+        // disable threading
+        MockPthreadZKNull pthreadMock;
+        zh=zookeeper_init("localhost:2121",watcher,10000,0,0,0); 
+        
+        CPPUNIT_ASSERT(zh!=0);
+        adaptor_threads* adaptor=(adaptor_threads*)zh->adaptor_priv;
+        CPPUNIT_ASSERT(adaptor!=0);
+
+        // do not actually free the memory while in zookeeper_close()
+        Mock_free_noop freeMock;
+        // make a copy of zhandle before close() overwrites some of 
+        // it members with NULLs
+        zhandle_t lzh;
+        memcpy(&lzh,zh,sizeof(lzh));
+        int rc=zookeeper_close(zh);
+        zhandle_t* savezh=zh; zh=0;
+        // we're done, disable mock's fake free(), use libc's free() instead
+        freeMock.disable();
+        
+        // verify that zookeeper_close has done its job
+        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        // memory
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
+        // Cannot be maintained accurately: CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter);
+        // threads
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->io));
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->completion));
+        // mutexes
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(&savezh->to_process.lock));
+        CPPUNIT_ASSERT_EQUAL(0,MockPthreadsNull::getInvalidAccessCounter(&savezh->to_process.lock));
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(&savezh->to_send.lock));
+        CPPUNIT_ASSERT_EQUAL(0,MockPthreadsNull::getInvalidAccessCounter(&savezh->to_send.lock));
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(&savezh->sent_requests.lock));
+        CPPUNIT_ASSERT_EQUAL(0,MockPthreadsNull::getInvalidAccessCounter(&savezh->sent_requests.lock));
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(&savezh->completions_to_process.lock));
+        CPPUNIT_ASSERT_EQUAL(0,MockPthreadsNull::getInvalidAccessCounter(&savezh->completions_to_process.lock));
+        // conditionals
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(&savezh->sent_requests.cond));
+        CPPUNIT_ASSERT_EQUAL(0,MockPthreadsNull::getInvalidAccessCounter(&savezh->sent_requests.cond));
+        CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(&savezh->completions_to_process.cond));
+        CPPUNIT_ASSERT_EQUAL(0,MockPthreadsNull::getInvalidAccessCounter(&savezh->completions_to_process.cond));
+    }
+    void testCloseUnconnected1()
+    {
+        for(int i=0; i<100;i++){
+            zh=zookeeper_init("localhost:2121",watcher,10000,0,0,0); 
+            CPPUNIT_ASSERT(zh!=0);
+            adaptor_threads* adaptor=(adaptor_threads*)zh->adaptor_priv;
+            CPPUNIT_ASSERT(adaptor!=0);
+            int rc=zookeeper_close(zh);
+            zh=0;
+            CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
+        }
+    }
+    void testCloseConnected1()
+    {
+        // frozen time -- no timeouts and no pings
+        Mock_gettimeofday timeMock;
+
+        for(int i=0;i<100;i++){
+            ZookeeperServer zkServer;
+            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+            // use a checked version of pthread calls
+            CheckedPthread threadMock;
+            // do not actually free the memory while in zookeeper_close()
+            Mock_free_noop freeMock;
+            
+            zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0); 
+            CPPUNIT_ASSERT(zh!=0);
+            // make sure the client has connected
+            CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+            // make a copy of zhandle before close() overwrites some of 
+            // its members with NULLs
+            zhandle_t lzh;
+            memcpy(&lzh,zh,sizeof(lzh));
+            int rc=zookeeper_close(zh);
+            zhandle_t* savezh=zh; zh=0;
+            // we're done, disable mock's fake free(), use libc's free() instead
+            freeMock.disable();
+            
+            CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);            
+            adaptor_threads* adaptor=(adaptor_threads*)lzh.adaptor_priv;
+            // memory
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
+            // threads
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->completion));
+            // mutexes
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&savezh->to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&savezh->to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&savezh->to_send.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&savezh->to_send.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&savezh->sent_requests.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&savezh->sent_requests.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&savezh->completions_to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&savezh->completions_to_process.lock));
+            // conditionals
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&savezh->sent_requests.cond));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&savezh->sent_requests.cond));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&savezh->completions_to_process.cond));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&savezh->completions_to_process.cond));
+        }
+    }
+    
+    struct PointerFreed{
+        PointerFreed(Mock_free_noop& freeMock,void* ptr):
+            freeMock_(freeMock),ptr_(ptr){}
+        bool operator()() const{return freeMock_.isFreed(ptr_); }
+        Mock_free_noop& freeMock_;
+        void* ptr_;
+    };
+    // test if zookeeper_close may be called from a watcher callback on
+    // SESSION_EXPIRED event
+    void testCloseFromWatcher1()
+    {
+        // frozen time -- no timeouts and no pings
+        Mock_gettimeofday timeMock;
+        
+        for(int i=0;i<100;i++){
+            ZookeeperServer zkServer;
+            // make the server return a non-matching session id
+            zkServer.returnSessionExpired();
+            
+            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+            // use a checked version of pthread calls
+            CheckedPthread threadMock;
+            // do not actually free the memory while in zookeeper_close()
+            Mock_free_noop freeMock;
+
+            CloseOnSessionExpired closeAction;
+            zh=zookeeper_init("localhost:2121",activeWatcher,10000,
+                    TEST_CLIENT_ID,&closeAction,0);
+            
+            CPPUNIT_ASSERT(zh!=0);
+            // we rely on the fact that zh is freed the last right before
+            // zookeeper_close() returns...
+            CPPUNIT_ASSERT(ensureCondition(PointerFreed(freeMock,zh),1000)<1000);
+            zhandle_t* lzh=zh;
+            zh=0;
+            // we're done, disable mock's fake free(), use libc's free() instead
+            freeMock.disable();
+            
+            CPPUNIT_ASSERT_EQUAL((int)ZOK,closeAction.rc);          
+            adaptor_threads* adaptor=(adaptor_threads*)closeAction.lzh.adaptor_priv;
+            // memory
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
+            // threads
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->completion));
+            // mutexes
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->to_send.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->to_send.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->sent_requests.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->sent_requests.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->completions_to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->completions_to_process.lock));
+            // conditionals
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->sent_requests.cond));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->sent_requests.cond));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->completions_to_process.cond));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->completions_to_process.cond));
+        }
+    }
+
+    void testIOThreadStoppedOnExpire()
+    {
+        // frozen time -- no timeouts and no pings
+        Mock_gettimeofday timeMock;
+        
+        for(int i=0;i<100;i++){
+            ZookeeperServer zkServer;
+            // make the server return a non-matching session id
+            zkServer.returnSessionExpired();
+            
+            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+            // use a checked version of pthread calls
+            CheckedPthread threadMock;
+            // do not call zookeeper_close() from the watcher
+            CloseOnSessionExpired closeAction(false);
+            zh=zookeeper_init("localhost:2121",activeWatcher,10000,
+                    &testClientId,&closeAction,0);
+            
+            // this is to ensure that if any assert fires, zookeeper_close() 
+            // will still be called while all the mocks are in the scope!
+            CloseFinally guard(&zh);
+
+            CPPUNIT_ASSERT(zh!=0);
+            CPPUNIT_ASSERT(ensureCondition(SessionExpired(zh),1000)<1000);
+            CPPUNIT_ASSERT(ensureCondition(IOThreadStopped(zh),1000)<1000);
+            // make sure the watcher has been processed
+            CPPUNIT_ASSERT(ensureCondition(closeAction.isWatcherTriggered(),1000)<1000);
+            // make sure the threads have not been destroyed yet
+            adaptor_threads* adaptor=(adaptor_threads*)zh->adaptor_priv;
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getDestroyCounter(adaptor->io));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getDestroyCounter(adaptor->completion));
+            // about to call zookeeper_close() -- no longer need the guard
+            guard.disarm();
+            
+            // do not actually free the memory while in zookeeper_close()
+            Mock_free_noop freeMock;
+            zookeeper_close(zh);
+            zhandle_t* lzh=zh; zh=0;
+            // we're done, disable mock's fake free(), use libc's free() instead
+            freeMock.disable();
+            
+            // memory
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
+            // threads
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->completion));
+            // mutexes
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->to_send.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->to_send.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->sent_requests.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->sent_requests.lock));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->completions_to_process.lock));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->completions_to_process.lock));
+            // conditionals
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->sent_requests.cond));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->sent_requests.cond));
+            CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(&lzh->completions_to_process.cond));
+            CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getInvalidAccessCounter(&lzh->completions_to_process.cond));
+        }
+    }
+
+#endif
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_close);


Mime
View raw message