zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject zookeeper git commit: ZOOKEEPER-761: Remove *synchronous* calls from the *single-threaded* C client API
Date Fri, 16 Dec 2016 09:34:04 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/master cd0e32383 -> c0868332a


ZOOKEEPER-761: Remove *synchronous* calls from the *single-threaded* C client API

Reviewers: fpj <fpj@apache.org>, hanm <hanm@cloudera.com>, rgs <rgs+zk@itevenworks.net>


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/c0868332
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/c0868332
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/c0868332

Branch: refs/heads/master
Commit: c0868332a76cb6d0fa120058dc6cf7c55b0d0ec0
Parents: cd0e323
Author: fpj <fpj@apache.org>
Authored: Fri Dec 16 09:27:25 2016 +0000
Committer: fpj <fpj@apache.org>
Committed: Fri Dec 16 09:27:25 2016 +0000

----------------------------------------------------------------------
 src/c/include/zookeeper.h         |  70 +++---
 src/c/src/cli.c                   |  15 +-
 src/c/src/st_adaptor.c            |  18 --
 src/c/src/zk_adaptor.h            |   2 +
 src/c/src/zookeeper.c             | 419 ++++++++++++++++++---------------
 src/c/tests/TestClient.cc         |   5 +
 src/c/tests/TestMulti.cc          |   4 +-
 src/c/tests/TestReadOnlyClient.cc |   4 +-
 src/c/tests/TestReconfigServer.cc |   3 +-
 9 files changed, 276 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/include/zookeeper.h
----------------------------------------------------------------------
diff --git a/src/c/include/zookeeper.h b/src/c/include/zookeeper.h
index b92acbb..973daa4 100644
--- a/src/c/include/zookeeper.h
+++ b/src/c/include/zookeeper.h
@@ -1505,6 +1505,41 @@ ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);
 ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
 
 /**
+ * Type of watchers: used to select which type of watchers should be removed
+ */
+typedef enum {
+  ZWATCHERTYPE_CHILDREN = 1,
+  ZWATCHERTYPE_DATA = 2,
+  ZWATCHERTYPE_ANY = 3
+} ZooWatcherType;
+
+/**
+ * \brief removes the watchers for the given path and watcher type.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the path for which watchers will be removed
+ * \param wtype the watcher type to be removed
+ * \param watcher the watcher to be removed, if null all watchers for that
+ * path (and watcher type) will be removed
+ * \param watcherCtx the contex associated with the watcher to be removed
+ * \param local whether the watchers will be removed locally even if there is
+ * no server connection
+ * \return the return code for the function call.
+ * ZOK - operation completed successfully
+ * ZNOWATCHER - the watcher couldn't be found.
+ * ZINVALIDSTATE - if !local, zhandle state is either ZOO_SESSION_EXPIRED_STATE
+ * or ZOO_AUTH_FAILED_STATE
+ * ZBADARGUMENTS - invalid input parameters
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - a system error occured
+ */
+ZOOAPI int zoo_aremove_watchers(zhandle_t *zh, const char *path,
+        ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local,
+        void_completion_t *completion, const void *data);
+
+
+#ifdef THREADED
+/**
  * \brief create a node synchronously.
  *
  * This method will create a node in ZooKeeper. A node can only be created if
@@ -2002,15 +2037,6 @@ ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version,
 ZOOAPI int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results);
 
 /**
- * Type of watchers: used to select which type of watchers should be removed
- */
-typedef enum {
-  ZWATCHERTYPE_CHILDREN = 1,
-  ZWATCHERTYPE_DATA = 2,
-  ZWATCHERTYPE_ANY = 3
-} ZooWatcherType;
-
-/**
  * \brief removes the watchers for the given path and watcher type.
  *
  * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
@@ -2032,31 +2058,7 @@ typedef enum {
  */
 ZOOAPI int zoo_remove_watchers(zhandle_t *zh, const char *path,
         ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local);
