http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h
new file mode 100644
index 0000000..936ecc6
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __EVENT_H__
+#define __EVENT_H__
+
+#include <string>
+#include <set>
+#include <deque>
+#include <algorithm>
+#ifdef GCC4
+# include <tr1/memory>
+using namespace std::tr1;
+#else
+# include <boost/shared_ptr.hpp>
+using namespace boost;
+#endif
+
+#include "log.h"
+#include "blockingqueue.h"
+#include "mutex.h"
+#include "thread.h"
+
+using namespace std;
+using namespace zk;
+
+namespace zkfuse {
+
+//forward declaration of EventSource
+template<typename E>
+class EventSource;
+
+/**
+ * \brief This interface is implemented by an observer
+ * \brief of a particular {@link EventSource}.
+ */
+template<typename E>
+class EventListener {
+ public:
+
+ /**
+ * \brief This method is invoked whenever an event
+ * \brief has been received by the event source being observed.
+ *
+ * @param source the source the triggered the event
+ * @param e the actual event being triggered
+ */
+ virtual void eventReceived(const EventSource<E> &source, const E &e) = 0;
+};
+
+/**
+ * \brief This class represents a source of events.
+ *
+ * <p>
+ * Each source can have many observers (listeners) attached to it
+ * and in case of an event, this source may propagate the event
+ * using {@link #fireEvent} method.
+ */
+template<typename E>
+class EventSource {
+ public:
+
+ /**
+ * \brief The type corresponding to the list of registered event listeners.
+ */
+ typedef set<EventListener<E> *> EventListeners;
+
+ /**
+ * \brief Registers a new event listener.
+ *
+ * @param listener the listener to be added to the set of listeners
+ */
+ void addListener(EventListener<E> *listener) {
+ m_listeners.insert( listener );
+ }
+
+ /**
+ * \brief Removes an already registered listener.
+ *
+ * @param listener the listener to be removed
+ */
+ void removeListener(EventListener<E> *listener) {
+ m_listeners.erase( listener );
+ }
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~EventSource() {}
+
+ protected:
+
+ /**
+ * \brief Fires the given event to all registered listeners.
+ *
+ * <p>
+ * This method essentially iterates over all listeners
+ * and invokes {@link fireEvent(EventListener<E> *listener, const E &event)}
+ * for each element. All derived classes are free to
+ * override the method to provide better error handling
+ * than the default implementation.
+ *
+ * @param event the event to be propagated to all listeners
+ */
+ void fireEvent(const E &event);
+
+ /**
+ * \brief Sends an event to the given listener.
+ *
+ * @param listener the listener to whom pass the event
+ * @param event the event to be handled
+ */
+ virtual void fireEvent(EventListener<E> *listener, const E &event);
+
+ private:
+
+ /**
+ * The set of registered event listeners.
+ */
+ EventListeners m_listeners;
+
+};
+
+/**
+ * \brief The interface of a generic event wrapper.
+ */
+class AbstractEventWrapper {
+ public:
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~AbstractEventWrapper() {}
+
+ /**
+ * \brief Returns the underlying wrapee's data.
+ */
+ virtual void *getWrapee() = 0;
+};
+
+/**
+ * \brief A template based implementation of {@link AbstractEventWrapper}.
+ */
+template<typename E>
+class EventWrapper : public AbstractEventWrapper {
+ public:
+ EventWrapper(const E &e) : m_e(e) {
+ }
+ void *getWrapee() {
+ return &m_e;
+ }
+ private:
+ E m_e;
+};
+
+/**
+ * \brief This class represents a generic event.
+ */
+class GenericEvent {
+ public:
+
+ /**
+ * \brief Constructor.
+ */
+ GenericEvent() : m_type(0) {}
+
+ /**
+ * \brief Constructor.
+ *
+ * @param type the type of this event
+ * @param eventWarpper the wrapper around event's data
+ */
+ GenericEvent(int type, AbstractEventWrapper *eventWrapper) :
+ m_type(type), m_eventWrapper(eventWrapper) {
+ }
+
+ /**
+ * \brief Returns the type of this event.
+ *
+ * @return type of this event
+ */
+ int getType() const { return m_type; }
+
+ /**
+ * \brief Returns the event's data.
+ *
+ * @return the event's data
+ */
+ void *getEvent() const { return m_eventWrapper->getWrapee(); }
+
+ private:
+
+ /**
+ * The event type.
+ */
+ int m_type;
+
+ /**
+ * The event represented as abstract wrapper.
+ */
+ boost::shared_ptr<AbstractEventWrapper> m_eventWrapper;
+
+};
+
+/**
+ * \brief This class adapts {@link EventListener} to a generic listener.
+ * Essentially this class listens on incoming events and fires them
+ * as {@link GenericEvent}s.
+ */
+template<typename E, const int type>
+class EventListenerAdapter : public virtual EventListener<E>,
+ public virtual EventSource<GenericEvent>
+{
+ public:
+
+ /**
+ * \brief Constructor.
+ *
+ * @param eventSource the source on which register this listener
+ */
+ EventListenerAdapter(EventSource<E> &eventSource) {
+ eventSource.addListener(this);
+ }
+
+ void eventReceived(const EventSource<E> &source, const E &e) {
+ AbstractEventWrapper *wrapper = new EventWrapper<E>(e);
+ GenericEvent event(type, wrapper);
+ fireEvent( event );
+ }
+
+};
+
+/**
+ * \brief This class provides an adapter between an asynchronous and synchronous
+ * \brief event handling.
+ *
+ * <p>
+ * This class queues up all received events and exposes them through
+ * {@link #getNextEvent()} method.
+ */
+template<typename E>
+class SynchronousEventAdapter : public EventListener<E> {
+ public:
+
+ void eventReceived(const EventSource<E> &source, const E &e) {
+ m_queue.put( e );
+ }
+
+ /**
+ * \brief Returns the next available event from the underlying queue,
+ * \brief possibly blocking, if no data is available.
+ *
+ * @return the next available event
+ */
+ E getNextEvent() {
+ return m_queue.take();
+ }
+
+ /**
+ * \brief Returns whether there are any events in the queue or not.
+ *
+ * @return true if there is at least one event and
+ * the next call to {@link #getNextEvent} won't block
+ */
+ bool hasEvents() const {
+ return (m_queue.empty() ? false : true);
+ }
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~SynchronousEventAdapter() {}
+
+ private:
+
+ /**
+ * The blocking queue of all events received so far.
+ */
+ BlockingQueue<E> m_queue;
+
+};
+
+/**
+ * This typedef defines the type of a timer Id.
+ */
+typedef int32_t TimerId;
+
+/**
+ * This class represents a timer event parametrized by the user's data type.
+ */
+template<typename T>
+class TimerEvent {
+ public:
+
+ /**
+ * \brief Constructor.
+ *
+ * @param id the ID of this event
+ * @param alarmTime when this event is to be triggered
+ * @param userData the user data associated with this event
+ */
+ TimerEvent(TimerId id, int64_t alarmTime, const T &userData) :
+ m_id(id), m_alarmTime(alarmTime), m_userData(userData)
+ {}
+
+ /**
+ * \brief Constructor.
+ */
+ TimerEvent() : m_id(-1), m_alarmTime(-1) {}
+
+ /**
+ * \brief Returns the ID.
+ *
+ * @return the ID of this event
+ */
+ TimerId getID() const { return m_id; }
+
+ /**
+ * \brief Returns the alarm time.
+ *
+ * @return the alarm time
+ */
+ int64_t getAlarmTime() const { return m_alarmTime; }
+
+ /**
+ * \brief Returns the user's data.
+ *
+ * @return the user's data
+ */
+ T const &getUserData() const { return m_userData; }
+
+ /**
+ * \brief Returns whether the given alarm time is less than this event's
+ * \brief time.
+ */
+ bool operator<(const int64_t alarmTime) const {
+ return m_alarmTime < alarmTime;
+ }
+
+ private:
+
+ /**
+ * The ID of ths event.
+ */
+ TimerId m_id;
+
+ /**
+ * The time at which this event triggers.
+ */
+ int64_t m_alarmTime;
+
+ /**
+ * The user specific data associated with this event.
+ */
+ T m_userData;
+
+};
+
+template<typename T>
+class Timer : public EventSource<TimerEvent<T> > {
+ public:
+
+ /**
+ * \brief Constructor.
+ */
+ Timer() : m_currentEventID(0), m_terminating(false) {
+ m_workerThread.Create( *this, &Timer<T>::sendAlarms );
+ }
+
+ /**
+ * \brief Destructor.
+ */
+ ~Timer() {
+ m_terminating = true;
+ m_lock.notify();
+ m_workerThread.Join();
+ }
+
+ /**
+ * \brief Schedules the given event <code>timeFromNow</code> milliseconds.
+ *
+ * @param timeFromNow time from now, in milliseconds, when the event
+ * should be triggered
+ * @param userData the user data associated with the timer event
+ *
+ * @return the ID of the newly created timer event
+ */
+ TimerId scheduleAfter(int64_t timeFromNow, const T &userData) {
+ return scheduleAt( getCurrentTimeMillis() + timeFromNow, userData );
+ }
+
+ /**
+ * \brief Schedules an event at the given time.
+ *
+ * @param absTime absolute time, in milliseconds, at which the event
+ * should be triggered; the time is measured
+ * from Jan 1st, 1970
+ * @param userData the user data associated with the timer event
+ *
+ * @return the ID of the newly created timer event
+ */
+ TimerId scheduleAt(int64_t absTime, const T &userData) {
+ m_lock.lock();
+ typename QueueType::iterator pos =
+ lower_bound( m_queue.begin(), m_queue.end(), absTime );
+ TimerId id = m_currentEventID++;
+ TimerEvent<T> event(id, absTime, userData);
+ m_queue.insert( pos, event );
+ m_lock.notify();
+ m_lock.unlock();
+ return id;
+ }
+
+ /**
+ * \brief Returns the current time since Jan 1, 1970, in milliseconds.
+ *
+ * @return the current time in milliseconds
+ */
+ static int64_t getCurrentTimeMillis() {
+ struct timeval now;
+ gettimeofday( &now, NULL );
+ return now.tv_sec * 1000LL + now.tv_usec / 1000;
+ }
+
+ /**
+ * \brief Cancels the given timer event.
+ *
+ *
+ * @param eventID the ID of the event to be canceled
+ *
+ * @return whether the event has been canceled
+ */
+ bool cancelAlarm(TimerId eventID) {
+ bool canceled = false;
+ m_lock.lock();
+ typename QueueType::iterator i;
+ for (i = m_queue.begin(); i != m_queue.end(); ++i) {
+ if (eventID == i->getID()) {
+ m_queue.erase( i );
+ canceled = true;
+ break;
+ }
+ }
+ m_lock.unlock();
+ return canceled;
+ }
+
+ /**
+ * Executes the main loop of the worker thread.
+ */
+ void sendAlarms() {
+ //iterate until terminating
+ while (!m_terminating) {
+ m_lock.lock();
+ //1 step - wait until there is an event in the queue
+ if (m_queue.empty()) {
+ //wait up to 100ms to get next event
+ m_lock.wait( 100 );
+ }
+ bool fire = false;
+ if (!m_queue.empty()) {
+ //retrieve the event from the queue and send it
+ TimerEvent<T> event = m_queue.front();
+ //check whether we can send it right away
+ int64_t timeToWait =
+ event.getAlarmTime() - getCurrentTimeMillis();
+ if (timeToWait <= 0) {
+ m_queue.pop_front();
+ //we fire only if it's still in the queue and alarm
+ //time has just elapsed (in case the top event
+ //is canceled)
+ fire = true;
+ } else {
+ m_lock.wait( timeToWait );
+ }
+ m_lock.unlock();
+ if (fire) {
+ fireEvent( event );
+ }
+ } else {
+ m_lock.unlock();
+ }
+ }
+ }
+
+ private:
+
+ /**
+ * The type of timer events queue.
+ */
+ typedef deque<TimerEvent<T> > QueueType;
+
+ /**
+ * The current event ID, auto-incremented each time a new event
+ * is created.
+ */
+ TimerId m_currentEventID;
+
+ /**
+ * The queue of timer events sorted by {@link TimerEvent#alarmTime}.
+ */
+ QueueType m_queue;
+
+ /**
+ * The lock used to guard {@link #m_queue}.
+ */
+ Lock m_lock;
+
+ /**
+ * The thread that triggers alarms.
+ */
+ CXXThread<Timer<T> > m_workerThread;
+
+ /**
+ * Whether {@link #m_workerThread} is terminating.
+ */
+ volatile bool m_terminating;
+
+};
+
+template<typename E>
+void EventSource<E>::fireEvent(const E &event) {
+ for (typename EventListeners::iterator i = m_listeners.begin();
+ i != m_listeners.end();
+ ++i)
+ {
+ fireEvent( *i, event );
+ }
+}
+
+template<typename E>
+void EventSource<E>::fireEvent(EventListener<E> *listener, const E &event) {
+ listener->eventReceived( *this, event );
+}
+
+} /* end of 'namespace zkfuse' */
+
+#endif /* __EVENT_H__ */
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc
new file mode 100644
index 0000000..e2bfb0d
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "log.h"
+
+using namespace std;
+
+/**
+ * \brief This class encapsulates a log4cxx configuration.
+ */
+class LogConfiguration {
+ public:
+ LogConfiguration(const string &file) {
+ PropertyConfigurator::configureAndWatch( file, 5000 );
+ }
+};
+
+//enforces the configuration to be initialized
+static LogConfiguration logConfig( "log4cxx.properties" );
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h
new file mode 100644
index 0000000..aefce10
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_H__
+#define __LOG_H__
+
+#define ZKFUSE_NAMESPACE zkfuse
+#define START_ZKFUSE_NAMESPACE namespace ZKFUSE_NAMESPACE {
+#define END_ZKFUSE_NAMESPACE }
+#define USING_ZKFUSE_NAMESPACE using namespace ZKFUSE_NAMESPACE;
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#include <log4cxx/logger.h>
+#include <log4cxx/propertyconfigurator.h>
+#include <log4cxx/helpers/exception.h>
+using namespace log4cxx;
+using namespace log4cxx::helpers;
+
+#define PRINTIP(x) ((uint8_t*)&x)[0], ((uint8_t*)&x)[1], \
+ ((uint8_t*)&x)[2], ((uint8_t*)&x)[3]
+
+#define IPFMT "%u.%u.%u.%u"
+
+#define DECLARE_LOGGER(varName) \
+extern LoggerPtr varName;
+
+#define DEFINE_LOGGER(varName, logName) \
+static LoggerPtr varName = Logger::getLogger( logName );
+
+#define MAX_BUFFER_SIZE 20000
+
+#define SPRINTF_LOG_MSG(buffer, fmt, args...) \
+ char buffer[MAX_BUFFER_SIZE]; \
+ snprintf( buffer, MAX_BUFFER_SIZE, fmt, ##args );
+
+// older versions of log4cxx don't support tracing
+#ifdef LOG4CXX_TRACE
+#define LOG_TRACE(logger, fmt, args...) \
+ if (logger->isTraceEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_TRACE( logger, __tmp ); \
+ }
+#else
+#define LOG_TRACE(logger, fmt, args...) \
+ if (logger->isDebugEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_DEBUG( logger, __tmp ); \
+ }
+#endif
+
+#define LOG_DEBUG(logger, fmt, args...) \
+ if (logger->isDebugEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_DEBUG( logger, __tmp ); \
+ }
+
+#define LOG_INFO(logger, fmt, args...) \
+ if (logger->isInfoEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_INFO( logger, __tmp ); \
+ }
+
+#define LOG_WARN(logger, fmt, args...) \
+ if (logger->isWarnEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_WARN( logger, __tmp ); \
+ }
+
+#define LOG_ERROR(logger, fmt, args...) \
+ if (logger->isErrorEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_ERROR( logger, __tmp ); \
+ }
+
+#define LOG_FATAL(logger, fmt, args...) \
+ if (logger->isFatalEnabled()) { \
+ SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
+ LOG4CXX_FATAL( logger, __tmp ); \
+ }
+
+#ifdef DISABLE_TRACE
+# define TRACE(logger, x)
+#else
+# define TRACE(logger, x) \
+class Trace { \
+ public: \
+ Trace(const void* p) : _p(p) { \
+ LOG_TRACE(logger, "%s %p Enter", __PRETTY_FUNCTION__, p); \
+ } \
+ ~Trace() { \
+ LOG_TRACE(logger, "%s %p Exit", __PRETTY_FUNCTION__, _p); \
+ } \
+ const void* _p; \
+} traceObj(x);
+#endif /* DISABLE_TRACE */
+
+#endif /* __LOG_H__ */
+
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties
new file mode 100644
index 0000000..1e373e4
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=TRACE, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4cxx.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.category.zkfuse=TRACE
+
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h
new file mode 100644
index 0000000..86c4604
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MUTEX_H__
+#define __MUTEX_H__
+
+#include <pthread.h>
+#include <errno.h>
+#include <sys/time.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+class Cond;
+
+class Mutex {
+ friend class Cond;
+ public:
+ Mutex() {
+ pthread_mutexattr_init( &m_mutexAttr );
+ pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE_NP );
+ pthread_mutex_init( &mutex, &m_mutexAttr );
+ }
+ ~Mutex() {
+ pthread_mutex_destroy(&mutex);
+ pthread_mutexattr_destroy( &m_mutexAttr );
+ }
+ void Acquire() { Lock(); }
+ void Release() { Unlock(); }
+ void Lock() {
+ pthread_mutex_lock(&mutex);
+ }
+ int TryLock() {
+ return pthread_mutex_trylock(&mutex);
+ }
+ void Unlock() {
+ pthread_mutex_unlock(&mutex);
+ }
+ private:
+ pthread_mutex_t mutex;
+ pthread_mutexattr_t m_mutexAttr;
+};
+
+class AutoLock {
+ public:
+ AutoLock(Mutex& mutex) : _mutex(mutex) {
+ mutex.Lock();
+ }
+ ~AutoLock() {
+ _mutex.Unlock();
+ }
+ private:
+ friend class AutoUnlockTemp;
+ Mutex& _mutex;
+};
+
+class AutoUnlockTemp {
+ public:
+ AutoUnlockTemp(AutoLock & autoLock) : _autoLock(autoLock) {
+ _autoLock._mutex.Unlock();
+ }
+ ~AutoUnlockTemp() {
+ _autoLock._mutex.Lock();
+ }
+ private:
+ AutoLock & _autoLock;
+};
+
+class Cond {
+ public:
+ Cond() {
+ static pthread_condattr_t attr;
+ static bool inited = false;
+ if(!inited) {
+ inited = true;
+ pthread_condattr_init(&attr);
+ }
+ pthread_cond_init(&_cond, &attr);
+ }
+ ~Cond() {
+ pthread_cond_destroy(&_cond);
+ }
+
+ void Wait(Mutex& mutex) {
+ pthread_cond_wait(&_cond, &mutex.mutex);
+ }
+
+ bool Wait(Mutex& mutex, long long int timeout) {
+ struct timeval now;
+ gettimeofday( &now, NULL );
+ struct timespec abstime;
+ int64_t microSecs = now.tv_sec * 1000000LL + now.tv_usec;
+ microSecs += timeout * 1000;
+ abstime.tv_sec = microSecs / 1000000LL;
+ abstime.tv_nsec = (microSecs % 1000000LL) * 1000;
+ if (pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime) == ETIMEDOUT) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ void Signal() {
+ pthread_cond_signal(&_cond);
+ }
+
+ private:
+ pthread_cond_t _cond;
+};
+
+/**
+ * A wrapper class for {@link Mutex} and {@link Cond}.
+ */
+class Lock {
+ public:
+
+ void lock() {
+ m_mutex.Lock();
+ }
+
+ void unlock() {
+ m_mutex.Unlock();
+ }
+
+ void wait() {
+ m_cond.Wait( m_mutex );
+ }
+
+ bool wait(long long int timeout) {
+ return m_cond.Wait( m_mutex, timeout );
+ }
+
+ void notify() {
+ m_cond.Signal();
+ }
+
+ private:
+
+ /**
+ * The mutex.
+ */
+ Mutex m_mutex;
+
+ /**
+ * The condition associated with this lock's mutex.
+ */
+ Cond m_cond;
+};
+
+END_ZKFUSE_NAMESPACE
+
+#endif /* __MUTEX_H__ */
+
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc
new file mode 100644
index 0000000..f1ed816
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <log.h>
+
+#include "thread.h"
+
+DEFINE_LOGGER( LOG, "Thread" )
+
+START_ZKFUSE_NAMESPACE
+
+void Thread::Create(void* ctx, ThreadFunc func)
+{
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setstacksize(&attr, _stackSize);
+ int ret = pthread_create(&mThread, &attr, func, ctx);
+ if(ret != 0) {
+ LOG_FATAL( LOG, "pthread_create failed: %s", strerror(errno) );
+ }
+ // pthread_attr_destroy(&attr);
+ _ctx = ctx;
+ _func = func;
+}
+
+END_ZKFUSE_NAMESPACE
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h
new file mode 100644
index 0000000..0ed12d7
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __THREAD_H__
+#define __THREAD_H__
+
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+class Thread {
+ public:
+ static const size_t defaultStackSize = 1024 * 1024;
+ typedef void* (*ThreadFunc) (void*);
+ Thread(size_t stackSize = defaultStackSize)
+ : _stackSize(stackSize), _ctx(NULL), _func(NULL)
+ {
+ memset( &mThread, 0, sizeof(mThread) );
+ }
+ ~Thread() { }
+
+ void Create(void* ctx, ThreadFunc func);
+ void Join() {
+ //avoid SEGFAULT because of unitialized mThread
+ //in case Create(...) was never called
+ if (_func != NULL) {
+ pthread_join(mThread, 0);
+ }
+ }
+ private:
+ pthread_t mThread;
+ void *_ctx;
+ ThreadFunc _func;
+ size_t _stackSize;
+};
+
+
+template<typename T>
+struct ThreadContext {
+ typedef void (T::*FuncPtr) (void);
+ ThreadContext(T& ctx, FuncPtr func) : _ctx(ctx), _func(func) {}
+ void run(void) {
+ (_ctx.*_func)();
+ }
+ T& _ctx;
+ FuncPtr _func;
+};
+
+template<typename T>
+void* ThreadExec(void *obj) {
+ ThreadContext<T>* tc = (ThreadContext<T>*)(obj);
+ assert(tc != 0);
+ tc->run();
+ return 0;
+}
+
+template <typename T>
+class CXXThread : public Thread {
+ public:
+ typedef void (T::*FuncPtr) (void);
+ CXXThread(size_t stackSize = Thread::defaultStackSize)
+ : Thread(stackSize), ctx(0) {}
+ ~CXXThread() { if (ctx) delete ctx; }
+
+ void Create(T& obj, FuncPtr func) {
+ assert(ctx == 0);
+ ctx = new ThreadContext<T>(obj, func);
+ Thread::Create(ctx, ThreadExec<T>);
+ }
+
+ private:
+ ThreadContext<T>* ctx;
+};
+
+
+END_ZKFUSE_NAMESPACE
+
+#endif /* __THREAD_H__ */
+
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc
new file mode 100644
index 0000000..7f02fa3
--- /dev/null
+++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <iostream>
+
+#include "blockingqueue.h"
+#include "thread.h"
+#include "zkadapter.h"
+
+using namespace std;
+using namespace zk;
+
+DEFINE_LOGGER( LOG, "zookeeper.adapter" )
+DEFINE_LOGGER( ZK_LOG, "zookeeper.core" )
+
+/**
+ * \brief A helper class to initialize ZK logging.
+ */
+class InitZooKeeperLogging
+{
+ public:
+ InitZooKeeperLogging() {
+ if (ZK_LOG->isDebugEnabled()
+#ifdef LOG4CXX_TRACE
+ || ZK_LOG->isTraceEnabled()
+#endif
+ )
+ {
+ zoo_set_debug_level( ZOO_LOG_LEVEL_DEBUG );
+ } else if (ZK_LOG->isInfoEnabled()) {
+ zoo_set_debug_level( ZOO_LOG_LEVEL_INFO );
+ } else if (ZK_LOG->isWarnEnabled()) {
+ zoo_set_debug_level( ZOO_LOG_LEVEL_WARN );
+ } else {
+ zoo_set_debug_level( ZOO_LOG_LEVEL_ERROR );
+ }
+ }
+};
+
+using namespace std;
+
+namespace zk
+{
+
+/**
+ * \brief This class provides logic for checking if a request can be retried.
+ */
+class RetryHandler
+{
+ public:
+ RetryHandler(const ZooKeeperConfig &zkConfig)
+ : m_zkConfig(zkConfig)
+ {
+ if (zkConfig.getAutoReconnect()) {
+ retries = 2;
+ } else {
+ retries = 0;
+ }
+ }
+
+ /**
+ * \brief Attempts to fix a side effect of the given RC.
+ *
+ * @param rc the ZK error code
+ * @return whether the error code has been handled and the caller should
+ * retry an operation the caused this error
+ */
+ bool handleRC(int rc)
+ {
+ TRACE( LOG, "handleRC" );
+
+ //check if the given error code is recoverable
+ if (!retryOnError(rc)) {
+ return false;
+ }
+ LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries );
+ if (retries-- > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private:
+ /**
+ * The ZK config.
+ */
+ const ZooKeeperConfig &m_zkConfig;
+
+ /**
+ * The number of outstanding retries.
+ */
+ int retries;
+
+ /**
+ * Checks whether the given error entitles this adapter
+ * to retry the previous operation.
+ *
+ * @param zkErrorCode one of the ZK error code
+ */
+ static bool retryOnError(int zkErrorCode)
+ {
+ return (zkErrorCode == ZCONNECTIONLOSS ||
+ zkErrorCode == ZOPERATIONTIMEOUT);
+ }
+};
+
+
+//the implementation of the global ZK event watcher
+void zkWatcher(zhandle_t *zh, int type, int state, const char *path,
+ void *watcherCtx)
+{
+ TRACE( LOG, "zkWatcher" );
+
+ //a workaround for buggy ZK API
+ string sPath =
+ (path == NULL ||
+ state == ZOO_SESSION_EVENT ||
+ state == ZOO_NOTWATCHING_EVENT)
+ ? ""
+ : string(path);
+ LOG_INFO( LOG,
+ "Received a ZK event - type: %d, state: %d, path: '%s'",
+ type, state, sPath.c_str() );
+ ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh);
+ if (zka != NULL) {
+ zka->enqueueEvent( type, state, sPath );
+ } else {
+ LOG_ERROR( LOG,
+ "Skipping ZK event (type: %d, state: %d, path: '%s'), "
+ "because ZK passed no context",
+ type, state, sPath.c_str() );
+ }
+}
+
+
+
+// =======================================================================
+
+ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config,
+ ZKEventListener *listener,
+ bool establishConnection)
+ throw(ZooKeeperException)
+ : m_zkConfig(config),
+ mp_zkHandle(NULL),
+ m_terminating(false),
+ m_connected(false),
+ m_state(AS_DISCONNECTED)
+{
+ TRACE( LOG, "ZooKeeperAdapter" );
+
+ resetRemainingConnectTimeout();
+
+ //enforce setting up appropriate ZK log level
+ static InitZooKeeperLogging INIT_ZK_LOGGING;
+
+ if (listener != NULL) {
+ addListener(listener);
+ }
+
+ //start the event dispatcher thread
+ m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents );
+
+ //start the user event dispatcher thread
+ m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents );
+
+ //optionally establish the connection
+ if (establishConnection) {
+ reconnect();
+ }
+}
+
+ZooKeeperAdapter::~ZooKeeperAdapter()
+{
+ TRACE( LOG, "~ZooKeeperAdapter" );
+
+ try {
+ disconnect();
+ } catch (std::exception &e) {
+ LOG_ERROR( LOG,
+ "An exception while disconnecting from ZK: %s",
+ e.what() );
+ }
+ m_terminating = true;
+ m_userEventDispatcher.Join();
+ m_eventDispatcher.Join();
+}
+
+void
+ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException)
+{
+ TRACE( LOG, "validatePath" );
+
+ if (path.find( "/" ) != 0) {
+ throw ZooKeeperException( string("Node path must start with '/' but"
+ "it was '") +
+ path +
+ "'" );
+ }
+ if (path.length() > 1) {
+ if (path.rfind( "/" ) == path.length() - 1) {
+ throw ZooKeeperException( string("Node path must not end with "
+ "'/' but it was '") +
+ path +
+ "'" );
+ }
+ if (path.find( "//" ) != string::npos) {
+ throw ZooKeeperException( string("Node path must not contain "
+ "'//' but it was '") +
+ path +
+ "'" );
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::disconnect()
+{
+ TRACE( LOG, "disconnect" );
+ LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
+
+ m_stateLock.lock();
+ if (mp_zkHandle != NULL) {
+ zookeeper_close( mp_zkHandle );
+ mp_zkHandle = NULL;
+ setState( AS_DISCONNECTED );
+ }
+ m_stateLock.unlock();
+}
+
+void
+ZooKeeperAdapter::reconnect() throw(ZooKeeperException)
+{
+ TRACE( LOG, "reconnect" );
+
+ m_stateLock.lock();
+ //clear the connection state
+ disconnect();
+
+ //establish a new connection to ZooKeeper
+ mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(),
+ zkWatcher,
+ m_zkConfig.getLeaseTimeout(),
+ NULL, this, 0);
+ resetRemainingConnectTimeout();
+ if (mp_zkHandle != NULL) {
+ setState( AS_CONNECTING );
+ m_stateLock.unlock();
+ } else {
+ m_stateLock.unlock();
+ throw ZooKeeperException(
+ string("Unable to connect to ZK running at '") +
+ m_zkConfig.getHosts() + "'" );
+ }
+
+ LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
+}
+
+void
+ZooKeeperAdapter::handleEvent(int type, int state, const string &path)
+{
+ TRACE( LOG, "handleEvent" );
+ LOG_TRACE( LOG,
+ "type: %d, state %d, path: %s",
+ type, state, path.c_str() );
+ Listener2Context context, context2;
+ //ignore internal ZK events
+ if (type != ZOO_SESSION_EVENT && type != ZOO_NOTWATCHING_EVENT) {
+ m_zkContextsMutex.Acquire();
+ //check if the user context is available
+ if (type == ZOO_CHANGED_EVENT || type == ZOO_DELETED_EVENT) {
+ //we may have two types of interest here,
+ //in this case lets try to notify twice
+ context = findAndRemoveListenerContext( GET_NODE_DATA, path );
+ context2 = findAndRemoveListenerContext( NODE_EXISTS, path );
+ if (context.empty()) {
+ //make sure that the 2nd context is NULL and
+ // assign it to the 1st one
+ context = context2;
+ context2.clear();
+ }
+ } else if (type == ZOO_CHILD_EVENT) {
+ context = findAndRemoveListenerContext( GET_NODE_CHILDREN, path );
+ } else if (type == ZOO_CREATED_EVENT) {
+ context = findAndRemoveListenerContext( NODE_EXISTS, path );
+ }
+ m_zkContextsMutex.Release();
+ }
+
+ handleEvent( type, state, path, context );
+ if (!context2.empty()) {
+ handleEvent( type, state, path, context2 );
+ }
+}
+
+void
+ZooKeeperAdapter::handleEvent(int type,
+ int state,
+ const string &path,
+ const Listener2Context &listeners)
+{
+ TRACE( LOG, "handleEvents" );
+
+ if (listeners.empty()) {
+ //propagate with empty context
+ ZKWatcherEvent event(type, state, path);
+ fireEvent( event );
+ } else {
+ for (Listener2Context::const_iterator i = listeners.begin();
+ i != listeners.end();
+ ++i) {
+ ZKWatcherEvent event(type, state, path, i->second);
+ if (i->first != NULL) {
+ fireEvent( i->first, event );
+ } else {
+ fireEvent( event );
+ }
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path)
+{
+ TRACE( LOG, "enqueueEvents" );
+
+ m_events.put( ZKWatcherEvent( type, state, path ) );
+}
+
+void
+ZooKeeperAdapter::processEvents()
+{
+ TRACE( LOG, "processEvents" );
+
+ while (!m_terminating) {
+ bool timedOut = false;
+ ZKWatcherEvent source = m_events.take( 100, &timedOut );
+ if (!timedOut) {
+ if (source.getType() == ZOO_SESSION_EVENT) {
+ LOG_INFO( LOG,
+ "Received SESSION event, state: %d. Adapter state: %d",
+ source.getState(), m_state );
+ m_stateLock.lock();
+ if (source.getState() == ZOO_CONNECTED_STATE) {
+ m_connected = true;
+ resetRemainingConnectTimeout();
+ setState( AS_CONNECTED );
+ } else if (source.getState() == ZOO_CONNECTING_STATE) {
+ m_connected = false;
+ setState( AS_CONNECTING );
+ } else if (source.getState() == ZOO_EXPIRED_SESSION_STATE) {
+ LOG_INFO( LOG, "Received EXPIRED_SESSION event" );
+ setState( AS_SESSION_EXPIRED );
+ }
+ m_stateLock.unlock();
+ }
+ m_userEvents.put( source );
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::processUserEvents()
+{
+ TRACE( LOG, "processUserEvents" );
+
+ while (!m_terminating) {
+ bool timedOut = false;
+ ZKWatcherEvent source = m_userEvents.take( 100, &timedOut );
+ if (!timedOut) {
+ try {
+ handleEvent( source.getType(),
+ source.getState(),
+ source.getPath() );
+ } catch (std::exception &e) {
+ LOG_ERROR( LOG,
+ "Unable to process event (type: %d, state: %d, "
+ "path: %s), because of exception: %s",
+ source.getType(),
+ source.getState(),
+ source.getPath().c_str(),
+ e.what() );
+ }
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::registerContext(WatchableMethod method,
+ const string &path,
+ ZKEventListener *listener,
+ ContextType context)
+{
+ TRACE( LOG, "registerContext" );
+
+ m_zkContexts[method][path][listener] = context;
+}
+
+ZooKeeperAdapter::Listener2Context
+ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method,
+ const string &path)
+{
+ TRACE( LOG, "findAndRemoveListenerContext" );
+
+ Listener2Context listeners;
+ Path2Listener2Context::iterator elem = m_zkContexts[method].find( path );
+ if (elem != m_zkContexts[method].end()) {
+ listeners = elem->second;
+ m_zkContexts[method].erase( elem );
+ }
+ return listeners;
+}
+
+void
+ZooKeeperAdapter::setState(AdapterState newState)
+{
+ TRACE( LOG, "setState" );
+ if (newState != m_state) {
+ LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState );
+ m_state = newState;
+ m_stateLock.notify();
+ } else {
+ LOG_TRACE( LOG, "New state same as the current: %d", newState );
+ }
+}
+
+
+//TODO move this code to verifyConnection so reconnect()
+//is called from one place only
+void
+ZooKeeperAdapter::waitUntilConnected()
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "waitUntilConnected" );
+ long long int timeout = getRemainingConnectTimeout();
+ LOG_INFO( LOG,
+ "Waiting up to %lld ms until a connection to ZK is established",
+ timeout );
+ bool connected;
+ if (timeout > 0) {
+ long long int toWait = timeout;
+ while (m_state != AS_CONNECTED && toWait > 0) {
+ //check if session expired and reconnect if so
+ if (m_state == AS_SESSION_EXPIRED) {
+ LOG_INFO( LOG,
+ "Reconnecting because the current session has expired" );
+ reconnect();
+ }
+ struct timeval now;
+ gettimeofday( &now, NULL );
+ int64_t milliSecs = -(now.tv_sec * 1000LL + now.tv_usec / 1000);
+ LOG_TRACE( LOG, "About to wait %lld ms", toWait );
+ m_stateLock.wait( toWait );
+ gettimeofday( &now, NULL );
+ milliSecs += now.tv_sec * 1000LL + now.tv_usec / 1000;
+ toWait -= milliSecs;
+ }
+ waitedForConnect( timeout - toWait );
+ LOG_INFO( LOG, "Waited %lld ms", timeout - toWait );
+ }
+ connected = (m_state == AS_CONNECTED);
+ if (!connected) {
+ if (timeout > 0) {
+ LOG_WARN( LOG, "Timed out while waiting for connection to ZK" );
+ throw ZooKeeperException("Timed out while waiting for "
+ "connection to ZK");
+ } else {
+ LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" );
+ throw ZooKeeperException("Global timeout expired and still not "
+ "connected to ZK");
+ }
+ }
+ LOG_INFO( LOG, "Connected!" );
+}
+
+void
+ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException)
+{
+ TRACE( LOG, "verifyConnection" );
+
+ m_stateLock.lock();
+ try {
+ if (m_state == AS_DISCONNECTED) {
+ throw ZooKeeperException("Disconnected from ZK. " \
+ "Please use reconnect() before attempting to use any ZK API");
+ } else if (m_state != AS_CONNECTED) {
+ LOG_TRACE( LOG, "Checking if need to reconnect..." );
+ //we are not connected, so check if connection in progress...
+ if (m_state != AS_CONNECTING) {
+ LOG_TRACE( LOG,
+ "yes. Checking if allowed to auto-reconnect..." );
+ //...not in progres, so check if we can reconnect
+ if (!m_zkConfig.getAutoReconnect()) {
+ //...too bad, disallowed :(
+ LOG_TRACE( LOG, "no. Sorry." );
+ throw ZooKeeperException("ZK connection is down and "
+ "auto-reconnect is not allowed");
+ } else {
+ LOG_TRACE( LOG, "...yes. About to reconnect" );
+ }
+ //...we are good to retry the connection
+ reconnect();
+ } else {
+ LOG_TRACE( LOG, "...no, already in CONNECTING state" );
+ }
+ //wait until the connection is established
+ waitUntilConnected();
+ }
+ } catch (ZooKeeperException &e) {
+ m_stateLock.unlock();
+ throw;
+ }
+ m_stateLock.unlock();
+}
+
+bool
+ZooKeeperAdapter::createNode(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors,
+ string &returnPath)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "createNode (internal)" );
+ validatePath( path );
+
+ const int MAX_PATH_LENGTH = 1024;
+ char realPath[MAX_PATH_LENGTH];
+ realPath[0] = 0;
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ rc = zoo_create( mp_zkHandle,
+ path.c_str(),
+ value.c_str(),
+ value.length(),
+ &ZOO_OPEN_ACL_UNSAFE,
+ flags,
+ realPath,
+ MAX_PATH_LENGTH );
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ if (rc == ZNODEEXISTS) {
+ //the node already exists
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ return false;
+ } else if (rc == ZNONODE && createAncestors) {
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ //one of the ancestors doesn't exist so lets start from the root
+ //and make sure the whole path exists, creating missing nodes if
+ //necessary
+ for (string::size_type pos = 1; pos != string::npos; ) {
+ pos = path.find( "/", pos );
+ if (pos != string::npos) {
+ try {
+ createNode( path.substr( 0, pos ), "", 0, true );
+ } catch (ZooKeeperException &e) {
+ throw ZooKeeperException( string("Unable to create "
+ "node ") +
+ path,
+ rc );
+ }
+ pos++;
+ } else {
+ //no more path components
+ return createNode( path, value, flags, false, returnPath );
+ }
+ }
+ }
+ LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to create node ") +
+ path,
+ rc );
+ } else {
+ LOG_INFO( LOG, "%s has been created", realPath );
+ returnPath = string( realPath );
+ return true;
+ }
+}
+
+bool
+ZooKeeperAdapter::createNode(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "createNode" );
+
+ string createdPath;
+ return createNode( path, value, flags, createAncestors, createdPath );
+}
+
+int64_t
+ZooKeeperAdapter::createSequence(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "createSequence" );
+
+ string createdPath;
+ bool result = createNode( path,
+ value,
+ flags | ZOO_SEQUENCE,
+ createAncestors,
+ createdPath );
+ if (!result) {
+ return -1;
+ } else {
+ //extract sequence number from the returned path
+ if (createdPath.find( path ) != 0) {
+ throw ZooKeeperException( string("Expecting returned path '") +
+ createdPath +
+ "' to start with '" +
+ path +
+ "'" );
+ }
+ string seqSuffix =
+ createdPath.substr( path.length(),
+ createdPath.length() - path.length() );
+ char *ptr = NULL;
+ int64_t seq = strtol( seqSuffix.c_str(), &ptr, 10 );
+ if (ptr != NULL && *ptr != '\0') {
+ throw ZooKeeperException( string("Expecting a number but got ") +
+ seqSuffix );
+ }
+ return seq;
+ }
+}
+
+bool
+ZooKeeperAdapter::deleteNode(const string &path,
+ bool recursive,
+ int version)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "deleteNode" );
+
+ validatePath( path );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ rc = zoo_delete( mp_zkHandle, path.c_str(), version );
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ if (rc == ZNONODE) {
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ return false;
+ }
+ if (rc == ZNOTEMPTY && recursive) {
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ //get all children and delete them recursively...
+ vector<string> nodeList;
+ getNodeChildren( nodeList, path, NULL );
+ for (vector<string>::const_iterator i = nodeList.begin();
+ i != nodeList.end();
+ ++i) {
+ deleteNode( *i, true );
+ }
+ //...and finally attempt to delete the node again
+ return deleteNode( path, false );
+ }
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to delete node ") + path,
+ rc );
+ } else {
+ LOG_INFO( LOG, "%s has been deleted", path.c_str() );
+ return true;
+ }
+}
+
+bool
+ZooKeeperAdapter::nodeExists(const string &path,
+ ZKEventListener *listener,
+ void *context, Stat *stat)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "nodeExists" );
+
+ validatePath( path );
+
+ struct Stat tmpStat;
+ if (stat == NULL) {
+ stat = &tmpStat;
+ }
+ memset( stat, 0, sizeof(Stat) );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ if (context != NULL) {
+ m_zkContextsMutex.Acquire();
+ rc = zoo_exists( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ stat );
+ if (rc == ZOK || rc == ZNONODE) {
+ registerContext( NODE_EXISTS, path, listener, context );
+ }
+ m_zkContextsMutex.Release();
+ } else {
+ rc = zoo_exists( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ stat );
+ }
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ if (rc == ZNONODE) {
+ LOG_TRACE( LOG, "Node %s does not exist", path.c_str() );
+ return false;
+ }
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException(
+ string("Unable to check existence of node ") + path,
+ rc );
+ } else {
+ return true;
+ }
+}
+
+void
+ZooKeeperAdapter::getNodeChildren(vector<string> &nodeList,
+ const string &path,
+ ZKEventListener *listener,
+ void *context)
+ throw (ZooKeeperException)
+{
+ TRACE( LOG, "getNodeChildren" );
+
+ validatePath( path );
+
+ String_vector children;
+ memset( &children, 0, sizeof(children) );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ if (context != NULL) {
+ m_zkContextsMutex.Acquire();
+ rc = zoo_get_children( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ &children );
+ if (rc == ZOK) {
+ registerContext( GET_NODE_CHILDREN, path, listener, context );
+ }
+ m_zkContextsMutex.Release();
+ } else {
+ rc = zoo_get_children( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ &children );
+ }
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to get children of node ") +
+ path,
+ rc );
+ } else {
+ for (int i = 0; i < children.count; ++i) {
+ //convert each child's path from relative to absolute
+ string absPath(path);
+ if (path != "/") {
+ absPath.append( "/" );
+ }
+ absPath.append( children.data[i] );
+ nodeList.push_back( absPath );
+ }
+ //make sure the order is always deterministic
+ sort( nodeList.begin(), nodeList.end() );
+ }
+}
+
+string
+ZooKeeperAdapter::getNodeData(const string &path,
+ ZKEventListener *listener,
+ void *context, Stat *stat)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "getNodeData" );
+
+ validatePath( path );
+
+ const int MAX_DATA_LENGTH = 128 * 1024;
+ char buffer[MAX_DATA_LENGTH];
+ memset( buffer, 0, MAX_DATA_LENGTH );
+ struct Stat tmpStat;
+ if (stat == NULL) {
+ stat = &tmpStat;
+ }
+ memset( stat, 0, sizeof(Stat) );
+
+ int rc;
+ int len;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ len = MAX_DATA_LENGTH - 1;
+ if (context != NULL) {
+ m_zkContextsMutex.Acquire();
+ rc = zoo_get( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ buffer, &len, stat );
+ if (rc == ZOK) {
+ registerContext( GET_NODE_DATA, path, listener, context );
+ }
+ m_zkContextsMutex.Release();
+ } else {
+ rc = zoo_get( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ buffer, &len, stat );
+ }
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException(
+ string("Unable to get data of node ") + path, rc
+ );
+ } else {
+ if (len == -1) {
+ len = 0;
+ }
+ return string( buffer, len );
+ }
+}
+
+void
+ZooKeeperAdapter::setNodeData(const string &path,
+ const string &value,
+ int version)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "setNodeData" );
+
+ validatePath( path );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ rc = zoo_set( mp_zkHandle,
+ path.c_str(),
+ value.c_str(),
+ value.length(),
+ version);
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to set data for node ") +
+ path,
+ rc );
+ }
+}
+
+} /* end of 'namespace zk' */
+
|