zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [14/45] zookeeper git commit: ZOOKEEPER-3030: MAVEN MIGRATION - Step 1.3 - move contrib directories
Date Mon, 06 Aug 2018 12:13:39 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h
new file mode 100644
index 0000000..936ecc6
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __EVENT_H__
+#define __EVENT_H__
+
+#include <string>
+#include <set>
+#include <deque>
+#include <algorithm>
+#ifdef GCC4
+#   include <tr1/memory>
+using namespace std::tr1;
+#else
+#   include <boost/shared_ptr.hpp>
+using namespace boost;
+#endif
+
+#include "log.h"
+#include "blockingqueue.h"
+#include "mutex.h"
+#include "thread.h"
+
+using namespace std;
+using namespace zk;
+
+namespace zkfuse {
+
+//forward declaration of EventSource
+template<typename E>
+class EventSource;
+
+/**
+ * \brief Observer interface for events of type E published by an
+ * \brief {@link EventSource}; implementations override eventReceived().
+ */
+template<typename E>
+class EventListener {
+    public:
+        virtual ~EventListener() {}  // keeps deletion through an EventListener pointer well-defined
+        /**
+         * \brief This method is invoked whenever an event 
+         * \brief has been received by the event source being observed.
+         * 
+         * @param source the source that triggered the event
+         * @param e      the actual event being triggered
+         */
+        virtual void eventReceived(const EventSource<E> &source, const E &e) = 0;
+};
+
+/**
+ * \brief This class represents a source of events.
+ * 
+ * <p>
+ * Each source can have many observers (listeners) attached to it
+ * and in case of an event, this source may propagate the event
+ * using the {@link #fireEvent} methods.
+ */
+template<typename E>           
+class EventSource {
+    public:
+        
+        /**
+         * \brief The type of the set of registered listeners (stored as raw pointers).
+         */
+        typedef set<EventListener<E> *> EventListeners;
+        
+        /**
+         * \brief Registers a new event listener.
+         * 
+         * @param listener the listener to be added; a pointer already in the set is not added twice
+         */
+        void addListener(EventListener<E> *listener) {
+            m_listeners.insert( listener );
+        }
+        
+        /**
+         * \brief Removes an already registered listener.
+         * 
+         * @param listener the listener to be removed from the set
+         */
+        void removeListener(EventListener<E> *listener) {
+            m_listeners.erase( listener );
+        }
+        
+        /**
+         * \brief Destructor; the registered listeners are not deleted here.
+         */
+        virtual ~EventSource() {}
+        
+    protected:
+        
+        /**
+         * \brief Fires the given event to all registered listeners.
+         * 
+         * <p>
+         * This method essentially iterates over all listeners
+         * and invokes {@link fireEvent(EventListener<E> *listener, const E &event)}
+         * for each element. This overload is not virtual; derived
+         * classes that want better error handling than the default
+         * should override the per-listener overload instead.
+         * 
+         * @param event the event to be propagated to all listeners
+         */
+        void fireEvent(const E &event);
+        
+        /**
+         * \brief Sends an event to the given listener.
+         * 
+         * @param listener the listener to which the event is passed
+         * @param event the event to be handled
+         */
+        virtual void fireEvent(EventListener<E> *listener, const E &event);
+        
+    private:
+        
+        /**
+         * The set of registered event listeners.
+         */
+        EventListeners m_listeners;            
+    
+};
+
+/**
+ * \brief The interface of a generic (type-erased) event wrapper.
+ */
+class AbstractEventWrapper {
+    public:
+        
+        /**
+         * \brief Virtual destructor, so wrappers can be destroyed via this interface.
+         */
+        virtual ~AbstractEventWrapper() {}
+        
+        /**
+         * \brief Returns an untyped pointer to the wrapped event's data.
+         */
+        virtual void *getWrapee() = 0;
+};
+
+/**
+ * \brief An {@link AbstractEventWrapper} implementation holding a copy of an E.
+ */
+template<typename E>
+class EventWrapper : public AbstractEventWrapper {
+    public:
+        EventWrapper(const E &e) : m_e(e) {   // copies the event into m_e
+        }
+        void *getWrapee() {   // address of the stored copy, returned as void*
+            return &m_e;
+        }
+    private:
+        E m_e;   // the wrapped event, held by value
+};
+
+/**
+ * \brief This class represents a generic event.
+ */
+class GenericEvent {
+    public:
+        
+        /**
+         * \brief Constructor; creates an event of type 0 with no wrapper attached.
+         */
+        GenericEvent() : m_type(0) {}
+
+        /**
+         * \brief Constructor.
+         * 
+         * @param type the type of this event
+         * @param eventWrapper the wrapper around event's data; it is handed to a shared_ptr
+         */
+        GenericEvent(int type, AbstractEventWrapper *eventWrapper) : 
+            m_type(type), m_eventWrapper(eventWrapper) {
+        }
+        
+        /**
+         * \brief Returns the type of this event.
+         * 
+         * @return type of this event
+         */
+        int getType() const { return m_type; }
+        
+        /**
+         * @return the event's data, or NULL for an event built without a wrapper
+         */
+        void *getEvent() const {
+            return m_eventWrapper.get() ? m_eventWrapper->getWrapee() : NULL;
+        }
+        
+    private:
+
+        /**
+         * The event type.
+         */
+        int m_type;
+
+        /**
+         * The wrapped event. NOTE(review): spelled boost:: although GCC4 builds include only tr1 - confirm.
+         */
+        boost::shared_ptr<AbstractEventWrapper> m_eventWrapper;
+        
+};
+    
+/**
+ * \brief This class adapts {@link EventListener} to a generic listener.
+ * Essentially this class listens on incoming events of type E and re-fires
+ * them as {@link GenericEvent}s tagged with the template argument 'type'.
+ */
+template<typename E, const int type>
+class EventListenerAdapter : public virtual EventListener<E>,
+                             public virtual EventSource<GenericEvent>
+{
+    public:
+        
+        /**
+         * \brief Constructor.
+         * 
+         * @param eventSource the source on which to register this listener (this class never unregisters it)
+         */
+        EventListenerAdapter(EventSource<E> &eventSource) {
+            eventSource.addListener(this);
+        }
+        // Wraps a copy of e in a new EventWrapper<E> and fires it as a GenericEvent; 'source' is unused.
+        void eventReceived(const EventSource<E> &source, const E &e) {
+            AbstractEventWrapper *wrapper = new EventWrapper<E>(e);
+            GenericEvent event(type, wrapper);
+            fireEvent( event );
+        }
+
+};        
+
+/**
+ * \brief This class provides an adapter between an asynchronous and synchronous 
+ * \brief event handling.
+ * 
+ * <p>
+ * This class queues up all received events and exposes them through 
+ * {@link #getNextEvent()} method.
+ */
+template<typename E>                  
+class SynchronousEventAdapter : public EventListener<E> {
+    public:
+        // Listener callback: puts the received event on the queue; 'source' is ignored.
+        void eventReceived(const EventSource<E> &source, const E &e) {
+            m_queue.put( e );
+        }
+
+        /**
+         * \brief Returns the next available event from the underlying queue,
+         * \brief possibly blocking, if no data is available.
+         * 
+         * @return the next available event, as returned by the queue's take()
+         */
+        E getNextEvent() {
+            return m_queue.take();
+        }
+        
+        /**
+         * \brief Returns whether there are any events in the queue or not.
+         * 
+         * @return true if the queue reports itself non-empty, i.e. there is at
+         *         least one event and {@link #getNextEvent} won't block
+         */
+        bool hasEvents() const {
+            return (m_queue.empty() ? false : true);
+        }
+        
+        /**
+         * \brief Destructor.
+         */
+        virtual ~SynchronousEventAdapter() {}
+
+    private:
+        
+        /**
+         * The blocking queue of all events received and not yet taken.
+         */
+        BlockingQueue<E> m_queue;
+        
+};
+
+/**
+ * The type of a timer event ID: a signed 32-bit integer.
+ */
+typedef int32_t TimerId;
+
+/**
+ * This class represents a timer event parametrized by the user's data type.
+ */
+template<typename T>
+class TimerEvent {
+    public:
+       
+        /**
+         * \brief Constructor.
+         * 
+         * @param id the ID of this event
+         * @param alarmTime when this event is to be triggered
+         * @param userData the user data associated with this event (copied)
+         */
+        TimerEvent(TimerId id, int64_t alarmTime, const T &userData) :
+            m_id(id), m_alarmTime(alarmTime), m_userData(userData) 
+        {}     
+
+        /**
+         * \brief Default constructor; ID and alarm time are set to -1.
+         */
+        TimerEvent() : m_id(-1), m_alarmTime(-1) {}
+                           
+        /**
+         * \brief Returns the ID.
+         * 
+         * @return the ID of this event
+         */
+        TimerId getID() const { return m_id; }
+        
+        /**
+         * \brief Returns the alarm time.
+         * 
+         * @return the alarm time
+         */
+        int64_t getAlarmTime() const { return m_alarmTime; }
+              
+        /**
+         * \brief Returns the user's data.
+         * 
+         * @return a const reference to the user's data stored in this event
+         */
+        T const &getUserData() const { return m_userData; }
+        
+        /**
+         * \brief Returns whether this event's alarm time is less than the
+         * \brief given alarm time.
+         */
+        bool operator<(const int64_t alarmTime) const {
+            return m_alarmTime < alarmTime;
+        }
+        
+    private:
+        
+        /**
+         * The ID of this event.
+         */
+        TimerId m_id;
+        
+        /**
+         * The time at which this event triggers.
+         */
+        int64_t m_alarmTime;    
+        
+        /**
+         * The user specific data associated with this event.
+         */
+        T m_userData;
+        
+};
+
+template<typename T>
+class Timer : public EventSource<TimerEvent<T> > {
+    public:
+        // Timer: keeps a queue of events ordered by alarm time and fires due ones from a worker thread.
+        /**
+         * \brief Constructor; starts the worker thread running sendAlarms().
+         */
+        Timer() : m_currentEventID(0), m_terminating(false) {
+            m_workerThread.Create( *this, &Timer<T>::sendAlarms );
+        }
+        
+        /** \brief Destructor; raises the flag under the lock (no lost wakeup), then joins. */
+        ~Timer() {
+            m_lock.lock();
+            m_terminating = true;
+            m_lock.notify();
+            m_lock.unlock();
+            m_workerThread.Join();
+        }
+        
+        /**
+         * \brief Schedules an event <code>timeFromNow</code> milliseconds from now.
+         * 
+         * @param timeFromNow time from now, in milliseconds, when the event 
+         *                    should be triggered 
+         * @param userData the user data associated with the timer event
+         * 
+         * @return the ID of the newly created timer event
+         */
+        TimerId scheduleAfter(int64_t timeFromNow, const T &userData) {
+            return scheduleAt( getCurrentTimeMillis() + timeFromNow, userData );
+        }
+
+        /**
+         * \brief Schedules an event at the given time.
+         * 
+         * @param absTime absolute time, in milliseconds, at which the event 
+         *                should be triggered; the time is measured
+         *                from Jan 1st, 1970   
+         * @param userData the user data associated with the timer event
+         * 
+         * @return the ID of the newly created timer event
+         */
+        TimerId scheduleAt(int64_t absTime, const T &userData) {
+            m_lock.lock();
+            typename QueueType::iterator pos = 
+                    lower_bound( m_queue.begin(), m_queue.end(), absTime );
+            TimerId id = m_currentEventID++;
+            TimerEvent<T> event(id, absTime, userData); 
+            m_queue.insert( pos, event );
+            m_lock.notify();
+            m_lock.unlock();
+            return id;
+        }
+        
+        /**
+         * \brief Returns the current time since Jan 1, 1970, in milliseconds.
+         * 
+         * @return the current time in milliseconds
+         */
+        static int64_t getCurrentTimeMillis() {
+            struct timeval now;
+            gettimeofday( &now, NULL );
+            return now.tv_sec * 1000LL + now.tv_usec / 1000;
+        }
+
+        /**
+         * \brief Cancels the given timer event.
+         * 
+         * Only events that are still in the queue can be canceled.
+         * @param eventID the ID of the event to be canceled
+         * 
+         * @return whether the event has been canceled
+         */
+        bool cancelAlarm(TimerId eventID) {
+            bool canceled = false;                      
+            m_lock.lock();
+            typename QueueType::iterator i;
+            for (i = m_queue.begin(); i != m_queue.end(); ++i) {
+                if (eventID == i->getID()) {
+                    m_queue.erase( i );
+                    canceled = true;
+                    break;
+                }
+            }
+            m_lock.unlock();
+            return canceled;
+        }
+        
+        /**
+         * Executes the main loop of the worker thread until m_terminating is set.
+         */
+        void sendAlarms() {
+            //iterate until terminating
+            while (!m_terminating) {
+                m_lock.lock();
+                //1 step - wait for an event, unless shutdown was requested meanwhile
+                if (m_queue.empty() && !m_terminating) {
+                    //wait up to 100ms to get next event
+                    m_lock.wait( 100 );
+                }     
+                bool fire = false;
+                if (!m_queue.empty()) {
+                    //take a copy of the earliest event in the queue
+                    TimerEvent<T> event = m_queue.front();      
+                    //check whether we can send it right away
+                    int64_t timeToWait = 
+                        event.getAlarmTime() - getCurrentTimeMillis();
+                    if (timeToWait <= 0) {
+                        m_queue.pop_front();
+                        //the event is due: it is removed here and delivered
+                        //below, after the lock has been released, so the
+                        //listeners do not run while m_lock is held
+                        fire = true;    
+                    } else if (!m_terminating) {
+                        m_lock.wait( timeToWait );
+                    }
+                    m_lock.unlock();
+                    if (fire) {
+                        this->fireEvent( event );  //this-> is required: member of a dependent base
+                    }
+                } else {
+                    m_lock.unlock();
+                }
+            }    
+        }
+        
+    private:
+        
+        /**
+         * The type of timer events queue.
+         */
+        typedef deque<TimerEvent<T> > QueueType;
+        
+        /**
+         * The current event ID, auto-incremented each time a new event 
+         * is created.
+         */
+        TimerId m_currentEventID;
+        
+        /**
+         * The queue of timer events sorted by {@link TimerEvent#alarmTime}.
+         */
+        QueueType m_queue;
+        
+        /**
+         * The lock used to guard {@link #m_queue} and the shutdown handshake.
+         */
+        Lock m_lock;
+        
+        /**
+         * The thread that triggers alarms.
+         */
+        CXXThread<Timer<T> > m_workerThread;
+        
+        /**
+         * Whether {@link #m_workerThread} is terminating; set by the destructor.
+         */
+        volatile bool m_terminating;
+        
+};
+
+template<typename E>
+void EventSource<E>::fireEvent(const E &event) {   // broadcast: visit every registered listener
+    for (typename EventListeners::iterator i = m_listeners.begin(); 
+         i != m_listeners.end(); 
+         ++i) 
+    {
+        fireEvent( *i, event );   // the virtual per-listener overload
+    }
+}
+
+template<typename E>
+void EventSource<E>::fireEvent(EventListener<E> *listener, const E &event) {
+    listener->eventReceived( *this, event );   // this source is reported as the event's origin
+}
+        
+}   /* end of 'namespace zkfuse' */
+
+#endif /* __EVENT_H__ */

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc
new file mode 100644
index 0000000..e2bfb0d
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+  
+#include "log.h"
+
+using namespace std;
+  
+/**
+ * \brief Loads a log4cxx properties file when an instance is constructed.
+ */
+class LogConfiguration {
+    public:
+        LogConfiguration(const string &file) {
+            PropertyConfigurator::configureAndWatch( file, 5000 );  // 5000: presumably the re-check delay in ms - confirm in log4cxx docs
+        }
+};
+
+//file-scope instance: its constructor runs the configuration during static initialization
+static LogConfiguration logConfig( "log4cxx.properties" );

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h
new file mode 100644
index 0000000..aefce10
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_H__
+#define __LOG_H__
+
+#define ZKFUSE_NAMESPACE zkfuse  // name of the project namespace
+#define START_ZKFUSE_NAMESPACE namespace ZKFUSE_NAMESPACE {  // opens the namespace
+#define END_ZKFUSE_NAMESPACE   }  // closes a START_ZKFUSE_NAMESPACE
+#define USING_ZKFUSE_NAMESPACE using namespace ZKFUSE_NAMESPACE;  // includes the trailing ';'
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#include <log4cxx/logger.h> 
+#include <log4cxx/propertyconfigurator.h> 
+#include <log4cxx/helpers/exception.h> 
+using namespace log4cxx; 
+using namespace log4cxx::helpers;
+
+#define PRINTIP(x) ((uint8_t*)&x)[0], ((uint8_t*)&x)[1], \
+                   ((uint8_t*)&x)[2], ((uint8_t*)&x)[3]  // first four bytes of x, in memory order, as four arguments
+
+#define IPFMT "%u.%u.%u.%u"  // printf format with four unsigned fields, matching PRINTIP's arguments
+
+#define DECLARE_LOGGER(varName) \
+extern LoggerPtr varName;  // declaration only
+
+#define DEFINE_LOGGER(varName, logName) \
+static LoggerPtr varName = Logger::getLogger( logName );  // file-local logger looked up by name
+
+#define MAX_BUFFER_SIZE 20000  // size of the char buffer declared by SPRINTF_LOG_MSG
+
+#define SPRINTF_LOG_MSG(buffer, fmt, args...) \
+    char buffer[MAX_BUFFER_SIZE]; \
+    snprintf( buffer, MAX_BUFFER_SIZE, fmt, ##args );  // declares 'buffer' in the enclosing scope; snprintf is bounded by the buffer size
+
+// Older log4cxx versions lack LOG4CXX_TRACE; LOG_TRACE then falls back to the DEBUG level.
+#ifdef LOG4CXX_TRACE
+#define LOG_TRACE(logger, fmt, args...) \
+    if (logger->isTraceEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_TRACE( logger, __tmp ); \
+    }
+#else  // no LOG4CXX_TRACE available
+#define LOG_TRACE(logger, fmt, args...) \
+    if (logger->isDebugEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_DEBUG( logger, __tmp ); \
+    }
+#endif  // LOG4CXX_TRACE
+
+#define LOG_DEBUG(logger, fmt, args...) \
+    if (logger->isDebugEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_DEBUG( logger, __tmp ); \
+    }  // NOTE: each LOG_* macro is a bare if-block, not do { } while (0); take care when using one before an else
+
+#define LOG_INFO(logger, fmt, args...) \
+    if (logger->isInfoEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_INFO( logger, __tmp ); \
+    }
+
+#define LOG_WARN(logger, fmt, args...) \
+    if (logger->isWarnEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_WARN( logger, __tmp ); \
+    }
+
+#define LOG_ERROR(logger, fmt, args...) \
+    if (logger->isErrorEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_ERROR( logger, __tmp ); \
+    }
+
+#define LOG_FATAL(logger, fmt, args...) \
+    if (logger->isFatalEnabled()) { \
+        SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+        LOG4CXX_FATAL( logger, __tmp ); \
+    }
+
+#ifdef DISABLE_TRACE
+#   define TRACE(logger, x)  // tracing compiled out: expands to nothing
+#else   // tracing enabled
+#   define TRACE(logger, x) \
+class Trace { \
+ public: \
+    Trace(const void* p) : _p(p) { \
+        LOG_TRACE(logger, "%s %p Enter", __PRETTY_FUNCTION__, p); \
+    } \
+    ~Trace() { \
+        LOG_TRACE(logger, "%s %p Exit", __PRETTY_FUNCTION__, _p); \
+    } \
+    const void* _p; \
+} traceObj(x);  // local object: logs "Enter" when constructed and "Exit" when destroyed
+#endif  /* DISABLE_TRACE */
+    
+#endif  /* __LOG_H__ */
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties
new file mode 100644
index 0000000..1e373e4
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties
@@ -0,0 +1,28 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to TRACE and its only appender to A1.
+log4j.rootLogger=TRACE, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4cxx.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.category.zkfuse=TRACE
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h
new file mode 100644
index 0000000..86c4604
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MUTEX_H__
+#define __MUTEX_H__
+
+#include <pthread.h>
+#include <errno.h>
+#include <sys/time.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+class Cond;
+
+class Mutex {   // wrapper around a pthread mutex configured as recursive
+    friend class Cond;   // Cond::Wait hands the raw pthread_mutex_t to pthread
+  public:
+    Mutex() {
+        pthread_mutexattr_init( &m_mutexAttr );
+        pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE_NP );
+        pthread_mutex_init( &mutex, &m_mutexAttr );   // pthread return codes are not checked
+    }
+    ~Mutex() {
+        pthread_mutex_destroy(&mutex);
+        pthread_mutexattr_destroy( &m_mutexAttr );
+    }
+    void Acquire() { Lock(); }     // alias for Lock()
+    void Release() { Unlock(); }   // alias for Unlock()
+    void Lock() {
+        pthread_mutex_lock(&mutex);
+    }
+    int  TryLock() {   // forwards pthread_mutex_trylock's return code
+        return pthread_mutex_trylock(&mutex);
+    }
+    void Unlock() {
+        pthread_mutex_unlock(&mutex);
+    }
+  private:
+    pthread_mutex_t mutex;
+    pthread_mutexattr_t m_mutexAttr;   // initialized in the constructor, destroyed in ~Mutex
+};
+
+class AutoLock {   // scoped guard: locks the mutex on construction, unlocks on destruction
+  public:
+    AutoLock(Mutex& mutex) : _mutex(mutex) {
+        mutex.Lock();
+    }
+    ~AutoLock() {
+        _mutex.Unlock();
+    }
+  private:
+    friend class AutoUnlockTemp;   // needs _mutex to release it temporarily
+    Mutex& _mutex;   // the guarded mutex, held by reference
+};
+
+class AutoUnlockTemp {   // inverse guard: unlocks an AutoLock's mutex for the lifetime of this object
+  public:
+    AutoUnlockTemp(AutoLock & autoLock) : _autoLock(autoLock) {
+        _autoLock._mutex.Unlock();
+    }
+    ~AutoUnlockTemp() {
+        _autoLock._mutex.Lock();   // re-acquire on scope exit
+    }
+  private:
+    AutoLock & _autoLock;   // the guard whose mutex is temporarily released
+};
+
+class Cond {   // wrapper around a pthread condition variable
+  public:
+    Cond() {
+        static pthread_condattr_t attr;   // one attribute object shared by all Cond instances
+        static bool inited = false;
+        if(!inited) {   // NOTE(review): this first-use check is not guarded by any lock
+            inited = true;
+            pthread_condattr_init(&attr);
+        }
+        pthread_cond_init(&_cond, &attr);
+    }
+    ~Cond() {
+        pthread_cond_destroy(&_cond);
+    }
+    // Blocks on the condition using the given mutex (see pthread_cond_wait for the locking requirement).
+    void Wait(Mutex& mutex) {
+        pthread_cond_wait(&_cond, &mutex.mutex);
+    }
+    // Waits up to 'timeout' milliseconds; returns false only when the wait timed out.
+    bool Wait(Mutex& mutex, long long int timeout) {
+        struct timeval now;
+        gettimeofday( &now, NULL );
+        struct timespec abstime;
+        int64_t microSecs = now.tv_sec * 1000000LL + now.tv_usec;
+        microSecs += timeout * 1000;   // milliseconds -> microseconds
+        abstime.tv_sec = microSecs / 1000000LL;
+        abstime.tv_nsec = (microSecs % 1000000LL) * 1000;   // microsecond remainder -> nanoseconds
+        if (pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime) == ETIMEDOUT) {
+            return false;
+        } else {
+            return true;   // any result other than ETIMEDOUT, error codes included
+        }
+    }
+    // Signals the condition via pthread_cond_signal.
+    void Signal() {
+        pthread_cond_signal(&_cond);
+    }
+
+  private:
+    pthread_cond_t            _cond;
+};
+
+/**
+ * A wrapper class pairing a {@link Mutex} with a {@link Cond} that waits on it.
+ */
+class Lock {
+    public:
+        
+        void lock() {
+            m_mutex.Lock();
+        }
+        
+        void unlock() {
+            m_mutex.Unlock();
+        }
+        // Waits on the condition with this lock's mutex; presumably called with the lock held - confirm at call sites.
+        void wait() {
+            m_cond.Wait( m_mutex );
+        }
+        // Timed variant: 'timeout' is forwarded to Cond::Wait and its result is returned.
+        bool wait(long long int timeout) {
+            return m_cond.Wait( m_mutex, timeout );
+        }
+        // Signals the condition (Cond::Signal).
+        void notify() {
+            m_cond.Signal();
+        }
+
+    private:
+        
+        /**
+         * The mutex.
+         */
+        Mutex m_mutex;
+        
+        /**
+         * The condition associated with this lock's mutex.
+         */
+        Cond m_cond;         
+};
+
+END_ZKFUSE_NAMESPACE
+        
+#endif /* __MUTEX_H__ */
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc
new file mode 100644
index 0000000..f1ed816
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <log.h>
+
+#include "thread.h"
+
+DEFINE_LOGGER( LOG, "Thread" )
+
+START_ZKFUSE_NAMESPACE
+
+void Thread::Create(void* ctx, ThreadFunc func)  // starts func(ctx) on a new thread with _stackSize bytes of stack
+{
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    pthread_attr_setstacksize(&attr, _stackSize);
+    int ret = pthread_create(&mThread, &attr, func, ctx);  // returns the error number; errno is not set
+    pthread_attr_destroy(&attr);  // the attribute object is no longer needed once the call has returned
+    if(ret != 0) {
+        LOG_FATAL( LOG, "pthread_create failed: %s", strerror(ret) );
+    }
+    _ctx = ctx;
+    _func = (ret == 0) ? func : NULL;  // keep Join() a no-op when no thread was started
+}
+
+END_ZKFUSE_NAMESPACE

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h
new file mode 100644
index 0000000..0ed12d7
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __THREAD_H__
+#define __THREAD_H__
+
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+/**
+ * \brief A minimal wrapper around a single POSIX thread handle.
+ *
+ * Create() starts the thread; Join() waits for it. The thread function
+ * and its argument are remembered so Join() can tell whether a thread
+ * was ever recorded.
+ */
+class Thread {
+  public:
+    typedef void* (*ThreadFunc) (void*);
+    static const size_t defaultStackSize = 1024 * 1024;
+
+    /**
+     * @param stackSize the stack size requested for the thread in Create()
+     */
+    Thread(size_t stackSize = defaultStackSize) 
+      : _ctx(NULL), _func(NULL), _stackSize(stackSize)
+    {
+        memset( &mThread, 0, sizeof(mThread) );
+    }
+    ~Thread() {}
+
+    void Create(void* ctx, ThreadFunc func);
+
+    /**
+     * \brief Waits for the thread to finish.
+     *
+     * Does nothing while no thread function has been recorded, i.e.
+     * when Create(...) was never called and mThread is still zeroed.
+     */
+    void Join() {
+        if (_func == NULL) {
+            return;
+        }
+        pthread_join(mThread, 0);
+    }
+  private:
+    pthread_t mThread;  
+    void *_ctx;
+    ThreadFunc _func;
+    size_t _stackSize;
+};
+
+
+/**
+ * \brief Bundles an object reference with one of its void member
+ *        functions so that both can be passed around as one value.
+ */
+template<typename T>
+struct ThreadContext {
+    typedef void (T::*FuncPtr) (void);
+
+    ThreadContext(T& target, FuncPtr method)
+      : _ctx(target), _func(method) {}
+
+    /** Calls the stored member function on the stored object. */
+    void run() {
+        T& self = _ctx;
+        (self.*_func)();
+    }
+
+    T& _ctx;
+    FuncPtr   _func;
+};
+
+/**
+ * \brief Thread entry point that treats its argument as a
+ *        ThreadContext<T> and runs it.
+ *
+ * @param obj a pointer to a ThreadContext<T>; must not be null
+ * @return always 0
+ */
+template<typename T>
+void* ThreadExec(void *obj) {
+    ThreadContext<T>* const context = static_cast<ThreadContext<T>*>(obj);
+    assert(context != 0);
+    context->run();
+    return 0;
+}
+
+/**
+ * \brief A Thread that runs a void member function of an object of
+ *        type T.
+ *
+ * The ThreadContext allocated in Create() is owned by this object and
+ * released in the destructor.
+ */
+template <typename T>
+class CXXThread : public Thread {
+  public:
+    typedef void (T::*FuncPtr) (void);
+
+    CXXThread(size_t stackSize = Thread::defaultStackSize) 
+      : Thread(stackSize), ctx(0) {}
+
+    ~CXXThread() {
+        //deleting a null pointer is a no-op, so no guard is needed
+        delete ctx;
+    }
+
+    /**
+     * \brief Starts a thread that calls (obj.*func)().
+     *
+     * May be called at most once per object (asserted).
+     */
+    void Create(T& obj, FuncPtr func) {
+        assert(ctx == 0);
+        ThreadContext<T>* const fresh = new ThreadContext<T>(obj, func);
+        ctx = fresh;
+        Thread::Create(fresh, ThreadExec<T>);
+    }
+
+  private:
+    ThreadContext<T>* ctx;
+};
+
+    
+END_ZKFUSE_NAMESPACE
+
+#endif /* __THREAD_H__ */
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b0df8fe1/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc
new file mode 100644
index 0000000..7f02fa3
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <iostream>
+
+#include "blockingqueue.h"
+#include "thread.h"
+#include "zkadapter.h"
+
+using namespace std;
+using namespace zk;
+
+DEFINE_LOGGER( LOG, "zookeeper.adapter" )
+DEFINE_LOGGER( ZK_LOG, "zookeeper.core" )
+
+/**
+ * \brief A helper class whose constructor maps the level enabled on
+ *        ZK_LOG to the ZK client debug level.
+ */
+class InitZooKeeperLogging
+{
+  public:
+    InitZooKeeperLogging() {
+        //debug (and trace, where the logging library has it) both map
+        //to the ZK client's debug level
+        bool verbose = ZK_LOG->isDebugEnabled();
+#ifdef LOG4CXX_TRACE
+        verbose = verbose || ZK_LOG->isTraceEnabled();
+#endif
+        if (verbose) {
+            zoo_set_debug_level( ZOO_LOG_LEVEL_DEBUG );
+            return;
+        }
+        if (ZK_LOG->isInfoEnabled()) {
+            zoo_set_debug_level( ZOO_LOG_LEVEL_INFO );
+            return;
+        }
+        if (ZK_LOG->isWarnEnabled()) {
+            zoo_set_debug_level( ZOO_LOG_LEVEL_WARN );
+            return;
+        }
+        zoo_set_debug_level( ZOO_LOG_LEVEL_ERROR );
+    }
+};
+
+using namespace std;
+
+namespace zk
+{
+
+/**
+ * \brief Decides whether a failed ZK request may be attempted again.
+ *
+ * Two retries are granted when the configuration allows auto-reconnect,
+ * none otherwise.
+ */
+class RetryHandler
+{
+  public:
+    RetryHandler(const ZooKeeperConfig &zkConfig)
+        : m_zkConfig(zkConfig),
+          retries(zkConfig.getAutoReconnect() ? 2 : 0)
+    {
+    }
+        
+    /**
+     * \brief Tells the caller whether to retry after the given RC.
+     * 
+     * @param rc the ZK error code
+     * @return true if the error is retryable and a retry was still
+     *         available; each retryable error consumes one retry
+     */
+    bool handleRC(int rc)
+    {
+        TRACE( LOG, "handleRC" );
+
+        //errors that are not recoverable never consume a retry
+        const bool recoverable = retryOnError(rc);
+        if (!recoverable) {
+            return false;
+        }
+        LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries );
+        const bool attemptAvailable = (retries > 0);
+        --retries;
+        return attemptAvailable;
+    }
+        
+  private:
+    /**
+     * The ZK config.
+     */
+    const ZooKeeperConfig &m_zkConfig;
+        
+    /**
+     * The number of outstanding retries.
+     */
+    int retries;    
+        
+    /**
+     * Checks whether the given error is one after which the previous
+     * operation may be retried (connection loss or operation timeout).
+     * 
+     * @param zkErrorCode one of the ZK error codes
+     */
+    static bool retryOnError(int zkErrorCode)
+    {
+        if (zkErrorCode == ZCONNECTIONLOSS) {
+            return true;
+        }
+        return zkErrorCode == ZOPERATIONTIMEOUT;
+    }
+};
+    
+    
+/**
+ * \brief The global ZK event watcher: forwards every event to the
+ *        ZooKeeperAdapter stored as the handle's context.
+ *
+ * @param zh the ZK handle the event belongs to
+ * @param type the event type
+ * @param state the connection state reported with the event
+ * @param path the node path, may be NULL
+ * @param watcherCtx unused; the adapter is taken from zoo_get_context()
+ */
+void zkWatcher(zhandle_t *zh, int type, int state, const char *path,
+               void *watcherCtx)
+{
+    TRACE( LOG, "zkWatcher" );
+
+    //a workaround for buggy ZK API: session and not-watching events
+    //carry no usable path. ZOO_SESSION_EVENT and ZOO_NOTWATCHING_EVENT
+    //are event types, so they have to be compared with 'type'
+    //(comparing them with 'state' never matched)
+    const bool pathless = (path == NULL || 
+                           type == ZOO_SESSION_EVENT || 
+                           type == ZOO_NOTWATCHING_EVENT);
+    const string sPath = pathless ? "" : string(path);
+    LOG_INFO( LOG,
+              "Received a ZK event - type: %d, state: %d, path: '%s'",
+              type, state, sPath.c_str() );
+    ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh);
+    if (zka != NULL) {
+        zka->enqueueEvent( type, state, sPath );
+    } else {
+        LOG_ERROR( LOG,
+                   "Skipping ZK event (type: %d, state: %d, path: '%s'), "
+                   "because ZK passed no context",
+                   type, state, sPath.c_str() );
+    }
+}
+
+
+
+// =======================================================================
+
+/**
+ * \brief Constructs the adapter, starts both event dispatcher threads
+ *        and optionally initiates a connection to ZK.
+ *
+ * @param config the ZK configuration, copied into m_zkConfig
+ * @param listener an optional listener registered through addListener();
+ *                 ignored when NULL
+ * @param establishConnection whether reconnect() is called before
+ *                            returning
+ * @throw ZooKeeperException from reconnect(), when the ZK handle
+ *        cannot be created
+ */
+ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config, 
+                                   ZKEventListener *listener,
+                                   bool establishConnection) 
+    throw(ZooKeeperException)
+    : m_zkConfig(config),
+      mp_zkHandle(NULL), 
+      m_terminating(false),
+      m_connected(false),
+      m_state(AS_DISCONNECTED) 
+{
+    TRACE( LOG, "ZooKeeperAdapter" );
+
+    resetRemainingConnectTimeout();
+    
+    //enforce setting up appropriate ZK log level
+    //(a function-local static, so its constructor runs only once)
+    static InitZooKeeperLogging INIT_ZK_LOGGING;
+    
+    if (listener != NULL) {
+        addListener(listener);
+    }
+
+    //start the event dispatcher thread
+    m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents );
+
+    //start the user event dispatcher thread
+    m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents );
+    
+    //optionally establish the connection
+    if (establishConnection) {
+        reconnect();
+    }
+}
+
+/**
+ * \brief Disconnects from ZK, then stops and joins both dispatcher
+ *        threads.
+ */
+ZooKeeperAdapter::~ZooKeeperAdapter()
+{
+    TRACE( LOG, "~ZooKeeperAdapter" );
+
+    //a failure to disconnect must not escape the destructor
+    try {
+        disconnect();
+    } catch (const std::exception &failure) {
+        LOG_ERROR( LOG, 
+                   "An exception while disconnecting from ZK: %s",
+                   failure.what() );
+    }
+
+    //both dispatcher loops poll this flag and exit once it is set
+    m_terminating = true;
+    m_userEventDispatcher.Join();
+    m_eventDispatcher.Join();
+}
+
+/**
+ * \brief Checks that the given string is a well-formed ZK node path.
+ *
+ * A valid path starts with '/' and, unless it is the root "/" itself,
+ * neither ends with '/' nor contains "//".
+ *
+ * @param path the node path to check
+ * @throw ZooKeeperException if the path breaks any of the rules above
+ */
+void
+ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException)
+{
+    TRACE( LOG, "validatePath" );
+    
+    //only the first character matters; no need to search the whole string
+    if (path.empty() || path[0] != '/') {
+        throw ZooKeeperException( string("Node path must start with '/' but "
+                                         "it was '") +
+                                  path +
+                                  "'" );
+    }
+    if (path.length() > 1) {
+        if (path[path.length() - 1] == '/') {
+            throw ZooKeeperException( string("Node path must not end with "
+                                             "'/' but it was '") +
+                                      path +
+                                      "'" );
+        }
+        if (path.find( "//" ) != string::npos) {
+            throw ZooKeeperException( string("Node path must not contain "
+                                             "'//' but it was '") +
+                                      path +
+                                      "'" ); 
+        }
+    }
+}
+
+/**
+ * \brief Closes the ZK handle, if any, and moves the adapter to the
+ *        AS_DISCONNECTED state. Does nothing when there is no handle.
+ */
+void
+ZooKeeperAdapter::disconnect()
+{
+    TRACE( LOG, "disconnect" );
+    LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
+
+    m_stateLock.lock();
+    if (mp_zkHandle == NULL) {
+        //nothing to close
+        m_stateLock.unlock();
+        return;
+    }
+    zookeeper_close( mp_zkHandle );
+    mp_zkHandle = NULL;
+    setState( AS_DISCONNECTED );
+    m_stateLock.unlock();
+}
+
+/**
+ * \brief Drops the current ZK handle, if any, and creates a new one.
+ *
+ * On success the adapter is left in the AS_CONNECTING state.
+ *
+ * @throw ZooKeeperException if zookeeper_init returns no handle
+ */
+void
+ZooKeeperAdapter::reconnect() throw(ZooKeeperException)
+{
+    TRACE( LOG, "reconnect" );
+    
+    m_stateLock.lock();
+    //start from a clean slate; note that disconnect() takes
+    //m_stateLock again while it is already held here
+    disconnect();
+    
+    //create a fresh handle with this adapter as its context
+    mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(), 
+                                  zkWatcher, 
+                                  m_zkConfig.getLeaseTimeout(),
+                                  NULL, this, 0);
+    resetRemainingConnectTimeout();
+    if (mp_zkHandle == NULL) {
+        m_stateLock.unlock();
+        throw ZooKeeperException( 
+            string("Unable to connect to ZK running at '") +
+                    m_zkConfig.getHosts() + "'" );
+    }
+    setState( AS_CONNECTING );
+    m_stateLock.unlock();
+    
+    LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state ); 
+}
+
+/**
+ * \brief Looks up (and removes) the listener contexts registered for
+ *        the event's path and dispatches the event to them.
+ *
+ * Session and not-watching events are dispatched without any context.
+ *
+ * @param type the ZK event type
+ * @param state the ZK state reported with the event
+ * @param path the path of the node the event refers to
+ */
+void
+ZooKeeperAdapter::handleEvent(int type, int state, const string &path)
+{
+    TRACE( LOG, "handleEvent" );
+    LOG_TRACE( LOG, 
+               "type: %d, state %d, path: %s",
+               type, state, path.c_str() );
+    Listener2Context primary, secondary;
+    //internal ZK events carry no user context
+    const bool internalEvent =
+        (type == ZOO_SESSION_EVENT || type == ZOO_NOTWATCHING_EVENT);
+    if (!internalEvent) {
+        m_zkContextsMutex.Acquire();
+        if (type == ZOO_CHANGED_EVENT || type == ZOO_DELETED_EVENT) {
+            //both a data watch and an existence watch may be waiting
+            //for this event, so collect the contexts of both
+            primary = findAndRemoveListenerContext( GET_NODE_DATA, path );
+            secondary = findAndRemoveListenerContext( NODE_EXISTS, path );
+            if (primary.empty()) {
+                //promote the 2nd set so that the 1st one is the one in use
+                primary = secondary;
+                secondary.clear();
+            }
+        } else if (type == ZOO_CHILD_EVENT) {
+            primary = findAndRemoveListenerContext( GET_NODE_CHILDREN, path );
+        } else if (type == ZOO_CREATED_EVENT) {
+            primary = findAndRemoveListenerContext( NODE_EXISTS, path );
+        }
+        m_zkContextsMutex.Release();
+    }
+    
+    handleEvent( type, state, path, primary );
+    if (!secondary.empty()) {
+        handleEvent( type, state, path, secondary );
+    }
+}
+
+/**
+ * \brief Fires the event once per entry of the given listener map, or
+ *        once without a context when the map is empty.
+ *
+ * @param type the ZK event type
+ * @param state the ZK state reported with the event
+ * @param path the path of the node the event refers to
+ * @param listeners the listener-to-context entries to notify
+ */
+void
+ZooKeeperAdapter::handleEvent(int type,
+                              int state,
+                              const string &path,
+                              const Listener2Context &listeners)
+{
+    TRACE( LOG, "handleEvents" );
+
+    if (listeners.empty()) {
+        //propagate with empty context
+        ZKWatcherEvent event(type, state, path);
+        fireEvent( event );
+        return;
+    }
+
+    Listener2Context::const_iterator entry = listeners.begin();
+    for (; entry != listeners.end(); ++entry) {
+        ZKWatcherEvent event(type, state, path, entry->second);
+        if (entry->first == NULL) {
+            //no specific listener was registered with this context
+            fireEvent( event );
+        } else {
+            fireEvent( entry->first, event );
+        }
+    }
+}
+
+/**
+ * \brief Puts a new watcher event on the m_events queue, from which
+ *        processEvents() takes it.
+ */
+void 
+ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path)
+{
+    TRACE( LOG, "enqueueEvents" );
+
+    const ZKWatcherEvent incoming( type, state, path );
+    m_events.put( incoming );
+}
+
+/**
+ * \brief Body of the event dispatcher thread.
+ *
+ * Takes events from m_events until m_terminating is set, updates the
+ * adapter state on session events and forwards every event to
+ * m_userEvents.
+ */
+void
+ZooKeeperAdapter::processEvents()
+{
+    TRACE( LOG, "processEvents" );
+
+    while (!m_terminating) {
+        bool nothingTaken = false;
+        ZKWatcherEvent event = m_events.take( 100, &nothingTaken );
+        if (nothingTaken) {
+            //timed out; go back and re-check the termination flag
+            continue;
+        }
+        if (event.getType() == ZOO_SESSION_EVENT) {
+            LOG_INFO( LOG,
+                      "Received SESSION event, state: %d. Adapter state: %d",
+                      event.getState(), m_state );
+            m_stateLock.lock();
+            if (event.getState() == ZOO_CONNECTED_STATE) {
+                m_connected = true;
+                resetRemainingConnectTimeout();
+                setState( AS_CONNECTED );
+            } else if (event.getState() == ZOO_CONNECTING_STATE) {
+                m_connected = false;
+                setState( AS_CONNECTING );
+            } else if (event.getState() == ZOO_EXPIRED_SESSION_STATE) {
+                LOG_INFO( LOG, "Received EXPIRED_SESSION event" );
+                setState( AS_SESSION_EXPIRED );
+            }
+            m_stateLock.unlock();
+        }
+        //every event, session or not, is handed to the user dispatcher
+        m_userEvents.put( event );
+    }
+}
+
+/**
+ * \brief Body of the user event dispatcher thread.
+ *
+ * Takes events from m_userEvents until m_terminating is set and passes
+ * each one to handleEvent(); exceptions derived from std::exception are
+ * logged and do not stop the loop.
+ */
+void
+ZooKeeperAdapter::processUserEvents()
+{
+    TRACE( LOG, "processUserEvents" );
+
+    while (!m_terminating) {
+        bool nothingTaken = false;
+        ZKWatcherEvent event = m_userEvents.take( 100, &nothingTaken );
+        if (nothingTaken) {
+            //timed out; go back and re-check the termination flag
+            continue;
+        }
+        try {
+            handleEvent( event.getType(),
+                         event.getState(),
+                         event.getPath() );
+        } catch (std::exception &failure) {
+            LOG_ERROR( LOG, 
+                       "Unable to process event (type: %d, state: %d, "
+                               "path: %s), because of exception: %s",
+                       event.getType(),
+                       event.getState(),
+                       event.getPath().c_str(),
+                       failure.what() );
+        }
+    }
+}
+
+/**
+ * \brief Stores the context under (method, path, listener) in
+ *        m_zkContexts, replacing any context already stored there.
+ */
+void 
+ZooKeeperAdapter::registerContext(WatchableMethod method,
+                                  const string &path,
+                                  ZKEventListener *listener,
+                                  ContextType context)
+{
+    TRACE( LOG, "registerContext" );
+
+    Listener2Context &listenersForPath = m_zkContexts[method][path];
+    listenersForPath[listener] = context;
+}
+
+/**
+ * \brief Removes and returns the listener contexts registered for the
+ *        given method and path.
+ *
+ * @return a copy of the removed entries, or an empty map when nothing
+ *         was registered
+ */
+ZooKeeperAdapter::Listener2Context
+ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method,
+                                               const string &path)
+{
+    TRACE( LOG, "findAndRemoveListenerContext" );
+
+    Listener2Context found;
+    Path2Listener2Context &contextsByPath = m_zkContexts[method];
+    Path2Listener2Context::iterator hit = contextsByPath.find( path );
+    if (hit == contextsByPath.end()) {
+        return found;
+    }
+    //copy before erasing, the iterator is invalid afterwards
+    found = hit->second;
+    contextsByPath.erase( hit );
+    return found;
+}
+
+/**
+ * \brief Moves the adapter to the given state and notifies
+ *        m_stateLock; does nothing but log when the state is unchanged.
+ */
+void 
+ZooKeeperAdapter::setState(AdapterState newState)
+{
+    TRACE( LOG, "setState" );    
+    if (newState == m_state) {
+        LOG_TRACE( LOG, "New state same as the current: %d", newState );
+        return;
+    }
+    LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState );
+    m_state = newState;
+    //signal m_stateLock, which waitUntilConnected() waits on
+    m_stateLock.notify();
+}
+
+
+//TODO move this code to verifyConnection so reconnect()
+//is called from one place only
+/**
+ * \brief Waits on m_stateLock until the adapter reaches AS_CONNECTED
+ *        or the remaining connect timeout is used up.
+ *
+ * While waiting, an expired session triggers reconnect(). The time
+ * actually spent waiting is reported through waitedForConnect().
+ *
+ * NOTE(review): the only caller visible here, verifyConnection(), holds
+ * m_stateLock while calling this; presumably wait() requires that -
+ * confirm against the lock implementation.
+ *
+ * @throw ZooKeeperException if the adapter is still not connected when
+ *        this method stops waiting
+ */
+void
+ZooKeeperAdapter::waitUntilConnected() 
+  throw(ZooKeeperException)
+{
+    TRACE( LOG, "waitUntilConnected" );    
+    long long int timeout = getRemainingConnectTimeout();
+    LOG_INFO( LOG,
+              "Waiting up to %lld ms until a connection to ZK is established",
+              timeout );
+    bool connected;
+    if (timeout > 0) {
+        long long int toWait = timeout;
+        while (m_state != AS_CONNECTED && toWait > 0) {
+            //check if session expired and reconnect if so
+            if (m_state == AS_SESSION_EXPIRED) {
+                LOG_INFO( LOG,
+                        "Reconnecting because the current session has expired" );
+                reconnect();
+            }
+            //measure how long the wait below really takes: start with
+            //minus the current time in ms, add the time after the wait
+            struct timeval now;
+            gettimeofday( &now, NULL );
+            int64_t milliSecs = -(now.tv_sec * 1000LL + now.tv_usec / 1000);
+            LOG_TRACE( LOG, "About to wait %lld ms", toWait );
+            m_stateLock.wait( toWait );
+            gettimeofday( &now, NULL );
+            milliSecs += now.tv_sec * 1000LL + now.tv_usec / 1000;
+            //milliSecs now holds the elapsed time of this iteration
+            toWait -= milliSecs;
+        }
+        //timeout - toWait is the total time spent in the loop above
+        waitedForConnect( timeout - toWait );
+        LOG_INFO( LOG, "Waited %lld ms", timeout - toWait );
+    }
+    connected = (m_state == AS_CONNECTED);
+    if (!connected) {
+        if (timeout > 0) {
+            //there was time to wait, but it was not enough
+            LOG_WARN( LOG, "Timed out while waiting for connection to ZK" );
+            throw ZooKeeperException("Timed out while waiting for "
+                                    "connection to ZK");
+        } else {
+            //no time was left at all, so no waiting took place
+            LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" );
+            throw ZooKeeperException("Global timeout expired and still not "
+                                     "connected to ZK");
+        }
+    }
+    LOG_INFO( LOG, "Connected!" );
+}
+
+/**
+ * \brief Makes sure the adapter is connected before a ZK call is made.
+ *
+ * Runs with m_stateLock held. Returns at once when the state is
+ * AS_CONNECTED. In the AS_CONNECTING state it waits for the connection;
+ * in any other state except AS_DISCONNECTED it first reconnects,
+ * provided the configuration allows auto-reconnect.
+ *
+ * @throw ZooKeeperException if the adapter is in the AS_DISCONNECTED
+ *        state, if auto-reconnect is needed but not allowed, or if
+ *        reconnect() or waitUntilConnected() fails
+ */
+void
+ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException)
+{
+    TRACE( LOG, "verifyConnection" );
+
+    m_stateLock.lock();
+    try {
+        if (m_state == AS_DISCONNECTED) {
+            throw ZooKeeperException("Disconnected from ZK. " \
+                "Please use reconnect() before attempting to use any ZK API");
+        } else if (m_state != AS_CONNECTED) {
+            LOG_TRACE( LOG, "Checking if need to reconnect..." );
+            //we are not connected, so check if connection in progress...
+            if (m_state != AS_CONNECTING) {
+                LOG_TRACE( LOG, 
+                           "yes. Checking if allowed to auto-reconnect..." );
+                //...not in progress, so check if we can reconnect
+                if (!m_zkConfig.getAutoReconnect()) {
+                    //...too bad, disallowed :(
+                    LOG_TRACE( LOG, "no. Sorry." );
+                    throw ZooKeeperException("ZK connection is down and "
+                                             "auto-reconnect is not allowed");
+                } else {
+                    LOG_TRACE( LOG, "...yes. About to reconnect" );
+                }
+                //...we are good to retry the connection
+                reconnect();
+            } else {
+                LOG_TRACE( LOG, "...no, already in CONNECTING state" );
+            }               
+            //wait until the connection is established
+            waitUntilConnected(); 
+        }
+    } catch (ZooKeeperException &e) {
+        //release the lock before passing the exception on to the caller
+        m_stateLock.unlock();
+        throw;
+    }
+    m_stateLock.unlock();
+}
+
+/**
+ * \brief Creates a node and reports the path it was created under.
+ *
+ * @param path the path of the node to create
+ * @param value the data stored in the new node
+ * @param flags the flags passed to zoo_create
+ * @param createAncestors whether missing ancestors are created (with
+ *                        empty data and no flags) when the first
+ *                        attempt fails with ZNONODE
+ * @param returnPath set to the path reported by zoo_create on success
+ * @return true if the node has been created, false if it already
+ *         existed
+ * @throw ZooKeeperException if the path is invalid or the node cannot
+ *        be created
+ */
+bool
+ZooKeeperAdapter::createNode(const string &path, 
+                             const string &value, 
+                             int flags, 
+                             bool createAncestors,
+                             string &returnPath) 
+    throw(ZooKeeperException) 
+{
+    TRACE( LOG, "createNode (internal)" );
+    validatePath( path );
+    
+    //receives the path of the created node from zoo_create
+    const int MAX_PATH_LENGTH = 1024;
+    char realPath[MAX_PATH_LENGTH];
+    realPath[0] = 0;
+    
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    //repeat for as long as the retry handler accepts the error code
+    do {
+        verifyConnection();
+        rc = zoo_create( mp_zkHandle, 
+                         path.c_str(), 
+                         value.c_str(),
+                         value.length(),
+                         &ZOO_OPEN_ACL_UNSAFE,
+                         flags,
+                         realPath,
+                         MAX_PATH_LENGTH );
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        if (rc == ZNODEEXISTS) {
+            //the node already exists
+            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+            return false;
+        } else if (rc == ZNONODE && createAncestors) {
+            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+            //one of the ancestors doesn't exist so lets start from the root 
+            //and make sure the whole path exists, creating missing nodes if
+            //necessary
+            for (string::size_type pos = 1; pos != string::npos; ) {
+                pos = path.find( "/", pos );
+                if (pos != string::npos) {
+                    //path.substr( 0, pos ) is the next ancestor; an
+                    //ancestor that already exists just yields false
+                    try {
+                        createNode( path.substr( 0, pos ), "", 0, true );
+                    } catch (ZooKeeperException &e) {
+                        //report the failure against the requested path,
+                        //with the error code of the first attempt
+                        throw ZooKeeperException( string("Unable to create "
+                                                         "node ") + 
+                                                  path, 
+                                                  rc );
+                    }
+                    pos++;
+                } else {
+                    //no more path components
+                    return createNode( path, value, flags, false, returnPath );
+                }
+            }
+        }
+        LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( string("Unable to create node ") +
+                                  path,
+                                  rc );
+    } else {
+        LOG_INFO( LOG, "%s has been created", realPath );
+        returnPath = string( realPath );
+        return true;
+    }
+}
+
+/**
+ * \brief Creates a node for callers that do not need the created path.
+ *
+ * Forwards to the five-argument createNode() and drops the path it
+ * reports.
+ *
+ * @return the result of the five-argument createNode()
+ * @throw ZooKeeperException as thrown by the five-argument createNode()
+ */
+bool
+ZooKeeperAdapter::createNode(const string &path,
+                             const string &value,
+                             int flags,
+                             bool createAncestors) 
+        throw(ZooKeeperException) 
+{
+    TRACE( LOG, "createNode" );
+
+    //only needed to satisfy the out parameter
+    string unusedPath;
+    const bool created =
+        createNode( path, value, flags, createAncestors, unusedPath );
+    return created;
+}
+
+/**
+ * \brief Creates a sequence node and returns its sequence number.
+ *
+ * The node is created with ZOO_SEQUENCE added to the given flags; the
+ * number is parsed from whatever the created path has after the
+ * requested path.
+ *
+ * @return the sequence number, or -1 if createNode() returned false
+ * @throw ZooKeeperException if the created path does not start with
+ *        the requested path or its suffix is not a number
+ */
+int64_t
+ZooKeeperAdapter::createSequence(const string &path,
+                                 const string &value,
+                                 int flags,
+                                 bool createAncestors) 
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "createSequence" );
+
+    string actualPath;
+    const bool created = createNode( path,
+                                     value,
+                                     flags | ZOO_SEQUENCE,
+                                     createAncestors,
+                                     actualPath );
+    if (!created) {
+        return -1;
+    }
+
+    //the requested path must be a prefix of the created one
+    if (actualPath.find( path ) != 0) {
+        throw ZooKeeperException( string("Expecting returned path '") +
+                                  actualPath + 
+                                  "' to start with '" +
+                                  path +
+                                  "'" );
+    }
+    //whatever follows the prefix is the sequence number
+    const string seqSuffix = actualPath.substr( path.length() );
+    char *parseEnd = NULL;
+    const int64_t seq = strtol( seqSuffix.c_str(), &parseEnd, 10 );
+    if (parseEnd != NULL && *parseEnd != '\0') {
+        throw ZooKeeperException( string("Expecting a number but got ") +
+                                  seqSuffix );
+    }
+    return seq;
+}
+
+/**
+ * \brief Deletes a node, optionally together with all its descendants.
+ *
+ * @param path the path of the node to delete
+ * @param recursive whether the children are deleted first when the
+ *                  node turns out not to be empty
+ * @param version the version passed to zoo_delete
+ * @return true if the node has been deleted, false if it did not exist
+ * @throw ZooKeeperException if the path is invalid or the node cannot
+ *        be deleted
+ */
+bool
+ZooKeeperAdapter::deleteNode(const string &path,
+                             bool recursive,
+                             int version)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "deleteNode" );
+
+    validatePath( path );
+        
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    //repeat for as long as the retry handler accepts the error code
+    do {
+        verifyConnection();
+        rc = zoo_delete( mp_zkHandle, path.c_str(), version );
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        if (rc == ZNONODE) {
+            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+            return false;
+        }
+        if (rc == ZNOTEMPTY && recursive) {
+            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+            //get all children and delete them recursively...
+            vector<string> nodeList;
+            getNodeChildren( nodeList, path, NULL );
+            for (vector<string>::const_iterator i = nodeList.begin();
+                 i != nodeList.end();
+                 ++i) {
+                //the version argument is left to its default here
+                deleteNode( *i, true );
+            }
+            //...and finally attempt to delete the node again
+            return deleteNode( path, false ); 
+        }
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( string("Unable to delete node ") + path,
+                                  rc );
+    } else {
+        LOG_INFO( LOG, "%s has been deleted", path.c_str() );
+        return true;
+    }
+}
+
+/**
+ * \brief Checks whether a node exists, optionally setting a watch.
+ *
+ * @param path the path of the node to check
+ * @param listener when not NULL, zoo_exists is asked to set a watch
+ * @param context when not NULL, registered for (NODE_EXISTS, path,
+ *                listener) if the call returns ZOK or ZNONODE
+ * @param stat receives the node's stat; may be NULL, in which case a
+ *             local Stat is used instead
+ * @return true if the node exists, false otherwise
+ * @throw ZooKeeperException if the path is invalid or the check fails
+ */
+bool
+ZooKeeperAdapter::nodeExists(const string &path,
+                             ZKEventListener *listener,
+                             void *context, Stat *stat)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "nodeExists" );
+
+    validatePath( path );
+
+    //let zoo_exists always have somewhere to write the stat
+    struct Stat tmpStat;
+    if (stat == NULL) {
+        stat = &tmpStat;
+    }
+    memset( stat, 0, sizeof(Stat) );
+
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    //repeat for as long as the retry handler accepts the error code
+    do {
+        verifyConnection();
+        if (context != NULL) {    
+            //the call and the registration of the context are made
+            //under the same mutex that handleEvent() takes for lookups
+            m_zkContextsMutex.Acquire();
+            rc = zoo_exists( mp_zkHandle,
+                             path.c_str(),
+                             (listener != NULL ? 1 : 0),
+                             stat );
+            if (rc == ZOK || rc == ZNONODE) {
+                registerContext( NODE_EXISTS, path, listener, context );
+            }
+            m_zkContextsMutex.Release();
+        } else {
+            rc = zoo_exists( mp_zkHandle,
+                             path.c_str(),
+                             (listener != NULL ? 1 : 0),
+                             stat );
+        }
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        if (rc == ZNONODE) {
+            LOG_TRACE( LOG, "Node %s does not exist", path.c_str() );
+            return false;
+        }
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException(
+                 string("Unable to check existence of node ") + path,
+                 rc );
+    } else {
+        return true;        
+    }
+}
+
+/**
+ * \brief Appends the absolute paths of a node's children to nodeList
+ *        and sorts the list.
+ *
+ * @param nodeList receives the children; existing entries are kept and
+ *                 the whole list is sorted afterwards
+ * @param path the path of the parent node
+ * @param listener when not NULL, zoo_get_children is asked to set a
+ *                 watch
+ * @param context when not NULL, registered for (GET_NODE_CHILDREN,
+ *                path, listener) if the call succeeds
+ * @throw ZooKeeperException if the path is invalid or the children
+ *        cannot be retrieved
+ */
+void
+ZooKeeperAdapter::getNodeChildren(vector<string> &nodeList,
+                                  const string &path, 
+                                  ZKEventListener *listener,
+                                  void *context)
+    throw (ZooKeeperException)
+{
+    TRACE( LOG, "getNodeChildren" );
+
+    validatePath( path );
+    
+    String_vector children;
+    memset( &children, 0, sizeof(children) );
+
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    //repeat for as long as the retry handler accepts the error code
+    do {
+        verifyConnection();
+        if (context != NULL) {
+            //the call and the registration of the context are made
+            //under the same mutex that handleEvent() takes for lookups
+            m_zkContextsMutex.Acquire();
+            rc = zoo_get_children( mp_zkHandle,
+                                   path.c_str(), 
+                                   (listener != NULL ? 1 : 0), 
+                                   &children );
+            if (rc == ZOK) {
+                registerContext( GET_NODE_CHILDREN, path, listener, context );
+            }
+            m_zkContextsMutex.Release();
+        } else {
+            rc = zoo_get_children( mp_zkHandle,
+                                   path.c_str(), 
+                                   (listener != NULL ? 1 : 0),
+                                   &children );
+        }
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( string("Unable to get children of node ") +
+                                  path, 
+                                  rc );
+    } else {
+        for (int i = 0; i < children.count; ++i) {
+            //convert each child's path from relative to absolute 
+            string absPath(path);
+            if (path != "/") {
+                absPath.append( "/" );
+            } 
+            absPath.append( children.data[i] ); 
+            nodeList.push_back( absPath );
+        }
+        //the child names were allocated by the ZK client; release them
+        //now that they have been copied, or every call leaks them
+        deallocate_String_vector( &children );
+        //make sure the order is always deterministic
+        sort( nodeList.begin(), nodeList.end() );
+    }
+}
+
+/**
+ * \brief Reads the data stored in a node, optionally setting a watch.
+ *
+ * At most MAX_DATA_LENGTH - 1 bytes are requested from zoo_get.
+ *
+ * @param path the path of the node to read
+ * @param listener when not NULL, zoo_get is asked to set a watch
+ * @param context when not NULL, registered for (GET_NODE_DATA, path,
+ *                listener) if the call succeeds
+ * @param stat receives the node's stat; may be NULL, in which case a
+ *             local Stat is used instead
+ * @return the node's data; empty when zoo_get reports a length of -1
+ * @throw ZooKeeperException if the path is invalid or the data cannot
+ *        be read
+ */
+string
+ZooKeeperAdapter::getNodeData(const string &path,
+                              ZKEventListener *listener,
+                              void *context, Stat *stat)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "getNodeData" );
+
+    validatePath( path );
+   
+    //the receive buffer is a local array of 128 KB
+    const int MAX_DATA_LENGTH = 128 * 1024;
+    char buffer[MAX_DATA_LENGTH];
+    memset( buffer, 0, MAX_DATA_LENGTH );
+    //let zoo_get always have somewhere to write the stat
+    struct Stat tmpStat;
+    if (stat == NULL) {
+        stat = &tmpStat;
+    }
+    memset( stat, 0, sizeof(Stat) );
+    
+    int rc;
+    int len;
+    RetryHandler rh(m_zkConfig);
+    //repeat for as long as the retry handler accepts the error code
+    do {
+        verifyConnection();
+        //len is in/out: reset it to the buffer capacity on every attempt
+        len = MAX_DATA_LENGTH - 1;
+        if (context != NULL) {
+            //the call and the registration of the context are made
+            //under the same mutex that handleEvent() takes for lookups
+            m_zkContextsMutex.Acquire();
+            rc = zoo_get( mp_zkHandle, 
+                          path.c_str(),
+                          (listener != NULL ? 1 : 0),
+                          buffer, &len, stat );
+            if (rc == ZOK) {
+                registerContext( GET_NODE_DATA, path, listener, context );
+            }
+            m_zkContextsMutex.Release();
+        } else {
+            rc = zoo_get( mp_zkHandle,
+                          path.c_str(),
+                          (listener != NULL ? 1 : 0),
+                          buffer, &len, stat );
+        }
+    } while (rc != ZOK && rh.handleRC(rc));
+    if (rc != ZOK) {
+        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+        throw ZooKeeperException( 
+            string("Unable to get data of node ") + path, rc 
+        );
+    } else {
+        //a length of -1 is treated as "no data"
+        if (len == -1) {
+            len = 0;
+        }
+        return string( buffer, len );
+    }
+}
+
+/**
+ * \brief Replaces the data stored in a node.
+ *
+ * @param path the path of the node to update
+ * @param value the new data
+ * @param version the version passed to zoo_set
+ * @throw ZooKeeperException if the path is invalid or the data cannot
+ *        be set
+ */
+void
+ZooKeeperAdapter::setNodeData(const string &path,
+                              const string &value,
+                              int version)
+    throw(ZooKeeperException)
+{
+    TRACE( LOG, "setNodeData" );
+
+    validatePath( path );
+
+    int rc;
+    RetryHandler rh(m_zkConfig);
+    for (;;) {
+        verifyConnection();
+        rc = zoo_set( mp_zkHandle,
+                      path.c_str(),
+                      value.c_str(),
+                      value.length(),
+                      version);
+        //stop on success or once the retry handler gives up
+        if (rc == ZOK || !rh.handleRC(rc)) {
+            break;
+        }
+    }
+    if (rc == ZOK) {
+        return;
+    }
+    LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+    throw ZooKeeperException( string("Unable to set data for node ") +
+                              path,
+                              rc );
+}
+
+}   /* end of 'namespace zk' */
+


Mime
View raw message