bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [26/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:36 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/pubsubtest.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/test/pubsubtest.cpp b/hedwig-client/src/main/cpp/test/pubsubtest.cpp
deleted file mode 100644
index 9baba1d..0000000
--- a/hedwig-client/src/main/cpp/test/pubsubtest.cpp
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <sstream>
-
-#include "gtest/gtest.h"
-#include <boost/thread/mutex.hpp>
-
-#include "../lib/clientimpl.h"
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <stdexcept>
-#include <pthread.h>
-
-#include <log4cxx/logger.h>
-
-#include "util.h"
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-class StartStopDeliveryMsgHandler : public Hedwig::MessageHandlerCallback {
-public:
-  StartStopDeliveryMsgHandler(Hedwig::Subscriber& subscriber, const int nextValue)
-    : subscriber(subscriber), nextValue(nextValue) {}
-
-  virtual void consume(const std::string& topic, const std::string& subscriberId,
-                       const Hedwig::Message& msg,
-                       Hedwig::OperationCallbackPtr& callback) {
-    {
-      boost::lock_guard<boost::mutex> lock(mutex);
-
-      int curVal = atoi(msg.body().c_str());
-      LOG4CXX_DEBUG(logger, "received message " << curVal);
-      if (curVal == nextValue) {
-        ++nextValue;
-      }
-      callback->operationComplete();
-    }
-    ASSERT_THROW(subscriber.startDelivery(topic, subscriberId,
-                                          Hedwig::MessageHandlerCallbackPtr()),
-                 Hedwig::StartingDeliveryException);
-    ASSERT_THROW(subscriber.stopDelivery(topic, subscriberId),
-                 Hedwig::StartingDeliveryException);
-  }
-
-  int getNextValue() {
-    return nextValue;
-  }
-
-private:
-  Hedwig::Subscriber& subscriber;
-  boost::mutex mutex;
-  int nextValue;
-};
-
-class PubSubMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
-public:
-  PubSubMessageHandlerCallback(const std::string& topic, const std::string& subscriberId) : messagesReceived(0), topic(topic), subscriberId(subscriberId) {
-  }
-
-  virtual void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
-    if (topic == this->topic && subscriberId == this->subscriberId) {
-      boost::lock_guard<boost::mutex> lock(mutex);
-      
-      messagesReceived++;
-      lastMessage = msg.body();
-      callback->operationComplete();
-    }
-  }
-    
-  std::string getLastMessage() {
-    boost::lock_guard<boost::mutex> lock(mutex);
-    std::string s = lastMessage;
-    return s;
-  }
-
-  int numMessagesReceived() {
-    boost::lock_guard<boost::mutex> lock(mutex);
-    int i = messagesReceived;
-    return i;
-  }    
-    
-protected:
-  boost::mutex mutex;
-  int messagesReceived;
-  std::string lastMessage;
-  std::string topic;
-  std::string subscriberId;
-};
-
-// order checking callback
-class PubSubOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
-public:
-  PubSubOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume)
-    : topic(topic), subscriberId(subscriberId), startMsgId(startMsgId),
-      nextMsgId(startMsgId), isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
-  }
-
-  virtual void consume(const std::string& topic, const std::string& subscriberId,
-		       const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
-    if (topic == this->topic && subscriberId == this->subscriberId) {
-      boost::lock_guard<boost::mutex> lock(mutex);
-
-      int newMsgId = atoi(msg.body().c_str());
-      if (newMsgId == nextMsgId + 1) {
-        // only calculate unduplicated entries
-        ++nextMsgId;
-      }
-
-      // checking msgId
-      LOG4CXX_DEBUG(logger, "received message " << newMsgId);
-      if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0
-	if (isInOrder) {
-          // in some environments, ssl channel encountering error like Bad File Descriptor.
-          // the channel would disconnect and reconnect. A duplicated message would be received.
-          // so just checking we received a larger out-of-order message.
-	  if (newMsgId > startMsgId + 1) {
-	    LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
-	    isInOrder = false;
-	  } else {
-	    startMsgId = newMsgId;
-	  }
-	}
-      } else { // we set first msg id as startMsgId when startMsgId is -1
-	startMsgId = newMsgId;
-      }
-      callback->operationComplete();
-      sleep(sleepTimeInConsume);
-    }
-  }
-    
-  int nextExpectedMsgId() {
-    boost::lock_guard<boost::mutex> lock(mutex);
-    return nextMsgId;
-  }    
-
-  bool inOrder() {
-    boost::lock_guard<boost::mutex> lock(mutex);
-    return isInOrder;
-  }
-    
-protected:
-  boost::mutex mutex;
-  std::string topic;
-  std::string subscriberId;
-  int startMsgId;
-  int nextMsgId;
-  bool isInOrder;
-  int sleepTimeInConsume;
-};
-
-// Publisher integer until finished
-class IntegerPublisher {
-public:
-  IntegerPublisher(const std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime)
-    : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub), running(true), runTime(runTime) {
-  }
-
-  void operator()() {
-    int i = 1;
-    long beginTime = curTime();
-    long elapsedTime = 0;
-
-    while (running) {
-      try {
-	int msg = startMsgId + i;
-	std::stringstream ss;
-	ss << msg;
-	pub.publish(topic, ss.str());
-	sleep(sleepTime);
-	if (numMsgs > 0 && i >= numMsgs) {
-	  running = false;
-	} else {
-	  if (i % 100 == 0 &&
-	      (elapsedTime = (curTime() - beginTime)) >= runTime) {
-	    LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime);
-	    running = false;
-	  }
-	}
-	++i;
-      } catch (std::exception &e) {
-	LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what());
-      }
-    } 
-  }
-
-  long curTime() {
-    struct timeval tv;
-    long mtime;
-    gettimeofday(&tv, NULL);
-    mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5;
-    return mtime;
-  }
-
-private:
-  std::string topic;
-  int startMsgId;
-  int numMsgs;
-  int sleepTime;
-  Hedwig::Publisher& pub;
-  bool running;
-  long runTime;
-};
-
-TEST(PubSubTest, testStartDeliveryWithoutSub) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-
-  std::string topic = "testStartDeliveryWithoutSub";
-  std::string sid = "mysub";
-
-  PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid);
-  Hedwig::MessageHandlerCallbackPtr handler(cb);
-  ASSERT_THROW(sub.startDelivery(topic, sid, handler),
-               Hedwig::NotSubscribedException);
-}
-
-TEST(PubSubTest, testAlreadyStartDelivery) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-
-  std::string topic = "testAlreadyStartDelivery";
-  std::string sid = "mysub";
-
-  sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-
-  PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid);
-  Hedwig::MessageHandlerCallbackPtr handler(cb);
-  sub.startDelivery(topic, sid, handler);
-  ASSERT_THROW(sub.startDelivery(topic, sid, handler),
-               Hedwig::AlreadyStartDeliveryException);
-}
-
-TEST(PubSubTest, testStopDeliveryWithoutSub) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-  ASSERT_THROW(sub.stopDelivery("testStopDeliveryWithoutSub", "mysub"),
-               Hedwig::NotSubscribedException);
-}
-
-TEST(PubSubTest, testStopDeliveryTwice) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-
-  std::string topic = "testStopDeliveryTwice";
-  std::string subid = "mysub";
-
-  sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-
-  // it is ok to stop delivery without start delivery
-  sub.stopDelivery(topic, subid);
-
-  PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, subid);
-  Hedwig::MessageHandlerCallbackPtr handler(cb);
-  sub.startDelivery(topic, subid, handler);
-  sub.stopDelivery(topic, subid);
-  // stop again
-  sub.stopDelivery(topic, subid);
-}
-
-// test startDelivery / stopDelivery in msg handler
-TEST(PubSubTest, testStartStopDeliveryInMsgHandler) {
-  std::string topic("startStopDeliveryInMsgHandler");
-  std::string subscriber("mysubid");
-
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-  Hedwig::Publisher& pub = client->getPublisher();
-
-  // subscribe topic
-  sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-
-  int numMsgs = 5;
-
-  for (int i=0; i<numMsgs; i++) {
-    std::stringstream oss;
-    oss << i;
-    pub.publish(topic, oss.str());
-  }
-
-  // sleep for a while to wait all messages are sent to subscribe and queue them
-  sleep(1);
-
-  StartStopDeliveryMsgHandler* cb = new StartStopDeliveryMsgHandler(sub, 0);
-  Hedwig::MessageHandlerCallbackPtr handler(cb);
-  sub.startDelivery(topic, subscriber, handler);
-
-  for (int i=0 ; i<10; i++) {
-    if (cb->getNextValue() == numMsgs) {
-      break;
-    } else {
-      sleep(1);
-    }
-  }
-  ASSERT_TRUE(cb->getNextValue() == numMsgs);
-
-  sub.stopDelivery(topic, subscriber);
-  sub.closeSubscription(topic, subscriber);
-}
-
-// test startDelivery / stopDelivery randomly
-TEST(PubSubTest, testRandomDelivery) {
-   std::string topic = "randomDeliveryTopic";
-   std::string subscriber = "mysub-randomDelivery";
-
-   int nLoops = 300;
-   int sleepTimePerLoop = 1;
-   int syncTimeout = 10000;
-
-   Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
-   std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-   Hedwig::Client* client = new Hedwig::Client(*conf);
-   std::auto_ptr<Hedwig::Client> clientptr(client);
-
-   Hedwig::Subscriber& sub = client->getSubscriber();
-   Hedwig::Publisher& pub = client->getPublisher();
-
-   // subscribe topic
-   sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-
-   // start thread to publish message
-   IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000);
-   boost::thread pubThread(intPublisher);
-
-   // start random delivery
-   PubSubOrderCheckingMessageHandlerCallback* cb =
-     new PubSubOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0);
-   Hedwig::MessageHandlerCallbackPtr handler(cb);
-
-   for (int i = 0; i < nLoops; i++) {
-     LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i);
-     sub.startDelivery(topic, subscriber, handler);
-     // sleep random time
-     usleep(rand()%1000000);
-     sub.stopDelivery(topic, subscriber);
-     ASSERT_TRUE(cb->inOrder());
-   }
-
-   pubThread.join();
- }
-
- // check message ordering
- TEST(PubSubTest, testPubSubOrderChecking) {
-   std::string topic = "orderCheckingTopic";
-   std::string sid = "mysub-0";
-
-   int numMessages = 5;
-   int sleepTimeInConsume = 1;
-   // sync timeout
-   int syncTimeout = 10000;
-
-   // in order to guarantee message order, message queue should be locked
-   // so message received in io thread would be blocked, which also block
-   // sent operations (publish). because we have only one io thread now
-   // so increase sync timeout to 10s, which is more than numMessages * sleepTimeInConsume
-   Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
-   std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-   Hedwig::Client* client = new Hedwig::Client(*conf);
-   std::auto_ptr<Hedwig::Client> clientptr(client);
-
-   Hedwig::Subscriber& sub = client->getSubscriber();
-   Hedwig::Publisher& pub = client->getPublisher();
-
-   sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-
-   // we don't start delivery first, so the message will be queued
-   // publish ${numMessages} messages, so the messages will be queued
-   for (int i=1; i<=numMessages; i++) {
-     std::stringstream ss;
-     ss << i;
-     pub.publish(topic, ss.str()); 
-   }
-
-   PubSubOrderCheckingMessageHandlerCallback* cb = new PubSubOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume);
-   Hedwig::MessageHandlerCallbackPtr handler(cb);
-
-   // create a thread to publish another ${numMessages} messages
-   boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0));
-
-   // start delivery will consumed the queued messages
-   // new message will recevied and the queued message should be consumed
-   // hedwig should ensure the message are received in order
-   sub.startDelivery(topic, sid, handler);
-
-   // wait until message are all published
-   pubThread.join();
-
-   for (int i = 0; i < 10; i++) {
-     sleep(3);
-     if (cb->nextExpectedMsgId() == 2 * numMessages) {
-       break;
-     }
-   }
-   ASSERT_TRUE(cb->inOrder());
- }
-
- // check message ordering
- TEST(PubSubTest, testPubSubInMultiDispatchThreads) {
-   std::string topic = "PubSubInMultiDispatchThreadsTopic-";
-   std::string sid = "mysub-0";
-
-   int syncTimeout = 10000;
-   int numDispatchThreads = 4;
-   int numMessages = 100;
-   int numTopics = 20;
-
-   Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout, numDispatchThreads);
-   std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-   Hedwig::Client* client = new Hedwig::Client(*conf);
-   std::auto_ptr<Hedwig::Client> clientptr(client);
-
-   Hedwig::Subscriber& sub = client->getSubscriber();
-   Hedwig::Publisher& pub = client->getPublisher();
-
-   std::vector<Hedwig::MessageHandlerCallbackPtr> callbacks;
-
-   for (int i=0; i<numTopics; i++) {
-     std::stringstream ss;
-     ss << topic << i;
-     sub.subscribe(ss.str(), sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-
-     PubSubOrderCheckingMessageHandlerCallback* cb = new PubSubOrderCheckingMessageHandlerCallback(ss.str(), sid, 0, 0);
-     Hedwig::MessageHandlerCallbackPtr handler(cb);
-     sub.startDelivery(ss.str(), sid, handler);
-     callbacks.push_back(handler);
-   }
-
-   std::vector<boost::shared_ptr<boost::thread> > threads;
-
-   for (int i=0; i<numTopics; i++) {
-     std::stringstream ss;
-     ss << topic << i;
-     boost::shared_ptr<boost::thread> t = boost::shared_ptr<boost::thread>(
-									   new boost::thread(IntegerPublisher(ss.str(), 0, numMessages, 0, pub, 0)));
-     threads.push_back(t);
-   }
-
-   for (int i=0; i<numTopics; i++) {
-     threads[i]->join();
-   }
-   threads.clear();
-
-   for (int j=0; j<numTopics; j++) {
-     PubSubOrderCheckingMessageHandlerCallback *cb =
-       (PubSubOrderCheckingMessageHandlerCallback *)(callbacks[j].get());
-     for (int i = 0; i < 10; i++) {
-       if (cb->nextExpectedMsgId() == numMessages) {
-	 break;
-       }
-       sleep(3);
-     }
-     ASSERT_TRUE(cb->inOrder());
-   }
-   callbacks.clear();
- }
-
- TEST(PubSubTest, testPubSubContinuousOverClose) {
-   std::string topic = "pubSubTopic";
-   std::string sid = "MySubscriberid-1";
-
-   Hedwig::Configuration* conf = new TestServerConfiguration();
-   std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-   Hedwig::Client* client = new Hedwig::Client(*conf);
-   std::auto_ptr<Hedwig::Client> clientptr(client);
-
-   Hedwig::Subscriber& sub = client->getSubscriber();
-   Hedwig::Publisher& pub = client->getPublisher();
-
-   sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-   PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid);
-   Hedwig::MessageHandlerCallbackPtr handler(cb);
-
-   sub.startDelivery(topic, sid, handler);
-   pub.publish(topic, "Test Message 1");
-   bool pass = false;
-   for (int i = 0; i < 10; i++) {
-     sleep(3);
-     if (cb->numMessagesReceived() > 0) {
-       if (cb->getLastMessage() == "Test Message 1") {
-	 pass = true;
-	 break;
-       }
-     }
-   }
-   ASSERT_TRUE(pass);
-   sub.closeSubscription(topic, sid);
-
-   pub.publish(topic, "Test Message 2");
-
-   sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-   sub.startDelivery(topic, sid, handler);
-   pass = false;
-   for (int i = 0; i < 10; i++) {
-     sleep(3);
-     if (cb->numMessagesReceived() > 0) {
-       if (cb->getLastMessage() == "Test Message 2") {
-	 pass = true;
-	 break;
-       }
-     }
-   }
-   ASSERT_TRUE(pass);
- }
-
-
- /*  void testPubSubContinuousOverServerDown() {
-     std::string topic = "pubSubTopic";
-     std::string sid = "MySubscriberid-1";
-
-     Hedwig::Configuration* conf = new TestServerConfiguration();
-     std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-     Hedwig::Client* client = new Hedwig::Client(*conf);
-     std::auto_ptr<Hedwig::Client> clientptr(client);
-
-     Hedwig::Subscriber& sub = client->getSubscriber();
-     Hedwig::Publisher& pub = client->getPublisher();
-
-     sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-     PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid);
-     Hedwig::MessageHandlerCallbackPtr handler(cb);
-
-     sub.startDelivery(topic, sid, handler);
-     pub.publish(topic, "Test Message 1");
-     bool pass = false;
-     for (int i = 0; i < 10; i++) {
-     sleep(3);
-     if (cb->numMessagesReceived() > 0) {
-     if (cb->getLastMessage() == "Test Message 1") {
-     pass = true;
-     break;
-     }
-     }
-     }
-     CPPUNIT_ASSERT(pass);
-     sub.closeSubscription(topic, sid);
-
-     pub.publish(topic, "Test Message 2");
-
-     sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-     sub.startDelivery(topic, sid, handler);
-     pass = false;
-     for (int i = 0; i < 10; i++) {
-     sleep(3);
-     if (cb->numMessagesReceived() > 0) {
-     if (cb->getLastMessage() == "Test Message 2") {
-     pass = true;
-     break;
-     }
-     }
-     }
-     CPPUNIT_ASSERT(pass);
-     }*/
-
- TEST(PubSubTest, testMultiTopic) {
-   std::string topicA = "pubSubTopicA";
-   std::string topicB = "pubSubTopicB";
-   std::string sid = "MySubscriberid-3";
-
-   Hedwig::Configuration* conf = new TestServerConfiguration();
-   std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-   Hedwig::Client* client = new Hedwig::Client(*conf);
-   std::auto_ptr<Hedwig::Client> clientptr(client);
-
-   Hedwig::Subscriber& sub = client->getSubscriber();
-   Hedwig::Publisher& pub = client->getPublisher();
-
-   sub.subscribe(topicA, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-   sub.subscribe(topicB, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-   
-  PubSubMessageHandlerCallback* cbA = new PubSubMessageHandlerCallback(topicA, sid);
-  Hedwig::MessageHandlerCallbackPtr handlerA(cbA);
-  sub.startDelivery(topicA, sid, handlerA);
-
-  PubSubMessageHandlerCallback* cbB = new PubSubMessageHandlerCallback(topicB, sid);
-  Hedwig::MessageHandlerCallbackPtr handlerB(cbB);
-  sub.startDelivery(topicB, sid, handlerB);
-
-  pub.publish(topicA, "Test Message A");
-  pub.publish(topicB, "Test Message B");
-  int passA = false, passB = false;
-    
-  for (int i = 0; i < 10; i++) {
-    sleep(3);
-    if (cbA->numMessagesReceived() > 0) {
-      if (cbA->getLastMessage() == "Test Message A") {
-	passA = true;
-      }
-    }
-    if (cbB->numMessagesReceived() > 0) {
-      if (cbB->getLastMessage() == "Test Message B") {
-	passB = true;
-      }
-    }
-    if (passA && passB) {
-      break;
-    }
-  }
-  ASSERT_TRUE(passA && passB);
-}
-
-TEST(PubSubTest, testMultiTopicMultiSubscriber) {
-  std::string topicA = "pubSubTopicA";
-  std::string topicB = "pubSubTopicB";
-  std::string sidA = "MySubscriberid-4";
-  std::string sidB = "MySubscriberid-5";
-
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-    
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-  Hedwig::Publisher& pub = client->getPublisher();
-
-  sub.subscribe(topicA, sidA, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  sub.subscribe(topicB, sidB, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-   
-  PubSubMessageHandlerCallback* cbA = new PubSubMessageHandlerCallback(topicA, sidA);
-  Hedwig::MessageHandlerCallbackPtr handlerA(cbA);
-  sub.startDelivery(topicA, sidA, handlerA);
-
-  PubSubMessageHandlerCallback* cbB = new PubSubMessageHandlerCallback(topicB, sidB);
-  Hedwig::MessageHandlerCallbackPtr handlerB(cbB);
-  sub.startDelivery(topicB, sidB, handlerB);
-
-  pub.publish(topicA, "Test Message A");
-  pub.publish(topicB, "Test Message B");
-  int passA = false, passB = false;
-    
-  for (int i = 0; i < 10; i++) {
-    sleep(3);
-    if (cbA->numMessagesReceived() > 0) {
-      if (cbA->getLastMessage() == "Test Message A") {
-	passA = true;
-      }
-    }
-    if (cbB->numMessagesReceived() > 0) {
-      if (cbB->getLastMessage() == "Test Message B") {
-	passB = true;
-      }
-    }
-    if (passA && passB) {
-      break;
-    }
-  }
-  ASSERT_TRUE(passA && passB);
-}
-
-static const int BIG_MESSAGE_SIZE = 16436*2; // MTU to lo0 is 16436 by default on linux
-
-TEST(PubSubTest, testBigMessage) {
-  std::string topic = "pubSubTopic";
-  std::string sid = "MySubscriberid-6";
-
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-    
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-  Hedwig::Publisher& pub = client->getPublisher();
-
-  sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid);
-  Hedwig::MessageHandlerCallbackPtr handler(cb);
-
-  sub.startDelivery(topic, sid, handler);
-
-  char buf[BIG_MESSAGE_SIZE];
-  std::string bigmessage(buf, BIG_MESSAGE_SIZE);
-  pub.publish(topic, bigmessage);
-  pub.publish(topic, "Test Message 1");
-  bool pass = false;
-  for (int i = 0; i < 10; i++) {
-    sleep(3);
-    if (cb->numMessagesReceived() > 0) {
-      if (cb->getLastMessage() == "Test Message 1") {
-	pass = true;
-	break;
-      }
-    }
-  }
-  ASSERT_TRUE(pass);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/subscribetest.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/test/subscribetest.cpp b/hedwig-client/src/main/cpp/test/subscribetest.cpp
deleted file mode 100644
index 3ee736a..0000000
--- a/hedwig-client/src/main/cpp/test/subscribetest.cpp
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "gtest/gtest.h"
-
-#include "../lib/clientimpl.h"
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <stdexcept>
-#include <pthread.h>
-
-#include <log4cxx/logger.h>
-
-#include "util.h"
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-TEST(SubscribeTest, testSyncSubscribe) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-    
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-    
-  sub.subscribe("testTopic", "mySubscriberId-1", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-}
-
-TEST(SubscribeTest, testSyncSubscribeAttach) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-    
-  ASSERT_THROW(sub.subscribe("iAmATopicWhoDoesNotExist", "mySubscriberId-2", Hedwig::SubscribeRequest::ATTACH), Hedwig::ClientException);
-}
-
-TEST(SubscribeTest, testAsyncSubscribe) {
-  SimpleWaitCondition* cond1 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
-
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-   
-  Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
-
-  sub.asyncSubscribe("testTopic", "mySubscriberId-3", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
-    
-  cond1->wait();
-  ASSERT_TRUE(cond1->wasSuccess());
-}
-  
-TEST(SubscribeTest, testAsyncSubcribeAndUnsubscribe) {
-  SimpleWaitCondition* cond1 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
-  SimpleWaitCondition* cond2 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2);
-
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-   
-  Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
-  Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
-
-  sub.asyncSubscribe("testTopic", "mySubscriberId-4", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
-  cond1->wait();
-  ASSERT_TRUE(cond1->wasSuccess());
-    
-  sub.asyncUnsubscribe("testTopic", "mySubscriberId-4", testcb2);
-  cond2->wait();
-  ASSERT_TRUE(cond2->wasSuccess());
-}
-
-TEST(SubscribeTest, testAsyncSubcribeAndSyncUnsubscribe) {
-  SimpleWaitCondition* cond1 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
-
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-   
-  Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
-    
-  sub.asyncSubscribe("testTopic", "mySubscriberId-5", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
-  cond1->wait();
-  ASSERT_TRUE(cond1->wasSuccess());
-
-  sub.unsubscribe("testTopic", "mySubscriberId-5");
-}
-
-TEST(SubscribeTest, testAsyncSubcribeCloseSubscriptionAndThenResubscribe) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-   
-  sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  sub.closeSubscription("testTopic", "mySubscriberId-6");
-  sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  sub.unsubscribe("testTopic", "mySubscriberId-6");
-}
-
-TEST(SubscribeTest, testUnsubscribeWithoutSubscribe) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-    
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-    
-  ASSERT_THROW(sub.unsubscribe("testTopic", "mySubscriberId-7"), Hedwig::NotSubscribedException);
-}
-
-TEST(SubscribeTest, testAsyncSubscribeTwice) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-
-  SimpleWaitCondition* cond1 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
-  SimpleWaitCondition* cond2 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2);
-  
-  Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
-  Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
-
-  std::string topic("testAsyncSubscribeTwice");
-  std::string subid("mysubid");
-
-  sub.asyncSubscribe(topic, subid,
-                     Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
-  sub.asyncSubscribe(topic, subid,
-                     Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb2);
-  cond1->wait();
-  cond2->wait();
-
-  if (cond1->wasSuccess()) {
-    ASSERT_TRUE(!cond2->wasSuccess());
-  } else {
-    ASSERT_TRUE(cond2->wasSuccess());
-  }
-}
-
-TEST(SubscribeTest, testSubscribeTwice) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-    
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-    
-  sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException);
-}
-
-TEST(SubscribeTest, testAsyncSubcribeForceAttach) {
-  Hedwig::Configuration* conf = new TestServerConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  // client 1
-  Hedwig::Client* client1 = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> client1ptr(client1);
-  Hedwig::Subscriber& sub1 = client1->getSubscriber();
-  // client 2
-  Hedwig::Client* client2 = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> client2ptr(client2);
-  Hedwig::Subscriber& sub2 = client2->getSubscriber();
-
-  SimpleWaitCondition* cond1 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
-  Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
-
-  SimpleWaitCondition* lcond1 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> lcond1ptr(lcond1);
-  Hedwig::SubscriptionListenerPtr listener1(
-    new TestSubscriptionListener(lcond1, Hedwig::SUBSCRIPTION_FORCED_CLOSED));
-
-  Hedwig::SubscriptionOptions options;
-  options.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  options.set_forceattach(true);
-  options.set_enableresubscribe(false);
-
-  sub1.addSubscriptionListener(listener1);
-
-  sub1.asyncSubscribe("asyncSubscribeForceAttach", "mysub",
-                      options, testcb1);
-  cond1->wait();
-  ASSERT_TRUE(cond1->wasSuccess());
-
-  // sub2 subscribe would force close the channel of sub1
-  SimpleWaitCondition* cond2 = new SimpleWaitCondition();
-  std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2);
-  Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
-
-  Hedwig::SubscriptionListenerPtr listener2(
-    new TestSubscriptionListener(0, Hedwig::SUBSCRIPTION_FORCED_CLOSED));
-
-  sub2.addSubscriptionListener(listener2);
-
-  sub2.asyncSubscribe("asyncSubscribeForceAttach", "mysub",
-                      options, testcb2);
-  cond2->wait();
-  ASSERT_TRUE(cond2->wasSuccess());
-
-  // sub1 would receive the disconnect event
-  lcond1->wait();
-
-  sub1.unsubscribe("asyncSubscribeForceAttach", "mysub");
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/test.sh
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/test/test.sh b/hedwig-client/src/main/cpp/test/test.sh
deleted file mode 100644
index c75bc3f..0000000
--- a/hedwig-client/src/main/cpp/test/test.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#   
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-export LD_LIBRARY_PATH=/usr/lib/jvm/java-6-sun/jre/lib/i386/server/:/usr/lib/jvm/java-6-sun/jre/lib/i386/
-export CLASSPATH=$HOME/src/hedwig/server/target/test-classes:$HOME/src/hedwig/server/lib/bookkeeper-SNAPSHOT.jar:$HOME/src/hedwig/server/lib/zookeeper-SNAPSHOT.jar:$HOME/src/hedwig/server/target/classes:$HOME/src/hedwig/protocol/target/classes:$HOME/src/hedwig/client/target/classes:$HOME/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:$HOME/.m2/repository/org/jboss/netty/netty/3.1.2.GA/netty-3.1.2.GA.jar:$HOME/.m2/repository/commons-lang/commons-lang/2.4/commons-lang-2.4.jar:$HOME/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:$HOME/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:$HOME/.m2/repository/com/google/protobuf/protobuf-java/2.3.0/protobuf-java-2.3.0.jar:$HOME/.m2/repository/log4j/log4j/1.2.14/log4j-1.2.14.jar:$HOME/src/hedwig/client/target/classes/
-
-./hedwigtest
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp b/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp
deleted file mode 100644
index 9dd0d9f..0000000
--- a/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "gtest/gtest.h"
-
-#include "../lib/clientimpl.h"
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <stdexcept>
-#include <pthread.h>
-
-#include <log4cxx/logger.h>
-
-#include "util.h"
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-class ThrottleDeliveryConfiguration : public TestServerConfiguration {
-public:
-  ThrottleDeliveryConfiguration() : TestServerConfiguration() {}
-  
-  virtual bool getBool(const std::string& key, bool defaultVal) const {
-    if (key == Configuration::SUBSCRIBER_AUTOCONSUME) {
-      return false;
-    } else {
-      return TestServerConfiguration::getBool(key, defaultVal);
-    }
-  }
-};
-    
-class ThrottleDeliveryMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
-public:
-  ThrottleDeliveryMessageHandlerCallback(Hedwig::Subscriber& sub,
-                                         const int start, const int end,
-                                         const int expectedToThrottle,
-                                         SimpleWaitCondition& throttleLatch,
-                                         SimpleWaitCondition& nonThrottleLatch)
-    : sub(sub), next(start), end(end), expectedToThrottle(expectedToThrottle),
-      throttleLatch(throttleLatch), nonThrottleLatch(nonThrottleLatch) {
-  }
-
-  virtual void consume(const std::string& topic, const std::string& subscriberId,
-                       const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
-    const int value = atoi(msg.body().c_str());
-    LOG4CXX_DEBUG(logger, "received message " << value);
-    boost::lock_guard<boost::mutex> lock(mutex);
-    if (value == next) {
-      ++next;
-    } else {
-      LOG4CXX_ERROR(logger, "Did not receive expected value " << next << ", got " << value);
-      next = 0;
-      throttleLatch.setSuccess(false);
-      throttleLatch.notify();
-      nonThrottleLatch.setSuccess(false);
-      nonThrottleLatch.notify();
-    }
-    if (next == expectedToThrottle + 2) {
-      throttleLatch.setSuccess(true);
-      throttleLatch.notify();
-    } else if (next == end + 1) {
-      nonThrottleLatch.setSuccess(true);
-      nonThrottleLatch.notify();
-    }
-    callback->operationComplete();
-    if (next > expectedToThrottle + 1) {
-      sub.consume(topic, subscriberId, msg.msgid());
-    }
-  }
-
-  int nextExpected() {
-    boost::lock_guard<boost::mutex> lock(mutex);
-    return next;
-  }
-
-protected:
-  Hedwig::Subscriber& sub;
-  boost::mutex mutex;
-  int next;
-  const int end;
-  const int expectedToThrottle;
-  SimpleWaitCondition& throttleLatch;
-  SimpleWaitCondition& nonThrottleLatch;
-};
-
-void throttleX(Hedwig::Publisher& pub, Hedwig::Subscriber& sub,
-               const std::string& topic, const std::string& subid, int X) {
-  for (int i = 1; i <= 3*X; i++) {
-    std::stringstream oss;
-    oss << i;
-    pub.publish(topic, oss.str());
-  }
-
-  sub.subscribe(topic, subid, Hedwig::SubscribeRequest::ATTACH);
-
-  SimpleWaitCondition throttleLatch, nonThrottleLatch;
-
-  ThrottleDeliveryMessageHandlerCallback* cb =
-    new ThrottleDeliveryMessageHandlerCallback(sub, 1, 3*X, X, throttleLatch,
-                                               nonThrottleLatch);
-  Hedwig::MessageHandlerCallbackPtr handler(cb);
-  sub.startDelivery(topic, subid, handler);
-
-  throttleLatch.timed_wait(3000);
-  ASSERT_TRUE(!throttleLatch.wasSuccess());
-  ASSERT_EQ(X + 1, cb->nextExpected());
-
-  // consume messages to not throttle it
-  for (int i=1; i<=X; i++) {
-    Hedwig::MessageSeqId msgid;
-    msgid.set_localcomponent(i);
-    sub.consume(topic, subid, msgid);
-  }
-
-  nonThrottleLatch.timed_wait(10000);
-  ASSERT_TRUE(nonThrottleLatch.wasSuccess());
-  ASSERT_EQ(3*X + 1, cb->nextExpected());
-
-  sub.stopDelivery(topic, subid);
-  sub.closeSubscription(topic, subid);
-}
-
-TEST(ThrottleDeliveryTest, testThrottleDelivery) {
-  Hedwig::Configuration* conf = new ThrottleDeliveryConfiguration();
-  std::auto_ptr<Hedwig::Configuration> confptr(conf);
-  
-  Hedwig::Client* client = new Hedwig::Client(*conf);
-  std::auto_ptr<Hedwig::Client> clientptr(client);
-
-  Hedwig::Subscriber& sub = client->getSubscriber();
-  Hedwig::Publisher& pub = client->getPublisher();
-
-  int throttleValue = 10;
-  std::string topic = "testThrottleDelivery";
-  std::string subid = "testSubId";
-  Hedwig::SubscriptionOptions options;
-  options.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
-  options.set_messagewindowsize(throttleValue);
-  sub.subscribe(topic, subid, options);
-  sub.closeSubscription(topic, subid);
-  throttleX(pub, sub, topic, subid, throttleValue);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/util.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/test/util.h b/hedwig-client/src/main/cpp/test/util.h
deleted file mode 100644
index dd5b5bf..0000000
--- a/hedwig-client/src/main/cpp/test/util.h
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "../lib/clientimpl.h"
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <stdexcept>
-#include <pthread.h>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
-
-#include <log4cxx/logger.h>
-
-static log4cxx::LoggerPtr utillogger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-class SimpleWaitCondition {
-public:
- SimpleWaitCondition() : flag(false), success(false) {};
-  ~SimpleWaitCondition() {}
-
-  void wait() {
-    boost::unique_lock<boost::mutex> lock(mut);
-    while(!flag)
-    {
-        cond.wait(lock);
-    }
-  }
-
-  void timed_wait(uint64_t milliseconds) {
-    boost::mutex::scoped_lock lock(mut);
-    if (!flag) {
-      LOG4CXX_DEBUG(utillogger, "wait for " << milliseconds << " ms.");
-      if (!cond.timed_wait(lock, boost::posix_time::milliseconds(milliseconds))) {
-        LOG4CXX_DEBUG(utillogger, "Timeout wait for " << milliseconds << " ms.");
-      }
-    }
-  }
-
-  void notify() {
-    {
-      boost::lock_guard<boost::mutex> lock(mut);
-      flag = true;
-    }
-    cond.notify_all();
-  }
-
-  void setSuccess(bool s) {
-    success = s;
-  }
-
-  bool wasSuccess() {
-    return success;
-  }
-
-private:
-  bool flag;
-  boost::condition_variable cond;
-  boost::mutex mut;
-  bool success;
-};
-
-class TestPublishResponseCallback : public Hedwig::PublishResponseCallback {
-public:
-  TestPublishResponseCallback(SimpleWaitCondition* cond) : cond(cond) {
-  }
-
-  virtual void operationComplete(const Hedwig::PublishResponsePtr & resp) {
-    LOG4CXX_DEBUG(utillogger, "operationComplete");
-    pubResp = resp;
-    cond->setSuccess(true);
-    cond->notify();
-  }
-  
-  virtual void operationFailed(const std::exception& exception) {
-    LOG4CXX_DEBUG(utillogger, "operationFailed: " << exception.what());
-    cond->setSuccess(false);
-    cond->notify();
-  }    
-
-  Hedwig::PublishResponsePtr getResponse() {
-    return pubResp;
-  }
-private:
-  SimpleWaitCondition *cond;
-  Hedwig::PublishResponsePtr pubResp;
-};
-
-class TestCallback : public Hedwig::OperationCallback {
-public:
-  TestCallback(SimpleWaitCondition* cond) 
-    : cond(cond) {
-  }
-
-  virtual void operationComplete() {
-    LOG4CXX_DEBUG(utillogger, "operationComplete");
-    cond->setSuccess(true);
-    cond->notify();
-
-  }
-  
-  virtual void operationFailed(const std::exception& exception) {
-    LOG4CXX_DEBUG(utillogger, "operationFailed: " << exception.what());
-    cond->setSuccess(false);
-    cond->notify();
-  }    
-  
-
-private:
-  SimpleWaitCondition *cond;
-};
-
-class TestSubscriptionListener : public Hedwig::SubscriptionListener {
-public:
-  TestSubscriptionListener(SimpleWaitCondition* cond,
-                           const Hedwig::SubscriptionEvent event)
-    : cond(cond), expectedEvent(event) {
-    LOG4CXX_DEBUG(utillogger, "Created TestSubscriptionListener " << this);
-  }
-
-  virtual ~TestSubscriptionListener() {}
-
-  virtual void processEvent(const std::string& topic, const std::string& subscriberId,
-                            const Hedwig::SubscriptionEvent event) {
-    LOG4CXX_DEBUG(utillogger, "Received event " << event << " for (topic:" << topic
-                              << ", subscriber:" << subscriberId << ") from listener " << this);
-    if (expectedEvent == event) {
-      if (cond) {
-        cond->setSuccess(true);
-        cond->notify();
-      }
-    }
-  }
-
-private:
-  SimpleWaitCondition *cond;
-  const Hedwig::SubscriptionEvent expectedEvent;
-};
-
-class TestServerConfiguration : public Hedwig::Configuration {
-public:
-  TestServerConfiguration() : address("localhost:4081:9877"),
-                              syncTimeout(10000), numThreads(2) {}
-
-  TestServerConfiguration(std::string& defaultServer) :
-    address(defaultServer), syncTimeout(10000), numThreads(2) {}
-
-  TestServerConfiguration(int syncTimeout, int numThreads = 2)
-    : address("localhost:4081:9877"), syncTimeout(syncTimeout), numThreads(numThreads) {}
-  
-  virtual int getInt(const std::string& key, int defaultVal) const {
-    if (key == Configuration::SYNC_REQUEST_TIMEOUT) {
-      return syncTimeout;
-    } else if (key == Configuration::NUM_DISPATCH_THREADS) {
-      return numThreads;
-    }
-    return defaultVal;
-  }
-
-  virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
-    if (key == Configuration::DEFAULT_SERVER) {
-      return address;
-    } else if (key == Configuration::SSL_PEM_FILE) {
-      return certFile;
-    } else {
-      return defaultVal;
-    }
-  }
-
-  virtual bool getBool(const std::string& key, bool defaultVal) const {
-    if (key == Configuration::SSL_ENABLED) {
-      return isSSL;
-    } else if (key == Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED) {    
-      return multiplexing;
-    }
-    return defaultVal;
-  }
-public:
-  // for testing
-  static bool isSSL;
-  static std::string certFile;
-  static bool multiplexing;
-private:
-  const std::string address;
-  const int syncTimeout;
-  const int numThreads;
-};
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/utiltest.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/test/utiltest.cpp b/hedwig-client/src/main/cpp/test/utiltest.cpp
deleted file mode 100644
index e5b6d75..0000000
--- a/hedwig-client/src/main/cpp/test/utiltest.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "gtest/gtest.h"
-
-#include "../lib/util.h"
-#include <hedwig/exceptions.h>
-#include <stdexcept>
-
-TEST(UtilTest, testHostAddress) {
-  // good address (no ports)
-  Hedwig::HostAddress a1 = Hedwig::HostAddress::fromString("www.yahoo.com");
-  ASSERT_TRUE(a1.port() == 4080);
-
-  // good address with ip (no ports)
-  Hedwig::HostAddress a2 = Hedwig::HostAddress::fromString("127.0.0.1");
-  ASSERT_TRUE(a2.port() == 4080);
-  ASSERT_TRUE(a2.ip() == ((127 << 24) | 1));
-
-  // good address
-  Hedwig::HostAddress a3 = Hedwig::HostAddress::fromString("www.yahoo.com:80");
-  ASSERT_TRUE(a3.port() == 80);
-
-  // good address with ip
-  Hedwig::HostAddress a4 = Hedwig::HostAddress::fromString("127.0.0.1:80");
-  ASSERT_TRUE(a4.port() == 80);
-  ASSERT_TRUE(a4.ip() == ((127 << 24) | 1));
-
-  // good address (with ssl)
-  Hedwig::HostAddress a5 = Hedwig::HostAddress::fromString("www.yahoo.com:80:443");
-  ASSERT_TRUE(a5.port() == 80);
-
-  // good address with ip
-  Hedwig::HostAddress a6 = Hedwig::HostAddress::fromString("127.0.0.1:80:443");
-  ASSERT_TRUE(a6.port() == 80);
-  ASSERT_TRUE(a6.ip() == ((127 << 24) | 1));
-
-  // nothing
-  ASSERT_THROW(Hedwig::HostAddress::fromString(""), Hedwig::HostResolutionException);
-    
-  // nothing but colons
-  ASSERT_THROW(Hedwig::HostAddress::fromString("::::::::::::::::"), Hedwig::ConfigurationException);
-    
-  // only port number
-  ASSERT_THROW(Hedwig::HostAddress::fromString(":80"), Hedwig::HostResolutionException);
- 
-  // text after colon (isn't supported)
-  ASSERT_THROW(Hedwig::HostAddress::fromString("www.yahoo.com:http"), Hedwig::ConfigurationException);
-    
-  // invalid hostname
-  ASSERT_THROW(Hedwig::HostAddress::fromString("com.oohay.www:80"), Hedwig::HostResolutionException);
-    
-  // null
-  ASSERT_THROW(Hedwig::HostAddress::fromString(NULL), std::logic_error);
-}
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java b/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java
deleted file mode 100644
index 4092a47..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.netty.HedwigClientImpl;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.jboss.netty.channel.ChannelFactory;
-
-/**
- * Hedwig client uses as starting point for all communications with the Hedwig service.
- * 
- * @see Publisher
- * @see Subscriber
- */
-public class HedwigClient implements Client {
-    private final Client impl;
-
-    /**
-     * Construct a hedwig client object. The configuration object
-     * should be an instance of a class which implements ClientConfiguration.
-     *
-     * @param cfg The client configuration.
-     */
-    public HedwigClient(ClientConfiguration cfg) {
-        impl = HedwigClientImpl.create(cfg);
-    }
-
-    /**
-     * Construct a hedwig client object, using a preexisting socket factory.
-     * This is useful if you need to create many hedwig client instances.
-     *
-     * @param cfg The client configuration
-     * @param socketFactory A netty socket factory.
-     */
-    public HedwigClient(ClientConfiguration cfg, ChannelFactory socketFactory) {
-        impl = HedwigClientImpl.create(cfg, socketFactory);
-    }
-
-    @Override
-    public Publisher getPublisher() {
-        return impl.getPublisher();
-    }
-
-    @Override
-    public Subscriber getSubscriber() {
-        return impl.getSubscriber();
-    }
-
-    @Override
-    public void close() {
-        impl.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java
deleted file mode 100644
index 891148f..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.api;
-
-/**
- * Interface defining the client API for Hedwig
- */
-public interface Client {
-    /**
-     * Retrieve the Publisher object for the client.
-     * This object can be used to publish messages to a topic on Hedwig.
-     * @see Publisher
-     */
-    public Publisher getPublisher();
-    
-    /**
-     * Retrieve the Subscriber object for the client.
-     * This object can be used to subscribe for messages from a topic.
-     * @see Subscriber
-     */
-    public Subscriber getSubscriber();
-
-    /**
-     * Close the client and free all associated resources.
-     */
-    public void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
deleted file mode 100644
index f312a36..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.api;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Interface to define the client handler logic to deliver messages it is
- * subscribed to.
- *
- */
-public interface MessageHandler {
-
-    /**
-     * Delivers a message which has been published for topic. 
-     *
-     * @param topic
-     *            The topic name where the message came from.
-     * @param subscriberId
-     *            ID of the subscriber.
-     * @param msg
-     *            The message object to deliver.
-     * @param callback
-     *            Callback to invoke when the message delivery has been done.
-     * @param context
-     *            Calling context that the Callback needs since this is done
-     *            asynchronously.
-     */
-    public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, Object context);
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java
deleted file mode 100644
index a4fdb04..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.api;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Interface to define the client Publisher API.
- *
- */
-public interface Publisher {
-
-    /**
-     * Publishes a message on the given topic.
-     *
-     * @param topic
-     *            Topic name to publish on
-     * @param msg
-     *            Message object to serialize and publish
-     * @throws CouldNotConnectException
-     *             If we are not able to connect to the server host
-     * @throws ServiceDownException
-     *             If we are unable to publish the message to the topic.
-     * @return The PubSubProtocol.PublishResponse of the publish ... can be used to pick seq-id.
-     */
-    public PubSubProtocol.PublishResponse publish(ByteString topic, Message msg)
-        throws CouldNotConnectException, ServiceDownException;
-
-    /**
-     * Publishes a message asynchronously on the given topic.
-     *
-     * @param topic
-     *            Topic name to publish on
-     * @param msg
-     *            Message object to serialize and publish
-     * @param callback
-     *            Callback to invoke when the publish to the server has actually
-     *            gone through. This will have to deal with error conditions on
-     *            the async publish request.
-     * @param context
-     *            Calling context that the Callback needs since this is done
-     *            asynchronously.
-     */
-    public void asyncPublish(ByteString topic, Message msg, Callback<Void> callback, Object context);
-
-
-  /**
-   * Publishes a message asynchronously on the given topic.
-   * This method, unlike {@link #asyncPublish(ByteString, PubSubProtocol.Message, Callback, Object)},
-   * allows for the callback to retrieve {@link org.apache.hedwig.protocol.PubSubProtocol.PublishResponse}
-   * which was returned by the server.
-   *
-   *
-   *
-   * @param topic
-   *            Topic name to publish on
-   * @param msg
-   *            Message object to serialize and publish
-   * @param callback
-   *            Callback to invoke when the publish to the server has actually
-   *            gone through. This will have to deal with error conditions on
-   *            the async publish request.
-   * @param context
-   *            Calling context that the Callback needs since this is done
-   *            asynchronously.
-   */
-    public void asyncPublishWithResponse(ByteString topic, Message msg,
-                                         Callback<PubSubProtocol.PublishResponse> callback, Object context);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
deleted file mode 100644
index 7e05c0e..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.api;
-
-import java.util.List;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.SubscriptionListener;
-
-/**
- * Interface to define the client Subscriber API.
- *
- */
-public interface Subscriber {
-
-    /**
-     * Subscribe to the given topic for the inputted subscriberId.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param mode
-     *            Whether to prohibit, tolerate, or require an existing
-     *            subscription.
-     * @throws CouldNotConnectException
-     *             If we are not able to connect to the server host
-     * @throws ClientAlreadySubscribedException
-     *             If client is already subscribed to the topic
-     * @throws ServiceDownException
-     *             If unable to subscribe to topic
-     * @throws InvalidSubscriberIdException
-     *             If the subscriberId is not valid. We may want to set aside
-     *             certain formats of subscriberId's for different purposes.
-     *             e.g. local vs. hub subscriber
-     * @deprecated As of BookKeeper 4.2.0, replaced by
-     *             {@link Subscriber#subscribe(com.google.protobuf.ByteString,
-     *                                         com.google.protobuf.ByteString,
-     *                                         PubSubProtocol.SubscriptionOptions)}
-     */
-    @Deprecated
-    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-        InvalidSubscriberIdException;
-
-    /**
-     * Subscribe to the given topic asynchronously for the inputted subscriberId
-     * disregarding if the topic has been created yet or not.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param mode
-     *            Whether to prohibit, tolerate, or require an existing
-     *            subscription.
-     * @param callback
-     *            Callback to invoke when the subscribe request to the server
-     *            has actually gone through. This will have to deal with error
-     *            conditions on the async subscribe request.
-     * @param context
-     *            Calling context that the Callback needs since this is done
-     *            asynchronously.
-     * @deprecated As of BookKeeper 4.2.0, replaced by
-     *             {@link Subscriber#asyncSubscribe(com.google.protobuf.ByteString,
-     *                                              com.google.protobuf.ByteString,
-     *                                              PubSubProtocol.SubscriptionOptions,Callback,Object)}
-     */
-    @Deprecated
-    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
-                               Object context);
-
-
-    /**
-     * Subscribe to the given topic for the inputted subscriberId.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param options
-     *            Options to pass to the subscription. See
-     *             {@link Subscriber#asyncSubscribe(com.google.protobuf.ByteString,
-     *                                              com.google.protobuf.ByteString,
-     *                                              PubSubProtocol.SubscriptionOptions,
-     *                                              Callback,Object) asyncSubscribe}
-     *            for details on how to set options.
-     * @throws CouldNotConnectException
-     *             If we are not able to connect to the server host
-     * @throws ClientAlreadySubscribedException
-     *             If client is already subscribed to the topic
-     * @throws ServiceDownException
-     *             If unable to subscribe to topic
-     * @throws InvalidSubscriberIdException
-     *             If the subscriberId is not valid. We may want to set aside
-     *             certain formats of subscriberId's for different purposes.
-     *             e.g. local vs. hub subscriber
-     */
-    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
-            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
-        InvalidSubscriberIdException;
-
-    /**
-     * <p>Subscribe to the given topic asynchronously for the inputted subscriberId.</p>
-     *
-     * <p>SubscriptionOptions contains parameters for how the hub should make the subscription.
-     * The options includes createorattach mode, message bound and message filter.</p>
-     *
-     * <p>The createorattach mode defines whether the subscription should create a new subscription, or
-     * just attach to a preexisting subscription. If it tries to create the subscription, and the
-     * subscription already exists, then an error will occur.</p>
-     *
-     * <p>The message bound defines the maximum number of undelivered messages which will be stored
-     * for the subscription. This can be used to ensure that unused subscriptions do not grow
-     * in an unbounded fashion. By default, the message bound is infinite, i.e. all undelivered messages
-     * will be stored for the subscription. Note that if one subscription on a topic has a infinite
-     * message bound, the message bound for all other subscriptions on that topic will effectively be
-     * infinite as the messages have to be stored for the first subscription in any case. </p>
-     *
-     * <p>The message filter defines a {@link org.apache.hedwig.filter.ServerMessageFilter}
-     * run in hub server to filter messages delivered to the subscription. The server message
-     * filter should be placed in the classpath of hub server before using it.</p>
-     *
-     * All these subscription options would be stored as SubscriptionPreferences in metadata
-     * manager. The next time subscriber attached with difference options, the new options would
-     * overwrite the old options.
-     *
-     * Usage is as follows:
-     * <pre>
-     * {@code
-     * // create a new subscription with a message bound of 5
-     * SubscriptionOptions options = SubscriptionOptions.newBuilder()
-     *     .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build();
-     * client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("myTopic"),
-     *                                       ByteString.copyFromUtf8("mySubscription"),
-     *                                       options,
-     *                                       myCallback,
-     *                                       myContext);
-     * }
-     * </pre>
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param options
-     *            Options to pass to the subscription.
-     * @param callback
-     *            Callback to invoke when the subscribe request to the server
-     *            has actually gone through. This will have to deal with error
-     *            conditions on the async subscribe request.
-     * @param context
-     *            Calling context that the Callback needs since this is done
-     *            asynchronously.
-     */
-    public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
-                               Callback<Void> callback, Object context);
-
-    /**
-     * Unsubscribe from a topic that the subscriberId user has previously
-     * subscribed to.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @throws CouldNotConnectException
-     *             If we are not able to connect to the server host
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic
-     * @throws ServiceDownException
-     *             If the server was down and unable to complete the request
-     * @throws InvalidSubscriberIdException
-     *             If the subscriberId is not valid. We may want to set aside
-     *             certain formats of subscriberId's for different purposes.
-     *             e.g. local vs. hub subscriber
-     */
-    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException;
-
-    /**
-     * Unsubscribe from a topic asynchronously that the subscriberId user has
-     * previously subscribed to.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param callback
-     *            Callback to invoke when the unsubscribe request to the server
-     *            has actually gone through. This will have to deal with error
-     *            conditions on the async unsubscribe request.
-     * @param context
-     *            Calling context that the Callback needs since this is done
-     *            asynchronously.
-     */
-    public void asyncUnsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context);
-
-    /**
-     * Manually send a consume message to the server for the given inputs.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param messageSeqId
-     *            Message Sequence ID for the latest message that the client app
-     *            has successfully consumed. All messages up to that point will
-     *            also be considered as consumed.
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic based
-     *             on the client's local state.
-     */
-    public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
-            throws ClientNotSubscribedException;
-
-    /**
-     * Checks if the subscriberId client is currently subscribed to the given
-     * topic.
-     *
-     * @param topic
-     *            Topic name of the subscription.
-     * @param subscriberId
-     *            ID of the subscriber
-     * @throws CouldNotConnectException
-     *             If we are not able to connect to the server host
-     * @throws ServiceDownException
-     *             If there is an error checking the server if the client has a
-     *             subscription
-     * @return Boolean indicating if the client has a subscription or not.
-     */
-    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
-        ServiceDownException;
-
-    /**
-     * Fills the input List with the subscriptions this subscriberId client is
-     * subscribed to.
-     *
-     * @param subscriberId
-     *            ID of the subscriber
-     * @return List filled with subscription name (topic) strings.
-     * @throws CouldNotConnectException
-     *             If we are not able to connect to the server host
-     * @throws ServiceDownException
-     *             If there is an error retrieving the list of topics
-     */
-    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
-        ServiceDownException;
-
-    /**
-     * Begin delivery of messages from the server to us for this topic and
-     * subscriberId.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param messageHandler
-     *            Message Handler that will consume the subscribed messages
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic
-     * @throws AlreadyStartDeliveryException
-     *             If someone started delivery a message handler before stopping existed one.
-     */
-    public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler)
-            throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
-    /**
-     * Begin delivery of messages from the server to us for this topic and
-     * subscriberId.
-     *
-     * Only the messages passed <code>messageFilter</code> could be delivered to
-     * <code>messageHandler</code>.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param messageHandler
-     *            Message Handler that will consume the subscribed messages
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic
-     * @throws AlreadyStartDeliveryException
-     *             If someone started delivery a message handler before stopping existed one.
-     * @throws NullPointerException
-     *             If either <code>messageHandler</code> or <code>messageFilter</code> is null.
-     */
-    public void startDeliveryWithFilter(ByteString topic, ByteString subscriberId,
-                                        MessageHandler messageHandler,
-                                        ClientMessageFilter messageFilter)
-            throws ClientNotSubscribedException, AlreadyStartDeliveryException;
-
-    /**
-     * Stop delivery of messages for this topic and subscriberId.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @throws ClientNotSubscribedException
-     *             If the client is not currently subscribed to the topic
-     */
-    public void stopDelivery(ByteString topic, ByteString subscriberId) throws ClientNotSubscribedException;
-
-    /**
-     * Closes all of the client side cached data for this subscription without
-     * actually sending an unsubscribe request to the server. This will close
-     * the subscribe channel synchronously (if it exists) for the topic.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @throws ServiceDownException
-     *             If the subscribe channel was not able to be closed
-     *             successfully
-     */
-    public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException;
-
-    /**
-     * Closes all of the client side cached data for this subscription without
-     * actually sending an unsubscribe request to the server. This will close
-     * the subscribe channel asynchronously (if it exists) for the topic.
-     *
-     * @param topic
-     *            Topic name of the subscription
-     * @param subscriberId
-     *            ID of the subscriber
-     * @param callback
-     *            Callback to invoke when the subscribe channel has been closed.
-     * @param context
-     *            Calling context that the Callback needs since this is done
-     *            asynchronously.
-     */
-    public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback,
-                                       Object context);
-
-    /**
-     * Register a subscription listener which get notified about subscription
-     * event indicating a state of a subscription that subscribed disable
-     * resubscribe logic.
-     *
-     * @param listener
-     *          Subscription Listener
-     */
-    public void addSubscriptionListener(SubscriptionListener listener);
-
-    /**
-     * Unregister a subscription listener.
-     *
-     * @param listener
-     *          Subscription Listener
-     */
-    public void removeSubscriptionListener(SubscriptionListener listener);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
deleted file mode 100644
index 531840f..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.benchmark;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
-import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BenchmarkPublisher extends BenchmarkWorker {
-    private static final Logger logger = LoggerFactory.getLogger(BenchmarkPublisher.class);
-    Publisher publisher;
-    Subscriber subscriber;
-    int msgSize;
-    int nParallel;
-    double rate;
-
-    public BenchmarkPublisher(int numTopics, int numMessages, int numRegions, int startTopicLabel, int partitionIndex,
-                              int numPartitions, Publisher publisher, Subscriber subscriber, int msgSize, int nParallel, int rate) {
-        super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions);
-        this.publisher = publisher;
-        this.msgSize = msgSize;
-        this.subscriber = subscriber;
-        this.nParallel = nParallel;
-
-        this.rate = rate / (numRegions * numPartitions + 0.0);
-    }
-
-    public void warmup(int nWarmup) throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("warmup" + partitionIndex);
-        ByteString subId = ByteString.copyFromUtf8("sub");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subId, opts);
-
-        subscriber.startDelivery(topic, subId, new MessageHandler() {
-            @Override
-            public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
-            Object context) {
-                // noop
-                callback.operationFinished(context, null);
-            }
-        });
-
-        // picking constants arbitarily for warmup phase
-        ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", nWarmup, 100);
-        agg.startProgress();
-
-        Message msg = getMsg(1024);
-        for (int i = 0; i < nWarmup; i++) {
-            publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null);
-        }
-
-        if (agg.tpAgg.queue.take() > 0) {
-            throw new RuntimeException("Warmup publishes failed!");
-        }
-
-    }
-
-    public Message getMsg(int size) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < size; i++) {
-            sb.append('a');
-        }
-        final ByteString body = ByteString.copyFromUtf8(sb.toString());
-        Message msg = Message.newBuilder().setBody(body).build();
-        return msg;
-    }
-
-    public Void call() throws Exception {
-        Message msg = getMsg(msgSize);
-
-        // Single warmup for every topic
-        int myPublishCount = 0;
-        for (int i = 0; i < numTopics; i++) {
-            if (!HedwigBenchmark.amIResponsibleForTopic(startTopicLabel + i, partitionIndex, numPartitions)) {
-                continue;
-            }
-            ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + (startTopicLabel + i));
-            publisher.publish(topic, msg);
-            myPublishCount++;
-        }
-
-        long startTime = MathUtils.now();
-        int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount;
-        myPublishCount = 0;
-        ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel);
-        agg.startProgress();
-
-        int topicLabel = 0;
-
-        while (myPublishCount < myPublishLimit) {
-            int topicNum = startTopicLabel + topicLabel;
-            topicLabel = (topicLabel + 1) % numTopics;
-
-            if (!HedwigBenchmark.amIResponsibleForTopic(topicNum, partitionIndex, numPartitions)) {
-                continue;
-            }
-
-            ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + topicNum);
-
-            if (rate > 0) {
-                long delay = startTime + (long) (1000 * myPublishCount / rate) - MathUtils.now();
-                if (delay > 0)
-                    Thread.sleep(delay);
-            }
-            publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null);
-            myPublishCount++;
-        }
-
-        System.out.println("Finished unacked pubs: tput = " + BenchmarkUtils.calcTp(myPublishLimit, startTime)
-                           + " ops/s");
-        // Wait till the benchmark test has completed
-        agg.tpAgg.queue.take();
-        System.out.println(agg.summarize(startTime));
-        return null;
-    }
-
-}


Mime
View raw message