-
-/**
- * \brief removes the watchers for the given path and watcher type.
- *
- * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
- * \param path the path for which watchers will be removed
- * \param wtype the watcher type to be removed
- * \param watcher the watcher to be removed, if null all watchers for that
- * path (and watcher type) will be removed
- * \param watcherCtx the contex associated with the watcher to be removed
- * \param local whether the watchers will be removed locally even if there is
- * no server connection
- * \return the return code for the function call.
- * ZOK - operation completed successfully
- * ZNOWATCHER - the watcher couldn't be found.
- * ZINVALIDSTATE - if !local, zhandle state is either ZOO_SESSION_EXPIRED_STATE
- * or ZOO_AUTH_FAILED_STATE
- * ZBADARGUMENTS - invalid input parameters
- * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
- * ZSYSTEMERROR - a system error occured
- */
-ZOOAPI int zoo_aremove_watchers(zhandle_t *zh, const char *path,
-        ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local,
-        void_completion_t *completion, const void *data);
-
+#endif
 #ifdef __cplusplus
 }
 #endif

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/src/cli.c
----------------------------------------------------------------------
diff --git a/src/c/src/cli.c b/src/c/src/cli.c
index 0f451cf..6ca4a41 100644
--- a/src/c/src/cli.c
+++ b/src/c/src/cli.c
@@ -504,13 +504,8 @@ void processline(char *line) {
         }
         *ptr = '\0';
         ptr++;
-        if (async) {
-            rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion,
-                    strdup(line));
-        } else {
-            struct Stat stat;
-            rc = zoo_set2(zh, line, ptr, strlen(ptr), -1, &stat);
-        }
+        rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion,
+                strdup(line));
         if (rc) {
             fprintf(stderr, "Error %d for %s\n", rc, line);
         }
@@ -579,11 +574,7 @@ void processline(char *line) {
             fprintf(stderr, "Path must start with /, found: %s\n", line);
             return;
         }
