zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1608620 - in /zookeeper/trunk: ./ src/c/ src/c/include/ src/c/src/ src/c/tests/
Date Mon, 07 Jul 2014 21:43:45 GMT
Author: fpj
Date: Mon Jul  7 21:43:44 2014
New Revision: 1608620

URL: http://svn.apache.org/r1608620
Log:
ZOOKEEPER-827. enable r/o mode in C client library (rgs via fpj)


Added:
    zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc
    zookeeper/trunk/src/c/tests/WatchUtil.h
    zookeeper/trunk/src/c/tests/quorum.cfg
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/c/Makefile.am
    zookeeper/trunk/src/c/README
    zookeeper/trunk/src/c/include/zookeeper.h
    zookeeper/trunk/src/c/src/addrvec.c
    zookeeper/trunk/src/c/src/addrvec.h
    zookeeper/trunk/src/c/src/cli.c
    zookeeper/trunk/src/c/src/load_gen.c
    zookeeper/trunk/src/c/src/zk_adaptor.h
    zookeeper/trunk/src/c/src/zookeeper.c
    zookeeper/trunk/src/c/tests/TestClientRetry.cc
    zookeeper/trunk/src/c/tests/ZKMocks.cc
    zookeeper/trunk/src/c/tests/ZKMocks.h
    zookeeper/trunk/src/c/tests/zkServer.sh

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Jul  7 21:43:44 2014
@@ -32,6 +32,8 @@ NEW FEATURES:
   ZOOKEEPER-1928. add configurable throttling to the number of snapshots
   concurrently sent by a leader (Edward Carter via fpj)
 
+  ZOOKEEPER-827. enable r/o mode in C client library (rgs via fpj)
+
 BUGFIXES:
 
   ZOOKEEPER-1900. NullPointerException in truncate (Camille Fournier)

Modified: zookeeper/trunk/src/c/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/Makefile.am?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/Makefile.am (original)
+++ zookeeper/trunk/src/c/Makefile.am Mon Jul  7 21:43:44 2014
@@ -93,7 +93,9 @@ TEST_SOURCES = \
 	tests/TestWatchers.cc \
 	tests/TestClient.cc \
 	tests/ZooKeeperQuorumServer.cc \
-	tests/ZooKeeperQuorumServer.h
+	tests/ZooKeeperQuorumServer.h \
+	tests/TestReadOnlyClient.cc \
+	$(NULL)
 
 SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
 

Modified: zookeeper/trunk/src/c/README
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/README?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/README (original)
+++ zookeeper/trunk/src/c/README Mon Jul  7 21:43:44 2014
@@ -85,6 +85,10 @@ against zookeeper_st library):
 
 $ cli_mt zookeeper_host:9876
 
+To start a client with read-only mode enabled, use the -r flag:
+
+$ cli_mt -r zookeeper_host:9876
+
 This is a client application that gives you a shell for executing
 simple zookeeper commands. Once successfully started and connected to
 the server it displays a shell prompt.

Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Mon Jul  7 21:43:44 2014
@@ -121,8 +121,10 @@ enum ZOO_ERRORS {
   ZCLOSING = -116, /*!< ZooKeeper is closing */
   ZNOTHING = -117, /*!< (not error) no server responses to process */
   ZSESSIONMOVED = -118, /*!<session moved to another server, so operation is ignored */
+  ZNOTREADONLY = -119, /*!< state-changing request is passed to read-only server */
   ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
-  ZNOWATCHER = -121 /*!< The watcher couldn't be found */
+  ZNOWATCHER = -121, /*!< The watcher couldn't be found */
+  ZRWSERVERFOUND = -122 /*!< r/w server found while in r/o mode */
 };
 
 #ifdef __cplusplus
@@ -147,6 +149,9 @@ extern ZOOAPI const int ZOO_PERM_ALL;
 
 #define ZOO_CONFIG_NODE "/zookeeper/config"
 
+/* flags for zookeeper_init{,2} */
+#define ZOO_READONLY         1
+
 /** This Id represents anyone. */
 extern ZOOAPI struct Id ZOO_ANYONE_ID_UNSAFE;
 /** This Id is only usable to set ACLs. It will get substituted with the
@@ -196,6 +201,7 @@ extern ZOOAPI const int ZOO_AUTH_FAILED_
 extern ZOOAPI const int ZOO_CONNECTING_STATE;
 extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
 extern ZOOAPI const int ZOO_CONNECTED_STATE;
+extern ZOOAPI const int ZOO_READONLY_STATE;
 extern ZOOAPI const int ZOO_NOTCONNECTED_STATE;
 // @}
 

Modified: zookeeper/trunk/src/c/src/addrvec.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.c (original)
+++ zookeeper/trunk/src/c/src/addrvec.c Mon Jul  7 21:43:44 2014
@@ -194,21 +194,45 @@ int addrvec_atend(const addrvec_t *avec)
 
 void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next)
 {
+    int index;
+
     // If we're at the end of the list, then reset index to start
-    if (addrvec_atend(avec))
-    {
+    if (addrvec_atend(avec)) {
         avec->next = 0;
     }
 
-    if (!addrvec_hasnext(avec))
-    {
+    if (!addrvec_hasnext(avec)) {
+        if (next) {
+            memset(next, 0, sizeof(*next));
+        }
+
+        return;
+    }
+
+    index = avec->next++;
+
+    if (next) {
+        *next = avec->data[index];
+    }
+}
+
+void addrvec_peek(addrvec_t *avec, struct sockaddr_storage *next)
+{
+    int index = avec->next;
+
+    if (avec->count == 0) {
         memset(next, 0, sizeof(*next));
         return;
     }
 
-    *next = avec->data[avec->next++];
+    if (addrvec_atend(avec)) {
+        index = 0;
+    }
+
+    *next = avec->data[index];
 }
 
+
 int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2)
 {
     uint32_t i = 0;

Modified: zookeeper/trunk/src/c/src/addrvec.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.h (original)
+++ zookeeper/trunk/src/c/src/addrvec.h Mon Jul  7 21:43:44 2014
@@ -111,7 +111,9 @@ int addrvec_hasnext(const addrvec_t *ave
 int addrvec_atend(const addrvec_t *avec);
 
 /**
- * Get the next entry from the addrvec and update the associated index. 
+ * Get the next entry from the addrvec and update the associated index.
+ *
+ * If next is NULL, the index will still be updated.
  * 
  * If the current index points at (or after) the last element in the vector then
  * it will loop back around and start at the beginning of the list.
@@ -119,6 +121,11 @@ int addrvec_atend(const addrvec_t *avec)
 void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next);
 
 /**
+ * Retrieves the next entry from the addrvec but doesn't update the index.
+ */
+void addrvec_peek(addrvec_t *avec, struct sockaddr_storage *next);
+
+/**
  * Compare two addrvecs for equality. 
  * 
  * \returns 1 if the contents of the two lists are identical and and 0 otherwise.

Modified: zookeeper/trunk/src/c/src/cli.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/cli.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/cli.c (original)
+++ zookeeper/trunk/src/c/src/cli.c Mon Jul  7 21:43:44 2014
@@ -76,6 +76,8 @@ static const char* state2String(int stat
     return "ASSOCIATING_STATE";
   if (state == ZOO_CONNECTED_STATE)
     return "CONNECTED_STATE";
+  if (state == ZOO_READONLY_STATE)
+    return "READONLY_STATE";
   if (state == ZOO_EXPIRED_SESSION_STATE)
     return "EXPIRED_SESSION_STATE";
   if (state == ZOO_AUTH_FAILED_STATE)
@@ -661,6 +663,7 @@ int main(int argc, char **argv) {
     char appId[64];
 #endif
     int bufoff = 0;
+    int flags, i;
     FILE *fh;
 
     if (argc < 2) {
@@ -690,6 +693,15 @@ int main(int argc, char **argv) {
         }
       }
     }
+
+    flags = 0;
+    for (i = 1; i < argc; ++i) {
+      if (strcmp("-r", argv[i]) == 0) {
+        flags = ZOO_READONLY;
+        break;
+      }
+    }
+
 #ifdef YCA
     strcpy(appId,"yahoo.example.yca_test");
     cert = yca_get_cert_once(appId);
@@ -708,7 +720,7 @@ int main(int argc, char **argv) {
     zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
     zoo_deterministic_conn_order(1); // enable deterministic order
     hostPort = argv[1];
-    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
+    zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
     if (!zh) {
         return errno;
     }

Modified: zookeeper/trunk/src/c/src/load_gen.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/load_gen.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/load_gen.c (original)
+++ zookeeper/trunk/src/c/src/load_gen.c Mon Jul  7 21:43:44 2014
@@ -75,8 +75,8 @@ void waitCounter(){
 }
 
 void listener(zhandle_t *zzh, int type, int state, const char *path,void* ctx) {
-    if(type == ZOO_SESSION_EVENT){
-        if(state == ZOO_CONNECTED_STATE){
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE || state == ZOO_READONLY_STATE) {
             pthread_mutex_lock(&lock);
             pthread_cond_broadcast(&cond);
             pthread_mutex_unlock(&lock);

Modified: zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ zookeeper/trunk/src/c/src/zk_adaptor.h Mon Jul  7 21:43:44 2014
@@ -42,6 +42,7 @@
 #define CONNECTING_STATE_DEF 1
 #define ASSOCIATING_STATE_DEF 2
 #define CONNECTED_STATE_DEF 3
+#define READONLY_STATE_DEF 5
 #define NOTCONNECTED_STATE_DEF 999
 
 /* zookeeper event type constants */
