zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [07/22] zookeeper git commit: ZOOKEEPER-3031: MAVEN MIGRATION - Step 1.4 - move client dir
Date Tue, 21 Aug 2018 05:31:07 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c b/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
new file mode 100644
index 0000000..38cced4
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef THREADED
+#define THREADED
+#endif
+
+#if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB)
+#  define USE_STATIC_LIB
+#endif
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include "zk_adaptor.h"
+#include "zookeeper_log.h"
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <time.h>
+#include <fcntl.h>
+#include <assert.h>
+#include <errno.h>
+
+#ifndef WIN32
+#include <signal.h>
+#include <poll.h>
+#include <unistd.h>
+#include <sys/time.h>
+#endif
+
+int zoo_lock_auth(zhandle_t *zh)
+{
+    return pthread_mutex_lock(&zh->auth_h.lock);
+}
+int zoo_unlock_auth(zhandle_t *zh)
+{
+    return pthread_mutex_unlock(&zh->auth_h.lock);
+}
+int lock_buffer_list(buffer_head_t *l)
+{
+    return pthread_mutex_lock(&l->lock);
+}
+int unlock_buffer_list(buffer_head_t *l)
+{
+    return pthread_mutex_unlock(&l->lock);
+}
+int lock_completion_list(completion_head_t *l)
+{
+    return pthread_mutex_lock(&l->lock);
+}
+int unlock_completion_list(completion_head_t *l)
+{
+    pthread_cond_broadcast(&l->cond);
+    return pthread_mutex_unlock(&l->lock);
+}
+struct sync_completion *alloc_sync_completion(void)
+{
+    struct sync_completion *sc = (struct sync_completion*)calloc(1, sizeof(struct sync_completion));
+    if (sc) {
+       pthread_cond_init(&sc->cond, 0);
+       pthread_mutex_init(&sc->lock, 0);
+    }
+    return sc;
+}
+int wait_sync_completion(struct sync_completion *sc)
+{
+    pthread_mutex_lock(&sc->lock);
+    while (!sc->complete) {
+        pthread_cond_wait(&sc->cond, &sc->lock);
+    }
+    pthread_mutex_unlock(&sc->lock);
+    return 0;
+}
+
+void free_sync_completion(struct sync_completion *sc)
+{
+    if (sc) {
+        pthread_mutex_destroy(&sc->lock);
+        pthread_cond_destroy(&sc->cond);
+        free(sc);
+    }
+}
+
+void notify_sync_completion(struct sync_completion *sc)
+{
+    pthread_mutex_lock(&sc->lock);
+    sc->complete = 1;
+    pthread_cond_broadcast(&sc->cond);
+    pthread_mutex_unlock(&sc->lock);
+}
+
+int process_async(int outstanding_sync)
+{
+    return 0;
+}
+
+#ifdef WIN32
+unsigned __stdcall do_io( void * );
+unsigned __stdcall do_completion( void * );
+
+int handle_error(zhandle_t* zh, SOCKET sock, char* message)
+{
+       LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError());
+       closesocket (sock);
+       return -1;
+}
+
+//--create socket pair for interupting selects.
+int create_socket_pair(zhandle_t* zh, SOCKET fds[2]) 
+{ 
+    struct sockaddr_in inaddr; 
+    struct sockaddr addr; 
+    int yes=1; 
+    int len=0;
+       
+    SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP); 
+    if (lst ==  INVALID_SOCKET ){
+       LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
+       return -1;
+    }
+    memset(&inaddr, 0, sizeof(inaddr)); 
+    memset(&addr, 0, sizeof(addr)); 
+    inaddr.sin_family = AF_INET; 
+    inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); 
+    inaddr.sin_port = 0; //--system assigns the port
+
+    if ( setsockopt(lst,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(yes)) == SOCKET_ERROR  ) {
+       return handle_error(zh, lst,"Error trying to set socket option.");          
+    }  
+    if (bind(lst,(struct sockaddr *)&inaddr,sizeof(inaddr)) == SOCKET_ERROR){
+       return handle_error(zh, lst,"Error trying to bind socket.");                
+    }
+    if (listen(lst,1) == SOCKET_ERROR){
+       return handle_error(zh, lst,"Error trying to listen on socket.");
+    }
+    len=sizeof(inaddr); 
+    getsockname(lst, &addr,&len); 
+    fds[0]=socket(AF_INET, SOCK_STREAM,0); 
+    if (connect(fds[0],&addr,len) == SOCKET_ERROR){
+       return handle_error(zh, lst, "Error while connecting to socket.");
+    }
+    if ((fds[1]=accept(lst,0,0)) == INVALID_SOCKET){
+       closesocket(fds[0]);
+       return handle_error(zh, lst, "Error while accepting socket connection.");
+    }
+    closesocket(lst);  
+    return 0;
+} 
+#else
+void *do_io(void *);
+void *do_completion(void *);
+#endif
+
+
+int wakeup_io_thread(zhandle_t *zh);
+
+#ifdef WIN32
+static int set_nonblock(SOCKET fd){
+    ULONG nonblocking_flag = 1;
+    if (ioctlsocket(fd, FIONBIO, &nonblocking_flag) == 0)
+        return 1;
+    else 
+        return -1;
+}
+#else
+static int set_nonblock(int fd){
+    long l = fcntl(fd, F_GETFL);
+    if(l & O_NONBLOCK) return 0;
+    return fcntl(fd, F_SETFL, l | O_NONBLOCK);
+}
+#endif
+
+void wait_for_others(zhandle_t* zh)
+{
+    struct adaptor_threads* adaptor=zh->adaptor_priv;
+    pthread_mutex_lock(&adaptor->lock);
+    while(adaptor->threadsToWait>0) 
+        pthread_cond_wait(&adaptor->cond,&adaptor->lock);
+    pthread_mutex_unlock(&adaptor->lock);    
+}
+
+void notify_thread_ready(zhandle_t* zh)
+{
+    struct adaptor_threads* adaptor=zh->adaptor_priv;
+    pthread_mutex_lock(&adaptor->lock);
+    adaptor->threadsToWait--;
+    pthread_cond_broadcast(&adaptor->cond);
+    while(adaptor->threadsToWait>0) 
+        pthread_cond_wait(&adaptor->cond,&adaptor->lock);
+    pthread_mutex_unlock(&adaptor->lock);
+}
+
+
+void start_threads(zhandle_t* zh)
+{
+    int rc = 0;
+    struct adaptor_threads* adaptor=zh->adaptor_priv;
+    pthread_cond_init(&adaptor->cond,0);
+    pthread_mutex_init(&adaptor->lock,0);
+    adaptor->threadsToWait=2;  // wait for 2 threads before opening the barrier
+    
+    // use api_prolog() to make sure zhandle doesn't get destroyed
+    // while initialization is in progress
+    api_prolog(zh);
+    LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");
+    rc=pthread_create(&adaptor->io, 0, do_io, zh);
+    assert("pthread_create() failed for the IO thread"&&!rc);
+    rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
+    assert("pthread_create() failed for the completion thread"&&!rc);
+    wait_for_others(zh);
+    api_epilog(zh, 0);    
+}
+
+int adaptor_init(zhandle_t *zh)
+{
+    pthread_mutexattr_t recursive_mx_attr;
+    struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads));
+    if (!adaptor_threads) {
+        LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
+        return -1;
+    }
+
+    /* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
+#ifdef WIN32   
+    if (create_socket_pair(zh, adaptor_threads->self_pipe) == -1){
+       LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
+#else
+    if(pipe(adaptor_threads->self_pipe)==-1) {
+        LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
+#endif
+        free(adaptor_threads);
+        return -1;
+    }
+    set_nonblock(adaptor_threads->self_pipe[1]);
+    set_nonblock(adaptor_threads->self_pipe[0]);
+
+    pthread_mutex_init(&zh->auth_h.lock,0);
+
+    zh->adaptor_priv = adaptor_threads;
+    pthread_mutex_init(&zh->to_process.lock,0);
+    pthread_mutex_init(&adaptor_threads->zh_lock,0);
+    pthread_mutex_init(&adaptor_threads->reconfig_lock,0);
+    // to_send must be recursive mutex    
+    pthread_mutexattr_init(&recursive_mx_attr);
+    pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
+    pthread_mutex_init(&zh->to_send.lock,&recursive_mx_attr);
+    pthread_mutexattr_destroy(&recursive_mx_attr);
+    
+    pthread_mutex_init(&zh->sent_requests.lock,0);
+    pthread_cond_init(&zh->sent_requests.cond,0);
+    pthread_mutex_init(&zh->completions_to_process.lock,0);
+    pthread_cond_init(&zh->completions_to_process.cond,0);
+    start_threads(zh);
+    return 0;
+}
+
+void adaptor_finish(zhandle_t *zh)
+{
+    struct adaptor_threads *adaptor_threads;
+    // make sure zh doesn't get destroyed until after we're done here
+    api_prolog(zh); 
+    adaptor_threads = zh->adaptor_priv;
+    if(adaptor_threads==0) {
+        api_epilog(zh,0);
+        return;
+    }
+
+    if(!pthread_equal(adaptor_threads->io,pthread_self())){
+        wakeup_io_thread(zh);
+        pthread_join(adaptor_threads->io, 0);
+    }else
+        pthread_detach(adaptor_threads->io);
+    
+    if(!pthread_equal(adaptor_threads->completion,pthread_self())){
+        pthread_mutex_lock(&zh->completions_to_process.lock);
+        pthread_cond_broadcast(&zh->completions_to_process.cond);
+        pthread_mutex_unlock(&zh->completions_to_process.lock);
+        pthread_join(adaptor_threads->completion, 0);
+    }else
+        pthread_detach(adaptor_threads->completion);
+    
+    api_epilog(zh,0);
+}
+
+void adaptor_destroy(zhandle_t *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if(adaptor==0) return;
+    
+    pthread_cond_destroy(&adaptor->cond);
+    pthread_mutex_destroy(&adaptor->lock);
+    pthread_mutex_destroy(&zh->to_process.lock);
+    pthread_mutex_destroy(&zh->to_send.lock);
+    pthread_mutex_destroy(&zh->sent_requests.lock);
+    pthread_cond_destroy(&zh->sent_requests.cond);
+    pthread_mutex_destroy(&zh->completions_to_process.lock);
+    pthread_cond_destroy(&zh->completions_to_process.cond);
+    pthread_mutex_destroy(&adaptor->zh_lock);
+
+    pthread_mutex_destroy(&zh->auth_h.lock);
+
+    close(adaptor->self_pipe[0]);
+    close(adaptor->self_pipe[1]);
+    free(adaptor);
+    zh->adaptor_priv=0;
+}
+
+int wakeup_io_thread(zhandle_t *zh)
+{
+    struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
+    char c=0;
+#ifndef WIN32
+    return write(adaptor_threads->self_pipe[1],&c,1)==1? ZOK: ZSYSTEMERROR;    
+#else
+    return send(adaptor_threads->self_pipe[1], &c, 1, 0)==1? ZOK: ZSYSTEMERROR;    
+#endif         
+}
+
+int adaptor_send_queue(zhandle_t *zh, int timeout)
+{
+    if(!zh->close_requested)
+        return wakeup_io_thread(zh);
+    // don't rely on the IO thread to send the messages if the app has
+    // requested to close 
+    return flush_send_queue(zh, timeout);
+}
+
+/* These two are declared here because we will run the event loop
+ * and not the client */
+#ifdef WIN32
+int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
+        struct timeval *tv);
+#else
+int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
+        struct timeval *tv);
+#endif
+int zookeeper_process(zhandle_t *zh, int events);
+
+#ifdef WIN32
+unsigned __stdcall do_io( void * v)
+#else
+void *do_io(void *v)
+#endif
+{
+    zhandle_t *zh = (zhandle_t*)v;
+#ifndef WIN32
+    struct pollfd fds[2];
+    struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
+
+    api_prolog(zh);
+    notify_thread_ready(zh);
+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
+    fds[0].fd=adaptor_threads->self_pipe[0];
+    fds[0].events=POLLIN;
+    while(!zh->close_requested) {
+        zh->io_count++;
+        struct timeval tv;
+        int fd;
+        int interest;
+        int timeout;
+        int maxfd=1;
+
+        zookeeper_interest(zh, &fd, &interest, &tv);
+        if (fd != -1) {
+            fds[1].fd=fd;
+            fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
+            fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
+            maxfd=2;
+        }
+        timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);
+        
+        poll(fds,maxfd,timeout);
+        if (fd != -1) {
+            interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
+            interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
+        }
+        if(fds[0].revents&POLLIN){
+            // flush the pipe
+            char b[128];
+            while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
+        }        
+#else
+    fd_set rfds, wfds;
+    struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
+    api_prolog(zh);
+    notify_thread_ready(zh);
+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
+    
+    while(!zh->close_requested) {      
+        struct timeval tv;
+        SOCKET fd;
+        int interest;
+        int rc;
+
+        zookeeper_interest(zh, &fd, &interest, &tv);
+
+        // FD_ZERO is cheap on Win32, it just sets count of elements to zero.
+        // It needs to be done to ensure no stale entries.
+        FD_ZERO(&rfds);
+        FD_ZERO(&wfds);
+
+        if (fd != -1) {
+            if (interest&ZOOKEEPER_READ) {
+                FD_SET(fd, &rfds);
+            }
+
+            if (interest&ZOOKEEPER_WRITE) {
+                FD_SET(fd, &wfds);
+            }
+        }
+
+        // Always interested in self_pipe.
+        FD_SET(adaptor_threads->self_pipe[0], &rfds);
+
+        rc = select(/* unused */0, &rfds, &wfds, NULL, &tv);
+        if (rc > 0) {
+            interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0;
+            interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0;
+
+            if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
+                // flush the pipe/socket
+                char b[128];
+                while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
+            }
+        }
+        else if (rc < 0) {
+            LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError()));
+
+            // Clear interest events for zookeeper_process if select() fails.
+            interest = 0;
+        }
+
+#endif
+        // dispatch zookeeper events
+        zookeeper_process(zh, interest);
+        // check the current state of the zhandle and terminate 
+        // if it is_unrecoverable()
+        if(is_unrecoverable(zh))
+            break;
+    }
+    api_epilog(zh, 0);    
+    LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
+    return 0;
+}
+
+#ifdef WIN32
+unsigned __stdcall do_completion( void * v)
+#else
+void *do_completion(void *v)
+#endif
+{
+    zhandle_t *zh = v;
+    api_prolog(zh);
+    notify_thread_ready(zh);
+    LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
+    while(!zh->close_requested) {
+        pthread_mutex_lock(&zh->completions_to_process.lock);
+        while(!zh->completions_to_process.head && !zh->close_requested) {
+            pthread_cond_wait(&zh->completions_to_process.cond, &zh->completions_to_process.lock);
+        }
+        pthread_mutex_unlock(&zh->completions_to_process.lock);
+        process_completions(zh);
+    }
+    api_epilog(zh, 0);    
+    LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
+    return 0;
+}
+
+int32_t inc_ref_counter(zhandle_t* zh,int i)
+{
+    int incr=(i<0?-1:(i>0?1:0));
+    // fetch_and_add implements atomic post-increment
+    int v=fetch_and_add(&zh->ref_counter,incr);
+    // inc_ref_counter wants pre-increment
+    v+=incr;   // simulate pre-increment
+    return v;
+}
+
+int32_t fetch_and_add(volatile int32_t* operand, int incr)
+{
+#ifndef WIN32
+    return __sync_fetch_and_add(operand, incr);
+#else
+    return InterlockedExchangeAdd(operand, incr);
+#endif
+}
+
+// make sure the static xid is initialized before any threads started
+__attribute__((constructor)) int32_t get_xid()
+{
+    static int32_t xid = -1;
+    if (xid == -1) {
+        xid = time(0);
+    }
+    return fetch_and_add(&xid,1);
+}
+
+int lock_reconfig(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if (adaptor) {
+        return pthread_mutex_lock(&adaptor->reconfig_lock);
+    } else {
+        return 0;
+    }
+}
+int unlock_reconfig(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if (adaptor) {
+        return pthread_mutex_unlock(&adaptor->reconfig_lock);
+    } else {
+        return 0;
+    }
+}
+
+int enter_critical(zhandle_t* zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if (adaptor) {
+        return pthread_mutex_lock(&adaptor->zh_lock);
+    } else {
+        return 0;
+    }
+}
+
+int leave_critical(zhandle_t* zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if (adaptor) {
+        return pthread_mutex_unlock(&adaptor->zh_lock);
+    } else {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/recordio.c
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/recordio.c b/zookeeper-client/zookeeper-client-c/src/recordio.c
new file mode 100644
index 0000000..77fae28
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/recordio.c
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <recordio.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#ifndef WIN32
+#include <netinet/in.h>
+#else
+#include <winsock2.h> /* for _htonl and _ntohl */
+#endif
+
+void deallocate_String(char **s)
+{
+    if (*s)
+        free(*s);
+    *s = 0;
+}
+
+void deallocate_Buffer(struct buffer *b)
+{
+    if (b->buff)
+        free(b->buff);
+    b->buff = 0;
+}
+
+struct buff_struct {
+    int32_t len;
+    int32_t off;
+    char *buffer;
+};
+
+static int resize_buffer(struct buff_struct *s, int newlen)
+{
+    char *buffer= NULL;
+    while (s->len < newlen) {
+        s->len *= 2;
+    }
+    buffer = (char*)realloc(s->buffer, s->len);
+    if (!buffer) {
+        s->buffer = 0;
+        return -ENOMEM;
+    }
+    s->buffer = buffer;
+    return 0;
+}
+
+int oa_start_record(struct oarchive *oa, const char *tag)
+{
+    return 0;
+}
+int oa_end_record(struct oarchive *oa, const char *tag)
+{
+    return 0;
+}
+int oa_serialize_int(struct oarchive *oa, const char *tag, const int32_t *d)
+{
+    struct buff_struct *priv = oa->priv;
+    int32_t i = htonl(*d);
+    if ((priv->len - priv->off) < sizeof(i)) {
+        int rc = resize_buffer(priv, priv->len + sizeof(i));
+        if (rc < 0) return rc;
+    }
+    memcpy(priv->buffer+priv->off, &i, sizeof(i));
+    priv->off+=sizeof(i);
+    return 0;
+}
+int64_t zoo_htonll(int64_t v)
+{
+    int i = 0;
+    char *s = (char *)&v;
+    if (htonl(1) == 1) {
+        return v;
+    }
+    for (i = 0; i < 4; i++) {
+        int tmp = s[i];
+        s[i] = s[8-i-1];
+        s[8-i-1] = tmp;
+    }
+
+    return v;
+}
+
+int oa_serialize_long(struct oarchive *oa, const char *tag, const int64_t *d)
+{
+    const int64_t i = zoo_htonll(*d);
+    struct buff_struct *priv = oa->priv;
+    if ((priv->len - priv->off) < sizeof(i)) {
+        int rc = resize_buffer(priv, priv->len + sizeof(i));
+        if (rc < 0) return rc;
+    }
+    memcpy(priv->buffer+priv->off, &i, sizeof(i));
+    priv->off+=sizeof(i);
+    return 0;
+}
+int oa_start_vector(struct oarchive *oa, const char *tag, const int32_t *count)
+{
+    return oa_serialize_int(oa, tag, count);
+}
+int oa_end_vector(struct oarchive *oa, const char *tag)
+{
+    return 0;
+}
+int oa_serialize_bool(struct oarchive *oa, const char *name, const int32_t *i)
+{
+    //return oa_serialize_int(oa, name, i);
+    struct buff_struct *priv = oa->priv;
+    if ((priv->len - priv->off) < 1) {
+        int rc = resize_buffer(priv, priv->len + 1);
+        if (rc < 0)
+            return rc;
+    }
+    priv->buffer[priv->off] = (*i == 0 ? '\0' : '\1');
+    priv->off++;
+    return 0;
+}
+static const int32_t negone = -1;
+int oa_serialize_buffer(struct oarchive *oa, const char *name,
+        const struct buffer *b)
+{
+    struct buff_struct *priv = oa->priv;
+    int rc;
+    if (!b) {
+        return oa_serialize_int(oa, "len", &negone);
+    }
+    rc = oa_serialize_int(oa, "len", &b->len);
+    if (rc < 0)
+        return rc;
+    // this means a buffer of NUll 
+    // with size of -1. This is 
+    // waht we use in java serialization for NULL
+    if (b->len == -1) {
+      return rc;
+    }
+    if ((priv->len - priv->off) < b->len) {
+        rc = resize_buffer(priv, priv->len + b->len);
+        if (rc < 0)
+            return rc;
+    }
+    memcpy(priv->buffer+priv->off, b->buff, b->len);
+    priv->off += b->len;
+    return 0;
+}
+int oa_serialize_string(struct oarchive *oa, const char *name, char **s)
+{
+    struct buff_struct *priv = oa->priv;
+    int32_t len;
+    int rc;
+    if (!*s) {
+        oa_serialize_int(oa, "len", &negone);
+        return 0;
+    }
+    len = strlen(*s);
+    rc = oa_serialize_int(oa, "len", &len);
+    if (rc < 0)
+        return rc;
+    if ((priv->len - priv->off) < len) {
+        rc = resize_buffer(priv, priv->len + len);
+        if (rc < 0)
+            return rc;
+    }
+    memcpy(priv->buffer+priv->off, *s, len);
+    priv->off += len;
+    return 0;
+}
+int ia_start_record(struct iarchive *ia, const char *tag)
+{
+    return 0;
+}
+int ia_end_record(struct iarchive *ia, const char *tag)
+{
+    return 0;
+}
+int ia_deserialize_int(struct iarchive *ia, const char *tag, int32_t *count)
+{
+    struct buff_struct *priv = ia->priv;
+    if ((priv->len - priv->off) < sizeof(*count)) {
+        return -E2BIG;
+    }
+    memcpy(count, priv->buffer+priv->off, sizeof(*count));
+    priv->off+=sizeof(*count);
+    *count = ntohl(*count);
+    return 0;
+}
+
+int ia_deserialize_long(struct iarchive *ia, const char *tag, int64_t *count)
+{
+    struct buff_struct *priv = ia->priv;
+    int64_t v = 0;
+    if ((priv->len - priv->off) < sizeof(*count)) {
+        return -E2BIG;
+    }
+    memcpy(count, priv->buffer+priv->off, sizeof(*count));
+    priv->off+=sizeof(*count);
+    v = zoo_htonll(*count); // htonll and  ntohll do the same
+    *count = v;
+    return 0;
+}
+int ia_start_vector(struct iarchive *ia, const char *tag, int32_t *count)
+{
+    return ia_deserialize_int(ia, tag, count);
+}
+int ia_end_vector(struct iarchive *ia, const char *tag)
+{
+    return 0;
+}
+int ia_deserialize_bool(struct iarchive *ia, const char *name, int32_t *v)
+{
+    struct buff_struct *priv = ia->priv;
+    //fprintf(stderr, "Deserializing bool %d\n", priv->off);
+    //return ia_deserialize_int(ia, name, v);
+    if ((priv->len - priv->off) < 1) {
+        return -E2BIG;
+    }
+    *v = priv->buffer[priv->off];
+    priv->off+=1;
+    //fprintf(stderr, "Deserializing bool end %d\n", priv->off);
+    return 0;
+}
+int ia_deserialize_buffer(struct iarchive *ia, const char *name,
+        struct buffer *b)
+{
+    struct buff_struct *priv = ia->priv;
+    int rc = ia_deserialize_int(ia, "len", &b->len);
+    if (rc < 0)
+        return rc;
+    if ((priv->len - priv->off) < b->len) {
+        return -E2BIG;
+    }
+    // set the buffer to null
+    if (b->len == -1) {
+       b->buff = NULL;
+       return rc;
+    }
+    b->buff = malloc(b->len);
+    if (!b->buff) {
+        return -ENOMEM;
+    }
+    memcpy(b->buff, priv->buffer+priv->off, b->len);
+    priv->off += b->len;
+    return 0;
+}
+int ia_deserialize_string(struct iarchive *ia, const char *name, char **s)
+{
+    struct buff_struct *priv = ia->priv;
+    int32_t len;
+    int rc = ia_deserialize_int(ia, "len", &len);
+    if (rc < 0)
+        return rc;
+    if ((priv->len - priv->off) < len) {
+        return -E2BIG;
+    }
+    if (len < 0) {
+        return -EINVAL;
+    }
+    *s = malloc(len+1);
+    if (!*s) {
+        return -ENOMEM;
+    }
+    memcpy(*s, priv->buffer+priv->off, len);
+    (*s)[len] = '\0';
+    priv->off += len;
+    return 0;
+}
+
+static struct iarchive ia_default = {
+        ia_start_record,
+        ia_end_record,
+        ia_start_vector,
+        ia_end_vector,
+        ia_deserialize_bool,
+        ia_deserialize_int,
+        ia_deserialize_long ,
+        ia_deserialize_buffer,
+        ia_deserialize_string};
+
+static struct oarchive oa_default = {
+        oa_start_record,
+        oa_end_record,
+        oa_start_vector,
+        oa_end_vector,
+        oa_serialize_bool,
+        oa_serialize_int,
+        oa_serialize_long ,
+        oa_serialize_buffer,
+        oa_serialize_string};
+
+struct iarchive *create_buffer_iarchive(char *buffer, int len)
+{
+    struct iarchive *ia;
+    struct buff_struct *buff;
+    ia = malloc(sizeof(*ia));
+    if (!ia) return 0;
+    buff = malloc(sizeof(struct buff_struct));
+    if (!buff) {
+        free(ia);
+        return 0;
+    }
+    *ia = ia_default;
+    buff->off = 0;
+    buff->buffer = buffer;
+    buff->len = len;
+    ia->priv = buff;
+    return ia;
+}
+
+struct oarchive *create_buffer_oarchive()
+{
+    struct oarchive *oa;
+    struct buff_struct *buff;
+    oa = malloc(sizeof(*oa));
+    if (!oa) return 0;
+    buff = malloc(sizeof(struct buff_struct));
+    if (!buff) {
+        free(oa);
+        return 0;
+    }
+    *oa = oa_default;
+    buff->off = 0;
+    buff->buffer = malloc(128);
+    buff->len = 128;
+    oa->priv = buff;
+    return oa;
+}
+
+void close_buffer_iarchive(struct iarchive **ia)
+{
+    free((*ia)->priv);
+    free(*ia);
+    *ia = 0;
+}
+
+void close_buffer_oarchive(struct oarchive **oa, int free_buffer)
+{
+    if (free_buffer) {
+        struct buff_struct *buff = (struct buff_struct *)(*oa)->priv;
+        if (buff->buffer) {
+            free(buff->buffer);
+        }
+    }
+    free((*oa)->priv);
+    free(*oa);
+    *oa = 0;
+}
+
+char *get_buffer(struct oarchive *oa)
+{
+    struct buff_struct *buff = oa->priv;
+    return buff->buffer;
+}
+int get_buffer_len(struct oarchive *oa)
+{
+    struct buff_struct *buff = oa->priv;
+    return buff->off;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/st_adaptor.c b/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
new file mode 100644
index 0000000..5e9a4ff
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DLL_EXPORT
+#  define USE_STATIC_LIB
+#endif
+
+#include "zk_adaptor.h"
+#include <stdlib.h>
+#include <time.h>
+
+int zoo_lock_auth(zhandle_t *zh)
+{
+    return 0;
+}
+int zoo_unlock_auth(zhandle_t *zh)
+{
+    return 0;
+}
+int lock_buffer_list(buffer_head_t *l)
+{
+    return 0;
+}
+int unlock_buffer_list(buffer_head_t *l)
+{
+    return 0;
+}
+int lock_completion_list(completion_head_t *l)
+{
+    return 0;
+}
+int unlock_completion_list(completion_head_t *l)
+{
+    return 0;
+}
+int process_async(int outstanding_sync)
+{
+    return outstanding_sync == 0;
+}
+
+int adaptor_init(zhandle_t *zh)
+{
+    return 0;
+}
+
+void adaptor_finish(zhandle_t *zh){}
+
+void adaptor_destroy(zhandle_t *zh){}
+
+int flush_send_queue(zhandle_t *, int);
+
+int adaptor_send_queue(zhandle_t *zh, int timeout)
+{
+    return flush_send_queue(zh, timeout);
+}
+
+int32_t inc_ref_counter(zhandle_t* zh,int i)
+{
+    zh->ref_counter+=(i<0?-1:(i>0?1:0));
+    return zh->ref_counter;
+}
+
+int32_t get_xid()
+{
+    static int32_t xid = -1;
+    if (xid == -1) {
+        xid = time(0);
+    }
+    return xid++;
+}
+
+int lock_reconfig(struct _zhandle *zh)
+{
+    return 0;
+}
+
+int unlock_reconfig(struct _zhandle *zh)
+{
+    return 0;
+}
+
+int enter_critical(zhandle_t* zh)
+{
+    return 0;
+}
+
+int leave_critical(zhandle_t* zh)
+{
+    return 0;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/winport.c
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/winport.c b/zookeeper-client/zookeeper-client-c/src/winport.c
new file mode 100644
index 0000000..d40614c
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/winport.c
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef WIN32
+#include "winport.h"
+#include <stdlib.h>
+#include <stdint.h> /* for int64_t */
+#include <winsock2.h> /* must always be included before ws2tcpip.h */
+#include <ws2tcpip.h> /* for SOCKET */
+
+int pthread_mutex_lock(pthread_mutex_t* _mutex ){      
+       int rc = WaitForSingleObject( *_mutex,    // handle to mutex
+            INFINITE);  // no time-out interval
+       return ((rc == WAIT_OBJECT_0) ? 0: rc); 
+}
+
+int pthread_mutex_unlock( pthread_mutex_t* _mutex ){   
+       int rc = ReleaseMutex(*_mutex);
+       return ((rc != 0)? 0: GetLastError());  
+}
+
+int pthread_mutex_init(pthread_mutex_t* _mutex, void* ignoredAttr){
+       //use CreateMutex as we are using the HANDLES in pthread_cond
+       *_mutex = CreateMutex( 
+        NULL,              // default security attributes
+        FALSE,             // initially not owned
+        NULL);             // unnamed mutex    
+       return ((*_mutex == NULL) ? GetLastError() : 0);
+}
+
+int pthread_mutex_destroy(pthread_mutex_t* _mutex)
+{
+       int rc = CloseHandle(*_mutex);
+       return ((rc != 0)? 0: GetLastError());  
+}
+
+int pthread_create(pthread_t *thread, const pthread_attr_t *attr, unsigned  (__stdcall* start_routine)(void* a), void *arg)
+{
+   int _intThreadId; 
+   (*thread).thread_handle = (HANDLE)_beginthreadex( NULL, 0, start_routine , arg, 0, (unsigned int*)&_intThreadId );
+   (*thread).thread_id = _intThreadId;
+   return (((*thread).thread_handle == 0 ) ? errno : 0 );         
+}
+
+
+int pthread_equal(pthread_t t1, pthread_t t2){
+//Is there a better way to do this? GetThreadId(handle) is only supported Windows 2003 n above.
+       return ((t1.thread_id == t2.thread_id) ? 1:0);              
+}
+
+pthread_t pthread_self(){
+    pthread_t thread_self;
+    thread_self.thread_handle = GetCurrentThread();
+    thread_self.thread_id     = GetCurrentThreadId();
+    return thread_self;
+}
+
+int pthread_join(pthread_t _thread, void** ignore)
+{
+       int rc = WaitForSingleObject( _thread.thread_handle, INFINITE );
+       return ((rc == WAIT_OBJECT_0) ? 0: rc); 
+}
+
+int pthread_detach(pthread_t _thread)
+{
+       int rc = CloseHandle(_thread.thread_handle) ;
+       return  (rc != 0) ? 0: GetLastError();
+}
+
+void pthread_mutexattr_init(pthread_mutexattr_t* ignore){}
+void pthread_mutexattr_settype(pthread_mutexattr_t* ingore_attr, int ignore){}
+void pthread_mutexattr_destroy(pthread_mutexattr_t* ignore_attr){}
+       
+int 
+pthread_cond_init (pthread_cond_t *cv,
+                   const pthread_condattr_t * ignore)
+{
+  cv->waiters_count_ = 0;
+  cv->was_broadcast_ = 0;
+  cv->sema_ = CreateSemaphore (NULL,       // no security
+                                0,          // initially 0
+                                0x7fffffff, // max count
+                                NULL);      // unnamed 
+  if (cv->sema_ == NULL ) 
+               return GetLastError();
+  InitializeCriticalSection (&cv->waiters_count_lock_);
+  cv->waiters_done_ = CreateEvent (NULL,  // no security
+                                   FALSE, // auto-reset
+                                   FALSE, // non-signaled initially
+                                   NULL); // unnamed
+  return (cv->waiters_done_ == NULL) ? GetLastError() : 0;
+       
+}
+
+
+int pthread_cond_destroy(pthread_cond_t *cond)
+{
+       CloseHandle( cond->sema_);
+       DeleteCriticalSection(&cond->waiters_count_lock_);
+       return (CloseHandle( cond->waiters_done_ ) == 0)? GetLastError(): 0 ;
+}
+
+
+int
+pthread_cond_signal (pthread_cond_t *cv)
+{
+  int have_waiters;
+  EnterCriticalSection (& (cv->waiters_count_lock_));
+  have_waiters = cv->waiters_count_ > 0;
+  LeaveCriticalSection (&cv->waiters_count_lock_);
+
+  // If there aren't any waiters, then this is a no-op.  
+  if (have_waiters){
+         return (ReleaseSemaphore (cv->sema_, 1, 0) == 0 )  ? GetLastError() : 0 ;
+  }else
+         return 0;
+}
+
+
+int
+pthread_cond_broadcast (pthread_cond_t *cv)
+{
+  // This is needed to ensure that <waiters_count_> and <was_broadcast_> are
+  // consistent relative to each other.
+  int have_waiters = 0;
+  EnterCriticalSection (&cv->waiters_count_lock_);
+  
+  if (cv->waiters_count_ > 0) {
+    // We are broadcasting, even if there is just one waiter...
+    // Record that we are broadcasting, which helps optimize
+    // <pthread_cond_wait> for the non-broadcast case.
+    cv->was_broadcast_ = 1;
+    have_waiters = 1;
+  }
+
+  if (have_waiters) {
+    // Wake up all the waiters atomically.
+    ReleaseSemaphore (cv->sema_, cv->waiters_count_, 0);
+
+    LeaveCriticalSection (&cv->waiters_count_lock_);
+
+    // Wait for all the awakened threads to acquire the counting
+    // semaphore. 
+    WaitForSingleObject (cv->waiters_done_, INFINITE);
+    // This assignment is okay, even without the <waiters_count_lock_> held 
+    // because no other waiter threads can wake up to access it.
+    cv->was_broadcast_ = 0;
+  }
+  else
+    LeaveCriticalSection (&cv->waiters_count_lock_);
+}
+
+
+int
+pthread_cond_wait (pthread_cond_t *cv, 
+                   pthread_mutex_t *external_mutex)
+{
+  int last_waiter;
+  // Avoid race conditions.
+  EnterCriticalSection (&cv->waiters_count_lock_);
+  cv->waiters_count_++;
+  LeaveCriticalSection (&cv->waiters_count_lock_);
+
+  // This call atomically releases the mutex and waits on the
+  // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast>
+  // are called by another thread.
+  SignalObjectAndWait (*external_mutex, cv->sema_, INFINITE, FALSE);
+
+  // Reacquire lock to avoid race conditions.
+  EnterCriticalSection (&cv->waiters_count_lock_);
+
+  // We're no longer waiting...
+  cv->waiters_count_--;
+
+  // Check to see if we're the last waiter after <pthread_cond_broadcast>.
+  last_waiter = cv->was_broadcast_ && cv->waiters_count_ == 0;
+
+  LeaveCriticalSection (&cv->waiters_count_lock_);
+
+  // If we're the last waiter thread during this particular broadcast
+  // then let all the other threads proceed.
+  if (last_waiter)
+    // This call atomically signals the <waiters_done_> event and waits until
+    // it can acquire the <external_mutex>.  This is required to ensure fairness. 
+    SignalObjectAndWait (cv->waiters_done_, *external_mutex, INFINITE, FALSE);
+  else
+    // Always regain the external mutex since that's the guarantee we
+    // give to our callers. 
+    WaitForSingleObject (*external_mutex, INFINITE);
+}
+
+int pthread_key_create(pthread_key_t *key, void (*destructor)(void *) )
+{
+  int result = 0;
+  pthread_key_t* newkey;
+
+  if ((newkey = (pthread_key_t*) calloc (1, sizeof (pthread_key_t))) == NULL)
+    {
+      result = ENOMEM;
+    }
+  else if ((newkey->key = TlsAlloc ()) == TLS_OUT_OF_INDEXES)
+    {
+      result = EAGAIN;
+      free (newkey);
+      newkey = NULL;
+    }
+  else if (destructor != NULL)
+    {
+      //--we have to store the function pointer for destructor, so that we can call it 
+         //--to free up the user allocated storage--       
+      newkey->destructor = destructor;
+    }
+  key = newkey;  
+  return (result);     
+}
+
+int pthread_key_delete(pthread_key_t key)
+{
+  int rc = 0;
+  LPVOID lpvData =  TlsGetValue(key.key);
+  rc = TlsFree (key.key);
+  rc = (rc != 0 ) ? 0 : GetLastError();
+  if (key.destructor != NULL && lpvData != 0){
+       key.destructor(lpvData);         //we take control of calling destructor, instead of calling it on thread exit.
+  }
+  free (&key);
+  return (rc);
+}
+
+void *pthread_getspecific(pthread_key_t key)
+{
+       LPVOID lpvData =  TlsGetValue(key.key);
+       if ((lpvData == 0) && (GetLastError() != ERROR_SUCCESS)) 
+               return NULL;
+       else 
+               return lpvData;
+}
+
+int pthread_setspecific(pthread_key_t key, const void *value)
+{
+       int rc = TlsSetValue (key.key, value);
+       return ((rc != 0 ) ? 0 : GetLastError());
+}
+
+int gettimeofday(struct timeval *tp, void *tzp) {
+        int64_t now = 0;
+        if (tzp != 0) { errno = EINVAL; return -1; }
+        GetSystemTimeAsFileTime( (LPFILETIME)&now );
+        tp->tv_sec = (long)(now / 10000000 - 11644473600LL);
+        tp->tv_usec = (now / 10) % 1000000;
+        return 0;
+}
+
+int close(SOCKET fd) {
+        return closesocket(fd);
+}
+
+int Win32WSAStartup()
+{
+       WORD    wVersionRq;
+       WSADATA wsaData;
+       int             err;
+
+       wVersionRq = MAKEWORD(2,0);
+       err = WSAStartup(wVersionRq, &wsaData);
+       if (err != 0)
+               return 1;
+       
+       // confirm the version information
+       if ((LOBYTE(wsaData.wVersion) != 2) ||
+           (HIBYTE(wsaData.wVersion) != 0))
+       {
+               Win32WSACleanup();              
+               return 1;
+       }
+       return 0;
+}
+
+void Win32WSACleanup()
+{
+       WSACleanup();
+}
+
+double drand48(void)
+{
+    return (double)(rand()) / RAND_MAX;
+}
+
+#endif //WIN32
+
+
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/winport.h
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/winport.h b/zookeeper-client/zookeeper-client-c/src/winport.h
new file mode 100644
index 0000000..d216f7f
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/winport.h
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This header file is to port pthread lib , sockets and other utility methods on windows.
+ * Specifically the threads function, mutexes, keys, and socket initialization.
+ */
+
+#ifndef WINPORT_H_
+#define WINPORT_H_
+
+#ifdef WIN32
+#include "winconfig.h"
+
+#define _WINSOCK_DEPRECATED_NO_WARNINGS
+#include <winsock2.h> /* must always be included before ws2tcpip.h */
+#include <ws2tcpip.h> /* for struct sock_addr used in zookeeper.h */
+
+/* POSIX names are deprecated, use ISO conformant names instead. */
+#define strdup _strdup
+#define getcwd _getcwd
+#define getpid _getpid
+
+/* Windows "secure" versions of POSIX reentrant functions */
+#define strtok_r strtok_s
+#define localtime_r(a,b) localtime_s(b,a)
+
+/* After this version of MSVC, snprintf became a defined function,
+   and so cannot be redefined, nor can #ifndef be used to guard it. */
+#if ((defined(_MSC_VER) && _MSC_VER < 1900) || !defined(_MSC_VER))
+#define snprintf _snprintf
+#endif
+
+
+#include <errno.h>
+#include <process.h>
+#include <stdint.h> /* for int64_t */
+#include <stdlib.h>
+#include <malloc.h>
+
+
+typedef int ssize_t;
+typedef HANDLE pthread_mutex_t;
+
+struct pthread_t_
+{
+  HANDLE thread_handle;
+  DWORD  thread_id;
+};
+
+typedef struct pthread_t_ pthread_t;
+typedef int pthread_mutexattr_t;       
+typedef int pthread_condattr_t;        
+typedef int pthread_attr_t; 
+#define PTHREAD_MUTEX_RECURSIVE 0
+
+int pthread_mutex_lock(pthread_mutex_t* _mutex );
+int pthread_mutex_unlock( pthread_mutex_t* _mutex );
+int pthread_mutex_init(pthread_mutex_t* _mutex, void* ignoredAttr);
+int pthread_mutex_destroy(pthread_mutex_t* _mutex);
+int pthread_create(pthread_t *thread, const pthread_attr_t *attr, unsigned  (__stdcall* start_routine)(void* a), void *arg);
+int pthread_equal(pthread_t t1, pthread_t t2);
+pthread_t pthread_self();
+int pthread_join(pthread_t _thread, void** ignore);
+int pthread_detach(pthread_t _thread);
+
+void pthread_mutexattr_init(pthread_mutexattr_t* ignore);
+void pthread_mutexattr_settype(pthread_mutexattr_t* ingore_attr, int ignore);
+void pthread_mutexattr_destroy(pthread_mutexattr_t* ignore_attr);
+
+
+// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
+ 
+typedef struct 
+{
+       int waiters_count_;
+    // Number of waiting threads.
+
+    CRITICAL_SECTION waiters_count_lock_;
+    // Serialize access to <waiters_count_>.
+
+    HANDLE sema_;
+       // Semaphore used to queue up threads waiting for the condition to
+    // become signaled. 
+
+    HANDLE waiters_done_;
+    // An auto-reset event used by the broadcast/signal thread to wait
+    // for all the waiting thread(s) to wake up and be released from the
+    // semaphore. 
+
+    size_t was_broadcast_;
+    // Keeps track of whether we were broadcasting or signaling.  This
+    // allows us to optimize the code if we're just signaling.
+}pthread_cond_t;
+       
+int pthread_cond_init (pthread_cond_t *cv,const pthread_condattr_t * ignore);
+int pthread_cond_destroy(pthread_cond_t *cond);
+int pthread_cond_signal (pthread_cond_t *cv);
+int pthread_cond_broadcast (pthread_cond_t *cv);
+int pthread_cond_wait (pthread_cond_t *cv, pthread_mutex_t *external_mutex);
+
+
+struct pthread_key_t_
+{
+  DWORD key;
+  void (*destructor) (void *);  
+};
+
+typedef struct pthread_key_t_ pthread_key_t;
+int pthread_key_create(pthread_key_t *key, void (*destructor)(void *) );
+int pthread_key_delete(pthread_key_t key);
+void *pthread_getspecific(pthread_key_t key);
+int pthread_setspecific(pthread_key_t key, const void *value);
+
+int gettimeofday(struct timeval *tp, void *tzp);
+int close(SOCKET fd);
+int Win32WSAStartup();
+void Win32WSACleanup();
+double drand48(void);
+#endif //WIN32
+
+
+
+#endif //WINPORT_H_

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
new file mode 100644
index 0000000..97995e3
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZK_ADAPTOR_H_
+#define ZK_ADAPTOR_H_
+#include <zookeeper.jute.h>
+#ifdef THREADED
+#ifndef WIN32
+#include <pthread.h>
+#else
+#include "winport.h"
+#endif
+#endif
+#include "zookeeper.h"
+#include "zk_hashtable.h"
+#include "addrvec.h"
+
+/* predefined xid's values recognized as special by the server */
+#define WATCHER_EVENT_XID -1 
+#define PING_XID -2
+#define AUTH_XID -4
+#define SET_WATCHES_XID -8
+
+/* zookeeper state constants */
+#define EXPIRED_SESSION_STATE_DEF -112
+#define AUTH_FAILED_STATE_DEF -113
+#define CONNECTING_STATE_DEF 1
+#define ASSOCIATING_STATE_DEF 2
+#define CONNECTED_STATE_DEF 3
+#define READONLY_STATE_DEF 5
+#define NOTCONNECTED_STATE_DEF 999
+
+/* zookeeper event type constants */
+#define CREATED_EVENT_DEF 1
+#define DELETED_EVENT_DEF 2
+#define CHANGED_EVENT_DEF 3
+#define CHILD_EVENT_DEF 4
+#define SESSION_EVENT_DEF -1
+#define NOTWATCHING_EVENT_DEF -2
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct _buffer_list;
+struct _completion_list;
+
+typedef struct _buffer_head {
+    struct _buffer_list *volatile head;
+    struct _buffer_list *last;
+#ifdef THREADED
+    pthread_mutex_t lock;
+#endif
+} buffer_head_t;
+
+typedef struct _completion_head {
+    struct _completion_list *volatile head;
+    struct _completion_list *last;
+#ifdef THREADED
+    pthread_cond_t cond;
+    pthread_mutex_t lock;
+#endif
+} completion_head_t;
+
+int lock_buffer_list(buffer_head_t *l);
+int unlock_buffer_list(buffer_head_t *l);
+int lock_completion_list(completion_head_t *l);
+int unlock_completion_list(completion_head_t *l);
+
+struct sync_completion {
+    int rc;
+    union {
+        struct {
+            char *str;
+            int str_len;
+        } str;
+        struct Stat stat;
+        struct {
+            char *buffer;
+            int buff_len;
+            struct Stat stat;
+        } data;
+        struct {
+            struct ACL_vector acl;
+            struct Stat stat;
+        } acl;
+        struct String_vector strs2;
+        struct {
+            struct String_vector strs2;
+            struct Stat stat2;
+        } strs_stat;
+    } u;
+    int complete;
+#ifdef THREADED
+    pthread_cond_t cond;
+    pthread_mutex_t lock;
+#endif
+};
+
+typedef struct _auth_info {
+    int state; /* 0=>inactive, >0 => active */
+    char* scheme;
+    struct buffer auth;
+    void_completion_t completion;
+    const char* data;
+    struct _auth_info *next;
+} auth_info;
+
+/**
+ * This structure represents a packet being read or written.
+ */
+typedef struct _buffer_list {
+    char *buffer;
+    int len; /* This represents the length of sizeof(header) + length of buffer */
+    int curr_offset; /* This is the offset into the header followed by offset into the buffer */
+    struct _buffer_list *next;
+} buffer_list_t;
+
+/* the size of connect request */
+#define HANDSHAKE_REQ_SIZE 45
+/* connect request */
+struct connect_req {
+    int32_t protocolVersion;
+    int64_t lastZxidSeen;
+    int32_t timeOut;
+    int64_t sessionId;
+    int32_t passwd_len;
+    char passwd[16];
+    char readOnly;
+};
+
+/* the connect response */
+struct prime_struct {
+    int32_t len;
+    int32_t protocolVersion;
+    int32_t timeOut;
+    int64_t sessionId;
+    int32_t passwd_len;
+    char passwd[16];
+    char readOnly;
+}; 
+
+#ifdef THREADED
+/* this is used by mt_adaptor internally for thread management */
+struct adaptor_threads {
+     pthread_t io;
+     pthread_t completion;
+     int threadsToWait;             // barrier
+     pthread_cond_t cond;           // barrier's conditional
+     pthread_mutex_t lock;          // ... and a lock
+     pthread_mutex_t zh_lock;       // critical section lock
+     pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
+#ifdef WIN32
+     SOCKET self_pipe[2];
+#else
+     int self_pipe[2];
+#endif
+};
+#endif
+
+/** the auth list for adding auth */
+typedef struct _auth_list_head {
+     auth_info *auth;
+#ifdef THREADED
+     pthread_mutex_t lock;
+#endif
+} auth_list_head_t;
+
+/**
+ * This structure represents the connection to zookeeper.
+ */
+struct _zhandle {
+#ifdef WIN32
+    SOCKET fd;                          // the descriptor used to talk to zookeeper
+#else
+    int fd;                             // the descriptor used to talk to zookeeper
+#endif
+
+    // Hostlist and list of addresses
+    char *hostname;                     // hostname contains list of zookeeper servers to connect to
+    struct sockaddr_storage addr_cur;   // address of server we're currently connecting/connected to 
+    struct sockaddr_storage addr_rw_server; // address of last known read/write server found.
+
+    addrvec_t addrs;                    // current list of addresses we're connected to
+    addrvec_t addrs_old;                // old list of addresses that we are no longer connected to
+    addrvec_t addrs_new;                // new list of addresses to connect to if we're reconfiguring
+
+    int reconfig;                       // Are we in the process of reconfiguring cluster's ensemble
+    double pOld, pNew;                  // Probability for selecting between 'addrs_old' and 'addrs_new'
+    int delay;
+    int disable_reconnection_attempt;   // When set, client will not try reconnect to a different server in
+                                        // server list. This makes a sticky server for client, and is useful
+                                        // for testing if a sticky server is required, or if client wants to
+                                        // explicitly shuffle server by calling zoo_cycle_next_server.
+                                        // The default value is 0.
+
+    watcher_fn watcher;                 // the registered watcher
+
+    // Message timings
+    struct timeval last_recv;           // time last message was received
+    struct timeval last_send;           // time last message was sent
+    struct timeval last_ping;           // time last PING was sent
+    struct timeval next_deadline;       // time of the next deadline
+    int recv_timeout;                   // max receive timeout for messages from server
+
+    // Buffers
+    buffer_list_t *input_buffer;        // current buffer being read in
+    buffer_head_t to_process;           // buffers that have been read and ready to be processed
+    buffer_head_t to_send;              // packets queued to send
+    completion_head_t sent_requests;    // outstanding requests
+    completion_head_t completions_to_process; // completions that are ready to run
+    int outstanding_sync;               // number of outstanding synchronous requests
+
+    /* read-only mode specific fields */
+    struct timeval last_ping_rw; /* The last time we checked server for being r/w */
+    int ping_rw_timeout; /* The time that can go by before checking next server */
+
+    // State info
+    volatile int state;                 // Current zookeeper state
+    void *context;                      // client-side provided context
+    clientid_t client_id;               // client-id
+    long long last_zxid;                // last zookeeper ID
+    auth_list_head_t auth_h;            // authentication data list
+    log_callback_fn log_callback;       // Callback for logging (falls back to logging to stderr)
+    int io_count;			// counts the number of iterations of do_io
+
+    // Primer storage
+    struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection
+    struct prime_struct primer_storage; // the connect response
+    char primer_storage_buffer[41];     // the true size of primer_storage
+
+    /* zookeeper_close is not reentrant because it de-allocates the zhandler. 
+     * This guard variable is used to defer the destruction of zhandle till 
+     * right before top-level API call returns to the caller */
+    int32_t ref_counter;
+    volatile int close_requested;
+    void *adaptor_priv;
+
+    /* Used for debugging only: non-zero value indicates the time when the zookeeper_process
+     * call returned while there was at least one unprocessed server response 
+     * available in the socket recv buffer */
+    struct timeval socket_readable;
+
+    // Watchers
+    zk_hashtable* active_node_watchers;   
+    zk_hashtable* active_exist_watchers;
+    zk_hashtable* active_child_watchers;
+
+    /** used for chroot path at the client side **/
+    char *chroot;
+
+    /** Indicates if this client is allowed to go to r/o mode */
+    char allow_read_only;
+    /** Indicates if we connected to a majority server before */
+    char seen_rw_server_before;
+};
+
+
+int adaptor_init(zhandle_t *zh);
+void adaptor_finish(zhandle_t *zh);
+void adaptor_destroy(zhandle_t *zh);
+#if THREADED
+struct sync_completion *alloc_sync_completion(void);
+int wait_sync_completion(struct sync_completion *sc);
+void free_sync_completion(struct sync_completion *sc);
+void notify_sync_completion(struct sync_completion *sc);
+#endif
+int adaptor_send_queue(zhandle_t *zh, int timeout);
+int process_async(int outstanding_sync);
+void process_completions(zhandle_t *zh);
+int flush_send_queue(zhandle_t*zh, int timeout);
+char* sub_string(zhandle_t *zh, const char* server_path);
+void free_duplicate_path(const char* free_path, const char* path);
+int zoo_lock_auth(zhandle_t *zh);
+int zoo_unlock_auth(zhandle_t *zh);
+
+// ensemble reconfigure access guards
+int lock_reconfig(struct _zhandle *zh);
+int unlock_reconfig(struct _zhandle *zh);
+
+// critical section guards
+int enter_critical(zhandle_t* zh);
+int leave_critical(zhandle_t* zh);
+
+// zhandle object reference counting
+void api_prolog(zhandle_t* zh);
+int api_epilog(zhandle_t *zh, int rc);
+int32_t get_xid();
+
+// returns the new value of the ref counter
+int32_t inc_ref_counter(zhandle_t* zh,int i);
+
+#ifdef THREADED
+// atomic post-increment
+int32_t fetch_and_add(volatile int32_t* operand, int incr);
+// in mt mode process session event asynchronously by the completion thread
+#define PROCESS_SESSION_EVENT(zh,newstate) queue_session_event(zh,newstate)
+#else
+// in single-threaded mode process session event immediately
+//#define PROCESS_SESSION_EVENT(zh,newstate) deliverWatchers(zh,ZOO_SESSION_EVENT,newstate,0)
+#define PROCESS_SESSION_EVENT(zh,newstate) queue_session_event(zh,newstate)
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZK_ADAPTOR_H_*/
+
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/zk_hashtable.c
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_hashtable.c b/zookeeper-client/zookeeper-client-c/src/zk_hashtable.c
new file mode 100644
index 0000000..9d858e2
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/zk_hashtable.c
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "zk_hashtable.h"
+#include "zk_adaptor.h"
+#include "hashtable/hashtable.h"
+#include "hashtable/hashtable_itr.h"
+#include <string.h>
+#include <stdlib.h>
+#include <assert.h>
+
+typedef struct _watcher_object {
+    watcher_fn watcher;
+    void* context;
+    struct _watcher_object* next;
+} watcher_object_t;
+
+
+struct _zk_hashtable {
+    struct hashtable* ht;
+};
+
+struct watcher_object_list {
+    watcher_object_t* head;
+};
+
+/* the following functions are for testing only */
+typedef struct hashtable hashtable_impl;
+
+hashtable_impl* getImpl(zk_hashtable* ht){
+    return ht->ht;
+}
+
+watcher_object_t* getFirstWatcher(zk_hashtable* ht,const char* path)
+{
+    watcher_object_list_t* wl=hashtable_search(ht->ht,(void*)path);
+    if(wl!=0)
+        return wl->head;
+    return 0;
+}
+/* end of testing functions */
+
+watcher_object_t* clone_watcher_object(watcher_object_t* wo)
+{
+    watcher_object_t* res=calloc(1,sizeof(watcher_object_t));
+    assert(res);
+    res->watcher=wo->watcher;
+    res->context=wo->context;
+    return res;
+}
+
+static unsigned int string_hash_djb2(void *str) 
+{
+    unsigned int hash = 5381;
+    int c;
+    const char* cstr = (const char*)str;
+    while ((c = *cstr++))
+        hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+
+    return hash;
+}
+
+static int string_equal(void *key1,void *key2)
+{
+    return strcmp((const char*)key1,(const char*)key2)==0;
+}
+
+static watcher_object_t* create_watcher_object(watcher_fn watcher,void* ctx)
+{
+    watcher_object_t* wo=calloc(1,sizeof(watcher_object_t));
+    assert(wo);
+    wo->watcher=watcher;
+    wo->context=ctx;
+    return wo;
+}
+
+static watcher_object_list_t* create_watcher_object_list(watcher_object_t* head) 
+{
+    watcher_object_list_t* wl=calloc(1,sizeof(watcher_object_list_t));
+    assert(wl);
+    wl->head=head;
+    return wl;
+}
+
+static void destroy_watcher_object_list(watcher_object_list_t* list)
+{
+    watcher_object_t* e = NULL;
+
+    if(list==0)
+        return;
+    e=list->head;
+    while(e!=0){
+        watcher_object_t* this=e;
+        e=e->next;
+        free(this);
+    }
+    free(list);
+}
+
+zk_hashtable* create_zk_hashtable()
+{
+    struct _zk_hashtable *ht=calloc(1,sizeof(struct _zk_hashtable));
+    assert(ht);
+    ht->ht=create_hashtable(32,string_hash_djb2,string_equal);
+    return ht;
+}
+
+static void do_clean_hashtable(zk_hashtable* ht)
+{
+    struct hashtable_itr *it;
+    int hasMore;
+    if(hashtable_count(ht->ht)==0)
+        return;
+    it=hashtable_iterator(ht->ht);
+    do {
+        watcher_object_list_t* w=hashtable_iterator_value(it);
+        destroy_watcher_object_list(w);
+        hasMore=hashtable_iterator_remove(it);
+    } while(hasMore);
+    free(it);
+}
+
+void destroy_zk_hashtable(zk_hashtable* ht)
+{
+    if(ht!=0){
+        do_clean_hashtable(ht);
+        hashtable_destroy(ht->ht,0);
+        free(ht);
+    }
+}
+
+// searches for a watcher object instance in a watcher object list;
+// two watcher objects are equal if their watcher function and context pointers
+// are equal
+static watcher_object_t* search_watcher(watcher_object_list_t** wl,watcher_object_t* wo)
+{
+    watcher_object_t* wobj=(*wl)->head;
+    while(wobj!=0){
+        if(wobj->watcher==wo->watcher && wobj->context==wo->context)
+            return wobj;
+        wobj=wobj->next;
+    }
+    return 0;
+}
+
+static int add_to_list(watcher_object_list_t **wl, watcher_object_t *wo,
+                       int clone)
+{
+    if (search_watcher(wl, wo)==0) {
+        watcher_object_t* cloned=wo;
+        if (clone) {
+            cloned = clone_watcher_object(wo);
+            assert(cloned);
+        }
+        cloned->next = (*wl)->head;
+        (*wl)->head = cloned;
+        return 1;
+    } else if (!clone) {
+        // If it's here and we aren't supposed to clone, we must destroy
+        free(wo);
+    }
+    return 0;
+}
+
+static int do_insert_watcher_object(zk_hashtable *ht, const char *path, watcher_object_t* wo)
+{
+    int res=1;
+    watcher_object_list_t* wl;
+
+    wl=hashtable_search(ht->ht,(void*)path);
+    if(wl==0){
+        int res;
+        /* inserting a new path element */
+        res=hashtable_insert(ht->ht,strdup(path),create_watcher_object_list(wo));
+        assert(res);
+    }else{
+        /*
+         * Path already exists; check if the watcher already exists.
+         * Don't clone the watcher since it's allocated on the heap --- avoids
+         * a memory leak and saves a clone operation (calloc + copy).
+         */
+        res = add_to_list(&wl, wo, 0);
+    }
+    return res;    
+}
+
+
+char **collect_keys(zk_hashtable *ht, int *count)
+{
+    char **list;
+    struct hashtable_itr *it;
+    int i;
+
+    *count = hashtable_count(ht->ht);
+    list = calloc(*count, sizeof(char*));
+    it=hashtable_iterator(ht->ht);
+    for(i = 0; i < *count; i++) {
+        list[i] = strdup(hashtable_iterator_key(it));
+        hashtable_iterator_advance(it);
+    }
+    free(it);
+    return list;
+}
+
+static int insert_watcher_object(zk_hashtable *ht, const char *path,
+                                 watcher_object_t* wo)
+{
+    int res;
+    res=do_insert_watcher_object(ht,path,wo);
+    return res;
+}
+
+static void copy_watchers(watcher_object_list_t *from, watcher_object_list_t *to, int clone)
+{
+    watcher_object_t* wo=from->head;
+    while(wo){
+        watcher_object_t *next = wo->next;
+        add_to_list(&to, wo, clone);
+        wo=next;
+    }
+}
+
+static void copy_table(zk_hashtable *from, watcher_object_list_t *to) {
+    struct hashtable_itr *it;
+    int hasMore;
+    if(hashtable_count(from->ht)==0)
+        return;
+    it=hashtable_iterator(from->ht);
+    do {
+        watcher_object_list_t *w = hashtable_iterator_value(it);
+        copy_watchers(w, to, 1);
+        hasMore=hashtable_iterator_advance(it);
+    } while(hasMore);
+    free(it);
+}
+
+static void collect_session_watchers(zhandle_t *zh,
+                                     watcher_object_list_t **list)
+{
+    copy_table(zh->active_node_watchers, *list);
+    copy_table(zh->active_exist_watchers, *list);
+    copy_table(zh->active_child_watchers, *list);
+}
+
+static void add_for_event(zk_hashtable *ht, char *path, watcher_object_list_t **list)
+{
+    watcher_object_list_t* wl;
+    wl = (watcher_object_list_t*)hashtable_remove(ht->ht, path);
+    if (wl) {
+        copy_watchers(wl, *list, 0);
+        // Since we move, not clone the watch_objects, we just need to free the
+        // head pointer
+        free(wl);
+    }
+}
+
+static void do_foreach_watcher(watcher_object_t* wo,zhandle_t* zh,
+        const char* path,int type,int state)
+{
+    // session event's don't have paths
+    const char *client_path =
+        (type != ZOO_SESSION_EVENT ? sub_string(zh, path) : path);
+    while(wo!=0){
+        wo->watcher(zh,type,state,client_path,wo->context);
+        wo=wo->next;
+    }    
+    free_duplicate_path(client_path, path);
+}
+
+watcher_object_list_t *collectWatchers(zhandle_t *zh,int type, char *path)
+{
+    struct watcher_object_list *list = create_watcher_object_list(0); 
+
+    if(type==ZOO_SESSION_EVENT){
+        watcher_object_t defWatcher;
+        defWatcher.watcher=zh->watcher;
+        defWatcher.context=zh->context;
+        add_to_list(&list, &defWatcher, 1);
+        collect_session_watchers(zh, &list);
+        return list;
+    }
+    switch(type){
+    case CREATED_EVENT_DEF:
+    case CHANGED_EVENT_DEF:
+        // look up the watchers for the path and move them to a delivery list
+        add_for_event(zh->active_node_watchers,path,&list);
+        add_for_event(zh->active_exist_watchers,path,&list);
+        break;
+    case CHILD_EVENT_DEF:
+        // look up the watchers for the path and move them to a delivery list
+        add_for_event(zh->active_child_watchers,path,&list);
+        break;
+    case DELETED_EVENT_DEF:
+        // look up the watchers for the path and move them to a delivery list
+        add_for_event(zh->active_node_watchers,path,&list);
+        add_for_event(zh->active_exist_watchers,path,&list);
+        add_for_event(zh->active_child_watchers,path,&list);
+        break;
+    }
+    return list;
+}
+
+void deliverWatchers(zhandle_t *zh, int type,int state, char *path, watcher_object_list_t **list)
+{
+    if (!list || !(*list)) return;
+    do_foreach_watcher((*list)->head, zh, path, type, state);
+    destroy_watcher_object_list(*list);
+    *list = 0;
+}
+
+void activateWatcher(zhandle_t *zh, watcher_registration_t* reg, int rc)
+{
+    if(reg){
+        /* in multithreaded lib, this code is executed 
+         * by the IO thread */
+        zk_hashtable *ht = reg->checker(zh, rc);
+        if(ht){
+            insert_watcher_object(ht,reg->path,
+                    create_watcher_object(reg->watcher, reg->context));
+        }
+    }    
+}
+
+/* If watcher is NULL, we return TRUE since we consider it a match */
+static int containsWatcher(zk_hashtable *watchers, const char *path,
+        watcher_fn watcher, void *watcherCtx)
+{
+    watcher_object_list_t *wl;
+    watcher_object_t e;
+
+    if (!watcher)
+        return 1;
+
+    wl = hashtable_search(watchers->ht, (void *)path);
+    if (!wl)
+        return 0;
+
+    e.watcher = watcher;
+    e.context = watcherCtx;
+
+    return search_watcher(&wl, &e) ? 1 : 0;
+}
+
+/**
+ * remove any watcher_object that has a matching (watcher, watcherCtx)
+ */
+static void removeWatcherFromList(watcher_object_list_t *wl, watcher_fn watcher,
+        void *watcherCtx)
+{
+    watcher_object_t *e = NULL;
+
+    if (!wl || (wl && !wl->head))
+        return;
+
+    e = wl->head;
+    while (e){
+        if (e->next &&
+            e->next->watcher == watcher &&
+            e->next->context == watcherCtx) {
+            watcher_object_t *this = e->next;
+            e->next = e->next->next;
+            free(this);
+            break;
+        }
+        e = e->next;
+    }
+
+    if (wl->head &&
+        wl->head->watcher == watcher && wl->head->context == watcherCtx) {
+        watcher_object_t *this = wl->head;
+        wl->head = wl->head->next;
+        free(this);
+    }
+}
+
+static void removeWatcher(zk_hashtable *watchers, const char *path,
+        watcher_fn watcher, void *watcherCtx)
+{
+    watcher_object_list_t *wl = hashtable_search(watchers->ht, (void *)path);
+
+    if (!wl)
+        return;
+
+    if (!watcher) {
+        wl = (watcher_object_list_t *) hashtable_remove(watchers->ht,
+                                                        (void *)path);
+        destroy_watcher_object_list(wl);
+        return;
+    }
+
+    removeWatcherFromList(wl, watcher, watcherCtx);
+
+    if (!wl->head) {
+        wl = (watcher_object_list_t *) hashtable_remove(watchers->ht,
+                                                        (void *)path);
+        destroy_watcher_object_list(wl);
+    }
+}
+
+void deactivateWatcher(zhandle_t *zh, watcher_deregistration_t *dereg, int rc)
+{
+    if (rc != ZOK || !dereg)
+        return;
+
+    removeWatchers(zh, dereg->path, dereg->type, dereg->watcher,
+                   dereg->context);
+}
+
+void removeWatchers(zhandle_t *zh, const char* path, ZooWatcherType type,
+        watcher_fn watcher, void *watcherCtx)
+{
+    switch (type) {
+    case ZWATCHTYPE_CHILD:
+        removeWatcher(zh->active_child_watchers, path, watcher, watcherCtx);
+        break;
+    case ZWATCHTYPE_DATA:
+        removeWatcher(zh->active_node_watchers, path, watcher, watcherCtx);
+        removeWatcher(zh->active_exist_watchers, path, watcher, watcherCtx);
+        break;
+    case ZWATCHTYPE_ANY:
+        removeWatcher(zh->active_child_watchers, path, watcher, watcherCtx);
+        removeWatcher(zh->active_node_watchers, path, watcher, watcherCtx);
+        removeWatcher(zh->active_exist_watchers, path, watcher, watcherCtx);
+        break;
+    }
+}
+
+int pathHasWatcher(zhandle_t *zh, const char *path, int wtype,
+        watcher_fn watcher, void *watcherCtx)
+{
+    int watcher_found = 0;
+
+    switch (wtype) {
+    case ZWATCHTYPE_CHILD:
+        watcher_found = containsWatcher(zh->active_child_watchers,
+                                        path, watcher, watcherCtx);
+        break;
+    case ZWATCHTYPE_DATA:
+        watcher_found = containsWatcher(zh->active_node_watchers, path,
+                                        watcher, watcherCtx);
+        if (!watcher_found) {
+            watcher_found = containsWatcher(zh->active_exist_watchers, path,
+                                            watcher, watcherCtx);
+        }
+        break;
+    case ZWATCHTYPE_ANY:
+        watcher_found = containsWatcher(zh->active_child_watchers, path,
+                                        watcher, watcherCtx);
+        if (!watcher_found) {
+            watcher_found = containsWatcher(zh->active_node_watchers, path,
+                                            watcher, watcherCtx);
+        }
+        if (!watcher_found) {
+            watcher_found = containsWatcher(zh->active_exist_watchers, path,
+                                            watcher, watcherCtx);
+        }
+        break;
+    }
+
+    return watcher_found;
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/zk_hashtable.h
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_hashtable.h b/zookeeper-client/zookeeper-client-c/src/zk_hashtable.h
new file mode 100644
index 0000000..5227e07
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/zk_hashtable.h
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZK_HASHTABLE_H_
+#define ZK_HASHTABLE_H_
+
+#include <zookeeper.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+    typedef struct watcher_object_list watcher_object_list_t;
+typedef struct _zk_hashtable zk_hashtable;
+
+/**
+ * The function must return a non-zero value if the watcher object can be activated
+ * as a result of the server response. Normally, a watch can only be activated
+ * if the server returns a success code (ZOK). However in the case when zoo_exists() 
+ * returns a ZNONODE code the watcher should be activated nevertheless.
+ */
+typedef zk_hashtable *(*result_checker_fn)(zhandle_t *, int rc);
+
+/**
+ * A watcher object gets temporarily stored with the completion entry until 
+ * the server response comes back at which moment the watcher object is moved
+ * to the active watchers map.
+ */
+typedef struct _watcher_registration {
+    watcher_fn watcher;
+    void* context;
+    result_checker_fn checker;
+    const char* path;
+} watcher_registration_t;
+
+/**
+ * A watcher deregistration gets temporarily stored with the completion entry until
+ * the server response comes back at which moment we can remove the watchers from
+ * the active watchers map.
+ */
+typedef struct _watcher_deregistration {
+    watcher_fn watcher;
+    void* context;
+    ZooWatcherType type;
+    const char* path;
+} watcher_deregistration_t;
+
+zk_hashtable* create_zk_hashtable();
+void destroy_zk_hashtable(zk_hashtable* ht);
+
+char **collect_keys(zk_hashtable *ht, int *count);
+
+/**
+ * check if the completion has a watcher object associated
+ * with it. If it does, move the watcher object to the map of
+ * active watchers (only if the checker allows to do so)
+ */
+    void activateWatcher(zhandle_t *zh, watcher_registration_t* reg, int rc);
+    void deactivateWatcher(zhandle_t *zh, watcher_deregistration_t *dereg, int rc);
+    watcher_object_list_t *collectWatchers(zhandle_t *zh,int type, char *path);
+    void deliverWatchers(zhandle_t *zh, int type, int state, char *path, struct watcher_object_list **list);
+    void removeWatchers(zhandle_t *zh, const char* path, ZooWatcherType type,
+                        watcher_fn watcher, void *watcherCtx);
+    int pathHasWatcher(zhandle_t *zh, const char *path, int wtype,
+                       watcher_fn watcher, void *watcherCtx);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZK_HASHTABLE_H_*/

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/176bd682/zookeeper-client/zookeeper-client-c/src/zk_log.c
----------------------------------------------------------------------
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_log.c b/zookeeper-client/zookeeper-client-c/src/zk_log.c
new file mode 100644
index 0000000..436485e
--- /dev/null
+++ b/zookeeper-client/zookeeper-client-c/src/zk_log.c
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB)
+#  define USE_STATIC_LIB
+#endif
+
+#include "zookeeper_log.h"
+#ifndef WIN32
+#include <unistd.h>
+#else
+typedef DWORD pid_t;
+#include <process.h> /* for getpid */
+#endif
+
+#include <stdarg.h>
+#include <time.h>
+
+#define TIME_NOW_BUF_SIZE 1024
+#define FORMAT_LOG_BUF_SIZE 4096
+
+#ifdef THREADED
+#ifndef WIN32
+#include <pthread.h>
+#else 
+#include "winport.h"
+#endif
+
+static pthread_key_t time_now_buffer;
+static pthread_key_t format_log_msg_buffer;
+
+void freeBuffer(void* p){
+    if(p) free(p);
+}
+
+__attribute__((constructor)) void prepareTSDKeys() {
+    pthread_key_create (&time_now_buffer, freeBuffer);
+    pthread_key_create (&format_log_msg_buffer, freeBuffer);
+}
+
+char* getTSData(pthread_key_t key,int size){
+    char* p=pthread_getspecific(key);
+    if(p==0){
+        int res;
+        p=calloc(1,size);
+        res=pthread_setspecific(key,p);
+        if(res!=0){
+            fprintf(stderr,"Failed to set TSD key: %d",res);
+        }
+    }
+    return p;
+}
+
+char* get_time_buffer(){
+    return getTSData(time_now_buffer,TIME_NOW_BUF_SIZE);
+}
+
+char* get_format_log_buffer(){  
+    return getTSData(format_log_msg_buffer,FORMAT_LOG_BUF_SIZE);
+}
+#else
+char* get_time_buffer(){
+    static char buf[TIME_NOW_BUF_SIZE];
+    return buf;    
+}
+
+char* get_format_log_buffer(){
+    static char buf[FORMAT_LOG_BUF_SIZE];
+    return buf;
+}
+
+#endif
+
+ZooLogLevel logLevel=ZOO_LOG_LEVEL_INFO;
+
+static FILE* logStream=0;
+FILE* zoo_get_log_stream(){
+    if(logStream==0)
+        logStream=stderr;
+    return logStream;
+}
+
+void zoo_set_log_stream(FILE* stream){
+    logStream=stream;
+}
+
+static const char* time_now(char* now_str){
+    struct timeval tv;
+    struct tm lt;
+    time_t now = 0;
+    size_t len = 0;
+    
+    gettimeofday(&tv,0);
+
+    now = tv.tv_sec;
+    localtime_r(&now, &lt);
+
+    // clone the format used by log4j ISO8601DateFormat
+    // specifically: "yyyy-MM-dd HH:mm:ss,SSS"
+
+    len = strftime(now_str, TIME_NOW_BUF_SIZE,
+                          "%Y-%m-%d %H:%M:%S",
+                          &lt);
+
+    len += snprintf(now_str + len,
+                    TIME_NOW_BUF_SIZE - len,
+                    ",%03d",
+                    (int)(tv.tv_usec/1000));
+
+    return now_str;
+}
+
+void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+    int line, const char* funcName, const char* format, ...)
+{
+    static const char* dbgLevelStr[]={"ZOO_INVALID","ZOO_ERROR","ZOO_WARN",
+            "ZOO_INFO","ZOO_DEBUG"};
+    static pid_t pid=0;
+    va_list va;
+    int ofs = 0;
+#ifdef THREADED
+    unsigned long int tid = 0;
+#endif
+#ifdef WIN32
+    char timebuf [TIME_NOW_BUF_SIZE];
+    const char* time = time_now(timebuf);
+#else
+    const char* time = time_now(get_time_buffer());
+#endif
+
+    char* buf = get_format_log_buffer();
+    if(!buf)
+    {
+        fprintf(stderr, "log_message: Unable to allocate memory buffer");
+        return;
+    }
+
+    if(pid==0)
+    {
+        pid=getpid();
+    }
+
+
+#ifndef THREADED
+
+    // pid_t is long on Solaris
+    ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE,
+                   "%s:%ld:%s@%s@%d: ", time, (long)pid,
+                   dbgLevelStr[curLevel], funcName, line);
+#else
+
+    #ifdef WIN32
+        tid = (unsigned long int)(pthread_self().thread_id);
+    #else
+        tid = (unsigned long int)(pthread_self());
+    #endif
+
+    ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE-1,
+                   "%s:%ld(0x%lx):%s@%s@%d: ", time, (long)pid, tid,
+                   dbgLevelStr[curLevel], funcName, line);
+#endif
+
+    // Now grab the actual message out of the variadic arg list
+    va_start(va, format);
+    vsnprintf(buf+ofs, FORMAT_LOG_BUF_SIZE-1-ofs, format, va);
+    va_end(va);
+
+    if (callback)
+    {
+        callback(buf);
+    } else {
+        fprintf(zoo_get_log_stream(), "%s\n", buf);
+        fflush(zoo_get_log_stream());
+    }
+}
+
+void zoo_set_debug_level(ZooLogLevel level)
+{
+    if(level==0){
+        // disable logging (unit tests do this)
+        logLevel=(ZooLogLevel)0;
+        return;
+    }
+    if(level<ZOO_LOG_LEVEL_ERROR)level=ZOO_LOG_LEVEL_ERROR;
+    if(level>ZOO_LOG_LEVEL_DEBUG)level=ZOO_LOG_LEVEL_DEBUG;
+    logLevel=level;
+}
+


Mime
View raw message