hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r685624 [2/3] - in /hadoop/zookeeper/trunk/src/contrib: ./ zkfuse/ zkfuse/src/
Date Wed, 13 Aug 2008 17:59:00 GMT
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties Wed Aug 13 10:58:59 2008
@@ -0,0 +1,12 @@
+# Set root logger level to TRACE and its only appender to A1.
+log4j.rootLogger=TRACE, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4cxx.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.category.zkfuse=TRACE
+

Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h Wed Aug 13 10:58:59 2008
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MUTEX_H__
+#define __MUTEX_H__
+
+#include <pthread.h>
+#include <errno.h>
+#include <sys/time.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+class Cond;
+
+/**
+ * A recursive mutex built on pthread_mutex_t.
+ *
+ * The thread that holds the mutex may lock it again; it has to unlock it the
+ * same number of times before the mutex is released.
+ * NOTE(review): nothing prevents copying a Mutex, but a copied
+ * pthread_mutex_t is not a usable lock - do not copy instances.
+ */
+class Mutex {
+    friend class Cond;
+  public:
+    /** Creates the mutex with the recursive type attribute. */
+    Mutex() {
+        pthread_mutexattr_init( &m_mutexAttr );
+        // PTHREAD_MUTEX_RECURSIVE is the POSIX name; the _NP spelling is a
+        // glibc-specific alias that other platforms do not define.
+        pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE );
+        pthread_mutex_init( &mutex, &m_mutexAttr );
+    }
+    /** Destroys the mutex and then its attribute object. */
+    ~Mutex() {
+        pthread_mutex_destroy(&mutex);
+        pthread_mutexattr_destroy( &m_mutexAttr );
+    }
+    /** Alias of Lock(). */
+    void Acquire() { Lock(); }
+    /** Alias of Unlock(). */
+    void Release() { Unlock(); }
+    /** Blocks until the mutex has been locked by the calling thread. */
+    void Lock() {
+        pthread_mutex_lock(&mutex);
+    }
+    /**
+     * Attempts to lock without blocking.
+     *
+     * @return the result of pthread_mutex_trylock(): 0 when the lock was
+     *         taken, an error number otherwise
+     */
+    int  TryLock() {
+        return pthread_mutex_trylock(&mutex);
+    }
+    /** Releases one level of ownership held by the calling thread. */
+    void Unlock() {
+        pthread_mutex_unlock(&mutex);
+    }
+  private:
+    pthread_mutex_t mutex;
+    pthread_mutexattr_t m_mutexAttr;
+};
+
+/**
+ * Scope guard: locks the given Mutex on construction and unlocks it again
+ * when the guard is destroyed.
+ */
+class AutoLock {
+  public:
+    AutoLock(Mutex& mutex)
+      : _mutex(mutex)
+    {
+        _mutex.Lock();
+    }
+
+    ~AutoLock()
+    {
+        _mutex.Unlock();
+    }
+
+  private:
+    // AutoUnlockTemp needs the guarded mutex to release it temporarily.
+    friend class AutoUnlockTemp;
+    Mutex& _mutex;
+};
+
+/**
+ * Inverse scope guard: unlocks the mutex held by an AutoLock on construction
+ * and locks it again when this object is destroyed.
+ */
+class AutoUnlockTemp {
+  public:
+    AutoUnlockTemp(AutoLock & autoLock)
+      : _autoLock(autoLock)
+    {
+        Mutex & held = _autoLock._mutex;
+        held.Unlock();
+    }
+
+    ~AutoUnlockTemp()
+    {
+        Mutex & held = _autoLock._mutex;
+        held.Lock();
+    }
+
+  private:
+    AutoLock & _autoLock;
+};
+
+/**
+ * A condition variable built on pthread_cond_t. Both Wait() overloads hand
+ * the pthread mutex wrapped by the given Mutex to the pthread wait call.
+ */
+class Cond {
+  public:
+    Cond() {
+        // NULL selects the default condition attributes. No shared attribute
+        // object is needed, so constructing Cond objects from several
+        // threads at once involves no unsynchronised static state.
+        pthread_cond_init(&_cond, NULL);
+    }
+    ~Cond() {
+        pthread_cond_destroy(&_cond);
+    }
+
+    /**
+     * Waits on the condition without a time limit.
+     *
+     * @param mutex the mutex passed to pthread_cond_wait()
+     */
+    void Wait(Mutex& mutex) {
+        pthread_cond_wait(&_cond, &mutex.mutex);
+    }
+
+    /**
+     * Waits on the condition for a limited time.
+     *
+     * @param mutex the mutex passed to pthread_cond_timedwait()
+     * @param timeout the maximum time to wait, in milliseconds
+     * @return false if pthread_cond_timedwait() reported ETIMEDOUT,
+     *         true for every other result
+     */
+    bool Wait(Mutex& mutex, long long int timeout) {
+        struct timeval now;
+        gettimeofday( &now, NULL );
+        // Absolute deadline in microseconds: current time plus the timeout
+        // converted from milliseconds.
+        long long deadlineUs = now.tv_sec * 1000000LL + now.tv_usec;
+        deadlineUs += timeout * 1000;
+        struct timespec abstime;
+        abstime.tv_sec = deadlineUs / 1000000LL;
+        abstime.tv_nsec = (deadlineUs % 1000000LL) * 1000;
+        const int rc = pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime);
+        return rc != ETIMEDOUT;
+    }
+    
+    /** Wakes up a waiter via pthread_cond_signal(). */
+    void Signal() {
+        pthread_cond_signal(&_cond);
+    }
+
+  private:
+    pthread_cond_t            _cond;
+};
+
+/**
+ * Bundles one {@link Mutex} with one {@link Cond} so that callers can lock,
+ * wait and notify through a single object.
+ */
+class Lock {
+    public:
+
+        /** Locks the underlying mutex. */
+        void lock()   { m_mutex.Lock(); }
+
+        /** Unlocks the underlying mutex. */
+        void unlock() { m_mutex.Unlock(); }
+
+        /** Waits on the condition, using this lock's mutex. */
+        void wait()   { m_cond.Wait( m_mutex ); }
+
+        /**
+         * Waits on the condition for a limited time, using this lock's mutex.
+         *
+         * @param timeout forwarded unchanged to Cond::Wait
+         * @return the value returned by Cond::Wait
+         */
+        bool wait(long long int timeout) {
+            const bool result = m_cond.Wait( m_mutex, timeout );
+            return result;
+        }
+
+        /** Signals the condition. */
+        void notify() { m_cond.Signal(); }
+
+    private:
+
+        /** The mutex. */
+        Mutex m_mutex;
+
+        /** The condition associated with this lock's mutex. */
+        Cond m_cond;
+};
+
+END_ZKFUSE_NAMESPACE
+        
+#endif /* __MUTEX_H__ */
+

Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc Wed Aug 13 10:58:59 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <log.h>
+
+#include "thread.h"
+
+DEFINE_LOGGER( LOG, "Thread" )
+
+START_ZKFUSE_NAMESPACE
+
+/**
+ * Starts a new pthread that runs func(ctx), using the stack size this
+ * object was constructed with.
+ *
+ * On failure the error is logged and the object is left as if Create() had
+ * never been called, so that Join() does not try to join a thread that
+ * does not exist.
+ *
+ * @param ctx the argument handed to the thread function
+ * @param func the thread entry point
+ */
+void Thread::Create(void* ctx, ThreadFunc func)
+{
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    pthread_attr_setstacksize(&attr, _stackSize);
+    const int ret = pthread_create(&mThread, &attr, func, ctx);
+    // The attribute object is only read while the thread is being created;
+    // release it on both the success and the failure path.
+    pthread_attr_destroy(&attr);
+    if (ret != 0) {
+        // pthread_create reports the error through its return value and
+        // does not set errno.
+        LOG_FATAL( LOG, "pthread_create failed: %s", strerror(ret) );
+        return;
+    }
+    _ctx = ctx;
+    _func = func;
+}
+
+END_ZKFUSE_NAMESPACE

Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h Wed Aug 13 10:58:59 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __THREAD_H__
+#define __THREAD_H__
+
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+/**
+ * A thin wrapper around one pthread: remembers the requested stack size,
+ * the entry point and its argument, and joins the thread on request.
+ */
+class Thread {
+  public:
+    static const size_t defaultStackSize = 1024 * 1024;
+    typedef void* (*ThreadFunc) (void*);
+
+    /**
+     * @param stackSize the stack size stored for use by Create()
+     */
+    Thread(size_t stackSize = defaultStackSize)
+      : _ctx(NULL), _func(NULL), _stackSize(stackSize)
+    {
+        memset( &mThread, 0, sizeof(mThread) );
+    }
+    ~Thread() { }
+
+    void Create(void* ctx, ThreadFunc func);
+
+    /**
+     * Joins the thread, but only if an entry point has been recorded.
+     */
+    void Join() {
+        //avoid SEGFAULT because of uninitialized mThread
+        //in case Create(...) was never called
+        if (_func == NULL) {
+            return;
+        }
+        pthread_join(mThread, 0);
+    }
+  private:
+    pthread_t mThread;
+    void *_ctx;
+    ThreadFunc _func;
+    size_t _stackSize;
+};
+
+
+/**
+ * Pairs an object with one of its void(void) member functions so that the
+ * call can be made later through run().
+ */
+template<typename T>
+struct ThreadContext {
+    typedef void (T::*FuncPtr) (void);
+
+    ThreadContext(T& ctx, FuncPtr func)
+      : _ctx(ctx), _func(func)
+    {
+    }
+
+    /** Invokes the stored member function on the stored object. */
+    void run(void) {
+        T& target = _ctx;
+        (target.*_func)();
+    }
+
+    T& _ctx;
+    FuncPtr   _func;
+};
+
+/**
+ * Thread entry point: treats obj as a ThreadContext<T>, runs it and
+ * returns 0.
+ */
+template<typename T>
+void* ThreadExec(void *obj) {
+    ThreadContext<T>* const context = static_cast<ThreadContext<T>*>(obj);
+    assert(context != 0);
+    context->run();
+    return 0;
+}
+
+/**
+ * A Thread whose entry point is a void(void) member function of T.
+ * The ThreadContext allocated by Create() is owned by this object and
+ * deleted in the destructor.
+ */
+template <typename T>
+class CXXThread : public Thread {
+  public:
+    typedef void (T::*FuncPtr) (void);
+
+    CXXThread(size_t stackSize = Thread::defaultStackSize)
+      : Thread(stackSize), ctx(0)
+    {
+    }
+
+    ~CXXThread()
+    {
+        //deleting a null pointer is a no-op
+        delete ctx;
+    }
+
+    /**
+     * Starts a thread that calls (obj.*func)(). May be called only once
+     * per object (asserted).
+     */
+    void Create(T& obj, FuncPtr func) {
+        assert(ctx == 0);
+        ThreadContext<T>* const context = new ThreadContext<T>(obj, func);
+        ctx = context;
+        Thread::Create(context, ThreadExec<T>);
+    }
+
+  private:
+    ThreadContext<T>* ctx;
+};
+
+    
+END_ZKFUSE_NAMESPACE
+
+#endif /* __THREAD_H__ */
+

Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc Wed Aug 13 10:58:59 2008
@@ -0,0 +1,879 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <iostream>
+
+#include "blockingqueue.h"
+#include "thread.h"
+#include "zkadapter.h"
+
+using namespace std;
+using namespace zk;
+
+DEFINE_LOGGER( LOG, "zookeeper.adapter" )
+DEFINE_LOGGER( ZK_LOG, "zookeeper.core" )
+
+/**
+ * \brief A helper class to initialize ZK logging.
+ */
+class InitZooKeeperLogging
+{
+  public:
+    /**
+     * Picks the ZooKeeper client debug level from the most verbose level
+     * enabled on ZK_LOG: debug (or trace), then info, then warn, and error
+     * otherwise.
+     */
+    InitZooKeeperLogging() {
+        //trace, where log4cxx provides it, is treated like debug
+        bool verbose = ZK_LOG->isDebugEnabled();
+#ifdef LOG4CXX_TRACE
+        verbose = verbose || ZK_LOG->isTraceEnabled();
+#endif
+        if (verbose) {
+            zoo_set_debug_level( LOG_LEVEL_DEBUG );
+            return;
+        }
+        if (ZK_LOG->isInfoEnabled()) {
+            zoo_set_debug_level( LOG_LEVEL_INFO );
+            return;
+        }
+        if (ZK_LOG->isWarnEnabled()) {
+            zoo_set_debug_level( LOG_LEVEL_WARN );
+            return;
+        }
+        zoo_set_debug_level( LOG_LEVEL_ERROR );
+    }
+};
+
+using namespace std;
+
+namespace zk
+{
+
+/**
+ * \brief This class provides logic for checking if a request can be retried.
+ */
+class RetryHandler
+{
+  public:
+    /**
+     * Allows two retries when the configuration enables auto-reconnect
+     * and none otherwise.
+     */
+    RetryHandler(const ZooKeeperConfig &zkConfig)
+        : m_zkConfig(zkConfig),
+          retries(zkConfig.getAutoReconnect() ? 2 : 0)
+    {
+    }
+        
+    /**
+     * \brief Decides whether the operation that produced rc may be retried.
+     * 
+     * @param rc the ZK error code
+     * @return true if the error code is retryable and a retry was still
+     *         available (one retry is consumed); false otherwise
+     */
+    bool handleRC(int rc)
+    {
+        TRACE( LOG, "handleRC" );
+
+        //non-recoverable errors never consume a retry
+        const bool recoverable = retryOnError(rc);
+        if (!recoverable) {
+            return false;
+        }
+        LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries );
+        const bool mayRetry = (retries > 0);
+        --retries;
+        return mayRetry;
+    }
+        
+  private:
+    /**
+     * The ZK config.
+     */
+    const ZooKeeperConfig &m_zkConfig;
+        
+    /**
+     * The number of outstanding retries.
+     */
+    int retries;    
+        
+    /**
+     * Checks whether the given error entitles this adapter
+     * to retry the previous operation.
+     * 
+     * @param zkErrorCode one of the ZK error codes
+     * @return true for ZCONNECTIONLOSS and ZOPERATIONTIMEOUT only
+     */
+    static bool retryOnError(int zkErrorCode)
+    {
+        if (zkErrorCode == ZCONNECTIONLOSS) {
+            return true;
+        }
+        return zkErrorCode == ZOPERATIONTIMEOUT;
+    }
+};
+    
+    
+//the implementation of the global ZK event watcher
+/**
+ * Receives every event from the ZK client and queues it on the adapter
+ * stored as the handle's context.
+ *
+ * @param zh the ZK handle the event belongs to
+ * @param type the event type
+ * @param state the connection state reported with the event
+ * @param path the affected node path; may be NULL
+ */
+void zkWatcher(zhandle_t *zh, int type, int state, const char *path)
+{
+    TRACE( LOG, "zkWatcher" );
+
+    //a workaround for buggy ZK API: do not touch the path of session and
+    //not-watching events. SESSION_EVENT and NOTWATCHING_EVENT are event
+    //types, so they are compared against the type, not the state.
+    const bool pathUsable = (path != NULL &&
+                             type != SESSION_EVENT &&
+                             type != NOTWATCHING_EVENT);
+    const string sPath = pathUsable ? string(path) : string();
+    LOG_INFO( LOG,
+              "Received a ZK event - type: %d, state: %d, path: '%s'",
+              type, state, sPath.c_str() );
+    ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh);
+    if (zka == NULL) {
+        LOG_ERROR( LOG,
+                   "Skipping ZK event (type: %d, state: %d, path: '%s'), "
+                   "because ZK passed no context",
+                   type, state, sPath.c_str() );
+        return;
+    }
+    zka->enqueueEvent( type, state, sPath );
+}
+
+
+
+// =======================================================================
+
+/**
+ * Creates the adapter, starts both event dispatcher threads and optionally
+ * connects to ZK.
+ *
+ * @param config the ZK configuration (copied)
+ * @param listener an initial listener to register; may be NULL
+ * @param establishConnection whether reconnect() is called right away
+ * @throw ZooKeeperException if reconnect() fails
+ */
+ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config, 
+                                   ZKEventListener *listener,
+                                   bool establishConnection) 
+    throw(ZooKeeperException)
+    : m_zkConfig(config),
+      mp_zkHandle(NULL), 
+      m_terminating(false),
+      m_connected(false),
+      m_state(AS_DISCONNECTED) 
+{
+    TRACE( LOG, "ZooKeeperAdapter" );
+
+    resetRemainingConnectTimeout();
+    
+    //enforce setting up appropriate ZK log level
+    static InitZooKeeperLogging INIT_ZK_LOGGING;
+    
+    if (listener != NULL) {
+        addListener(listener);
+    }
+
+    //start the event dispatcher thread
+    m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents );
+
+    //start the user event dispatcher thread
+    m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents );
+    
+    //optionally establish the connection
+    if (establishConnection) {
+        try {
+            reconnect();
+        } catch (...) {
+            //the destructor does not run when a constructor throws, so the
+            //dispatcher threads must be stopped and joined here; otherwise
+            //they would keep using this object while its members are
+            //being destroyed
+            m_terminating = true;
+            m_userEventDispatcher.Join();
+            m_eventDispatcher.Join();
+            throw;
+        }
+    }
+}
+
+/**
+ * Disconnects from ZK, then stops and joins both dispatcher threads.
+ */
+ZooKeeperAdapter::~ZooKeeperAdapter()
+{
+    TRACE( LOG, "~ZooKeeperAdapter" );
+
+    //a failure to disconnect is only logged
+    try {
+        disconnect();
+    } catch (std::exception &ex) {
+        LOG_ERROR( LOG, 
+                   "An exception while disconnecting from ZK: %s",
+                   ex.what() );
+    }
+
+    //both dispatcher loops poll this flag and exit once it is set
+    m_terminating = true;
+
+    m_userEventDispatcher.Join();
+    m_eventDispatcher.Join();
+}
+
+/**
+ * Checks that a node path is well formed: it starts with '/', and, unless
+ * it is just "/", neither ends with '/' nor contains "//".
+ *
+ * @param path the node path to check
+ * @throw ZooKeeperException describing the first rule that is violated
+ */
+void
+ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException)
+{
+    TRACE( LOG, "validatePath" );
+    
+    if (path.empty() || path[0] != '/') {
+        //note the space after "but": the two literals are concatenated
+        throw ZooKeeperException( string("Node path must start with '/' but "
+                                         "it was '") +
+                                  path +
+                                  "'" );
+    }
+    if (path.length() > 1) {
+        if (path[path.length() - 1] == '/') {
+            throw ZooKeeperException( string("Node path must not end with "
+                                             "'/' but it was '") +
+                                      path +
+                                      "'" );
+        }
+        if (path.find( "//" ) != string::npos) {
+            throw ZooKeeperException( string("Node path must not contain "
+                                             "'//' but it was '") +
+                                      path +
+                                      "'" ); 
+        }
+    }
+}
+
+/**
+ * Closes the ZK handle, if there is one, and moves the adapter to
+ * AS_DISCONNECTED. Does nothing when no handle is open.
+ */
+void
+ZooKeeperAdapter::disconnect()
+{
+    TRACE( LOG, "disconnect" );
+    LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
+
+    m_stateLock.lock();
+    const bool haveHandle = (mp_zkHandle != NULL);
+    if (haveHandle) {
+        zookeeper_close( mp_zkHandle );
+        mp_zkHandle = NULL;
+        setState( AS_DISCONNECTED );
+    }
+    m_stateLock.unlock();
+}
+
+/**
+ * Drops the current connection, if any, and starts a new one. On success
+ * the adapter is left in AS_CONNECTING.
+ *
+ * @throw ZooKeeperException if zookeeper_init() returns no handle
+ */
+void
+ZooKeeperAdapter::reconnect() throw(ZooKeeperException)
+{
+    TRACE( LOG, "reconnect" );
+    
+    m_stateLock.lock();
+
+    //clear the connection state
+    disconnect();
+    
+    //establish a new connection to ZooKeeper
+    mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(), 
+                                  zkWatcher, 
+                                  m_zkConfig.getLeaseTimeout(),
+                                  NULL, this, 0);
+    resetRemainingConnectTimeout();
+
+    const bool initFailed = (mp_zkHandle == NULL);
+    if (!initFailed) {
+        setState( AS_CONNECTING );
+    }
+    m_stateLock.unlock();
+
+    //the failure is reported only after the lock has been released
+    if (initFailed) {
+        throw ZooKeeperException( 
+            string("Unable to connect to ZK running at '") +
+                    m_zkConfig.getHosts() + "'" );
+    }
+    
+    LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state ); 
+}
+
+/**
+ * Looks up (and removes) the listener contexts registered for the event's
+ * path and dispatches the event to them. Session and not-watching events
+ * are dispatched without any context.
+ *
+ * @param type the ZK event type
+ * @param state the ZK state reported with the event
+ * @param path the affected node path
+ */
+void
+ZooKeeperAdapter::handleEvent(int type, int state, const string &path)
+{
+    TRACE( LOG, "handleEvent" );
+    LOG_TRACE( LOG, 
+               "type: %d, state %d, path: %s",
+               type, state, path.c_str() );
+
+    Listener2Context primary, secondary;
+    const bool internalEvent =
+        (type == SESSION_EVENT || type == NOTWATCHING_EVENT);
+    if (!internalEvent) {
+        //the registry is only touched while holding its mutex
+        m_zkContextsMutex.Acquire();
+        if (type == CHANGED_EVENT || type == DELETED_EVENT) {
+            //both a data watch and an exists watch may be registered for
+            //the path, so there can be two sets of listeners to notify
+            primary = findAndRemoveListenerContext( GET_NODE_DATA, path );
+            secondary = findAndRemoveListenerContext( NODE_EXISTS, path );
+            if (primary.empty()) {
+                //promote the second set so it is delivered exactly once
+                primary = secondary;
+                secondary.clear();
+            }
+        } else if (type == CHILD_EVENT) {
+            primary = findAndRemoveListenerContext( GET_NODE_CHILDREN, path );
+        } else if (type == CREATED_EVENT) {
+            primary = findAndRemoveListenerContext( NODE_EXISTS, path );
+        }
+        m_zkContextsMutex.Release();
+    }
+    
+    handleEvent( type, state, path, primary );
+    if (!secondary.empty()) {
+        handleEvent( type, state, path, secondary );
+    }
+}
+
+/**
+ * Fires the event once per entry of listeners, or once without context
+ * when listeners is empty. An entry with a NULL listener is fired through
+ * the single-argument fireEvent().
+ *
+ * @param type the ZK event type
+ * @param state the ZK state reported with the event
+ * @param path the affected node path
+ * @param listeners the listener-to-context entries to notify
+ */
+void
+ZooKeeperAdapter::handleEvent(int type,
+                              int state,
+                              const string &path,
+                              const Listener2Context &listeners)
+{
+    TRACE( LOG, "handleEvents" );
+
+    if (listeners.empty()) {
+        //propagate with empty context
+        ZKWatcherEvent event(type, state, path);
+        fireEvent( event );
+        return;
+    }
+
+    Listener2Context::const_iterator it = listeners.begin();
+    while (it != listeners.end()) {
+        ZKWatcherEvent event(type, state, path, it->second);
+        if (it->first == NULL) {
+            fireEvent( event );
+        } else {
+            fireEvent( it->first, event );
+        }
+        ++it;
+    }
+}
+
+/**
+ * Wraps the raw event data in a ZKWatcherEvent and puts it on m_events.
+ */
+void 
+ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path)
+{
+    TRACE( LOG, "enqueueEvents" );
+
+    const ZKWatcherEvent event( type, state, path );
+    m_events.put( event );
+}
+
+/**
+ * Body of the event dispatcher thread. Until m_terminating is set, takes
+ * events from m_events, updates the adapter state for session events and
+ * forwards every event to m_userEvents.
+ */
+void
+ZooKeeperAdapter::processEvents()
+{
+    TRACE( LOG, "processEvents" );
+
+    while (!m_terminating) {
+        bool timedOut = false;
+        ZKWatcherEvent source = m_events.take( 100, &timedOut );
+        if (timedOut) {
+            //nothing arrived; go back and re-check the termination flag
+            continue;
+        }
+        if (source.getType() == SESSION_EVENT) {
+            LOG_INFO( LOG,
+                      "Received SESSION event, state: %d. Adapter state: %d",
+                      source.getState(), m_state );
+            m_stateLock.lock();
+            if (source.getState() == CONNECTED_STATE) {
+                m_connected = true;
+                resetRemainingConnectTimeout();
+                setState( AS_CONNECTED );
+            } else if (source.getState() == CONNECTING_STATE) {
+                m_connected = false;
+                setState( AS_CONNECTING );
+            } else if (source.getState() == EXPIRED_SESSION_STATE) {
+                LOG_INFO( LOG, "Received EXPIRED_SESSION event" );
+                setState( AS_SESSION_EXPIRED );
+            }
+            m_stateLock.unlock();
+        }
+        //session events are forwarded as well
+        m_userEvents.put( source );
+    }
+}
+
+/**
+ * Body of the user event dispatcher thread. Until m_terminating is set,
+ * takes events from m_userEvents and passes them to handleEvent(); an
+ * exception raised while handling one event is logged and the loop goes on.
+ */
+void
+ZooKeeperAdapter::processUserEvents()
+{
+    TRACE( LOG, "processUserEvents" );
+
+    while (!m_terminating) {
+        bool timedOut = false;
+        ZKWatcherEvent source = m_userEvents.take( 100, &timedOut );
+        if (timedOut) {
+            //nothing arrived; go back and re-check the termination flag
+            continue;
+        }
+        try {
+            handleEvent( source.getType(),
+                         source.getState(),
+                         source.getPath() );
+        } catch (std::exception &ex) {
+            LOG_ERROR( LOG, 
+                       "Unable to process event (type: %d, state: %d, "
+                               "path: %s), because of exception: %s",
+                       source.getType(),
+                       source.getState(),
+                       source.getPath().c_str(),
+                       ex.what() );
+        }
+    }
+}
+
+/**
+ * Stores context for the given listener under (method, path), replacing
+ * any context stored for the same triple before.
+ * NOTE(review): no lock is taken here; presumably callers hold
+ * m_zkContextsMutex - confirm.
+ */
+void 
+ZooKeeperAdapter::registerContext(WatchableMethod method,
+                                  const string &path,
+                                  ZKEventListener *listener,
+                                  ContextType context)
+{
+    TRACE( LOG, "registerContext" );
+
+    Path2Listener2Context &byPath = m_zkContexts[method];
+    byPath[path][listener] = context;
+}
+
+/**
+ * Returns the listener contexts registered under (method, path) and
+ * removes them from the registry.
+ *
+ * @return a copy of the registered entries; empty if there were none
+ */
+ZooKeeperAdapter::Listener2Context
+ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method,
+                                               const string &path)
+{
+    TRACE( LOG, "findAndRemoveListenerContext" );
+
+    Listener2Context found;
+    Path2Listener2Context &byPath = m_zkContexts[method];
+    const Path2Listener2Context::iterator pos = byPath.find( path );
+    if (pos == byPath.end()) {
+        return found;
+    }
+    //copy first: erasing invalidates the entry
+    found = pos->second;
+    byPath.erase( pos );
+    return found;
+}
+
+/**
+ * Switches the adapter to newState and notifies m_stateLock; does nothing
+ * except tracing when the state is unchanged.
+ */
+void 
+ZooKeeperAdapter::setState(AdapterState newState)
+{
+    TRACE( LOG, "setState" );    
+    if (newState == m_state) {
+        LOG_TRACE( LOG, "New state same as the current: %d", newState );
+        return;
+    }
+    LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState );
+    m_state = newState;
+    //wake up a thread waiting on the state lock
+    m_stateLock.notify();
+}
+
+
+//TODO move this code to verifyConnection so reconnect()
+//is called from one place only
+/**
+ * Waits on m_stateLock until the adapter reaches AS_CONNECTED or the
+ * remaining connect timeout is used up, reconnecting whenever the session
+ * is found expired. The time spent is reported to waitedForConnect().
+ *
+ * @throw ZooKeeperException if the adapter is still not connected afterwards
+ */
+void
+ZooKeeperAdapter::waitUntilConnected() 
+  throw(ZooKeeperException)
+{
+    TRACE( LOG, "waitUntilConnected" );    
+    const long long int timeout = getRemainingConnectTimeout();
+    LOG_INFO( LOG,
+              "Waiting up to %lld ms until a connection to ZK is established",
+              timeout );
+    if (timeout > 0) {
+        long long int remaining = timeout;
+        while (m_state != AS_CONNECTED && remaining > 0) {
+            //check if session expired and reconnect if so
+            if (m_state == AS_SESSION_EXPIRED) {
+                LOG_INFO( LOG,
+                        "Reconnecting because the current session has expired" );
+                reconnect();
+            }
+            //measure how long the wait really took
+            struct timeval tv;
+            gettimeofday( &tv, NULL );
+            const int64_t startMs = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
+            LOG_TRACE( LOG, "About to wait %lld ms", remaining );
+            m_stateLock.wait( remaining );
+            gettimeofday( &tv, NULL );
+            const int64_t endMs = tv.tv_sec * 1000LL + tv.tv_usec / 1000;
+            remaining -= (endMs - startMs);
+        }
+        waitedForConnect( timeout - remaining );
+        LOG_INFO( LOG, "Waited %lld ms", timeout - remaining );
+    }
+    if (m_state != AS_CONNECTED) {
+        if (timeout > 0) {
+            LOG_WARN( LOG, "Timed out while waiting for connection to ZK" );
+            throw ZooKeeperException("Timed out while waiting for "
+                                    "connection to ZK");
+        }
+        LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" );
+        throw ZooKeeperException("Global timeout expired and still not "
+                                 "connected to ZK");
+    }
+    LOG_INFO( LOG, "Connected!" );
+}
+
+/**
+ * Makes sure the adapter is connected before a ZK call is made. While
+ * holding m_stateLock: fails if the adapter is disconnected, reconnects if
+ * needed and allowed, and waits for the connection to come up.
+ *
+ * @throw ZooKeeperException if disconnected, if auto-reconnect is not
+ *        allowed, or if reconnecting or waiting fails
+ */
+void
+ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException)
+{
+    TRACE( LOG, "verifyConnection" );
+
+    m_stateLock.lock();
+    try {
+        if (m_state == AS_DISCONNECTED) {
+            throw ZooKeeperException("Disconnected from ZK. "
+                "Please use reconnect() before attempting to use any ZK API");
+        }
+        if (m_state != AS_CONNECTED) {
+            LOG_TRACE( LOG, "Checking if need to reconnect..." );
+            //we are not connected, so check if connection in progress...
+            if (m_state == AS_CONNECTING) {
+                LOG_TRACE( LOG, "...no, already in CONNECTING state" );
+            } else {
+                LOG_TRACE( LOG, 
+                           "yes. Checking if allowed to auto-reconnect..." );
+                //...not in progress, so check if we can reconnect
+                if (!m_zkConfig.getAutoReconnect()) {
+                    //...too bad, disallowed :(
+                    LOG_TRACE( LOG, "no. Sorry." );
+                    throw ZooKeeperException("ZK connection is down and "
+                                             "auto-reconnect is not allowed");
+                }
+                LOG_TRACE( LOG, "...yes. About to reconnect" );
+                //...we are good to retry the connection
+                reconnect();
+            }
+            //wait until the connection is established
+            waitUntilConnected(); 
+        }
+    } catch (ZooKeeperException &e) {
+        //release the lock before passing the failure on
+        m_stateLock.unlock();
+        throw;
+    }
+    m_stateLock.unlock();
+}
+
+/**
+ * Creates a node, optionally creating its missing ancestors first.
+ *
+ * @param path the path of the node to create
+ * @param value the data to store in the node
+ * @param flags the flags passed to zoo_create()
+ * @param createAncestors whether missing ancestors are created (with empty
+ *                        data and no flags) when ZK reports ZNONODE
+ * @param returnPath receives the path reported by ZK on success
+ * @return true if the node was created, false if it already exists
+ * @throw ZooKeeperException on any other failure
+ */
+bool
+ZooKeeperAdapter::createNode(const string &path, 
+                             const string &value, 
+                             int flags, 
+                             bool createAncestors,
+                             string &returnPath) 
+    throw(ZooKeeperException) 
+{
+    TRACE( LOG, "createNode (internal)" );
+    validatePath( path );
+    
+    const int MAX_PATH_LENGTH = 1024;
+    char realPath[MAX_PATH_LENGTH];
+    realPath[0] = 0;
+    
+    //issue the request, retrying for as long as the retry handler allows
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    do {
+        verifyConnection();
+        rc = zoo_create( mp_zkHandle, 
+                         path.c_str(), 
+                         value.c_str(),
+                         value.length(),
+                         &OPEN_ACL_UNSAFE,
+                         flags,
+                         realPath,
+                         MAX_PATH_LENGTH );
+    } while (rc != ZOK && rh.handleRC(rc));
+
+    if (rc == ZOK) {
+        LOG_INFO( LOG, "%s has been created", realPath );
+        returnPath = string( realPath );
+        return true;
+    }
+    if (rc == ZNODEEXISTS) {
+        //the node already exists
+        LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+        return false;
+    }
+    if (rc == ZNONODE && createAncestors) {
+        LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+        //one of the ancestors doesn't exist so lets start from the root 
+        //and make sure the whole path exists, creating missing nodes if
+        //necessary
+        string::size_type pos = 1;
+        while (pos != string::npos) {
+            pos = path.find( "/", pos );
+            if (pos == string::npos) {
+                //no more path components
+                return createNode( path, value, flags, false, returnPath );
+            }
+            try {
+                createNode( path.substr( 0, pos ), "", 0, true );
+            } catch (ZooKeeperException &e) {
+                throw ZooKeeperException( string("Unable to create "
+                                                 "node ") + 
+                                          path, 
+                                          rc );
+            }
+            pos++;
+        }
+    }
+    LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() );
+    throw ZooKeeperException( string("Unable to create node ") +
+                              path,
+                              rc );
+}
+
+/**
+ * Convenience overload for callers that do not need the path reported by
+ * ZK: forwards to the five-argument createNode() and discards that path.
+ *
+ * @return the result of the five-argument createNode()
+ */
+bool
+ZooKeeperAdapter::createNode(const string &path,
+                             const string &value,
+                             int flags,
+                             bool createAncestors) 
+        throw(ZooKeeperException) 
+{
+    TRACE( LOG, "createNode" );
+
+    string ignoredPath;
+    const bool created =
+        createNode( path, value, flags, createAncestors, ignoredPath );
+    return created;
+}
+
+/**
+ * Creates a node with the SEQUENCE flag added to flags and returns the
+ * number that follows path in the path reported back by createNode().
+ *
+ * @param path the path prefix of the node to create
+ * @param value the data to store in the node
+ * @param flags the creation flags; SEQUENCE is OR-ed in
+ * @param createAncestors forwarded to createNode()
+ * @return the parsed sequence number, or -1 if createNode() returned false
+ * @throw ZooKeeperException if the created path does not start with path,
+ *        or if what follows path is not a decimal number
+ */
+int64_t
+ZooKeeperAdapter::createSequence(const string &path,
+                                 const string &value,
+                                 int flags,
+                                 bool createAncestors) 
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "createSequence" );
+
+    string createdPath;    
+    const bool result = createNode( path,
+                                    value,
+                                    flags | SEQUENCE,
+                                    createAncestors,
+                                    createdPath );
+    if (!result) {
+        return -1;
+    }
+
+    //extract sequence number from the returned path
+    if (createdPath.compare( 0, path.length(), path ) != 0) {
+        throw ZooKeeperException( string("Expecting returned path '") +
+                                  createdPath + 
+                                  "' to start with '" +
+                                  path +
+                                  "'" );
+    }
+    const string seqSuffix = createdPath.substr( path.length() );
+    char *end = NULL;
+    //strtoll parses 64 bits even where long is 32 bits wide
+    const int64_t seq = strtoll( seqSuffix.c_str(), &end, 10 );
+    //reject a suffix with trailing characters as well as one with no
+    //digits at all (which would otherwise be returned as 0)
+    if (end == seqSuffix.c_str() || *end != '\0') {
+        throw ZooKeeperException( string("Expecting a number but got ") +
+                                  seqSuffix );
+    }
+    return seq;
+}
+
+/**
+ * Deletes the node identified by the given path.
+ *
+ * If ZK reports ZNOTEMPTY and <code>recursive</code> is true, all children
+ * are deleted recursively (without a version check) and the delete of
+ * <code>path</code> is attempted once more with the caller's
+ * <code>version</code>.
+ *
+ * @return true if the node has been deleted; false if ZK reported ZNONODE
+ * @throw ZooKeeperException for any other ZK error
+ */
+bool
+ZooKeeperAdapter::deleteNode(const string &path,
+                             bool recursive,
+                             int version)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "deleteNode" );
+
+    validatePath( path );
+        
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    do {
+        verifyConnection();
+        rc = zoo_delete( mp_zkHandle, path.c_str(), version );
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        if (rc == ZNONODE) {
+            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+            return false;
+        }
+        if (rc == ZNOTEMPTY && recursive) {
+            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+            //get all children and delete them recursively...
+            vector<string> nodeList;
+            //no listener and no context: this lookup must not set a watch
+            getNodeChildren( nodeList, path, NULL, NULL );
+            for (vector<string>::const_iterator i = nodeList.begin();
+                 i != nodeList.end();
+                 ++i) {
+                deleteNode( *i, true );
+            }
+            //...and finally attempt to delete the node again, still
+            //honouring the version the caller asked for; dropping it here
+            //would delete the node even if it was modified in the meantime
+            return deleteNode( path, false, version ); 
+        }
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( string("Unable to delete node ") + path,
+                                  rc );
+    } else {
+        LOG_INFO( LOG, "%s has been deleted", path.c_str() );
+        return true;
+    }
+}
+
+/**
+ * Checks whether the node identified by the given path exists.
+ *
+ * A ZK watch is requested whenever <code>listener</code> is not NULL.
+ * When <code>context</code> is not NULL, the ZK call and the registration
+ * of the listener/context pair are performed while holding
+ * m_zkContextsMutex; the pair is registered if ZK answered ZOK or ZNONODE.
+ *
+ * @return true if ZK answered ZOK; false if it answered ZNONODE
+ * @throw ZooKeeperException for any other ZK error
+ */
+bool
+ZooKeeperAdapter::nodeExists(const string &path,
+                             ZKEventListener *listener,
+                             void *context, Stat *stat)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "nodeExists" );
+
+    validatePath( path );
+
+    //fall back to a local Stat when the caller did not supply one
+    struct Stat localStat;
+    Stat *statOut = (stat != NULL) ? stat : &localStat;
+    memset( statOut, 0, sizeof(Stat) );
+
+    const int watch = (listener != NULL) ? 1 : 0;
+    const bool trackContext = (context != NULL);
+
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    for (;;) {
+        verifyConnection();
+        if (trackContext) {
+            m_zkContextsMutex.Acquire();
+        }
+        rc = zoo_exists( mp_zkHandle, path.c_str(), watch, statOut );
+        if (trackContext) {
+            if (rc == ZOK || rc == ZNONODE) {
+                registerContext( NODE_EXISTS, path, listener, context );
+            }
+            m_zkContextsMutex.Release();
+        }
+        //stop on success, or when the retry handler gives up on this error
+        if (rc == ZOK || !rh.handleRC(rc)) {
+            break;
+        }
+    }
+
+    if (rc == ZOK) {
+        return true;
+    }
+    if (rc == ZNONODE) {
+        LOG_TRACE( LOG, "Node %s does not exist", path.c_str() );
+        return false;
+    }
+    LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+    throw ZooKeeperException(
+             string("Unable to check existence of node ") + path,
+             rc );
+}
+
+/**
+ * Retrieves the children of the node identified by the given path.
+ *
+ * The absolute path of every child is appended to <code>nodeList</code>,
+ * after which the whole of <code>nodeList</code> is sorted.
+ * A ZK watch is requested whenever <code>listener</code> is not NULL.
+ * When <code>context</code> is not NULL, the ZK call and the registration
+ * of the listener/context pair are performed while holding
+ * m_zkContextsMutex; the pair is registered only if ZK answered ZOK.
+ *
+ * @throw ZooKeeperException if ZK reported an error
+ */
+void
+ZooKeeperAdapter::getNodeChildren(vector<string> &nodeList,
+                                  const string &path, 
+                                  ZKEventListener *listener,
+                                  void *context)
+    throw (ZooKeeperException)
+{
+    TRACE( LOG, "getNodeChildren" );
+
+    validatePath( path );
+    
+    String_vector children;
+    memset( &children, 0, sizeof(children) );
+
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    do {
+        verifyConnection();
+        if (context != NULL) {
+            m_zkContextsMutex.Acquire();
+            rc = zoo_get_children( mp_zkHandle,
+                                   path.c_str(), 
+                                   (listener != NULL ? 1 : 0), 
+                                   &children );
+            if (rc == ZOK) {
+                registerContext( GET_NODE_CHILDREN, path, listener, context );
+            }
+            m_zkContextsMutex.Release();
+        } else {
+            rc = zoo_get_children( mp_zkHandle,
+                                   path.c_str(), 
+                                   (listener != NULL ? 1 : 0),
+                                   &children );
+        }
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( string("Unable to get children of node ") +
+                                  path, 
+                                  rc );
+    } else {
+        for (int i = 0; i < children.count; ++i) {
+            //convert each child's path from relative to absolute 
+            string absPath(path);
+            if (path != "/") {
+                absPath.append( "/" );
+            } 
+            absPath.append( children.data[i] ); 
+            nodeList.push_back( absPath );
+        }
+        //the names were copied above; release the vector that the ZK client
+        //allocated for us, otherwise every successful call leaks it
+        deallocate_String_vector( &children );
+        //make sure the order is always deterministic
+        sort( nodeList.begin(), nodeList.end() );
+    }
+}
+
+/**
+ * Gets the data of the node identified by the given path.
+ *
+ * A ZK watch is requested whenever <code>listener</code> is not NULL.
+ * When <code>context</code> is not NULL, the ZK call and the registration
+ * of the listener/context pair are performed while holding
+ * m_zkContextsMutex; the pair is registered only if ZK answered ZOK.
+ *
+ * ZK is given a buffer capacity of MAX_DATA_LENGTH - 1 bytes, so the
+ * returned string never holds more than that.
+ *
+ * @return the node's data; an empty string if ZK reported a negative length
+ * @throw ZooKeeperException if ZK reported an error
+ */
+string
+ZooKeeperAdapter::getNodeData(const string &path,
+                              ZKEventListener *listener,
+                              void *context, Stat *stat)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "getNodeData" );
+
+    validatePath( path );
+   
+    const int MAX_DATA_LENGTH = 128 * 1024;
+    char buffer[MAX_DATA_LENGTH];
+    memset( buffer, 0, MAX_DATA_LENGTH );
+    //fall back to a local Stat when the caller did not supply one
+    struct Stat tmpStat;
+    if (stat == NULL) {
+        stat = &tmpStat;
+    }
+    memset( stat, 0, sizeof(Stat) );
+    
+    int rc;
+    int len;
+    RetryHandler rh(m_zkConfig);
+    do {
+        verifyConnection();
+        //len is in/out: reset the capacity before every attempt
+        len = MAX_DATA_LENGTH - 1;
+        if (context != NULL) {
+            m_zkContextsMutex.Acquire();
+            rc = zoo_get( mp_zkHandle, 
+                          path.c_str(),
+                          (listener != NULL ? 1 : 0),
+                          buffer, &len, stat );
+            if (rc == ZOK) {
+                registerContext( GET_NODE_DATA, path, listener, context );
+            }
+            m_zkContextsMutex.Release();
+        } else {
+            rc = zoo_get( mp_zkHandle,
+                          path.c_str(),
+                          (listener != NULL ? 1 : 0),
+                          buffer, &len, stat );
+        }
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( 
+            string("Unable to get data of node ") + path, rc 
+        );
+    } else {
+        //the ZK C client reports a node holding NULL data as length -1;
+        //building a string from [buffer, buffer - 1) would be an invalid
+        //range, so treat any negative length as "no data"
+        if (len < 0) {
+            return string();
+        }
+        return string( buffer, buffer + len );
+    }
+}
+
+/**
+ * Sets the data of the node identified by the given path.
+ *
+ * The ZK call is repeated for as long as it fails and the retry handler
+ * asks for another attempt.
+ *
+ * @throw ZooKeeperException if ZK still reports an error once the retry
+ *        handler has given up
+ */
+void
+ZooKeeperAdapter::setNodeData(const string &path,
+                              const string &value,
+                              int version)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "setNodeData" );
+
+    validatePath( path );
+
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    for (;;) {
+        verifyConnection();
+        rc = zoo_set( mp_zkHandle,
+                      path.c_str(),
+                      value.c_str(),
+                      value.length(), version );
+        if (rc == ZOK) {
+            //data stored, nothing else to do
+            return;
+        }
+        if (!rh.handleRC(rc)) {
+            break;
+        }
+    }
+
+    LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+    throw ZooKeeperException( string("Unable to set data for node ") +
+                              path,
+                              rc );
+}
+
+}   /* end of 'namespace zk' */
+

Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h Wed Aug 13 10:58:59 2008
@@ -0,0 +1,718 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ZKADAPTER_H__
+#define __ZKADAPTER_H__
+
+#include <string>
+#include <vector>
+#include <map>
+
+extern "C" {
+#include "zookeeper.h"
+}
+
+#include "log.h"
+#include "mutex.h"
+#include "thread.h"
+#include "blockingqueue.h"
+#include "event.h"
+
+using namespace std;
+using namespace zkfuse;
+
+namespace zk {
+    
+/**
+ * \brief The exception type thrown by the ZK adapter.
+ *
+ * Carries a human readable message and, optionally, a ZK error code.
+ */
+class ZooKeeperException :
+    public std::exception
+{
+    public:
+        
+        /**
+         * \brief Creates an exception without a ZK error code.
+         * The stored error code is 0.
+         * 
+         * @param msg the detailed message associated with this exception
+         */
+        ZooKeeperException(const string &msg) : 
+            m_message(msg), 
+            m_zkErrorCode(0) 
+        {
+        }
+
+        /**
+         * \brief Creates an exception carrying a ZK error code.
+         * The error code is also appended to the message text.
+         * 
+         * @param msg the detailed message associated with this exception
+         * @param errorCode the ZK error code associated with this exception
+         */
+        ZooKeeperException(const string &msg, int errorCode) : 
+            m_message(msg), 
+            m_zkErrorCode(errorCode) 
+        {
+            //large enough for the fixed text plus any int value
+            char suffix[100];
+            sprintf( suffix, " (ZK error code: %d)", errorCode );
+            m_message += suffix;
+        }
+                
+        /**
+         * \brief Destructor.
+         */
+        ~ZooKeeperException() throw() {}
+        
+        /**
+         * \brief Returns the message, including the error code suffix if any.
+         */
+        const char *what() const throw() {
+            return m_message.c_str();
+        }
+        
+        /**
+         * \brief Returns the ZK error code given at construction, or 0.
+         */
+        int getZKErrorCode() const {
+            return m_zkErrorCode;
+        }
+
+    private:
+        
+        /**
+         * The detailed message associated with this exception.
+         */
+        string m_message;
+        
+        /**
+         * The optional error code received from ZK.
+         */
+        int m_zkErrorCode;
+        
+};
+    
+/**
+ * \brief Read-only bundle of the settings used by a ZK client.
+ */
+class ZooKeeperConfig
+{
+    public:
+        
+        /**
+         * \brief Constructor.
+         * 
+         * @param hosts the comma separated list of host and port pairs of ZK nodes
+         * @param leaseTimeout the lease timeout (heartbeat)
+         * @param autoReconnect whether to allow for auto-reconnect
+         * @param connectTimeout the connect timeout, in milliseconds
+         */
+        ZooKeeperConfig(const string &hosts, 
+                        int leaseTimeout, 
+                        bool autoReconnect = true, 
+                        long long int connectTimeout = 15000)
+            : m_hosts(hosts),
+              m_leaseTimeout(leaseTimeout),
+              m_autoReconnect(autoReconnect),
+              m_connectTimeout(connectTimeout)
+        {
+        }
+        
+        /**
+         * \brief Returns the list of ZK hosts to connect to.
+         */
+        string getHosts() const {
+            return m_hosts;
+        }
+        
+        /**
+         * \brief Returns the lease timeout.
+         */
+        int getLeaseTimeout() const {
+            return m_leaseTimeout;
+        }
+        
+        /**
+         * \brief Returns whether {@link ZooKeeperAdapter} should attempt 
+         * \brief to automatically reconnect in case of a connection failure.
+         */
+        bool getAutoReconnect() const {
+            return m_autoReconnect;
+        }
+
+        /**
+         * \brief Gets the connect timeout.
+         * 
+         * @return the connect timeout, in milliseconds
+         */
+        long long int getConnectTimeout() const {
+            return m_connectTimeout;
+        }
+                  
+    private:
+        
+        /**
+         * The host addresses of ZK nodes.
+         */
+        const string m_hosts;
+
+        /**
+         * The ZK lease timeout.
+         */
+        const int m_leaseTimeout;
+        
+        /**
+         * True if the adapter should attempt to auto-reconnect in case 
+         * the current session has been dropped.
+         */
+        const bool m_autoReconnect;
+        
+        /**
+         * How long to wait, in milliseconds, before a connection 
+         * is established to ZK.
+         */
+        const long long int m_connectTimeout;
+        
+};
+
+/**
+ * \brief A data value object representing a watcher event received from the ZK.
+ */
+class ZKWatcherEvent
+{
+    public:
+
+        /**
+         * \brief The type representing the user's context.
+         */
+        typedef void *ContextType;
+        
+        /**
+         * \brief Default constructor.
+         * 
+         * Creates an event with type -1, state -1, an empty path
+         * and a NULL context.
+         */
+        ZKWatcherEvent() : 
+            m_type(-1), 
+            m_state(-1), 
+            m_path(), 
+            mp_context(NULL) 
+        {
+        }
+                        
+        /**
+         * \brief Constructor.
+         * 
+         * @param type the type of this event
+         * @param state the state of this event
+         * @param path the corresponding path, may be empty for some event types
+         * @param context the user specified context; possibly NULL
+         */
+        ZKWatcherEvent(int type, int state, const string &path, 
+                       ContextType context = NULL) :
+            m_type(type), 
+            m_state(state), 
+            m_path(path), 
+            mp_context(context) 
+        {
+        }
+        
+        /** \brief Returns the type of this event. */
+        int getType() const { 
+            return m_type; 
+        }
+
+        /** \brief Returns the state of this event. */
+        int getState() const { 
+            return m_state; 
+        }
+
+        /** \brief Returns the path of this event, possibly empty. */
+        string const &getPath() const { 
+            return m_path; 
+        }
+
+        /** \brief Returns the user specified context, possibly NULL. */
+        ContextType getContext() const { 
+            return mp_context; 
+        }
+        
+        /**
+         * \brief Two events are equal when their type, state, path
+         * \brief and context are all equal.
+         */
+        bool operator==(const ZKWatcherEvent &we) const {
+            if (m_type != we.m_type) {
+                return false;
+            }
+            if (m_state != we.m_state) {
+                return false;
+            }
+            if (!(m_path == we.m_path)) {
+                return false;
+            }
+            return mp_context == we.mp_context;
+        }
+        
+    private:
+        
+        /**
+         * The type of this event. It can be either CREATED_EVENT, DELETED_EVENT,
+         * CHANGED_EVENT, CHILD_EVENT, SESSION_EVENT or NOTWATCHING_EVENT. 
+         * See zookeeper.h for more details.
+         */
+        const int m_type;
+        
+        /**
+         * The state of ZK at the time of sending this event.
+         * It can be either CONNECTING_STATE, ASSOCIATING_STATE, 
+         * CONNECTED_STATE, EXPIRED_SESSION_STATE or AUTH_FAILED_STATE.
+         * See {@file zookeeper.h} for more details.
+         */
+        const int m_state;
+        
+        /**
+         * The corresponding path of the node in subject. It may be empty
+         * for some event types.
+         */
+        const string m_path;
+        
+        /**
+         * The pointer to the user specified context, possibly NULL.
+         */
+        ContextType mp_context;
+        
+};
+
+/**
+ * \brief The type definition of ZK event source: an EventSource
+ * \brief specialized for {@link ZKWatcherEvent}.
+ */
+typedef EventSource<ZKWatcherEvent> ZKEventSource;
+
+/**
+ * \brief The type definition of ZK event listener: an EventListener
+ * \brief specialized for {@link ZKWatcherEvent}.
+ */
+typedef EventListener<ZKWatcherEvent> ZKEventListener;
+           
+/**
+ * \brief This is a wrapper around the ZK C synchronous API.
+ */
+class ZooKeeperAdapter
+    : public ZKEventSource
+{
+    public:
+        /**
+         * \brief The global function that handles all ZK asynchronous notifications.
+         */
+        friend void zkWatcher(zhandle_t *, int, int, const char *);
+        
+        /**
+         * \brief The type representing the user's context.
+         */
+        typedef void *ContextType;
+        
+        /**
+         * \brief The map type of ZK event listener to user specified context mapping.
+         */
+        typedef map<ZKEventListener *, ContextType> Listener2Context;
+        
+        /**
+         * \brief The map type of ZK path's to listener's contexts.
+         */
+        typedef map<string, Listener2Context> Path2Listener2Context;
+                  
+        /**
+         * \brief All possible states of this client, in respect to 
+         * \brief connection to the ZK server.
+         */
+        enum AdapterState {
+            //mp_zkHandle is NULL
+            AS_DISCONNECTED = 0,
+            //mp_zkHandle is valid but this client is reconnecting
+            AS_CONNECTING,
+            //mp_zkHandle is valid and this client is connected
+            AS_CONNECTED,
+            //mp_zkHandle is valid, however no more calls can be made to ZK API
+            AS_SESSION_EXPIRED
+        };
+                
+        /**
+         * \brief Constructor.
+         * Attempts to create a ZK adapter, optionally connecting
+         * to the ZK. Note, that if the connection is to be established
+         * and the given listener is NULL, some events may be lost, 
+         * as they may arrive asynchronously before this method finishes.
+         * 
+         * @param config the ZK configuration
+         * @param listener the event listener to be used for listening 
+         *                 on incoming ZK events;
+         *                 if <code>NULL</code> not used
+         * @param establishConnection whether to establish connection to the ZK
+         * 
+         * @throw ZooKeeperException if cannot establish connection to the given ZK
+         */
+        ZooKeeperAdapter(ZooKeeperConfig config, 
+                         ZKEventListener *listener = NULL,
+                         bool establishConnection = false) 
+            throw(ZooKeeperException);
+
+        /**
+         * \brief Destructor.
+         */
+        ~ZooKeeperAdapter(); 
+                  
+        /**
+         * \brief Returns the current config.
+         */
+        const ZooKeeperConfig &getZooKeeperConfig() const {
+            return m_zkConfig;                      
+        }
+
+        /**
+         * \brief Reestablishes connection to the ZK. 
+         * If this adapter is already connected, the current connection 
+         * will be dropped and a new connection will be established.
+         * 
+         * @throw ZooKeeperException if cannot establish connection to the ZK
+         */
+        void reconnect() throw(ZooKeeperException);
+        
+        /**
+         * \brief Disconnects from the ZK and unregisters {@link #mp_zkHandle}.
+         */
+        void disconnect();
+        
+        /**
+         * \brief Creates a new node identified by the given path. 
+         * This method will optionally attempt to create all missing ancestors.
+         * 
+         * @param path the absolute path name of the node to be created
+         * @param value the initial value to be associated with the node
+         * @param flags the ZK flags of the node to be created
+         * @param createAncestors if true and there are some missing ancestor nodes, 
+         *        this method will attempt to create them
+         * 
+         * @return true if the node has been successfully created; false otherwise
+         * @throw ZooKeeperException if the operation has failed
+         */ 
+        bool createNode(const string &path, 
+                        const string &value = "", 
+                        int flags = 0, 
+                        bool createAncestors = true) 
+            throw(ZooKeeperException);
+                  
+        /**
+         * \brief Creates a new sequence node using the given path as the prefix.
+         * This method will optionally attempt to create all missing ancestors.
+         * 
+         * @param path the absolute path name of the node to be created; 
+         * @param value the initial value to be associated with the node
+         * @param flags the ZK flags of the sequence node to be created 
+         *              (in addition to SEQUENCE)
+         * @param createAncestors if true and there are some missing ancestor 
+         *                        nodes, this method will attempt to create them
+         * 
+         * @return the sequence number associated with the newly created node,
+         *         or -1 if it couldn't be created
+         * @throw ZooKeeperException if the operation has failed
+         */ 
+        int64_t createSequence(const string &path, 
+                               const string &value = "", 
+                               int flags = 0, 
+                               bool createAncestors = true) 
+            throw(ZooKeeperException);
+        
+        /**
+         * \brief Deletes a node identified by the given path.
+         * 
+         * @param path the absolute path name of the node to be deleted
+         * @param recursive if true this method will attempt to remove 
+         *                  all children of the given node if any exist
+         * @param version the expected version of the node. The function will 
+         *                fail if the actual version of the node does not match 
+         *                the expected version
+         * 
+         * @return true if the node has been deleted; false otherwise
+         * @throw ZooKeeperException if the operation has failed
+         */
+        bool deleteNode(const string &path, bool recursive = false, int version = -1) 
+            throw(ZooKeeperException);
+        
+        /**
+         * \brief Checks whether the given node exists or not.
+         * 
+         * @param path the absolute path name of the node to be checked
+         * @param listener the listener for ZK watcher events; 
+         *                 passing non <code>NULL</code> effectively establishes
+         *                 a ZK watch on the given node
+         * @param context the user specified context that is to be passed
+         *                in a corresponding {@link ZKWatcherEvent} at later time; 
+         *                not used if <code>listener</code> is <code>NULL</code>
+         * @param stat the optional node statistics to be filled in by ZK
+         * 
+         * @return true if the given node exists; false otherwise
+         * @throw ZooKeeperException if the operation has failed
+         */
+        bool nodeExists(const string &path, 
+                        ZKEventListener *listener = NULL, 
+                        void *context = NULL,
+                        Stat *stat = NULL) 
+            throw(ZooKeeperException);
+
+        /**
+         * \brief Retrieves list of all children of the given node.
+         * 
+         * @param path the absolute path name of the node for which to get children
+         * @param listener the listener for ZK watcher events; 
+         *                 passing non <code>NULL</code> effectively establishes
+         *                 a ZK watch on the given node
+         * @param context the user specified context that is to be passed
+         *                in a corresponding {@link ZKWatcherEvent} at later time; 
+         *                not used if <code>listener</code> is <code>NULL</code>
+         * 
+         * @param children receives the list of absolute paths of child nodes, possibly empty
+         * @throw ZooKeeperException if the operation has failed
+         */
+        void getNodeChildren(vector<string> &children,
+                             const string &path, 
+                             ZKEventListener *listener = NULL, 
+                             void *context = NULL) 
+            throw(ZooKeeperException);
+                
+        /**
+         * \brief Gets the given node's data.
+         * 
+         * @param path the absolute path name of the node to get data from
+         * @param listener the listener for ZK watcher events; 
+         *                 passing non <code>NULL</code> effectively establishes
+         *                 a ZK watch on the given node
+         * @param context the user specified context that is to be passed
+         *                in a corresponding {@link ZKWatcherEvent} at later time; 
+         *                not used if <code>listener</code> is <code>NULL</code>
+         * @param stat the optional node statistics to be filled in by ZK
+         * 
+         * @return the node's data
+         * @throw ZooKeeperException if the operation has failed
+         */
+        string getNodeData(const string &path, 
+                           ZKEventListener *listener = NULL, 
+                           void *context = NULL,
+                           Stat *stat = NULL) 
+            throw(ZooKeeperException);
+        
+        /**
+         * \brief Sets the given node's data.
+         * 
+         * @param path the absolute path name of the node to set data for
+         * @param value the node's data to be set
+         * @param version the expected version of the node. The function will 
+         *                fail if the actual version of the node does not match 
+         *                the expected version
+         * 
+         * @throw ZooKeeperException if the operation has failed
+         */
+        void setNodeData(const string &path, const string &value, int version = -1) 
+            throw(ZooKeeperException);
+        
+        /**
+         * \brief Validates the given path to a node in ZK.
+         * 
+         * @param path the path to be validated
+         * 
+         * @throw ZooKeeperException if the given path is not valid
+         *        (for instance it doesn't start with "/")
+         */
+        static void validatePath(const string &path) throw(ZooKeeperException);
+
+        /**
+         * Returns the current state of this adapter.
+         * 
+         * @return the current state of this adapter
+         * @see AdapterState
+         */
+        AdapterState getState() const {
+            return m_state;
+        }          
+        
+    private:
+        
+        /**
+         * This enum defines methods from this class that can trigger an event.
+         */
+        enum WatchableMethod {
+            NODE_EXISTS = 0,
+            GET_NODE_CHILDREN,
+            GET_NODE_DATA
+        };
+                
+        /**
+         * \brief Creates a new node identified by the given path. 
+         * This method is used internally to implement {@link createNode(...)} 
+         * and {@link createSequence(...)}. On success, this method will set
+         * <code>createdPath</code>.
+         * 
+         * @param path the absolute path name of the node to be created
+         * @param value the initial value to be associated with the node
+         * @param flags the ZK flags of the node to be created
+         * @param createAncestors if true and there are some missing ancestor nodes, 
+         *        this method will attempt to create them
+         * @param createdPath the actual path of the node that has been created; 
+         *        useful for sequences
+         * 
+         * @return true if the node has been successfully created; false otherwise
+         * @throw ZooKeeperException if the operation has failed
+         */ 
+        bool createNode(const string &path, 
+                        const string &value, 
+                        int flags, 
+                        bool createAncestors,
+                        string &createdPath) 
+            throw(ZooKeeperException);
+        
+        /**
+         * Handles an asynchronous event received from the ZK.
+         */
+        void handleEvent(int type, int state, const string &path);
+        
+        /**
+         * Handles an asynchronous event received from the ZK.
+         * This method iterates over all listeners and passes the event 
+         * to each of them.
+         */
+        void handleEvent(int type, int state, const string &path, 
+                         const Listener2Context &listeners);        
+        
+        /**
+         * \brief Enqueues the given event in {@link #m_events} queue.
+         */
+        void enqueueEvent(int type, int state, const string &path);
+        
+        /**
+         * \brief Processes all ZK adapter events in a loop.
+         */
+        void processEvents();
+
+        /**
+         * \brief Processes all user events in a loop.
+         */
+        void processUserEvents();
+
+        /**
+         * \brief Registers the given context in the {@link #m_zkContexts} 
+         * \brief contexts map.
+         * 
+         * @param method the method where the given path is being used
+         * @param path the path of interest
+         * @param listener the event listener to call back later on
+         * @param context the user specified context to be passed back to user
+         */
+        void registerContext(WatchableMethod method, const string &path, 
+                             ZKEventListener *listener, ContextType context);
+        
+        /**
+         * \brief Attempts to find a listener to context map in the contexts' 
+         * \brief map, based on the specified criteria.
+         * If the context is found, it will be removed from the underlying map.
+         * 
+         * @param method the method type identify Listener2Context map
+         * @param path the path to be used to search in the Listener2Context map
+         * 
+         * @return the context map associated with the given method and path, 
+         *         or empty map if not found
+         */
+        Listener2Context findAndRemoveListenerContext(WatchableMethod method, 
+                                                      const string &path);
+
+        /**
+         * Sets the new state in case it's different from the current one.
+         * This method assumes that {@link #m_stateLock} has been already locked.
+         * 
+         * @param newState the new state to be set
+         */
+        void setState(AdapterState newState); 
+        
+        /**
+         * Waits until this client gets connected. The total wait time 
+         * is given by {@link getRemainingConnectTimeout()}.
+         * If a timeout elapses, this method will throw an exception.
+         * 
+         * @throw ZooKeeperException if unable to connect within the given timeout
+         */
+        void waitUntilConnected() 
+            throw(ZooKeeperException);
+                                      
+        /**
+         * Verifies whether the connection is established,
+         * optionally auto reconnecting.
+         * 
+         * @throw ZooKeeperException if this client is disconnected
+         *        and auto-reconnect failed or was not allowed
+         */
+        void verifyConnection() throw(ZooKeeperException);
+
+        /**
+         * Returns the remaining connect timeout. The timeout resets
+         * to {@link #m_connectTimeout} on a successful connection to the ZK.
+         * 
+         * @return the remaining connect timeout, in milliseconds
+         */
+        long long int getRemainingConnectTimeout() const { 
+            // Pure read of the remaining-timeout counter; it does not modify
+            // the adapter, hence the const qualification.
+            return m_remainingConnectTimeout; 
+        }
+        
+        /**
+         * Resets the remaining connect timeout to the connect timeout
+         * configured in {@link #m_zkConfig}.
+         */
+        void resetRemainingConnectTimeout() {
+            // Start over from the connect timeout held by the configuration.
+            const long long int configuredTimeout =
+                m_zkConfig.getConnectTimeout();
+            m_remainingConnectTimeout = configuredTimeout;
+        }
+        
+        /**
+         * Updates the remaining connect timeout to reflect the given wait time.
+         * 
+         * @param time the time for how long waited so far on connect to succeed
+         */
+        void waitedForConnect(long long time) {
+            // Charge the elapsed wait against the remaining budget. No clamping
+            // is done here, so the result may drop to zero or below.
+            m_remainingConnectTimeout = m_remainingConnectTimeout - time;
+        }
+                
+    private:
+        
+        /**
+         * The mutex used to protect {@link #m_zkContexts}.
+         */
+        zkfuse::Mutex m_zkContextsMutex;
+        
+        /**
+         * The map of registered ZK paths that are being watched.
+         * Each entry maps a function type to another map of registered contexts.
+         * 
+         * @see WatchableMethod
+         */
+        map<int, Path2Listener2Context> m_zkContexts;
+        
+        /**
+         * The current ZK configuration.
+         */
+        const ZooKeeperConfig m_zkConfig;
+
+        /**
+         * The current ZK session.
+         */
+        zhandle_t *mp_zkHandle;
+        
+        /**
+         * The blocking queue of all events waiting to be processed by ZK adapter.
+         */
+        BlockingQueue<ZKWatcherEvent> m_events;
+        
+        /**
+         * The blocking queue of all events waiting to be processed by users
+         * of ZK adapter.
+         */
+        BlockingQueue<ZKWatcherEvent> m_userEvents;
+        
+        /**
+         * The thread that dispatches all events from {@link #m_events} queue.
+         */
+        CXXThread<ZooKeeperAdapter> m_eventDispatcher;
+
+        /**
+         * The thread that dispatches all events from {@link #m_userEvents} queue.
+         */
+        CXXThread<ZooKeeperAdapter> m_userEventDispatcher;
+                
+        /**
+         * Whether {@link #m_eventDispatcher} is terminating.
+         */
+        volatile bool m_terminating;
+        
+        /**
+         * Whether this adapter is connected to the ZK.
+         */
+        volatile bool m_connected;
+        
+        /**
+         * The state of this adapter.
+         */
+        AdapterState m_state;
+        
+        /**
+         * The lock used to synchronize access to {@link #m_state}.
+         */
+        Lock m_stateLock;
+
+        /**
+         * How much time is left for the connect to succeed, in milliseconds.
+         */
+        long long int m_remainingConnectTimeout;
+                
+};
+        
+}   /* end of 'namespace zk' */
+
+#endif /* __ZKADAPTER_H__ */



Mime
View raw message