-        if (async) {
-            rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line));
-        } else {
-            rc = zoo_delete(zh, line, -1);
-        }
+        rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line));
         if (rc) {
             fprintf(stderr, "Error %d for %s\n", rc, line);
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/src/st_adaptor.c
----------------------------------------------------------------------
diff --git a/src/c/src/st_adaptor.c b/src/c/src/st_adaptor.c
index 0f62966..5e9a4ff 100644
--- a/src/c/src/st_adaptor.c
+++ b/src/c/src/st_adaptor.c
@@ -48,24 +48,6 @@ int unlock_completion_list(completion_head_t *l)
 {
     return 0;
 }
-struct sync_completion *alloc_sync_completion(void)
-{
-    return (struct sync_completion*)calloc(1, sizeof(struct sync_completion));
-}
-int wait_sync_completion(struct sync_completion *sc)
-{
-    return 0;
-}
-
-void free_sync_completion(struct sync_completion *sc)
-{
-    free(sc);
-}
-
-void notify_sync_completion(struct sync_completion *sc)
-{
-}
-
 int process_async(int outstanding_sync)
 {
     return outstanding_sync == 0;

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/src/zk_adaptor.h
----------------------------------------------------------------------
diff --git a/src/c/src/zk_adaptor.h b/src/c/src/zk_adaptor.h
index 74ca4c0..97995e3 100644
--- a/src/c/src/zk_adaptor.h
+++ b/src/c/src/zk_adaptor.h
@@ -274,10 +274,12 @@ struct _zhandle {
 int adaptor_init(zhandle_t *zh);
 void adaptor_finish(zhandle_t *zh);
 void adaptor_destroy(zhandle_t *zh);
+#if THREADED
 struct sync_completion *alloc_sync_completion(void);
 int wait_sync_completion(struct sync_completion *sc);
 void free_sync_completion(struct sync_completion *sc);
 void notify_sync_completion(struct sync_completion *sc);
+#endif
 int adaptor_send_queue(zhandle_t *zh, int timeout);
 int process_async(int outstanding_sync);
 void process_completions(zhandle_t *zh);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/src/zookeeper.c
----------------------------------------------------------------------
diff --git a/src/c/src/zookeeper.c b/src/c/src/zookeeper.c
index d36b05a..f6d09b7 100644
--- a/src/c/src/zookeeper.c
+++ b/src/c/src/zookeeper.c
@@ -234,6 +234,13 @@ static __attribute__((unused)) void print_completion_queue(zhandle_t *zh);
 static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
 static int isValidPath(const char* path, const int flags);
 
+#ifdef THREADED
+static void process_sync_completion(zhandle_t *zh,
+        completion_list_t *cptr,
+        struct sync_completion *sc,
+        struct iarchive *ia);
+#endif
+
 #ifdef _WIN32
 typedef SOCKET socket_t;
 typedef int sendsize_t;
@@ -258,6 +265,15 @@ static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
 static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
 
 
+/*
+ * abort due to the use of a sync api in a singlethreaded environment
+ */
+static void abort_singlethreaded(zhandle_t *zh)
+{
+    LOG_ERROR(LOGCALLBACK(zh), "Sync completion used without threads");
+    abort();
+}
+
 static sendsize_t zookeeper_send(socket_t s, const void* buf, size_t len)
 {
     return send(s, buf, len, SEND_FLAGS);
@@ -1626,12 +1642,16 @@ void free_completions(zhandle_t *zh,int callCompletion,int reason)
 
             tmp_list.head = cptr->next;
             if (cptr->c.data_result == SYNCHRONOUS_MARKER) {
+#ifdef THREADED
                 struct sync_completion
                             *sc = (struct sync_completion*)cptr->data;
                 sc->rc = reason;
                 notify_sync_completion(sc);
                 zh->outstanding_sync--;
                 destroy_completion_entry(cptr);
+#else
+                abort_singlethreaded(zh);
+#endif
             } else if (callCompletion) {
                 // Fake the response
                 buffer_list_t *bptr;
@@ -2530,124 +2550,6 @@ completion_list_t *dequeue_completion(completion_head_t *list)
     return cptr;
 }
 
-static void process_sync_completion(zhandle_t *zh,
-        completion_list_t *cptr,
-        struct sync_completion *sc,
-        struct iarchive *ia)
-{
-    LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
-            cptr->c.type, cptr->xid, sc->rc);
-
-    switch(cptr->c.type) {
-    case COMPLETION_DATA:
-        if (sc->rc==0) {
-            struct GetDataResponse res;
-            int len;
-            deserialize_GetDataResponse(ia, "reply", &res);
-            if (res.data.len <= sc->u.data.buff_len) {
-                len = res.data.len;
-            } else {
-                len = sc->u.data.buff_len;
-            }
-            sc->u.data.buff_len = len;
-            // check if len is negative
-            // just of NULL which is -1 int
-            if (len == -1) {
-                sc->u.data.buffer = NULL;
-            } else {
-                memcpy(sc->u.data.buffer, res.data.buff, len);
-            }
-            sc->u.data.stat = res.stat;
-            deallocate_GetDataResponse(&res);
-        }
-        break;
-    case COMPLETION_STAT:
-        if (sc->rc==0) {
-            struct SetDataResponse res;
-            deserialize_SetDataResponse(ia, "reply", &res);
-            sc->u.stat = res.stat;
-            deallocate_SetDataResponse(&res);
-        }
-        break;
-    case COMPLETION_STRINGLIST:
-        if (sc->rc==0) {
-            struct GetChildrenResponse res;
-            deserialize_GetChildrenResponse(ia, "reply", &res);
-            sc->u.strs2 = res.children;
-            /* We don't deallocate since we are passing it back */
-            // deallocate_GetChildrenResponse(&res);
-        }
-        break;
-    case COMPLETION_STRINGLIST_STAT:
-        if (sc->rc==0) {
-            struct GetChildren2Response res;
-            deserialize_GetChildren2Response(ia, "reply", &res);
-            sc->u.strs_stat.strs2 = res.children;
-            sc->u.strs_stat.stat2 = res.stat;
-            /* We don't deallocate since we are passing it back */
-            // deallocate_GetChildren2Response(&res);
-        }
-        break;
-    case COMPLETION_STRING:
-        if (sc->rc==0) {
-            struct CreateResponse res;
-            int len;
-            const char * client_path;
-            deserialize_CreateResponse(ia, "reply", &res);
-            //ZOOKEEPER-1027
-            client_path = sub_string(zh, res.path);
-            len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
-                len = sc->u.str.str_len;
-            }
-            if (len > 0) {
-                memcpy(sc->u.str.str, client_path, len - 1);
-                sc->u.str.str[len - 1] = '\0';
-            }
-            free_duplicate_path(client_path, res.path);
-            deallocate_CreateResponse(&res);
-        }
-        break;
-    case COMPLETION_STRING_STAT:
-        if (sc->rc==0) {
-            struct Create2Response res;
-            int len;
-            const char * client_path;
-            deserialize_Create2Response(ia, "reply", &res);
-            client_path = sub_string(zh, res.path);
-            len = strlen(client_path) + 1;
-            if (len > sc->u.str.str_len) {
-                len = sc->u.str.str_len;
-            }
-            if (len > 0) {
-                memcpy(sc->u.str.str, client_path, len - 1);
-                sc->u.str.str[len - 1] = '\0';
-            }
-            free_duplicate_path(client_path, res.path);
-            sc->u.stat = res.stat;
-            deallocate_Create2Response(&res);
-        }
-        break;
-    case COMPLETION_ACLLIST:
-        if (sc->rc==0) {
-            struct GetACLResponse res;
-            deserialize_GetACLResponse(ia, "reply", &res);
-            sc->u.acl.acl = res.acl;
-            sc->u.acl.stat = res.stat;
-            /* We don't deallocate since we are passing it back */
-            //deallocate_GetACLResponse(&res);
-        }
-        break;
-    case COMPLETION_VOID:
-        break;
-    case COMPLETION_MULTI:
-        sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
-        break;
-    default:
-        LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
-        break;
-    }
-}
-
 static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia)
 {
     int rc = 0;
@@ -2967,6 +2869,7 @@ int zookeeper_process(zhandle_t *zh, int events)
                 cptr->buffer = bptr;
                 queue_completion(&zh->completions_to_process, cptr, 0);
             } else {
+#ifdef THREADED
                 struct sync_completion
                         *sc = (struct sync_completion*)cptr->data;
                 sc->rc = rc;
@@ -2977,6 +2880,9 @@ int zookeeper_process(zhandle_t *zh, int events)
                 free_buffer(bptr);
                 zh->outstanding_sync--;
                 destroy_completion_entry(cptr);
+#else
+                abort_singlethreaded(zh);
+#endif
             }
         }
 
