From commits-return-6812-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Mon Aug 6 14:14:23 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 7DCC01807A1 for ; Mon, 6 Aug 2018 14:14:21 +0200 (CEST) Received: (qmail 57031 invoked by uid 500); 6 Aug 2018 12:14:13 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 53438 invoked by uid 99); 6 Aug 2018 12:14:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Aug 2018 12:14:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 247ACE1189; Mon, 6 Aug 2018 12:14:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andor@apache.org To: commits@zookeeper.apache.org Date: Mon, 06 Aug 2018 12:14:44 -0000 Message-Id: <30290fbbc750485883931d8fe18f5bbe@git.apache.org> In-Reply-To: <23d59816fc864cdcaa333309761a6f23@git.apache.org> References: <23d59816fc864cdcaa333309761a6f23@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [35/45] zookeeper git commit: ZOOKEEPER-3030: MAVEN MIGRATION - Step 1.3 - move contrib directories http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/event.h ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/event.h b/src/contrib/zkfuse/src/event.h deleted file mode 100644 index 936ecc6..0000000 --- a/src/contrib/zkfuse/src/event.h +++ /dev/null @@ -1,553 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __EVENT_H__ -#define __EVENT_H__ - -#include -#include -#include -#include -#ifdef GCC4 -# include -using namespace std::tr1; -#else -# include -using namespace boost; -#endif - -#include "log.h" -#include "blockingqueue.h" -#include "mutex.h" -#include "thread.h" - -using namespace std; -using namespace zk; - -namespace zkfuse { - -//forward declaration of EventSource -template -class EventSource; - -/** - * \brief This interface is implemented by an observer - * \brief of a particular {@link EventSource}. - */ -template -class EventListener { - public: - - /** - * \brief This method is invoked whenever an event - * \brief has been received by the event source being observed. - * - * @param source the source the triggered the event - * @param e the actual event being triggered - */ - virtual void eventReceived(const EventSource &source, const E &e) = 0; -}; - -/** - * \brief This class represents a source of events. - * - *

