bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [28/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:38 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp b/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
deleted file mode 100644
index 8573cea..0000000
--- a/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
+++ /dev/null
@@ -1,687 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <string>
-#include <boost/asio.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-#include <log4cxx/logger.h>
-
-#include "subscriberimpl.h"
-#include "util.h"
-#include "channel.h"
-#include "filterablemessagehandler.h"
-
-// File-scoped logger. The space before __FILE__ is required: since C++11 a
-// string literal immediately followed by an identifier is lexed as a
-// user-defined-literal suffix, so "hedwig."__FILE__ is no longer concatenated.
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig." __FILE__));
-
-using namespace Hedwig;
-
-// Fallback values used when the matching Configuration key is not set.
-// The *_RETRY_WAIT_TIME values end up in boost::posix_time::milliseconds().
-const int DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME = 5000;
-const int DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = 5000;
-const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
-const bool DEFAULT_SUBSCRIBER_AUTOCONSUME = true;
-const int DEFAULT_SUBSCRIPTION_MESSAGE_BOUND = 0;
-
-// Type identity compared against typeid(exception) in
-// ResubscribeCallback::operationFailed. typeid must be given the type name:
-// `typeid(ResubscribeException())` is resolved as a type-id naming the function
-// type "ResubscribeException()", which never equals any exception's type.
-static const std::type_info& RESUBSCRIBE_EXCEPTION_TYPEID = typeid(ResubscribeException);
-
-// Stores the subscriber, the consume request and the retry delay for later
-// use by operationComplete()/operationFailed().
-ConsumeWriteCallback::ConsumeWriteCallback(const ActiveSubscriberPtr& activeSubscriber,
-                                           const PubSubDataPtr& data,
-                                           int retrywait)
-  : activeSubscriber(activeSubscriber),
-    data(data),
-    retrywait(retrywait)
-{
-}
-
-// No explicit cleanup is performed here.
-ConsumeWriteCallback::~ConsumeWriteCallback() {}
-
-// Timer handler: re-issues the consume for the sequence id carried by `data`.
-// A set error code means the wait did not complete normally (shutting down),
-// in which case nothing is sent.
-/* static */ void ConsumeWriteCallback::timerComplete(
-                  const ActiveSubscriberPtr& activeSubscriber,
-                  const PubSubDataPtr& data,
-                  const boost::system::error_code& error) {
-  if (!error) {
-    activeSubscriber->consume(data->getMessageSeqId());
-  }
-}
-
-// The consume request was written; only the transaction id is traced.
-void ConsumeWriteCallback::operationComplete() {
-  LOG4CXX_DEBUG(logger,
-                "Successfully wrote consume transaction: " << data->getTxnId());
-}
-
-// Adapter whose only job is to own `timer` until the wait finishes: destroying
-// a deadline_timer cancels its pending wait, so the timer is bound into the
-// completion handler instead of living on the caller's stack.
-static void consumeRetryTimerFired(
-                  const boost::shared_ptr<boost::asio::deadline_timer>& /* timer */,
-                  const ActiveSubscriberPtr& activeSubscriber,
-                  const PubSubDataPtr& data,
-                  const boost::system::error_code& error) {
-  ConsumeWriteCallback::timerComplete(activeSubscriber, data, error);
-}
-
-// Writing the consume request failed: log the failure and schedule a retry
-// after `retrywait` milliseconds on the channel's io service.
-//
-// Previously the timer was constructed but never waited on, so the retry
-// announced in the log message never took place.
-void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
-  LOG4CXX_ERROR(logger, "Error writing consume request (topic:" << data->getTopic()
-                     << ", subscriber:" << data->getSubscriberId() << ", txn:" << data->getTxnId()
-                     << ") : " << exception.what() << ", will be tried in "
-                     << retrywait << " milliseconds");
-
-  boost::shared_ptr<boost::asio::deadline_timer> timer(
-    new boost::asio::deadline_timer(activeSubscriber->getChannel()->getService(),
-                                    boost::posix_time::milliseconds(retrywait)));
-
-  // `timer` is copied into the bound handler, which keeps it alive until fired.
-  timer->async_wait(boost::bind(&consumeRetryTimerFired, timer, activeSubscriber, data,
-                                boost::asio::placeholders::error));
-}
-
-// Captures the channel manager, the subscriber and the delivered response `m`
-// so the outcome of the client's message handler can be acted upon.
-SubscriberConsumeCallback::SubscriberConsumeCallback(const DuplexChannelManagerPtr& channelManager,
-                                                     const ActiveSubscriberPtr& activeSubscriber,
-                                                     const PubSubResponsePtr& m)
-  : channelManager(channelManager),
-    activeSubscriber(activeSubscriber),
-    m(m)
-{
-}
-
-// The client handled the message. When SUBSCRIBER_AUTOCONSUME is enabled
-// (default: DEFAULT_SUBSCRIBER_AUTOCONSUME) a consume for the message id is sent.
-void SubscriberConsumeCallback::operationComplete() {
-  LOG4CXX_DEBUG(logger, "ConsumeCallback::operationComplete " << *activeSubscriber);
-
-  if (!channelManager->getConfiguration().getBool(Configuration::SUBSCRIBER_AUTOCONSUME,
-                                                  DEFAULT_SUBSCRIBER_AUTOCONSUME)) {
-    return;
-  }
-  activeSubscriber->consume(m->message().msgid());
-}
-
-// Timer handler: hands `m` to the subscriber again unless the wait ended
-// with an error.
-/* static */ void SubscriberConsumeCallback::timerComplete(
-                 const ActiveSubscriberPtr activeSubscriber,
-                 const PubSubResponsePtr m,
-                 const boost::system::error_code& error) {
-  if (!error) {
-    activeSubscriber->deliverMessage(m);
-  }
-}
-
-// Adapter that owns `timer` until the wait finishes and then forwards to
-// SubscriberConsumeCallback::timerComplete. Destroying a deadline_timer
-// cancels its pending wait, so the timer must outlive operationFailed().
-static void redeliverTimerFired(
-                 const boost::shared_ptr<boost::asio::deadline_timer>& /* timer */,
-                 const ActiveSubscriberPtr& activeSubscriber,
-                 const PubSubResponsePtr& m,
-                 const boost::system::error_code& error) {
-  SubscriberConsumeCallback::timerComplete(activeSubscriber, m, error);
-}
-
-// The client's handler failed to process the message: log it and schedule a
-// redelivery after SUBSCRIBER_CONSUME_RETRY_WAIT_TIME milliseconds.
-//
-// The timer used to be a stack object; it was destroyed on return, which
-// cancelled the wait, so timerComplete always saw an error and the message
-// was never redelivered.
-void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
-  LOG4CXX_ERROR(logger, "ConsumeCallback::operationFailed  " << *activeSubscriber);
-
-  int retrywait = channelManager->getConfiguration()
-                  .getInt(Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME,
-                          DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME);
-
-  // The delay is handed to posix_time::milliseconds below, so report that unit.
-  LOG4CXX_ERROR(logger, "Error passing message to client for " << *activeSubscriber << " error: "
-                        << exception.what() << " retrying in " << retrywait << " milliseconds");
-
-  // We leverage same io service for retrying delivering messages.
-  AbstractDuplexChannelPtr ch = activeSubscriber->getChannel();
-  boost::shared_ptr<boost::asio::deadline_timer> timer(
-    new boost::asio::deadline_timer(ch->getService(),
-                                    boost::posix_time::milliseconds(retrywait)));
-
-  // `timer` is copied into the bound handler, which keeps it alive until fired.
-  timer->async_wait(boost::bind(&redeliverTimerFired, timer, activeSubscriber, m,
-                                boost::asio::placeholders::error));
-}
-
-// Records what is needed to issue the unsubscribe once the subscription has
-// been closed, plus the caller's callback to report the outcome to.
-CloseSubscriptionForUnsubscribeCallback::CloseSubscriptionForUnsubscribeCallback(
-  const DuplexChannelManagerPtr& channelManager, const std::string& topic,
-  const std::string& subscriberId, const OperationCallbackPtr& unsubCb)
-  : channelManager(channelManager),
-    topic(topic),
-    subscriberId(subscriberId),
-    unsubCb(unsubCb)
-{
-}
-
-// The subscription was closed; now build and submit the unsubscribe request.
-// Its response is routed to the caller's callback through an adaptor.
-void CloseSubscriptionForUnsubscribeCallback::operationComplete() {
-  ResponseCallbackPtr adaptedCb(new ResponseCallbackAdaptor(unsubCb));
-  PubSubDataPtr unsubRequest =
-    PubSubData::forUnsubscribeRequest(channelManager->nextTxnId(),
-                                      subscriberId, topic, adaptedCb);
-  channelManager->submitOp(unsubRequest);
-}
-
-// Closing the subscription failed: no unsubscribe is sent and the failure is
-// passed straight to the caller's callback.
-void CloseSubscriptionForUnsubscribeCallback::operationFailed(const std::exception& exception)
-{
-  unsubCb->operationFailed(exception);
-}
-
-// Binds the callback to the subscriber whose subscription is being re-created.
-ResubscribeCallback::ResubscribeCallback(const ActiveSubscriberPtr& activeSubscriber)
-  : activeSubscriber(activeSubscriber)
-{
-}
-
-// Resubscribe succeeded. The response body is not inspected; delivery is
-// handed over to the resubscribed subscriber.
-void ResubscribeCallback::operationComplete(const ResponseBody & resp)
-{
-  activeSubscriber->handoverDelivery();
-}
-
-// Resubscribe failed. A failure whose dynamic type matches
-// RESUBSCRIBE_EXCEPTION_TYPEID is only logged; any other failure triggers
-// another resubscribe attempt.
-void ResubscribeCallback::operationFailed(const std::exception& exception) {
-  const bool causedByCloseSub = (typeid(exception) == RESUBSCRIBE_EXCEPTION_TYPEID);
-
-  if (!causedByCloseSub) {
-    LOG4CXX_ERROR(logger, "Failed to resubscribe " << *activeSubscriber
-                          << ", will retry later : " << exception.what());
-    activeSubscriber->resubscribe();
-    return;
-  }
-
-  // A closesub issued while resubscribing can produce this failure;
-  // retrying would be pointless.
-  LOG4CXX_WARN(logger, "Failed to resubscribe " << *activeSubscriber
-                       << " : but it is caused by closesub when resubscribing. "
-                       << "so we don't need to retry subscribe again.");
-}
-
-// Creates a subscriber bound to `channel`. Delivery starts out stopped and
-// the first resubscribe is not delayed (should_wait is false).
-ActiveSubscriber::ActiveSubscriber(const PubSubDataPtr& data,
-                                   const AbstractDuplexChannelPtr& channel,
-                                   const SubscriptionPreferencesPtr& preferences,
-                                   const DuplexChannelManagerPtr& channelManager)
-  : channel(channel),
-    deliverystate(STOPPED_DELIVERY),
-    origData(data),
-    preferences(preferences),
-    channelManager(channelManager),
-    should_wait(false)
-{
-  LOG4CXX_DEBUG(logger, "Creating ActiveSubscriber " << this
-                        << " for (topic:" << data->getTopic()
-                        << ", subscriber:" << data->getSubscriberId() << ").");
-}
-
-// Topic of the original subscribe request.
-const std::string& ActiveSubscriber::getTopic() const
-{
-  return origData->getTopic();
-}
-
-// Subscriber id of the original subscribe request.
-const std::string& ActiveSubscriber::getSubscriberId() const
-{
-  return origData->getSubscriberId();
-}
-
-// Passes a received message to the registered handler, or queues it when no
-// handler is set yet. queue_lock is held for the whole call.
-void ActiveSubscriber::deliverMessage(const PubSubResponsePtr& m) {
-  boost::lock_guard<boost::shared_mutex> guard(queue_lock);
-
-  LOG4CXX_INFO(logger, "Message received (topic:" << origData->getTopic()
-                       << ", subscriberId:" << origData->getSubscriberId()
-                       << ", msgId:" << m->message().msgid().localcomponent()
-                       << ") from channel " << channel.get());
-
-  if (!this->handler.get()) {
-    // nobody is listening yet; keep the message for startDelivery
-    queueMessage(m);
-    return;
-  }
-
-  OperationCallbackPtr consumeCb(new SubscriberConsumeCallback(channelManager, shared_from_this(), m));
-  this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), consumeCb);
-}
-
-// Appends `m` to the pending queue. Takes no lock itself.
-void ActiveSubscriber::queueMessage(const PubSubResponsePtr& m)
-{
-  queue.push_back(m);
-}
-
-// Starts delivering messages to `origHandler`, optionally through `origFilter`.
-//
-// State machine: STOPPED_DELIVERY => STARTING_DELIVERY => STARTED_DELIVERY.
-// Only one caller may start at a time.
-//
-// Throws AlreadyStartDeliveryException when delivery is already started and
-// StartingDeliveryException when another caller is starting it. Any exception
-// raised while starting is rethrown unchanged after the state has been reset
-// to STOPPED_DELIVERY.
-void ActiveSubscriber::startDelivery(const MessageHandlerCallbackPtr& origHandler,
-                                     const ClientMessageFilterPtr& origFilter) {
-  // check delivery state to avoid deadlock when calling startdelivery/stopdelivery
-  // in message handler.
-  // STOPPED_DELIVERY => STARTING_DELIVERY (only one could start delivery)
-  {
-    boost::lock_guard<boost::shared_mutex> lock(deliverystate_lock);
-    if (STARTED_DELIVERY == deliverystate) {
-      LOG4CXX_ERROR(logger, *this << " has started delivery with message handler "
-                            << this->handler.get());
-      throw AlreadyStartDeliveryException();
-    } else if (STARTING_DELIVERY == deliverystate) {
-      LOG4CXX_ERROR(logger, *this << " is starting delivery by other one now.");
-      throw StartingDeliveryException();
-    }
-    deliverystate = STARTING_DELIVERY;
-  }
-  try {
-    doStartDelivery(origHandler, origFilter);
-    // STARTING_DELIVERY => STARTED_DELIVERY
-    setDeliveryState(STARTED_DELIVERY);
-  } catch (...) {
-    // STARTING_DELIVERY => STOPPED_DELIVERY
-    // catch-all so that no exception type can leave the state stuck in
-    // STARTING_DELIVERY.
-    setDeliveryState(STOPPED_DELIVERY);
-    // Bare rethrow keeps the dynamic type; `throw e;` would copy only the
-    // std::exception base and hide e.g. AlreadyStartDeliveryException.
-    throw;
-  }
-}
-
-// Installs the message handler and flushes the messages queued so far to it.
-//
-// When both a filter and subscription preferences are available, the handler
-// is wrapped in a FilterableMessageHandler; otherwise messages go to
-// `origHandler` unfiltered. Throws AlreadyStartDeliveryException when a
-// handler is already installed. A null handler is only logged.
-void ActiveSubscriber::doStartDelivery(const MessageHandlerCallbackPtr& origHandler,
-                                       const ClientMessageFilterPtr& origFilter) {
-  // origHandler & origFilter have already been validated by the caller.
-  // No filter, or no preferences (i.e. an old hub server), means delivery
-  // without message filtering.
-  MessageHandlerCallbackPtr effectiveHandler;
-  if (!origFilter.get() || !preferences.get()) {
-    effectiveHandler = origHandler;
-  } else {
-    origFilter->setSubscriptionPreferences(origData->getTopic(), origData->getSubscriberId(),
-                                           preferences);
-    effectiveHandler = MessageHandlerCallbackPtr(new FilterableMessageHandler(origHandler, origFilter));
-  }
-
-  {
-    boost::lock_guard<boost::shared_mutex> guard(queue_lock);
-
-    if (this->handler.get()) {
-      LOG4CXX_ERROR(logger, *this << " has started delivery with message handler "
-                            << this->handler.get());
-      throw AlreadyStartDeliveryException();
-    }
-
-    if (!effectiveHandler.get()) {
-      // no message handler callback
-      LOG4CXX_WARN(logger, *this << " try to start an empty message handler");
-      return;
-    }
-
-    this->handler = effectiveHandler;
-    // remember the original handler and filter as well
-    this->origHandler = origHandler;
-    this->origFilter = origFilter;
-
-    // drain everything that arrived before a handler was installed
-    while (!queue.empty()) {
-      PubSubResponsePtr pending = queue.front();
-      queue.pop_front();
-
-      OperationCallbackPtr consumeCb(
-        new SubscriberConsumeCallback(channelManager, shared_from_this(), pending));
-      this->handler->consume(origData->getTopic(), origData->getSubscriberId(),
-                             pending->message(), consumeCb);
-    }
-  }
-
-  LOG4CXX_INFO(logger, *this << " #startDelivery to receive messages from channel " << channel.get());
-}
-
-// Stops delivery by clearing the installed handler and marking the state as
-// STOPPED_DELIVERY. Throws StartingDeliveryException while a start is in
-// progress.
-void ActiveSubscriber::stopDelivery() {
-  // Stopping while someone is starting delivery is refused: queued messages
-  // are handed to the handler during #startDelivery, and interrupting that
-  // would break the ordering guarantee.
-  {
-    boost::lock_guard<boost::shared_mutex> stateGuard(deliverystate_lock);
-    const bool startInProgress = (STARTING_DELIVERY == deliverystate);
-    if (startInProgress) {
-      LOG4CXX_ERROR(logger, "someone is starting delivery for " << *this
-                            << ". we could not stop delivery now.");
-      throw StartingDeliveryException();
-    }
-  }
-
-  LOG4CXX_INFO(logger, *this << " #stopDelivery to stop receiving messages from channel " << channel.get());
-
-  // subclass hook first, then drop the handler under the queue lock
-  doStopDelivery();
-
-  boost::lock_guard<boost::shared_mutex> queueGuard(queue_lock);
-  this->handler = MessageHandlerCallbackPtr();
-  setDeliveryState(STOPPED_DELIVERY);
-}
-
-// Called by stopDelivery(); intentionally a no-op here.
-void ActiveSubscriber::doStopDelivery() {}
-
-// Writes a consume request for `messageSeqId` on this subscriber's channel.
-// The write outcome is reported to a ConsumeWriteCallback configured with
-// MESSAGE_CONSUME_RETRY_WAIT_TIME.
-void ActiveSubscriber::consume(const MessageSeqId& messageSeqId) {
-  PubSubDataPtr consumeRequest =
-    PubSubData::forConsumeRequest(channelManager->nextTxnId(),
-                                  origData->getSubscriberId(),
-                                  origData->getTopic(),
-                                  messageSeqId);
-
-  int retryDelay = channelManager->getConfiguration().getInt(
-    Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME,
-    DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME);
-
-  OperationCallbackPtr onWritten(
-    new ConsumeWriteCallback(shared_from_this(), consumeRequest, retryDelay));
-  channel->writeRequest(consumeRequest->getRequest(), onWritten);
-}
-
-// When a handler is installed, asks the channel manager to hand the original
-// handler and filter over to another active subscriber of the same
-// (topic, subscriber id) pair. Does nothing otherwise.
-void ActiveSubscriber::handoverDelivery() {
-  if (!handler.get()) {
-    return;
-  }
-  TopicSubscriber self(origData->getTopic(), origData->getSubscriberId());
-  channelManager->handoverDelivery(self, origHandler, origFilter);
-}
-
-// Reacts to a subscription event from the hub. When resubscription is not
-// required the event is forwarded to the listeners; otherwise TOPIC_MOVED and
-// SUBSCRIPTION_FORCED_CLOSED trigger a resubscribe and anything else is logged.
-void ActiveSubscriber::processEvent(const std::string &topic, const std::string &subscriberId,
-                                    const SubscriptionEvent event) {
-  if (!isResubscribeRequired()) {
-    channelManager->getEventEmitter().emitSubscriptionEvent(topic, subscriberId, event);
-    return;
-  }
-
-  // resubmit the subscribe request
-  const bool shouldResubscribe =
-    (TOPIC_MOVED == event) || (SUBSCRIPTION_FORCED_CLOSED == event);
-  if (shouldResubscribe) {
-    resubscribe();
-  } else {
-    LOG4CXX_ERROR(logger, "Received unknown subscription event " << event
-                          << " for (topic:" << topic << ", subscriber:" << subscriberId << ").");
-  }
-}
-
-// Re-submits the original subscribe request. The first attempt goes out
-// immediately and sets should_wait; while should_wait is set, attempts are
-// deferred through waitToResubscribe().
-void ActiveSubscriber::resubscribe() {
-  if (!should_wait) {
-    should_wait = true;
-
-    origData->clearTriedServers();
-    ResponseCallbackPtr resubCb(new ResubscribeCallback(shared_from_this()));
-    origData->setCallback(resubCb);
-    origData->setOrigChannelForResubscribe(
-      boost::dynamic_pointer_cast<DuplexChannel>(channel));
-
-    // submit subscribe request again
-    channelManager->submitOp(origData);
-    return;
-  }
-  waitToResubscribe();
-}
-
-// Arms retryTimer for RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME milliseconds;
-// retryTimerComplete() runs when it fires.
-void ActiveSubscriber::waitToResubscribe() {
-  int delayMs = channelManager->getConfiguration().getInt(
-    Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
-    DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
-
-  retryTimer = RetryTimerPtr(
-    new boost::asio::deadline_timer(channel->getService(),
-                                    boost::posix_time::milliseconds(delayMs)));
-  retryTimer->async_wait(boost::bind(&ActiveSubscriber::retryTimerComplete,
-                                     shared_from_this(),
-                                     boost::asio::placeholders::error));
-}
-
-void ActiveSubscriber::retryTimerComplete(const boost::system::error_code& error) {
-  if (error) {
-    return;
-  }
-  should_wait = false;
-  // resubscribe again
-  resubscribe();
-}
-
-// Cancels the pending resubscribe timer, if there is one. A cancellation
-// error is logged as a warning.
-void ActiveSubscriber::close() {
-  RetryTimerPtr pendingTimer = retryTimer;
-  if (!pendingTimer.get()) {
-    return;
-  }
-
-  boost::system::error_code cancelError;
-  pendingTimer->cancel(cancelError);
-  if (!cancelError) {
-    return;
-  }
-  LOG4CXX_WARN(logger,  *this << " cancel resubscribe task " << pendingTimer.get() << " error :"
-                        << cancelError.message().c_str());
-}
-
-// Forwards both arguments to the HedwigClientChannelHandler base and traces
-// the construction.
-SubscriberClientChannelHandler::SubscriberClientChannelHandler(
-  const DuplexChannelManagerPtr& channelManager, ResponseHandlerMap& handlers)
-  : HedwigClientChannelHandler(channelManager, handlers)
-{
-  LOG4CXX_DEBUG(logger, "Creating SubscriberClientChannelHandler " << this);
-}
-
-// Only traces the destruction.
-SubscriberClientChannelHandler::~SubscriberClientChannelHandler()
-{
-  LOG4CXX_DEBUG(logger, "Cleaning up SubscriberClientChannelHandler " << this);
-}
-
-// Dispatches an incoming response: published messages go to the matching
-// topic subscriber, subscription events go to handleSubscriptionEvent(), and
-// everything else falls through to the base class handler.
-void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
-  if (m->has_message()) {
-    // a published message for one (topic, subscriber) pair
-    TopicSubscriber target(m->topic(), m->subscriberid());
-    deliverMessage(target, m);
-    return;
-  }
-
-  const bool carriesEvent =
-    m->has_responsebody() && m->responsebody().has_subscriptionevent();
-  if (!carriesEvent) {
-    HedwigClientChannelHandler::messageReceived(channel, m);
-    return;
-  }
-
-  // dispatch the event
-  const SubscriptionEventResponse& eventResp = m->responsebody().subscriptionevent();
-  TopicSubscriber target(m->topic(), m->subscriberid());
-  handleSubscriptionEvent(target, eventResp.event());
-}
-
-// Resets the handler state and, when a channel is attached, removes it from
-// the channel manager.
-void SubscriberClientChannelHandler::doClose() {
-  closeHandler();
-
-  if (!channel.get()) {
-    return;
-  }
-
-  // The channel is about to be killed, so it must no longer appear in the
-  // manager's list of all channels.
-  channelManager->removeChannel(channel);
-  LOG4CXX_INFO(logger, "remove subscription channel " << channel.get() << ".");
-}
-
-// Keeps the channel manager that every operation of this subscriber goes through.
-SubscriberImpl::SubscriberImpl(const DuplexChannelManagerPtr& channelManager)
-  : channelManager(channelManager)
-{
-}
-
-// Only traces the destruction.
-SubscriberImpl::~SubscriberImpl()
-{
-  LOG4CXX_DEBUG(logger, "deleting subscriber" << this);
-}
-
-// Convenience overload: wraps `mode` in a SubscriptionOptions and delegates
-// to the options-based blocking subscribe.
-void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
-  SubscriptionOptions modeOnly;
-  modeOnly.set_createorattach(mode);
-
-  subscribe(topic, subscriberId, modeOnly);
-}
-
-// Blocking subscribe: issues asyncSubscribe and waits on a
-// SyncOperationCallback built with SYNC_REQUEST_TIMEOUT, then lets that
-// callback throw if it needs to.
-void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) {
-  int syncTimeout = channelManager->getConfiguration().getInt(
-    Configuration::SYNC_REQUEST_TIMEOUT, DEFAULT_SYNC_REQUEST_TIMEOUT);
-
-  // `owner` manages the lifetime; `waiter` is the typed view used to wait.
-  SyncOperationCallback* waiter = new SyncOperationCallback(syncTimeout);
-  OperationCallbackPtr owner(waiter);
-
-  asyncSubscribe(topic, subscriberId, options, owner);
-
-  waiter->wait();
-  waiter->throwExceptionIfNeeded();
-}
-
-// Convenience overload: wraps `mode` in a SubscriptionOptions and delegates
-// to the options-based asyncSubscribe.
-void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
-  SubscriptionOptions modeOnly;
-  modeOnly.set_createorattach(mode);
-
-  asyncSubscribe(topic, subscriberId, modeOnly, callback);
-}
-
-// Builds a subscribe request and submits it through the channel manager.
-// When the caller's options carry no message bound, the configured
-// SUBSCRIPTION_MESSAGE_BOUND is applied to a private copy of the options.
-void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) {
-  SubscriptionOptions effectiveOptions = options;
-
-  if (!effectiveOptions.has_messagebound()) {
-    int configuredBound = channelManager->getConfiguration().getInt(
-      Configuration::SUBSCRIPTION_MESSAGE_BOUND,
-      DEFAULT_SUBSCRIPTION_MESSAGE_BOUND);
-    effectiveOptions.set_messagebound(configuredBound);
-  }
-
-  ResponseCallbackPtr adaptedCb(new ResponseCallbackAdaptor(callback));
-  PubSubDataPtr subscribeRequest =
-    PubSubData::forSubscribeRequest(channelManager->nextTxnId(),
-                                    subscriberId, topic,
-                                    adaptedCb, effectiveOptions);
-  channelManager->submitOp(subscribeRequest);
-}
-
-// Blocking unsubscribe: issues asyncUnsubscribe and waits on a
-// SyncOperationCallback built with SYNC_REQUEST_TIMEOUT, then lets that
-// callback throw if it needs to.
-void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
-  int syncTimeout = channelManager->getConfiguration().getInt(
-    Configuration::SYNC_REQUEST_TIMEOUT, DEFAULT_SYNC_REQUEST_TIMEOUT);
-
-  // `owner` manages the lifetime; `waiter` is the typed view used to wait.
-  SyncOperationCallback* waiter = new SyncOperationCallback(syncTimeout);
-  OperationCallbackPtr owner(waiter);
-
-  asyncUnsubscribe(topic, subscriberId, owner);
-
-  waiter->wait();
-  waiter->throwExceptionIfNeeded();
-}
-
-// Unsubscribing is a two-step operation: the subscription is closed first,
-// and CloseSubscriptionForUnsubscribeCallback sends the unsubscribe request
-// once the close has succeeded.
-void SubscriberImpl::asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) {
-  OperationCallbackPtr thenUnsubscribe(
-    new CloseSubscriptionForUnsubscribeCallback(channelManager, topic, subscriberId, callback));
-
-  asyncCloseSubscription(topic, subscriberId, thenUnsubscribe);
-}
-
-// Sends a consume for `messageSeqId` through the channel handler that owns
-// the subscription. A missing handler is logged and otherwise ignored.
-void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId,
-                             const MessageSeqId& messageSeqId) {
-  TopicSubscriber key(topic, subscriberId);
-
-  // look up the channel handler serving this subscription
-  SubscriberClientChannelHandlerPtr handler =
-    channelManager->getSubscriptionChannelHandler(key);
-
-  if (handler.get() != 0) {
-    handler->consume(key, messageSeqId);
-    return;
-  }
-
-  LOG4CXX_ERROR(logger, "Cannot consume. No subscription channel handler found for topic ("
-                        << topic << ") subscriberId(" << subscriberId << ").");
-}
-
-// Starts filtered delivery for a subscription.
-//
-// Throws NullMessageFilterException for a null filter,
-// NullMessageHandlerException for a null callback (checked in that order),
-// and NotSubscribedException when no channel handler serves the subscription.
-void SubscriberImpl::startDeliveryWithFilter(const std::string& topic,
-                                             const std::string& subscriberId,
-                                             const MessageHandlerCallbackPtr& callback,
-                                             const ClientMessageFilterPtr& filter) {
-  if (!filter.get()) {
-    throw NullMessageFilterException();
-  }
-  if (!callback.get()) {
-    throw NullMessageHandlerException();
-  }
-
-  TopicSubscriber key(topic, subscriberId);
-
-  // look up the channel handler serving this subscription
-  SubscriberClientChannelHandlerPtr handler =
-    channelManager->getSubscriptionChannelHandler(key);
-
-  if (!handler.get()) {
-    LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = "
-                          << topic << ", subscriber = " << subscriberId);
-    throw NotSubscribedException();
-  }
-
-  handler->startDelivery(key, callback, filter);
-}
-
-// Starts unfiltered delivery for a subscription (an empty filter pointer is
-// passed on). Throws NotSubscribedException when no channel handler serves
-// the subscription.
-void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId,
-                                   const MessageHandlerCallbackPtr& callback) {
-  TopicSubscriber key(topic, subscriberId);
-
-  // look up the channel handler serving this subscription
-  SubscriberClientChannelHandlerPtr handler =
-    channelManager->getSubscriptionChannelHandler(key);
-
-  if (!handler.get()) {
-    LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = "
-                          << topic << ", subscriber = " << subscriberId);
-    throw NotSubscribedException();
-  }
-
-  ClientMessageFilterPtr noFilter;
-  handler->startDelivery(key, callback, noFilter);
-}
-
-// Stops delivery for a subscription. Throws NotSubscribedException when no
-// channel handler serves the subscription.
-void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
-  TopicSubscriber key(topic, subscriberId);
-
-  // look up the channel handler serving this subscription
-  SubscriberClientChannelHandlerPtr handler =
-    channelManager->getSubscriptionChannelHandler(key);
-
-  if (!handler.get()) {
-    LOG4CXX_ERROR(logger, "Trying to stop deliver on a non existant handler topic = "
-                          << topic << ", subscriber = " << subscriberId);
-    throw NotSubscribedException();
-  }
-
-  handler->stopDelivery(key);
-}
-
-// Returns false when no channel handler exists for the (topic, subscriber id)
-// pair; otherwise returns the handler's own answer.
-bool SubscriberImpl::hasSubscription(const std::string& topic, const std::string& subscriberId) {
-  TopicSubscriber key(topic, subscriberId);
-
-  SubscriberClientChannelHandlerPtr handler =
-    channelManager->getSubscriptionChannelHandler(key);
-
-  if (handler.get()) {
-    return handler->hasSubscription(key);
-  }
-  return false;
-}
-
-// Blocking close: issues asyncCloseSubscription and waits on a
-// SyncOperationCallback built with SYNC_REQUEST_TIMEOUT, then lets that
-// callback throw if it needs to.
-void SubscriberImpl::closeSubscription(const std::string& topic, const std::string& subscriberId) {
-  int syncTimeout = channelManager->getConfiguration().getInt(
-    Configuration::SYNC_REQUEST_TIMEOUT, DEFAULT_SYNC_REQUEST_TIMEOUT);
-
-  // `owner` manages the lifetime; `waiter` is the typed view used to wait.
-  SyncOperationCallback* waiter = new SyncOperationCallback(syncTimeout);
-  OperationCallbackPtr owner(waiter);
-
-  asyncCloseSubscription(topic, subscriberId, owner);
-
-  waiter->wait();
-  waiter->throwExceptionIfNeeded();
-}
-
-// Logs the request and delegates the close to the channel manager.
-void SubscriberImpl::asyncCloseSubscription(const std::string& topic,
-                                            const std::string& subscriberId,
-                                            const OperationCallbackPtr& callback) {
-  LOG4CXX_INFO(logger, "closeSubscription (" << topic << ",  " << subscriberId << ")");
-
-  TopicSubscriber key(topic, subscriberId);
-  channelManager->asyncCloseSubscription(key, callback);
-}
-
-// Registers `listener` with the channel manager's event emitter.
-void SubscriberImpl::addSubscriptionListener(SubscriptionListenerPtr& listener)
-{
-  channelManager->getEventEmitter().addSubscriptionListener(listener);
-}
-
-// Unregisters `listener` from the channel manager's event emitter.
-void SubscriberImpl::removeSubscriptionListener(SubscriptionListenerPtr& listener)
-{
-  channelManager->getEventEmitter().removeSubscriptionListener(listener);
-}
-
-//
-// Unsubscribe Response Handler
-//
-// Passes the channel manager on to the ResponseHandler base.
-UnsubscribeResponseHandler::UnsubscribeResponseHandler(const DuplexChannelManagerPtr& channelManager)
-  : ResponseHandler(channelManager)
-{
-}
-
-// Maps the status code of an unsubscribe response onto the transaction's
-// callback: success completes it (with an empty body when the response has
-// none), NOT_RESPONSIBLE_FOR_TOPIC redirects the request, and every other
-// status fails the callback with the matching exception.
-void UnsubscribeResponseHandler::handleResponse(const PubSubResponsePtr& m,
-                                                const PubSubDataPtr& txn,
-                                                const DuplexChannelPtr& channel) {
-  switch (m->statuscode()) {
-  case SUCCESS:
-    if (!m->has_responsebody()) {
-      txn->getCallback()->operationComplete(ResponseBody());
-      break;
-    }
-    txn->getCallback()->operationComplete(m->responsebody());
-    break;
-
-  case NOT_RESPONSIBLE_FOR_TOPIC:
-    redirectRequest(m, txn, channel);
-    break;
-
-  case SERVICE_DOWN:
-    LOG4CXX_ERROR(logger, "Server responsed with SERVICE_DOWN for " << txn->getTxnId());
-    txn->getCallback()->operationFailed(ServiceDownException());
-    break;
-
-  case CLIENT_ALREADY_SUBSCRIBED:
-  case TOPIC_BUSY:
-    txn->getCallback()->operationFailed(AlreadySubscribedException());
-    break;
-
-  case CLIENT_NOT_SUBSCRIBED:
-    txn->getCallback()->operationFailed(NotSubscribedException());
-    break;
-
-  default:
-    LOG4CXX_ERROR(logger, "Unexpected response " << m->statuscode() << " for " << txn->getTxnId());
-    txn->getCallback()->operationFailed(UnexpectedResponseException());
-    break;
-  }
-}
-
-//
-// CloseSubscription Response Handler
-//
-// Passes the channel manager on to the ResponseHandler base.
-CloseSubscriptionResponseHandler::CloseSubscriptionResponseHandler(
-  const DuplexChannelManagerPtr& channelManager)
-  : ResponseHandler(channelManager)
-{
-}
-
-// Maps the status code of a close-subscription response onto the
-// transaction's callback: success completes it (with an empty body when the
-// response has none), NOT_RESPONSIBLE_FOR_TOPIC redirects the request, and
-// every other status fails the callback with the matching exception.
-void CloseSubscriptionResponseHandler::handleResponse(
-  const PubSubResponsePtr& m, const PubSubDataPtr& txn,
-  const DuplexChannelPtr& channel) {
-  switch (m->statuscode()) {
-  case SUCCESS:
-    if (!m->has_responsebody()) {
-      txn->getCallback()->operationComplete(ResponseBody());
-      break;
-    }
-    txn->getCallback()->operationComplete(m->responsebody());
-    break;
-
-  case NOT_RESPONSIBLE_FOR_TOPIC:
-    redirectRequest(m, txn, channel);
-    break;
-
-  case SERVICE_DOWN:
-    LOG4CXX_ERROR(logger, "Server responsed with SERVICE_DOWN for " << txn->getTxnId());
-    txn->getCallback()->operationFailed(ServiceDownException());
-    break;
-
-  case CLIENT_ALREADY_SUBSCRIBED:
-  case TOPIC_BUSY:
-    txn->getCallback()->operationFailed(AlreadySubscribedException());
-    break;
-
-  case CLIENT_NOT_SUBSCRIBED:
-    txn->getCallback()->operationFailed(NotSubscribedException());
-    break;
-
-  default:
-    LOG4CXX_ERROR(logger, "Unexpected response " << m->statuscode() << " for " << txn->getTxnId());
-    txn->getCallback()->operationFailed(UnexpectedResponseException());
-    break;
-  }
-}
-
-// Streams a short description of the subscriber: its address, topic and
-// subscriber id.
-std::ostream& Hedwig::operator<<(std::ostream& os, const ActiveSubscriber& subscriber) {
-  return os << "ActiveSubscriber(" << &subscriber
-            << ", topic:" << subscriber.getTopic()
-            << ", subscriber:" << subscriber.getSubscriberId()
-            << ")";
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/subscriberimpl.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/subscriberimpl.h b/hedwig-client/src/main/cpp/lib/subscriberimpl.h
deleted file mode 100644
index 0cdf5f1..0000000
--- a/hedwig-client/src/main/cpp/lib/subscriberimpl.h
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef SUBSCRIBE_IMPL_H
-#define SUBSCRIBE_IMPL_H
-
-#include <hedwig/subscribe.h>
-#include <hedwig/callback.h>
-#include "clientimpl.h"
-#include <utility>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else
-#include <tr1/memory>
-#endif
-
-#include <deque>
-#include <iostream>
-
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/thread/shared_mutex.hpp>
-
-namespace Hedwig {
-
-  class ActiveSubscriber;
-  typedef boost::shared_ptr<ActiveSubscriber> ActiveSubscriberPtr;
-
-  class ConsumeWriteCallback : public OperationCallback {
-  public:
-    ConsumeWriteCallback(const ActiveSubscriberPtr& activeSubscriber,
-                         const PubSubDataPtr& data,
-                         int retrywait);
-    virtual ~ConsumeWriteCallback();
-
-    void operationComplete();
-    void operationFailed(const std::exception& exception);
-
-    static void timerComplete(const ActiveSubscriberPtr& activeSubscriber,
-                              const PubSubDataPtr& data,
-                              const boost::system::error_code& error);
-  private:
-    const ActiveSubscriberPtr activeSubscriber;
-    const PubSubDataPtr data;
-    int retrywait;
-  };
-
-  class SubscriberClientChannelHandler;
-  typedef boost::shared_ptr<SubscriberClientChannelHandler> SubscriberClientChannelHandlerPtr;
-
-  class SubscriberConsumeCallback : public OperationCallback {
-  public:
-    SubscriberConsumeCallback(const DuplexChannelManagerPtr& channelManager,
-                              const ActiveSubscriberPtr& activeSubscriber,
-                              const PubSubResponsePtr& m);
-    void operationComplete();
-    void operationFailed(const std::exception& exception);
-    static void timerComplete(const ActiveSubscriberPtr activeSubscriber,
-                              const PubSubResponsePtr m,
-                              const boost::system::error_code& error);
-  private:
-    const DuplexChannelManagerPtr channelManager;
-    const ActiveSubscriberPtr activeSubscriber;
-    const PubSubResponsePtr m;
-  };
-
-  class CloseSubscriptionForUnsubscribeCallback : public OperationCallback {
-  public:
-    CloseSubscriptionForUnsubscribeCallback(const DuplexChannelManagerPtr& channelManager,
-                                            const std::string& topic,
-                                            const std::string& subscriberId,
-                                            const OperationCallbackPtr& unsubCb);
-    virtual void operationComplete();
-    virtual void operationFailed(const std::exception& exception);
-  private:
-    const DuplexChannelManagerPtr channelManager;
-    const std::string topic;
-    const std::string subscriberId;
-    const OperationCallbackPtr unsubCb;
-  };
-
-  // A instance handle all actions belongs to a subscription
-  class ActiveSubscriber : public boost::enable_shared_from_this<ActiveSubscriber> {
-  public:
-    ActiveSubscriber(const PubSubDataPtr& data,
-                     const AbstractDuplexChannelPtr& channel,
-                     const SubscriptionPreferencesPtr& preferences,
-                     const DuplexChannelManagerPtr& channelManager);
-    virtual ~ActiveSubscriber() {}
-
-    // Get the topic
-    const std::string& getTopic() const;
-
-    // Get the subscriber id
-    const std::string& getSubscriberId() const;
-
-    inline MessageHandlerCallbackPtr getMessageHandler() const {
-      return handler;
-    }
-
-    inline const AbstractDuplexChannelPtr& getChannel() const {
-      return channel;
-    }
-
-    // Deliver a received message
-    void deliverMessage(const PubSubResponsePtr& m);
-
-    //
-    // Start Delivery. If filter is null, just start delivery w/o filter 
-    // otherwise start delivery with the given filter.
-    // 
-    void startDelivery(const MessageHandlerCallbackPtr& handler,
-                       const ClientMessageFilterPtr& filter);
-
-    // Stop Delivery
-    virtual void stopDelivery();
-
-    // Consume message
-    void consume(const MessageSeqId& messageSeqId);
-
-    // Process Event received from subscription channel
-    void processEvent(const std::string &topic, const std::string &subscriberId,
-                      const SubscriptionEvent event);
-
-    // handover message delivery to other subscriber
-    void handoverDelivery();
-
-    // Is resubscribe required
-    inline bool isResubscribeRequired() {
-      return origData->getSubscriptionOptions().enableresubscribe();
-    }
-
-    // Resubscribe the subscriber
-    void resubscribe();
-
-    // Close the ActiveSubscriber
-    void close();
-
-    friend std::ostream& operator<<(std::ostream& os, const ActiveSubscriber& subscriber);
-  protected:
-    // Wait to resubscribe
-    void waitToResubscribe();
-
-    void retryTimerComplete(const boost::system::error_code& error);
-
-    // Start Delivery with a message filter
-    virtual void doStartDelivery(const MessageHandlerCallbackPtr& handler,
-                                 const ClientMessageFilterPtr& filter);
-
-    // Stop Delivery
-    virtual void doStopDelivery();
-
-    // Queue message when message handler is not ready
-    virtual void queueMessage(const PubSubResponsePtr& m);
-
-    AbstractDuplexChannelPtr channel;
-
-    boost::shared_mutex queue_lock;
-    std::deque<PubSubResponsePtr> queue;
-
-  private:
-    enum DeliveryState {
-      STARTING_DELIVERY,
-      STARTED_DELIVERY,
-      STOPPED_DELIVERY,
-    };
-
-    inline void setDeliveryState(DeliveryState state) {
-      {
-        boost::lock_guard<boost::shared_mutex> lock(deliverystate_lock);
-        deliverystate = state;
-      }
-    }
-
-    boost::shared_mutex deliverystate_lock;
-    DeliveryState deliverystate;
-
-    // Keep original handler and filter to handover when resubscribed
-    MessageHandlerCallbackPtr origHandler;
-    ClientMessageFilterPtr origFilter;
-
-    MessageHandlerCallbackPtr handler;
-
-    const PubSubDataPtr origData;
-    const SubscriptionPreferencesPtr preferences;
-
-    DuplexChannelManagerPtr channelManager;
-
-    // variables used for resubscribe
-    bool should_wait;
-    typedef boost::shared_ptr<boost::asio::deadline_timer> RetryTimerPtr;
-    RetryTimerPtr retryTimer;
-  };
-
-  class ResubscribeCallback : public ResponseCallback {
-  public:
-    explicit ResubscribeCallback(const ActiveSubscriberPtr& activeSubscriber);
-
-    virtual void operationComplete(const ResponseBody & resp);
-    virtual void operationFailed(const std::exception& exception);
-
-  private:
-    const ActiveSubscriberPtr activeSubscriber;
-  };
-
-  class SubscriberClientChannelHandler : public HedwigClientChannelHandler,
-      public boost::enable_shared_from_this<SubscriberClientChannelHandler> {
-  public:
-    SubscriberClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
-                                   ResponseHandlerMap& handlers);
-    virtual ~SubscriberClientChannelHandler();
-
-    virtual void handleSubscriptionEvent(const TopicSubscriber& ts,
-                                         const SubscriptionEvent event) = 0;
-
-    // Deliver a received message to given message handler
-    virtual void deliverMessage(const TopicSubscriber& ts,
-                                const PubSubResponsePtr& m) = 0;
-
-    //
-    // Start Delivery for a given topic subscriber. If the filter is null,
-    // start delivery w/o filtering; otherwise start delivery with the
-    // given message filter.
-    //
-    virtual void startDelivery(const TopicSubscriber& ts,
-                               const MessageHandlerCallbackPtr& handler,
-                               const ClientMessageFilterPtr& filter) = 0;
-
-    // Stop Delivery for a given topic subscriber
-    virtual void stopDelivery(const TopicSubscriber& ts) = 0;
-
-    // Has Subscription on the Channel
-    virtual bool hasSubscription(const TopicSubscriber& ts) = 0;
-
-    // Close Subscription for a given topic subscriber
-    virtual void asyncCloseSubscription(const TopicSubscriber& ts,
-                                        const OperationCallbackPtr& callback) = 0;
-
-    // Consume message for a given topic subscriber
-    virtual void consume(const TopicSubscriber& ts,
-                         const MessageSeqId& messageSeqId) = 0;
-
-    // Message received from the underlying channel
-    virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
-
-    // Bind the underlying channel to the subscription channel handler
-    inline void setChannel(const AbstractDuplexChannelPtr& channel) {
-      this->channel = channel;
-    }
-
-    // Return the underlying channel
-    inline const AbstractDuplexChannelPtr& getChannel() const {
-      return channel;
-    }
-
-  protected:
-    // close logic for subscription channel handler
-    virtual void doClose();
-
-    // Clean the handler status
-    virtual void closeHandler() = 0;
-
-    AbstractDuplexChannelPtr channel;
-  };
-
-  class SubscriberImpl : public Subscriber {
-  public:
-    SubscriberImpl(const DuplexChannelManagerPtr& channelManager);
-    ~SubscriberImpl();
-
-    void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode);
-    void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback);
-    void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options);
-    void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback);
-    
-    void unsubscribe(const std::string& topic, const std::string& subscriberId);
-    void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback);
-
-    void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId);
-
-    void startDelivery(const std::string& topic, const std::string& subscriberId,
-                       const MessageHandlerCallbackPtr& callback);
-    void startDeliveryWithFilter(const std::string& topic, const std::string& subscriberId,
-                                 const MessageHandlerCallbackPtr& callback,
-                                 const ClientMessageFilterPtr& filter);
-    void stopDelivery(const std::string& topic, const std::string& subscriberId);
-
-    bool hasSubscription(const std::string& topic, const std::string& subscriberId);
-    void closeSubscription(const std::string& topic, const std::string& subscriberId);
-    void asyncCloseSubscription(const std::string& topic, const std::string& subscriberId,
-                                const OperationCallbackPtr& callback);
-
-    virtual void addSubscriptionListener(SubscriptionListenerPtr& listener);
-    virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener);
-
-  private:
-    const DuplexChannelManagerPtr channelManager;
-  };
-
-  // Unsubscribe Response Handler
-
-  class UnsubscribeResponseHandler : public ResponseHandler {
-  public:
-    explicit UnsubscribeResponseHandler(const DuplexChannelManagerPtr& channelManager);
-    virtual ~UnsubscribeResponseHandler() {}
-
-    virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
-                                const DuplexChannelPtr& channel);
-  };
-
-  // CloseSubscription Response Handler 
-
-  class CloseSubscriptionResponseHandler : public ResponseHandler {
-  public:
-    explicit CloseSubscriptionResponseHandler(const DuplexChannelManagerPtr& channelManager);
-    virtual ~CloseSubscriptionResponseHandler() {}
-
-    virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
-                                const DuplexChannelPtr& channel);
-  };
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/util.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/util.cpp b/hedwig-client/src/main/cpp/lib/util.cpp
deleted file mode 100644
index b5a7cc0..0000000
--- a/hedwig-client/src/main/cpp/lib/util.cpp
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <string>
-
-#include <netdb.h>
-#include <errno.h>
-#include "util.h"
-#include "channel.h"
-#include <log4cxx/logger.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-#define MAX_HOSTNAME_LENGTH 256
-const std::string UNITIALISED_HOST("UNINITIALISED HOST");
-
-const int DEFAULT_PORT = 4080;
-const int DEFAULT_SSL_PORT = 9876;
-
-HostAddress::HostAddress() : initialised(false), address_str(), ssl_host_port(0) {
-}
-
-HostAddress::~HostAddress() {
-}
-
-bool HostAddress::isNullHost() const {
-  return !initialised;
-}
-
-bool HostAddress::operator==(const HostAddress& other) const {
-  return (other.ip() == ip() && other.port() == port());
-}
-
-const std::string& HostAddress::getAddressString() const {
-  if (!isNullHost()) {
-    return address_str;
-  } else {
-    return UNITIALISED_HOST;
-  }
-}
-   
-uint32_t HostAddress::ip() const {
-  return host_ip;
-}
-
-void HostAddress::updateIP(uint32_t ip) {
-  this->host_ip = ip;
-}
-
-uint16_t HostAddress::port() const {
-  return host_port;
-}
-
-uint16_t HostAddress::sslPort() const {
-  return ssl_host_port;
-}
-
-void HostAddress::parse_string() {
-  char* url = strdup(address_str.c_str());
-
-  LOG4CXX_DEBUG(logger, "Parse address : " << url);
-
-  if (url == NULL) {
-    LOG4CXX_ERROR(logger, "You seems to be out of memory");
-    throw OomException();
-  }
-  int port = DEFAULT_PORT;
-  int sslport = DEFAULT_SSL_PORT;
-
-  char *colon = strchr(url, ':');
-  if (colon) {
-    *colon = 0;
-    colon++;
-    
-    char* sslcolon = strchr(colon, ':');
-    if (sslcolon) {
-      *sslcolon = 0;
-      sslcolon++;
-      
-      sslport = strtol(sslcolon, NULL, 10);
-      if (sslport == 0) {
-        LOG4CXX_ERROR(logger, "Invalid SSL port given: [" << sslcolon << "]");
-	free((void*)url);
-	throw InvalidPortException();
-      }
-    }
-    
-    port = strtol(colon, NULL, 10);
-    if (port == 0) {
-      LOG4CXX_ERROR(logger, "Invalid port given: [" << colon << "]");
-      free((void*)url);
-      throw InvalidPortException();
-    }
-  }
-
-  int err = 0;
-  
-  struct addrinfo *addr;
-  struct addrinfo hints;
-
-  memset(&hints, 0, sizeof(struct addrinfo));
-  hints.ai_family = AF_INET;
-
-  err = getaddrinfo(url, NULL, &hints, &addr);
-  if (err != 0) {
-    LOG4CXX_ERROR(logger, "Couldn't resolve host [" << url << "]:" << hstrerror(err));
-    free((void*)url);
-    throw HostResolutionException();
-  }
-
-  sockaddr_in* sa_ptr = (sockaddr_in*)addr->ai_addr;
-
-  struct sockaddr_in socket_addr;
-  memset(&socket_addr, 0, sizeof(struct sockaddr_in));
-  socket_addr = *sa_ptr;
-  socket_addr.sin_port = htons(port); 
-  //socket_addr.sin_family = AF_INET;
-
-  host_ip = ntohl(socket_addr.sin_addr.s_addr);
-  host_port = ntohs(socket_addr.sin_port);
-  ssl_host_port = sslport;
-
-  freeaddrinfo(addr);
-  free((void*)url);
-}
-
-HostAddress HostAddress::fromString(std::string str) {
-  HostAddress h;
-  h.address_str = str;
-  h.parse_string();
-  h.initialised = true;
-  return h;
-}
-
-ResponseCallbackAdaptor::ResponseCallbackAdaptor(const OperationCallbackPtr& opCallbackPtr)
-  : opCallbackPtr(opCallbackPtr) {
-}
-
-void ResponseCallbackAdaptor::operationComplete(const ResponseBody& response) {
-  opCallbackPtr->operationComplete();
-}
-
-void ResponseCallbackAdaptor::operationFailed(const std::exception& exception) {
-  opCallbackPtr->operationFailed(exception);
-}
-
-// Help Function
-std::ostream& Hedwig::operator<<(std::ostream& os, const HostAddress& host) {
-  if (host.isNullHost()) {
-    os << "(host:null)";
-  } else {
-    os << "(host:" << host.getAddressString() << ", ip=" << host.ip() << ", port="
-       << host.port() << ", ssl_port=" << host.sslPort() << ")";
-  }
-  return os;
-}
-
-std::ostream& std::operator<<(std::ostream& os, const TopicSubscriber& ts) {
-  os << "(topic:" << ts.first << ", subscriber:" << ts.second << ")";
-  return os;
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/util.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/util.h b/hedwig-client/src/main/cpp/lib/util.h
deleted file mode 100644
index a7741e2..0000000
--- a/hedwig-client/src/main/cpp/lib/util.h
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef HEDWIG_UTIL_H
-#define HEDWIG_UTIL_H
-
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <list>
-#include <iostream>
-#include <utility>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/functional.hpp>
-#else
-#include <tr1/functional>
-#endif
-
-#include <semaphore.h>
-#include <pthread.h>
-
-namespace Hedwig {
-  typedef std::pair<const std::string, const std::string> TopicSubscriber;
-
-  /**
-     Representation of a hosts address
-  */
-  class HostAddress {
-  public:
-    HostAddress();
-    ~HostAddress();
-
-    bool operator==(const HostAddress& other) const;
-    
-    bool isNullHost() const;
-    const std::string& getAddressString() const;
-    uint32_t ip() const;
-    uint16_t port() const;
-    uint16_t sslPort() const;
-
-    // the real ip address is different from default server
-    // if default server is a VIP
-    void updateIP(uint32_t ip);
-
-    static HostAddress fromString(std::string host);
-
-    friend std::ostream& operator<<(std::ostream& os, const HostAddress& host);
-  private:
-
-    void parse_string();
-    
-    bool initialised;
-    std::string address_str;
-    uint32_t host_ip;
-    uint16_t host_port;
-    uint16_t ssl_host_port;
-  };
-
-  /**
-   * An adaptor for OperationCallback
-   */
-  class ResponseCallbackAdaptor : public Callback<ResponseBody> {
-  public:
-    ResponseCallbackAdaptor(const OperationCallbackPtr& opCallbackPtr);
-
-    virtual void operationComplete(const ResponseBody& response);
-    virtual void operationFailed(const std::exception& exception);
-  private:
-    OperationCallbackPtr opCallbackPtr;
-  };
-
-  /**
-     Hash a host address. Takes the least significant 16-bits of the address and the 16-bits of the
-     port and packs them into one 32-bit number. While collisons are theoretically very possible, they
-     shouldn't happen as the hedwig servers should be in the same subnet.
-  */
-  struct HostAddressHash : public std::unary_function<Hedwig::HostAddress, size_t> {
-    size_t operator()(const Hedwig::HostAddress& address) const {
-        return (address.ip() << 16) & (address.port());
-    }
-  };
-
-
-  /**
-     Hash a channel pointer, just returns the pointer.
-  */
-  struct TopicSubscriberHash : public std::unary_function<Hedwig::TopicSubscriber, size_t> {
-    size_t operator()(const Hedwig::TopicSubscriber& topicsub) const {
-      std::string fullstr = topicsub.first + topicsub.second;
-      return std::tr1::hash<std::string>()(fullstr);
-    }
-  };
-
-  /**
-   * Operation Type Hash
-   */
-  struct OperationTypeHash : public std::unary_function<Hedwig::OperationType, size_t> {
-    size_t operator()(const Hedwig::OperationType& type) const {
-      return type;
-    }
-  };
-};
-
-// Since TopicSubscriber is an typedef of std::pair. so log4cxx would lookup 'operator<<'
-// in std namespace.
-namespace std {
-  // Help Function to print topicSubscriber
-  std::ostream& operator<<(std::ostream& os, const Hedwig::TopicSubscriber& ts);
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/m4/ax_boost_asio.m4
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/m4/ax_boost_asio.m4 b/hedwig-client/src/main/cpp/m4/ax_boost_asio.m4
deleted file mode 100644
index 8cc4666..0000000
--- a/hedwig-client/src/main/cpp/m4/ax_boost_asio.m4
+++ /dev/null
@@ -1,111 +0,0 @@
-# ===========================================================================
-#       http://www.gnu.org/software/autoconf-archive/ax_boost_asio.html
-# ===========================================================================
-#
-# SYNOPSIS
-#
-#   AX_BOOST_ASIO
-#
-# DESCRIPTION
-#
-#   Test for Asio library from the Boost C++ libraries. The macro requires a
-#   preceding call to AX_BOOST_BASE. Further documentation is available at
-#   <http://randspringer.de/boost/index.html>.
-#
-#   This macro calls:
-#
-#     AC_SUBST(BOOST_ASIO_LIB)
-#
-#   And sets:
-#
-#     HAVE_BOOST_ASIO
-#
-# LICENSE
-#
-#   Copyright (c) 2008 Thomas Porschberg <thomas@randspringer.de>
-#   Copyright (c) 2008 Pete Greenwell <pete@mu.org>
-#
-#   Copying and distribution of this file, with or without modification, are
-#   permitted in any medium without royalty provided the copyright notice
-#   and this notice are preserved. This file is offered as-is, without any
-#   warranty.
-
-#serial 9
-
-AC_DEFUN([AX_BOOST_ASIO],
-[
-	AC_ARG_WITH([boost-asio],
-	AS_HELP_STRING([--with-boost-asio@<:@=special-lib@:>@],
-                   [use the ASIO library from boost - it is possible to specify a certain library for the linker
-                        e.g. --with-boost-asio=boost_system-gcc41-mt-1_34 ]),
-        [
-        if test "$withval" = "no"; then
-			want_boost="no"
-        elif test "$withval" = "yes"; then
-            want_boost="yes"
-            ax_boost_user_asio_lib=""
-        else
-		    want_boost="yes"
-        	ax_boost_user_asio_lib="$withval"
-		fi
-        ],
-        [want_boost="yes"]
-	)
-
-	if test "x$want_boost" = "xyes"; then
-        AC_REQUIRE([AC_PROG_CC])
-		CPPFLAGS_SAVED="$CPPFLAGS"
-		CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
-		export CPPFLAGS
-
-		LDFLAGS_SAVED="$LDFLAGS"
-		LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
-		export LDFLAGS
-
-        AC_CACHE_CHECK(whether the Boost::ASIO library is available,
-					   ax_cv_boost_asio,
-        [AC_LANG_PUSH([C++])
-		 AC_COMPILE_IFELSE(AC_LANG_PROGRAM([[ @%:@include <boost/asio.hpp>
-											]],
-                                  [[
-
-                                    boost::asio::io_service io;
-                                    boost::system::error_code timer_result;
-                                    boost::asio::deadline_timer t(io);
-                                    t.cancel();
-                                    io.run_one();
-									return 0;
-                                   ]]),
-                             ax_cv_boost_asio=yes, ax_cv_boost_asio=no)
-         AC_LANG_POP([C++])
-		])
-		if test "x$ax_cv_boost_asio" = "xyes"; then
-			AC_DEFINE(HAVE_BOOST_ASIO,,[define if the Boost::ASIO library is available])
-			BN=boost_system
-            if test "x$ax_boost_user_asio_lib" = "x"; then
-				for ax_lib in $BN $BN-$CC $BN-$CC-mt $BN-$CC-mt-s $BN-$CC-s \
-                              lib$BN lib$BN-$CC lib$BN-$CC-mt lib$BN-$CC-mt-s lib$BN-$CC-s \
-                              $BN-mgw $BN-mgw $BN-mgw-mt $BN-mgw-mt-s $BN-mgw-s ; do
-				    AC_CHECK_LIB($ax_lib, main, [BOOST_ASIO_LIB="-l$ax_lib" AC_SUBST(BOOST_ASIO_LIB) link_thread="yes" break],
-                                 [link_thread="no"])
-  				done
-            else
-               for ax_lib in $ax_boost_user_asio_lib $BN-$ax_boost_user_asio_lib; do
-				      AC_CHECK_LIB($ax_lib, main,
-                                   [BOOST_ASIO_LIB="-l$ax_lib" AC_SUBST(BOOST_ASIO_LIB) link_asio="yes" break],
-                                   [link_asio="no"])
-                  done
-
-            fi
-            if test "x$ax_lib" = "x"; then
-                AC_MSG_ERROR(Could not find a version of the library!)
-            fi
-			if test "x$link_asio" = "xno"; then
-				AC_MSG_ERROR(Could not link against $ax_lib !)
-			fi
-		fi
-
-		CPPFLAGS="$CPPFLAGS_SAVED"
-    	LDFLAGS="$LDFLAGS_SAVED"
-	fi
-])

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/m4/ax_boost_base.m4
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/m4/ax_boost_base.m4 b/hedwig-client/src/main/cpp/m4/ax_boost_base.m4
deleted file mode 100644
index 8f935f6..0000000
--- a/hedwig-client/src/main/cpp/m4/ax_boost_base.m4
+++ /dev/null
@@ -1,252 +0,0 @@
-# ===========================================================================
-#       http://www.gnu.org/software/autoconf-archive/ax_boost_base.html
-# ===========================================================================
-#
-# SYNOPSIS
-#
-#   AX_BOOST_BASE([MINIMUM-VERSION], [ACTION-IF-FOUND], [ACTION-IF-NOT-FOUND])
-#
-# DESCRIPTION
-#
-#   Test for the Boost C++ libraries of a particular version (or newer)
-#
-#   If no path to the installed boost library is given the macro searches
-#   under /usr, /usr/local, /opt and /opt/local and evaluates the
-#   $BOOST_ROOT environment variable. Further documentation is available at
-#   <http://randspringer.de/boost/index.html>.
-#
-#   This macro calls:
-#
-#     AC_SUBST(BOOST_CPPFLAGS) / AC_SUBST(BOOST_LDFLAGS)
-#
-#   And sets:
-#
-#     HAVE_BOOST
-#
-# LICENSE
-#
-#   Copyright (c) 2008 Thomas Porschberg <thomas@randspringer.de>
-#   Copyright (c) 2009 Peter Adolphs
-#
-#   Copying and distribution of this file, with or without modification, are
-#   permitted in any medium without royalty provided the copyright notice
-#   and this notice are preserved. This file is offered as-is, without any
-#   warranty.
-
-#serial 17
-
-AC_DEFUN([AX_BOOST_BASE],
-[
-dnl Locate the Boost headers and library directory and verify that the
-dnl installed version is at least the requested minimum (1.20.0 when the
-dnl first argument is empty).  On success BOOST_CPPFLAGS and BOOST_LDFLAGS
-dnl are substituted, HAVE_BOOST is defined and the second argument is run;
-dnl otherwise a notice is printed and the third argument is run.
-dnl CPPFLAGS and LDFLAGS are restored to their saved values at the end.
-AC_ARG_WITH([boost],
-  [AS_HELP_STRING([--with-boost@<:@=ARG@:>@],
-    [use Boost library from a standard location (ARG=yes),
-     from the specified location (ARG=<path>),
-     or disable it (ARG=no)
-     @<:@ARG=yes@:>@ ])],
-    [
-    if test "$withval" = "no"; then
-        want_boost="no"
-    elif test "$withval" = "yes"; then
-        want_boost="yes"
-        ac_boost_path=""
-    else
-        want_boost="yes"
-        ac_boost_path="$withval"
-    fi
-    ],
-    [want_boost="yes"])
-
-
-AC_ARG_WITH([boost-libdir],
-        AS_HELP_STRING([--with-boost-libdir=LIB_DIR],
-        [Force given directory for boost libraries. Note that this will overwrite library path detection, so use this parameter only if default library detection fails and you know exactly where your boost libraries are located.]),
-        [
-        if test -d "$withval"
-        then
-                ac_boost_lib_path="$withval"
-        else
-                AC_MSG_ERROR(--with-boost-libdir expected directory name)
-        fi
-        ],
-        [ac_boost_lib_path=""]
-)
-
-if test "x$want_boost" = "xyes"; then
-    dnl split the requested version into its components and fold them into
-    dnl a single number that is compared against BOOST_VERSION below
-    boost_lib_version_req=ifelse([$1], ,1.20.0,$1)
-    boost_lib_version_req_shorten=`expr $boost_lib_version_req : '\([[0-9]]*\.[[0-9]]*\)'`
-    boost_lib_version_req_major=`expr $boost_lib_version_req : '\([[0-9]]*\)'`
-    boost_lib_version_req_minor=`expr $boost_lib_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
-    boost_lib_version_req_sub_minor=`expr $boost_lib_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
-    if test "x$boost_lib_version_req_sub_minor" = "x" ; then
-        boost_lib_version_req_sub_minor="0"
-    fi
-    WANT_BOOST_VERSION=`expr $boost_lib_version_req_major \* 100000 \+  $boost_lib_version_req_minor \* 100 \+ $boost_lib_version_req_sub_minor`
-    AC_MSG_CHECKING(for boostlib >= $boost_lib_version_req)
-    succeeded=no
-
-    dnl On x86_64 systems check for system libraries in both lib64 and lib.
-    dnl The former is specified by FHS, but e.g. Debian does not adhere to
-    dnl this (as it raises problems for generic multi-arch support).
-    dnl The last entry in the list is chosen by default when no libraries
-    dnl are found, e.g. when only header-only libraries are installed!
-    libsubdirs="lib"
-    if test `uname -m` = x86_64; then
-        libsubdirs="lib64 lib lib64"
-    fi
-
-    dnl first we check the system location for boost libraries
-    dnl this location is chosen if boost libraries are installed with the --layout=system option
-    dnl or if you install boost with RPM
-    if test "$ac_boost_path" != ""; then
-        dnl an explicit --with-boost=PATH was given: libsubdir has not been
-        dnl assigned yet at this point, so probe the candidate library
-        dnl subdirectories the same way as for the standard locations below
-        for libsubdir in $libsubdirs ; do
-            if ls "$ac_boost_path/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
-        done
-        BOOST_LDFLAGS="-L$ac_boost_path/$libsubdir"
-        BOOST_CPPFLAGS="-I$ac_boost_path/include"
-    elif test "$cross_compiling" != yes; then
-        for ac_boost_path_tmp in /usr /usr/local /opt /opt/local ; do
-            if test -d "$ac_boost_path_tmp/include/boost" && test -r "$ac_boost_path_tmp/include/boost"; then
-                for libsubdir in $libsubdirs ; do
-                    if ls "$ac_boost_path_tmp/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
-                done
-                BOOST_LDFLAGS="-L$ac_boost_path_tmp/$libsubdir"
-                BOOST_CPPFLAGS="-I$ac_boost_path_tmp/include"
-                break;
-            fi
-        done
-    fi
-
-    dnl overwrite ld flags if we have required special directory with
-    dnl --with-boost-libdir parameter
-    if test "$ac_boost_lib_path" != ""; then
-       BOOST_LDFLAGS="-L$ac_boost_lib_path"
-    fi
-
-    CPPFLAGS_SAVED="$CPPFLAGS"
-    CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
-    export CPPFLAGS
-
-    LDFLAGS_SAVED="$LDFLAGS"
-    LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
-    export LDFLAGS
-
-    dnl first attempt: compile a version check against the system layout
-    AC_REQUIRE([AC_PROG_CXX])
-    AC_LANG_PUSH(C++)
-        AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
-    @%:@include <boost/version.hpp>
-    ]], [[
-    #if BOOST_VERSION >= $WANT_BOOST_VERSION
-    // Everything is okay
-    #else
-    #  error Boost version is too old
-    #endif
-    ]])],[
-        AC_MSG_RESULT(yes)
-    succeeded=yes
-    found_system=yes
-        ],[
-        ])
-    AC_LANG_POP([C++])
-
-
-
-    dnl if we found no boost with system layout we search for boost libraries
-    dnl built and installed without the --layout=system option or for a staged(not installed) version
-    if test "x$succeeded" != "xyes"; then
-        _version=0
-        if test "$ac_boost_path" != ""; then
-            if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then
-                dnl pick the highest versioned include/boost-X_Y directory
-                for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do
-                    _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'`
-                    V_CHECK=`expr $_version_tmp \> $_version`
-                    if test "$V_CHECK" = "1" ; then
-                        _version=$_version_tmp
-                    fi
-                    VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'`
-                    BOOST_CPPFLAGS="-I$ac_boost_path/include/boost-$VERSION_UNDERSCORE"
-                done
-            fi
-        else
-            if test "$cross_compiling" != yes; then
-                for ac_boost_path in /usr /usr/local /opt /opt/local ; do
-                    if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then
-                        for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do
-                            _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'`
-                            V_CHECK=`expr $_version_tmp \> $_version`
-                            if test "$V_CHECK" = "1" ; then
-                                _version=$_version_tmp
-                                best_path=$ac_boost_path
-                            fi
-                        done
-                    fi
-                done
-
-                VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'`
-                BOOST_CPPFLAGS="-I$best_path/include/boost-$VERSION_UNDERSCORE"
-                if test "$ac_boost_lib_path" = ""; then
-                    for libsubdir in $libsubdirs ; do
-                        if ls "$best_path/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
-                    done
-                    BOOST_LDFLAGS="-L$best_path/$libsubdir"
-                fi
-            fi
-
-            dnl a staged build under BOOST_ROOT is used when its version is
-            dnl not older than the best installed one found above
-            if test "x$BOOST_ROOT" != "x"; then
-                for libsubdir in $libsubdirs ; do
-                    if ls "$BOOST_ROOT/stage/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
-                done
-                if test -d "$BOOST_ROOT" && test -r "$BOOST_ROOT" && test -d "$BOOST_ROOT/stage/$libsubdir" && test -r "$BOOST_ROOT/stage/$libsubdir"; then
-                    version_dir=`expr //$BOOST_ROOT : '.*/\(.*\)'`
-                    stage_version=`echo $version_dir | sed 's/boost_//' | sed 's/_/./g'`
-                        stage_version_shorten=`expr $stage_version : '\([[0-9]]*\.[[0-9]]*\)'`
-                    V_CHECK=`expr $stage_version_shorten \>\= $_version`
-                    if test "$V_CHECK" = "1" -a "$ac_boost_lib_path" = "" ; then
-                        AC_MSG_NOTICE(We will use a staged boost library from $BOOST_ROOT)
-                        BOOST_CPPFLAGS="-I$BOOST_ROOT"
-                        BOOST_LDFLAGS="-L$BOOST_ROOT/stage/$libsubdir"
-                    fi
-                fi
-            fi
-        fi
-
-        CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
-        export CPPFLAGS
-        LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
-        export LDFLAGS
-
-        dnl second attempt: repeat the version check with the new flags
-        AC_LANG_PUSH(C++)
-            AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
-        @%:@include <boost/version.hpp>
-        ]], [[
-        #if BOOST_VERSION >= $WANT_BOOST_VERSION
-        // Everything is okay
-        #else
-        #  error Boost version is too old
-        #endif
-        ]])],[
-            AC_MSG_RESULT(yes)
-        succeeded=yes
-        found_system=yes
-            ],[
-            ])
-        AC_LANG_POP([C++])
-    fi
-
-    if test "$succeeded" != "yes" ; then
-        if test "$_version" = "0" ; then
-            AC_MSG_NOTICE([[We could not detect the boost libraries (version $boost_lib_version_req_shorten or higher). If you have a staged boost library (still not installed) please specify \$BOOST_ROOT in your environment and do not give a PATH to --with-boost option.  If you are sure you have boost installed, then check your version number looking in <boost/version.hpp>. See http://randspringer.de/boost for more documentation.]])
-        else
-            AC_MSG_NOTICE([Your boost libraries seems to old (version $_version).])
-        fi
-        # execute ACTION-IF-NOT-FOUND (if present):
-        ifelse([$3], , :, [$3])
-    else
-        AC_SUBST(BOOST_CPPFLAGS)
-        AC_SUBST(BOOST_LDFLAGS)
-        AC_DEFINE(HAVE_BOOST,,[define if the Boost library is available])
-        # execute ACTION-IF-FOUND (if present):
-        ifelse([$2], , :, [$2])
-    fi
-
-    CPPFLAGS="$CPPFLAGS_SAVED"
-    LDFLAGS="$LDFLAGS_SAVED"
-fi
-
-])

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/m4/ax_boost_thread.m4
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/m4/ax_boost_thread.m4 b/hedwig-client/src/main/cpp/m4/ax_boost_thread.m4
deleted file mode 100644
index fb7e530..0000000
--- a/hedwig-client/src/main/cpp/m4/ax_boost_thread.m4
+++ /dev/null
@@ -1,149 +0,0 @@
-# ===========================================================================
-#      http://www.gnu.org/software/autoconf-archive/ax_boost_thread.html
-# ===========================================================================
-#
-# SYNOPSIS
-#
-#   AX_BOOST_THREAD
-#
-# DESCRIPTION
-#
-#   Test for Thread library from the Boost C++ libraries. The macro requires
-#   a preceding call to AX_BOOST_BASE. Further documentation is available at
-#   <http://randspringer.de/boost/index.html>.
-#
-#   This macro calls:
-#
-#     AC_SUBST(BOOST_THREAD_LIB)
-#
-#   And sets:
-#
-#     HAVE_BOOST_THREAD
-#
-# LICENSE
-#
-#   Copyright (c) 2009 Thomas Porschberg <thomas@randspringer.de>
-#   Copyright (c) 2009 Michael Tindal
-#
-#   Copying and distribution of this file, with or without modification, are
-#   permitted in any medium without royalty provided the copyright notice
-#   and this notice are preserved. This file is offered as-is, without any
-#   warranty.
-
-#serial 17
-
-AC_DEFUN([AX_BOOST_THREAD],
-[
-    dnl Check that the Boost Thread library can be compiled and linked.
-    dnl Relies on BOOST_CPPFLAGS and BOOST_LDFLAGS as set by a preceding
-    dnl AX_BOOST_BASE call.  When the header check passes, the threading
-    dnl compiler flag is prepended to BOOST_CPPFLAGS, HAVE_BOOST_THREAD is
-    dnl defined and BOOST_THREAD_LIB is substituted with the first library
-    dnl that links; configure aborts if no candidate can be found or linked.
-    AC_ARG_WITH([boost-thread],
-    AS_HELP_STRING([--with-boost-thread@<:@=special-lib@:>@],
-                   [use the Thread library from boost - it is possible to specify a certain library for the linker
-                        e.g. --with-boost-thread=boost_thread-gcc-mt ]),
-        [
-        if test "$withval" = "no"; then
-            want_boost="no"
-        elif test "$withval" = "yes"; then
-            want_boost="yes"
-            ax_boost_user_thread_lib=""
-        else
-            want_boost="yes"
-            ax_boost_user_thread_lib="$withval"
-        fi
-        ],
-        [want_boost="yes"]
-    )
-
-    if test "x$want_boost" = "xyes"; then
-        AC_REQUIRE([AC_PROG_CC])
-        AC_REQUIRE([AC_CANONICAL_BUILD])
-        CPPFLAGS_SAVED="$CPPFLAGS"
-        CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
-        export CPPFLAGS
-
-        LDFLAGS_SAVED="$LDFLAGS"
-        LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
-        export LDFLAGS
-
-        AC_CACHE_CHECK(whether the Boost::Thread library is available,
-                       ax_cv_boost_thread,
-        [AC_LANG_PUSH([C++])
-             CXXFLAGS_SAVE=$CXXFLAGS
-
-             dnl the compile test needs the platform threading flag
-             if test "x$build_os" = "xsolaris" ; then
-                 CXXFLAGS="-pthreads $CXXFLAGS"
-             elif test "x$build_os" = "xmingw32" ; then
-                 CXXFLAGS="-mthreads $CXXFLAGS"
-             else
-                 CXXFLAGS="-pthread $CXXFLAGS"
-             fi
-             AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[@%:@include <boost/thread/thread.hpp>]],
-                                   [[boost::thread_group thrds;
-                                   return 0;]])],
-                   ax_cv_boost_thread=yes, ax_cv_boost_thread=no)
-             CXXFLAGS=$CXXFLAGS_SAVE
-             AC_LANG_POP([C++])
-        ])
-        if test "x$ax_cv_boost_thread" = "xyes"; then
-            if test "x$build_os" = "xsolaris" ; then
-                BOOST_CPPFLAGS="-pthreads $BOOST_CPPFLAGS"
-            elif test "x$build_os" = "xmingw32" ; then
-                BOOST_CPPFLAGS="-mthreads $BOOST_CPPFLAGS"
-            else
-                BOOST_CPPFLAGS="-pthread $BOOST_CPPFLAGS"
-            fi
-
-            AC_SUBST(BOOST_CPPFLAGS)
-
-            AC_DEFINE(HAVE_BOOST_THREAD,,[define if the Boost::Thread library is available])
-            dnl strip everything before the first slash of BOOST_LDFLAGS
-            BOOSTLIBDIR=`echo $BOOST_LDFLAGS | sed -e 's/@<:@^\/@:>@*//'`
-
-            LDFLAGS_SAVE=$LDFLAGS
-            dnl no break in this case arm: it is not enclosed by any loop
-            case "x$build_os" in
-              *bsd* )
-                LDFLAGS="-pthread $LDFLAGS"
-                ;;
-            esac
-            if test "x$ax_boost_user_thread_lib" = "x"; then
-                dnl no library named by the user: try every libboost_thread
-                dnl shared or static library found in the boost library directory
-                for libextension in `ls $BOOSTLIBDIR/libboost_thread*.so* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_thread.*\)\.so.*$;\1;'` `ls $BOOSTLIBDIR/libboost_thread*.a* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_thread.*\)\.a*$;\1;'`; do
-                    ax_lib=${libextension}
-                    AC_CHECK_LIB($ax_lib, exit,
-                                 [BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
-                                 [link_thread="no"])
-                done
-                if test "x$link_thread" != "xyes"; then
-                    dnl fall back to libraries named without the lib prefix
-                    for libextension in `ls $BOOSTLIBDIR/boost_thread*.dll* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_thread.*\)\.dll.*$;\1;'` `ls $BOOSTLIBDIR/boost_thread*.a* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_thread.*\)\.a*$;\1;'` ; do
-                        ax_lib=${libextension}
-                        AC_CHECK_LIB($ax_lib, exit,
-                                     [BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
-                                     [link_thread="no"])
-                    done
-                fi
-
-            else
-                for ax_lib in $ax_boost_user_thread_lib boost_thread-$ax_boost_user_thread_lib; do
-                    AC_CHECK_LIB($ax_lib, exit,
-                                 [BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
-                                 [link_thread="no"])
-                done
-
-            fi
-            if test "x$ax_lib" = "x"; then
-                AC_MSG_ERROR(Could not find a version of the library!)
-            fi
-            if test "x$link_thread" = "xno"; then
-                AC_MSG_ERROR(Could not link against $ax_lib !)
-            else
-                dnl no break in this case arm either: there is no enclosing loop
-                case "x$build_os" in
-                  *bsd* )
-                    BOOST_LDFLAGS="-pthread $BOOST_LDFLAGS"
-                    ;;
-                esac
-
-            fi
-        fi
-
-        CPPFLAGS="$CPPFLAGS_SAVED"
-        LDFLAGS="$LDFLAGS_SAVED"
-    fi
-])


Mime
View raw message