@@ -4062,6 +3968,76 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
     return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
 }
 
+
+int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
+        watcher_fn watcher, void *watcherCtx, int local,
+        void_completion_t *completion, const void *data)
+{
+    char *server_path = prepend_string(zh, path);
+    int rc;
+    struct oarchive *oa;
+    struct RequestHeader h = { get_xid(), ZOO_REMOVE_WATCHES };
+    struct RemoveWatchesRequest req =  { (char*)server_path, wtype };
+    watcher_deregistration_t *wdo;
+
+    if (!zh || !isValidPath(server_path, 0)) {
+        rc = ZBADARGUMENTS;
+        goto done;
+    }
+
+    if (!local && is_unrecoverable(zh)) {
+        rc = ZINVALIDSTATE;
+        goto done;
+    }
+
+    if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
+        rc = ZNOWATCHER;
+        goto done;
+    }
+
+    if (local) {
+        removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
+#ifdef THREADED
+        notify_sync_completion((struct sync_completion *)data);
+#endif
+        rc = ZOK;
+        goto done;
+    }
+
+    oa = create_buffer_oarchive();
+    rc = serialize_RequestHeader(oa, "header", &h);
+    rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req);
+    if (rc < 0) {
+        goto done;
+    }
+
+    wdo = create_watcher_deregistration(server_path, watcher, watcherCtx,
+                                        wtype);
+    if (!wdo) {
+        rc = ZSYSTEMERROR;
+        goto done;
+    }
+
+    enter_critical(zh);
+    rc = add_completion_deregistration(zh, h.xid, COMPLETION_VOID,
+                                       completion, data, 0, wdo, 0);
+    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+            get_buffer_len(oa));
+    rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
+    leave_critical(zh);
+
+    /* We queued the buffer, so don't free it */
+    close_buffer_oarchive(&oa, 0);
+
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
+              h.xid, path, zoo_get_current_server(zh));
+
+    adaptor_send_queue(zh, 0);
+
+done:
+    free_duplicate_path(server_path, path);
+    return rc;
+}
 void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
         int valuelen,  const struct ACL_vector *acl, int flags,
         char *path_buffer, int path_buffer_len)
