zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mic...@apache.org
Subject svn commit: r1484357 - in /zookeeper/trunk: ./ src/c/include/ src/c/src/ src/c/tests/
Date Sun, 19 May 2013 22:08:08 GMT
Author: michim
Date: Sun May 19 22:08:08 2013
New Revision: 1484357

URL: http://svn.apache.org/r1484357
Log:
ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer (Marshall McMullen via michim)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/c/include/zookeeper.h
    zookeeper/trunk/src/c/include/zookeeper_log.h
    zookeeper/trunk/src/c/src/load_gen.c
    zookeeper/trunk/src/c/src/mt_adaptor.c
    zookeeper/trunk/src/c/src/zk_adaptor.h
    zookeeper/trunk/src/c/src/zk_log.c
    zookeeper/trunk/src/c/src/zookeeper.c
    zookeeper/trunk/src/c/tests/PthreadMocks.h
    zookeeper/trunk/src/c/tests/TestClient.cc
    zookeeper/trunk/src/c/tests/TestOperations.cc
    zookeeper/trunk/src/c/tests/Util.h

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun May 19 22:08:08 2013
@@ -12,7 +12,10 @@ NEW FEATURES:
   ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)
 
   ZOOKEEPER-107. Allow dynamic changes to server cluster membership (Alex Shraer via breed)
