hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r987314 [4/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp...
Date Thu, 19 Aug 2010 21:25:22 GMT
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cppunit/Test.h>
+#include <cppunit/TestSuite.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <stdexcept>
+#include <pthread.h>
+
+#include <log4cpp/Category.hh>
+
+#include "servercontrol.h"
+#include "util.h"
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
+
+using namespace CppUnit;
+
+class PublishTestSuite : public CppUnit::TestFixture {
+private:
+  HedwigTest::ServerControl* control;
+  HedwigTest::TestServerPtr zk;
+  HedwigTest::TestServerPtr bk1;
+  HedwigTest::TestServerPtr bk2;
+  HedwigTest::TestServerPtr bk3;
+  HedwigTest::TestServerPtr hw1;
+  HedwigTest::TestServerPtr hw2;
+
+  CPPUNIT_TEST_SUITE( PublishTestSuite );
+  CPPUNIT_TEST(testSyncPublish);
+  CPPUNIT_TEST(testAsyncPublish);
+  CPPUNIT_TEST(testMultipleAsyncPublish);
+  //  CPPUNIT_TEST(simplePublish);
+  //CPPUNIT_TEST(simplePublishAndSubscribe);
+  //CPPUNIT_TEST(publishAndSubscribeWithRedirect);
+  CPPUNIT_TEST_SUITE_END();
+
+public:
+  PublishTestSuite() {
+
+  }
+
+  ~PublishTestSuite() {
+
+  }
+
+  void setUp()
+  {
+    control = new HedwigTest::ServerControl(HedwigTest::DEFAULT_CONTROLSERVER_PORT);
+    zk = control->startZookeeperServer(12345);
+    bk1 = control->startBookieServer(12346, zk);
+    bk2 = control->startBookieServer(12347, zk);
+    bk3 = control->startBookieServer(12348, zk);
+    
+    std::string region("testRegion");
+    hw1 = control->startPubSubServer(12349, region, zk);
+    hw2 = control->startPubSubServer(12350, region, zk);
+  }
+  
+  void tearDown() 
+  {
+    hw2->kill();
+    hw1->kill();
+    
+    bk1->kill();
+    bk2->kill();
+    bk3->kill();
+    
+    zk->kill();
+    delete control;
+  }
+
+  void testSyncPublish() {
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    Hedwig::Publisher& pub = client->getPublisher();
+    
+    pub.publish("testTopic", "testMessage 1");
+    
+    delete client;
+    delete conf;
+  }
+
+  void testAsyncPublish() {
+    SimpleWaitCondition* cond = new SimpleWaitCondition();
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    Hedwig::Publisher& pub = client->getPublisher();
+    
+    Hedwig::OperationCallbackPtr testcb(new TestCallback(cond));
+    pub.asyncPublish("testTopic", "async test message", testcb);
+    
+    cond->wait();
+    delete cond;
+    delete client;
+    delete conf;
+  }
+
+  void testMultipleAsyncPublish() {
+    SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond2 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond3 = new SimpleWaitCondition();
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    Hedwig::Publisher& pub = client->getPublisher();
+   
+    Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+    Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
+    Hedwig::OperationCallbackPtr testcb3(new TestCallback(cond3));
+
+    pub.asyncPublish("testTopic", "async test message #1", testcb1);
+    pub.asyncPublish("testTopic", "async test message #2", testcb2);
+    pub.asyncPublish("testTopic", "async test message #3", testcb3);
+    
+    cond3->wait();
+    cond2->wait();
+    cond1->wait();
+
+    delete cond3; delete cond2; delete cond1;
+    delete client;
+    delete conf;
+  }
+  /*  void simplePublish() {
+    LOG.debugStream() << ">>> simplePublish";
+    SimpleWaitCondition* cond = new SimpleWaitCondition();
+
+    Hedwig::Configuration* conf = new Configuration1();
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    Hedwig::Publisher& pub = client->getPublisher();
+    
+    Hedwig::OperationCallbackPtr testcb(new TestCallback(cond));
+    pub.asyncPublish("foobar", "barfoo", testcb);
+    
+    LOG.debugStream() << "wait for response";
+    cond->wait();
+    delete cond;
+    LOG.debugStream() << "got response";
+    
+
+    delete client;
+    delete conf;
+    LOG.debugStream() << "<<< simplePublish";
+  }
+
+  class MyMessageHandler : public Hedwig::MessageHandlerCallback {
+  public:
+    MyMessageHandler(SimpleWaitCondition* cond) : cond(cond) {}
+
+    void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
+      LOG.debugStream() << "Topic: " << topic << "  subscriberId: " << subscriberId;
+      LOG.debugStream() << " Message: " << msg.body();
+      
+      callback->operationComplete();
+      cond->setTrue();
+      cond->signal();
+    }
+  private:
+    SimpleWaitCondition* cond;
+    };*/
+  /*
+  void simplePublishAndSubscribe() {
+    SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond2 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond3 = new SimpleWaitCondition();
+
+    Hedwig::Configuration* conf = new Configuration1();
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    Hedwig::Publisher& pub = client->getPublisher();
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    
+    std::string topic("foobar");
+    std::string sid("mysubscriber");
+    Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+    sub.asyncSubscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
+    Hedwig::MessageHandlerCallbackPtr messagecb(new MyMessageHandler(cond2));
+    sub.startDelivery(topic, sid, messagecb);
+    cond1->wait();
+    
+    Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond3));
+    pub.asyncPublish("foobar", "barfoo", testcb2);
+    cond3->wait();
+    cond2->wait();
+
+    delete cond1;
+    delete cond3;
+    delete cond2;
+
+    delete client;
+    delete conf;
+  }
+
+  void publishAndSubscribeWithRedirect() {
+    SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond2 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond3 = new SimpleWaitCondition();
+    SimpleWaitCondition* cond4 = new SimpleWaitCondition();
+
+    Hedwig::Configuration* publishconf = new Configuration1();
+    Hedwig::Configuration* subscribeconf = new Configuration2();
+
+    Hedwig::Client* publishclient = new Hedwig::Client(*publishconf);
+    Hedwig::Publisher& pub = publishclient->getPublisher();
+
+    Hedwig::Client* subscribeclient = new Hedwig::Client(*subscribeconf);
+    Hedwig::Subscriber& sub = subscribeclient->getSubscriber();
+    
+    LOG.debugStream() << "publishing";
+    Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond3));
+    pub.asyncPublish("foobar", "barfoo", testcb2);
+    cond3->wait();
+    
+    LOG.debugStream() << "Subscribing";
+    std::string topic("foobar");
+    std::string sid("mysubscriber");
+    Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+    sub.asyncSubscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
+    LOG.debugStream() << "Starting delivery";
+    Hedwig::MessageHandlerCallbackPtr messagecb(new MyMessageHandler(cond2));
+    sub.startDelivery(topic, sid, messagecb);
+
+    LOG.debugStream() << "Subscribe wait";
+    cond1->wait();
+
+    Hedwig::OperationCallbackPtr testcb3(new TestCallback(cond4));
+    pub.asyncPublish("foobar", "barfoo", testcb3);
+    cond4->wait();
+
+
+    LOG.debugStream() << "Delivery wait";
+
+    cond2->wait();
+
+    sub.stopDelivery(topic, sid);
+
+    delete cond1;
+    delete cond3;
+    delete cond2;
+    delete cond4;
+
+    delete subscribeclient;
+    delete publishclient;
+    delete publishconf;
+    delete subscribeconf;
+    }*/
+};
+
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PublishTestSuite, "Publish");

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubdatatest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubdatatest.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubdatatest.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubdatatest.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cppunit/Test.h>
+#include <cppunit/TestSuite.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <stdexcept>
+
+using namespace CppUnit;
+
+class PubSubDataTestSuite : public CppUnit::TestFixture {
+  CPPUNIT_TEST_SUITE( PubSubDataTestSuite );
+  CPPUNIT_TEST(createPubSubData);
+  CPPUNIT_TEST_SUITE_END();
+
+public:
+  void setUp()
+  {
+  }
+
+  void tearDown() 
+  {
+  }
+
+  void createPubSubData() {
+    
+  }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( PubSubDataTestSuite );

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cppunit/Test.h>
+#include <cppunit/TestSuite.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <stdexcept>
+#include <pthread.h>
+
+#include <log4cpp/Category.hh>
+
+#include "servercontrol.h"
+#include "util.h"
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
+
+class PubSubTestSuite : public CppUnit::TestFixture {
+private:
+  HedwigTest::ServerControl* control;
+  HedwigTest::TestServerPtr zk;
+  HedwigTest::TestServerPtr bk1;
+  HedwigTest::TestServerPtr bk2;
+  HedwigTest::TestServerPtr bk3;
+  HedwigTest::TestServerPtr hw1;
+
+			       
+  CPPUNIT_TEST_SUITE( PubSubTestSuite );
+  CPPUNIT_TEST(testPubSubContinuousOverClose);
+  //  CPPUNIT_TEST(testPubSubContinuousOverServerDown);
+  CPPUNIT_TEST(testMultiTopic);
+  CPPUNIT_TEST(testMultiTopicMultiSubscriber);
+  CPPUNIT_TEST_SUITE_END();
+
+public:
+  PubSubTestSuite() {
+    
+  }
+
+  ~PubSubTestSuite() {
+  }
+
+  void setUp()
+  {
+    control = new HedwigTest::ServerControl(HedwigTest::DEFAULT_CONTROLSERVER_PORT);
+    zk = control->startZookeeperServer(12345);
+    bk1 = control->startBookieServer(12346, zk);
+    bk2 = control->startBookieServer(12347, zk);
+    bk3 = control->startBookieServer(12348, zk);
+    
+    std::string region("testRegion");
+    hw1 = control->startPubSubServer(12349, region, zk);
+  }
+  
+  void tearDown() 
+  {
+    try {
+      hw1->kill();
+    
+      bk1->kill();
+      bk2->kill();
+      bk3->kill();
+      
+      zk->kill();
+    } catch (std::exception& e) {
+      // don't allow an exception to break everything, we're going deleting the control no matter what
+    }
+    delete control;
+  }
+
+  class MyMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
+  public:
+    MyMessageHandlerCallback(const std::string& topic, const std::string& subscriberId) : messagesReceived(0), topic(topic), subscriberId(subscriberId) {
+      
+    }
+
+    virtual void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
+      if (topic == this->topic && subscriberId == this->subscriberId) {
+	mutex.lock();
+	messagesReceived++;
+	lastMessage = msg.body();
+	callback->operationComplete();
+	mutex.unlock();
+      }
+    }
+    
+    std::string getLastMessage() {
+      mutex.lock();
+      std::string s = lastMessage;
+      mutex.unlock();
+      return s;
+    }
+
+    int numMessagesReceived() {
+      mutex.lock();
+      int i = messagesReceived;
+      mutex.unlock();
+      return i;
+    }    
+    
+  protected:
+    Hedwig::Mutex mutex;
+    int messagesReceived;
+    std::string lastMessage;
+    std::string topic;
+    std::string subscriberId;
+  };
+ 
+  void testPubSubContinuousOverClose() {
+    std::string topic = "pubSubTopic";
+    std::string sid = "MySubscriberid-1";
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid);
+    Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+    sub.startDelivery(topic, sid, handler);
+    pub.publish(topic, "Test Message 1");
+    bool pass = false;
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cb->numMessagesReceived() > 0) {
+	if (cb->getLastMessage() == "Test Message 1") {
+	  pass = true;
+	  break;
+	}
+      }
+    }
+    CPPUNIT_ASSERT(pass);
+    sub.closeSubscription(topic, sid);
+
+    pub.publish(topic, "Test Message 2");
+    
+    sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    sub.startDelivery(topic, sid, handler);
+    pass = false;
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cb->numMessagesReceived() > 0) {
+	if (cb->getLastMessage() == "Test Message 2") {
+	  pass = true;
+	  break;
+	}
+      }
+    }
+    CPPUNIT_ASSERT(pass);
+  }
+
+  /*  void testPubSubContinuousOverServerDown() {
+    std::string topic = "pubSubTopic";
+    std::string sid = "MySubscriberid-1";
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid);
+    Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+    sub.startDelivery(topic, sid, handler);
+    pub.publish(topic, "Test Message 1");
+    bool pass = false;
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cb->numMessagesReceived() > 0) {
+	if (cb->getLastMessage() == "Test Message 1") {
+	  pass = true;
+	  break;
+	}
+      }
+    }
+    CPPUNIT_ASSERT(pass);
+    sub.closeSubscription(topic, sid);
+
+    pub.publish(topic, "Test Message 2");
+    
+    sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    sub.startDelivery(topic, sid, handler);
+    pass = false;
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cb->numMessagesReceived() > 0) {
+	if (cb->getLastMessage() == "Test Message 2") {
+	  pass = true;
+	  break;
+	}
+      }
+    }
+    CPPUNIT_ASSERT(pass);
+    }*/
+
+  void testMultiTopic() {
+    std::string topicA = "pubSubTopicA";
+    std::string topicB = "pubSubTopicB";
+    std::string sid = "MySubscriberid-3";
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    sub.subscribe(topicA, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    sub.subscribe(topicB, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+   
+    MyMessageHandlerCallback* cbA = new MyMessageHandlerCallback(topicA, sid);
+    Hedwig::MessageHandlerCallbackPtr handlerA(cbA);
+    sub.startDelivery(topicA, sid, handlerA);
+
+    MyMessageHandlerCallback* cbB = new MyMessageHandlerCallback(topicB, sid);
+    Hedwig::MessageHandlerCallbackPtr handlerB(cbB);
+    sub.startDelivery(topicB, sid, handlerB);
+
+    pub.publish(topicA, "Test Message A");
+    pub.publish(topicB, "Test Message B");
+    int passA = false, passB = false;
+    
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cbA->numMessagesReceived() > 0) {
+	if (cbA->getLastMessage() == "Test Message A") {
+	  passA = true;
+	}
+      }
+      if (cbB->numMessagesReceived() > 0) {
+	if (cbB->getLastMessage() == "Test Message B") {
+	  passB = true;
+	}
+      }
+      if (passA && passB) {
+	break;
+      }
+    }
+    CPPUNIT_ASSERT(passA && passB);
+  }
+
+  void testMultiTopicMultiSubscriber() {
+    std::string topicA = "pubSubTopicA";
+    std::string topicB = "pubSubTopicB";
+    std::string sidA = "MySubscriberid-4";
+    std::string sidB = "MySubscriberid-5";
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    sub.subscribe(topicA, sidA, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    sub.subscribe(topicB, sidB, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+   
+    MyMessageHandlerCallback* cbA = new MyMessageHandlerCallback(topicA, sidA);
+    Hedwig::MessageHandlerCallbackPtr handlerA(cbA);
+    sub.startDelivery(topicA, sidA, handlerA);
+
+    MyMessageHandlerCallback* cbB = new MyMessageHandlerCallback(topicB, sidB);
+    Hedwig::MessageHandlerCallbackPtr handlerB(cbB);
+    sub.startDelivery(topicB, sidB, handlerB);
+
+    pub.publish(topicA, "Test Message A");
+    pub.publish(topicB, "Test Message B");
+    int passA = false, passB = false;
+    
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cbA->numMessagesReceived() > 0) {
+	if (cbA->getLastMessage() == "Test Message A") {
+	  passA = true;
+	}
+      }
+      if (cbB->numMessagesReceived() > 0) {
+	if (cbB->getLastMessage() == "Test Message B") {
+	  passB = true;
+	}
+      }
+      if (passA && passB) {
+	break;
+      }
+    }
+    CPPUNIT_ASSERT(passA && passB);
+  }
+};
+
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PubSubTestSuite, "PubSub" );

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+
+#include <string>
+#include <string.h>
+#include <stdlib.h>
+#include "servercontrol.h"
+
+#include <log4cpp/Category.hh>
+#include <sstream>   
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
+
+using namespace HedwigTest;
+
+const int MAX_COMMAND_LN = 256;
+
+class TestServerImpl : public TestServer {
+public:
+  TestServerImpl(std::string& address, ServerControl& sc);
+  ~TestServerImpl();
+  void kill();
+  std::string& getAddress();
+
+private:
+  std::string address;
+  ServerControl& sc;
+};
+
+TestServerImpl::TestServerImpl(std::string& address, ServerControl& sc) : address(address), sc(sc)  {
+}
+
+TestServerImpl::~TestServerImpl() {
+}
+
+void TestServerImpl::kill() {
+  std::ostringstream sstr;
+  sstr << "KILL " << address << std::endl;
+  ServerControl::ServerResponse resp = sc.requestResponse(sstr.str());
+  if (resp.status != "OK") {
+    LOG.errorStream() << "Error killing Server " << resp.message;
+    throw ErrorKillingServerException();
+  }
+}
+
+std::string& TestServerImpl::getAddress() {
+  return address;
+}
+ 
+ServerControl::ServerControl(int port) {
+  socketfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+  
+  if (-1 == socketfd) {
+    LOG.errorStream() << "Couldn't create socket";
+    throw CantConnectToServerControlDaemonException();
+  }
+
+  sockaddr_in addr;
+  addr.sin_family = AF_INET; 
+  addr.sin_port = htons(port);
+  addr.sin_addr.s_addr = inet_addr("127.0.0.1"); 
+    
+  if (-1 == ::connect(socketfd, (const sockaddr *)&addr, sizeof(struct sockaddr))) {
+    LOG.errorStream() << "Couldn't connect socket";
+    close(socketfd);
+    throw CantConnectToServerControlDaemonException();
+  }
+}
+
+ServerControl::~ServerControl() {
+  close(socketfd);
+}
+  
+
+ServerControl::ServerResponse ServerControl::requestResponse(std::string request) {
+  socketlock.lock();
+  char response[MAX_COMMAND_LN];
+
+  LOG.debugStream() << "REQ: " << request.c_str() << " " << request.length();
+  send(socketfd, request.c_str(), request.length(), 0);
+  
+  memset(response, 0, MAX_COMMAND_LN);
+  recv(socketfd, response, MAX_COMMAND_LN, 0);
+  LOG.debugStream() << "RESP: " << response;
+
+  socketlock.unlock();
+
+  char* space = strchr(response, ' ');
+  if (space == NULL) {
+    throw InvalidServerControlDaemonResponseException();
+  }
+  char* status = response;
+  *space = 0;
+  
+  char* message = space+1;
+  char* cr = strchr(message, '\n');
+  if (cr != NULL) {
+    *cr = 0;
+  }
+  if (strlen(message) < 1) {
+    throw InvalidServerControlDaemonResponseException();
+  }
+  LOG.debugStream() << "$" << message << "$";
+  ServerControl::ServerResponse resp = { std::string(status), std::string(message) };
+  return resp;
+}
+  
+TestServerPtr ServerControl::startZookeeperServer(int port) {  
+  std::ostringstream sstr;
+  sstr << "START ZOOKEEPER " << port << std::endl;
+
+  std::string req(sstr.str());
+  LOG.debugStream() << req;
+
+  ServerControl::ServerResponse resp = requestResponse(req);
+  if (resp.status == "OK") {
+    return TestServerPtr(new TestServerImpl(resp.message, *this));
+  } else {
+    LOG.errorStream() << "Error creating zookeeper on port " << port << " " << resp.message;
+    throw ErrorCreatingServerException();
+  }
+}
+
+TestServerPtr ServerControl::startBookieServer(int port, TestServerPtr& zookeeperServer) {
+  std::ostringstream sstr;
+  sstr << "START BOOKKEEPER " << port << " " << zookeeperServer->getAddress() << std::endl;
+
+  std::string req(sstr.str());
+  LOG.debugStream() << req;
+
+  ServerControl::ServerResponse resp = requestResponse(req);
+  if (resp.status == "OK") {
+    return TestServerPtr(new TestServerImpl(resp.message, *this));
+  } else {
+    LOG.errorStream() << "Error creating bookkeeper on port " << port << " " << resp.message;
+    throw ErrorCreatingServerException();
+  }
+}
+
+TestServerPtr ServerControl::startPubSubServer(int port, std::string& region, TestServerPtr& zookeeperServer) {
+  std::ostringstream sstr;
+  sstr << "START HEDWIG " << port << " " << region << " " << zookeeperServer->getAddress() << std::endl;
+
+  std::string req(sstr.str());
+  LOG.debugStream() << req;
+
+  ServerControl::ServerResponse resp = requestResponse(req);
+  if (resp.status == "OK") {
+    return TestServerPtr(new TestServerImpl(resp.message, *this));
+  } else {
+    LOG.errorStream() << "Error creating hedwig on port " << port << " " << resp.message;
+    throw ErrorCreatingServerException();
+  }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SERVERCONTROL_H
+#define SERVERCONTROL_H
+
+#include <tr1/memory>
+#include <exception>
+#include "../lib/util.h"
+
+namespace HedwigTest {
+    const int DEFAULT_CONTROLSERVER_PORT = 5672;
+
+  class TestException : public std::exception {};
+  class CantConnectToServerControlDaemonException : public TestException {};
+  class InvalidServerControlDaemonResponseException : public TestException {};
+  class ErrorCreatingServerException : public TestException {};
+  class ErrorKillingServerException : public TestException {};
+
+  class TestServer {
+  public:
+    virtual void kill() = 0;
+    virtual std::string& getAddress() = 0;
+  };
+  
+  typedef std::tr1::shared_ptr<TestServer> TestServerPtr;
+
+  class ServerControl {
+  public:
+    ServerControl(int port);
+    ~ServerControl();
+    
+    TestServerPtr startZookeeperServer(int port);
+    TestServerPtr startBookieServer(int port, TestServerPtr& zookeeperServer);
+    TestServerPtr startPubSubServer(int port, std::string& region, TestServerPtr& zookeeperServer);
+    
+    struct ServerResponse {
+      std::string status;
+      std::string message; 
+    };
+    ServerResponse requestResponse(std::string request);
+
+  public:
+    int socketfd;
+    Hedwig::Mutex socketlock;
+  };
+};
+
+#endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cppunit/Test.h>
+#include <cppunit/TestSuite.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <stdexcept>
+#include <pthread.h>
+
+#include <log4cpp/Category.hh>
+
+#include "servercontrol.h"
+#include "util.h"
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
+
+class SubscribeTestSuite : public CppUnit::TestFixture {
+private:
+  HedwigTest::ServerControl* control;
+  HedwigTest::TestServerPtr zk;
+  HedwigTest::TestServerPtr bk1;
+  HedwigTest::TestServerPtr bk2;
+  HedwigTest::TestServerPtr bk3;
+  HedwigTest::TestServerPtr hw1;
+  HedwigTest::TestServerPtr hw2;
+
+			       
+  CPPUNIT_TEST_SUITE( SubscribeTestSuite );
+  CPPUNIT_TEST(testSyncSubscribe);
+  CPPUNIT_TEST(testSyncSubscribeAttach);
+  CPPUNIT_TEST(testAsyncSubscribe);
+  CPPUNIT_TEST(testAsyncSubcribeAndUnsubscribe);
+  CPPUNIT_TEST(testAsyncSubcribeAndSyncUnsubscribe);
+  CPPUNIT_TEST(testAsyncSubcribeCloseSubscriptionAndThenResubscribe);
+  CPPUNIT_TEST(testUnsubscribeWithoutSubscribe);
+  CPPUNIT_TEST(testSubscribeTwice);      
+  CPPUNIT_TEST_SUITE_END();
+
+public:
+  SubscribeTestSuite() {
+    
+  }
+
+  ~SubscribeTestSuite() {
+  }
+
+  void setUp()
+  {
+    control = new HedwigTest::ServerControl(HedwigTest::DEFAULT_CONTROLSERVER_PORT);
+    zk = control->startZookeeperServer(12345);
+    bk1 = control->startBookieServer(12346, zk);
+    bk2 = control->startBookieServer(12347, zk);
+    bk3 = control->startBookieServer(12348, zk);
+    
+    std::string region("testRegion");
+    hw1 = control->startPubSubServer(12349, region, zk);
+    hw2 = control->startPubSubServer(12350, region, zk);
+  }
+  
+  void tearDown() 
+  {
+    try {
+      hw1->kill();
+    
+      bk1->kill();
+      bk2->kill();
+      bk3->kill();
+      
+      zk->kill();
+    } catch (std::exception& e) {
+      // don't allow an exception to break everything, we're going deleting the control no matter what
+    }
+    delete control;
+  }
+
+  void testSyncSubscribe() {
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    
+    sub.subscribe("testTopic", "mySubscriberId-1", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  }
+
+  void testSyncSubscribeAttach() {
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    
+    CPPUNIT_ASSERT_THROW(sub.subscribe("iAmATopicWhoDoesNotExist", "mySubscriberId-2", Hedwig::SubscribeRequest::ATTACH), Hedwig::ClientException);
+  }
+
+  void testAsyncSubscribe() {
+    SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+    std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+   
+    Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+
+    sub.asyncSubscribe("testTopic", "mySubscriberId-3", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
+    
+    cond1->wait();
+  }
+  
+  void testAsyncSubcribeAndUnsubscribe() {
+    SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+    std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
+    SimpleWaitCondition* cond2 = new SimpleWaitCondition();
+    std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2);
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+   
+    Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+    Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2));
+
+    sub.asyncSubscribe("testTopic", "mySubscriberId-4", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
+    cond1->wait();
+    
+    sub.asyncUnsubscribe("testTopic", "mySubscriberId-4", testcb2);
+    cond2->wait();
+  }
+
+  void testAsyncSubcribeAndSyncUnsubscribe() {
+    SimpleWaitCondition* cond1 = new SimpleWaitCondition();
+    std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1);
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+   
+    Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1));
+    
+    sub.asyncSubscribe("testTopic", "mySubscriberId-5", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
+    cond1->wait();
+
+    sub.unsubscribe("testTopic", "mySubscriberId-5");
+  }
+
+  void testAsyncSubcribeCloseSubscriptionAndThenResubscribe() {
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+   
+    sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    sub.closeSubscription("testTopic", "mySubscriberId-6");
+    sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    sub.unsubscribe("testTopic", "mySubscriberId-6");
+  }
+
+  void testUnsubscribeWithoutSubscribe() {
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    
+    CPPUNIT_ASSERT_THROW(sub.unsubscribe("testTopic", "mySubscriberId-7"), Hedwig::NotSubscribedException);
+  }
+
+  void testSubscribeTwice() {
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    
+    sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    CPPUNIT_ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException);
+  }
+};
+
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( SubscribeTestSuite, "Subscribe" );

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/test.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/test.sh?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/test.sh (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/test.sh Thu Aug 19 21:25:13 2010
@@ -0,0 +1,21 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#   
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+export LD_LIBRARY_PATH=/usr/lib/jvm/java-6-sun/jre/lib/i386/server/:/usr/lib/jvm/java-6-sun/jre/lib/i386/
+export CLASSPATH=$HOME/src/hedwig/server/target/test-classes:$HOME/src/hedwig/server/lib/bookkeeper-SNAPSHOT.jar:$HOME/src/hedwig/server/lib/zookeeper-SNAPSHOT.jar:$HOME/src/hedwig/server/target/classes:$HOME/src/hedwig/protocol/target/classes:$HOME/src/hedwig/client/target/classes:$HOME/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:$HOME/.m2/repository/org/jboss/netty/netty/3.1.2.GA/netty-3.1.2.GA.jar:$HOME/.m2/repository/commons-lang/commons-lang/2.4/commons-lang-2.4.jar:$HOME/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:$HOME/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:$HOME/.m2/repository/com/google/protobuf/protobuf-java/2.3.0/protobuf-java-2.3.0.jar:$HOME/.m2/repository/log4j/log4j/1.2.14/log4j-1.2.14.jar:$HOME/src/hedwig/client/target/classes/
+
+./hedwigtest
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../lib/clientimpl.h"
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <stdexcept>
+#include <pthread.h>
+
+static log4cpp::Category &UTILLOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
+
+
+class SimpleWaitCondition : public Hedwig::WaitConditionBase {
+public:
+  SimpleWaitCondition() : flag(false) {};
+  ~SimpleWaitCondition() { wait(); }
+
+  void setTrue() { UTILLOG.debugStream() << "Setting flag " << &flag << " to true"; flag=true; UTILLOG.debugStream() << "Flag now " << flag; }
+  bool isTrue() {
+    UTILLOG.debugStream() << &flag << " isTrue? " << flag;
+    return flag;
+  }
+private:
+  bool flag;
+};
+
+class TestCallback : public Hedwig::OperationCallback {
+public:
+  TestCallback(SimpleWaitCondition* cond) 
+    : cond(cond) {
+  }
+
+  virtual void operationComplete() {
+    UTILLOG.debugStream() << "operationComplete";
+    cond->lock();
+    cond->setTrue();
+    cond->signalAndUnlock();
+  }
+  
+  virtual void operationFailed(const std::exception& exception) {
+    UTILLOG.debugStream() << "operationFailed: " << exception.what();
+    cond->lock();
+    cond->setTrue();
+    cond->signalAndUnlock();
+  }    
+private:
+  SimpleWaitCondition *cond;
+};
+
+
+class TestServerConfiguration : public Hedwig::Configuration {
+public:
+  TestServerConfiguration(HedwigTest::TestServerPtr& server) : server(server), address(server->getAddress()) {}
+  
+  virtual const std::string& getDefaultServer() const {
+    return address;
+  }
+  
+private:
+  HedwigTest::TestServerPtr server;
+  const std::string address;
+};

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/utiltest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/utiltest.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/utiltest.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/utiltest.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <cppunit/Test.h>
+#include <cppunit/TestSuite.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "../lib/util.h"
+#include <hedwig/exceptions.h>
+#include <stdexcept>
+
+using namespace CppUnit;
+
+class UtilTestSuite : public CppUnit::TestFixture {
+  CPPUNIT_TEST_SUITE( UtilTestSuite );
+  CPPUNIT_TEST(testHostAddress);
+  CPPUNIT_TEST_SUITE_END();
+
+public:
+  void setUp()
+  {
+  }
+
+  void tearDown() 
+  {
+  }
+
+  void testHostAddress() {
+    // good address (no ports)
+    Hedwig::HostAddress a1 = Hedwig::HostAddress::fromString("www.yahoo.com");
+    CPPUNIT_ASSERT(a1.port() == 4080);
+
+    // good address with ip (no ports)
+    Hedwig::HostAddress a2 = Hedwig::HostAddress::fromString("127.0.0.1");
+    CPPUNIT_ASSERT(a2.port() == 4080);
+    CPPUNIT_ASSERT(a2.ip() == ((127 << 24) | 1));
+
+    // good address
+    Hedwig::HostAddress a3 = Hedwig::HostAddress::fromString("www.yahoo.com:80");
+    CPPUNIT_ASSERT(a3.port() == 80);
+
+    // good address with ip
+    Hedwig::HostAddress a4 = Hedwig::HostAddress::fromString("127.0.0.1:80");
+    CPPUNIT_ASSERT(a4.port() == 80);
+    CPPUNIT_ASSERT(a4.ip() == ((127 << 24) | 1));
+
+    // good address (with ssl)
+    Hedwig::HostAddress a5 = Hedwig::HostAddress::fromString("www.yahoo.com:80:443");
+    CPPUNIT_ASSERT(a5.port() == 80);
+
+    // good address with ip
+    Hedwig::HostAddress a6 = Hedwig::HostAddress::fromString("127.0.0.1:80:443");
+    CPPUNIT_ASSERT(a6.port() == 80);
+    CPPUNIT_ASSERT(a6.ip() == ((127 << 24) | 1));
+
+    // nothing
+    CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString(""), Hedwig::HostResolutionException);
+    
+    // nothing but colons
+    CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString("::::::::::::::::"), Hedwig::ConfigurationException);
+    
+    // only port number
+    CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString(":80"), Hedwig::HostResolutionException);
+ 
+    // text after colon (isn't supported)
+    CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString("www.yahoo.com:http"), Hedwig::ConfigurationException);
+    
+    // invalid hostname
+    CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString("com.oohay.www:80"), Hedwig::HostResolutionException);
+    
+    // null
+    CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString(NULL), std::logic_error);
+  }
+};
+
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( UtilTestSuite, "Util" );

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.api;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Interface to define the client handler logic to consume messages it is
+ * subscribed to.
+ * 
+ */
+public interface MessageHandler {
+
+    /**
+     * Consumes a message it is subscribed to and has been delivered to it.
+     * 
+     * @param topic
+     *            The topic name where the message came from.
+     * @param subscriberId
+     *            ID of the subscriber.
+     * @param msg
+     *            The message object to consume.
+     * @param callback
+     *            Callback to invoke when the message consumption has been done.
+     * @param context
+     *            Calling context that the Callback needs since this is done
+     *            asynchronously.
+     */
+    public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, Object context);
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Publisher.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Publisher.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Publisher.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Publisher.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.api;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Interface to define the client Publisher API.
+ * 
+ */
+public interface Publisher {
+
+    /**
+     * Publishes a message on the given topic.
+     * 
+     * @param topic
+     *            Topic name to publish on
+     * @param msg
+     *            Message object to serialize and publish
+     * @throws CouldNotConnectException
+     *             If we are not able to connect to the server host
+     * @throws ServiceDownException
+     *             If we are unable to publish the message to the topic.
+     */
+    public void publish(ByteString topic, Message msg) throws CouldNotConnectException, ServiceDownException;
+
+    /**
+     * Publishes a message asynchronously on the given topic.
+     * 
+     * @param topic
+     *            Topic name to publish on
+     * @param msg
+     *            Message object to serialize and publish
+     * @param callback
+     *            Callback to invoke when the publish to the server has actually
+     *            gone through. This will have to deal with error conditions on
+     *            the async publish request.
+     * @param context
+     *            Calling context that the Callback needs since this is done
+     *            asynchronously.
+     */
+    public void asyncPublish(ByteString topic, Message msg, Callback<Void> callback, Object context);
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.api;
+
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Interface to define the client Subscriber API.
+ * 
+ */
+public interface Subscriber {
+
+    /**
+     * Subscribe to the given topic for the inputted subscriberId.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param mode
+     *            Whether to prohibit, tolerate, or require an existing
+     *            subscription.
+     * @throws CouldNotConnectException
+     *             If we are not able to connect to the server host
+     * @throws ClientAlreadySubscribedException
+     *             If client is already subscribed to the topic
+     * @throws ServiceDownException
+     *             If unable to subscribe to topic
+     * @throws InvalidSubscriberIdException
+     *             If the subscriberId is not valid. We may want to set aside
+     *             certain formats of subscriberId's for different purposes.
+     *             e.g. local vs. hub subscriber
+     */
+    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+            InvalidSubscriberIdException;
+
+    /**
+     * Subscribe to the given topic asynchronously for the inputted subscriberId
+     * disregarding if the topic has been created yet or not.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param mode
+     *            Whether to prohibit, tolerate, or require an existing
+     *            subscription.
+     * @param callback
+     *            Callback to invoke when the subscribe request to the server
+     *            has actually gone through. This will have to deal with error
+     *            conditions on the async subscribe request.
+     * @param context
+     *            Calling context that the Callback needs since this is done
+     *            asynchronously.
+     */
+    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
+            Object context);
+
+    /**
+     * Unsubscribe from a topic that the subscriberId user has previously
+     * subscribed to.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @throws CouldNotConnectException
+     *             If we are not able to connect to the server host
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     * @throws ServiceDownException
+     *             If the server was down and unable to complete the request
+     * @throws InvalidSubscriberIdException
+     *             If the subscriberId is not valid. We may want to set aside
+     *             certain formats of subscriberId's for different purposes.
+     *             e.g. local vs. hub subscriber
+     */
+    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+            ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException;
+
+    /**
+     * Unsubscribe from a topic asynchronously that the subscriberId user has
+     * previously subscribed to.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param callback
+     *            Callback to invoke when the unsubscribe request to the server
+     *            has actually gone through. This will have to deal with error
+     *            conditions on the async unsubscribe request.
+     * @param context
+     *            Calling context that the Callback needs since this is done
+     *            asynchronously.
+     */
+    public void asyncUnsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context);
+
+    /**
+     * Manually send a consume message to the server for the given inputs.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param messageSeqId
+     *            Message Sequence ID for the latest message that the client app
+     *            has successfully consumed. All messages up to that point will
+     *            also be considered as consumed.            
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic based
+     *             on the client's local state.
+     */
+    public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
+            throws ClientNotSubscribedException;
+
+    /**
+     * Checks if the subscriberId client is currently subscribed to the given
+     * topic.
+     * 
+     * @param topic
+     *            Topic name of the subscription.
+     * @param subscriberId
+     *            ID of the subscriber
+     * @throws CouldNotConnectException
+     *             If we are not able to connect to the server host
+     * @throws ServiceDownException
+     *             If there is an error checking the server if the client has a
+     *             subscription
+     * @return Boolean indicating if the client has a subscription or not.
+     */
+    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+            ServiceDownException;
+
+    /**
+     * Fills the input List with the subscriptions this subscriberId client is
+     * subscribed to.
+     * 
+     * @param subscriberId
+     *            ID of the subscriber
+     * @return List filled with subscription name (topic) strings.
+     * @throws CouldNotConnectException
+     *             If we are not able to connect to the server host
+     * @throws ServiceDownException
+     *             If there is an error retrieving the list of topics
+     */
+    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
+            ServiceDownException;
+
+    /**
+     * Begin delivery of messages from the server to us for this topic and
+     * subscriberId.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param messageHandler
+     *            Message Handler that will consume the subscribed messages
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     */
+    public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler)
+            throws ClientNotSubscribedException;
+
+    /**
+     * Stop delivery of messages for this topic and subscriberId.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     */
+    public void stopDelivery(ByteString topic, ByteString subscriberId) throws ClientNotSubscribedException;
+
+    /**
+     * Closes all of the client side cached data for this subscription without
+     * actually sending an unsubscribe request to the server. This will close
+     * the subscribe channel synchronously (if it exists) for the topic.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @throws ServiceDownException
+     *             If the subscribe channel was not able to be closed
+     *             successfully
+     */
+    public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException;
+
+    /**
+     * Closes all of the client side cached data for this subscription without
+     * actually sending an unsubscribe request to the server. This will close
+     * the subscribe channel asynchronously (if it exists) for the topic.
+     * 
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param callback
+     *            Callback to invoke when the subscribe channel has been closed.
+     * @param context
+     *            Calling context that the Callback needs since this is done
+     *            asynchronously.
+     */
+    public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback,
+            Object context);
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.benchmark;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
+import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.util.Callback;
+
+public class BenchmarkPublisher extends BenchmarkWorker {
+    Publisher publisher;
+    Subscriber subscriber;
+    int msgSize;
+    int nParallel;
+    double rate;
+
+    public BenchmarkPublisher(int numTopics, int numMessages, int numRegions, int startTopicLabel, int partitionIndex,
+            int numPartitions, Publisher publisher, Subscriber subscriber, int msgSize, int nParallel, int rate) {
+        super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions);
+        this.publisher = publisher;
+        this.msgSize = msgSize;
+        this.subscriber = subscriber;
+        this.nParallel = nParallel;
+
+        this.rate = rate / (numRegions * numPartitions + 0.0);
+    }
+
+    public void warmup(int nWarmup) throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("warmup" + partitionIndex);
+        ByteString subId = ByteString.copyFromUtf8("sub");
+        subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
+
+        subscriber.startDelivery(topic, subId, new MessageHandler() {
+            @Override
+            public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
+                    Object context) {
+                // noop
+                callback.operationFinished(context, null);
+            }
+        });
+
+        // picking constants arbitarily for warmup phase
+        ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", nWarmup, 100);
+        Message msg = getMsg(1024);
+        for (int i = 0; i < nWarmup; i++) {
+            publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null);
+        }
+
+        if (agg.tpAgg.queue.take() > 0) {
+            throw new RuntimeException("Warmup publishes failed!");
+        }
+
+    }
+
+    public Message getMsg(int size) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+            sb.append('a');
+        }
+        final ByteString body = ByteString.copyFromUtf8(sb.toString());
+        Message msg = Message.newBuilder().setBody(body).build();
+        return msg;
+    }
+
+    public Void call() throws Exception {
+        Message msg = getMsg(msgSize);
+
+        // Single warmup for every topic
+        int myPublishCount = 0;
+        for (int i = 0; i < numTopics; i++) {
+            if (!HedwigBenchmark.amIResponsibleForTopic(startTopicLabel + i, partitionIndex, numPartitions)){
+                continue;
+            }
+            ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + (startTopicLabel + i));
+            publisher.publish(topic, msg);
+            myPublishCount++;
+        }
+
+        long startTime = System.currentTimeMillis();
+        int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount;
+        myPublishCount = 0;
+        ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel);
+
+        int topicLabel = 0;
+
+        while (myPublishCount < myPublishLimit) {
+            int topicNum = startTopicLabel + topicLabel;
+            topicLabel = (topicLabel + 1) % numTopics;
+
+            if (!HedwigBenchmark.amIResponsibleForTopic(topicNum, partitionIndex, numPartitions)) {
+                continue;
+            }
+
+            ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + topicNum);
+
+            if (rate > 0) {
+                long delay = startTime + (long) (1000 * myPublishCount / rate) - System.currentTimeMillis();
+                if (delay > 0)
+                    Thread.sleep(delay);
+            }
+            publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null);
+            myPublishCount++;
+        }
+
+        System.out.println("Finished unacked pubs: tput = " + BenchmarkUtils.calcTp(myPublishLimit, startTime)
+                + " ops/s");
+        // Wait till the benchmark test has completed 
+        agg.tpAgg.queue.take();
+        System.out.println(agg.summarize(startTime));
+        return null;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.benchmark;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
+import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputAggregator;
+import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.util.Callback;
+
+public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void>{
+    static final Logger logger = Logger.getLogger(BenchmarkSubscriber.class);
+    Subscriber subscriber;
+    ByteString subId;
+    
+
+    public BenchmarkSubscriber(int numTopics, int numMessages, int numRegions,
+            int startTopicLabel, int partitionIndex, int numPartitions, Subscriber subscriber, ByteString subId) {
+        super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions);
+        this.subscriber = subscriber;
+        this.subId = subId;        
+    }
+
+    public void warmup(int numWarmup) throws InterruptedException {
+        /*
+         * multiplying the number of ops by numParitions because we end up
+         * skipping many because of the partitioning logic
+         */
+        multiSub("warmup", "warmup", 0, numWarmup, numWarmup * numPartitions);
+    }
+
+    public Void call() throws Exception {
+
+        final ThroughputAggregator agg = new ThroughputAggregator("recvs", numMessages);
+        final Map<String, Long> lastSeqIdSeenMap = new HashMap<String, Long>();
+
+        for (int i = startTopicLabel; i < startTopicLabel + numTopics; i++) {
+
+            if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) {
+                continue;
+            }
+
+            final String topic = HedwigBenchmark.TOPIC_PREFIX + i;
+
+            subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, CreateOrAttach.CREATE_OR_ATTACH);
+            subscriber.startDelivery(ByteString.copyFromUtf8(topic), subId, new MessageHandler() {
+
+                @Override
+                public void consume(ByteString thisTopic, ByteString subscriberId, Message msg,
+                        Callback<Void> callback, Object context) {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Got message from src-region: " + msg.getSrcRegion() + " with seq-id: "
+                                + msg.getMsgId());
+
+                    String mapKey = topic + msg.getSrcRegion().toStringUtf8();
+                    Long lastSeqIdSeen = lastSeqIdSeenMap.get(mapKey);
+                    if (lastSeqIdSeen == null) {
+                        lastSeqIdSeen = (long) 0;
+                    }
+
+                    if (getSrcSeqId(msg) <= lastSeqIdSeen) {
+                        logger.info("Redelivery of message, src-region: " + msg.getSrcRegion() + "seq-id: "
+                                + msg.getMsgId());
+                    } else {
+                        agg.ding(false);
+                    }
+
+                    callback.operationFinished(context, null);
+                }
+            });
+        }
+        System.out.println("Finished subscribing to topics and now waiting for messages to come in...");
+        // Wait till the benchmark test has completed
+        agg.queue.take();            
+        System.out.println(agg.summarize(agg.earliest.get()));
+        return null;
+    }
+
+    long getSrcSeqId(Message msg) {
+        if (msg.getMsgId().getRemoteComponentsCount() == 0) {
+            return msg.getMsgId().getLocalComponent();
+        }
+
+        for (RegionSpecificSeqId rseqId : msg.getMsgId().getRemoteComponentsList()) {
+            if (rseqId.getRegion().equals(msg.getSrcRegion()))
+                return rseqId.getSeqId();
+        }
+
+        return msg.getMsgId().getLocalComponent();
+    }
+
+    void multiSub(String label, String topicPrefix, int start, final int npar, final int count)
+            throws InterruptedException {
+        long startTime = System.currentTimeMillis();
+        ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar);
+        int end = start + count;
+        for (int i = start; i < end; ++i) {
+            if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)){
+                continue;
+            }
+            subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i), subId, CreateOrAttach.CREATE_OR_ATTACH,
+                    new BenchmarkCallback(agg), null);
+        }
+        // Wait till the benchmark test has completed
+        agg.tpAgg.queue.take();
+        if (count > 1)
+            System.out.println(agg.summarize(startTime));
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.benchmark;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+
+public class BenchmarkUtils {
+    static final Logger logger = Logger.getLogger(BenchmarkUtils.class);
+
+    public static double calcTp(final int count, long startTime) {
+        return 1000. * count / (System.currentTimeMillis() - startTime);
+    }
+
+    /**
+     * Stats aggregator for callback (round-trip) operations. Measures both
+     * throughput and latency.
+     */
+    public static class ThroughputLatencyAggregator {
+        int numBuckets;
+        final ThroughputAggregator tpAgg;
+        final Semaphore outstanding;
+        final AtomicLong sum = new AtomicLong();
+
+        final AtomicLong[] latencyBuckets;
+
+        // bucket[i] is count of number of operations that took >= i ms and <
+        // (i+1) ms.
+
+        public ThroughputLatencyAggregator(String label, int count, int limit) throws InterruptedException {
+            numBuckets = Integer.getInteger("numBuckets", 101);
+            latencyBuckets = new AtomicLong[numBuckets];
+            tpAgg = new ThroughputAggregator(label, count);
+            outstanding = new Semaphore(limit);
+            for (int i = 0; i < numBuckets; i++) {
+                latencyBuckets[i] = new AtomicLong();
+            }
+        }
+
+        public void reportLatency(long latency) {
+            sum.addAndGet(latency);
+
+            int bucketIndex;
+            if (latency >= numBuckets) {
+                bucketIndex = (int) numBuckets - 1;
+            } else {
+                bucketIndex = (int) latency;
+            }
+            latencyBuckets[bucketIndex].incrementAndGet();
+        }
+
+        private String getPercentile(double percentile) {
+            int numInliersNeeded = (int) (percentile / 100 * tpAgg.count);
+            int numInliersFound = 0;
+            for (int i = 0; i < numBuckets - 1; i++) {
+                numInliersFound += latencyBuckets[i].intValue();
+                if (numInliersFound > numInliersNeeded) {
+                    return i + "";
+                }
+            }
+            return " >= " + (numBuckets - 1);
+        }
+
+        public String summarize(long startTime) {
+            double percentile = Double.parseDouble(System.getProperty("percentile", "99.9"));
+            return tpAgg.summarize(startTime) + ", avg latency = " + sum.get() / tpAgg.count + ", " + percentile
+                    + "%ile latency = " + getPercentile(percentile);
+        }
+    }
+
+    /**
+     * Stats aggregator for non-callback (single-shot) operations. Measures just
+     * throughput.
+     */
+    public static class ThroughputAggregator {
+        final String label;
+        final int count;
+        final AtomicInteger done = new AtomicInteger();
+        final AtomicLong earliest = new AtomicLong();
+        final AtomicInteger numFailed = new AtomicInteger();
+        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
+
+        public ThroughputAggregator(final String label, final int count) {
+            this.label = label;
+            this.count = count;
+            if (count == 0)
+                queue.add(0);
+            if (Boolean.getBoolean("progress")) {
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            for (int doneSnap = 0, prev = 0; doneSnap < count; prev = doneSnap, doneSnap = done.get()) {
+                                if (doneSnap > prev) {
+                                    System.out.println(label + " progress: " + doneSnap + " of " + count);
+                                }
+                                Thread.sleep(1000);
+                            }
+                        } catch (Exception ex) {
+                            throw new RuntimeException(ex);
+                        }
+                    }
+                }).start();
+            }
+        }
+
+        public void ding(boolean failed) {
+            int snapDone = done.incrementAndGet();
+            earliest.compareAndSet(0, System.currentTimeMillis());
+            if (failed)
+                numFailed.incrementAndGet();
+            if (logger.isDebugEnabled())
+                logger.debug(label + " " + (failed ? "failed" : "succeeded") + ", done so far = " + snapDone);
+            if (snapDone == count) {
+                queue.add(numFailed.get());
+            }
+        }
+
+        public String summarize(long startTime) {
+            return "Finished " + label + ": count = " + done.get() + ", tput = " + calcTp(count, startTime)
+                    + " ops/s, numFailed = " + numFailed;
+        }
+    }
+
+    public static class BenchmarkCallback implements Callback<Void> {
+
+        final ThroughputLatencyAggregator agg;
+        final long startTime;
+
+        public BenchmarkCallback(ThroughputLatencyAggregator agg) throws InterruptedException {
+            this.agg = agg;
+            agg.outstanding.acquire();
+            // Must set the start time *after* taking acquiring on outstanding.
+            startTime = System.currentTimeMillis();
+        }
+
+        private void finish(boolean failed) {
+            agg.reportLatency(System.currentTimeMillis() - startTime);
+            agg.tpAgg.ding(failed);
+            agg.outstanding.release();
+        }
+
+        @Override
+        public void operationFinished(Object ctx, Void resultOfOperation) {
+            finish(false);
+        }
+
+        @Override
+        public void operationFailed(Object ctx, PubSubException exception) {
+            finish(true);
+        }
+    };
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.benchmark;
+
+public class BenchmarkWorker {
+    int numTopics;
+    int numMessages;
+    int numRegions;
+    int startTopicLabel;
+    int partitionIndex;
+    int numPartitions;
+
+    public BenchmarkWorker(int numTopics, int numMessages, int numRegions,
+            int startTopicLabel, int partitionIndex, int numPartitions) {
+        this.numTopics = numTopics;
+        this.numMessages = numMessages;
+        this.numRegions = numRegions;
+        this.startTopicLabel = startTopicLabel;
+        this.partitionIndex = partitionIndex;
+        this.numPartitions = numPartitions;
+
+        if (numMessages % (numTopics * numRegions) != 0) {
+            throw new RuntimeException("Number of messages not equally divisible among regions and topics");
+        }
+
+        if (numTopics % numPartitions != 0) {
+            throw new RuntimeException("Number of topics not equally divisible among partitions");
+        }
+
+    }
+}



Mime
View raw message