@@ -4120,25 +4096,6 @@ void zoo_check_op_init(zoo_op_t *op, const char *path, int version)
     op->check_op.version = version;
 }
 
-int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
-{
-    int rc;
-
-    struct sync_completion *sc = alloc_sync_completion();
-    if (!sc) {
-        return ZSYSTEMERROR;
-    }
-
-    rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
-    if (rc == ZOK) {
-        wait_sync_completion(sc);
-        rc = sc->rc;
-    }
-    free_sync_completion(sc);
-
-    return rc;
-}
-
 /* specify timeout of 0 to make the function non-blocking */
 /* timeout is in milliseconds */
 int flush_send_queue(zhandle_t*zh, int timeout)
@@ -4375,6 +4332,126 @@ void zoo_deterministic_conn_order(int yesOrNo)
     disable_conn_permute=yesOrNo;
 }
 
+#ifdef THREADED
+
+static void process_sync_completion(zhandle_t *zh,
+        completion_list_t *cptr,
+        struct sync_completion *sc,
+        struct iarchive *ia)
+{
+    LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
+            cptr->c.type, cptr->xid, sc->rc);
+
+    switch(cptr->c.type) {
+    case COMPLETION_DATA:
+        if (sc->rc==0) {
+            struct GetDataResponse res;
+            int len;
+            deserialize_GetDataResponse(ia, "reply", &res);
+            if (res.data.len <= sc->u.data.buff_len) {
+                len = res.data.len;
+            } else {
+                len = sc->u.data.buff_len;
+            }
+            sc->u.data.buff_len = len;
+            // check if len is negative
+            // just of NULL which is -1 int
+            if (len == -1) {
+                sc->u.data.buffer = NULL;
+            } else {
+                memcpy(sc->u.data.buffer, res.data.buff, len);
+            }
+            sc->u.data.stat = res.stat;
+            deallocate_GetDataResponse(&res);
+        }
+        break;
+    case COMPLETION_STAT:
+        if (sc->rc==0) {
+            struct SetDataResponse res;
+            deserialize_SetDataResponse(ia, "reply", &res);
+            sc->u.stat = res.stat;
+            deallocate_SetDataResponse(&res);
+        }
+        break;
+    case COMPLETION_STRINGLIST:
+        if (sc->rc==0) {
+            struct GetChildrenResponse res;
+            deserialize_GetChildrenResponse(ia, "reply", &res);
+            sc->u.strs2 = res.children;
+            /* We don't deallocate since we are passing it back */
+            // deallocate_GetChildrenResponse(&res);
+        }
+        break;
+    case COMPLETION_STRINGLIST_STAT:
+        if (sc->rc==0) {
+            struct GetChildren2Response res;
+            deserialize_GetChildren2Response(ia, "reply", &res);
+            sc->u.strs_stat.strs2 = res.children;
+            sc->u.strs_stat.stat2 = res.stat;
+            /* We don't deallocate since we are passing it back */
+            // deallocate_GetChildren2Response(&res);
+        }
+        break;
+    case COMPLETION_STRING:
+        if (sc->rc==0) {
+            struct CreateResponse res;
+            int len;
+            const char * client_path;
+            deserialize_CreateResponse(ia, "reply", &res);
+            //ZOOKEEPER-1027
+            client_path = sub_string(zh, res.path);
+            len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
+                len = sc->u.str.str_len;
+            }
+            if (len > 0) {
+                memcpy(sc->u.str.str, client_path, len - 1);
+                sc->u.str.str[len - 1] = '\0';
+            }
+            free_duplicate_path(client_path, res.path);
+            deallocate_CreateResponse(&res);
+        }
+        break;
+    case COMPLETION_STRING_STAT:
+        if (sc->rc==0) {
+            struct Create2Response res;
+            int len;
+            const char * client_path;
+            deserialize_Create2Response(ia, "reply", &res);
+            client_path = sub_string(zh, res.path);
+            len = strlen(client_path) + 1;
+            if (len > sc->u.str.str_len) {
+                len = sc->u.str.str_len;
+            }
+            if (len > 0) {
+                memcpy(sc->u.str.str, client_path, len - 1);
+                sc->u.str.str[len - 1] = '\0';
+            }
+            free_duplicate_path(client_path, res.path);
+            sc->u.stat = res.stat;
+            deallocate_Create2Response(&res);
+        }
+        break;
+    case COMPLETION_ACLLIST:
+        if (sc->rc==0) {
+            struct GetACLResponse res;
+            deserialize_GetACLResponse(ia, "reply", &res);
+            sc->u.acl.acl = res.acl;
+            sc->u.acl.stat = res.stat;
+            /* We don't deallocate since we are passing it back */
+            //deallocate_GetACLResponse(&res);
+        }
+        break;
+    case COMPLETION_VOID:
+        break;
+    case COMPLETION_MULTI:
+        sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
+        break;
+    default:
+        LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
+        break;
+    }
+}
+
 /*---------------------------------------------------------------------------*
  * SYNC API
  *---------------------------------------------------------------------------*/