-  
+
+  ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
+  (Marshall McMullen via michim)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString

Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Sun May 19 22:08:08 2013
@@ -427,6 +427,14 @@ typedef void (*watcher_fn)(zhandle_t *zh
         int state, const char *path,void *watcherCtx);
 
 /**
+ * \brief typedef for setting the log callback. It's a function pointer which
+ * returns void and accepts a const char* as its only argument.
+ *
+ * \param message message to be passed to the callback function.
+ */
+typedef void (*log_callback_fn)(const char *message);
+
+/**
  * \brief create a handle to used communicate with zookeeper.
  *
  * This method creates a new handle and a zookeeper session that corresponds
@@ -458,6 +466,45 @@ ZOOAPI zhandle_t *zookeeper_init(const c
   int recv_timeout, const clientid_t *clientid, void *context, int flags);
 
 /**
+ * \brief create a handle to communicate with zookeeper.
+ *
+ * This function is identical to \ref zookeeper_init except it allows one
+ * to specify an additional callback to be used for all logging for that
+ * specific connection. For more details on the logging callback see
+ * \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ *
+ * This method creates a new handle and a zookeeper session that corresponds
+ * to that handle. Session establishment is asynchronous, meaning that the
+ * session should not be considered established until (and unless) an
+ * event of state ZOO_CONNECTED_STATE is received.
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ *   server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \param fn the global watcher callback function. When notifications are
+ *   triggered this function will be invoked.
+ * \param clientid the id of a previously established session that this
+ *   client will be reconnecting to. Pass 0 if not reconnecting to a previous
+ *   session. Clients can access the session id of an established, valid,
+ *   connection by calling \ref zoo_client_id. If the session corresponding to
+ *   the specified clientid has expired, or if the clientid is invalid for
+ *   any reason, the returned zhandle_t will be invalid -- the zhandle_t
+ *   state will indicate the reason for failure (typically
+ *   ZOO_EXPIRED_SESSION_STATE).
+ * \param context the handback object that will be associated with this instance
+ *   of zhandle_t. Application can access it (for example, in the watcher
+ *   callback) using \ref zoo_get_context. The object is not used by zookeeper
+ *   internally and can be null.
+ * \param flags reserved for future use. Should be set to zero.
+ * \param log_callback All log messages will be passed to this callback function.
+ *   For more details see \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ * \return a pointer to the opaque zhandle structure. If it fails to create
+ * a new zhandle the function returns NULL and the errno variable
+ * indicates the reason.
+ */
+ZOOAPI zhandle_t *zookeeper_init2(const char *host, watcher_fn fn,
+  int recv_timeout, const clientid_t *clientid, void *context, int flags,
+  log_callback_fn log_callback);
+
+/**
  * \brief update the list of servers this client will connect to.
  *
  * This method allows a client to update the connection string by providing
@@ -1408,6 +1455,32 @@ ZOOAPI void zoo_set_debug_level(ZooLogLe
 ZOOAPI void zoo_set_log_stream(FILE* logStream);
 
 /**
+ * \brief gets the callback to be used by this connection for logging.
+ *
+ * This is a per-connection logging mechanism that will take priority over
+ * the library-wide default log stream. That is, zookeeper library will first
+ * try to use a per-connection callback if available and if not, will fallback
+ * to using the logging stream. Passing in NULL resets the callback and will
+ * cause it to then fallback to using the logging stream as described in \ref
+ * zoo_set_log_stream.
+ */
+ZOOAPI log_callback_fn zoo_get_log_callback(const zhandle_t *zh);
+
+/**
+ * \brief sets the callback to be used by the library for logging
+ *
+ * Setting this callback has the effect of overriding the default log stream.
+ * Zookeeper will first try to use a per-connection callback if available
+ * and if not, will fallback to using the logging stream. Passing in NULL
+ * resets the callback and will cause it to then fallback to using the logging
+ * stream as described in \ref zoo_set_log_stream.
+ *
+ * Note: The provided callback will be invoked by multiple threads and therefore
+ * it needs to be thread-safe.
+ */
+ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);
+
+/**
  * \brief enable/disable quorum endpoint order randomization
  *
  * Note: typically this method should NOT be used outside of testing.

Modified: zookeeper/trunk/src/c/include/zookeeper_log.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper_log.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper_log.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper_log.h Sun May 19 22:08:08 2013
@@ -26,23 +26,22 @@ extern "C" {
 #endif
 
 extern ZOOAPI ZooLogLevel logLevel;
-#define LOGSTREAM getLogStream()
+#define LOGCALLBACK(_zh) zoo_get_log_callback(_zh)
+#define LOGSTREAM NULL
 
-#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
-    log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x)
-#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
-    log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x)
-#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
-    log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x)
-#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
-    log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+#define LOG_ERROR(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
+    log_message(_cb, ZOO_LOG_LEVEL_ERROR, __LINE__, __func__, __VA_ARGS__)
+#define LOG_WARN(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
+    log_message(_cb, ZOO_LOG_LEVEL_WARN, __LINE__, __func__, __VA_ARGS__)
+#define LOG_INFO(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
+    log_message(_cb, ZOO_LOG_LEVEL_INFO, __LINE__, __func__, __VA_ARGS__)
+#define LOG_DEBUG(_cb, ...) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
+    log_message(_cb, ZOO_LOG_LEVEL_DEBUG, __LINE__, __func__, __VA_ARGS__)
 
-ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName,
-    const char* message);
+ZOOAPI void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+    int line, const char* funcName, const char* format, ...);
 
-ZOOAPI const char* format_log_message(const char* format,...);
-
-FILE* getLogStream();
+FILE* zoo_get_log_stream();
 
 #ifdef __cplusplus
 }

Modified: zookeeper/trunk/src/c/src/load_gen.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/load_gen.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/load_gen.c (original)
+++ zookeeper/trunk/src/c/src/load_gen.c Sun May 19 22:08:08 2013
@@ -88,7 +88,7 @@ void listener(zhandle_t *zzh, int type, 
 void create_completion(int rc, const char *name, const void *data) {
     incCounter(-1);
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to create a node rc=%d",rc));
+        LOG_ERROR(LOGSTREAM, "Failed to create a node rc=%d",rc);
     }
 }
 
@@ -102,7 +102,7 @@ int doCreateNodes(const char* root, int 
         rc=zoo_acreate(zh, nodeName, "first", 5, &ZOO_OPEN_ACL_UNSAFE, 0,
                             create_completion, 0);
         if(i%1000==0){
-            LOG_INFO(("Created %s",nodeName));
+            LOG_INFO(LOGSTREAM, "Created %s", nodeName);
         }
         if(rc!=ZOK) return rc;        
     }
@@ -116,7 +116,7 @@ int createRoot(const char* root){
 void write_completion(int rc, const struct Stat *stat, const void *data) {
     incCounter(-1);
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to write a node rc=%d",rc));
+        LOG_ERROR(LOGSTREAM, "Failed to write a node rc=%d",rc);
     }
 }
 
@@ -137,13 +137,13 @@ void read_completion(int rc, const char 
         const struct Stat *stat, const void *data) {
     incCounter(-1);    
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to read a node rc=%d",rc));
+        LOG_ERROR(LOGSTREAM, "Failed to read a node rc=%d",rc);
         return;
     }
     if(memcmp(value,"second",6)!=0){
         char buf[value_len+1];
         memcpy(buf,value,value_len);buf[value_len]=0;
-        LOG_ERROR(("Invalid read, expected [second], received [%s]\n",buf));
+        LOG_ERROR(LOGSTREAM, "Invalid read, expected [second], received [%s]\n",buf);
         exit(1);
     }
 }
@@ -198,7 +198,7 @@ int recursiveDelete(const char* root){
     int rc=zoo_get_children(zh,root,0,&children);
     if(rc!=ZNONODE){
         if(rc!=ZOK){
-            LOG_ERROR(("Failed to get children of %s, rc=%d",root,rc));
+            LOG_ERROR(LOGSTREAM, "Failed to get children of %s, rc=%d",root,rc);
             return rc;
         }
         for(i=0;i<children.count; i++){
@@ -214,10 +214,10 @@ int recursiveDelete(const char* root){
         free_String_vector(&children);
     }
     if(deletedCounter%1000==0)
-        LOG_INFO(("Deleting %s",root));
+        LOG_INFO(LOGSTREAM, "Deleting %s",root);
     rc=zoo_delete(zh,root,-1);
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to delete znode %s, rc=%d",root,rc));
+        LOG_ERROR(LOGSTREAM, "Failed to delete znode %s, rc=%d",root,rc);
     }else
         deletedCounter++;
     return rc;
@@ -245,15 +245,15 @@ int main(int argc, char **argv) {
     if (!zh)
         return errno;
 
-    LOG_INFO(("Checking server connection..."));
+    LOG_INFO(LOGSTREAM, "Checking server connection...");
     ensureConnected();
     if(cleaning==1){
         int rc = 0;
         deletedCounter=0;
         rc=recursiveDelete(argv[2]);
         if(rc==ZOK){
-            LOG_INFO(("Succesfully deleted a subtree starting at %s (%d nodes)",
-                    argv[2],deletedCounter));
+            LOG_INFO(LOGSTREAM, "Succesfully deleted a subtree starting at %s (%d nodes)",
+                    argv[2],deletedCounter);
             exit(0);
         }
         exit(1);
@@ -262,18 +262,18 @@ int main(int argc, char **argv) {
     createRoot(argv[2]);
     while(1) {
         ensureConnected();
-        LOG_INFO(("Creating children for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Creating children for path %s",argv[2]);
         doCreateNodes(argv[2],nodeCount);
         waitCounter();
         
-        LOG_INFO(("Starting the write cycle for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Starting the write cycle for path %s",argv[2]);
         doWrites(argv[2],nodeCount);
         waitCounter();
-        LOG_INFO(("Starting the read cycle for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Starting the read cycle for path %s",argv[2]);
         doReads(argv[2],nodeCount);
         waitCounter();
 
-        LOG_INFO(("Starting the delete cycle for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Starting the delete cycle for path %s",argv[2]);
         doDeletes(argv[2],nodeCount);
         waitCounter();
     }

Modified: zookeeper/trunk/src/c/src/mt_adaptor.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/mt_adaptor.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/mt_adaptor.c (original)
+++ zookeeper/trunk/src/c/src/mt_adaptor.c Sun May 19 22:08:08 2013
@@ -116,7 +116,7 @@ unsigned __stdcall do_completion( void *
 
 int handle_error(SOCKET sock, char* message)
 {
-       LOG_ERROR(("%s. %d",message, WSAGetLastError()));
+       LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError());
        closesocket (sock);
        return -1;
 }
@@ -131,7 +131,7 @@ int create_socket_pair(SOCKET fds[2]) 
        
     SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP); 
     if (lst ==  INVALID_SOCKET ){
-       LOG_ERROR(("Error creating socket. %d",WSAGetLastError()));
+       LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
        return -1;
     }
     memset(&inaddr, 0, sizeof(inaddr)); 
@@ -218,7 +218,7 @@ void start_threads(zhandle_t* zh)
     // use api_prolog() to make sure zhandle doesn't get destroyed
     // while initialization is in progress
     api_prolog(zh);
-    LOG_DEBUG(("starting threads..."));
+    LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");
     rc=pthread_create(&adaptor->io, 0, do_io, zh);
     assert("pthread_create() failed for the IO thread"&&!rc);
     rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
@@ -232,17 +232,17 @@ int adaptor_init(zhandle_t *zh)
     pthread_mutexattr_t recursive_mx_attr;
     struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads));
     if (!adaptor_threads) {
-        LOG_ERROR(("Out of memory"));
+        LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
         return -1;
     }
 
     /* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
 #ifdef WIN32   
     if (create_socket_pair(adaptor_threads->self_pipe) == -1){
-       LOG_ERROR(("Can't make a socket."));
+       LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
 #else
     if(pipe(adaptor_threads->self_pipe)==-1) {
-        LOG_ERROR(("Can't make a pipe %d",errno));
+        LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
 #endif
         free(adaptor_threads);
         return -1;
@@ -365,7 +365,7 @@ void *do_io(void *v)
 
     api_prolog(zh);
     notify_thread_ready(zh);
-    LOG_DEBUG(("started IO thread"));
+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
     fds[0].fd=adaptor_threads->self_pipe[0];
     fds[0].events=POLLIN;
     while(!zh->close_requested) {
@@ -400,7 +400,7 @@ void *do_io(void *v)
     struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
     api_prolog(zh);
     notify_thread_ready(zh);
-    LOG_DEBUG(("started IO thread"));
+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
     FD_ZERO(&rfds);   FD_ZERO(&wfds);    FD_ZERO(&efds);
     while(!zh->close_requested) {      
         struct timeval tv;
@@ -444,7 +444,7 @@ void *do_io(void *v)
             break;
     }
     api_epilog(zh, 0);    
-    LOG_DEBUG(("IO thread terminated"));
+    LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
     return 0;
 }
 
@@ -457,7 +457,7 @@ void *do_completion(void *v)
     zhandle_t *zh = v;
     api_prolog(zh);
     notify_thread_ready(zh);
-    LOG_DEBUG(("started completion thread"));
+    LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
     while(!zh->close_requested) {
         pthread_mutex_lock(&zh->completions_to_process.lock);
         while(!zh->completions_to_process.head && !zh->close_requested) {
@@ -467,7 +467,7 @@ void *do_completion(void *v)
         process_completions(zh);
     }
     api_epilog(zh, 0);    
-    LOG_DEBUG(("completion thread terminated"));
+    LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
     return 0;
 }
 

Modified: zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ zookeeper/trunk/src/c/src/zk_adaptor.h Sun May 19 22:08:08 2013
@@ -223,6 +223,7 @@ struct _zhandle {
     clientid_t client_id;               // client-id
     long long last_zxid;                // last zookeeper ID
     auth_list_head_t auth_h;            // authentication data list
+    log_callback_fn log_callback;       // Callback for logging (falls back to logging to stderr)  
 
     // Primer storage
     struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection

Modified: zookeeper/trunk/src/c/src/zk_log.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_log.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_log.c (original)
+++ zookeeper/trunk/src/c/src/zk_log.c Sun May 19 22:08:08 2013
@@ -86,7 +86,7 @@ char* get_format_log_buffer(){
 ZooLogLevel logLevel=ZOO_LOG_LEVEL_INFO;
 
 static FILE* logStream=0;
-FILE* getLogStream(){
+FILE* zoo_get_log_stream(){
     if(logStream==0)
         logStream=stderr;
     return logStream;
@@ -122,44 +122,64 @@ static const char* time_now(char* now_st
     return now_str;
 }
 
-void log_message(ZooLogLevel curLevel,int line,const char* funcName,
-    const char* message)
+void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+    int line, const char* funcName, const char* format, ...)
 {
     static const char* dbgLevelStr[]={"ZOO_INVALID","ZOO_ERROR","ZOO_WARN",
             "ZOO_INFO","ZOO_DEBUG"};
+
+    char* buf = get_format_log_buffer();
+    if(!buf)
+    {
+        fprintf(stderr, "log_message: Unable to allocate memory buffer");
+        return;
+    }
+
     static pid_t pid=0;
+
+    if(pid==0)
+    {
+        pid=getpid();
+    }
+
 #ifdef WIN32
     char timebuf [TIME_NOW_BUF_SIZE];
+    const char* time = time_now(timebuf);
+#else
+    const char* time = time_now(get_time_buffer());
 #endif
-    if(pid==0)pid=getpid();
+
 #ifndef THREADED
-    fprintf(LOGSTREAM, "%s:%d:%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
-            dbgLevelStr[curLevel],funcName,line,message);
-#else
-#ifdef WIN32
-    fprintf(LOGSTREAM, "%s:%d(0x%lx):%s@%s@%d: %s\n", time_now(timebuf),pid,
-            (unsigned long int)(pthread_self().thread_id),
-            dbgLevelStr[curLevel],funcName,line,message);      
+
+    int ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE,
+                       "%s:%d:%s@%s@%d: ", time, pid,
+                       dbgLevelStr[curLevel], funcName, line);
 #else
-    fprintf(LOGSTREAM, "%s:%d(0x%lx):%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
-            (unsigned long int)pthread_self(),
-            dbgLevelStr[curLevel],funcName,line,message);      
-#endif
+
+    #ifdef WIN32
+        unsigned long int tid = (unsigned long int)(pthread_self().thread_id);
+    #else
+        unsigned long int tid = (unsigned long int)(pthread_self());
+    #endif
+
+    int ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE-1,
+                       "%s:%d(0x%lx):%s@%s@%d: ", time, pid, tid,
+                       dbgLevelStr[curLevel], funcName, line);
 #endif
-    fflush(LOGSTREAM);
-}
 
-const char* format_log_message(const char* format,...)
-{
+    // Now grab the actual message out of the variadic arg list
     va_list va;
-    char* buf=get_format_log_buffer();
-    if(!buf)
-        return "format_log_message: Unable to allocate memory buffer";
-    
-    va_start(va,format);
-    vsnprintf(buf, FORMAT_LOG_BUF_SIZE-1,format,va);
-    va_end(va); 
-    return buf;
+    va_start(va, format);
+    vsnprintf(buf+ofs, FORMAT_LOG_BUF_SIZE-1-ofs, format, va);
+    va_end(va);
+
+    if (callback)
+    {
+        callback(buf);
+    } else {
+        fprintf(zoo_get_log_stream(), "%s\n", buf);
+        fflush(zoo_get_log_stream());
+    }
 }
 
 void zoo_set_debug_level(ZooLogLevel level)

Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Sun May 19 22:08:08 2013
@@ -184,14 +184,14 @@ static int queue_session_event(zhandle_t
 static const char* format_endpoint_info(const struct sockaddr_storage* ep);
 
 /* deserialize forward declarations */