@@ -131,7 +132,7 @@ typedef struct _buffer_list {
 } buffer_list_t;
 
 /* the size of connect request */
-#define HANDSHAKE_REQ_SIZE 44
+#define HANDSHAKE_REQ_SIZE 45
 /* connect request */
 struct connect_req {
     int32_t protocolVersion;
@@ -140,6 +141,7 @@ struct connect_req {
     int64_t sessionId;
     int32_t passwd_len;
     char passwd[16];
+    char readOnly;
 };
 
 /* the connect response */
@@ -150,6 +152,7 @@ struct prime_struct {
     int64_t sessionId;
     int32_t passwd_len;
     char passwd[16];
+    char readOnly;
 }; 
 
 #ifdef THREADED
@@ -217,6 +220,10 @@ struct _zhandle {
     completion_head_t completions_to_process; // completions that are ready to run
     int outstanding_sync;               // number of outstanding synchronous requests
 
+    /* read-only mode specific fields */
+    struct timeval last_ping_rw; /* The last time we checked server for being r/w */
+    int ping_rw_timeout; /* The time that can go by before checking next server */
+
     // State info
     volatile int state;                 // Current zookeeper state
     void *context;                      // client-side provided context
@@ -228,7 +235,7 @@ struct _zhandle {
     // Primer storage
     struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection
     struct prime_struct primer_storage; // the connect response
-    char primer_storage_buffer[40];     // the true size of primer_storage
+    char primer_storage_buffer[41];     // the true size of primer_storage
 
     /* zookeeper_close is not reentrant because it de-allocates the zhandler. 
      * This guard variable is used to defer the destruction of zhandle till 
@@ -249,6 +256,11 @@ struct _zhandle {
 
     /** used for chroot path at the client side **/
     char *chroot;
+
+    /** Indicates if this client is allowed to go to r/o mode */
+    char allow_read_only;
+    /** Indicates if we connected to a majority server before */
+    char seen_rw_server_before;
 };
 
 

Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Mon Jul  7 21:43:44 2014
@@ -41,7 +41,7 @@
 #include <stdarg.h>
 #include <limits.h>
 
-#ifndef WIN32
+#ifndef _WIN32
 #include <sys/time.h>
 #include <sys/socket.h>
 #include <poll.h>
@@ -76,6 +76,7 @@ const int ZOO_AUTH_FAILED_STATE = AUTH_F
 const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
 const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
 const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
+const int ZOO_READONLY_STATE = READONLY_STATE_DEF;
 const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
 
 static __attribute__ ((unused)) const char* state2String(int state){
@@ -88,6 +89,8 @@ static __attribute__ ((unused)) const ch
         return "ZOO_ASSOCIATING_STATE";
     case CONNECTED_STATE_DEF:
         return "ZOO_CONNECTED_STATE";
+    case READONLY_STATE_DEF:
+        return "ZOO_READONLY_STATE";
     case EXPIRED_SESSION_STATE_DEF:
         return "ZOO_EXPIRED_SESSION_STATE";
     case AUTH_FAILED_STATE_DEF:
@@ -225,17 +228,28 @@ static __attribute__((unused)) void prin
 static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
 static int isValidPath(const char* path, const int flags);
 
-#ifdef _WINDOWS
-static int zookeeper_send(SOCKET s, const void* buf, int len)
+#ifdef _WIN32
+typedef SOCKET socket_t;
+typedef int sendsize_t;
+#define SEND_FLAGS  0
 #else
-static ssize_t zookeeper_send(int s, const void* buf, size_t len)
+#ifdef __APPLE__
+#define MSG_NOSIGNAL SO_NOSIGPIPE
 #endif
-{
-#ifdef __linux__
-  return send(s, buf, len, MSG_NOSIGNAL);
-#else
-  return send(s, buf, len, 0);
+typedef int socket_t;
+typedef ssize_t sendsize_t;
+#define SEND_FLAGS  MSG_NOSIGNAL
 #endif
+
+static void zookeeper_set_sock_nodelay(zhandle_t *, socket_t);
+static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
+static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
+static int zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
+
+
+static sendsize_t zookeeper_send(socket_t s, const void* buf, size_t len)
+{
+    return send(s, buf, len, SEND_FLAGS);
 }
 
 const void *zoo_get_context(zhandle_t *zh)
@@ -438,7 +452,7 @@ static void destroy(zhandle_t *zh)
 
 static void setup_random()
 {
-#ifndef WIN32          // TODO: better seed
+#ifndef _WIN32          // TODO: better seed
     int seed;
     int fd = open("/dev/urandom", O_RDONLY);
     if (fd == -1) {
@@ -651,7 +665,7 @@ static int resolve_hosts(const zhandle_t
 #endif
             if (rc != 0) {
                 errno = getaddrinfo_errno(rc);
-#ifdef WIN32
+#ifdef _WIN32
                 LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
 #elif __linux__ && __GNUC__
                 LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
@@ -990,7 +1004,7 @@ static zhandle_t *zookeeper_init_interna
     zh->log_callback = log_callback;
     log_env(zh);
 
-#ifdef WIN32
+#ifdef _WIN32
     if (Win32WSAStartup()){
         LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
         return 0;
@@ -1012,6 +1026,9 @@ static zhandle_t *zookeeper_init_interna
     zh->state = ZOO_NOTCONNECTED_STATE;
     zh->context = context;
     zh->recv_timeout = recv_timeout;
+    zh->allow_read_only = flags & ZOO_READONLY;
+    // non-zero clientid implies we've seen r/w server already
+    zh->seen_rw_server_before = (clientid != 0 && clientid->client_id != 0);
     init_auth_info(&zh->auth_h);
     if (watcher) {
        zh->watcher = watcher;
@@ -1054,6 +1071,7 @@ static zhandle_t *zookeeper_init_interna
     if(update_addrs(zh) != 0) {
         goto abort;
     }
+
     if (clientid) {
         memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
     } else {
@@ -1382,11 +1400,7 @@ static __attribute__ ((unused)) int get_
  * 0 if send would block while sending the buffer (or a send was incomplete),
  * 1 if success
  */
-#ifdef WIN32
-static int send_buffer(SOCKET fd, buffer_list_t *buff)
-#else
-static int send_buffer(int fd, buffer_list_t *buff)
-#endif
+static int send_buffer(socket_t fd, buffer_list_t *buff)
 {
     int len = buff->len;
     int off = buff->curr_offset;
@@ -1398,10 +1412,10 @@ static int send_buffer(int fd, buffer_li
         char *b = (char*)&nlen;
         rc = zookeeper_send(fd, b + off, sizeof(nlen) - off);
         if (rc == -1) {
-#ifndef _WINDOWS
-            if (errno != EAGAIN) {
-#else
+#ifdef _WIN32
             if (WSAGetLastError() != WSAEWOULDBLOCK) {
+#else
+            if (errno != EAGAIN) {
 #endif
                 return -1;
             } else {
@@ -1417,10 +1431,10 @@ static int send_buffer(int fd, buffer_li
         off -= sizeof(buff->len);
         rc = zookeeper_send(fd, buff->buffer + off, len - off);
         if (rc == -1) {
-#ifndef _WINDOWS
-            if (errno != EAGAIN) {
-#else
+#ifdef _WIN32
             if (WSAGetLastError() != WSAEWOULDBLOCK) {
+#else
+            if (errno != EAGAIN) {
 #endif
                 return -1;
             }
@@ -1436,29 +1450,23 @@ static int send_buffer(int fd, buffer_li
  * 0 if recv would block,
  * 1 if success
  */
-#ifdef WIN32
-static int recv_buffer(SOCKET fd, buffer_list_t *buff)
-#else
-static int recv_buffer(int fd, buffer_list_t *buff)
-#endif
+static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
 {
     int off = buff->curr_offset;
     int rc = 0;
-    //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
 
     /* if buffer is less than 4, we are reading in the length */
     if (off < 4) {
         char *buffer = (char*)&(buff->len);
-        rc = recv(fd, buffer+off, sizeof(int)-off, 0);
-        //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
-        switch(rc) {
+        rc = recv(zh->fd, buffer+off, sizeof(int)-off, 0);
+        switch (rc) {
         case 0:
             errno = EHOSTDOWN;
         case -1:
-#ifndef _WINDOWS
-            if (errno == EAGAIN) {
-#else
+#ifdef _WIN32
             if (WSAGetLastError() == WSAEWOULDBLOCK) {
+#else
+            if (errno == EAGAIN) {
 #endif
                 return 0;
             }
@@ -1476,15 +1484,21 @@ static int recv_buffer(int fd, buffer_li
         /* want off to now represent the offset into the buffer */
         off -= sizeof(buff->len);
 
-        rc = recv(fd, buff->buffer+off, buff->len-off, 0);
+        rc = recv(zh->fd, buff->buffer+off, buff->len-off, 0);
+
+        /* dirty hack to make a new client work against an old server:
+         * the old server sends 40 bytes to finish the connection handshake,
+         * while we are expecting 41 (1 extra byte for read-only mode data) */
+        if (buff == &zh->primer_buffer && rc == buff->len - 1) ++rc;
+
         switch(rc) {
         case 0:
             errno = EHOSTDOWN;
         case -1:
-#ifndef _WINDOWS
-            if (errno == EAGAIN) {
-#else
+#ifdef _WIN32
             if (WSAGetLastError() == WSAEWOULDBLOCK) {
+#else
+            if (errno == EAGAIN) {
 #endif
                 break;
             }
@@ -1578,6 +1592,13 @@ static void cleanup_bufs(zhandle_t *zh,i
     }
 }
 
+/* return 1 if zh's state is ZOO_CONNECTED_STATE or ZOO_READONLY_STATE,
+ * 0 otherwise */
+static int is_connected(zhandle_t* zh)
+{
+    return (zh->state==ZOO_CONNECTED_STATE || zh->state==ZOO_READONLY_STATE);
+}
+
 static void handle_error(zhandle_t *zh,int rc)
 {
     close(zh->fd);
@@ -1585,7 +1606,7 @@ static void handle_error(zhandle_t *zh,i
         LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
                 state2String(zh->state));
         PROCESS_SESSION_EVENT(zh, zh->state);
-    } else if (zh->state == ZOO_CONNECTED_STATE) {
+    } else if (is_connected(zh)) {
         LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
         PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
     }
@@ -1793,36 +1814,46 @@ static int serialize_prime_connect(struc
     offset = offset +  sizeof(req->passwd_len);
 
     memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
+    offset = offset +  sizeof(req->passwd);
+
+    memcpy(buffer + offset, &req->readOnly, sizeof(req->readOnly));
 
     return 0;
 }
 
- static int deserialize_prime_response(struct prime_struct *req, char* buffer){
+static int deserialize_prime_response(struct prime_struct *resp, char* buffer)
+{
      //this should be the order of deserialization
      int offset = 0;
-     memcpy(&req->len, buffer + offset, sizeof(req->len));
-     offset = offset +  sizeof(req->len);
+     memcpy(&resp->len, buffer + offset, sizeof(resp->len));
+     offset = offset +  sizeof(resp->len);
+
+     resp->len = ntohl(resp->len);
+     memcpy(&resp->protocolVersion,
+            buffer + offset,
+            sizeof(resp->protocolVersion));
+     offset = offset +  sizeof(resp->protocolVersion);
+
+     resp->protocolVersion = ntohl(resp->protocolVersion);
+     memcpy(&resp->timeOut, buffer + offset, sizeof(resp->timeOut));
+     offset = offset +  sizeof(resp->timeOut);
+
+     resp->timeOut = ntohl(resp->timeOut);
+     memcpy(&resp->sessionId, buffer + offset, sizeof(resp->sessionId));
+     offset = offset +  sizeof(resp->sessionId);
+
+     resp->sessionId = htonll(resp->sessionId);
+     memcpy(&resp->passwd_len, buffer + offset, sizeof(resp->passwd_len));
+     offset = offset +  sizeof(resp->passwd_len);
+
+     resp->passwd_len = ntohl(resp->passwd_len);
+     memcpy(resp->passwd, buffer + offset, sizeof(resp->passwd));
+     offset = offset +  sizeof(resp->passwd);
 
-     req->len = ntohl(req->len);
-     memcpy(&req->protocolVersion, buffer + offset, sizeof(req->protocolVersion));
-     offset = offset +  sizeof(req->protocolVersion);
-
-     req->protocolVersion = ntohl(req->protocolVersion);
-     memcpy(&req->timeOut, buffer + offset, sizeof(req->timeOut));
-     offset = offset +  sizeof(req->timeOut);
-
-     req->timeOut = ntohl(req->timeOut);
-     memcpy(&req->sessionId, buffer + offset, sizeof(req->sessionId));
-     offset = offset +  sizeof(req->sessionId);
-
-     req->sessionId = htonll(req->sessionId);
-     memcpy(&req->passwd_len, buffer + offset, sizeof(req->passwd_len));
-     offset = offset +  sizeof(req->passwd_len);
+     memcpy(&resp->readOnly, buffer + offset, sizeof(resp->readOnly));
 
-     req->passwd_len = ntohl(req->passwd_len);
-     memcpy(req->passwd, buffer + offset, sizeof(req->passwd));
      return 0;
- }
+}
 
 static int prime_connection(zhandle_t *zh)
 {
@@ -1833,11 +1864,12 @@ static int prime_connection(zhandle_t *z
     int hlen = 0;
     struct connect_req req;
     req.protocolVersion = 0;
-    req.sessionId = zh->client_id.client_id;
+    req.sessionId = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
     req.passwd_len = sizeof(req.passwd);
     memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
     req.timeOut = zh->recv_timeout;
     req.lastZxidSeen = zh->last_zxid;
+    req.readOnly = zh->allow_read_only;
     hlen = htonl(len);
     /* We are running fast and loose here, but this string should fit in the initial buffer! */
     rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
@@ -1850,6 +1882,8 @@ static int prime_connection(zhandle_t *z
     zh->state = ZOO_ASSOCIATING_STATE;
 
     zh->input_buffer = &zh->primer_buffer;
+    memset(zh->input_buffer->buffer, 0, zh->input_buffer->len);
+
     /* This seems a bit weird to to set the offset to 4, but we already have a
      * length, so we skip reading the length (and allocating the buffer) by
      * saying that we are already at offset 4 */
@@ -1905,16 +1939,128 @@ static struct timeval get_timeval(int in
     return rc<0 ? rc : adaptor_send_queue(zh, 0);
 }
 
-#ifdef WIN32
-int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
-     struct timeval *tv)
+/* upper and lower bounds of the timeout used when looking for a r/w server in read-only mode */
+const int MAX_RW_TIMEOUT = 60000;
+const int MIN_RW_TIMEOUT = 200;
+
+static int ping_rw_server(zhandle_t* zh)
 {
+    char buf[10];
+    socket_t sock;
+    int rc;
+    sendsize_t ssize;
+    struct sockaddr_storage addr;
+
+    addrvec_peek(&zh->addrs, &addr);
+
+    sock = socket(addr.ss_family, SOCK_STREAM, 0);
+    if (sock < 0) {
+        return 0;
+    }
+
+    zookeeper_set_sock_nodelay(zh, sock);
+    zookeeper_set_sock_timeout(zh, sock, 1);
+
+    rc = zookeeper_connect(zh, &addr, sock);
+    if (rc < 0) {
+        return 0;
+    }
+
+    ssize = zookeeper_send(sock, "isro", 4);
+    if (ssize < 0) {
+        rc = 0;
+        goto out;
+    }
+
+    memset(buf, 0, sizeof(buf));
+    rc = recv(sock, buf, sizeof(buf), 0);
+    if (rc < 0) {
+        rc = 0;
+        goto out;
+    }
+
+    rc = strcmp("rw", buf) == 0;
+
+out:
+    close(sock);
+    return rc;
+}
+
+static inline int min(int a, int b)
+{
+    return a < b ? a : b;
+}
+
+static void zookeeper_set_sock_noblock(zhandle_t *zh, socket_t sock)
+{
+#ifdef _WIN32
     ULONG nonblocking_flag = 1;
+
+    ioctlsocket(sock, FIONBIO, &nonblocking_flag);
 #else
-int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
-     struct timeval *tv)
+    fcntl(sock, F_SETFL, O_NONBLOCK|fcntl(sock, F_GETFL, 0));
+#endif
+}
+
+static void zookeeper_set_sock_timeout(zhandle_t *zh, socket_t s, int timeout)
 {
+    struct timeval tv = { .tv_sec = timeout };
+
+    setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
+    setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
+}
+
+static void zookeeper_set_sock_nodelay(zhandle_t *zh, socket_t sock)
+{
+#ifdef _WIN32
+    char enable_tcp_nodelay = 1;
+#else
+    int enable_tcp_nodelay = 1;
+#endif
+    int rc;
+
+    rc = setsockopt(sock,
+                    IPPROTO_TCP,
+                    TCP_NODELAY,
+                    &enable_tcp_nodelay,
+                    sizeof(enable_tcp_nodelay));
+
+    if (rc) {
+        LOG_WARN(LOGCALLBACK(zh),
+                 "Unable to set TCP_NODELAY, latency may be effected");
+    }
+}
+
+static socket_t zookeeper_connect(zhandle_t *zh,
+                                  struct sockaddr_storage *addr,
+                                  socket_t fd)
+{
+    int rc;
+    int addr_len;
+
+#if defined(AF_INET6)
+    if (addr->ss_family == AF_INET6) {
+        addr_len = sizeof(struct sockaddr_in6);
+    } else {
+        addr_len = sizeof(struct sockaddr_in);
+    }
+#else
+    addr_len = sizeof(struct sockaddr_in);
 #endif
+
+    LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
+    rc = connect(fd, (struct sockaddr *)addr, addr_len);
+
+#ifdef _WIN32
+    get_errno();
+#endif
+
+    return rc;
+}
+
+int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
+     struct timeval *tv)
+{
     int rc = 0;
     struct timeval now;
     if(zh==0 || fd==0 ||interest==0 || tv==0)
@@ -1942,7 +2088,6 @@ int zookeeper_interest(zhandle_t *zh, in
     tv->tv_usec = 0;
 
     if (*fd == -1) {
-
         /*
          * If we previously failed to connect to server pool (zh->delay == 1)
          * then we need delay our connection on this iteration 1/60 of the
@@ -1957,63 +2102,45 @@ int zookeeper_interest(zhandle_t *zh, in
 
             LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
                      zh->hostname);
-        }
-
-        // No need to delay -- grab the next server and attempt connection
-        else {
-            int ssoresult;
-
-#ifdef WIN32
-            char enable_tcp_nodelay = 1;
-#else
-            int enable_tcp_nodelay = 1;
-#endif
+        } else {
+            // No need to delay -- grab the next server and attempt connection
             zoo_cycle_next_server(zh);
-
             zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
             if (zh->fd < 0) {
-                return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
-                                                             ZSYSTEMERROR, "socket() call failed"));
-            }
-            ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
-            if (ssoresult != 0) {
-                LOG_WARN(LOGCALLBACK(zh), "Unable to set TCP_NODELAY, operation latency may be effected");
+              rc = handle_socket_error_msg(zh,
+                                           __LINE__,
+                                           ZSYSTEMERROR,
+                                           "socket() call failed");
+              return api_epilog(zh, rc);
             }
-#ifdef WIN32
-            ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
-#else
-            fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
-#endif
-#if defined(AF_INET6)
-            if (zh->addr_cur.ss_family == AF_INET6) {
-                rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
-            } else {
-#else
-               LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
-            {
-#endif
-                rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
-#ifdef WIN32
-                get_errno();
-#endif
-            }
-            if (rc == -1) {
 
+            zookeeper_set_sock_nodelay(zh, zh->fd);
+            zookeeper_set_sock_noblock(zh, zh->fd);
+
+            rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd);
+
+            if (rc == -1) {
                 /* we are handling the non-blocking connect according to
                  * the description in section 16.3 "Non-blocking connect"
                  * in UNIX Network Programming vol 1, 3rd edition */
-                if (errno == EWOULDBLOCK || errno == EINPROGRESS)
+                if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
                     zh->state = ZOO_CONNECTING_STATE;
-                else
-                {
-                    return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
-                            ZCONNECTIONLOSS,"connect() call failed"));
+                } else {
+                    rc = handle_socket_error_msg(zh,
+                                                 __LINE__,
+                                                 ZCONNECTIONLOSS,
+                                                 "connect() call failed");
+                    return api_epilog(zh, rc);
                 }
             } else {
-                if((rc=prime_connection(zh))!=0)
+                rc = prime_connection(zh);
+                if (rc != 0) {
                     return api_epilog(zh,rc);
+                }
 
-                LOG_INFO(LOGCALLBACK(zh), "Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
+                LOG_INFO(LOGCALLBACK(zh),
+                         "Initiated connection to server [%s]",
+                         format_endpoint_info(&zh->addr_cur));
             }
             *tv = get_timeval(zh->recv_timeout/3);
         }
@@ -2021,6 +2148,8 @@ int zookeeper_interest(zhandle_t *zh, in
         zh->last_recv = now;
         zh->last_send = now;
         zh->last_ping = now;
+        zh->last_ping_rw = now;
+        zh->ping_rw_timeout = MIN_RW_TIMEOUT;
     }
 
     if (zh->fd != -1) {
@@ -2031,7 +2160,7 @@ int zookeeper_interest(zhandle_t *zh, in
         // have we exceeded the receive timeout threshold?
         if (recv_to <= 0) {
             // We gotta cut our losses and connect to someone else
-#ifdef WIN32
+#ifdef _WIN32
             errno = WSAETIMEDOUT;
 #else
             errno = ETIMEDOUT;
@@ -2045,14 +2174,13 @@ int zookeeper_interest(zhandle_t *zh, in
                     -recv_to));
 
         }
+
         // We only allow 1/3 of our timeout time to expire before sending
         // a PING
-        if (zh->state==ZOO_CONNECTED_STATE) {
+        if (is_connected(zh)) {
             send_to = zh->recv_timeout/3 - idle_send;
             if (send_to <= 0) {
                 if (zh->sent_requests.head == 0) {
-//                  LOG_DEBUG(LOGCALLBACK(zh), "Sending PING to %s (exceeded idle by %dms)",
-//                            zoo_get_current_server(zh),-send_to);
                     rc = send_ping(zh);
                     if (rc < 0) {
                         LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
@@ -2062,8 +2190,33 @@ int zookeeper_interest(zhandle_t *zh, in
                 send_to = zh->recv_timeout/3;
             }
         }
+
+        // If we are in read-only mode, look for a read/write server
+        if (zh->state == ZOO_READONLY_STATE) {
+            int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
+            if (idle_ping_rw >= zh->ping_rw_timeout) {
+                zh->last_ping_rw = now;
+                idle_ping_rw = 0;
+                zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
+                                          MAX_RW_TIMEOUT);
+                if (ping_rw_server(zh)) {
+                    struct sockaddr_storage addr;
+                    addrvec_peek(&zh->addrs, &addr);
+                    zh->ping_rw_timeout = MIN_RW_TIMEOUT;
+                    LOG_INFO(LOGCALLBACK(zh),
+                             "r/w server found at %s",
+                             format_endpoint_info(&addr));
+                    handle_error(zh, ZRWSERVERFOUND);
+                } else {
+                    addrvec_next(&zh->addrs, NULL);
+                }
+            }
+            send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
+        }
+
         // choose the lesser value as the timeout
-        *tv = get_timeval(recv_to < send_to? recv_to:send_to);
+        *tv = get_timeval(min(recv_to, send_to));
+
         zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
         zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
         if (zh->next_deadline.tv_usec > 1000000) {
@@ -2073,7 +2226,7 @@ int zookeeper_interest(zhandle_t *zh, in
         *interest = ZOOKEEPER_READ;
         /* we are interested in a write if we are connected and have something
          * to send, or we are waiting for a connect to finish. */
-        if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
+        if ((zh->to_send.head && is_connected(zh))
             || zh->state == ZOO_CONNECTING_STATE) {
             *interest |= ZOOKEEPER_WRITE;
         }
@@ -2118,7 +2271,7 @@ static int check_events(zhandle_t *zh, i
             zh->input_buffer = allocate_buffer(0,0);
         }
 
-        rc = recv_buffer(zh->fd, zh->input_buffer);
+        rc = recv_buffer(zh, zh->input_buffer);
         if (rc < 0) {
             return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                 "failed while receiving a server response");
@@ -2128,12 +2281,13 @@ static int check_events(zhandle_t *zh, i
             if (zh->input_buffer != &zh->primer_buffer) {
                 queue_buffer(&zh->to_process, zh->input_buffer, 0);
             } else  {
-                int64_t oldid,newid;
+                int64_t oldid, newid;
                 //deserialize
                 deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
                 /* We are processing the primer_buffer, so we need to finish
                  * the connection handshake */
-                oldid = zh->client_id.client_id;
+                oldid = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
+                zh->seen_rw_server_before |= !zh->primer_storage.readOnly;
                 newid = zh->primer_storage.sessionId;
                 if (oldid != 0 && oldid != newid) {
                     zh->state = ZOO_EXPIRED_SESSION_STATE;
@@ -2146,11 +2300,14 @@ static int check_events(zhandle_t *zh, i
 
                     memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                            sizeof(zh->client_id.passwd));
-                    zh->state = ZOO_CONNECTED_STATE;
+                    zh->state = zh->primer_storage.readOnly ?
+                      ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
                     zh->reconfig = 0;
-                    LOG_INFO(LOGCALLBACK(zh), "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
-                              format_endpoint_info(&zh->addr_cur),
-                              newid, zh->recv_timeout);
+                    LOG_INFO(LOGCALLBACK(zh),
+                             "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d %s",
+                             format_endpoint_info(&zh->addr_cur),
+                             newid, zh->recv_timeout,
+                             zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
                     /* we want the auth to be sent for, but since both call push to front
                        we need to call send_watch_set first */
                     send_set_watches(zh);
@@ -2158,7 +2315,7 @@ static int check_events(zhandle_t *zh, i
                     send_auth_info(zh);
                     LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
                     zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
-                    PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
+                    PROCESS_SESSION_EVENT(zh, zh->state);
                 }
             }
             zh->input_buffer = 0;
@@ -2555,7 +2712,7 @@ void process_completions(zhandle_t *zh)
 
 static void isSocketReadable(zhandle_t* zh)
 {
-#ifndef WIN32
+#ifndef _WIN32
     struct pollfd fds;
     fds.fd = zh->fd;
     fds.events = POLLIN;
@@ -2724,7 +2881,9 @@ int zookeeper_process(zhandle_t *zh, int
     if (process_async(zh->outstanding_sync)) {
         process_completions(zh);
     }
-    return api_epilog(zh,ZOK);}
+
+    return api_epilog(zh, ZOK);
+}
 
 int zoo_state(zhandle_t *zh)
 {
@@ -3000,7 +3159,7 @@ int zookeeper_close(zhandle_t *zh)
     }
     /* No need to decrement the counter since we're just going to
      * destroy the handle later. */
-    if(zh->state==ZOO_CONNECTED_STATE){
+    if (is_connected(zh)){
         struct oarchive *oa;
         struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
         LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to [%s]\n",
@@ -3029,7 +3188,7 @@ finish:
     destroy(zh);
     adaptor_destroy(zh);
     free(zh);
-#ifdef WIN32
+#ifdef _WIN32
     Win32WSACleanup();
 #endif
     return rc;
@@ -3905,7 +4064,7 @@ int flush_send_queue(zhandle_t*zh, int t
 {
     int rc= ZOK;
     struct timeval started;
-#ifdef WIN32
+#ifdef _WIN32
     fd_set pollSet;
     struct timeval wait;
 #endif
@@ -3915,9 +4074,9 @@ int flush_send_queue(zhandle_t*zh, int t
     // we use a recursive lock instead and only dequeue the buffer if a send was
     // successful
     lock_buffer_list(&zh->to_send);
-    while (zh->to_send.head != 0&& zh->state == ZOO_CONNECTED_STATE) {
+    while (zh->to_send.head != 0 && is_connected(zh)) {
         if(timeout!=0){
-#ifndef WIN32
+#ifndef _WIN32
             struct pollfd fds;
 #endif
             int elapsed;
@@ -3929,7 +4088,7 @@ int flush_send_queue(zhandle_t*zh, int t
                 break;
             }
 
-#ifdef WIN32
+#ifdef _WIN32
             wait = get_timeval(timeout-elapsed);
             FD_ZERO(&pollSet);
             FD_SET(zh->fd, &pollSet);
@@ -4019,6 +4178,8 @@ const char* zerror(int c)
       return "(not error) no server responses to process";
     case ZSESSIONMOVED:
       return "session moved to another server, so operation is ignored";
+    case ZNOTREADONLY:
+      return "state-changing request is passed to read-only server";
    case ZNEWCONFIGNOQUORUM:
        return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
    case ZRECONFIGINPROGRESS:
@@ -4069,7 +4230,7 @@ int zoo_add_auth(zhandle_t *zh,const cha
     add_last_auth(&zh->auth_h, authinfo);
     zoo_unlock_auth(zh);
 
-    if(zh->state == ZOO_CONNECTED_STATE || zh->state == ZOO_ASSOCIATING_STATE)
+    if (is_connected(zh) || zh->state == ZOO_ASSOCIATING_STATE)
         return send_last_auth_info(zh);
 
     return ZOK;
@@ -4080,7 +4241,7 @@ static const char* format_endpoint_info(
     static char buf[128] = { 0 };
     char addrstr[128] = { 0 };
     void *inaddr;
-#ifdef WIN32
+#ifdef _WIN32
     char * addrstring;
 #endif
     int port;
@@ -4098,7 +4259,7 @@ static const char* format_endpoint_info(
 #if defined(AF_INET6)
     }
 #endif
-#ifdef WIN32
+#ifdef _WIN32
     addrstring = inet_ntoa (*(struct in_addr*)inaddr);
     sprintf(buf,"%s:%d",addrstring,ntohs(port));
 #else

Modified: zookeeper/trunk/src/c/tests/TestClientRetry.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestClientRetry.cc?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestClientRetry.cc (original)
+++ zookeeper/trunk/src/c/tests/TestClientRetry.cc Mon Jul  7 21:43:44 2014
@@ -22,140 +22,14 @@
 #include <signal.h>
 #include <stdlib.h>
 #include <unistd.h>
-#include <sys/select.h>
-
-#include "CollectionUtil.h"
-#include "ThreadingUtil.h"
-
-using namespace Util;
 
 #include "Vector.h"
 using namespace std;
 
-#include <cstring>
-#include <list>
-
 #include <zookeeper.h>
 
 #include "Util.h"
-
-#ifdef THREADED
-    static void yield(zhandle_t *zh, int i)
-    {
-        sleep(i);
-    }
-#else
-    static void yield(zhandle_t *zh, int seconds)
-    {
-        int fd;
-        int interest;
-        int events;
-        struct timeval tv;
-        int rc;
-        time_t expires = time(0) + seconds; 
-        time_t timeLeft = seconds;
-        fd_set rfds, wfds, efds;
-        FD_ZERO(&rfds);
-        FD_ZERO(&wfds);
-        FD_ZERO(&efds);
-
-        while(timeLeft >= 0) {
-            zookeeper_interest(zh, &fd, &interest, &tv);
-            if (fd != -1) {
-                if (interest&ZOOKEEPER_READ) {
-                    FD_SET(fd, &rfds);
-                } else {
-                    FD_CLR(fd, &rfds);
-                }
-                if (interest&ZOOKEEPER_WRITE) {
-                    FD_SET(fd, &wfds);
-                } else {
-                    FD_CLR(fd, &wfds);
-                }
-            } else {
-                fd = 0;
-            }
-            FD_SET(0, &rfds);
-            if (tv.tv_sec > timeLeft) {
-                tv.tv_sec = timeLeft;
-            }
-            rc = select(fd+1, &rfds, &wfds, &efds, &tv);
-            timeLeft = expires - time(0);
-            events = 0;
-            if (FD_ISSET(fd, &rfds)) {
-                events |= ZOOKEEPER_READ;
-            }
-            if (FD_ISSET(fd, &wfds)) {
-                events |= ZOOKEEPER_WRITE;
-            }
-            zookeeper_process(zh, events);
-        }
-    }
-#endif
-
-typedef struct evt {
-    string path;
-    int type;
-} evt_t;
-
-typedef struct watchCtx {
-private:
-    list<evt_t> events;
-public:
-    bool connected;
-    zhandle_t *zh;
-    Mutex mutex;
-
-    watchCtx() {
-        connected = false;
-        zh = 0;
-    }
-    ~watchCtx() {
-        if (zh) {
-            zookeeper_close(zh);
-            zh = 0;
-        }
-    }
-
-    evt_t getEvent() {
-        evt_t evt;
-        mutex.acquire();
-        CPPUNIT_ASSERT( events.size() > 0);
-        evt = events.front();
-        events.pop_front();
-        mutex.release();
-        return evt;
-    }
-
-    int countEvents() {
-        int count;
-        mutex.acquire();
-        count = events.size();
-        mutex.release();
-        return count;
-    }
-
-    void putEvent(evt_t evt) {
-        mutex.acquire();
-        events.push_back(evt);
-        mutex.release();
-    }
-
-    bool waitForConnected(zhandle_t *zh) {
-        time_t expires = time(0) + 10;
-        while(!connected && time(0) < expires) {
-            yield(zh, 1);
-        }
-        return connected;
-    }
-    bool waitForDisconnected(zhandle_t *zh) {
-        time_t expires = time(0) + 15;
-        while(connected && time(0) < expires) {
-            yield(zh, 1);
-        }
-        return !connected;
-    }
-} watchctx_t; 
+#include "WatchUtil.h"
 
 class Zookeeper_clientretry : public CPPUNIT_NS::TestFixture
 {

Added: zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc?rev=1608620&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc (added)
+++ zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc Mon Jul  7 21:43:44 2014
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <zookeeper.h>
+
+#include "Util.h"
+#include "WatchUtil.h"
+
+class Zookeeper_readOnly : public CPPUNIT_NS::TestFixture {
+    CPPUNIT_TEST_SUITE(Zookeeper_readOnly);
+#ifdef THREADED
+    CPPUNIT_TEST(testReadOnly);
+#endif
+    CPPUNIT_TEST_SUITE_END();
+
+    static void watcher(zhandle_t* zh, int type, int state,
+                        const char* path, void* v) {
+        watchctx_t *ctx = (watchctx_t*)v;
+
+        if (state==ZOO_CONNECTED_STATE || state==ZOO_READONLY_STATE) {
+            ctx->connected = true;
+        } else {
+            ctx->connected = false;
+        }
+        if (type != ZOO_SESSION_EVENT) {
+            evt_t evt;
+            evt.path = path;
+            evt.type = type;
+            ctx->putEvent(evt);
+        }
+    }
+
+    FILE *logfile;
+public:
+
+    Zookeeper_readOnly() {
+      logfile = openlogfile("Zookeeper_readOnly");
+    }
+
+    ~Zookeeper_readOnly() {
+      if (logfile) {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp() {
+        zoo_set_log_stream(logfile);
+    }
+
+    void startReadOnly() {
+        char cmd[1024];
+        sprintf(cmd, "%s startReadOnly", ZKSERVER_CMD);
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void stopPeer() {
+        char cmd[1024];
+        sprintf(cmd, "%s stop", ZKSERVER_CMD);
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void testReadOnly() {
+        startReadOnly();
+        watchctx_t watch;
+        zhandle_t* zh = zookeeper_init("localhost:22181",
+                                       watcher,
+                                       10000,
+                                       NULL,
+                                       &watch,
+                                       ZOO_READONLY);
+        watch.zh = zh;
+        CPPUNIT_ASSERT(zh != 0);
+        sleep(1);
+        int len = 1024;
+        char buf[len];
+        int res = zoo_get(zh, "/", 0, buf, &len, 0);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, res);
+
+        char path[1024];
+        res = zoo_create(zh, "/test", buf, 10, &ZOO_OPEN_ACL_UNSAFE, 0, path,
+                         512);
+        CPPUNIT_ASSERT_EQUAL((int)ZNOTREADONLY, res);
+        stopPeer();
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_readOnly);

Added: zookeeper/trunk/src/c/tests/WatchUtil.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/WatchUtil.h?rev=1608620&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/WatchUtil.h (added)
+++ zookeeper/trunk/src/c/tests/WatchUtil.h Mon Jul  7 21:43:44 2014
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef WATCH_UTIL_H_
+#define WATCH_UTIL_H_
+
+#include <sys/select.h>
+#include <cstring>
+#include <list>
+
+using namespace std;
+
+#include "CollectionUtil.h"
+#include "ThreadingUtil.h"
+
+using namespace Util;
+
+#ifdef THREADED
+    static void yield(zhandle_t *zh, int i)
+    {
+        sleep(i);
+    }
+#else
+    static void yield(zhandle_t *zh, int seconds)
+    {
+        int fd;
+        int interest;
+        int events;
+        struct timeval tv;
+        int rc;
+        time_t expires = time(0) + seconds;
+        time_t timeLeft = seconds;
+        fd_set rfds, wfds, efds;
+        FD_ZERO(&rfds);
+        FD_ZERO(&wfds);
+        FD_ZERO(&efds);
+
+        while(timeLeft >= 0) {
+            zookeeper_interest(zh, &fd, &interest, &tv);
+            if (fd != -1) {
+                if (interest&ZOOKEEPER_READ) {
+                    FD_SET(fd, &rfds);
+                } else {
+                    FD_CLR(fd, &rfds);
+                }
+                if (interest&ZOOKEEPER_WRITE) {
+                    FD_SET(fd, &wfds);
+                } else {
+                    FD_CLR(fd, &wfds);
+                }
+            } else {
+                fd = 0;
+            }
+            FD_SET(0, &rfds);
+            if (tv.tv_sec > timeLeft) {
+                tv.tv_sec = timeLeft;
+            }
+            rc = select(fd+1, &rfds, &wfds, &efds, &tv);
+            timeLeft = expires - time(0);
+            events = 0;
+            if (FD_ISSET(fd, &rfds)) {
+                events |= ZOOKEEPER_READ;
+            }
+            if (FD_ISSET(fd, &wfds)) {
+                events |= ZOOKEEPER_WRITE;
+            }
+            zookeeper_process(zh, events);
+        }
+    }
+#endif
+
+typedef struct evt {
+	string path;
+	int type;
+} evt_t;
+
+typedef struct watchCtx {
+private:
+    list<evt_t> events;
+    watchCtx(const watchCtx&);
+    watchCtx& operator=(const watchCtx&);
+public:
+    bool connected;
+    zhandle_t *zh;
+    Mutex mutex;
+
+    watchCtx() {
+        connected = false;
+        zh = 0;
+    }
+    ~watchCtx() {
+        if (zh) {
+            zookeeper_close(zh);
+            zh = 0;
+        }
+    }
+
+    evt_t getEvent() {
+        evt_t evt;
+        mutex.acquire();
+        CPPUNIT_ASSERT( events.size() > 0);
+        evt = events.front();
+        events.pop_front();
+        mutex.release();
+        return evt;
+    }
+
+    int countEvents() {
+        int count;
+        mutex.acquire();
+        count = events.size();
+        mutex.release();
+        return count;
+    }
+
+    void putEvent(evt_t evt) {
+        mutex.acquire();
+        events.push_back(evt);
+        mutex.release();
+    }
+
+    bool waitForConnected(zhandle_t *zh) {
+        time_t expires = time(0) + 10;
+        while(!connected && time(0) < expires) {
+            yield(zh, 1);
+        }
+        return connected;
+    }
+    bool waitForDisconnected(zhandle_t *zh) {
+        time_t expires = time(0) + 15;
+        while(connected && time(0) < expires) {
+            yield(zh, 1);
+        }
+        return !connected;
+    }
+} watchctx_t;
+
+#endif /*WATCH_UTIL_H_*/

Modified: zookeeper/trunk/src/c/tests/ZKMocks.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/ZKMocks.cc (original)
+++ zookeeper/trunk/src/c/tests/ZKMocks.cc Mon Jul  7 21:43:44 2014
@@ -32,101 +32,117 @@ using namespace std;
 TestClientId testClientId;
 const char* TestClientId::PASSWD="1234567890123456";
 
-HandshakeRequest* HandshakeRequest::parse(const std::string& buf){
+HandshakeRequest* HandshakeRequest::parse(const std::string& buf) {
     auto_ptr<HandshakeRequest> req(new HandshakeRequest);
 
     memcpy(&req->protocolVersion,buf.data(), sizeof(req->protocolVersion));
     req->protocolVersion = htonl(req->protocolVersion);
-    
+
     int offset=sizeof(req->protocolVersion);
-    
+
     memcpy(&req->lastZxidSeen,buf.data()+offset,sizeof(req->lastZxidSeen));
     req->lastZxidSeen = htonll(req->lastZxidSeen);
     offset+=sizeof(req->lastZxidSeen);
-    
+
     memcpy(&req->timeOut,buf.data()+offset,sizeof(req->timeOut));
     req->timeOut = htonl(req->timeOut);
     offset+=sizeof(req->timeOut);
-    
+
     memcpy(&req->sessionId,buf.data()+offset,sizeof(req->sessionId));
     req->sessionId = htonll(req->sessionId);
     offset+=sizeof(req->sessionId);
-    
+
     memcpy(&req->passwd_len,buf.data()+offset,sizeof(req->passwd_len));
     req->passwd_len = htonl(req->passwd_len);
     offset+=sizeof(req->passwd_len);
-    
+
     memcpy(req->passwd,buf.data()+offset,sizeof(req->passwd));
+    offset+=sizeof(req->passwd);
+
+    memcpy(&req->readOnly,buf.data()+offset,sizeof(req->readOnly));
+
     if(testClientId.client_id==req->sessionId &&
             !memcmp(testClientId.passwd,req->passwd,sizeof(req->passwd)))
         return req.release();
     // the request didn't match -- may not be a handshake request after all
+
     return 0;
 }
 
 // *****************************************************************************
 // watcher action implementation
-void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx){
-    if(zh==0 || ctx==0) return;
-    WatcherAction* action=(WatcherAction*)ctx;
-    
-    if(type==ZOO_SESSION_EVENT){
-        if(state==ZOO_EXPIRED_SESSION_STATE)
+void activeWatcher(zhandle_t *zh,
+                   int type, int state, const char *path,void* ctx) {
+
+    if (zh == 0 || ctx == 0)
+      return;
+
+    WatcherAction* action = (WatcherAction *)ctx;
+
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_EXPIRED_SESSION_STATE)
             action->onSessionExpired(zh);
-        else if(state==ZOO_CONNECTING_STATE)
+        else if(state == ZOO_CONNECTING_STATE)
             action->onConnectionLost(zh);
-        else if(state==ZOO_CONNECTED_STATE)
+        else if(state == ZOO_CONNECTED_STATE)
             action->onConnectionEstablished(zh);
-    }else if(type==ZOO_CHANGED_EVENT)
+    } else if (type == ZOO_CHANGED_EVENT)
         action->onNodeValueChanged(zh,path);
-    else if(type==ZOO_DELETED_EVENT)
+    else if (type == ZOO_DELETED_EVENT)
         action->onNodeDeleted(zh,path);
-    else if(type==ZOO_CHILD_EVENT)
+    else if (type == ZOO_CHILD_EVENT)
         action->onChildChanged(zh,path);
+
     // TODO: implement for the rest of the event types
-    // ...
-    action->setWatcherTriggered();    
+
+    action->setWatcherTriggered();
 }
-SyncedBoolCondition WatcherAction::isWatcherTriggered() const{
+
+SyncedBoolCondition WatcherAction::isWatcherTriggered() const {
     return SyncedBoolCondition(triggered_,mx_);
 }
 
-// *****************************************************************************
 // a set of async completion signatures
+
 void asyncCompletion(int rc, ACL_vector *acl,Stat *stat, const void *data){
     assert("Completion data is NULL"&&data);
     static_cast<AsyncCompletion*>((void*)data)->aclCompl(rc,acl,stat);
 }
-void asyncCompletion(int rc, const char *value, int len, const Stat *stat, 
-        const void *data){    
+
+void asyncCompletion(int rc, const char *value, int len, const Stat *stat,
+        const void *data) {
     assert("Completion data is NULL"&&data);
-    static_cast<AsyncCompletion*>((void*)data)->dataCompl(rc,value,len,stat);    
+    static_cast<AsyncCompletion*>((void*)data)->dataCompl(rc,value,len,stat);
 }
-void asyncCompletion(int rc, const Stat *stat, const void *data){    
+
+void asyncCompletion(int rc, const Stat *stat, const void *data) {
     assert("Completion data is NULL"&&data);
     static_cast<AsyncCompletion*>((void*)data)->statCompl(rc,stat);
 }
-void asyncCompletion(int rc, const char *value, const void *data){
+
+void asyncCompletion(int rc, const char *value, const void *data) {
     assert("Completion data is NULL"&&data);
     static_cast<AsyncCompletion*>((void*)data)->stringCompl(rc,value);
 }
-void asyncCompletion(int rc,const String_vector *strings, const void *data){    
+
+void asyncCompletion(int rc,const String_vector *strings, const void *data) {
     assert("Completion data is NULL"&&data);
     static_cast<AsyncCompletion*>((void*)data)->stringsCompl(rc,strings);
 }
-void asyncCompletion(int rc, const void *data){    
+
+void asyncCompletion(int rc, const void *data) {
     assert("Completion data is NULL"&&data);
     static_cast<AsyncCompletion*>((void*)data)->voidCompl(rc);
 }
 
-// *****************************************************************************
 // a predicate implementation
 bool IOThreadStopped::operator()() const{
 #ifdef THREADED
     adaptor_threads* adaptor=(adaptor_threads*)zh_->adaptor_priv;
     return CheckedPthread::isTerminated(adaptor->io);
 #else
-    assert("IOThreadStopped predicate is only for use with THREADED client"&& false);
+    assert("IOThreadStopped predicate is only for use with THREADED client" &&
+           false);
     return false;
 #endif
 }
@@ -169,7 +185,7 @@ Mock_activateWatcher* Mock_activateWatch
 class ActivateWatcherWrapper: public Mock_activateWatcher{
 public:
     ActivateWatcherWrapper():ctx_(0),activated_(false){}
-    
+
     virtual void call(zhandle_t *zh, watcher_registration_t* reg, int rc){
         CALL_REAL(activateWatcher,(zh, reg,rc));
         synchronized(mx_);
@@ -178,13 +194,13 @@ public:
             ctx_=0;
         }
     }
-    
+
     void setContext(void* ctx){
         synchronized(mx_);
         ctx_=ctx;
         activated_=false;
     }
-    
+
     SyncedBoolCondition isActivated() const{
         return SyncedBoolCondition(activated_,mx_);
     }
@@ -195,7 +211,7 @@ public:
 
 WatcherActivationTracker::WatcherActivationTracker():
     wrapper_(new ActivateWatcherWrapper)
-{    
+{
 }
 
 WatcherActivationTracker::~WatcherActivationTracker(){
@@ -245,7 +261,8 @@ public:
     DeliverWatchersWrapper(int type,int state,bool terminate):
         type_(type),state_(state),
         allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){}
-    virtual void call(zhandle_t* zh,int type,int state, const char* path, watcher_object_list **list){
+    virtual void call(zhandle_t* zh, int type, int state,
+                      const char* path, watcher_object_list **list) {
         {
             synchronized(mx_);
             zh_=zh;
@@ -255,8 +272,8 @@ public:
         if(type_==type && state_==state){
             if(terminate_){
                 // prevent zhandle_t from being prematurely distroyed;
-                // this will also ensure that zookeeper_close() cleanups the thread
-                // resources by calling finish_adaptor()
+                // this will also ensure that zookeeper_close() cleans up the
+                // thread resources by calling finish_adaptor()
                 inc_ref_counter(zh,1);
                 terminateZookeeperThreads(zh);
             }
@@ -302,7 +319,7 @@ WatcherDeliveryTracker::~WatcherDelivery
     delete deliveryWrapper_;
 }
 
-SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const{
+SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const {
     return deliveryWrapper_->isDelivered();
 }
 
@@ -310,7 +327,7 @@ void WatcherDeliveryTracker::resetDelive
     deliveryWrapper_->resetDeliveryCounter();
 }
 
-SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const{
+SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const {
     return deliveryWrapper_->deliveryCounterEquals(expected);
 }
 
@@ -327,6 +344,7 @@ string HandshakeResponse::toString() con
     tmp=htonl(passwd_len);
     buf.append((char*)&tmp,sizeof(tmp));
     buf.append(passwd,sizeof(passwd));
+    buf.append(&readOnly,sizeof(readOnly));
     // finally set the buffer length
     tmp=htonl(buf.size()+sizeof(tmp));
     buf.insert(0,(char*)&tmp, sizeof(tmp));
@@ -335,12 +353,12 @@ string HandshakeResponse::toString() con
 
 string ZooGetResponse::toString() const{
     oarchive* oa=create_buffer_oarchive();
-    
+
     ReplyHeader h = {xid_,1,ZOK};
     serialize_ReplyHeader(oa, "hdr", &h);
-    
+
     GetDataResponse resp;
-	char buf[1024];
+    char buf[1024];
     assert("GetDataResponse is too long"&&data_.size()<=sizeof(buf));
     resp.data.len=data_.size();
     resp.data.buff=buf;
@@ -350,34 +368,34 @@ string ZooGetResponse::toString() const{
     int32_t len=htonl(get_buffer_len(oa));
     string res((char*)&len,sizeof(len));
     res.append(get_buffer(oa),get_buffer_len(oa));
-    
+
     close_buffer_oarchive(&oa,1);
     return res;
 }
 
 string ZooStatResponse::toString() const{
     oarchive* oa=create_buffer_oarchive();
-    
+
     ReplyHeader h = {xid_,1,rc_};
     serialize_ReplyHeader(oa, "hdr", &h);
-    
+
     SetDataResponse resp;
     resp.stat=stat_;
     serialize_SetDataResponse(oa, "reply", &resp);
     int32_t len=htonl(get_buffer_len(oa));
     string res((char*)&len,sizeof(len));
     res.append(get_buffer(oa),get_buffer_len(oa));
-    
+
     close_buffer_oarchive(&oa,1);
     return res;
 }
 
 string ZooGetChildrenResponse::toString() const{
     oarchive* oa=create_buffer_oarchive();
-    
+
     ReplyHeader h = {xid_,1,rc_};
     serialize_ReplyHeader(oa, "hdr", &h);
- 
+
     GetChildrenResponse resp;
     // populate the string vector
     allocate_String_vector(&resp.children,strings_.size());
@@ -385,11 +403,11 @@ string ZooGetChildrenResponse::toString(
         resp.children.data[i]=strdup(strings_[i].c_str());
     serialize_GetChildrenResponse(oa, "reply", &resp);
     deallocate_GetChildrenResponse(&resp);
-    
+
     int32_t len=htonl(get_buffer_len(oa));
     string res((char*)&len,sizeof(len));
     res.append(get_buffer(oa),get_buffer_len(oa));
-    
+
     close_buffer_oarchive(&oa,1);
     return res;
 }
@@ -398,35 +416,35 @@ string ZNodeEvent::toString() const{
     oarchive* oa=create_buffer_oarchive();
     struct WatcherEvent evt = {type_,0,(char*)path_.c_str()};
     struct ReplyHeader h = {WATCHER_EVENT_XID,0,ZOK };
-    
+
     serialize_ReplyHeader(oa, "hdr", &h);
     serialize_WatcherEvent(oa, "event", &evt);
-    
+
     int32_t len=htonl(get_buffer_len(oa));
     string res((char*)&len,sizeof(len));
     res.append(get_buffer(oa),get_buffer_len(oa));
-    
+
     close_buffer_oarchive(&oa,1);
     return res;
 }
 
 string PingResponse::toString() const{
     oarchive* oa=create_buffer_oarchive();
-    
+
     ReplyHeader h = {PING_XID,1,ZOK};
     serialize_ReplyHeader(oa, "hdr", &h);
-    
+
     int32_t len=htonl(get_buffer_len(oa));
     string res((char*)&len,sizeof(len));
     res.append(get_buffer(oa),get_buffer_len(oa));
-    
+
     close_buffer_oarchive(&oa,1);
     return res;
 }
 
 //******************************************************************************
 // Zookeeper server simulator
-// 
+//
 bool ZookeeperServer::hasMoreRecv() const{
   return recvHasMore.get()!=0  || connectionLost;
 }
@@ -467,7 +485,7 @@ void ZookeeperServer::notifyBufferSent(c
             // handle the handshake
             int64_t sessId=sessionExpired?req->sessionId+1:req->sessionId;
             sessionExpired=false;
-            addRecvResponse(new HandshakeResponse(sessId));            
+            addRecvResponse(new HandshakeResponse(sessId));
             return;
         }
         // not a connect request -- fall thru
@@ -491,7 +509,8 @@ void ZookeeperServer::notifyBufferSent(c
         ++closeSent;
         return; // no reply for close requests
     }
-    // get the next response from the response queue and append it to the receive list
+    // get the next response from the response queue and append it to the
+    // receive list
     Element e;
     {
         synchronized(respQMx);
@@ -507,7 +526,7 @@ void ZookeeperServer::notifyBufferSent(c
 void forceConnected(zhandle_t* zh){
     // simulate connected state
     zh->state=ZOO_CONNECTED_STATE;
-    
+
     // Simulate we're connected to the first host in our host list
     zh->fd=ZookeeperServer::FD;
     assert(zh->addrs.count > 0);
@@ -515,8 +534,8 @@ void forceConnected(zhandle_t* zh){
     zh->addrs.next++;
 
     zh->input_buffer=0;
-    gettimeofday(&zh->last_recv,0);    
-    gettimeofday(&zh->last_send,0);    
+    gettimeofday(&zh->last_recv,0);
+    gettimeofday(&zh->last_send,0);
 }
 
 void terminateZookeeperThreads(zhandle_t* zh){

Modified: zookeeper/trunk/src/c/tests/ZKMocks.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZKMocks.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/ZKMocks.h (original)
+++ zookeeper/trunk/src/c/tests/ZKMocks.h Mon Jul  7 21:43:44 2014
@@ -303,8 +303,9 @@ public:
 class HandshakeResponse: public Response
 {
 public:
-    HandshakeResponse(int64_t sessId=1)
-        :protocolVersion(1),timeOut(10000),sessionId(sessId),passwd_len(sizeof(passwd))
+    HandshakeResponse(int64_t sessId=1):
+        protocolVersion(1),timeOut(10000),sessionId(sessId),
+        passwd_len(sizeof(passwd)),readOnly(0)
     {
         memcpy(passwd,"1234567890123456",sizeof(passwd));
     }
@@ -313,6 +314,7 @@ public:
     int64_t sessionId;
     int32_t passwd_len;
     char passwd[16];
+    char readOnly;
     virtual std::string toString() const ;
 };
 

Added: zookeeper/trunk/src/c/tests/quorum.cfg
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/quorum.cfg?rev=1608620&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/quorum.cfg (added)
+++ zookeeper/trunk/src/c/tests/quorum.cfg Mon Jul  7 21:43:44 2014
@@ -0,0 +1,8 @@
+tickTime=500
+initLimit=10
+syncLimit=5
+dataDir=/tmp/zkdata
+clientPort=22181
+server.1=localhost:22881:33881
+server.2=localhost:22882:33882
+server.3=localhost:22883:33883

Modified: zookeeper/trunk/src/c/tests/zkServer.sh
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/zkServer.sh?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/zkServer.sh (original)
+++ zookeeper/trunk/src/c/tests/zkServer.sh Mon Jul  7 21:43:44 2014
@@ -21,7 +21,7 @@ ZOOPORT=22181
 
 if [ "x$1" == "x" ]
 then
-    echo "USAGE: $0 startClean|start|stop hostPorts"
+    echo "USAGE: $0 startClean|start|startReadOnly|stop hostPorts"
     exit 2
 fi
 
@@ -151,6 +151,23 @@ start|startClean)
     fi
 
     ;;
+startReadOnly)
+    if [ "x${base_dir}" == "x" ]
+    then
+        echo "this target is for unit tests only"
+        exit 2
+    else
+        mkdir -p /tmp/zkdata
+        rm -f /tmp/zkdata/myid && echo 1 > /tmp/zkdata/myid
+
+        # force read-only mode
+        java -cp "$CLASSPATH" -Dreadonlymode.enabled=true org.apache.zookeeper.server.quorum.QuorumPeerMain ${base_dir}/src/c/tests/quorum.cfg &> "${base_dir}/build/tmp/zk.log" &
+        pid=$!
+        echo -n $pid > "${base_dir}/build/tmp/zk.pid"
+        sleep 3 # wait until read-only server is up
+    fi
+
+    ;;
 stop)
     # Already killed above
     ;;



Mime
View raw message