@@ -4712,70 +4789,22 @@ int zoo_remove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
     return rc;
 }
 
-int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
-        watcher_fn watcher, void *watcherCtx, int local,
-        void_completion_t *completion, const void *data)
+int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
 {
-    char *server_path = prepend_string(zh, path);
     int rc;
-    struct oarchive *oa;
-    struct RequestHeader h = { get_xid(), ZOO_REMOVE_WATCHES };
-    struct RemoveWatchesRequest req =  { (char*)server_path, wtype };
-    watcher_deregistration_t *wdo;
-
-    if (!zh || !isValidPath(server_path, 0)) {
-        rc = ZBADARGUMENTS;
-        goto done;
-    }
 
-    if (!local && is_unrecoverable(zh)) {
-        rc = ZINVALIDSTATE;
-        goto done;
-    }
-
-    if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
-        rc = ZNOWATCHER;
-        goto done;
-    }
-
-    if (local) {
-        removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
-        notify_sync_completion((struct sync_completion *)data);
-        rc = ZOK;
-        goto done;
-    }
-
-    oa = create_buffer_oarchive();
-    rc = serialize_RequestHeader(oa, "header", &h);
-    rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req);
-    if (rc < 0) {
-        goto done;
+    struct sync_completion *sc = alloc_sync_completion();
+    if (!sc) {
+        return ZSYSTEMERROR;
     }
 
-    wdo = create_watcher_deregistration(server_path, watcher, watcherCtx,
-                                        wtype);
-    if (!wdo) {
-        rc = ZSYSTEMERROR;
-        goto done;
+    rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
+    if (rc == ZOK) {
+        wait_sync_completion(sc);
+        rc = sc->rc;
     }
+    free_sync_completion(sc);
 
-    enter_critical(zh);
-    rc = add_completion_deregistration(zh, h.xid, COMPLETION_VOID,
-                                       completion, data, 0, wdo, 0);
-    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
-            get_buffer_len(oa));
-    rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
-    leave_critical(zh);
-
-    /* We queued the buffer, so don't free it */
-    close_buffer_oarchive(&oa, 0);
-
-    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
-              h.xid, path, zoo_get_current_server(zh));
-
-    adaptor_send_queue(zh, 0);
-
-done:
-    free_duplicate_path(server_path, path);
     return rc;
 }
+#endif

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/tests/TestClient.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestClient.cc b/src/c/tests/TestClient.cc
index ab6a133..92b1699 100644
--- a/src/c/tests/TestClient.cc
+++ b/src/c/tests/TestClient.cc
@@ -47,6 +47,10 @@ struct buff_struct_2 {
     char *buffer;
 };
 
