http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/event.h
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/event.h b/src/contrib/zkfuse/src/event.h
deleted file mode 100644
index 936ecc6..0000000
--- a/src/contrib/zkfuse/src/event.h
+++ /dev/null
@@ -1,553 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __EVENT_H__
-#define __EVENT_H__
-
-#include <string>
-#include <set>
-#include <deque>
-#include <algorithm>
-#ifdef GCC4
-# include <tr1/memory>
-using namespace std::tr1;
-#else
-# include <boost/shared_ptr.hpp>
-using namespace boost;
-#endif
-
-#include "log.h"
-#include "blockingqueue.h"
-#include "mutex.h"
-#include "thread.h"
-
-using namespace std;
-using namespace zk;
-
-namespace zkfuse {
-
-//forward declaration of EventSource
-template<typename E>
-class EventSource;
-
-/**
- * \brief Observer interface for events of type E emitted by an
- * \brief {@link EventSource}.
- */
-template<typename E>
-class EventListener { // NOTE(review): no virtual destructor is declared; avoid deleting listeners through this base type
- public:
-
- /**
- * \brief Callback invoked by the observed event source for each
- * \brief event it delivers to this listener.
- *
- * @param source the source that triggered the event
- * @param e the actual event being triggered
- */
- virtual void eventReceived(const EventSource<E> &source, const E &e) = 0;
-};
-
-/**
- * \brief This class represents a source of events of type E.
- *
- * <p>
- * Each source can have many observers (listeners) attached to it
- * and in case of an event, this source may propagate the event
- * using the {@link #fireEvent} method. Listeners are stored as raw pointers and are never deleted by this class.
- */
-template<typename E>
-class EventSource {
- public:
-
- /**
- * \brief The type of the container holding the registered event listeners.
- */
- typedef set<EventListener<E> *> EventListeners;
-
- /**
- * \brief Registers a new event listener.
- *
- * @param listener the listener to be added to the set of listeners
- */
- void addListener(EventListener<E> *listener) {
- m_listeners.insert( listener ); // no lock is taken here; NOTE(review): confirm callers serialize access with fireEvent
- }
-
- /**
- * \brief Removes an already registered listener.
- *
- * @param listener the listener to be removed
- */
- void removeListener(EventListener<E> *listener) {
- m_listeners.erase( listener ); // erase by key: only the set entry is dropped, the listener object is untouched
- }
-
- /**
- * \brief Destructor. Does not touch the registered listeners.
- */
- virtual ~EventSource() {}
-
- protected:
-
- /**
- * \brief Fires the given event to all registered listeners.
- *
- * <p>
- * This method iterates over all listeners and invokes
- * {@link fireEvent(EventListener<E> *listener, const E &event)}
- * for each element. This overload is not virtual; derived classes
- * that want different error handling override the per-listener
- * overload instead.
- *
- * @param event the event to be propagated to all listeners
- */
- void fireEvent(const E &event);
-
- /**
- * \brief Sends an event to the given listener.
- *
- * @param listener the listener to which the event is passed
- * @param event the event to be handled
- */
- virtual void fireEvent(EventListener<E> *listener, const E &event);
-
- private:
-
- /**
- * The set of registered event listeners (non-owning pointers).
- */
- EventListeners m_listeners;
-
-};
-
-/**
- * \brief Type-erased holder interface: exposes a wrapped event as void*.
- */
-class AbstractEventWrapper {
- public:
-
- /**
- * \brief Virtual destructor so wrappers can be destroyed through this interface.
- */
- virtual ~AbstractEventWrapper() {}
-
- /**
- * \brief Returns an untyped pointer to the wrapped event data.
- */
- virtual void *getWrapee() = 0; // the caller must know the concrete type to cast the result back
-};
-
-/**
- * \brief A template based implementation of {@link AbstractEventWrapper} that stores a copy of an E.
- */
-template<typename E>
-class EventWrapper : public AbstractEventWrapper {
- public:
- EventWrapper(const E &e) : m_e(e) { // copies the event into the wrapper
- }
- void *getWrapee() {
- return &m_e; // address of the internal copy; valid only while this wrapper is alive
- }
- private:
- E m_e; // the wrapped event, held by value
-};
-
-/**
- * \brief This class represents a generic event: an integer type tag plus a type-erased payload.
- */
-class GenericEvent {
- public:
-
- /**
- * \brief Default constructor: type 0 and no payload wrapper.
- */
- GenericEvent() : m_type(0) {} // NOTE(review): m_eventWrapper stays empty, so getEvent() on such an object dereferences an empty pointer
-
- /**
- * \brief Constructor.
- *
- * @param type the type of this event
- * @param eventWrapper the wrapper around event's data; held by the shared pointer member from here on
- */
- GenericEvent(int type, AbstractEventWrapper *eventWrapper) :
- m_type(type), m_eventWrapper(eventWrapper) {
- }
-
- /**
- * \brief Returns the type of this event.
- *
- * @return type of this event
- */
- int getType() const { return m_type; }
-
- /**
- * \brief Returns the event's data.
- *
- * @return the event's data as an untyped pointer obtained from the wrapper
- */
- void *getEvent() const { return m_eventWrapper->getWrapee(); } // no check for an empty wrapper
-
- private:
-
- /**
- * The event type.
- */
- int m_type;
-
- /**
- * The event represented as abstract wrapper; copies of this event share the same wrapper.
- */
- boost::shared_ptr<AbstractEventWrapper> m_eventWrapper; // NOTE(review): explicitly boost:: even though the GCC4 include branch only pulls in <tr1/memory> - confirm that configuration builds
-
-};
-
-/**
- * \brief This class adapts {@link EventListener} to a generic listener.
- * It listens for incoming events of type E and re-fires each one
- * as a {@link GenericEvent} tagged with the template argument 'type'.
- */
-template<typename E, const int type>
-class EventListenerAdapter : public virtual EventListener<E>,
- public virtual EventSource<GenericEvent>
-{
- public:
-
- /**
- * \brief Constructor. Registers this adapter as a listener of eventSource.
- *
- * @param eventSource the source on which to register this listener
- */
- EventListenerAdapter(EventSource<E> &eventSource) {
- eventSource.addListener(this); // NOTE(review): no matching removeListener is visible in this class - confirm the source never outlives the adapter
- }
-
- void eventReceived(const EventSource<E> &source, const E &e) {
- AbstractEventWrapper *wrapper = new EventWrapper<E>(e); // heap copy of the event
- GenericEvent event(type, wrapper); // the wrapper pointer is handed to GenericEvent's shared pointer member
- fireEvent( event ); // forward to this adapter's own GenericEvent listeners
- }
-
-};
-
-/**
- * \brief This class provides an adapter between asynchronous and synchronous
- * \brief event handling.
- *
- * <p>
- * This class queues up all received events and exposes them through
- * the {@link #getNextEvent()} method.
- */
-template<typename E>
-class SynchronousEventAdapter : public EventListener<E> {
- public:
-
- void eventReceived(const EventSource<E> &source, const E &e) {
- m_queue.put( e ); // enqueue the event; the source argument is not used
- }
-
- /**
- * \brief Returns the next available event from the underlying queue,
- * \brief possibly blocking, if no data is available.
- *
- * @return the next available event
- */
- E getNextEvent() {
- return m_queue.take(); // BlockingQueue is declared elsewhere; presumably take() blocks while empty - confirm
- }
-
- /**
- * \brief Returns whether there are any events in the queue or not.
- *
- * @return true if the queue reported itself non-empty at the time of
- * the call
- */
- bool hasEvents() const {
- return (m_queue.empty() ? false : true); // same value as !m_queue.empty()
- }
-
- /**
- * \brief Destructor.
- */
- virtual ~SynchronousEventAdapter() {}
-
- private:
-
- /**
- * The blocking queue of all events received and not yet taken.
- */
- BlockingQueue<E> m_queue;
-
-};
-
-/**
- * This typedef defines the type of a timer Id.
- */
-typedef int32_t TimerId;
-
-/**
- * This class represents a timer event parametrized by the user's data type T.
- */
-template<typename T>
-class TimerEvent {
- public:
-
- /**
- * \brief Constructor.
- *
- * @param id the ID of this event
- * @param alarmTime when this event is to be triggered
- * @param userData the user data associated with this event (stored by copy)
- */
- TimerEvent(TimerId id, int64_t alarmTime, const T &userData) :
- m_id(id), m_alarmTime(alarmTime), m_userData(userData)
- {}
-
- /**
- * \brief Default constructor: ID and alarm time are -1, user data is not explicitly initialized.
- */
- TimerEvent() : m_id(-1), m_alarmTime(-1) {}
-
- /**
- * \brief Returns the ID.
- *
- * @return the ID of this event
- */
- TimerId getID() const { return m_id; }
-
- /**
- * \brief Returns the alarm time.
- *
- * @return the alarm time
- */
- int64_t getAlarmTime() const { return m_alarmTime; }
-
- /**
- * \brief Returns the user's data.
- *
- * @return the user's data
- */
- T const &getUserData() const { return m_userData; }
-
- /**
- * \brief Returns whether this event's alarm time is less than the given
- * \brief alarm time (the event is on the left-hand side of the comparison).
- */
- bool operator<(const int64_t alarmTime) const {
- return m_alarmTime < alarmTime;
- }
-
- private:
-
- /**
- * The ID of this event.
- */
- TimerId m_id;
-
- /**
- * The time at which this event triggers.
- */
- int64_t m_alarmTime;
-
- /**
- * The user specific data associated with this event.
- */
- T m_userData;
-
-};
-
-template<typename T> // T is the user data type carried by each TimerEvent
-class Timer : public EventSource<TimerEvent<T> > { // fires queued TimerEvent<T> objects from a worker thread once their alarm time has passed
- public:
-
- /**
- * \brief Constructor. Starts the worker thread running sendAlarms().
- */
- Timer() : m_currentEventID(0), m_terminating(false) {
- m_workerThread.Create( *this, &Timer<T>::sendAlarms );
- }
-
- /**
- * \brief Destructor. Asks the worker thread to stop and joins it.
- */
- ~Timer() {
- m_terminating = true; // written without holding m_lock
- m_lock.notify(); // NOTE(review): signalled without the lock held; if the worker is not waiting yet the wake-up is missed and Join() waits for the worker's current timed wait to end
- m_workerThread.Join();
- }
-
- /**
- * \brief Schedules the given event <code>timeFromNow</code> milliseconds from now.
- *
- * @param timeFromNow time from now, in milliseconds, when the event
- * should be triggered
- * @param userData the user data associated with the timer event
- *
- * @return the ID of the newly created timer event
- */
- TimerId scheduleAfter(int64_t timeFromNow, const T &userData) {
- return scheduleAt( getCurrentTimeMillis() + timeFromNow, userData );
- }
-
- /**
- * \brief Schedules an event at the given time.
- *
- * @param absTime absolute time, in milliseconds, at which the event
- * should be triggered; the time is measured
- * from Jan 1st, 1970
- * @param userData the user data associated with the timer event
- *
- * @return the ID of the newly created timer event
- */
- TimerId scheduleAt(int64_t absTime, const T &userData) {
- m_lock.lock();
- typename QueueType::iterator pos =
- lower_bound( m_queue.begin(), m_queue.end(), absTime ); // first queued event whose alarm time is not less than absTime, so the new event goes before any event with an equal time
- TimerId id = m_currentEventID++; // IDs are handed out under the lock
- TimerEvent<T> event(id, absTime, userData);
- m_queue.insert( pos, event ); // keeps m_queue ordered by alarm time
- m_lock.notify(); // wake the worker so it re-examines the head of the queue
- m_lock.unlock();
- return id;
- }
-
- /**
- * \brief Returns the current time since Jan 1, 1970, in milliseconds.
- *
- * @return the current time in milliseconds
- */
- static int64_t getCurrentTimeMillis() {
- struct timeval now;
- gettimeofday( &now, NULL ); // return value is not checked
- return now.tv_sec * 1000LL + now.tv_usec / 1000; // seconds and microseconds both converted to milliseconds
- }
-
- /**
- * \brief Cancels the given timer event.
- *
- * Performs a linear search of the queue for the first event with this ID.
- * @param eventID the ID of the event to be canceled
- *
- * @return whether the event was found in the queue and removed
- */
- bool cancelAlarm(TimerId eventID) {
- bool canceled = false;
- m_lock.lock();
- typename QueueType::iterator i;
- for (i = m_queue.begin(); i != m_queue.end(); ++i) {
- if (eventID == i->getID()) {
- m_queue.erase( i );
- canceled = true;
- break; // leave immediately: the iterator must not be used after erase
- }
- }
- m_lock.unlock();
- return canceled;
- }
-
- /**
- * Executes the main loop of the worker thread; returns once m_terminating is observed to be true.
- */
- void sendAlarms() {
- //iterate until terminating
- while (!m_terminating) {
- m_lock.lock();
- //step 1 - if the queue is empty, give producers a chance to add an event
- if (m_queue.empty()) {
- //wait up to 100ms to get next event
- m_lock.wait( 100 );
- }
- bool fire = false;
- if (!m_queue.empty()) {
- //take a copy of the head event so it stays usable after unlocking
- TimerEvent<T> event = m_queue.front();
- //check whether we can send it right away
- int64_t timeToWait =
- event.getAlarmTime() - getCurrentTimeMillis();
- if (timeToWait <= 0) {
- m_queue.pop_front();
- //the alarm time has elapsed and the event was still at
- //the head of the queue (i.e. it was not canceled), so
- //it is removed here and fired below
- fire = true;
- } else {
- m_lock.wait( timeToWait ); // sleep until due or notified; the next loop iteration re-reads the head
- }
- m_lock.unlock();
- if (fire) {
- fireEvent( event ); // delivered after unlocking; NOTE(review): unqualified call into a dependent base class - conforming compilers may need this->fireEvent
- }
- } else {
- m_lock.unlock();
- }
- }
- }
-
- private:
-
- /**
- * The type of timer events queue.
- */
- typedef deque<TimerEvent<T> > QueueType;
-
- /**
- * The current event ID, auto-incremented each time a new event
- * is created.
- */
- TimerId m_currentEventID;
-
- /**
- * The queue of timer events sorted by {@link TimerEvent#alarmTime}.
- */
- QueueType m_queue;
-
- /**
- * The lock used to guard {@link #m_queue}.
- */
- Lock m_lock;
-
- /**
- * The thread that triggers alarms.
- */
- CXXThread<Timer<T> > m_workerThread;
-
- /**
- * Whether {@link #m_workerThread} is terminating. NOTE(review): volatile alone is not a synchronization primitive.
- */
- volatile bool m_terminating;
-
-};
-
-template<typename E>
-void EventSource<E>::fireEvent(const E &event) { // broadcast: hands the event to every registered listener in set iteration order
- for (typename EventListeners::iterator i = m_listeners.begin();
- i != m_listeners.end();
- ++i)
- {
- fireEvent( *i, event ); // per-listener virtual overload; NOTE(review): a listener that adds/removes listeners during delivery would modify the set being iterated
- }
-}
-
-template<typename E>
-void EventSource<E>::fireEvent(EventListener<E> *listener, const E &event) { // default per-listener delivery
- listener->eventReceived( *this, event ); // passes this source as the origin; anything the listener throws propagates to the caller
-}
-
-} /* end of 'namespace zkfuse' */
-
-#endif /* __EVENT_H__ */
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/log.cc
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/log.cc b/src/contrib/zkfuse/src/log.cc
deleted file mode 100644
index e2bfb0d..0000000
--- a/src/contrib/zkfuse/src/log.cc
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <string>
-
-#include "log.h"
-
-using namespace std;
-
-/**
- * \brief This class encapsulates a log4cxx configuration: constructing it configures log4cxx from a properties file.
- */
-class LogConfiguration {
- public:
- LogConfiguration(const string &file) {
- PropertyConfigurator::configureAndWatch( file, 5000 ); // presumably 5000 is the re-check delay in milliseconds - confirm against the log4cxx API
- }
-};
-
-//file-scope static object: its constructor runs during static initialization and loads "log4cxx.properties" (relative path)
-static LogConfiguration logConfig( "log4cxx.properties" );
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/log.h
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/log.h b/src/contrib/zkfuse/src/log.h
deleted file mode 100644
index aefce10..0000000
--- a/src/contrib/zkfuse/src/log.h
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __LOG_H__
-#define __LOG_H__
-
-#define ZKFUSE_NAMESPACE zkfuse
-#define START_ZKFUSE_NAMESPACE namespace ZKFUSE_NAMESPACE {
-#define END_ZKFUSE_NAMESPACE }
-#define USING_ZKFUSE_NAMESPACE using namespace ZKFUSE_NAMESPACE;
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <pthread.h>
-
-#include <log4cxx/logger.h>
-#include <log4cxx/propertyconfigurator.h>
-#include <log4cxx/helpers/exception.h>
-using namespace log4cxx;
-using namespace log4cxx::helpers;
-
-#define PRINTIP(x) ((uint8_t*)&x)[0], ((uint8_t*)&x)[1], \
- ((uint8_t*)&x)[2], ((uint8_t*)&x)[3] // expands to the first four bytes of x as four comma-separated arguments; x is not parenthesized, so pass a plain lvalue
-
-#define IPFMT "%u.%u.%u.%u" // printf format with four %u fields, matching the four values PRINTIP produces
-
-#define DECLARE_LOGGER(varName) \
-extern LoggerPtr varName; // declaration only; the expansion already ends with ';'
-
-#define DEFINE_LOGGER(varName, logName) \
-static LoggerPtr varName = Logger::getLogger( logName ); // defines a static (internal linkage) logger; the expansion already ends with ';'
-
-#define MAX_BUFFER_SIZE 20000 // size in bytes of the stack buffer each log statement formats into
-
-#define SPRINTF_LOG_MSG(buffer, fmt, args...) \
- char buffer[MAX_BUFFER_SIZE]; \
- snprintf( buffer, MAX_BUFFER_SIZE, fmt, ##args ); // declares 'buffer' in the enclosing scope and formats into it; uses GNU named variadic macro syntax (args... / ##args)
-
-// older versions of log4cxx don't support tracing, so LOG_TRACE falls back to the DEBUG level when LOG4CXX_TRACE is not defined
-#ifdef LOG4CXX_TRACE
-#define LOG_TRACE(logger, fmt, args...) \
- if (logger->isTraceEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_TRACE( logger, __tmp ); \
- } // NOTE(review): expands to a bare if-block (no do/while(0)), so an 'else' following a use binds to this 'if'; '__tmp' is a reserved identifier
-#else
-#define LOG_TRACE(logger, fmt, args...) \
- if (logger->isDebugEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_DEBUG( logger, __tmp ); \
- } // fallback: trace messages are emitted at DEBUG level
-#endif
-
-#define LOG_DEBUG(logger, fmt, args...) \
- if (logger->isDebugEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_DEBUG( logger, __tmp ); \
- } // printf-style DEBUG logging; the message is formatted only when the level is enabled
-
-#define LOG_INFO(logger, fmt, args...) \
- if (logger->isInfoEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_INFO( logger, __tmp ); \
- } // printf-style INFO logging
-
-#define LOG_WARN(logger, fmt, args...) \
- if (logger->isWarnEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_WARN( logger, __tmp ); \
- } // printf-style WARN logging
-
-#define LOG_ERROR(logger, fmt, args...) \
- if (logger->isErrorEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_ERROR( logger, __tmp ); \
- } // printf-style ERROR logging
-
-#define LOG_FATAL(logger, fmt, args...) \
- if (logger->isFatalEnabled()) { \
- SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \
- LOG4CXX_FATAL( logger, __tmp ); \
- } // printf-style FATAL logging; only logs, does not terminate the process
-
-#ifdef DISABLE_TRACE
-# define TRACE(logger, x) // tracing compiled out: expands to nothing
-#else
-# define TRACE(logger, x) \
-class Trace { \
- public: \
- Trace(const void* p) : _p(p) { \
- LOG_TRACE(logger, "%s %p Enter", __PRETTY_FUNCTION__, p); \
- } \
- ~Trace() { \
- LOG_TRACE(logger, "%s %p Exit", __PRETTY_FUNCTION__, _p); \
- } \
- const void* _p; \
-} traceObj(x); // defines a local class plus an object 'traceObj': logs "Enter" on construction and "Exit" when the enclosing scope ends; __PRETTY_FUNCTION__ is evaluated inside Trace's own constructor/destructor
-#endif /* DISABLE_TRACE */
-
-#endif /* __LOG_H__ */
-
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/log4cxx.properties
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/log4cxx.properties b/src/contrib/zkfuse/src/log4cxx.properties
deleted file mode 100644
index 1e373e4..0000000
--- a/src/contrib/zkfuse/src/log4cxx.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Set root logger level to TRACE and its only appender to A1.
-log4j.rootLogger=TRACE, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4cxx.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-log4j.category.zkfuse=TRACE
-
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/mutex.h
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/mutex.h b/src/contrib/zkfuse/src/mutex.h
deleted file mode 100644
index 86c4604..0000000
--- a/src/contrib/zkfuse/src/mutex.h
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MUTEX_H__
-#define __MUTEX_H__
-
-#include <pthread.h>
-#include <errno.h>
-#include <sys/time.h>
-
-#include "log.h"
-
-START_ZKFUSE_NAMESPACE
-
-class Cond;
-
-class Mutex { // thin wrapper around a pthread mutex created with the PTHREAD_MUTEX_RECURSIVE_NP type; return codes of the pthread calls are not checked
- friend class Cond; // Cond needs the raw pthread_mutex_t for pthread_cond_wait
- public:
- Mutex() {
- pthread_mutexattr_init( &m_mutexAttr );
- pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE_NP ); // recursive type: the owning thread may lock again without deadlocking
- pthread_mutex_init( &mutex, &m_mutexAttr );
- }
- ~Mutex() {
- pthread_mutex_destroy(&mutex);
- pthread_mutexattr_destroy( &m_mutexAttr );
- }
- void Acquire() { Lock(); } // alias of Lock()
- void Release() { Unlock(); } // alias of Unlock()
- void Lock() {
- pthread_mutex_lock(&mutex);
- }
- int TryLock() {
- return pthread_mutex_trylock(&mutex); // returns the raw pthread result code (0 means the lock was acquired)
- }
- void Unlock() {
- pthread_mutex_unlock(&mutex);
- }
- private:
- pthread_mutex_t mutex; // the underlying pthread mutex
- pthread_mutexattr_t m_mutexAttr; // attributes kept for the lifetime of the mutex and destroyed with it
-};
-
-class AutoLock { // scope guard: locks the given Mutex on construction and unlocks it on destruction
- public:
- AutoLock(Mutex& mutex) : _mutex(mutex) {
- mutex.Lock();
- }
- ~AutoLock() {
- _mutex.Unlock();
- }
- private:
- friend class AutoUnlockTemp; // lets AutoUnlockTemp reach _mutex
- Mutex& _mutex; // referenced, not owned; must outlive this guard
-};
-
-class AutoUnlockTemp { // inverse guard: unlocks the mutex held by an AutoLock for the lifetime of this object
- public:
- AutoUnlockTemp(AutoLock & autoLock) : _autoLock(autoLock) {
- _autoLock._mutex.Unlock(); // release for the duration of this scope
- }
- ~AutoUnlockTemp() {
- _autoLock._mutex.Lock(); // re-acquire before control returns to the AutoLock's scope
- }
- private:
- AutoLock & _autoLock; // the guard whose mutex is temporarily released
-};
-
-class Cond { // thin wrapper around a pthread condition variable; only single-waiter Signal() is offered, there is no broadcast
- public:
- Cond() {
- static pthread_condattr_t attr; // one attribute object shared by every Cond
- static bool inited = false;
- if(!inited) { // NOTE(review): this one-time init is not synchronized; concurrent first constructions could race
- inited = true;
- pthread_condattr_init(&attr);
- }
- pthread_cond_init(&_cond, &attr);
- }
- ~Cond() {
- pthread_cond_destroy(&_cond);
- }
-
- void Wait(Mutex& mutex) { // untimed wait; the result of pthread_cond_wait is ignored
- pthread_cond_wait(&_cond, &mutex.mutex);
- }
-
- bool Wait(Mutex& mutex, long long int timeout) { // timed wait; timeout is in milliseconds (scaled by 1000 into microseconds below)
- struct timeval now;
- gettimeofday( &now, NULL );
- struct timespec abstime;
- int64_t microSecs = now.tv_sec * 1000000LL + now.tv_usec; // current wall-clock time in microseconds
- microSecs += timeout * 1000; // absolute deadline in microseconds
- abstime.tv_sec = microSecs / 1000000LL;
- abstime.tv_nsec = (microSecs % 1000000LL) * 1000; // leftover microseconds converted to nanoseconds
- if (pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime) == ETIMEDOUT) {
- return false; // the deadline passed
- } else {
- return true; // anything other than ETIMEDOUT, including other error codes, is reported as true
- }
- }
-
- void Signal() {
- pthread_cond_signal(&_cond);
- }
-
- private:
- pthread_cond_t _cond; // the underlying pthread condition variable
-};
-
-/**
- * A wrapper class pairing one {@link Mutex} with one {@link Cond}.
- */
-class Lock {
- public:
-
- void lock() {
- m_mutex.Lock();
- }
-
- void unlock() {
- m_mutex.Unlock();
- }
-
- void wait() { // waits on the condition using this lock's mutex; presumably the caller must already hold the lock - confirm at call sites
- m_cond.Wait( m_mutex );
- }
-
- bool wait(long long int timeout) { // timed variant; forwards to Cond::Wait(mutex, timeout) and returns its result (false on timeout)
- return m_cond.Wait( m_mutex, timeout );
- }
-
- void notify() { // signals the condition (Cond::Signal); does not itself take the mutex
- m_cond.Signal();
- }
-
- private:
-
- /**
- * The mutex.
- */
- Mutex m_mutex;
-
- /**
- * The condition associated with this lock's mutex.
- */
- Cond m_cond;
-};
-
-END_ZKFUSE_NAMESPACE
-
-#endif /* __MUTEX_H__ */
-
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/thread.cc
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/thread.cc b/src/contrib/zkfuse/src/thread.cc
deleted file mode 100644
index f1ed816..0000000
--- a/src/contrib/zkfuse/src/thread.cc
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <log.h>
-
-#include "thread.h"
-
-DEFINE_LOGGER( LOG, "Thread" )
-
-START_ZKFUSE_NAMESPACE
-
-void Thread::Create(void* ctx, ThreadFunc func) // starts a pthread running func(ctx) with the configured stack size
-{
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setstacksize(&attr, _stackSize);
- int ret = pthread_create(&mThread, &attr, func, ctx);
- if(ret != 0) {
- LOG_FATAL( LOG, "pthread_create failed: %s", strerror(errno) ); // NOTE(review): pthread_create reports its error in 'ret', not errno, so this message may describe an unrelated error
- }
- // pthread_attr_destroy(&attr);  (left disabled, so attr is never destroyed)
- _ctx = ctx; // recorded even when pthread_create failed
- _func = func; // Join() tests this field to decide whether to call pthread_join
-}
-
-END_ZKFUSE_NAMESPACE
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/thread.h
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/thread.h b/src/contrib/zkfuse/src/thread.h
deleted file mode 100644
index 0ed12d7..0000000
--- a/src/contrib/zkfuse/src/thread.h
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __THREAD_H__
-#define __THREAD_H__
-
-#include <errno.h>
-#include <string.h>
-#include <assert.h>
-#include <pthread.h>
-
-#include "log.h"
-
-START_ZKFUSE_NAMESPACE
-
-class Thread { // minimal pthread wrapper: Create() starts a thread, Join() waits for it
- public:
- static const size_t defaultStackSize = 1024 * 1024; // 1 MiB
- typedef void* (*ThreadFunc) (void*); // pthread-style entry point signature
- Thread(size_t stackSize = defaultStackSize)
- : _stackSize(stackSize), _ctx(NULL), _func(NULL) // note: members are initialized in declaration order (_ctx, _func, _stackSize), not in the order written here
- {
- memset( &mThread, 0, sizeof(mThread) );
- }
- ~Thread() { } // does not join or detach the thread
-
- void Create(void* ctx, ThreadFunc func);
- void Join() {
- //avoid SEGFAULT because of uninitialized mThread
- //in case Create(...) was never called
- if (_func != NULL) {
- pthread_join(mThread, 0); // the thread's return value is discarded
- }
- }
- private:
- pthread_t mThread; // zero-filled until Create() runs
- void *_ctx; // argument passed to the thread function
- ThreadFunc _func; // thread entry point; NULL until Create() is called
- size_t _stackSize; // stack size requested for the thread
-};
-
-
-template<typename T> // T is the class whose member function the thread will run
-struct ThreadContext { // bundles an object reference with a pointer to one of its void() member functions
- typedef void (T::*FuncPtr) (void);
- ThreadContext(T& ctx, FuncPtr func) : _ctx(ctx), _func(func) {}
- void run(void) {
- (_ctx.*_func)(); // invoke the stored member function on the stored object
- }
- T& _ctx; // target object; held by reference, so it must outlive the thread
- FuncPtr _func; // member function to call
-};
-
-template<typename T>
-void* ThreadExec(void *obj) { // pthread-compatible trampoline: obj must point to a ThreadContext<T>
- ThreadContext<T>* tc = (ThreadContext<T>*)(obj);
- assert(tc != 0); // only checked in builds where assert is active
- tc->run();
- return 0; // the thread's exit value is always null
-}
-
-template <typename T>
-class CXXThread : public Thread { // Thread that runs a void() member function of a T object
- public:
- typedef void (T::*FuncPtr) (void);
- CXXThread(size_t stackSize = Thread::defaultStackSize)
- : Thread(stackSize), ctx(0) {}
- ~CXXThread() { if (ctx) delete ctx; } // frees the context without joining; NOTE(review): confirm callers always Join() first
-
- void Create(T& obj, FuncPtr func) { // may be called once per object (asserted); this overload hides Thread::Create
- assert(ctx == 0);
- ctx = new ThreadContext<T>(obj, func); // heap context handed to the new thread as its argument
- Thread::Create(ctx, ThreadExec<T>);
- }
-
- private:
- ThreadContext<T>* ctx; // owned; null until Create() is called
-};
-
-
-END_ZKFUSE_NAMESPACE
-
-#endif /* __THREAD_H__ */
-
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/zkadapter.cc
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/zkadapter.cc b/src/contrib/zkfuse/src/zkadapter.cc
deleted file mode 100644
index 7f02fa3..0000000
--- a/src/contrib/zkfuse/src/zkadapter.cc
+++ /dev/null
@@ -1,884 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <algorithm>
-#include <iostream>
-
-#include "blockingqueue.h"
-#include "thread.h"
-#include "zkadapter.h"
-
-using namespace std;
-using namespace zk;
-
-DEFINE_LOGGER( LOG, "zookeeper.adapter" )
-DEFINE_LOGGER( ZK_LOG, "zookeeper.core" )
-
-/**
- * \brief A helper class to initialize ZK logging.
- *
- * Constructing an instance maps the level currently enabled on ZK_LOG
- * to the corresponding ZK debug level.
- */
-class InitZooKeeperLogging
-{
-  public:
-    InitZooKeeperLogging() {
-        //debug and, where available, trace both map to the ZK DEBUG level
-        bool verbose = ZK_LOG->isDebugEnabled();
-#ifdef LOG4CXX_TRACE
-        verbose = verbose || ZK_LOG->isTraceEnabled();
-#endif
-        if (verbose) {
-            zoo_set_debug_level( ZOO_LOG_LEVEL_DEBUG );
-            return;
-        }
-        if (ZK_LOG->isInfoEnabled()) {
-            zoo_set_debug_level( ZOO_LOG_LEVEL_INFO );
-            return;
-        }
-        if (ZK_LOG->isWarnEnabled()) {
-            zoo_set_debug_level( ZOO_LOG_LEVEL_WARN );
-            return;
-        }
-        zoo_set_debug_level( ZOO_LOG_LEVEL_ERROR );
-    }
-};
-
-using namespace std;
-
-namespace zk
-{
-
-/**
- * \brief This class provides logic for checking if a request can be retried.
- */
-class RetryHandler
-{
-  public:
-    /**
-     * \brief Constructor.
-     *
-     * Grants two retries when the given config allows auto-reconnect,
-     * and none otherwise.
-     *
-     * @param zkConfig the ZK config; it is held by reference
-     */
-    RetryHandler(const ZooKeeperConfig &zkConfig)
-      : m_zkConfig(zkConfig),
-        retries(zkConfig.getAutoReconnect() ? 2 : 0)
-    {
-    }
-
-    /**
-     * \brief Decides whether an operation that failed with the given RC
-     * \brief should be attempted again.
-     *
-     * @param rc the ZK error code
-     * @return true if the error is recoverable and a retry was still
-     *         available; false otherwise
-     */
-    bool handleRC(int rc)
-    {
-        TRACE( LOG, "handleRC" );
-
-        if (retryOnError(rc)) {
-            LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries );
-            //the counter goes down on every recoverable error,
-            //even once it has reached zero
-            const bool retryAvailable = (retries > 0);
-            --retries;
-            return retryAvailable;
-        }
-        //the given error code is not recoverable
-        return false;
-    }
-
-  private:
-    /**
-     * The ZK config.
-     */
-    const ZooKeeperConfig &m_zkConfig;
-
-    /**
-     * The number of outstanding retries.
-     */
-    int retries;
-
-    /**
-     * Checks whether the given error entitles this adapter
-     * to retry the previous operation.
-     *
-     * @param zkErrorCode one of the ZK error codes
-     * @return true for ZCONNECTIONLOSS and ZOPERATIONTIMEOUT
-     */
-    static bool retryOnError(int zkErrorCode)
-    {
-        if (zkErrorCode == ZCONNECTIONLOSS) {
-            return true;
-        }
-        return zkErrorCode == ZOPERATIONTIMEOUT;
-    }
-};
-
-
-/**
- * \brief The implementation of the global ZK event watcher.
- *
- * Normalizes the event's path and hands the event over to the
- * ZooKeeperAdapter stored as the context of the given handle.
- *
- * @param zh the ZK handle the event was delivered on
- * @param type the type of the event
- * @param state the ZK connection state reported with the event
- * @param path the path the event refers to; may be NULL
- * @param watcherCtx not used; the adapter is taken from the handle's context
- */
-void zkWatcher(zhandle_t *zh, int type, int state, const char *path,
-               void *watcherCtx)
-{
-    TRACE( LOG, "zkWatcher" );
-
-    //a workaround for buggy ZK API: session and not-watching events are
-    //identified by the event type (not by the connection state), and for
-    //them the path is replaced with an empty string
-    const bool pathless = (path == NULL ||
-                           type == ZOO_SESSION_EVENT ||
-                           type == ZOO_NOTWATCHING_EVENT);
-    const string sPath = pathless ? string() : string(path);
-    LOG_INFO( LOG,
-              "Received a ZK event - type: %d, state: %d, path: '%s'",
-              type, state, sPath.c_str() );
-    ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh);
-    if (zka == NULL) {
-        LOG_ERROR( LOG,
-                   "Skipping ZK event (type: %d, state: %d, path: '%s'), "
-                   "because ZK passed no context",
-                   type, state, sPath.c_str() );
-        return;
-    }
-    zka->enqueueEvent( type, state, sPath );
-}
-
-
-
-// =======================================================================
-
-/**
- * \brief Constructor.
- *
- * Registers the optional listener, starts both dispatcher threads and,
- * if requested, connects to ZK.
- *
- * @param config the ZK configuration
- * @param listener the event listener to register; not used if NULL
- * @param establishConnection whether to connect to ZK right away
- *
- * @throw ZooKeeperException if the connection was requested
- *                           but could not be established
- */
-ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config,
-                                   ZKEventListener *listener,
-                                   bool establishConnection)
-    throw(ZooKeeperException)
-    : m_zkConfig(config),
-      mp_zkHandle(NULL),
-      m_terminating(false),
-      m_connected(false),
-      m_state(AS_DISCONNECTED)
-{
-    TRACE( LOG, "ZooKeeperAdapter" );
-
-    resetRemainingConnectTimeout();
-
-    //enforce setting up appropriate ZK log level
-    static InitZooKeeperLogging INIT_ZK_LOGGING;
-
-    if (listener != NULL) {
-        addListener(listener);
-    }
-
-    //start the event dispatcher thread
-    m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents );
-
-    //start the user event dispatcher thread
-    m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents );
-
-    //optionally establish the connection
-    if (establishConnection) {
-        try {
-            reconnect();
-        } catch (...) {
-            //the destructor does not run when the constructor throws, so
-            //stop and join the dispatcher threads here; otherwise they
-            //would keep using members of an object being torn down
-            m_terminating = true;
-            m_userEventDispatcher.Join();
-            m_eventDispatcher.Join();
-            throw;
-        }
-    }
-}
-
-/**
- * \brief Destructor.
- *
- * Disconnects from ZK, logging rather than propagating any std::exception,
- * then raises the termination flag and joins both dispatcher threads.
- */
-ZooKeeperAdapter::~ZooKeeperAdapter()
-{
-    TRACE( LOG, "~ZooKeeperAdapter" );
-
-    try {
-        disconnect();
-    } catch (const std::exception &failure) {
-        LOG_ERROR( LOG,
-                   "An exception while disconnecting from ZK: %s",
-                   failure.what() );
-    }
-
-    //both dispatcher loops check this flag, so raise it before joining
-    m_terminating = true;
-    m_userEventDispatcher.Join();
-    m_eventDispatcher.Join();
-}
-
-/**
- * \brief Checks that the given node path is well formed.
- *
- * A valid path starts with '/' and, unless it is exactly "/", neither
- * ends with '/' nor contains "//".
- *
- * @param path the node path to check
- * @throw ZooKeeperException if the path breaks any of the rules above
- */
-void
-ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException)
-{
-    TRACE( LOG, "validatePath" );
-
-    //only the first character matters, so there is no need to scan the path
-    if (path.empty() || path[0] != '/') {
-        throw ZooKeeperException( string("Node path must start with '/' but "
-                                         "it was '") +
-                                  path +
-                                  "'" );
-    }
-    if (path.length() > 1) {
-        if (path[path.length() - 1] == '/') {
-            throw ZooKeeperException( string("Node path must not end with "
-                                             "'/' but it was '") +
-                                      path +
-                                      "'" );
-        }
-        if (path.find( "//" ) != string::npos) {
-            throw ZooKeeperException( string("Node path must not contain "
-                                             "'//' but it was '") +
-                                      path +
-                                      "'" );
-        }
-    }
-}
-
-/**
- * \brief Closes the ZK handle, if there is one, and moves the adapter
- * \brief to the AS_DISCONNECTED state.
- */
-void
-ZooKeeperAdapter::disconnect()
-{
-    TRACE( LOG, "disconnect" );
-    LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
-
-    m_stateLock.lock();
-    if (mp_zkHandle == NULL) {
-        //there is no handle to close
-        m_stateLock.unlock();
-        return;
-    }
-    zookeeper_close( mp_zkHandle );
-    mp_zkHandle = NULL;
-    setState( AS_DISCONNECTED );
-    m_stateLock.unlock();
-}
-
-/**
- * \brief Drops the current connection, if any, and initiates a new one.
- *
- * On success the adapter is left in the AS_CONNECTING state.
- *
- * @throw ZooKeeperException if a new ZK handle cannot be created
- */
-void
-ZooKeeperAdapter::reconnect() throw(ZooKeeperException)
-{
-    TRACE( LOG, "reconnect" );
-
-    m_stateLock.lock();
-    //clear the connection state
-    disconnect();
-
-    //establish a new connection to ZooKeeper
-    mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(),
-                                  zkWatcher,
-                                  m_zkConfig.getLeaseTimeout(),
-                                  NULL, this, 0);
-    resetRemainingConnectTimeout();
-
-    const bool handleCreated = (mp_zkHandle != NULL);
-    if (handleCreated) {
-        setState( AS_CONNECTING );
-    }
-    m_stateLock.unlock();
-
-    if (!handleCreated) {
-        throw ZooKeeperException(
-            string("Unable to connect to ZK running at '") +
-                    m_zkConfig.getHosts() + "'" );
-    }
-
-    LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
-}
-
-/**
- * \brief Finds the listener contexts registered for the given event,
- * \brief removes them, and dispatches the event to them.
- *
- * @param type the type of the event
- * @param state the ZK state reported with the event
- * @param path the path the event refers to
- */
-void
-ZooKeeperAdapter::handleEvent(int type, int state, const string &path)
-{
-    TRACE( LOG, "handleEvent" );
-    LOG_TRACE( LOG,
-               "type: %d, state %d, path: %s",
-               type, state, path.c_str() );
-
-    Listener2Context primary, secondary;
-    const bool internalEvent =
-        (type == ZOO_SESSION_EVENT || type == ZOO_NOTWATCHING_EVENT);
-    //internal ZK events carry no user context
-    if (!internalEvent) {
-        m_zkContextsMutex.Acquire();
-        if (type == ZOO_CHANGED_EVENT || type == ZOO_DELETED_EVENT) {
-            //two kinds of watches may be interested in these events,
-            //so there can be up to two rounds of notifications
-            primary = findAndRemoveListenerContext( GET_NODE_DATA, path );
-            secondary = findAndRemoveListenerContext( NODE_EXISTS, path );
-            if (primary.empty()) {
-                //promote the second set; the second one ends up empty
-                primary.swap( secondary );
-            }
-        } else if (type == ZOO_CHILD_EVENT) {
-            primary = findAndRemoveListenerContext( GET_NODE_CHILDREN, path );
-        } else if (type == ZOO_CREATED_EVENT) {
-            primary = findAndRemoveListenerContext( NODE_EXISTS, path );
-        }
-        m_zkContextsMutex.Release();
-    }
-
-    handleEvent( type, state, path, primary );
-    if (!secondary.empty()) {
-        handleEvent( type, state, path, secondary );
-    }
-}
-
-/**
- * \brief Fires the given event once per listener/context pair.
- *
- * With no pairs at all, a single event without a context is fired.
- * A pair whose listener is NULL is fired without naming a listener.
- *
- * @param type the type of the event
- * @param state the ZK state reported with the event
- * @param path the path the event refers to
- * @param listeners the listener to context mapping to notify
- */
-void
-ZooKeeperAdapter::handleEvent(int type,
-                              int state,
-                              const string &path,
-                              const Listener2Context &listeners)
-{
-    TRACE( LOG, "handleEvents" );
-
-    if (listeners.empty()) {
-        //propagate with empty context
-        ZKWatcherEvent event(type, state, path);
-        fireEvent( event );
-        return;
-    }
-
-    Listener2Context::const_iterator entry = listeners.begin();
-    for (; entry != listeners.end(); ++entry) {
-        ZKEventListener *target = entry->first;
-        ZKWatcherEvent event(type, state, path, entry->second);
-        if (target == NULL) {
-            fireEvent( event );
-        } else {
-            fireEvent( target, event );
-        }
-    }
-}
-
-/**
- * \brief Wraps the given event data in a ZKWatcherEvent and puts it
- * \brief on the queue of incoming events.
- *
- * @param type the type of the event
- * @param state the ZK state reported with the event
- * @param path the path the event refers to
- */
-void
-ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path)
-{
-    TRACE( LOG, "enqueueEvents" );
-
-    const ZKWatcherEvent incoming( type, state, path );
-    m_events.put( incoming );
-}
-
-/**
- * \brief The body of the event dispatcher thread.
- *
- * Until termination is requested, takes events off the incoming queue,
- * updates the adapter state on session events, and forwards every event
- * to the user event queue.
- */
-void
-ZooKeeperAdapter::processEvents()
-{
-    TRACE( LOG, "processEvents" );
-
-    while (!m_terminating) {
-        bool timedOut = false;
-        ZKWatcherEvent event = m_events.take( 100, &timedOut );
-        if (timedOut) {
-            //nothing arrived; check the termination flag again
-            continue;
-        }
-        if (event.getType() == ZOO_SESSION_EVENT) {
-            const int zkState = event.getState();
-            LOG_INFO( LOG,
-                      "Received SESSION event, state: %d. Adapter state: %d",
-                      zkState, m_state );
-            m_stateLock.lock();
-            if (zkState == ZOO_CONNECTED_STATE) {
-                m_connected = true;
-                resetRemainingConnectTimeout();
-                setState( AS_CONNECTED );
-            } else if (zkState == ZOO_CONNECTING_STATE) {
-                m_connected = false;
-                setState( AS_CONNECTING );
-            } else if (zkState == ZOO_EXPIRED_SESSION_STATE) {
-                LOG_INFO( LOG, "Received EXPIRED_SESSION event" );
-                setState( AS_SESSION_EXPIRED );
-            }
-            m_stateLock.unlock();
-        }
-        m_userEvents.put( event );
-    }
-}
-
-/**
- * \brief The body of the user event dispatcher thread.
- *
- * Until termination is requested, takes events off the user event queue
- * and dispatches them; a std::exception raised while handling an event
- * is logged and the loop carries on.
- */
-void
-ZooKeeperAdapter::processUserEvents()
-{
-    TRACE( LOG, "processUserEvents" );
-
-    while (!m_terminating) {
-        bool timedOut = false;
-        ZKWatcherEvent event = m_userEvents.take( 100, &timedOut );
-        if (timedOut) {
-            //nothing arrived; check the termination flag again
-            continue;
-        }
-        try {
-            handleEvent( event.getType(),
-                         event.getState(),
-                         event.getPath() );
-        } catch (const std::exception &failure) {
-            LOG_ERROR( LOG,
-                       "Unable to process event (type: %d, state: %d, "
-                       "path: %s), because of exception: %s",
-                       event.getType(),
-                       event.getState(),
-                       event.getPath().c_str(),
-                       failure.what() );
-        }
-    }
-}
-
-/**
- * \brief Stores the user context for the given method, path and listener,
- * \brief replacing any context stored for the same triple before.
- *
- * @param method the watchable method the context belongs to
- * @param path the path of the watched node
- * @param listener the listener the context is meant for
- * @param context the user specified context
- */
-void
-ZooKeeperAdapter::registerContext(WatchableMethod method,
-                                  const string &path,
-                                  ZKEventListener *listener,
-                                  ContextType context)
-{
-    TRACE( LOG, "registerContext" );
-
-    Listener2Context &contextsForPath = m_zkContexts[method][path];
-    contextsForPath[listener] = context;
-}
-
-/**
- * \brief Returns the listener to context mapping registered for the given
- * \brief method and path, and removes it from the registry.
- *
- * @param method the watchable method to look under
- * @param path the path of the watched node
- * @return the registered mapping; empty if there was none
- */
-ZooKeeperAdapter::Listener2Context
-ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method,
-                                               const string &path)
-{
-    TRACE( LOG, "findAndRemoveListenerContext" );
-
-    Listener2Context found;
-    Path2Listener2Context &contextsByPath = m_zkContexts[method];
-    Path2Listener2Context::iterator match = contextsByPath.find( path );
-    if (match == contextsByPath.end()) {
-        return found;
-    }
-    found = match->second;
-    contextsByPath.erase( match );
-    return found;
-}
-
-/**
- * \brief Changes the adapter state and notifies on the state lock.
- *
- * Does nothing, apart from tracing, when the new state equals
- * the current one.
- *
- * @param newState the state to switch to
- */
-void
-ZooKeeperAdapter::setState(AdapterState newState)
-{
-    TRACE( LOG, "setState" );
-    if (newState == m_state) {
-        LOG_TRACE( LOG, "New state same as the current: %d", newState );
-        return;
-    }
-    LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState );
-    m_state = newState;
-    m_stateLock.notify();
-}
-
-
-//TODO move this code to verifyConnection so reconnect()
-//is called from one place only
-/**
- * \brief Blocks until the adapter reaches the AS_CONNECTED state or the
- * \brief remaining connect timeout is used up.
- *
- * If the session is found expired while waiting, a reconnect is started.
- * The time actually spent waiting is reported via waitedForConnect().
- *
- * NOTE(review): this waits on m_stateLock without locking it; the visible
- * caller, verifyConnection(), holds that lock - confirm for other callers.
- *
- * @throw ZooKeeperException if the adapter is still not connected when
- *                           the waiting is over, or if there was no time
- *                           left to wait at all
- */
-void
-ZooKeeperAdapter::waitUntilConnected()
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "waitUntilConnected" );
-    long long int timeout = getRemainingConnectTimeout();
-    LOG_INFO( LOG,
-              "Waiting up to %lld ms until a connection to ZK is established",
-              timeout );
-    bool connected;
-    if (timeout > 0) {
-        //toWait is the part of the timeout that has not been spent yet
-        long long int toWait = timeout;
-        while (m_state != AS_CONNECTED && toWait > 0) {
-            //check if session expired and reconnect if so
-            if (m_state == AS_SESSION_EXPIRED) {
-                LOG_INFO( LOG,
-                        "Reconnecting because the current session has expired" );
-                reconnect();
-            }
-            struct timeval now;
-            gettimeofday( &now, NULL );
-            //start from minus "now" so that adding the time after the wait
-            //yields the elapsed milliseconds
-            int64_t milliSecs = -(now.tv_sec * 1000LL + now.tv_usec / 1000);
-            LOG_TRACE( LOG, "About to wait %lld ms", toWait );
-            m_stateLock.wait( toWait );
-            gettimeofday( &now, NULL );
-            milliSecs += now.tv_sec * 1000LL + now.tv_usec / 1000;
-            toWait -= milliSecs;
-        }
-        //timeout - toWait is the total time spent in the loop above
-        waitedForConnect( timeout - toWait );
-        LOG_INFO( LOG, "Waited %lld ms", timeout - toWait );
-    }
-    connected = (m_state == AS_CONNECTED);
-    if (!connected) {
-        if (timeout > 0) {
-            LOG_WARN( LOG, "Timed out while waiting for connection to ZK" );
-            throw ZooKeeperException("Timed out while waiting for "
-                                    "connection to ZK");
-        } else {
-            //there was no time left to wait to begin with
-            LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" );
-            throw ZooKeeperException("Global timeout expired and still not "
-                                     "connected to ZK");
-        }
-    }
-    LOG_INFO( LOG, "Connected!" );
-}
-
-/**
- * \brief Makes sure the adapter is connected to ZK before a ZK API call.
- *
- * Returns immediately when connected. While a connection is in progress
- * it waits for it; otherwise it reconnects first, provided that
- * auto-reconnect is allowed. The state lock is held for the whole check
- * and released on every exit path.
- *
- * @throw ZooKeeperException if the adapter is disconnected, if
- *                           auto-reconnect is needed but not allowed,
- *                           or if the connection cannot be established
- */
-void
-ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException)
-{
-    TRACE( LOG, "verifyConnection" );
-
-    m_stateLock.lock();
-    try {
-        if (m_state == AS_DISCONNECTED) {
-            throw ZooKeeperException("Disconnected from ZK. "
-                "Please use reconnect() before attempting to use any ZK API");
-        }
-        if (m_state != AS_CONNECTED) {
-            LOG_TRACE( LOG, "Checking if need to reconnect..." );
-            //we are not connected, so check if connection in progress...
-            if (m_state == AS_CONNECTING) {
-                LOG_TRACE( LOG, "...no, already in CONNECTING state" );
-            } else {
-                LOG_TRACE( LOG,
-                           "yes. Checking if allowed to auto-reconnect..." );
-                //...not in progress, so check if we can reconnect
-                if (!m_zkConfig.getAutoReconnect()) {
-                    //...too bad, disallowed :(
-                    LOG_TRACE( LOG, "no. Sorry." );
-                    throw ZooKeeperException("ZK connection is down and "
-                                             "auto-reconnect is not allowed");
-                }
-                LOG_TRACE( LOG, "...yes. About to reconnect" );
-                //...we are good to retry the connection
-                reconnect();
-            }
-            //wait until the connection is established
-            waitUntilConnected();
-        }
-    } catch (ZooKeeperException &e) {
-        m_stateLock.unlock();
-        throw;
-    }
-    m_stateLock.unlock();
-}
-
-/**
- * \brief Creates a node and reports the path it was actually created under.
- *
- * @param path the absolute path name of the node to be created
- * @param value the initial value to be associated with the node
- * @param flags the ZK flags of the node to be created
- * @param createAncestors if true and an ancestor is missing, the missing
- *                        ancestors are created first and the creation
- *                        is attempted again
- * @param returnPath set to the path returned by ZK on success
- *
- * @return true if the node has been created; false if it already exists
- * @throw ZooKeeperException if the operation has failed
- */
-bool
-ZooKeeperAdapter::createNode(const string &path,
-                             const string &value,
-                             int flags,
-                             bool createAncestors,
-                             string &returnPath)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "createNode (internal)" );
-    validatePath( path );
-
-    //receives the path of the created node as reported by ZK
-    const int MAX_PATH_LENGTH = 1024;
-    char realPath[MAX_PATH_LENGTH];
-    realPath[0] = 0;
-
-    int rc;
-    RetryHandler rh(m_zkConfig);
-    //repeat for as long as the failure is one the retry handler accepts
-    do {
-        verifyConnection();
-        rc = zoo_create( mp_zkHandle,
-                         path.c_str(),
-                         value.c_str(),
-                         value.length(),
-                         &ZOO_OPEN_ACL_UNSAFE,
-                         flags,
-                         realPath,
-                         MAX_PATH_LENGTH );
-    } while (rc != ZOK && rh.handleRC(rc));
-    if (rc != ZOK) {
-        if (rc == ZNODEEXISTS) {
-            //the node already exists
-            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
-            return false;
-        } else if (rc == ZNONODE && createAncestors) {
-            LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
-            //one of the ancestors doesn't exist so lets start from the root
-            //and make sure the whole path exists, creating missing nodes if
-            //necessary
-            for (string::size_type pos = 1; pos != string::npos; ) {
-                pos = path.find( "/", pos );
-                if (pos != string::npos) {
-                    //path.substr( 0, pos ) is the next ancestor to ensure
-                    try {
-                        createNode( path.substr( 0, pos ), "", 0, true );
-                    } catch (ZooKeeperException &e) {
-                        throw ZooKeeperException( string("Unable to create "
-                                                         "node ") +
-                                                  path,
-                                                  rc );
-                    }
-                    pos++;
-                } else {
-                    //no more path components
-                    //retry the node itself, this time without ancestors
-                    return createNode( path, value, flags, false, returnPath );
-                }
-            }
-        }
-        LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() );
-        throw ZooKeeperException( string("Unable to create node ") +
-                                  path,
-                                  rc );
-    } else {
-        LOG_INFO( LOG, "%s has been created", realPath );
-        returnPath = string( realPath );
-        return true;
-    }
-}
-
-/**
- * \brief Creates a node, discarding the path reported back by ZK.
- *
- * @param path the absolute path name of the node to be created
- * @param value the initial value to be associated with the node
- * @param flags the ZK flags of the node to be created
- * @param createAncestors whether to create missing ancestor nodes
- *
- * @return the result of the internal createNode overload
- * @throw ZooKeeperException if the operation has failed
- */
-bool
-ZooKeeperAdapter::createNode(const string &path,
-                             const string &value,
-                             int flags,
-                             bool createAncestors)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "createNode" );
-
-    string ignoredPath;
-    const bool created =
-        createNode( path, value, flags, createAncestors, ignoredPath );
-    return created;
-}
-
-/**
- * \brief Creates a sequence node using the given path as the prefix
- * \brief and returns the sequence number ZK appended to it.
- *
- * @param path the absolute path name used as the prefix of the new node
- * @param value the initial value to be associated with the node
- * @param flags the ZK flags of the node (ZOO_SEQUENCE is always added)
- * @param createAncestors whether to create missing ancestor nodes
- *
- * @return the sequence number of the created node,
- *         or -1 if the node has not been created
- * @throw ZooKeeperException if the operation has failed, or if the path
- *                           returned by ZK is not the prefix followed
- *                           by a number
- */
-int64_t
-ZooKeeperAdapter::createSequence(const string &path,
-                                 const string &value,
-                                 int flags,
-                                 bool createAncestors)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "createSequence" );
-
-    string createdPath;
-    const bool created = createNode( path,
-                                     value,
-                                     flags | ZOO_SEQUENCE,
-                                     createAncestors,
-                                     createdPath );
-    if (!created) {
-        return -1;
-    }
-
-    //extract sequence number from the returned path
-    if (createdPath.compare( 0, path.length(), path ) != 0) {
-        throw ZooKeeperException( string("Expecting returned path '") +
-                                  createdPath +
-                                  "' to start with '" +
-                                  path +
-                                  "'" );
-    }
-    const string seqSuffix = createdPath.substr( path.length() );
-    const char *start = seqSuffix.c_str();
-    char *end = NULL;
-    //strtoll keeps the full 64 bits even where long is only 32 bits wide
-    const int64_t seq = strtoll( start, &end, 10 );
-    //reject a suffix with no digits as well as one with trailing garbage
-    if (end == start || *end != '\0') {
-        throw ZooKeeperException( string("Expecting a number but got ") +
-                                  seqSuffix );
-    }
-    return seq;
-}
-
-/**
- * \brief Deletes the node identified by the given path.
- *
- * @param path the absolute path name of the node to be deleted
- * @param recursive if true and the node has children, the children are
- *                  deleted first and the deletion is attempted again
- * @param version the expected version of the node, as passed to ZK
- *
- * @return true if the node has been deleted; false if it does not exist
- * @throw ZooKeeperException if the operation has failed
- */
-bool
-ZooKeeperAdapter::deleteNode(const string &path,
-                             bool recursive,
-                             int version)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "deleteNode" );
-
-    validatePath( path );
-
-    int rc;
-    RetryHandler rh(m_zkConfig);
-    do {
-        verifyConnection();
-        rc = zoo_delete( mp_zkHandle, path.c_str(), version );
-    } while (rc != ZOK && rh.handleRC(rc));
-
-    if (rc == ZOK) {
-        LOG_INFO( LOG, "%s has been deleted", path.c_str() );
-        return true;
-    }
-    if (rc == ZNONODE) {
-        LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
-        return false;
-    }
-    if (rc == ZNOTEMPTY && recursive) {
-        LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
-        //get all children and delete them recursively...
-        vector<string> children;
-        getNodeChildren( children, path, NULL );
-        vector<string>::const_iterator child = children.begin();
-        for (; child != children.end(); ++child) {
-            deleteNode( *child, true );
-        }
-        //...and finally attempt to delete the node again
-        return deleteNode( path, false );
-    }
-    LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
-    throw ZooKeeperException( string("Unable to delete node ") + path,
-                              rc );
-}
-
-/**
- * \brief Checks whether the given node exists.
- *
- * A ZK watch is requested when <code>listener</code> is not NULL. When
- * <code>context</code> is not NULL, the call is made under the contexts
- * mutex and the context is registered if ZK answered ZOK or ZNONODE.
- *
- * @param path the absolute path name of the node to be checked
- * @param listener the listener for ZK watcher events; may be NULL
- * @param context the user specified context; may be NULL
- * @param stat the optional node statistics to be filled in by ZK
- *
- * @return true if the given node exists; false otherwise
- * @throw ZooKeeperException if the operation has failed
- */
-bool
-ZooKeeperAdapter::nodeExists(const string &path,
-                             ZKEventListener *listener,
-                             void *context, Stat *stat)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "nodeExists" );
-
-    validatePath( path );
-
-    //fall back to a local Stat when the caller is not interested in it
-    struct Stat tmpStat;
-    if (stat == NULL) {
-        stat = &tmpStat;
-    }
-    memset( stat, 0, sizeof(Stat) );
-
-    const int watch = (listener != NULL ? 1 : 0);
-    const bool trackContext = (context != NULL);
-
-    int rc;
-    RetryHandler rh(m_zkConfig);
-    do {
-        verifyConnection();
-        if (trackContext) {
-            m_zkContextsMutex.Acquire();
-        }
-        rc = zoo_exists( mp_zkHandle, path.c_str(), watch, stat );
-        if (trackContext) {
-            if (rc == ZOK || rc == ZNONODE) {
-                registerContext( NODE_EXISTS, path, listener, context );
-            }
-            m_zkContextsMutex.Release();
-        }
-    } while (rc != ZOK && rh.handleRC(rc));
-
-    if (rc == ZOK) {
-        return true;
-    }
-    if (rc == ZNONODE) {
-        LOG_TRACE( LOG, "Node %s does not exist", path.c_str() );
-        return false;
-    }
-    LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
-    throw ZooKeeperException(
-            string("Unable to check existence of node ") + path,
-            rc );
-}
-
-/**
- * \brief Retrieves the absolute paths of all children of the given node.
- *
- * The paths are appended to <code>nodeList</code>, which is then sorted
- * as a whole. A ZK watch is requested when <code>listener</code> is not
- * NULL. When <code>context</code> is not NULL, the call is made under the
- * contexts mutex and the context is registered if ZK answered ZOK.
- *
- * @param nodeList the list the children's paths are appended to
- * @param path the absolute path name of the node to get children of
- * @param listener the listener for ZK watcher events; may be NULL
- * @param context the user specified context; may be NULL
- *
- * @throw ZooKeeperException if the operation has failed
- */
-void
-ZooKeeperAdapter::getNodeChildren(vector<string> &nodeList,
-                                  const string &path,
-                                  ZKEventListener *listener,
-                                  void *context)
-    throw (ZooKeeperException)
-{
-    TRACE( LOG, "getNodeChildren" );
-
-    validatePath( path );
-
-    String_vector children;
-    memset( &children, 0, sizeof(children) );
-
-    const int watch = (listener != NULL ? 1 : 0);
-
-    int rc;
-    RetryHandler rh(m_zkConfig);
-    do {
-        verifyConnection();
-        if (context != NULL) {
-            m_zkContextsMutex.Acquire();
-            rc = zoo_get_children( mp_zkHandle,
-                                   path.c_str(),
-                                   watch,
-                                   &children );
-            if (rc == ZOK) {
-                registerContext( GET_NODE_CHILDREN, path, listener, context );
-            }
-            m_zkContextsMutex.Release();
-        } else {
-            rc = zoo_get_children( mp_zkHandle,
-                                   path.c_str(),
-                                   watch,
-                                   &children );
-        }
-    } while (rc != ZOK && rh.handleRC(rc));
-    if (rc != ZOK) {
-        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
-        throw ZooKeeperException( string("Unable to get children of node ") +
-                                  path,
-                                  rc );
-    }
-
-    //the strings in 'children' were allocated by the ZK client library
-    //and have to be released on every way out of this function
-    try {
-        for (int i = 0; i < children.count; ++i) {
-            //convert each child's path from relative to absolute
-            string absPath(path);
-            if (path != "/") {
-                absPath.append( "/" );
-            }
-            absPath.append( children.data[i] );
-            nodeList.push_back( absPath );
-        }
-    } catch (...) {
-        deallocate_String_vector( &children );
-        throw;
-    }
-    deallocate_String_vector( &children );
-
-    //make sure the order is always deterministic
-    sort( nodeList.begin(), nodeList.end() );
-}
-
-/**
- * \brief Retrieves the value associated with the given node.
- *
- * The value is read into a fixed-size local buffer. A ZK watch is
- * requested when <code>listener</code> is not NULL. When
- * <code>context</code> is not NULL, the call is made under the contexts
- * mutex and the context is registered if ZK answered ZOK.
- *
- * @param path the absolute path name of the node to get the value of
- * @param listener the listener for ZK watcher events; may be NULL
- * @param context the user specified context; may be NULL
- * @param stat the optional node statistics to be filled in by ZK
- *
- * @return the value read; empty if ZK reported a length of -1
- * @throw ZooKeeperException if the operation has failed
- */
-string
-ZooKeeperAdapter::getNodeData(const string &path,
-                              ZKEventListener *listener,
-                              void *context, Stat *stat)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "getNodeData" );
-
-    validatePath( path );
-
-    const int MAX_DATA_LENGTH = 128 * 1024;
-    char buffer[MAX_DATA_LENGTH];
-    memset( buffer, 0, MAX_DATA_LENGTH );
-
-    //fall back to a local Stat when the caller is not interested in it
-    struct Stat tmpStat;
-    if (stat == NULL) {
-        stat = &tmpStat;
-    }
-    memset( stat, 0, sizeof(Stat) );
-
-    const int watch = (listener != NULL ? 1 : 0);
-    const bool trackContext = (context != NULL);
-
-    int rc;
-    int len;
-    RetryHandler rh(m_zkConfig);
-    do {
-        verifyConnection();
-        //zoo_get takes the capacity in and gives the length back,
-        //so it has to be set again before every attempt
-        len = MAX_DATA_LENGTH - 1;
-        if (trackContext) {
-            m_zkContextsMutex.Acquire();
-        }
-        rc = zoo_get( mp_zkHandle, path.c_str(), watch, buffer, &len, stat );
-        if (trackContext) {
-            if (rc == ZOK) {
-                registerContext( GET_NODE_DATA, path, listener, context );
-            }
-            m_zkContextsMutex.Release();
-        }
-    } while (rc != ZOK && rh.handleRC(rc));
-
-    if (rc != ZOK) {
-        LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
-        throw ZooKeeperException(
-            string("Unable to get data of node ") + path, rc
-        );
-    }
-    if (len == -1) {
-        len = 0;
-    }
-    return string( buffer, len );
-}
-
-/**
- * \brief Sets the value associated with the given node.
- *
- * @param path the absolute path name of the node to be updated
- * @param value the new value to be associated with the node
- * @param version the expected version of the node, as passed to ZK
- *
- * @throw ZooKeeperException if the operation has failed
- */
-void
-ZooKeeperAdapter::setNodeData(const string &path,
-                              const string &value,
-                              int version)
-    throw(ZooKeeperException)
-{
-    TRACE( LOG, "setNodeData" );
-
-    validatePath( path );
-
-    int rc;
-    RetryHandler rh(m_zkConfig);
-    for (;;) {
-        verifyConnection();
-        rc = zoo_set( mp_zkHandle,
-                      path.c_str(),
-                      value.c_str(),
-                      value.length(),
-                      version);
-        if (rc == ZOK) {
-            return;
-        }
-        if (!rh.handleRC(rc)) {
-            //either not recoverable or out of retries
-            break;
-        }
-    }
-    LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
-    throw ZooKeeperException( string("Unable to set data for node ") +
-                              path,
-                              rc );
-}
-
-} /* end of 'namespace zk' */
-
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/zkadapter.h
----------------------------------------------------------------------
diff --git a/src/contrib/zkfuse/src/zkadapter.h b/src/contrib/zkfuse/src/zkadapter.h
deleted file mode 100644
index 8d4d1d5..0000000
--- a/src/contrib/zkfuse/src/zkadapter.h
+++ /dev/null
@@ -1,718 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __ZKADAPTER_H__
-#define __ZKADAPTER_H__
-
-#include <string>
-#include <vector>
-#include <map>
-
-extern "C" {
-#include "zookeeper.h"
-}
-
-#include "log.h"
-#include "mutex.h"
-#include "thread.h"
-#include "blockingqueue.h"
-#include "event.h"
-
-using namespace std;
-using namespace zkfuse;
-
-namespace zk {
-
-/**
- * \brief A cluster related exception.
- */
-class ZooKeeperException :
-    public std::exception
-{
-  public:
-
-    /**
-     * \brief Constructor; the ZK error code is set to 0.
-     *
-     * @param msg the detailed message associated with this exception
-     */
-    ZooKeeperException(const string &msg) :
-        m_message(msg), m_zkErrorCode(0)
-    {}
-
-    /**
-     * \brief Constructor; the error code is also appended to the message.
-     *
-     * @param msg the detailed message associated with this exception
-     * @param errorCode the ZK error code associated with this exception
-     */
-    ZooKeeperException(const string &msg, int errorCode) :
-        m_message(msg), m_zkErrorCode(errorCode)
-    {
-        char suffix[100];
-        snprintf( suffix, sizeof(suffix), " (ZK error code: %d)", errorCode );
-        m_message.append( suffix );
-    }
-
-    /**
-     * \brief Destructor.
-     */
-    ~ZooKeeperException() throw() {}
-
-    /**
-     * \brief Returns detailed description of the exception.
-     */
-    const char *what() const throw() {
-        return m_message.c_str();
-    }
-
-    /**
-     * \brief Returns the ZK error code.
-     */
-    int getZKErrorCode() const {
-        return m_zkErrorCode;
-    }
-
-  private:
-
-    /**
-     * The detailed message associated with this exception.
-     */
-    string m_message;
-
-    /**
-     * The optional error code received from ZK.
-     */
-    int m_zkErrorCode;
-
-};
-
-/**
- * \brief This class encapsulates configuration of a ZK client.
- *
- * All settings are fixed at construction time.
- */
-class ZooKeeperConfig
-{
-  public:
-
-    /**
-     * \brief Constructor.
-     *
-     * @param hosts the comma separated list of host and port pairs of ZK nodes
-     * @param leaseTimeout the lease timeout (heartbeat)
-     * @param autoReconnect whether to allow for auto-reconnect
-     * @param connectTimeout the connect timeout, in milliseconds
-     */
-    ZooKeeperConfig(const string &hosts,
-                    int leaseTimeout,
-                    bool autoReconnect = true,
-                    long long int connectTimeout = 15000)
-      : m_hosts(hosts),
-        m_leaseTimeout(leaseTimeout),
-        m_autoReconnect(autoReconnect),
-        m_connectTimeout(connectTimeout)
-    {}
-
-    /**
-     * \brief Returns the list of ZK hosts to connect to.
-     */
-    string getHosts() const {
-        return m_hosts;
-    }
-
-    /**
-     * \brief Returns the lease timeout.
-     */
-    int getLeaseTimeout() const {
-        return m_leaseTimeout;
-    }
-
-    /**
-     * \brief Returns whether {@link ZooKeeperAdapter} should attempt
-     * \brief to automatically reconnect in case of a connection failure.
-     */
-    bool getAutoReconnect() const {
-        return m_autoReconnect;
-    }
-
-    /**
-     * \brief Gets the connect timeout.
-     *
-     * @return the connect timeout, in milliseconds
-     */
-    long long int getConnectTimeout() const {
-        return m_connectTimeout;
-    }
-
-  private:
-
-    /**
-     * The host addresses of ZK nodes.
-     */
-    const string m_hosts;
-
-    /**
-     * The ZK lease timeout.
-     */
-    const int m_leaseTimeout;
-
-    /**
-     * True if this adapter should attempt to autoreconnect in case
-     * the current session has been dropped.
-     */
-    const bool m_autoReconnect;
-
-    /**
-     * How long to wait, in milliseconds, before a connection
-     * is established to ZK.
-     */
-    const long long int m_connectTimeout;
-
-};
-
-/**
- * \brief A data value object representing a watcher event received from the ZK.
- */
-class ZKWatcherEvent
-{
-  public:
-
-    /**
-     * \brief The type representing the user's context.
-     */
-    typedef void *ContextType;
-
-    /**
-     * \brief Default constructor.
-     *
-     * Creates an event with type and state set to -1, an empty path
-     * and a NULL context.
-     */
-    ZKWatcherEvent()
-      : m_type(-1), m_state(-1), m_path(), mp_context(NULL)
-    {}
-
-    /**
-     * \brief Constructor.
-     *
-     * @param type the type of this event
-     * @param state the state of this event
-     * @param path the corresponding path, may be empty for some event types
-     * @param context the user specified context; possibly NULL
-     */
-    ZKWatcherEvent(int type, int state, const string &path,
-                   ContextType context = NULL)
-      : m_type(type), m_state(state), m_path(path), mp_context(context)
-    {}
-
-    int getType() const {
-        return m_type;
-    }
-
-    int getState() const {
-        return m_state;
-    }
-
-    string const &getPath() const {
-        return m_path;
-    }
-
-    ContextType getContext() const {
-        return mp_context;
-    }
-
-    /**
-     * \brief Two events are equal when their type, state, path
-     * \brief and context are all equal.
-     */
-    bool operator==(const ZKWatcherEvent &we) const {
-        if (m_type != we.m_type || m_state != we.m_state) {
-            return false;
-        }
-        return m_path == we.m_path && mp_context == we.mp_context;
-    }
-
-  private:
-
-    /**
-     * The type of this event. It can be either ZOO_CREATED_EVENT, ZOO_DELETED_EVENT,
-     * ZOO_CHANGED_EVENT, ZOO_CHILD_EVENT, ZOO_SESSION_EVENT or ZOO_NOTWATCHING_EVENT.
-     * See zookeeper.h for more details.
-     */
-    const int m_type;
-
-    /**
-     * The state of ZK at the time of sending this event.
-     * It can be either ZOO_CONNECTING_STATE, ZOO_ASSOCIATING_STATE,
-     * ZOO_CONNECTED_STATE, ZOO_EXPIRED_SESSION_STATE or AUTH_FAILED_STATE.
-     * See {@file zookeeper.h} for more details.
-     */
-    const int m_state;
-
-    /**
-     * The corresponding path of the node in subject. It may be empty
-     * for some event types.
-     */
-    const string m_path;
-
-    /**
-     * The pointer to the user specified context, possibly NULL.
-     */
-    ContextType mp_context;
-
-};
-
-/**
- * \brief The type definition of ZK event source:
- * \brief EventSource specialized for ZKWatcherEvent.
- */
-typedef EventSource<ZKWatcherEvent> ZKEventSource;
-
-/**
- * \brief The type definition of ZK event listener:
- * \brief EventListener specialized for ZKWatcherEvent.
- */
-typedef EventListener<ZKWatcherEvent> ZKEventListener;
-
-/**
- * \brief This is a wrapper around the ZK C synchronous API.
- */
-class ZooKeeperAdapter
- : public ZKEventSource
-{
- public:
- /**
- * \brief The global function that handles all ZK asynchronous notifications.
- */
- friend void zkWatcher(zhandle_t *, int, int, const char *, void *watcherCtx);
-
- /**
- * \brief The type representing the user's context.
- */
- typedef void *ContextType;
-
- /**
- * \brief The map type of ZK event listener to user specified context mapping.
- */
- typedef map<ZKEventListener *, ContextType> Listener2Context;
-
- /**
- * \brief The map type of ZK path's to listener's contexts.
- */
- typedef map<string, Listener2Context> Path2Listener2Context;
-
- /**
- * \brief All possible states of this client, in respect to
- * \brief connection to the ZK server.
- */
- enum AdapterState {
- //mp_zkHandle is NULL
- AS_DISCONNECTED = 0,
- //mp_zkHandle is valid but this client is reconnecting
- AS_CONNECTING,
- //mp_zkHandle is valid and this client is connected
- AS_CONNECTED,
- //mp_zkHandle is valid, however no more calls can be made to ZK API
- AS_SESSION_EXPIRED
- };
-
- /**
- * \brief Constructor.
- * Attempts to create a ZK adapter, optionally connecting
- * to the ZK. Note, that if the connection is to be established
- * and the given listener is NULL, some events may be lost,
- * as they may arrive asynchronously before this method finishes.
- *
- * @param config the ZK configuration
- * @param listener the event listener to be used for listening
- * on incoming ZK events;
- * if <code>NULL</code> not used
- * @param establishConnection whether to establish connection to the ZK
- *
- * @throw ZooKeeperException if cannot establish connection to the given ZK
- */
- ZooKeeperAdapter(ZooKeeperConfig config,
- ZKEventListener *listener = NULL,
- bool establishConnection = false)
- throw(ZooKeeperException);
-
- /**
- * \brief Destructor.
- */
- ~ZooKeeperAdapter();
-
- /**
- * \brief Returns the current config.
- */
- const ZooKeeperConfig &getZooKeeperConfig() const {
- return m_zkConfig;
- }
-
- /**
- * \brief Re-establishes connection to the ZK.
- * If this adapter is already connected, the current connection
- * will be dropped and a new connection will be established.
- *
- * @throw ZooKeeperException if cannot establish connection to the ZK
- */
- void reconnect() throw(ZooKeeperException);
-
- /**
- * \brief Disconnects from the ZK and unregisters {@link #mp_zkHandle}.
- */
- void disconnect();
-
- /**
- * \brief Creates a new node identified by the given path.
- * This method will optionally attempt to create all missing ancestors.
- *
- * @param path the absolute path name of the node to be created
- * @param value the initial value to be associated with the node
- * @param flags the ZK flags of the node to be created
- * @param createAncestors if true and there are some missing ancestor nodes,
- * this method will attempt to create them
- *
- * @return true if the node has been successfully created; false otherwise
- * @throw ZooKeeperException if the operation has failed
- */
- bool createNode(const string &path,
- const string &value = "",
- int flags = 0,
- bool createAncestors = true)
- throw(ZooKeeperException);
-
- /**
- * \brief Creates a new sequence node using the given path as the prefix.
- * This method will optionally attempt to create all missing ancestors.
- *
- * @param path the absolute path name of the node to be created;
- * @param value the initial value to be associated with the node
- * @param flags the ZK flags of the sequence node to be created
- * (in addition to SEQUENCE)
- * @param createAncestors if true and there are some missing ancestor
- * nodes, this method will attempt to create them
- *
- * @return the sequence number associated with the newly created node,
- * or -1 if it couldn't be created
- * @throw ZooKeeperException if the operation has failed
- */
- int64_t createSequence(const string &path,
- const string &value = "",
- int flags = 0,
- bool createAncestors = true)
- throw(ZooKeeperException);
-
- /**
- * \brief Deletes a node identified by the given path.
- *
- * @param path the absolute path name of the node to be deleted
- * @param recursive if true this method will attempt to remove
- * all children of the given node if any exist
- * @param version the expected version of the node. The function will
- * fail if the actual version of the node does not match
- * the expected version
- *
- * @return true if the node has been deleted; false otherwise
- * @throw ZooKeeperException if the operation has failed
- */
- bool deleteNode(const string &path, bool recursive = false, int version = -1)
- throw(ZooKeeperException);
-
- /**
- * \brief Checks whether the given node exists or not.
- *
- * @param path the absolute path name of the node to be checked
- * @param listener the listener for ZK watcher events;
- * passing non <code>NULL</code> effectively establishes
- * a ZK watch on the given node
- * @param context the user specified context that is to be passed
- * in a corresponding {@link ZKWatcherEvent} at later time;
- * not used if <code>listener</code> is <code>NULL</code>
- * @param stat the optional node statistics to be filled in by ZK
- *
- * @return true if the given node exists; false otherwise
- * @throw ZooKeeperException if the operation has failed
- */
- bool nodeExists(const string &path,
- ZKEventListener *listener = NULL,
- void *context = NULL,
- Stat *stat = NULL)
- throw(ZooKeeperException);
-
- /**
- * \brief Retrieves the list of all children of the given node.
- *
- * @param children the output list; receives the absolute paths of the child nodes, possibly none
- * @param path the absolute path name of the node for which to get children
- * @param listener the listener for ZK watcher events;
- * passing non <code>NULL</code> effectively establishes
- * a ZK watch on the given node
- * @param context the user specified context that is to be passed
- * in a corresponding {@link ZKWatcherEvent} at later time;
- * not used if <code>listener</code> is <code>NULL</code>
- *
- * @throw ZooKeeperException if the operation has failed
- */
- void getNodeChildren(vector<string> &children,
- const string &path,
- ZKEventListener *listener = NULL,
- void *context = NULL)
- throw(ZooKeeperException);
-
- /**
- * \brief Gets the given node's data.
- *
- * @param path the absolute path name of the node to get data from
- * @param listener the listener for ZK watcher events;
- * passing non <code>NULL</code> effectively establishes
- * a ZK watch on the given node
- * @param context the user specified context that is to be passed
- * in a corresponding {@link ZKWatcherEvent} at later time;
- * not used if <code>listener</code> is <code>NULL</code>
- * @param stat the optional node statistics to be filled in by ZK
- *
- * @return the node's data
- * @throw ZooKeeperException if the operation has failed
- */
- string getNodeData(const string &path,
- ZKEventListener *listener = NULL,
- void *context = NULL,
- Stat *stat = NULL)
- throw(ZooKeeperException);
-
- /**
- * \brief Sets the given node's data.
- *
- * @param path the absolute path name of the node whose data is to be set
- * @param value the node's data to be set
- * @param version the expected version of the node. The function will
- * fail if the actual version of the node does not match
- * the expected version
- *
- * @throw ZooKeeperException if the operation has failed
- */
- void setNodeData(const string &path, const string &value, int version = -1)
- throw(ZooKeeperException);
-
- /**
- * \brief Validates the given path to a node in ZK.
- *
- * @param path the path to be validated
- *
- * @throw ZooKeeperException if the given path is not valid
- * (for instance it doesn't start with "/")
- */
- static void validatePath(const string &path) throw(ZooKeeperException);
-
- /**
- * \brief Reports the state this adapter is currently in.
- * @note m_state is read here without acquiring m_stateLock; TODO confirm that is intended.
- * @return a copy of the current adapter state
- * @see AdapterState
- */
- AdapterState getState() const {
- return this->m_state;
- }
-
- private:
-
- /**
- * Identifies the methods of this class that can trigger an event.
- */
- enum WatchableMethod {
- NODE_EXISTS = 0,
- GET_NODE_CHILDREN = 1,
- GET_NODE_DATA = 2
- };
-
- /**
- * \brief Creates a new node identified by the given path.
- * This method is used internally to implement {@link createNode(...)}
- * and {@link createSequence(...)}. On success, this method will set
- * <code>createdPath</code>.
- *
- * @param path the absolute path name of the node to be created
- * @param value the initial value to be associated with the node
- * @param flags the ZK flags of the node to be created
- * @param createAncestors if true and there are some missing ancestor nodes,
- * this method will attempt to create them
- * @param createdPath the actual path of the node that has been created;
- * useful for sequences
- *
- * @return true if the node has been successfully created; false otherwise
- * @throw ZooKeeperException if the operation has failed
- */
- bool createNode(const string &path,
- const string &value,
- int flags,
- bool createAncestors,
- string &createdPath)
- throw(ZooKeeperException);
-
- /**
- * Handles an asynchronous event received from the ZK.
- */
- void handleEvent(int type, int state, const string &path);
-
- /**
- * Handles an asynchronous event received from the ZK.
- * This method iterates over all listeners and passes the event
- * to each of them.
- */
- void handleEvent(int type, int state, const string &path,
- const Listener2Context &listeners);
-
- /**
- * \brief Enqueues the given event in {@link #m_events} queue.
- */
- void enqueueEvent(int type, int state, const string &path);
-
- /**
- * \brief Processes all ZK adapter events in a loop.
- */
- void processEvents();
-
- /**
- * \brief Processes all user events in a loop.
- */
- void processUserEvents();
-
- /**
- * \brief Registers the given context in the {@link #m_zkContexts}
- * \brief contexts map.
- *
- * @param method the method where the given path is being used
- * @param path the path of interest
- * @param listener the event listener to call back later on
- * @param context the user specified context to be passed back to user
- */
- void registerContext(WatchableMethod method, const string &path,
- ZKEventListener *listener, ContextType context);
-
- /**
- * \brief Attempts to find a listener to context map in the contexts'
- * \brief map, based on the specified criteria.
- * If the context is found, it will be removed from the underlying map.
- *
- * @param method the method type identifying the Listener2Context map
- * @param path the path to be used to search in the Listener2Context map
- *
- * @return the context map associated with the given method and path,
- * or empty map if not found
- */
- Listener2Context findAndRemoveListenerContext(WatchableMethod method,
- const string &path);
-
- /**
- * Sets the new state in case it's different from the current one.
- * This method assumes that {@link #m_stateLock} has been already locked.
- *
- * @param newState the new state to be set
- */
- void setState(AdapterState newState);
-
- /**
- * Waits until this client gets connected. The total wait time
- * is given by {@link getRemainingConnectTimeout()}.
- * If a timeout elapses, this method will throw an exception.
- *
- * @throw ZooKeeperException if unable to connect within the given timeout
- */
- void waitUntilConnected()
- throw(ZooKeeperException);
-
- /**
- * Verifies whether the connection is established,
- * optionally auto reconnecting.
- *
- * @throw ZooKeeperException if this client is disconnected
- * and auto-reconnect failed or was not allowed
- */
- void verifyConnection() throw(ZooKeeperException);
-
- /**
- * Returns the remaining connect timeout. The timeout resets to the connect
- * timeout configured in {@link #m_zkConfig} on a successful connection to the ZK.
- *
- * @return the remaining connect timeout, in milliseconds
- */
- long long int getRemainingConnectTimeout() const {
- return m_remainingConnectTimeout;
- }
-
- /**
- * Restores the remaining connect timeout to the full connect timeout taken from {@link #m_zkConfig}.
- */
- void resetRemainingConnectTimeout() {
- this->m_remainingConnectTimeout = this->m_zkConfig.getConnectTimeout();
- }
-
- /**
- * Charges the given wait time against the remaining connect timeout.
- *
- * @param time the amount of time already spent waiting for the connect to succeed
- */
- void waitedForConnect(long long time) {
- m_remainingConnectTimeout = m_remainingConnectTimeout - time;
- }
-
- private:
-
- /**
- * The mutex used to protect {@link #m_zkContexts}.
- */
- zkfuse::Mutex m_zkContextsMutex;
-
- /**
- * The map of registered ZK paths that are being watched.
- * Each entry maps a function type to another map of registered contexts.
- *
- * @see WatchableMethod
- */
- map<int, Path2Listener2Context> m_zkContexts;
-
- /**
- * The current ZK configuration.
- */
- const ZooKeeperConfig m_zkConfig;
-
- /**
- * The current ZK session.
- */
- zhandle_t *mp_zkHandle;
-
- /**
- * The blocking queue of all events waiting to be processed by ZK adapter.
- */
- BlockingQueue<ZKWatcherEvent> m_events;
-
- /**
- * The blocking queue of all events waiting to be processed by users
- * of ZK adapter.
- */
- BlockingQueue<ZKWatcherEvent> m_userEvents;
-
- /**
- * The thread that dispatches all events from {@link #m_events} queue.
- */
- CXXThread<ZooKeeperAdapter> m_eventDispatcher;
-
- /**
- * The thread that dispatches all events from {@link #m_userEvents} queue.
- */
- CXXThread<ZooKeeperAdapter> m_userEventDispatcher;
-
- /**
- * Whether {@link #m_eventDispatcher} is terminating.
- */
- volatile bool m_terminating;
-
- /**
- * Whether this adapter is connected to the ZK.
- */
- volatile bool m_connected;
-
- /**
- * The state of this adapter.
- */
- AdapterState m_state;
-
- /**
- * The lock used to synchronize access to {@link #m_state}.
- */
- Lock m_stateLock;
-
- /**
- * How much time left for the connect to succeed, in milliseconds.
- */
- long long int m_remainingConnectTimeout;
-
-};
-
-} /* end of 'namespace zk' */
-
-#endif /* __ZKADAPTER_H__ */
|