-static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
-static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *ia);
+static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
+static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia);
 
 /* completion routine forward declarations */
 static int add_completion(zhandle_t *zh, int xid, int completion_type,
         const void *dc, const void *data, int add_to_front,
         watcher_registration_t* wo, completion_head_t *clist);
-static completion_list_t* create_completion_entry(int xid, int completion_type,
+static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
         const void *dc, const void *data, watcher_registration_t* wo,
         completion_head_t *clist);
 static void destroy_completion_entry(completion_list_t* c);
@@ -484,7 +484,7 @@ static int count_hosts(char *hosts)
  * The contents of the provided address vector will be initialized to an
  * empty state.
  */
-int resolve_hosts(const char *hosts_in, addrvec_t *avec)
+static int resolve_hosts(const zhandle_t *zh, const char *hosts_in, addrvec_t *avec)
 {
     int rc = ZOK;
     char *host = NULL;
@@ -492,7 +492,7 @@ int resolve_hosts(const char *hosts_in, 
     int num_hosts = 0;
     char *strtok_last = NULL;
 
-    if (hosts_in == NULL || avec == NULL) {
+    if (zh == NULL || hosts_in == NULL || avec == NULL) {
         return ZBADARGUMENTS;
     }
 
@@ -501,7 +501,7 @@ int resolve_hosts(const char *hosts_in, 
 
     hosts = strdup(hosts_in);
     if (hosts == NULL) {
-        LOG_ERROR(("out of memory"));
+        LOG_ERROR(LOGCALLBACK(zh), "out of memory");
         errno=ENOMEM;
         rc=ZSYSTEMERROR;
         goto fail;
@@ -516,7 +516,7 @@ int resolve_hosts(const char *hosts_in, 
     // Allocate list inside avec
     rc = addrvec_alloc_capacity(avec, num_hosts);
     if (rc != 0) {
-        LOG_ERROR(("out of memory"));
+        LOG_ERROR(LOGCALLBACK(zh), "out of memory");
         errno=ENOMEM;
         rc=ZSYSTEMERROR;
         goto fail;
@@ -528,7 +528,7 @@ int resolve_hosts(const char *hosts_in, 
         char *end_port_spec;
         int port;
         if (!port_spec) {
-            LOG_ERROR(("no port in %s", host));
+            LOG_ERROR(LOGCALLBACK(zh), "no port in %s", host);
             errno=EINVAL;
             rc=ZBADARGUMENTS;
             goto fail;
@@ -537,7 +537,7 @@ int resolve_hosts(const char *hosts_in, 
         port_spec++;
         port = strtol(port_spec, &end_port_spec, 0);
         if (!*port_spec || *end_port_spec || port == 0) {
-            LOG_ERROR(("invalid port in %s", host));
+            LOG_ERROR(LOGCALLBACK(zh), "invalid port in %s", host);
             errno=EINVAL;
             rc=ZBADARGUMENTS;
             goto fail;
@@ -552,7 +552,7 @@ int resolve_hosts(const char *hosts_in, 
 
         he = gethostbyname(host);
         if (!he) {
-            LOG_ERROR(("could not resolve %s", host));
+            LOG_ERROR(LOGCALLBACK(zh), "could not resolve %s", host);
             errno=ENOENT;
             rc=ZBADARGUMENTS;
             goto fail;
@@ -563,7 +563,7 @@ int resolve_hosts(const char *hosts_in, 
             if (addrs->count == addrs->capacity) {
                 rc = addrvec_grow_default(addrs);
                 if (rc != 0) {
-                    LOG_ERROR(("out of memory"));
+                    LOG_ERROR(LOGCALLBACK(zh), "out of memory");
                     errno=ENOMEM;
                     rc=ZSYSTEMERROR;
                     goto fail;
@@ -591,8 +591,8 @@ int resolve_hosts(const char *hosts_in, 
             }
 #endif
             else {
-                LOG_WARN(("skipping unknown address family %x for %s",
-                         addr->ss_family, hosts_in));
+                LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
+                         addr->ss_family, hosts_in);
             }
         }
         host = strtok_r(0, ",", &strtok_last);
@@ -635,11 +635,11 @@ int resolve_hosts(const char *hosts_in, 
             if (rc != 0) {
                 errno = getaddrinfo_errno(rc);
 #ifdef WIN32
-                LOG_ERROR(("Win32 message: %s\n", gai_strerror(rc)));
+                LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
 #elif __linux__ && __GNUC__
-                LOG_ERROR(("getaddrinfo: %s\n", gai_strerror(rc)));
+                LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
 #else
-                LOG_ERROR(("getaddrinfo: %s\n", strerror(errno)));
+                LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", strerror(errno));
 #endif
                 rc=ZSYSTEMERROR;
                 goto fail;
@@ -651,7 +651,7 @@ int resolve_hosts(const char *hosts_in, 
             if (avec->count == avec->capacity) {
                 rc = addrvec_grow_default(avec);
                 if (rc != 0) {
-                    LOG_ERROR(("out of memory"));
+                    LOG_ERROR(LOGCALLBACK(zh), "out of memory");
                     errno=ENOMEM;
                     rc=ZSYSTEMERROR;
                     goto fail;
@@ -667,8 +667,8 @@ int resolve_hosts(const char *hosts_in, 
                 addrvec_append_addrinfo(avec, res);
                 break;
             default:
-                LOG_WARN(("skipping unknown address family %x for %s",
-                          res->ai_family, hosts_in));
+                LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
+                          res->ai_family, hosts_in);
                 break;
             }
         }
@@ -753,7 +753,7 @@ int update_addrs(zhandle_t *zh)
         goto fail;
     }
 
-    rc = resolve_hosts(hosts, &resolved);
+    rc = resolve_hosts(zh, hosts, &resolved);
     if (rc != ZOK)
     {
         goto fail;
@@ -892,7 +892,7 @@ struct sockaddr* zookeeper_get_connected
     return addr;
 }
 
-static void log_env() {
+static void log_env(zhandle_t *zh) {
   char buf[2048];
 #ifdef HAVE_SYS_UTSNAME_H
   struct utsname utsname;
@@ -904,72 +904,82 @@ static void log_env() {
   uid_t uid = 0;
 #endif
 
-  LOG_INFO(("Client environment:zookeeper.version=%s", PACKAGE_STRING));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:zookeeper.version=%s", PACKAGE_STRING);
 
 #ifdef HAVE_GETHOSTNAME
   gethostname(buf, sizeof(buf));
-  LOG_INFO(("Client environment:host.name=%s", buf));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=%s", buf);
 #else
-  LOG_INFO(("Client environment:host.name=<not implemented>"));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=<not implemented>");
 #endif
 
 #ifdef HAVE_SYS_UTSNAME_H
   uname(&utsname);
-  LOG_INFO(("Client environment:os.name=%s", utsname.sysname));
-  LOG_INFO(("Client environment:os.arch=%s", utsname.release));
-  LOG_INFO(("Client environment:os.version=%s", utsname.version));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=%s", utsname.sysname);
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=%s", utsname.release);
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=%s", utsname.version);
 #else
-  LOG_INFO(("Client environment:os.name=<not implemented>"));
-  LOG_INFO(("Client environment:os.arch=<not implemented>"));
-  LOG_INFO(("Client environment:os.version=<not implemented>"));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=<not implemented>");
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=<not implemented>");
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=<not implemented>");
 #endif
 
 #ifdef HAVE_GETLOGIN
-  LOG_INFO(("Client environment:user.name=%s", getlogin()));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=%s", getlogin());
 #else
-  LOG_INFO(("Client environment:user.name=<not implemented>"));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=<not implemented>");
 #endif
 
 #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
   uid = getuid();
   if (!getpwuid_r(uid, &pw, buf, sizeof(buf), &pwp) && pwp) {
-    LOG_INFO(("Client environment:user.home=%s", pw.pw_dir));
+    LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=%s", pw.pw_dir);
   } else {
-    LOG_INFO(("Client environment:user.home=<NA>"));
+    LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<NA>");
   }
 #else
-  LOG_INFO(("Client environment:user.home=<not implemented>"));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<not implemented>");
 #endif
 
 #ifdef HAVE_GETCWD
   if (!getcwd(buf, sizeof(buf))) {
-    LOG_INFO(("Client environment:user.dir=<toolong>"));
+    LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<toolong>");
   } else {
-    LOG_INFO(("Client environment:user.dir=%s", buf));
+    LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=%s", buf);
   }
 #else
-  LOG_INFO(("Client environment:user.dir=<not implemented>"));
+  LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<not implemented>");
 #endif
 }
 
 /**
  * Create a zookeeper handle associated with the given host and port.
  */
-zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
-  int recv_timeout, const clientid_t *clientid, void *context, int flags)
+static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
+        int recv_timeout, const clientid_t *clientid, void *context, int flags,
+        log_callback_fn log_callback)
 {
     int errnosave = 0;
     zhandle_t *zh = NULL;
     char *index_chroot = NULL;
 
-    log_env();
+    // Create our handle
+    zh = calloc(1, sizeof(*zh));
+    if (!zh) {
+        return 0;
+    }
+
+    // Set log callback before calling into log_env
+    zh->log_callback = log_callback;
+    log_env(zh);
+
 #ifdef WIN32
-       if (Win32WSAStartup()){
-               LOG_ERROR(("Error initializing ws2_32.dll"));
-               return 0;
-       }
+    if (Win32WSAStartup()){
+        LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
+        return 0;
+    }
 #endif
-    LOG_INFO(("Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
+    LOG_INFO(LOGCALLBACK(zh), "Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
           " sessionId=%#llx sessionPasswd=%s context=%p flags=%d",
               host,
               recv_timeout,
@@ -978,12 +988,8 @@ zhandle_t *zookeeper_init(const char *ho
               ((clientid == 0) || (clientid->passwd[0] == 0) ?
                "<null>" : "<hidden>"),
               context,
-              flags));
+              flags);
 
-    zh = calloc(1, sizeof(*zh));
-    if (!zh) {
-        return 0;
-    }
     zh->hostname = NULL;
     zh->fd = -1;
     zh->state = ZOO_NOTCONNECTED_STATE;
@@ -1060,6 +1066,19 @@ abort:
     return 0;
 }
 
+zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
+        int recv_timeout, const clientid_t *clientid, void *context, int flags)
+{
+    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL);
+}
+
+zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
+        int recv_timeout, const clientid_t *clientid, void *context, int flags,
+        log_callback_fn log_callback)
+{
+    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback);
+}
+
 /**
  * Set a new list of zk servers to connect to.  Disconnect will occur if
  * current connection endpoint is not in the list.
@@ -1068,7 +1087,7 @@ int zoo_set_servers(zhandle_t *zh, const
 {
     if (hosts == NULL)
     {
-        LOG_ERROR(("New server list cannot be empty"));
+        LOG_ERROR(LOGCALLBACK(zh), "New server list cannot be empty");
         return ZBADARGUMENTS;
     }
 
@@ -1107,12 +1126,12 @@ static int get_next_server_in_reconfig(z
 {
     int take_new = drand48() <= zh->pNew;
 
-    LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
+    LOG_DEBUG(LOGCALLBACK(zh), "[OLD] count=%d capacity=%d next=%d hasnext=%d",
                zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
-               addrvec_hasnext(&zh->addrs_old)));
-    LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
+               addrvec_hasnext(&zh->addrs_old));
+    LOG_DEBUG(LOGCALLBACK(zh), "[NEW] count=%d capacity=%d next=%d hasnext=%d",
                zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
-               addrvec_hasnext(&zh->addrs_new)));
+               addrvec_hasnext(&zh->addrs_new));
 
     // Take one of the new servers if we haven't tried them all yet
     // and either the probability tells us to connect to one of the new servers
@@ -1121,18 +1140,18 @@ static int get_next_server_in_reconfig(z
             && (take_new || !addrvec_hasnext(&zh->addrs_old)))
     {
         addrvec_next(&zh->addrs_new, &zh->addr_cur);
-        LOG_DEBUG(("Using next from NEW=%s", format_endpoint_info(&zh->addr_cur)));
+        LOG_DEBUG(LOGCALLBACK(zh), "Using next from NEW=%s", format_endpoint_info(&zh->addr_cur));
         return 0;
     }
 
     // start taking old servers
     if (addrvec_hasnext(&zh->addrs_old)) {
         addrvec_next(&zh->addrs_old, &zh->addr_cur);
-        LOG_DEBUG(("Using next from OLD=%s", format_endpoint_info(&zh->addr_cur)));
+        LOG_DEBUG(LOGCALLBACK(zh), "Using next from OLD=%s", format_endpoint_info(&zh->addr_cur));
         return 0;
     }
 
-    LOG_DEBUG(("Failed to find either new or old"));
+    LOG_DEBUG(LOGCALLBACK(zh), "Failed to find either new or old");
     memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
     return 1;
 }
@@ -1226,8 +1245,8 @@ char* sub_string(zhandle_t *zh, const ch
         return (char *) server_path;
     //ZOOKEEPER-1027
     if (strncmp(server_path, zh->chroot, strlen(zh->chroot)) != 0) {
-        LOG_ERROR(("server path %s does not include chroot path %s",
-                   server_path, zh->chroot));
+        LOG_ERROR(LOGCALLBACK(zh), "server path %s does not include chroot path %s",
+                   server_path, zh->chroot);
         return (char *) server_path;
     }
     if (strlen(server_path) == strlen(zh->chroot)) {
@@ -1546,17 +1565,17 @@ static void handle_error(zhandle_t *zh,i
 {
     close(zh->fd);
     if (is_unrecoverable(zh)) {
-        LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
-                state2String(zh->state)));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
+                state2String(zh->state));
         PROCESS_SESSION_EVENT(zh, zh->state);
     } else if (zh->state == ZOO_CONNECTED_STATE) {
-        LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE"));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
         PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
     }
     cleanup_bufs(zh,1,rc);
     zh->fd = -1;
 
-    LOG_DEBUG(("Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay));
+    LOG_DEBUG(LOGCALLBACK(zh), "Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay);
 
     // NOTE: If we're at the end of the list of addresses to connect to, then
     // we want to delay the next connection attempt to avoid spinning.
@@ -1580,9 +1599,9 @@ static int handle_socket_error_msg(zhand
         char buf[1024];
         va_start(va,format);
         vsnprintf(buf, sizeof(buf)-1,format,va);
-        log_message(ZOO_LOG_LEVEL_ERROR,line,__func__,
-            format_log_message("Socket [%s] zk retcode=%d, errno=%d(%s): %s",
-            zoo_get_current_server(zh),rc,errno,strerror(errno),buf));
+        log_message(LOGCALLBACK(zh), ZOO_LOG_LEVEL_ERROR,line,__func__,
+            "Socket [%s] zk retcode=%d, errno=%d(%s): %s",
+            zoo_get_current_server(zh),rc,errno,strerror(errno),buf);
         va_end(va);
     }
     handle_error(zh,rc);
@@ -1611,11 +1630,11 @@ static void auth_completion_func(int rc,
     get_auth_completions(&zh->auth_h, &a_list);
     zoo_unlock_auth(zh);
     if (rc) {
-        LOG_ERROR(("Authentication scheme %s failed. Connection closed.",
-                   zh->auth_h.auth->scheme));
+        LOG_ERROR(LOGCALLBACK(zh), "Authentication scheme %s failed. Connection closed.",
+                   zh->auth_h.auth->scheme);
     }
     else {
-        LOG_INFO(("Authentication scheme %s succeeded", zh->auth_h.auth->scheme));
+        LOG_INFO(LOGCALLBACK(zh), "Authentication scheme %s succeeded", zh->auth_h.auth->scheme);
     }
     a_tmp = &a_list;
     // chain call user's completion function
@@ -1665,7 +1684,7 @@ static int send_auth_info(zhandle_t *zh)
         auth = auth->next;
     }
     zoo_unlock_auth(zh);
-    LOG_DEBUG(("Sending all auth info request to %s", zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending all auth info request to %s", zoo_get_current_server(zh));
     return (rc <0) ? ZMARSHALLINGERROR:ZOK;
 }
 
@@ -1682,7 +1701,7 @@ static int send_last_auth_info(zhandle_t
     }
     rc = send_info_packet(zh, auth);
     zoo_unlock_auth(zh);
-    LOG_DEBUG(("Sending auth info request to %s",zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending auth info request to %s",zoo_get_current_server(zh));
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }
 
@@ -1729,7 +1748,7 @@ static int send_set_watches(zhandle_t *z
     free_key_list(req.dataWatches.data, req.dataWatches.count);
     free_key_list(req.existWatches.data, req.existWatches.count);
     free_key_list(req.childWatches.data, req.childWatches.count);
-    LOG_DEBUG(("Sending set watches request to %s",zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",zoo_get_current_server(zh));
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }
 
@@ -1891,7 +1910,7 @@ int zookeeper_interest(zhandle_t *zh, in
         int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
                          (zh->recv_timeout / 10);
         if (time_left > max_exceed)
-            LOG_WARN(("Exceeded deadline by %dms", time_left));
+            LOG_WARN(LOGCALLBACK(zh), "Exceeded deadline by %dms", time_left);
     }
     api_prolog(zh);
 
@@ -1919,8 +1938,8 @@ int zookeeper_interest(zhandle_t *zh, in
             *tv = get_timeval(zh->recv_timeout/60);
             zh->delay = 0;
 
-            LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
-                     zh->hostname));
+            LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
+                     zh->hostname);
         }
 
         // No need to delay -- grab the next server and attempt connection
@@ -1941,7 +1960,7 @@ int zookeeper_interest(zhandle_t *zh, in
             }
             ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
             if (ssoresult != 0) {
-                LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
+                LOG_WARN(LOGCALLBACK(zh), "Unable to set TCP_NODELAY, operation latency may be effected");
             }
 #ifdef WIN32
             ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
@@ -1953,7 +1972,7 @@ int zookeeper_interest(zhandle_t *zh, in
                 rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
             } else {
 #else
-               LOG_DEBUG(("[zk] connect()\n"));
+               LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
             {
 #endif
                 rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
@@ -1977,7 +1996,7 @@ int zookeeper_interest(zhandle_t *zh, in
                 if((rc=prime_connection(zh))!=0)
                     return api_epilog(zh,rc);
 
-                LOG_INFO(("Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
+                LOG_INFO(LOGCALLBACK(zh), "Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
             }
             *tv = get_timeval(zh->recv_timeout/3);
         }
@@ -2014,11 +2033,11 @@ int zookeeper_interest(zhandle_t *zh, in
         if (zh->state==ZOO_CONNECTED_STATE) {
             send_to = zh->recv_timeout/3 - idle_send;
             if (send_to <= 0 && zh->sent_requests.head==0) {
-//                LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
-//                                zoo_get_current_server(zh),-send_to));
+//                LOG_DEBUG(LOGCALLBACK(zh), "Sending PING to %s (exceeded idle by %dms)",
+//                                zoo_get_current_server(zh),-send_to);
                 rc = send_ping(zh);
                 if (rc < 0){
-                    LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
+                    LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
                     return api_epilog(zh,rc);
                 }
                 send_to = zh->recv_timeout/3;
@@ -2064,7 +2083,7 @@ static int check_events(zhandle_t *zh, i
         if((rc=prime_connection(zh))!=0)
             return rc;
 
-        LOG_INFO(("initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
+        LOG_INFO(LOGCALLBACK(zh), "initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
         return ZOK;
     }
     if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
@@ -2110,15 +2129,15 @@ static int check_events(zhandle_t *zh, i
                            sizeof(zh->client_id.passwd));
                     zh->state = ZOO_CONNECTED_STATE;
                     zh->reconfig = 0;
-                    LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
+                    LOG_INFO(LOGCALLBACK(zh), "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
                               format_endpoint_info(&zh->addr_cur),
-                              newid, zh->recv_timeout));
+                              newid, zh->recv_timeout);
                     /* we want the auth to be sent for, but since both call push to front
                        we need to call send_watch_set first */
                     send_set_watches(zh);
                     /* send the authentication packet now */
                     send_auth_info(zh);
-                    LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
+                    LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
                     zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
                     PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
                 }
@@ -2176,7 +2195,7 @@ static int queue_session_event(zhandle_t
     completion_list_t *cptr;
 
     if ((oa=create_buffer_oarchive())==NULL) {
-        LOG_ERROR(("out of memory"));
+        LOG_ERROR(LOGCALLBACK(zh), "out of memory");
         goto error;
     }
     rc = serialize_ReplyHeader(oa, "hdr", &hdr);
@@ -2185,7 +2204,7 @@ static int queue_session_event(zhandle_t
         close_buffer_oarchive(&oa, 1);
         goto error;
     }
-    cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
+    cptr = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
     cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
     cptr->buffer->curr_offset = get_buffer_len(oa);
     if (!cptr->buffer) {
@@ -2223,14 +2242,13 @@ completion_list_t *dequeue_completion(co
     return cptr;
 }
 
-static void process_sync_completion(
+static void process_sync_completion(zhandle_t *zh,
         completion_list_t *cptr,
         struct sync_completion *sc,
-        struct iarchive *ia,
-    zhandle_t *zh)
+        struct iarchive *ia)
 {
-    LOG_DEBUG(("Processing sync_completion with type=%d xid=%#x rc=%d",
-            cptr->c.type, cptr->xid, sc->rc));
+    LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
+            cptr->c.type, cptr->xid, sc->rc);
 
     switch(cptr->c.type) {
     case COMPLETION_DATA:
@@ -2334,15 +2352,15 @@ static void process_sync_completion(
     case COMPLETION_VOID:
         break;
     case COMPLETION_MULTI:
-        sc->rc = deserialize_multi(cptr->xid, cptr, ia);
+        sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
         break;
     default:
-        LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
+        LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
         break;
     }
 }
 
-static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *ia)
+static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia)
 {
     int rc = 0;
     completion_head_t *clist = &cptr->c.clist;
@@ -2362,7 +2380,7 @@ static int deserialize_multi(int xid, co
             }
         }
 
-        deserialize_response(entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
+        deserialize_response(zh, entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
         deserialize_MultiHeader(ia, "multiheader", &mhdr);
         //While deserializing the response we must destroy completion entry for each operation in 
         //the zoo_multi transaction. Otherwise this results in memory leak when client invokes zoo_multi
@@ -2373,12 +2391,12 @@ static int deserialize_multi(int xid, co
     return rc;
 }
 
-static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
+static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
 {
     switch (type) {
     case COMPLETION_DATA:
-        LOG_DEBUG(("Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.data_result(rc, 0, 0, 0, cptr->data);
         } else {
@@ -2390,8 +2408,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_STAT:
-        LOG_DEBUG(("Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.stat_result(rc, 0, cptr->data);
         } else {
@@ -2402,8 +2420,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_STRINGLIST:
-        LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.strings_result(rc, 0, cptr->data);
         } else {
@@ -2414,8 +2432,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_STRINGLIST_STAT:
-        LOG_DEBUG(("Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
         } else {
@@ -2426,8 +2444,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_STRING:
-        LOG_DEBUG(("Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.string_result(rc, 0, cptr->data);
         } else {
@@ -2438,8 +2456,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_STRING_STAT:
-        LOG_DEBUG(("Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.string_stat_result(rc, 0, 0, cptr->data);
         } else {
@@ -2450,8 +2468,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_ACLLIST:
-        LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
         if (failed) {
             cptr->c.acl_result(rc, 0, 0, cptr->data);
         } else {
@@ -2462,8 +2480,8 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_VOID:
-        LOG_DEBUG(("Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
         if (xid == PING_XID) {
             // We want to skip the ping
         } else {
@@ -2472,14 +2490,14 @@ static void deserialize_response(int typ
         }
         break;
     case COMPLETION_MULTI:
-        LOG_DEBUG(("Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
-                    cptr->xid, failed, rc));
-        rc = deserialize_multi(xid, cptr, ia);
+        LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
+                    cptr->xid, failed, rc);
+        rc = deserialize_multi(zh, xid, cptr, ia);
         assert(cptr->c.void_result);
         cptr->c.void_result(rc, cptr->data);
         break;
     default:
-        LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
+        LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
     }
 }
 
@@ -2503,13 +2521,13 @@ void process_completions(zhandle_t *zh)
             type = evt.type;
             state = evt.state;
             /* This is a notification so there aren't any pending requests */
-            LOG_DEBUG(("Calling a watcher for node [%s], type = %d event=%s",
+            LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for node [%s], type = %d event=%s",
                        (evt.path==NULL?"NULL":evt.path), cptr->c.type,
-                       watcherEvent2String(type)));
+                       watcherEvent2String(type));
             deliverWatchers(zh,type,state,evt.path, &cptr->c.watcher_result);
             deallocate_WatcherEvent(&evt);
         } else {
-            deserialize_response(cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
+            deserialize_response(zh, cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
         }
         destroy_completion_entry(cptr);
         close_buffer_iarchive(&ia);
@@ -2552,7 +2570,7 @@ static void checkResponseLatency(zhandle
     gettimeofday(&now,0);
     delay=calculate_interval(&zh->socket_readable, &now);
     if(delay>20)
-        LOG_DEBUG(("The following server response has spent at least %dms sitting in the client socket recv buffer",delay));
+        LOG_DEBUG(LOGCALLBACK(zh), "The following server response has spent at least %dms sitting in the client socket recv buffer",delay);
 
     zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
 }
@@ -2591,13 +2609,13 @@ int zookeeper_process(zhandle_t *zh, int
             char *path = NULL;
             completion_list_t *c = NULL;
 
-            LOG_DEBUG(("Processing WATCHER_EVENT"));
+            LOG_DEBUG(LOGCALLBACK(zh), "Processing WATCHER_EVENT");
 
             deserialize_WatcherEvent(ia, "event", &evt);
             type = evt.type;
             path = evt.path;
             /* We are doing a notification, so there is no pending request */
-            c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
+            c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
             c->buffer = bptr;
             c->c.watcher_result = collectWatchers(zh, type, path);
 
@@ -2605,10 +2623,10 @@ int zookeeper_process(zhandle_t *zh, int
             deallocate_WatcherEvent(&evt);
             queue_completion(&zh->completions_to_process, c, 0);
         } else if (hdr.xid == SET_WATCHES_XID) {
-            LOG_DEBUG(("Processing SET_WATCHES"));
+            LOG_DEBUG(LOGCALLBACK(zh), "Processing SET_WATCHES");
             free_buffer(bptr);
         } else if (hdr.xid == AUTH_XID){
-            LOG_DEBUG(("Processing AUTH_XID"));
+            LOG_DEBUG(LOGCALLBACK(zh), "Processing AUTH_XID");
 
             /* special handling for the AUTH response as it may come back
              * out-of-band */
@@ -2628,14 +2646,14 @@ int zookeeper_process(zhandle_t *zh, int
 
             /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
             if (zh->close_requested == 1 && cptr == NULL) {
-                LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
+                LOG_DEBUG(LOGCALLBACK(zh), "Completion queue has been cleared by zookeeper_close()");
                 close_buffer_iarchive(&ia);
                 return api_epilog(zh,ZINVALIDSTATE);
             }
             assert(cptr);
             /* The requests are going to come back in order */
             if (cptr->xid != hdr.xid) {
-                LOG_DEBUG(("Processing unexpected or out-of-order response!"));
+                LOG_DEBUG(LOGCALLBACK(zh), "Processing unexpected or out-of-order response!");
 
                 // received unexpected (or out-of-order) response
                 close_buffer_iarchive(&ia);
@@ -2656,13 +2674,13 @@ int zookeeper_process(zhandle_t *zh, int
                     struct timeval now;
                     gettimeofday(&now, 0);
                     elapsed = calculate_interval(&zh->last_ping, &now);
-                    LOG_DEBUG(("Got ping response in %d ms", elapsed));
+                    LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
 
                     // Nothing to do with a ping response
                     free_buffer(bptr);
                     destroy_completion_entry(cptr);
                 } else {
-                    LOG_DEBUG(("Queueing asynchronous response"));
+                    LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
 
                     cptr->buffer = bptr;
                     queue_completion(&zh->completions_to_process, cptr, 0);
@@ -2672,7 +2690,7 @@ int zookeeper_process(zhandle_t *zh, int
                         *sc = (struct sync_completion*)cptr->data;
                 sc->rc = rc;
 
-                process_sync_completion(cptr, sc, ia, zh);
+                process_sync_completion(zh, cptr, sc, ia);
 
                 notify_sync_completion(sc);
                 free_buffer(bptr);
@@ -2716,12 +2734,12 @@ static void destroy_watcher_registration
     }
 }
 
-static completion_list_t* create_completion_entry(int xid, int completion_type,
+static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
         const void *dc, const void *data,watcher_registration_t* wo, completion_head_t *clist)
 {
     completion_list_t *c = calloc(1,sizeof(completion_list_t));
     if (!c) {
-        LOG_ERROR(("out of memory"));
+        LOG_ERROR(LOGCALLBACK(zh), "out of memory");
         return 0;
     }
     c->c.type = completion_type;
@@ -2808,7 +2826,7 @@ static int add_completion(zhandle_t *zh,
         const void *dc, const void *data, int add_to_front,
         watcher_registration_t* wo, completion_head_t *clist)
 {
-    completion_list_t *c =create_completion_entry(xid, completion_type, dc,
+    completion_list_t *c =create_completion_entry(zh, xid, completion_type, dc,
             data, wo, clist);
     int rc = 0;
     if (!c)
@@ -2910,8 +2928,8 @@ int zookeeper_close(zhandle_t *zh)
     if(zh->state==ZOO_CONNECTED_STATE){
         struct oarchive *oa;
         struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
-        LOG_INFO(("Closing zookeeper sessionId=%#llx to [%s]\n",
-                zh->client_id.client_id,zoo_get_current_server(zh)));
+        LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to [%s]\n",
+                zh->client_id.client_id,zoo_get_current_server(zh));
         oa = create_buffer_oarchive();
         rc = serialize_RequestHeader(oa, "header", &h);
         rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
@@ -2927,8 +2945,8 @@ int zookeeper_close(zhandle_t *zh)
          * (but reasonable) number of milliseconds since we want the call to block*/
         rc=adaptor_send_queue(zh, 3000);
     }else{
-        LOG_INFO(("Freeing zookeeper resources for sessionId=%#llx\n",
-                zh->client_id.client_id));
+        LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n",
+                zh->client_id.client_id);
         rc = ZOK;
     }
 
@@ -3060,8 +3078,8 @@ int zoo_awget(zhandle_t *zh, const char 
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3104,8 +3122,8 @@ int zoo_awgetconfig(zhandle_t *zh, watch
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-               zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+               zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3141,7 +3159,7 @@ int zoo_areconfig(zhandle_t *zh, const c
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
 
@@ -3186,8 +3204,8 @@ int zoo_aset(zhandle_t *zh, const char *
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3266,8 +3284,8 @@ int zoo_acreate(zhandle_t *zh, const cha
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3297,8 +3315,8 @@ int zoo_acreate2(zhandle_t *zh, const ch
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3337,8 +3355,8 @@ int zoo_adelete(zhandle_t *zh, const cha
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3376,8 +3394,8 @@ int zoo_awexists(zhandle_t *zh, const ch
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3409,8 +3427,8 @@ static int zoo_awget_children_(zhandle_t
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3457,8 +3475,8 @@ static int zoo_awget_children2_(zhandle_
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3500,8 +3518,8 @@ int zoo_async(zhandle_t *zh, const char 
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3530,8 +3548,8 @@ int zoo_aget_acl(zhandle_t *zh, const ch
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3561,8 +3579,8 @@ int zoo_aset_acl(zhandle_t *zh, const ch
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3655,7 +3673,7 @@ int zoo_amulti(zhandle_t *zh, int count,
                 result->valuelen = op->create_op.buflen;
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
+                entry = create_completion_entry(zh, h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->create_op.path);
                 break;
@@ -3667,7 +3685,7 @@ int zoo_amulti(zhandle_t *zh, int count,
                 rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+                entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->delete_op.path);
                 break;
@@ -3682,7 +3700,7 @@ int zoo_amulti(zhandle_t *zh, int count,
                 result->stat = op->set_op.stat;
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
+                entry = create_completion_entry(zh, h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->set_op.path);
                 break;
@@ -3695,14 +3713,14 @@ int zoo_amulti(zhandle_t *zh, int count,
                 rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+                entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->check_op.path);
                 break;
             }
 
             default:
-                LOG_ERROR(("Unimplemented sub-op type=%d in multi-op", op->type));
+                LOG_ERROR(LOGCALLBACK(zh), "Unimplemented sub-op type=%d in multi-op", op->type);
                 return ZUNIMPLEMENTED;
         }
 
@@ -3721,8 +3739,8 @@ int zoo_amulti(zhandle_t *zh, int count,
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
-    LOG_DEBUG(("Sending multi request xid=%#x with %d subrequests to %s",
-            h.xid, index, zoo_get_current_server(zh)));
+    LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
+            h.xid, index, zoo_get_current_server(zh));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
 
@@ -4013,6 +4031,26 @@ static const char* format_endpoint_info(
     return buf;
 }
 
+log_callback_fn zoo_get_log_callback(const zhandle_t* zh)
+{
+    // Verify we have a valid handle
+    if (zh == NULL) {
+        return NULL;
+    }
+
+    return zh->log_callback;
+}
+
+void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback)
+{
+    // Verify we have a valid handle
+    if (zh == NULL) {
+        return;
+    }
+
+    zh->log_callback = callback;
+}
+
 void zoo_deterministic_conn_order(int yesOrNo)
 {
     disable_conn_permute=yesOrNo;

Modified: zookeeper/trunk/src/c/tests/PthreadMocks.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/PthreadMocks.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/PthreadMocks.h (original)
+++ zookeeper/trunk/src/c/tests/PthreadMocks.h Sun May 19 22:08:08 2013
@@ -381,18 +381,18 @@ public:
         int ret=LIBC_SYMBOLS.pthread_create(t,a,threadFuncWrapper,
                 new ThreadContext(f,d));
         if(verbose)
-            TEST_TRACE(("thread created %p",*t));
+            TEST_TRACE("thread created %p",*t);
         return ret;
     }
     virtual int pthread_join(pthread_t t, void ** r){
-        if(verbose) TEST_TRACE(("thread joined %p",t));
+        if(verbose) TEST_TRACE("thread joined %p",t);
         int ret=LIBC_SYMBOLS.pthread_join(t,r);
         if(ret==0)
             markDestroyed(t);
         return ret;
     }
     virtual int pthread_detach(pthread_t t){
-        if(verbose) TEST_TRACE(("thread detached %p",t));
+        if(verbose) TEST_TRACE("thread detached %p",t);
         int ret=LIBC_SYMBOLS.pthread_detach(t);
         if(ret==0)
             markDestroyed(t);

Modified: zookeeper/trunk/src/c/tests/TestClient.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestClient.cc?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestClient.cc (original)
+++ zookeeper/trunk/src/c/tests/TestClient.cc Sun May 19 22:08:08 2013
@@ -46,6 +46,13 @@ struct buff_struct_2 {
     char *buffer;
 };
 
+// For testing LogMessage Callback functionality
+list<string> logMessages;
+void logMessageHandler(const char* message) {
+    cout << "Log Message Received: [" << message << "]" << endl;
+    logMessages.push_back(message);
+}
+
 static int Stat_eq(struct Stat* a, struct Stat* b)
 {
     if (a->czxid != b->czxid) return 0;
@@ -172,6 +179,7 @@ public:
         }
         return connected;
     }
+
     bool waitForDisconnected(zhandle_t *zh) {
         time_t expires = time(0) + 15;
         while(connected && time(0) < expires) {
@@ -179,11 +187,15 @@ public:
         }
         return !connected;
     }
+
 } watchctx_t;
 
 class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
 {
     CPPUNIT_TEST_SUITE(Zookeeper_simpleSystem);
+    CPPUNIT_TEST(testLogCallbackSet);
+    CPPUNIT_TEST(testLogCallbackInit);
+    CPPUNIT_TEST(testLogCallbackClear);
     CPPUNIT_TEST(testAsyncWatcherAutoReset);
     CPPUNIT_TEST(testDeserializeString);
 #ifdef THREADED
@@ -231,6 +243,13 @@ class Zookeeper_simpleSystem : public CP
         return createClient(hostPorts, ctx);
     }
 
+    zhandle_t *createClient(watchctx_t *ctx, log_callback_fn logCallback) {
+        zhandle_t *zk = zookeeper_init2(hostPorts, watcher, 10000, 0, ctx, 0, logCallback);
+        ctx->zh = zk;
+        sleep(1);
+        return zk;
+    }
+
     zhandle_t *createClient(const char *hp, watchctx_t *ctx) {
         zhandle_t *zk = zookeeper_init(hp, watcher, 10000, 0, ctx, 0);
         ctx->zh = zk;
@@ -265,7 +284,6 @@ public:
         zoo_set_log_stream(logfile);
     }
 
-
     void startServer() {
         char cmd[1024];
         sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts());
@@ -907,6 +925,69 @@ public:
         CPPUNIT_ASSERT_EQUAL(string(path), string(path_buffer));
     }
 
+    // Test creating normal handle via zookeeper_init then explicitly setting callback
+    void testLogCallbackSet()
+    {
+        watchctx_t ctx;
+        CPPUNIT_ASSERT(logMessages.empty());
+        zhandle_t *zk = createClient(&ctx);
+
+        zoo_set_log_callback(zk, &logMessageHandler);
+        CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
+
+        // Log 10 messages and ensure all go to callback
+        int expected = 10;
+        for (int i = 0; i < expected; i++)
+        {
+            LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, i);
+        }
+        CPPUNIT_ASSERT(expected == logMessages.size());
+    }
+
+    // Test creating handle via zookeeper_init2 to ensure all connection messages go to callback
+    void testLogCallbackInit()
+    {
+        logMessages.clear();
+        watchctx_t ctx;
+        zhandle_t *zk = createClient(&ctx, &logMessageHandler);
+        CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
+
+        // All the connection messages should have gone to the callback -- don't
+        // want this to be a maintenance issue so we're not asserting exact count
+        int numBefore = logMessages.size();
+        CPPUNIT_ASSERT(numBefore != 0);
+
+        // Log 10 messages and ensure all go to callback
+        int expected = 10;
+        for (int i = 0; i < expected; i++)
+        {
+            LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, i);
+        }
+        CPPUNIT_ASSERT(logMessages.size() == numBefore + expected);
+    }
+
+    // Test clearing log callback -- logging should resume going to logstream
+    void testLogCallbackClear()
+    {
+        logMessages.clear();
+        watchctx_t ctx;
+        zhandle_t *zk = createClient(&ctx, &logMessageHandler);
+        CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
+
+        // All the connection messages should have gone to the callback -- again, we don't
+        // want this to be a maintenance issue so we're not asserting exact count
+        int numBefore = logMessages.size();
+        CPPUNIT_ASSERT(numBefore > 0);
+
+        // Clear log_callback
+        zoo_set_log_callback(zk, NULL);
+
+        // Future log messages should go to logstream not callback
+        LOG_INFO(LOGCALLBACK(zk), __FUNCTION__);
+        int numAfter = logMessages.size();
+        CPPUNIT_ASSERT_EQUAL(numBefore, numAfter);
+    }
+
     void testAsyncWatcherAutoReset()
     {
         watchctx_t ctx;

Modified: zookeeper/trunk/src/c/tests/TestOperations.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestOperations.cc?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestOperations.cc (original)
+++ zookeeper/trunk/src/c/tests/TestOperations.cc Sun May 19 22:08:08 2013
@@ -490,7 +490,7 @@ public: 
                     break;
                 }
             }
-            //TEST_TRACE(("Finished %d iterations",i));
+            //TEST_TRACE("Finished %d iterations",i);
         }
         virtual void validate(const char* file, int line) const{
             CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
@@ -525,7 +525,7 @@ public: 
         zookeeper_close(lzh); 
   
         for(int counter=0; counter<200; counter++){
-            TEST_TRACE(("Loop count %d",counter));
+            TEST_TRACE("Loop count %d",counter);
             
             CloseFinally guard(&zh);
 
@@ -539,7 +539,7 @@ public: 
             jmgr.startJobsImmediately();
             jmgr.wait();
             VALIDATE_JOBS(jmgr);
-            TEST_TRACE(("run %d finished",counter));
+            TEST_TRACE("run %d finished",counter);
         }
 
     }
@@ -564,7 +564,7 @@ public: 
     void testOperationsAndDisconnectConcurrently1()
     {
         for(int counter=0; counter<50; counter++){
-            //TEST_TRACE(("Loop count %d",counter));
+            //TEST_TRACE("Loop count %d",counter);
             // frozen time -- no timeouts and no pings
             Mock_gettimeofday timeMock;
             

Modified: zookeeper/trunk/src/c/tests/Util.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/Util.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/Util.h (original)
+++ zookeeper/trunk/src/c/tests/Util.h Sun May 19 22:08:08 2013
@@ -36,8 +36,8 @@
     __real_##sym params
 
 // must include "src/zookeeper_log.h" to be able to use this macro
-#define TEST_TRACE(x) \
-    log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+#define TEST_TRACE(x...) \
+    log_message(LOGSTREAM, ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,x)
 
 extern const std::string EMPTY_STRING;
 



Mime
View raw message