+// TODO(br33d): the vast majority of this test is not usable with single threaded.
+//              it needs a overhaul to work properly with both threaded and single
+//              threaded (ZOOKEEPER-2640)
+#ifdef THREADED
 // For testing LogMessage Callback functionality
 list<string> logMessages;
 void logMessageHandler(const char* message) {
@@ -1415,3 +1419,4 @@ volatile int Zookeeper_simpleSystem::count;
 zhandle_t *Zookeeper_simpleSystem::async_zk;
 const char Zookeeper_simpleSystem::hostPorts[] = "127.0.0.1:22181";
 CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_simpleSystem);
+#endif

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/tests/TestMulti.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestMulti.cc b/src/c/tests/TestMulti.cc
index a54e047..0ee9566 100644
--- a/src/c/tests/TestMulti.cc
+++ b/src/c/tests/TestMulti.cc
@@ -160,12 +160,12 @@ public:
     }
 } watchctx_t;
 
+#ifdef THREADED
 class Zookeeper_multi : public CPPUNIT_NS::TestFixture
 {
     CPPUNIT_TEST_SUITE(Zookeeper_multi);
 //FIXME: None of these tests pass in single-threaded mode. It seems to be a
 //flaw in the test suite setup.
-#ifdef THREADED
     CPPUNIT_TEST(testCreate);
     CPPUNIT_TEST(testCreateDelete);
     CPPUNIT_TEST(testInvalidVersion);
@@ -178,7 +178,6 @@ class Zookeeper_multi : public CPPUNIT_NS::TestFixture
     CPPUNIT_TEST(testCheck);
     CPPUNIT_TEST(testWatch);
     CPPUNIT_TEST(testSequentialNodeCreateInAsyncMulti);
-#endif
     CPPUNIT_TEST_SUITE_END();
 
     static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
@@ -701,3 +700,4 @@ public:
 volatile int Zookeeper_multi::count;
 const char Zookeeper_multi::hostPorts[] = "127.0.0.1:22181";
 CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_multi);
+#endif

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/tests/TestReadOnlyClient.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestReadOnlyClient.cc b/src/c/tests/TestReadOnlyClient.cc
index 37ab147..d73f189 100644
--- a/src/c/tests/TestReadOnlyClient.cc
+++ b/src/c/tests/TestReadOnlyClient.cc
@@ -27,11 +27,10 @@
 #include "Util.h"
 #include "WatchUtil.h"
 
+#ifdef THREADED
 class Zookeeper_readOnly : public CPPUNIT_NS::TestFixture {
     CPPUNIT_TEST_SUITE(Zookeeper_readOnly);
-#ifdef THREADED
     CPPUNIT_TEST(testReadOnly);
-#endif
     CPPUNIT_TEST_SUITE_END();
 
     static void watcher(zhandle_t* zh, int type, int state,
@@ -108,3 +107,4 @@ public:
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_readOnly);
+#endif

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0868332/src/c/tests/TestReconfigServer.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestReconfigServer.cc b/src/c/tests/TestReconfigServer.cc
index b86b33d..2893a1b 100644
--- a/src/c/tests/TestReconfigServer.cc
+++ b/src/c/tests/TestReconfigServer.cc
@@ -25,6 +25,7 @@
 #include "Util.h"
 #include "ZooKeeperQuorumServer.h"
 
+#ifdef THREADED
 class TestReconfigServer : public CPPUNIT_NS::TestFixture {
     CPPUNIT_TEST_SUITE(TestReconfigServer);
 #ifdef THREADED
@@ -33,7 +34,6 @@ class TestReconfigServer : public CPPUNIT_NS::TestFixture {
     CPPUNIT_TEST(testRemoveFollower);
     CPPUNIT_TEST(testReconfigFailureWithoutAuth);
     CPPUNIT_TEST(testReconfigFailureWithoutServerSuperuserPasswordConfigured);
-#endif
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -418,3 +418,4 @@ testReconfigFailureWithoutServerSuperuserPasswordConfigured() {
 }
 
 CPPUNIT_TEST_SUITE_REGISTRATION(TestReconfigServer);
+#endif


Mime
View raw message