- * Each source can have many observers (listeners) attached to it - * and in case of an event, this source may propagate the event - * using {@link #fireEvent} method. - */ -template -class EventSource { - public: - - /** - * \brief The type corresponding to the list of registered event listeners. - */ - typedef set *> EventListeners; - - /** - * \brief Registers a new event listener. - * - * @param listener the listener to be added to the set of listeners - */ - void addListener(EventListener *listener) { - m_listeners.insert( listener ); - } - - /** - * \brief Removes an already registered listener. - * - * @param listener the listener to be removed - */ - void removeListener(EventListener *listener) { - m_listeners.erase( listener ); - } - - /** - * \brief Destructor. - */ - virtual ~EventSource() {} - - protected: - - /** - * \brief Fires the given event to all registered listeners. - * - *

- * This method essentially iterates over all listeners - * and invokes {@link fireEvent(EventListener *listener, const E &event)} - * for each element. All derived classes are free to - * override the method to provide better error handling - * than the default implementation. - * - * @param event the event to be propagated to all listeners - */ - void fireEvent(const E &event); - - /** - * \brief Sends an event to the given listener. - * - * @param listener the listener to whom pass the event - * @param event the event to be handled - */ - virtual void fireEvent(EventListener *listener, const E &event); - - private: - - /** - * The set of registered event listeners. - */ - EventListeners m_listeners; - -}; - -/** - * \brief The interface of a generic event wrapper. - */ -class AbstractEventWrapper { - public: - - /** - * \brief Destructor. - */ - virtual ~AbstractEventWrapper() {} - - /** - * \brief Returns the underlying wrapee's data. - */ - virtual void *getWrapee() = 0; -}; - -/** - * \brief A template based implementation of {@link AbstractEventWrapper}. - */ -template -class EventWrapper : public AbstractEventWrapper { - public: - EventWrapper(const E &e) : m_e(e) { - } - void *getWrapee() { - return &m_e; - } - private: - E m_e; -}; - -/** - * \brief This class represents a generic event. - */ -class GenericEvent { - public: - - /** - * \brief Constructor. - */ - GenericEvent() : m_type(0) {} - - /** - * \brief Constructor. - * - * @param type the type of this event - * @param eventWarpper the wrapper around event's data - */ - GenericEvent(int type, AbstractEventWrapper *eventWrapper) : - m_type(type), m_eventWrapper(eventWrapper) { - } - - /** - * \brief Returns the type of this event. - * - * @return type of this event - */ - int getType() const { return m_type; } - - /** - * \brief Returns the event's data. - * - * @return the event's data - */ - void *getEvent() const { return m_eventWrapper->getWrapee(); } - - private: - - /** - * The event type. 
- */ - int m_type; - - /** - * The event represented as abstract wrapper. - */ - boost::shared_ptr m_eventWrapper; - -}; - -/** - * \brief This class adapts {@link EventListener} to a generic listener. - * Essentially this class listens on incoming events and fires them - * as {@link GenericEvent}s. - */ -template -class EventListenerAdapter : public virtual EventListener, - public virtual EventSource -{ - public: - - /** - * \brief Constructor. - * - * @param eventSource the source on which register this listener - */ - EventListenerAdapter(EventSource &eventSource) { - eventSource.addListener(this); - } - - void eventReceived(const EventSource &source, const E &e) { - AbstractEventWrapper *wrapper = new EventWrapper(e); - GenericEvent event(type, wrapper); - fireEvent( event ); - } - -}; - -/** - * \brief This class provides an adapter between an asynchronous and synchronous - * \brief event handling. - * - *

- * This class queues up all received events and exposes them through - * {@link #getNextEvent()} method. - */ -template -class SynchronousEventAdapter : public EventListener { - public: - - void eventReceived(const EventSource &source, const E &e) { - m_queue.put( e ); - } - - /** - * \brief Returns the next available event from the underlying queue, - * \brief possibly blocking, if no data is available. - * - * @return the next available event - */ - E getNextEvent() { - return m_queue.take(); - } - - /** - * \brief Returns whether there are any events in the queue or not. - * - * @return true if there is at least one event and - * the next call to {@link #getNextEvent} won't block - */ - bool hasEvents() const { - return (m_queue.empty() ? false : true); - } - - /** - * \brief Destructor. - */ - virtual ~SynchronousEventAdapter() {} - - private: - - /** - * The blocking queue of all events received so far. - */ - BlockingQueue m_queue; - -}; - -/** - * This typedef defines the type of a timer Id. - */ -typedef int32_t TimerId; - -/** - * This class represents a timer event parametrized by the user's data type. - */ -template -class TimerEvent { - public: - - /** - * \brief Constructor. - * - * @param id the ID of this event - * @param alarmTime when this event is to be triggered - * @param userData the user data associated with this event - */ - TimerEvent(TimerId id, int64_t alarmTime, const T &userData) : - m_id(id), m_alarmTime(alarmTime), m_userData(userData) - {} - - /** - * \brief Constructor. - */ - TimerEvent() : m_id(-1), m_alarmTime(-1) {} - - /** - * \brief Returns the ID. - * - * @return the ID of this event - */ - TimerId getID() const { return m_id; } - - /** - * \brief Returns the alarm time. - * - * @return the alarm time - */ - int64_t getAlarmTime() const { return m_alarmTime; } - - /** - * \brief Returns the user's data. 
- * - * @return the user's data - */ - T const &getUserData() const { return m_userData; } - - /** - * \brief Returns whether the given alarm time is less than this event's - * \brief time. - */ - bool operator<(const int64_t alarmTime) const { - return m_alarmTime < alarmTime; - } - - private: - - /** - * The ID of ths event. - */ - TimerId m_id; - - /** - * The time at which this event triggers. - */ - int64_t m_alarmTime; - - /** - * The user specific data associated with this event. - */ - T m_userData; - -}; - -template -class Timer : public EventSource > { - public: - - /** - * \brief Constructor. - */ - Timer() : m_currentEventID(0), m_terminating(false) { - m_workerThread.Create( *this, &Timer::sendAlarms ); - } - - /** - * \brief Destructor. - */ - ~Timer() { - m_terminating = true; - m_lock.notify(); - m_workerThread.Join(); - } - - /** - * \brief Schedules the given event timeFromNow milliseconds. - * - * @param timeFromNow time from now, in milliseconds, when the event - * should be triggered - * @param userData the user data associated with the timer event - * - * @return the ID of the newly created timer event - */ - TimerId scheduleAfter(int64_t timeFromNow, const T &userData) { - return scheduleAt( getCurrentTimeMillis() + timeFromNow, userData ); - } - - /** - * \brief Schedules an event at the given time. 
- * - * @param absTime absolute time, in milliseconds, at which the event - * should be triggered; the time is measured - * from Jan 1st, 1970 - * @param userData the user data associated with the timer event - * - * @return the ID of the newly created timer event - */ - TimerId scheduleAt(int64_t absTime, const T &userData) { - m_lock.lock(); - typename QueueType::iterator pos = - lower_bound( m_queue.begin(), m_queue.end(), absTime ); - TimerId id = m_currentEventID++; - TimerEvent event(id, absTime, userData); - m_queue.insert( pos, event ); - m_lock.notify(); - m_lock.unlock(); - return id; - } - - /** - * \brief Returns the current time since Jan 1, 1970, in milliseconds. - * - * @return the current time in milliseconds - */ - static int64_t getCurrentTimeMillis() { - struct timeval now; - gettimeofday( &now, NULL ); - return now.tv_sec * 1000LL + now.tv_usec / 1000; - } - - /** - * \brief Cancels the given timer event. - * - * - * @param eventID the ID of the event to be canceled - * - * @return whether the event has been canceled - */ - bool cancelAlarm(TimerId eventID) { - bool canceled = false; - m_lock.lock(); - typename QueueType::iterator i; - for (i = m_queue.begin(); i != m_queue.end(); ++i) { - if (eventID == i->getID()) { - m_queue.erase( i ); - canceled = true; - break; - } - } - m_lock.unlock(); - return canceled; - } - - /** - * Executes the main loop of the worker thread. 
- */ - void sendAlarms() { - //iterate until terminating - while (!m_terminating) { - m_lock.lock(); - //1 step - wait until there is an event in the queue - if (m_queue.empty()) { - //wait up to 100ms to get next event - m_lock.wait( 100 ); - } - bool fire = false; - if (!m_queue.empty()) { - //retrieve the event from the queue and send it - TimerEvent event = m_queue.front(); - //check whether we can send it right away - int64_t timeToWait = - event.getAlarmTime() - getCurrentTimeMillis(); - if (timeToWait <= 0) { - m_queue.pop_front(); - //we fire only if it's still in the queue and alarm - //time has just elapsed (in case the top event - //is canceled) - fire = true; - } else { - m_lock.wait( timeToWait ); - } - m_lock.unlock(); - if (fire) { - fireEvent( event ); - } - } else { - m_lock.unlock(); - } - } - } - - private: - - /** - * The type of timer events queue. - */ - typedef deque > QueueType; - - /** - * The current event ID, auto-incremented each time a new event - * is created. - */ - TimerId m_currentEventID; - - /** - * The queue of timer events sorted by {@link TimerEvent#alarmTime}. - */ - QueueType m_queue; - - /** - * The lock used to guard {@link #m_queue}. - */ - Lock m_lock; - - /** - * The thread that triggers alarms. - */ - CXXThread > m_workerThread; - - /** - * Whether {@link #m_workerThread} is terminating. 
- */ - volatile bool m_terminating; - -}; - -template -void EventSource::fireEvent(const E &event) { - for (typename EventListeners::iterator i = m_listeners.begin(); - i != m_listeners.end(); - ++i) - { - fireEvent( *i, event ); - } -} - -template -void EventSource::fireEvent(EventListener *listener, const E &event) { - listener->eventReceived( *this, event ); -} - -} /* end of 'namespace zkfuse' */ - -#endif /* __EVENT_H__ */ http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/log.cc ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/log.cc b/src/contrib/zkfuse/src/log.cc deleted file mode 100644 index e2bfb0d..0000000 --- a/src/contrib/zkfuse/src/log.cc +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include "log.h" - -using namespace std; - -/** - * \brief This class encapsulates a log4cxx configuration. 
- */ -class LogConfiguration { - public: - LogConfiguration(const string &file) { - PropertyConfigurator::configureAndWatch( file, 5000 ); - } -}; - -//enforces the configuration to be initialized -static LogConfiguration logConfig( "log4cxx.properties" ); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/log.h ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/log.h b/src/contrib/zkfuse/src/log.h deleted file mode 100644 index aefce10..0000000 --- a/src/contrib/zkfuse/src/log.h +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef __LOG_H__ -#define __LOG_H__ - -#define ZKFUSE_NAMESPACE zkfuse -#define START_ZKFUSE_NAMESPACE namespace ZKFUSE_NAMESPACE { -#define END_ZKFUSE_NAMESPACE } -#define USING_ZKFUSE_NAMESPACE using namespace ZKFUSE_NAMESPACE; - -#include -#include -#include - -#include -#include -#include -using namespace log4cxx; -using namespace log4cxx::helpers; - -#define PRINTIP(x) ((uint8_t*)&x)[0], ((uint8_t*)&x)[1], \ - ((uint8_t*)&x)[2], ((uint8_t*)&x)[3] - -#define IPFMT "%u.%u.%u.%u" - -#define DECLARE_LOGGER(varName) \ -extern LoggerPtr varName; - -#define DEFINE_LOGGER(varName, logName) \ -static LoggerPtr varName = Logger::getLogger( logName ); - -#define MAX_BUFFER_SIZE 20000 - -#define SPRINTF_LOG_MSG(buffer, fmt, args...) \ - char buffer[MAX_BUFFER_SIZE]; \ - snprintf( buffer, MAX_BUFFER_SIZE, fmt, ##args ); - -// older versions of log4cxx don't support tracing -#ifdef LOG4CXX_TRACE -#define LOG_TRACE(logger, fmt, args...) \ - if (logger->isTraceEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_TRACE( logger, __tmp ); \ - } -#else -#define LOG_TRACE(logger, fmt, args...) \ - if (logger->isDebugEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_DEBUG( logger, __tmp ); \ - } -#endif - -#define LOG_DEBUG(logger, fmt, args...) \ - if (logger->isDebugEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_DEBUG( logger, __tmp ); \ - } - -#define LOG_INFO(logger, fmt, args...) \ - if (logger->isInfoEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_INFO( logger, __tmp ); \ - } - -#define LOG_WARN(logger, fmt, args...) \ - if (logger->isWarnEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_WARN( logger, __tmp ); \ - } - -#define LOG_ERROR(logger, fmt, args...) \ - if (logger->isErrorEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_ERROR( logger, __tmp ); \ - } - -#define LOG_FATAL(logger, fmt, args...) 
\ - if (logger->isFatalEnabled()) { \ - SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ - LOG4CXX_FATAL( logger, __tmp ); \ - } - -#ifdef DISABLE_TRACE -# define TRACE(logger, x) -#else -# define TRACE(logger, x) \ -class Trace { \ - public: \ - Trace(const void* p) : _p(p) { \ - LOG_TRACE(logger, "%s %p Enter", __PRETTY_FUNCTION__, p); \ - } \ - ~Trace() { \ - LOG_TRACE(logger, "%s %p Exit", __PRETTY_FUNCTION__, _p); \ - } \ - const void* _p; \ -} traceObj(x); -#endif /* DISABLE_TRACE */ - -#endif /* __LOG_H__ */ - http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/log4cxx.properties ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/log4cxx.properties b/src/contrib/zkfuse/src/log4cxx.properties deleted file mode 100644 index 1e373e4..0000000 --- a/src/contrib/zkfuse/src/log4cxx.properties +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=TRACE, A1 - -# A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4cxx.ConsoleAppender - -# A1 uses PatternLayout. 
-log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -log4j.category.zkfuse=TRACE - http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/mutex.h ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/mutex.h b/src/contrib/zkfuse/src/mutex.h deleted file mode 100644 index 86c4604..0000000 --- a/src/contrib/zkfuse/src/mutex.h +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef __MUTEX_H__ -#define __MUTEX_H__ - -#include -#include -#include - -#include "log.h" - -START_ZKFUSE_NAMESPACE - -class Cond; - -class Mutex { - friend class Cond; - public: - Mutex() { - pthread_mutexattr_init( &m_mutexAttr ); - pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE_NP ); - pthread_mutex_init( &mutex, &m_mutexAttr ); - } - ~Mutex() { - pthread_mutex_destroy(&mutex); - pthread_mutexattr_destroy( &m_mutexAttr ); - } - void Acquire() { Lock(); } - void Release() { Unlock(); } - void Lock() { - pthread_mutex_lock(&mutex); - } - int TryLock() { - return pthread_mutex_trylock(&mutex); - } - void Unlock() { - pthread_mutex_unlock(&mutex); - } - private: - pthread_mutex_t mutex; - pthread_mutexattr_t m_mutexAttr; -}; - -class AutoLock { - public: - AutoLock(Mutex& mutex) : _mutex(mutex) { - mutex.Lock(); - } - ~AutoLock() { - _mutex.Unlock(); - } - private: - friend class AutoUnlockTemp; - Mutex& _mutex; -}; - -class AutoUnlockTemp { - public: - AutoUnlockTemp(AutoLock & autoLock) : _autoLock(autoLock) { - _autoLock._mutex.Unlock(); - } - ~AutoUnlockTemp() { - _autoLock._mutex.Lock(); - } - private: - AutoLock & _autoLock; -}; - -class Cond { - public: - Cond() { - static pthread_condattr_t attr; - static bool inited = false; - if(!inited) { - inited = true; - pthread_condattr_init(&attr); - } - pthread_cond_init(&_cond, &attr); - } - ~Cond() { - pthread_cond_destroy(&_cond); - } - - void Wait(Mutex& mutex) { - pthread_cond_wait(&_cond, &mutex.mutex); - } - - bool Wait(Mutex& mutex, long long int timeout) { - struct timeval now; - gettimeofday( &now, NULL ); - struct timespec abstime; - int64_t microSecs = now.tv_sec * 1000000LL + now.tv_usec; - microSecs += timeout * 1000; - abstime.tv_sec = microSecs / 1000000LL; - abstime.tv_nsec = (microSecs % 1000000LL) * 1000; - if (pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime) == ETIMEDOUT) { - return false; - } else { - return true; - } - } - - void Signal() { - 
pthread_cond_signal(&_cond); - } - - private: - pthread_cond_t _cond; -}; - -/** - * A wrapper class for {@link Mutex} and {@link Cond}. - */ -class Lock { - public: - - void lock() { - m_mutex.Lock(); - } - - void unlock() { - m_mutex.Unlock(); - } - - void wait() { - m_cond.Wait( m_mutex ); - } - - bool wait(long long int timeout) { - return m_cond.Wait( m_mutex, timeout ); - } - - void notify() { - m_cond.Signal(); - } - - private: - - /** - * The mutex. - */ - Mutex m_mutex; - - /** - * The condition associated with this lock's mutex. - */ - Cond m_cond; -}; - -END_ZKFUSE_NAMESPACE - -#endif /* __MUTEX_H__ */ - http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/thread.cc ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/thread.cc b/src/contrib/zkfuse/src/thread.cc deleted file mode 100644 index f1ed816..0000000 --- a/src/contrib/zkfuse/src/thread.cc +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include - -#include "thread.h" - -DEFINE_LOGGER( LOG, "Thread" ) - -START_ZKFUSE_NAMESPACE - -void Thread::Create(void* ctx, ThreadFunc func) -{ - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, _stackSize); - int ret = pthread_create(&mThread, &attr, func, ctx); - if(ret != 0) { - LOG_FATAL( LOG, "pthread_create failed: %s", strerror(errno) ); - } - // pthread_attr_destroy(&attr); - _ctx = ctx; - _func = func; -} - -END_ZKFUSE_NAMESPACE http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/thread.h ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/thread.h b/src/contrib/zkfuse/src/thread.h deleted file mode 100644 index 0ed12d7..0000000 --- a/src/contrib/zkfuse/src/thread.h +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef __THREAD_H__ -#define __THREAD_H__ - -#include -#include -#include -#include - -#include "log.h" - -START_ZKFUSE_NAMESPACE - -class Thread { - public: - static const size_t defaultStackSize = 1024 * 1024; - typedef void* (*ThreadFunc) (void*); - Thread(size_t stackSize = defaultStackSize) - : _stackSize(stackSize), _ctx(NULL), _func(NULL) - { - memset( &mThread, 0, sizeof(mThread) ); - } - ~Thread() { } - - void Create(void* ctx, ThreadFunc func); - void Join() { - //avoid SEGFAULT because of unitialized mThread - //in case Create(...) was never called - if (_func != NULL) { - pthread_join(mThread, 0); - } - } - private: - pthread_t mThread; - void *_ctx; - ThreadFunc _func; - size_t _stackSize; -}; - - -template -struct ThreadContext { - typedef void (T::*FuncPtr) (void); - ThreadContext(T& ctx, FuncPtr func) : _ctx(ctx), _func(func) {} - void run(void) { - (_ctx.*_func)(); - } - T& _ctx; - FuncPtr _func; -}; - -template -void* ThreadExec(void *obj) { - ThreadContext* tc = (ThreadContext*)(obj); - assert(tc != 0); - tc->run(); - return 0; -} - -template -class CXXThread : public Thread { - public: - typedef void (T::*FuncPtr) (void); - CXXThread(size_t stackSize = Thread::defaultStackSize) - : Thread(stackSize), ctx(0) {} - ~CXXThread() { if (ctx) delete ctx; } - - void Create(T& obj, FuncPtr func) { - assert(ctx == 0); - ctx = new ThreadContext(obj, func); - Thread::Create(ctx, ThreadExec); - } - - private: - ThreadContext* ctx; -}; - - -END_ZKFUSE_NAMESPACE - -#endif /* __THREAD_H__ */ - http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/zkadapter.cc ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/zkadapter.cc b/src/contrib/zkfuse/src/zkadapter.cc deleted file mode 100644 index 7f02fa3..0000000 --- a/src/contrib/zkfuse/src/zkadapter.cc +++ /dev/null @@ -1,884 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include "blockingqueue.h" -#include "thread.h" -#include "zkadapter.h" - -using namespace std; -using namespace zk; - -DEFINE_LOGGER( LOG, "zookeeper.adapter" ) -DEFINE_LOGGER( ZK_LOG, "zookeeper.core" ) - -/** - * \brief A helper class to initialize ZK logging. - */ -class InitZooKeeperLogging -{ - public: - InitZooKeeperLogging() { - if (ZK_LOG->isDebugEnabled() -#ifdef LOG4CXX_TRACE - || ZK_LOG->isTraceEnabled() -#endif - ) - { - zoo_set_debug_level( ZOO_LOG_LEVEL_DEBUG ); - } else if (ZK_LOG->isInfoEnabled()) { - zoo_set_debug_level( ZOO_LOG_LEVEL_INFO ); - } else if (ZK_LOG->isWarnEnabled()) { - zoo_set_debug_level( ZOO_LOG_LEVEL_WARN ); - } else { - zoo_set_debug_level( ZOO_LOG_LEVEL_ERROR ); - } - } -}; - -using namespace std; - -namespace zk -{ - -/** - * \brief This class provides logic for checking if a request can be retried. - */ -class RetryHandler -{ - public: - RetryHandler(const ZooKeeperConfig &zkConfig) - : m_zkConfig(zkConfig) - { - if (zkConfig.getAutoReconnect()) { - retries = 2; - } else { - retries = 0; - } - } - - /** - * \brief Attempts to fix a side effect of the given RC. 
- * - * @param rc the ZK error code - * @return whether the error code has been handled and the caller should - * retry an operation the caused this error - */ - bool handleRC(int rc) - { - TRACE( LOG, "handleRC" ); - - //check if the given error code is recoverable - if (!retryOnError(rc)) { - return false; - } - LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries ); - if (retries-- > 0) { - return true; - } else { - return false; - } - } - - private: - /** - * The ZK config. - */ - const ZooKeeperConfig &m_zkConfig; - - /** - * The number of outstanding retries. - */ - int retries; - - /** - * Checks whether the given error entitles this adapter - * to retry the previous operation. - * - * @param zkErrorCode one of the ZK error code - */ - static bool retryOnError(int zkErrorCode) - { - return (zkErrorCode == ZCONNECTIONLOSS || - zkErrorCode == ZOPERATIONTIMEOUT); - } -}; - - -//the implementation of the global ZK event watcher -void zkWatcher(zhandle_t *zh, int type, int state, const char *path, - void *watcherCtx) -{ - TRACE( LOG, "zkWatcher" ); - - //a workaround for buggy ZK API - string sPath = - (path == NULL || - state == ZOO_SESSION_EVENT || - state == ZOO_NOTWATCHING_EVENT) - ? 
"" - : string(path); - LOG_INFO( LOG, - "Received a ZK event - type: %d, state: %d, path: '%s'", - type, state, sPath.c_str() ); - ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh); - if (zka != NULL) { - zka->enqueueEvent( type, state, sPath ); - } else { - LOG_ERROR( LOG, - "Skipping ZK event (type: %d, state: %d, path: '%s'), " - "because ZK passed no context", - type, state, sPath.c_str() ); - } -} - - - -// ======================================================================= - -ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config, - ZKEventListener *listener, - bool establishConnection) - throw(ZooKeeperException) - : m_zkConfig(config), - mp_zkHandle(NULL), - m_terminating(false), - m_connected(false), - m_state(AS_DISCONNECTED) -{ - TRACE( LOG, "ZooKeeperAdapter" ); - - resetRemainingConnectTimeout(); - - //enforce setting up appropriate ZK log level - static InitZooKeeperLogging INIT_ZK_LOGGING; - - if (listener != NULL) { - addListener(listener); - } - - //start the event dispatcher thread - m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents ); - - //start the user event dispatcher thread - m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents ); - - //optionally establish the connection - if (establishConnection) { - reconnect(); - } -} - -ZooKeeperAdapter::~ZooKeeperAdapter() -{ - TRACE( LOG, "~ZooKeeperAdapter" ); - - try { - disconnect(); - } catch (std::exception &e) { - LOG_ERROR( LOG, - "An exception while disconnecting from ZK: %s", - e.what() ); - } - m_terminating = true; - m_userEventDispatcher.Join(); - m_eventDispatcher.Join(); -} - -void -ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException) -{ - TRACE( LOG, "validatePath" ); - - if (path.find( "/" ) != 0) { - throw ZooKeeperException( string("Node path must start with '/' but" - "it was '") + - path + - "'" ); - } - if (path.length() > 1) { - if (path.rfind( "/" ) == path.length() - 1) { - throw 
ZooKeeperException( string("Node path must not end with " - "'/' but it was '") + - path + - "'" ); - } - if (path.find( "//" ) != string::npos) { - throw ZooKeeperException( string("Node path must not contain " - "'//' but it was '") + - path + - "'" ); - } - } -} - -void -ZooKeeperAdapter::disconnect() -{ - TRACE( LOG, "disconnect" ); - LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state ); - - m_stateLock.lock(); - if (mp_zkHandle != NULL) { - zookeeper_close( mp_zkHandle ); - mp_zkHandle = NULL; - setState( AS_DISCONNECTED ); - } - m_stateLock.unlock(); -} - -void -ZooKeeperAdapter::reconnect() throw(ZooKeeperException) -{ - TRACE( LOG, "reconnect" ); - - m_stateLock.lock(); - //clear the connection state - disconnect(); - - //establish a new connection to ZooKeeper - mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(), - zkWatcher, - m_zkConfig.getLeaseTimeout(), - NULL, this, 0); - resetRemainingConnectTimeout(); - if (mp_zkHandle != NULL) { - setState( AS_CONNECTING ); - m_stateLock.unlock(); - } else { - m_stateLock.unlock(); - throw ZooKeeperException( - string("Unable to connect to ZK running at '") + - m_zkConfig.getHosts() + "'" ); - } - - LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state ); -} - -void -ZooKeeperAdapter::handleEvent(int type, int state, const string &path) -{ - TRACE( LOG, "handleEvent" ); - LOG_TRACE( LOG, - "type: %d, state %d, path: %s", - type, state, path.c_str() ); - Listener2Context context, context2; - //ignore internal ZK events - if (type != ZOO_SESSION_EVENT && type != ZOO_NOTWATCHING_EVENT) { - m_zkContextsMutex.Acquire(); - //check if the user context is available - if (type == ZOO_CHANGED_EVENT || type == ZOO_DELETED_EVENT) { - //we may have two types of interest here, - //in this case lets try to notify twice - context = findAndRemoveListenerContext( GET_NODE_DATA, path ); - context2 = findAndRemoveListenerContext( NODE_EXISTS, path ); - if (context.empty()) { - //make sure that the 
2nd context is NULL and - // assign it to the 1st one - context = context2; - context2.clear(); - } - } else if (type == ZOO_CHILD_EVENT) { - context = findAndRemoveListenerContext( GET_NODE_CHILDREN, path ); - } else if (type == ZOO_CREATED_EVENT) { - context = findAndRemoveListenerContext( NODE_EXISTS, path ); - } - m_zkContextsMutex.Release(); - } - - handleEvent( type, state, path, context ); - if (!context2.empty()) { - handleEvent( type, state, path, context2 ); - } -} - -void -ZooKeeperAdapter::handleEvent(int type, - int state, - const string &path, - const Listener2Context &listeners) -{ - TRACE( LOG, "handleEvents" ); - - if (listeners.empty()) { - //propagate with empty context - ZKWatcherEvent event(type, state, path); - fireEvent( event ); - } else { - for (Listener2Context::const_iterator i = listeners.begin(); - i != listeners.end(); - ++i) { - ZKWatcherEvent event(type, state, path, i->second); - if (i->first != NULL) { - fireEvent( i->first, event ); - } else { - fireEvent( event ); - } - } - } -} - -void -ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path) -{ - TRACE( LOG, "enqueueEvents" ); - - m_events.put( ZKWatcherEvent( type, state, path ) ); -} - -void -ZooKeeperAdapter::processEvents() -{ - TRACE( LOG, "processEvents" ); - - while (!m_terminating) { - bool timedOut = false; - ZKWatcherEvent source = m_events.take( 100, &timedOut ); - if (!timedOut) { - if (source.getType() == ZOO_SESSION_EVENT) { - LOG_INFO( LOG, - "Received SESSION event, state: %d. 
Adapter state: %d", - source.getState(), m_state ); - m_stateLock.lock(); - if (source.getState() == ZOO_CONNECTED_STATE) { - m_connected = true; - resetRemainingConnectTimeout(); - setState( AS_CONNECTED ); - } else if (source.getState() == ZOO_CONNECTING_STATE) { - m_connected = false; - setState( AS_CONNECTING ); - } else if (source.getState() == ZOO_EXPIRED_SESSION_STATE) { - LOG_INFO( LOG, "Received EXPIRED_SESSION event" ); - setState( AS_SESSION_EXPIRED ); - } - m_stateLock.unlock(); - } - m_userEvents.put( source ); - } - } -} - -void -ZooKeeperAdapter::processUserEvents() -{ - TRACE( LOG, "processUserEvents" ); - - while (!m_terminating) { - bool timedOut = false; - ZKWatcherEvent source = m_userEvents.take( 100, &timedOut ); - if (!timedOut) { - try { - handleEvent( source.getType(), - source.getState(), - source.getPath() ); - } catch (std::exception &e) { - LOG_ERROR( LOG, - "Unable to process event (type: %d, state: %d, " - "path: %s), because of exception: %s", - source.getType(), - source.getState(), - source.getPath().c_str(), - e.what() ); - } - } - } -} - -void -ZooKeeperAdapter::registerContext(WatchableMethod method, - const string &path, - ZKEventListener *listener, - ContextType context) -{ - TRACE( LOG, "registerContext" ); - - m_zkContexts[method][path][listener] = context; -} - -ZooKeeperAdapter::Listener2Context -ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method, - const string &path) -{ - TRACE( LOG, "findAndRemoveListenerContext" ); - - Listener2Context listeners; - Path2Listener2Context::iterator elem = m_zkContexts[method].find( path ); - if (elem != m_zkContexts[method].end()) { - listeners = elem->second; - m_zkContexts[method].erase( elem ); - } - return listeners; -} - -void -ZooKeeperAdapter::setState(AdapterState newState) -{ - TRACE( LOG, "setState" ); - if (newState != m_state) { - LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState ); - m_state = newState; - m_stateLock.notify(); - } else 
{ - LOG_TRACE( LOG, "New state same as the current: %d", newState ); - } -} - - -//TODO move this code to verifyConnection so reconnect() -//is called from one place only -void -ZooKeeperAdapter::waitUntilConnected() - throw(ZooKeeperException) -{ - TRACE( LOG, "waitUntilConnected" ); - long long int timeout = getRemainingConnectTimeout(); - LOG_INFO( LOG, - "Waiting up to %lld ms until a connection to ZK is established", - timeout ); - bool connected; - if (timeout > 0) { - long long int toWait = timeout; - while (m_state != AS_CONNECTED && toWait > 0) { - //check if session expired and reconnect if so - if (m_state == AS_SESSION_EXPIRED) { - LOG_INFO( LOG, - "Reconnecting because the current session has expired" ); - reconnect(); - } - struct timeval now; - gettimeofday( &now, NULL ); - int64_t milliSecs = -(now.tv_sec * 1000LL + now.tv_usec / 1000); - LOG_TRACE( LOG, "About to wait %lld ms", toWait ); - m_stateLock.wait( toWait ); - gettimeofday( &now, NULL ); - milliSecs += now.tv_sec * 1000LL + now.tv_usec / 1000; - toWait -= milliSecs; - } - waitedForConnect( timeout - toWait ); - LOG_INFO( LOG, "Waited %lld ms", timeout - toWait ); - } - connected = (m_state == AS_CONNECTED); - if (!connected) { - if (timeout > 0) { - LOG_WARN( LOG, "Timed out while waiting for connection to ZK" ); - throw ZooKeeperException("Timed out while waiting for " - "connection to ZK"); - } else { - LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" ); - throw ZooKeeperException("Global timeout expired and still not " - "connected to ZK"); - } - } - LOG_INFO( LOG, "Connected!" ); -} - -void -ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException) -{ - TRACE( LOG, "verifyConnection" ); - - m_stateLock.lock(); - try { - if (m_state == AS_DISCONNECTED) { - throw ZooKeeperException("Disconnected from ZK. 
" \ - "Please use reconnect() before attempting to use any ZK API"); - } else if (m_state != AS_CONNECTED) { - LOG_TRACE( LOG, "Checking if need to reconnect..." ); - //we are not connected, so check if connection in progress... - if (m_state != AS_CONNECTING) { - LOG_TRACE( LOG, - "yes. Checking if allowed to auto-reconnect..." ); - //...not in progres, so check if we can reconnect - if (!m_zkConfig.getAutoReconnect()) { - //...too bad, disallowed :( - LOG_TRACE( LOG, "no. Sorry." ); - throw ZooKeeperException("ZK connection is down and " - "auto-reconnect is not allowed"); - } else { - LOG_TRACE( LOG, "...yes. About to reconnect" ); - } - //...we are good to retry the connection - reconnect(); - } else { - LOG_TRACE( LOG, "...no, already in CONNECTING state" ); - } - //wait until the connection is established - waitUntilConnected(); - } - } catch (ZooKeeperException &e) { - m_stateLock.unlock(); - throw; - } - m_stateLock.unlock(); -} - -bool -ZooKeeperAdapter::createNode(const string &path, - const string &value, - int flags, - bool createAncestors, - string &returnPath) - throw(ZooKeeperException) -{ - TRACE( LOG, "createNode (internal)" ); - validatePath( path ); - - const int MAX_PATH_LENGTH = 1024; - char realPath[MAX_PATH_LENGTH]; - realPath[0] = 0; - - int rc; - RetryHandler rh(m_zkConfig); - do { - verifyConnection(); - rc = zoo_create( mp_zkHandle, - path.c_str(), - value.c_str(), - value.length(), - &ZOO_OPEN_ACL_UNSAFE, - flags, - realPath, - MAX_PATH_LENGTH ); - } while (rc != ZOK && rh.handleRC(rc)); - if (rc != ZOK) { - if (rc == ZNODEEXISTS) { - //the node already exists - LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); - return false; - } else if (rc == ZNONODE && createAncestors) { - LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); - //one of the ancestors doesn't exist so lets start from the root - //and make sure the whole path exists, creating missing nodes if - //necessary - for (string::size_type pos = 1; pos != string::npos; ) { 
- pos = path.find( "/", pos ); - if (pos != string::npos) { - try { - createNode( path.substr( 0, pos ), "", 0, true ); - } catch (ZooKeeperException &e) { - throw ZooKeeperException( string("Unable to create " - "node ") + - path, - rc ); - } - pos++; - } else { - //no more path components - return createNode( path, value, flags, false, returnPath ); - } - } - } - LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() ); - throw ZooKeeperException( string("Unable to create node ") + - path, - rc ); - } else { - LOG_INFO( LOG, "%s has been created", realPath ); - returnPath = string( realPath ); - return true; - } -} - -bool -ZooKeeperAdapter::createNode(const string &path, - const string &value, - int flags, - bool createAncestors) - throw(ZooKeeperException) -{ - TRACE( LOG, "createNode" ); - - string createdPath; - return createNode( path, value, flags, createAncestors, createdPath ); -} - -int64_t -ZooKeeperAdapter::createSequence(const string &path, - const string &value, - int flags, - bool createAncestors) - throw(ZooKeeperException) -{ - TRACE( LOG, "createSequence" ); - - string createdPath; - bool result = createNode( path, - value, - flags | ZOO_SEQUENCE, - createAncestors, - createdPath ); - if (!result) { - return -1; - } else { - //extract sequence number from the returned path - if (createdPath.find( path ) != 0) { - throw ZooKeeperException( string("Expecting returned path '") + - createdPath + - "' to start with '" + - path + - "'" ); - } - string seqSuffix = - createdPath.substr( path.length(), - createdPath.length() - path.length() ); - char *ptr = NULL; - int64_t seq = strtol( seqSuffix.c_str(), &ptr, 10 ); - if (ptr != NULL && *ptr != '\0') { - throw ZooKeeperException( string("Expecting a number but got ") + - seqSuffix ); - } - return seq; - } -} - -bool -ZooKeeperAdapter::deleteNode(const string &path, - bool recursive, - int version) - throw(ZooKeeperException) -{ - TRACE( LOG, "deleteNode" ); - - validatePath( path ); - - int rc; - 
RetryHandler rh(m_zkConfig); - do { - verifyConnection(); - rc = zoo_delete( mp_zkHandle, path.c_str(), version ); - } while (rc != ZOK && rh.handleRC(rc)); - if (rc != ZOK) { - if (rc == ZNONODE) { - LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); - return false; - } - if (rc == ZNOTEMPTY && recursive) { - LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); - //get all children and delete them recursively... - vector nodeList; - getNodeChildren( nodeList, path, NULL ); - for (vector::const_iterator i = nodeList.begin(); - i != nodeList.end(); - ++i) { - deleteNode( *i, true ); - } - //...and finally attempt to delete the node again - return deleteNode( path, false ); - } - LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); - throw ZooKeeperException( string("Unable to delete node ") + path, - rc ); - } else { - LOG_INFO( LOG, "%s has been deleted", path.c_str() ); - return true; - } -} - -bool -ZooKeeperAdapter::nodeExists(const string &path, - ZKEventListener *listener, - void *context, Stat *stat) - throw(ZooKeeperException) -{ - TRACE( LOG, "nodeExists" ); - - validatePath( path ); - - struct Stat tmpStat; - if (stat == NULL) { - stat = &tmpStat; - } - memset( stat, 0, sizeof(Stat) ); - - int rc; - RetryHandler rh(m_zkConfig); - do { - verifyConnection(); - if (context != NULL) { - m_zkContextsMutex.Acquire(); - rc = zoo_exists( mp_zkHandle, - path.c_str(), - (listener != NULL ? 1 : 0), - stat ); - if (rc == ZOK || rc == ZNONODE) { - registerContext( NODE_EXISTS, path, listener, context ); - } - m_zkContextsMutex.Release(); - } else { - rc = zoo_exists( mp_zkHandle, - path.c_str(), - (listener != NULL ? 
1 : 0), - stat ); - } - } while (rc != ZOK && rh.handleRC(rc)); - if (rc != ZOK) { - if (rc == ZNONODE) { - LOG_TRACE( LOG, "Node %s does not exist", path.c_str() ); - return false; - } - LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); - throw ZooKeeperException( - string("Unable to check existence of node ") + path, - rc ); - } else { - return true; - } -} - -void -ZooKeeperAdapter::getNodeChildren(vector &nodeList, - const string &path, - ZKEventListener *listener, - void *context) - throw (ZooKeeperException) -{ - TRACE( LOG, "getNodeChildren" ); - - validatePath( path ); - - String_vector children; - memset( &children, 0, sizeof(children) ); - - int rc; - RetryHandler rh(m_zkConfig); - do { - verifyConnection(); - if (context != NULL) { - m_zkContextsMutex.Acquire(); - rc = zoo_get_children( mp_zkHandle, - path.c_str(), - (listener != NULL ? 1 : 0), - &children ); - if (rc == ZOK) { - registerContext( GET_NODE_CHILDREN, path, listener, context ); - } - m_zkContextsMutex.Release(); - } else { - rc = zoo_get_children( mp_zkHandle, - path.c_str(), - (listener != NULL ? 
1 : 0), - &children ); - } - } while (rc != ZOK && rh.handleRC(rc)); - if (rc != ZOK) { - LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); - throw ZooKeeperException( string("Unable to get children of node ") + - path, - rc ); - } else { - for (int i = 0; i < children.count; ++i) { - //convert each child's path from relative to absolute - string absPath(path); - if (path != "/") { - absPath.append( "/" ); - } - absPath.append( children.data[i] ); - nodeList.push_back( absPath ); - } - //make sure the order is always deterministic - sort( nodeList.begin(), nodeList.end() ); - } -} - -string -ZooKeeperAdapter::getNodeData(const string &path, - ZKEventListener *listener, - void *context, Stat *stat) - throw(ZooKeeperException) -{ - TRACE( LOG, "getNodeData" ); - - validatePath( path ); - - const int MAX_DATA_LENGTH = 128 * 1024; - char buffer[MAX_DATA_LENGTH]; - memset( buffer, 0, MAX_DATA_LENGTH ); - struct Stat tmpStat; - if (stat == NULL) { - stat = &tmpStat; - } - memset( stat, 0, sizeof(Stat) ); - - int rc; - int len; - RetryHandler rh(m_zkConfig); - do { - verifyConnection(); - len = MAX_DATA_LENGTH - 1; - if (context != NULL) { - m_zkContextsMutex.Acquire(); - rc = zoo_get( mp_zkHandle, - path.c_str(), - (listener != NULL ? 1 : 0), - buffer, &len, stat ); - if (rc == ZOK) { - registerContext( GET_NODE_DATA, path, listener, context ); - } - m_zkContextsMutex.Release(); - } else { - rc = zoo_get( mp_zkHandle, - path.c_str(), - (listener != NULL ? 
1 : 0), - buffer, &len, stat ); - } - } while (rc != ZOK && rh.handleRC(rc)); - if (rc != ZOK) { - LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); - throw ZooKeeperException( - string("Unable to get data of node ") + path, rc - ); - } else { - if (len == -1) { - len = 0; - } - return string( buffer, len ); - } -} - -void -ZooKeeperAdapter::setNodeData(const string &path, - const string &value, - int version) - throw(ZooKeeperException) -{ - TRACE( LOG, "setNodeData" ); - - validatePath( path ); - - int rc; - RetryHandler rh(m_zkConfig); - do { - verifyConnection(); - rc = zoo_set( mp_zkHandle, - path.c_str(), - value.c_str(), - value.length(), - version); - } while (rc != ZOK && rh.handleRC(rc)); - if (rc != ZOK) { - LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); - throw ZooKeeperException( string("Unable to set data for node ") + - path, - rc ); - } -} - -} /* end of 'namespace zk' */ - http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/src/contrib/zkfuse/src/zkadapter.h ---------------------------------------------------------------------- diff --git a/src/contrib/zkfuse/src/zkadapter.h b/src/contrib/zkfuse/src/zkadapter.h deleted file mode 100644 index 8d4d1d5..0000000 --- a/src/contrib/zkfuse/src/zkadapter.h +++ /dev/null @@ -1,718 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __ZKADAPTER_H__ -#define __ZKADAPTER_H__ - -#include -#include -#include - -extern "C" { -#include "zookeeper.h" -} - -#include "log.h" -#include "mutex.h" -#include "thread.h" -#include "blockingqueue.h" -#include "event.h" - -using namespace std; -using namespace zkfuse; - -namespace zk { - -/** - * \brief A cluster related exception. - */ -class ZooKeeperException : - public std::exception -{ - public: - - /** - * \brief Constructor. - * - * @param msg the detailed message associated with this exception - */ - ZooKeeperException(const string &msg) : - m_message(msg), m_zkErrorCode(0) - {} - - /** - * \brief Constructor. - * - * @param msg the detailed message associated with this exception - * @param errorCode the ZK error code associated with this exception - */ - ZooKeeperException(const string &msg, int errorCode) : - m_zkErrorCode(errorCode) - { - char tmp[100]; - sprintf( tmp, " (ZK error code: %d)", errorCode ); - m_message = msg + tmp; - } - - /** - * \brief Destructor. - */ - ~ZooKeeperException() throw() {} - - /** - * \brief Returns detailed description of the exception. - */ - const char *what() const throw() { - return m_message.c_str(); - } - - /** - * \brief Returns the ZK error code. - */ - int getZKErrorCode() const { - return m_zkErrorCode; - } - - private: - - /** - * The detailed message associated with this exception. - */ - string m_message; - - /** - * The optional error code received from ZK. - */ - int m_zkErrorCode; - -}; - -/** - * \brief This class encapsulates configuration of a ZK client. - */ -class ZooKeeperConfig -{ - public: - - /** - * \brief Constructor. 
- * - * @param hosts the comma separated list of host and port pairs of ZK nodes - * @param leaseTimeout the lease timeout (heartbeat) - * @param autoReconnect whether to allow for auto-reconnect - * @param connectTimeout the connect timeout, in milliseconds; - */ - ZooKeeperConfig(const string &hosts, - int leaseTimeout, - bool autoReconnect = true, - long long int connectTimeout = 15000) : - m_hosts(hosts), m_leaseTimeout(leaseTimeout), - m_autoReconnect(autoReconnect), m_connectTimeout(connectTimeout) {} - - /** - * \brief Returns the list of ZK hosts to connect to. - */ - string getHosts() const { return m_hosts; } - - /** - * \brief Returns the lease timeout. - */ - int getLeaseTimeout() const { return m_leaseTimeout; } - - /** - * \brief Returns whether {@link ZooKeeperAdapter} should attempt - * \brief to automatically reconnect in case of a connection failure. - */ - bool getAutoReconnect() const { return m_autoReconnect; } - - /** - * \brief Gets the connect timeout. - * - * @return the connect timeout - */ - long long int getConnectTimeout() const { return m_connectTimeout; } - - private: - - /** - * The host addresses of ZK nodes. - */ - const string m_hosts; - - /** - * The ZK lease timeout. - */ - const int m_leaseTimeout; - - /** - * True if this adapater should attempt to autoreconnect in case - * the current session has been dropped. - */ - const bool m_autoReconnect; - - /** - * How long to wait, in milliseconds, before a connection - * is established to ZK. - */ - const long long int m_connectTimeout; - -}; - -/** - * \brief A data value object representing a watcher event received from the ZK. - */ -class ZKWatcherEvent -{ - public: - - /** - * \brief The type representing the user's context. - */ - typedef void *ContextType; - - /** - * \brief Constructor. 
- * - * @param type the type of this event - * @param state the state of this event - * @param path the corresponding path, may be empty for some event types - * @param context the user specified context; possibly NULL - */ - ZKWatcherEvent() : - m_type(-1), m_state(-1), m_path(""), mp_context(NULL) {} - - /** - * \brief Constructor. - * - * @param type the type of this event - * @param state the state of this event - * @param path the corresponding path, may be empty for some event types - * @param context the user specified context; possibly NULL - */ - ZKWatcherEvent(int type, int state, const string &path, - ContextType context = NULL) : - m_type(type), m_state(state), m_path(path), mp_context(context) {} - - int getType() const { return m_type; } - int getState() const { return m_state; } - string const &getPath() const { return m_path; } - ContextType getContext() const { return mp_context; } - - bool operator==(const ZKWatcherEvent &we) const { - return m_type == we.m_type && m_state == we.m_state - && m_path == we.m_path && mp_context == we.mp_context; - } - - private: - - /** - * The type of this event. It can be either ZOO_CREATED_EVENT, ZOO_DELETED_EVENT, - * ZOO_CHANGED_EVENT, ZOO_CHILD_EVENT, ZOO_SESSION_EVENT or ZOO_NOTWATCHING_EVENT. - * See zookeeper.h for more details. - */ - const int m_type; - - /** - * The state of ZK at the time of sending this event. - * It can be either ZOO_CONNECTING_STATE, ZOO_ASSOCIATING_STATE, - * ZOO_CONNECTED_STATE, ZOO_EXPIRED_SESSION_STATE or AUTH_FAILED_STATE. - * See {@file zookeeper.h} for more details. - */ - const int m_state; - - /** - * The corresponding path of the node in subject. It may be empty - * for some event types. - */ - const string m_path; - - /** - * The pointer to the user specified context, possibly NULL. - */ - ContextType mp_context; - -}; - -/** - * \brief The type definition of ZK event source. 
- */ -typedef EventSource ZKEventSource; - -/** - * \brief The type definition of ZK event listener. - */ -typedef EventListener ZKEventListener; - -/** - * \brief This is a wrapper around ZK C synchrounous API. - */ -class ZooKeeperAdapter - : public ZKEventSource -{ - public: - /** - * \brief The global function that handles all ZK asynchronous notifications. - */ - friend void zkWatcher(zhandle_t *, int, int, const char *, void *watcherCtx); - - /** - * \brief The type representing the user's context. - */ - typedef void *ContextType; - - /** - * \brief The map type of ZK event listener to user specified context mapping. - */ - typedef map Listener2Context; - - /** - * \brief The map type of ZK path's to listener's contexts. - */ - typedef map Path2Listener2Context; - - /** - * \brief All possible states of this client, in respect to - * \brief connection to the ZK server. - */ - enum AdapterState { - //mp_zkHandle is NULL - AS_DISCONNECTED = 0, - //mp_zkHandle is valid but this client is reconnecting - AS_CONNECTING, - //mp_zkHandle is valid and this client is connected - AS_CONNECTED, - //mp_zkHandle is valid, however no more calls can be made to ZK API - AS_SESSION_EXPIRED - }; - - /** - * \brief Constructor. - * Attempts to create a ZK adapter, optionally connecting - * to the ZK. Note, that if the connection is to be established - * and the given listener is NULL, some events may be lost, - * as they may arrive asynchronously before this method finishes. - * - * @param config the ZK configuration - * @param listener the event listener to be used for listening - * on incoming ZK events; - * if NULL not used - * @param establishConnection whether to establish connection to the ZK - * - * @throw ZooKeeperException if cannot establish connection to the given ZK - */ - ZooKeeperAdapter(ZooKeeperConfig config, - ZKEventListener *listener = NULL, - bool establishConnection = false) - throw(ZooKeeperException); - - /** - * \brief Destructor. 
- */ - ~ZooKeeperAdapter(); - - /** - * \brief Returns the current config. - */ - const ZooKeeperConfig &getZooKeeperConfig() const { - return m_zkConfig; - } - - /** - * \brief Restablishes connection to the ZK. - * If this adapter is already connected, the current connection - * will be dropped and a new connection will be established. - * - * @throw ZooKeeperException if cannot establish connection to the ZK - */ - void reconnect() throw(ZooKeeperException); - - /** - * \brief Disconnects from the ZK and unregisters {@link #mp_zkHandle}. - */ - void disconnect(); - - /** - * \brief Creates a new node identified by the given path. - * This method will optionally attempt to create all missing ancestors. - * - * @param path the absolute path name of the node to be created - * @param value the initial value to be associated with the node - * @param flags the ZK flags of the node to be created - * @param createAncestors if true and there are some missing ancestor nodes, - * this method will attempt to create them - * - * @return true if the node has been successfully created; false otherwise - * @throw ZooKeeperException if the operation has failed - */ - bool createNode(const string &path, - const string &value = "", - int flags = 0, - bool createAncestors = true) - throw(ZooKeeperException); - - /** - * \brief Creates a new sequence node using the give path as the prefix. - * This method will optionally attempt to create all missing ancestors. 
- * - * @param path the absolute path name of the node to be created; - * @param value the initial value to be associated with the node - * @param flags the ZK flags of the sequence node to be created - * (in addition to SEQUENCE) - * @param createAncestors if true and there are some missing ancestor - * nodes, this method will attempt to create them - * - * @return the sequence number associate with newly created node, - * or -1 if it couldn't be created - * @throw ZooKeeperException if the operation has failed - */ - int64_t createSequence(const string &path, - const string &value = "", - int flags = 0, - bool createAncestors = true) - throw(ZooKeeperException); - - /** - * \brief Deletes a node identified by the given path. - * - * @param path the absolute path name of the node to be deleted - * @param recursive if true this method will attempt to remove - * all children of the given node if any exist - * @param version the expected version of the node. The function will - * fail if the actual version of the node does not match - * the expected version - * - * @return true if the node has been deleted; false otherwise - * @throw ZooKeeperException if the operation has failed - */ - bool deleteNode(const string &path, bool recursive = false, int version = -1) - throw(ZooKeeperException); - - /** - * \brief Checks whether the given node exists or not. 
- * - * @param path the absolute path name of the node to be checked - * @param listener the listener for ZK watcher events; - * passing non NULL effectively establishes - * a ZK watch on the given node - * @param context the user specified context that is to be passed - * in a corresponding {@link ZKWatcherEvent} at later time; - * not used if listener is NULL - * @param stat the optional node statistics to be filled in by ZK - * - * @return true if the given node exists; false otherwise - * @throw ZooKeeperException if the operation has failed - */ - bool nodeExists(const string &path, - ZKEventListener *listener = NULL, - void *context = NULL, - Stat *stat = NULL) - throw(ZooKeeperException); - - /** - * \brief Retrieves list of all children of the given node. - * - * @param children the vector to be filled in with the absolute paths - * of the child nodes; the list may be empty - * @param path the absolute path name of the node for which to get children - * @param listener the listener for ZK watcher events; - * passing non NULL effectively establishes - * a ZK watch on the given node - * @param context the user specified context that is to be passed - * in a corresponding {@link ZKWatcherEvent} at later time; - * not used if listener is NULL - * - * @throw ZooKeeperException if the operation has failed - */ - void getNodeChildren(vector &children, - const string &path, - ZKEventListener *listener = NULL, - void *context = NULL) - throw(ZooKeeperException); - - /** - * \brief Gets the given node's data. 
- * - * @param path the absolute path name of the node to get data from - * @param listener the listener for ZK watcher events; - * passing non NULL effectively establishes - * a ZK watch on the given node - * @param context the user specified context that is to be passed - * in a corresponding {@link ZKWatcherEvent} at later time; - * not used if listener is NULL - * @param stat the optional node statistics to be filled in by ZK - * - * @return the node's data - * @throw ZooKeeperException if the operation has failed - */ - string getNodeData(const string &path, - ZKEventListener *listener = NULL, - void *context = NULL, - Stat *stat = NULL) - throw(ZooKeeperException); - - /** - * \brief Sets the given node's data. - * - * @param path the absolute path name of the node whose data is to be set - * @param value the node's data to be set - * @param version the expected version of the node. The function will - * fail if the actual version of the node does not match - * the expected version - * - * @throw ZooKeeperException if the operation has failed - */ - void setNodeData(const string &path, const string &value, int version = -1) - throw(ZooKeeperException); - - /** - * \brief Validates the given path to a node in ZK. - * - * @param path the path to be validated - * - * @throw ZooKeeperException if the given path is not valid - * (for instance it doesn't start with "/") - */ - static void validatePath(const string &path) throw(ZooKeeperException); - - /** - * Returns the current state of this adapter. - * - * @return the current state of this adapter - * @see AdapterState - */ - AdapterState getState() const { - return m_state; - } - - private: - - /** - * This enum defines methods from this class that can trigger an event. - */ - enum WatchableMethod { - NODE_EXISTS = 0, - GET_NODE_CHILDREN, - GET_NODE_DATA - }; - - /** - * \brief Creates a new node identified by the given path. 
- * This method is used internally to implement {@link createNode(...)} - * and {@link createSequence(...)}. On success, this method will set - * createdPath. - * - * @param path the absolute path name of the node to be created - * @param value the initial value to be associated with the node - * @param flags the ZK flags of the node to be created - * @param createAncestors if true and there are some missing ancestor nodes, - * this method will attempt to create them - * @param createdPath the actual path of the node that has been created; - * useful for sequences - * - * @return true if the node has been successfully created; false otherwise - * @throw ZooKeeperException if the operation has failed - */ - bool createNode(const string &path, - const string &value, - int flags, - bool createAncestors, - string &createdPath) - throw(ZooKeeperException); - - /** - * Handles an asynchronous event received from the ZK. - */ - void handleEvent(int type, int state, const string &path); - - /** - * Handles an asynchronous event received from the ZK. - * This method iterates over all listeners and passes the event - * to each of them. - * - * @param listeners the listeners to which the event is passed - */ - void handleEvent(int type, int state, const string &path, - const Listener2Context &listeners); - - /** - * \brief Enqueues the given event in the {@link #m_events} queue. - */ - void enqueueEvent(int type, int state, const string &path); - - /** - * \brief Processes all ZK adapter events in a loop. - */ - void processEvents(); - - /** - * \brief Processes all user events in a loop. - */ - void processUserEvents(); - - /** - * \brief Registers the given context in the {@link #m_zkContexts} - * \brief contexts map. 
- * - * @param method the method where the given path is being used - * @param path the path of interest - * @param listener the event listener to call back later on - * @param context the user specified context to be passed back to user - */ - void registerContext(WatchableMethod method, const string &path, - ZKEventListener *listener, ContextType context); - - /** - * \brief Attempts to find a listener to context map in the contexts' - * \brief map, based on the specified criteria. - * If the context is found, it will be removed from the underlying map. - * - * @param method the method type identifying the Listener2Context map - * @param path the path to be used to search in the Listener2Context map - * - * @return the context map associated with the given method and path, - * or empty map if not found - */ - Listener2Context findAndRemoveListenerContext(WatchableMethod method, - const string &path); - - /** - * Sets the new state in case it's different from the current one. - * This method assumes that {@link #m_stateLock} has been already locked. - * - * @param newState the new state to be set - */ - void setState(AdapterState newState); - - /** - * Waits until this client gets connected. The total wait time - * is given by {@link getRemainingConnectTimeout()}. - * If a timeout elapses, this method will throw an exception. - * - * @throw ZooKeeperException if unable to connect within the given timeout - */ - void waitUntilConnected() - throw(ZooKeeperException); - - /** - * Verifies whether the connection is established, - * optionally auto reconnecting. - * - * @throw ZooKeeperException if this client is disconnected - * and auto-reconnect failed or was not allowed - */ - void verifyConnection() throw(ZooKeeperException); - - /** - * Returns the remaining connect timeout. The timeout resets - * to {@link #m_connectTimeout} on a successful connection to the ZK. 
- * - * @return the remaining connect timeout, in milliseconds - */ - long long int getRemainingConnectTimeout() { - return m_remainingConnectTimeout; - } - - /** - * Resets the remaining connect timeout to the connect timeout - * configured in {@link #m_zkConfig}. - */ - void resetRemainingConnectTimeout() { - m_remainingConnectTimeout = m_zkConfig.getConnectTimeout(); - } - - /** - * Updates the remaining connect timeout to reflect the given wait time. - * - * @param time the time already spent waiting for the connect to succeed; - * it is subtracted from the remaining connect timeout - */ - void waitedForConnect(long long time) { - m_remainingConnectTimeout -= time; - } - - private: - - /** - * The mutex used to protect {@link #m_zkContexts}. - */ - zkfuse::Mutex m_zkContextsMutex; - - /** - * The map of registered ZK paths that are being watched. - * Each entry maps a function type to another map of registered contexts. - * - * @see WatchableMethod - */ - map m_zkContexts; - - /** - * The current ZK configuration. - */ - const ZooKeeperConfig m_zkConfig; - - /** - * The current ZK session. - */ - zhandle_t *mp_zkHandle; - - /** - * The blocking queue of all events waiting to be processed by ZK adapter. - */ - BlockingQueue m_events; - - /** - * The blocking queue of all events waiting to be processed by users - * of ZK adapter. - */ - BlockingQueue m_userEvents; - - /** - * The thread that dispatches all events from {@link #m_events} queue. - */ - CXXThread m_eventDispatcher; - - /** - * The thread that dispatches all events from {@link #m_userEvents} queue. - */ - CXXThread m_userEventDispatcher; - - /** - * Whether {@link #m_eventDispatcher} is terminating. - */ - volatile bool m_terminating; - - /** - * Whether this adapter is connected to the ZK. - */ - volatile bool m_connected; - - /** - * The state of this adapter. - */ - AdapterState m_state; - - /** - * The lock used to synchronize access to {@link #m_state}. - */ - Lock m_stateLock; - - /** - * How much time is left for the connect to succeed, in milliseconds. 
- */ - long long int m_remainingConnectTimeout; - -}; - -} /* end of 'namespace zk' */ - -#endif /* __ZKADAPTER_H__ */