hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r1021463 [2/2] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp/inc/hedwig/ src/contrib/hedwig/client/src/main/cpp/lib/ src/contrib/hedwig/client/src/main/cpp/m4/ src/contrib/he...
Date Mon, 11 Oct 2010 19:00:43 GMT
Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h Mon Oct 11 19:00:42 2010
@@ -25,7 +25,7 @@
 namespace Hedwig {
   class PublishWriteCallback : public OperationCallback {
   public:
-    PublishWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data);
+    PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
 
     void operationComplete();
     void operationFailed(const std::exception& exception);
@@ -36,12 +36,12 @@ namespace Hedwig {
 
   class PublisherImpl : public Publisher {
   public:
-    PublisherImpl(ClientImplPtr& client);
+    PublisherImpl(const ClientImplPtr& client);
 
     void publish(const std::string& topic, const std::string& message);
     void asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback);
     
-    void messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn);
+    void messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn);
 
     void doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp Mon Oct 11 19:00:42 2010
@@ -20,13 +20,20 @@
 #include "util.h"
 #include "channel.h"
 
+#include <boost/asio.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
 #include <log4cpp/Category.hh>
 
 static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
-const int SUBSCRIBER_RECONNECT_TIME = 3000; // 3 seconds
+
 using namespace Hedwig;
+const int DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME = 5000;
+const int DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = 5000;
+const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
+const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
 
-SubscriberWriteCallback::SubscriberWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
+SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
 
 void SubscriberWriteCallback::operationComplete() {
   if (LOG.isDebugEnabled()) {
@@ -35,29 +42,43 @@ void SubscriberWriteCallback::operationC
 }
 
 void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
-  LOG.errorStream() << "Error writing to publisher " << exception.what();
+  LOG.errorStream() << "Error writing to subscriber " << exception.what();
   
   //remove txn from channel pending list
-  #warning "Actually do something here"
+  data->getCallback()->operationFailed(exception);
+  client->getSubscriberImpl().closeSubscription(data->getTopic(), data->getSubscriberId());
 }
 
-UnsubscribeWriteCallback::UnsubscribeWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
+UnsubscribeWriteCallback::UnsubscribeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
 
 void UnsubscribeWriteCallback::operationComplete() {
-  
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Successfully wrote unsubscribe transaction: " << data->getTxnId();
+  }  
 }
 
 void UnsubscribeWriteCallback::operationFailed(const std::exception& exception) {
-  #warning "Actually do something here"
+  data->getCallback()->operationFailed(exception);
 }
   
-ConsumeWriteCallback::ConsumeWriteCallback(const PubSubDataPtr& data) 
-  : data(data) {
+ConsumeWriteCallback::ConsumeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) 
+  : client(client), data(data) {
 }
 
 ConsumeWriteCallback::~ConsumeWriteCallback() {
 }
 
+/* static */ void ConsumeWriteCallback::timerComplete(const ClientImplPtr& client, const PubSubDataPtr& data,
+						      const boost::system::error_code& error) {
+  if (error) {
+    // shutting down
+    return;
+  }
+
+  client->getSubscriberImpl().consume(data->getTopic(), data->getSubscriberId(), data->getMessageSeqId());
+}
+
+
 void ConsumeWriteCallback::operationComplete() {
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "Successfully wrote consume transaction: " << data->getTxnId();
@@ -65,24 +86,54 @@ void ConsumeWriteCallback::operationComp
 }
 
 void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
-  LOG.errorStream() << "Error writing consume transaction: " << data->getTxnId() << " error: " << exception.what();
+  int retrywait = client->getConfiguration().getInt(Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME, 
+						    DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME);
+  LOG.errorStream() << "Error writing consume transaction: " << data->getTxnId() << " error: " << exception.what() 
+		    << " retrying in " << retrywait << " Microseconds";
+
+  boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
+
+  t.async_wait(boost::bind(&ConsumeWriteCallback::timerComplete, client, data, boost::asio::placeholders::error));  
 }
 
-SubscriberConsumeCallback::SubscriberConsumeCallback(ClientImplPtr& client, const std::string& topic, const std::string& subscriberid, const MessageSeqId& msgid) 
-  : client(client), topic(topic), subscriberid(subscriberid), msgid(msgid)
+SubscriberConsumeCallback::SubscriberConsumeCallback(const ClientImplPtr& client, 
+						     const SubscriberClientChannelHandlerPtr& handler, 
+						     const PubSubDataPtr& data, const PubSubResponsePtr& m) 
+  : client(client), handler(handler), data(data), m(m)
 {
 }
 
 void SubscriberConsumeCallback::operationComplete() {
-  LOG.errorStream() << "ConsumeCallback::operationComplete";
-  client->getSubscriber().consume(topic, subscriberid, msgid);
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "ConsumeCallback::operationComplete " << data->getTopic() << " - " << data->getSubscriberId();
+  };
+  client->getSubscriber().consume(data->getTopic(), data->getSubscriberId(), m->message().msgid());
+}
+
+/* static */ void SubscriberConsumeCallback::timerComplete(const SubscriberClientChannelHandlerPtr handler, 
+							   const PubSubResponsePtr m, 
+							   const boost::system::error_code& error) {
+  if (error) {
+    return;
+  }
+  handler->messageReceived(handler->getChannel(), m);
 }
 
 void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
-  LOG.errorStream() << "ConsumeCallback::operationFailed";
+  LOG.errorStream() << "ConsumeCallback::operationFailed  " << data->getTopic() << " - " << data->getSubscriberId();
+  
+  int retrywait = client->getConfiguration().getInt(Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME,
+						    DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME);
+
+  LOG.errorStream() << "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what() 
+		    << " retrying in " << retrywait << " Microseconds";
+
+  boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
+
+  t.async_wait(boost::bind(&SubscriberConsumeCallback::timerComplete, handler, m, boost::asio::placeholders::error));  
 }
 
-SubscriberReconnectCallback::SubscriberReconnectCallback(ClientImplPtr& client, const PubSubDataPtr& origData) 
+SubscriberReconnectCallback::SubscriberReconnectCallback(const ClientImplPtr& client, const PubSubDataPtr& origData) 
   : client(client), origData(origData) {
 }
 
@@ -90,11 +141,13 @@ void SubscriberReconnectCallback::operat
 }
 
 void SubscriberReconnectCallback::operationFailed(const std::exception& exception) {
-  
+  LOG.errorStream() << "Error writing to new subscriber. Channel should pick this up disconnect the channel and try to connect again " << exception.what();
+
+
 }
 
-SubscriberClientChannelHandler::SubscriberClientChannelHandler(ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
-  : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false)  {
+SubscriberClientChannelHandler::SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
+  : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false), should_wait(true)  {
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "Creating SubscriberClientChannelHandler " << this;
   }
@@ -106,18 +159,21 @@ SubscriberClientChannelHandler::~Subscri
   }
 }
 
-void SubscriberClientChannelHandler::messageReceived(DuplexChannel* channel, const PubSubResponse& m) {
-  if (m.has_message()) {
+void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
+  if (m->has_message()) {
     if (LOG.isDebugEnabled()) {
       LOG.debugStream() << "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")";
     }
 
     if (this->handler.get()) {
-      OperationCallbackPtr callback(new SubscriberConsumeCallback(client, origData->getTopic(), origData->getSubscriberId(), m.message().msgid()));
-      this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m.message(), callback);
+      OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
+      this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
     } else {
-      LOG.debugStream() << "putting in queue";
-      queue.push_back(m.message());
+      queue.push_back(m);
+      if (queue.size() >= (std::size_t)client->getConfiguration().getInt(Configuration::MAX_MESSAGE_QUEUE_SIZE,
+									 DEFAULT_MAX_MESSAGE_QUEUE_SIZE)) {
+	channel->stopReceiving();
+      }
     }
   } else {
     HedwigClientChannelHandler::messageReceived(channel, m);
@@ -126,12 +182,23 @@ void SubscriberClientChannelHandler::mes
 
 void SubscriberClientChannelHandler::close() {
   closed = true;
+
   if (channel) {
     channel->kill();
   }
 }
 
-void SubscriberClientChannelHandler::channelDisconnected(DuplexChannel* channel, const std::exception& e) {
+/*static*/ void SubscriberClientChannelHandler::reconnectTimerComplete(const SubscriberClientChannelHandlerPtr handler,
+								       const DuplexChannelPtr channel, const std::exception e, 
+								       const boost::system::error_code& error) {
+  if (error) {
+    return;
+  }
+  handler->should_wait = false;
+  handler->channelDisconnected(channel, e);
+}
+
+void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
   // has subscription been closed
   if (closed) {
     return;
@@ -142,59 +209,60 @@ void SubscriberClientChannelHandler::cha
   if (client->shuttingDown()) {
     return;
   }
-  
+
+  if (should_wait) {
+    int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
+						      DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
+    
+    boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
+    t.async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(), 
+			     channel, e, boost::asio::placeholders::error));  
+
+  }
+  should_wait = true;
+
   // setup pubsub data for reconnection attempt
   origData->clearTriedServers();
   OperationCallbackPtr newcallback(new SubscriberReconnectCallback(client, origData));
   origData->setCallback(newcallback);
 
   // Create a new handler for the new channel
-  SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, subscriber, origData));  
-  ChannelHandlerPtr baseptr = handler;
-  // if there is an error createing the channel, sleep for SUBSCRIBER_RECONNECT_TIME and try again
-  DuplexChannelPtr newchannel;
-  while (true) {
-    try {
-      newchannel = client->createChannelForTopic(origData->getTopic(), baseptr);
-      handler->setChannel(newchannel);
-      break;
-    } catch (ShuttingDownException& e) {
-      LOG.errorStream() << "Shutting down, don't try to reconnect";
-      return; 
-    } catch (ChannelException& e) {
-      LOG.errorStream() << "Couldn't acquire channel, sleeping for " << SUBSCRIBER_RECONNECT_TIME << " before trying again";
-      usleep(SUBSCRIBER_RECONNECT_TIME);
-    }
-  } 
-  handoverDelivery(handler.get());
+  SubscriberClientChannelHandlerPtr newhandler(new SubscriberClientChannelHandler(client, subscriber, origData));  
+  ChannelHandlerPtr baseptr = newhandler;
+  
+  DuplexChannelPtr newchannel = client->createChannel(origData->getTopic(), baseptr);
+  newhandler->setChannel(newchannel);
+  handoverDelivery(newhandler);
   
   // remove record of the failed channel from the subscriber
-  subscriber.closeSubscription(origData->getTopic(), origData->getSubscriberId());
-
+  client->getSubscriberImpl().closeSubscription(origData->getTopic(), origData->getSubscriberId());
+  
   // subscriber
-  subscriber.doSubscribe(newchannel, origData, handler);
+  client->getSubscriberImpl().doSubscribe(newchannel, origData, newhandler);
 }
 
 void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
   this->handler = handler;
   
   while (!queue.empty()) {    
-    LOG.debugStream() << "Taking from queue";
-    Message m = queue.front();
+    PubSubResponsePtr m = queue.front();
     queue.pop_front();
 
-    OperationCallbackPtr callback(new SubscriberConsumeCallback(client, origData->getTopic(), origData->getSubscriberId(), m.msgid()));
+    OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
 
-    this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m, callback);
+    this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
   }
+  channel->startReceiving();
 }
 
 void SubscriberClientChannelHandler::stopDelivery() {
+  channel->stopReceiving();
+
   this->handler = MessageHandlerCallbackPtr();
 }
 
 
-void SubscriberClientChannelHandler::handoverDelivery(SubscriberClientChannelHandler* newHandler) {
+void SubscriberClientChannelHandler::handoverDelivery(const SubscriberClientChannelHandlerPtr& newHandler) {
   LOG.debugStream() << "Messages in queue " << queue.size();
   MessageHandlerCallbackPtr handler = this->handler;
   stopDelivery(); // resets old handler
@@ -209,7 +277,7 @@ DuplexChannelPtr& SubscriberClientChanne
   return channel;
 }
 
-SubscriberImpl::SubscriberImpl(ClientImplPtr& client) 
+SubscriberImpl::SubscriberImpl(const ClientImplPtr& client) 
   : client(client) 
 {
 }
@@ -221,7 +289,8 @@ SubscriberImpl::~SubscriberImpl() 
 
 
 void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
-  SyncOperationCallback* cb = new SyncOperationCallback();
+  SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT, 
+											  DEFAULT_SYNC_REQUEST_TIMEOUT));
   OperationCallbackPtr callback(cb);
   asyncSubscribe(topic, subscriberId, mode, callback);
   cb->wait();
@@ -232,37 +301,35 @@ void SubscriberImpl::subscribe(const std
 void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
   PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, mode);
 
-  SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this, data));  
+  SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this, data));
   ChannelHandlerPtr baseptr = handler;
-  DuplexChannelPtr channel = client->createChannelForTopic(topic, baseptr);
-  
-  handler->setChannel(channel);
 
+  DuplexChannelPtr channel = client->createChannel(topic, handler);
+  handler->setChannel(channel);
   doSubscribe(channel, data, handler);
 }
 
 void SubscriberImpl::doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler) {
-  LOG.debugStream() << "doSubscribe";
   channel->storeTransaction(data);
 
   OperationCallbackPtr writecb(new SubscriberWriteCallback(client, data));
   channel->writeRequest(data->getRequest(), writecb);
 
-  topicsubscriber2handler_lock.lock();
+  boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
   TopicSubscriber t(data->getTopic(), data->getSubscriberId());
   SubscriberClientChannelHandlerPtr oldhandler = topicsubscriber2handler[t];
   if (oldhandler != NULL) {
-    oldhandler->handoverDelivery(handler.get());
+    oldhandler->handoverDelivery(handler);
   }
   topicsubscriber2handler[t] = handler;
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")";
   }
-  topicsubscriber2handler_lock.unlock();;
 }
 
 void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
-  SyncOperationCallback* cb = new SyncOperationCallback();
+  SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT, 
+											  DEFAULT_SYNC_REQUEST_TIMEOUT));
   OperationCallbackPtr callback(cb);
   asyncUnsubscribe(topic, subscriberId, callback);
   cb->wait();
@@ -275,14 +342,8 @@ void SubscriberImpl::asyncUnsubscribe(co
 
   PubSubDataPtr data = PubSubData::forUnsubscribeRequest(client->counter().next(), subscriberId, topic, callback);
   
-  DuplexChannelPtr channel = client->getChannelForTopic(topic);
-  if (channel.get() == 0) {
-    LOG.errorStream() << "Trying to unsubscribe from (" << topic << ", " << subscriberId << ") but channel is dead";
-    callback->operationFailed(InvalidStateException());
-    return;
-  }
-  
-  doUnsubscribe(channel, data);  
+  DuplexChannelPtr channel = client->getChannel(topic);
+  doUnsubscribe(channel, data);
 }
 
 void SubscriberImpl::doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
@@ -293,10 +354,9 @@ void SubscriberImpl::doUnsubscribe(const
 
 void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) {
   TopicSubscriber t(topic, subscriberId);
-
-  topicsubscriber2handler_lock.lock();
+  
+  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
   SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
-  topicsubscriber2handler_lock.unlock();
 
   if (handler.get() == 0) {
     LOG.errorStream() << "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")";
@@ -309,16 +369,15 @@ void SubscriberImpl::consume(const std::
   }
   
   PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);  
-  OperationCallbackPtr writecb(new ConsumeWriteCallback(data));
+  OperationCallbackPtr writecb(new ConsumeWriteCallback(client, data));
   channel->writeRequest(data->getRequest(), writecb);
 }
 
 void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) {
   TopicSubscriber t(topic, subscriberId);
 
-  topicsubscriber2handler_lock.lock();
+  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
   SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
-  topicsubscriber2handler_lock.unlock();
 
   if (handler.get() == 0) {
     LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
@@ -328,10 +387,9 @@ void SubscriberImpl::startDelivery(const
 
 void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
   TopicSubscriber t(topic, subscriberId);
-
-  topicsubscriber2handler_lock.lock();
+  
+  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
   SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
-  topicsubscriber2handler_lock.unlock();
 
   if (handler.get() == 0) {
     LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
@@ -345,11 +403,14 @@ void SubscriberImpl::closeSubscription(c
   }
   TopicSubscriber t(topic, subscriberId);
 
-  topicsubscriber2handler_lock.lock();;
-  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
-  topicsubscriber2handler.erase(t);
-  topicsubscriber2handler_lock.unlock();;
-  if (handler) {
+  SubscriberClientChannelHandlerPtr handler;
+  {
+    boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    handler = topicsubscriber2handler[t];
+    topicsubscriber2handler.erase(t);
+  }
+  
+  if (handler.get() != 0) {
     handler->close();
   }
 }
@@ -357,16 +418,16 @@ void SubscriberImpl::closeSubscription(c
 /**
    takes ownership of txn
 */
-void SubscriberImpl::messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn) {
+void SubscriberImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
   if (!txn.get()) {
     LOG.errorStream() << "Invalid transaction";
     return;
   }
 
   if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "message received with status " << m.statuscode();
+    LOG.debugStream() << "message received with status " << m->statuscode();
   }
-  switch (m.statuscode()) {
+  switch (m->statuscode()) {
   case SUCCESS:
     txn->getCallback()->operationComplete();
     break;

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h Mon Oct 11 19:00:42 2010
@@ -25,98 +25,115 @@
 #include <tr1/memory>
 #include <deque>
 
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
 namespace Hedwig {
   class SubscriberWriteCallback : public OperationCallback {
   public:
-    SubscriberWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data);
+    SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
 
     void operationComplete();
     void operationFailed(const std::exception& exception);
   private:
-    ClientImplPtr client;
-    PubSubDataPtr data;
+    const ClientImplPtr client;
+    const PubSubDataPtr data;
   };
   
   class UnsubscribeWriteCallback : public OperationCallback {
   public:
-    UnsubscribeWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data);
+    UnsubscribeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
 
     void operationComplete();
     void operationFailed(const std::exception& exception);
   private:
-    ClientImplPtr client;
-    PubSubDataPtr data;
+    const ClientImplPtr client;
+    const PubSubDataPtr data;
   };
 
   class ConsumeWriteCallback : public OperationCallback {
   public:
-    ConsumeWriteCallback(const PubSubDataPtr& data);
+    ConsumeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
     ~ConsumeWriteCallback();
 
     void operationComplete();
     void operationFailed(const std::exception& exception);
+    
+    static void timerComplete(const ClientImplPtr& client, const PubSubDataPtr& data, const boost::system::error_code& error);
   private:
-    PubSubDataPtr data;
+    const ClientImplPtr client;
+    const PubSubDataPtr data;
     };
 
   class SubscriberReconnectCallback : public OperationCallback {
   public: 
-    SubscriberReconnectCallback(ClientImplPtr& client, const PubSubDataPtr& origData);
+    SubscriberReconnectCallback(const ClientImplPtr& client, const PubSubDataPtr& origData);
 
     void operationComplete();
     void operationFailed(const std::exception& exception);
   private:
-    ClientImplPtr client;
-    PubSubDataPtr origData;
+    const ClientImplPtr client;
+    const PubSubDataPtr origData;
   };
 
   class SubscriberClientChannelHandler;
-  typedef std::tr1::shared_ptr<SubscriberClientChannelHandler> SubscriberClientChannelHandlerPtr;
+  typedef boost::shared_ptr<SubscriberClientChannelHandler> SubscriberClientChannelHandlerPtr;
 
   class SubscriberConsumeCallback : public OperationCallback {
   public: 
-    SubscriberConsumeCallback(ClientImplPtr& client, const std::string& topic, const std::string& subscriberid, const MessageSeqId& msgid);
+    SubscriberConsumeCallback(const ClientImplPtr& client, const SubscriberClientChannelHandlerPtr& handler, const PubSubDataPtr& data, const PubSubResponsePtr& m);
 
     void operationComplete();
     void operationFailed(const std::exception& exception);
+    static void timerComplete(const SubscriberClientChannelHandlerPtr handler, 
+			      const PubSubResponsePtr m, 
+			      const boost::system::error_code& error);
+
   private:
-    ClientImplPtr client;
-    const std::string topic;
-    const std::string subscriberid;
-    MessageSeqId msgid;
+    const ClientImplPtr client;
+    const SubscriberClientChannelHandlerPtr handler;
+    
+    const PubSubDataPtr data;
+    const PubSubResponsePtr m;
   };
 
-  class SubscriberClientChannelHandler : public HedwigClientChannelHandler {
+  class SubscriberClientChannelHandler : public HedwigClientChannelHandler, 
+					 public boost::enable_shared_from_this<SubscriberClientChannelHandler> {
   public: 
-    SubscriberClientChannelHandler(ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data);
+    SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data);
     ~SubscriberClientChannelHandler();
 
-    void messageReceived(DuplexChannel* channel, const PubSubResponse& m);
-    void channelDisconnected(DuplexChannel* channel, const std::exception& e);
+    void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
+    void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e);
 
     void startDelivery(const MessageHandlerCallbackPtr& handler);
     void stopDelivery();
 
-    void handoverDelivery(SubscriberClientChannelHandler* newHandler);
+    void handoverDelivery(const SubscriberClientChannelHandlerPtr& newHandler);
 
     void setChannel(const DuplexChannelPtr& channel);
     DuplexChannelPtr& getChannel();
 
+    static void reconnectTimerComplete(const SubscriberClientChannelHandlerPtr handler, const DuplexChannelPtr channel, const std::exception e, 
+				       const boost::system::error_code& error);
+
     void close();
   private:
 
     SubscriberImpl& subscriber;
-#warning "put some limit on this to stop it growing forever"
-    std::deque<Message> queue;
+    std::deque<PubSubResponsePtr> queue;
+    
     MessageHandlerCallbackPtr handler;
     PubSubDataPtr origData;
     DuplexChannelPtr channel;
     bool closed;
+    bool should_wait;
   };
 
   class SubscriberImpl : public Subscriber {
   public:
-    SubscriberImpl(ClientImplPtr& client);
+    SubscriberImpl(const ClientImplPtr& client);
     ~SubscriberImpl();
 
     void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode);
@@ -132,16 +149,16 @@ namespace Hedwig {
 
     void closeSubscription(const std::string& topic, const std::string& subscriberId);
 
-    void messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn);
+    void messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn);
 
     void doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler);
     void doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
 
   private:
-    ClientImplPtr client;
+    const ClientImplPtr client;
     
-    std::tr1::unordered_map<TopicSubscriber, SubscriberClientChannelHandlerPtr> topicsubscriber2handler;
-    Mutex topicsubscriber2handler_lock;	    
+    std::tr1::unordered_map<TopicSubscriber, SubscriberClientChannelHandlerPtr, TopicSubscriberHash > topicsubscriber2handler;
+    boost::shared_mutex topicsubscriber2handler_lock;	    
   };
 
 };

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp Mon Oct 11 19:00:42 2010
@@ -139,95 +139,3 @@ HostAddress HostAddress::fromString(std:
   return h;
 }
 
-WaitConditionBase::WaitConditionBase() {
-  pthread_mutex_init(&mutex, NULL);
-  pthread_cond_init(&cond, NULL);  
-}
-
-WaitConditionBase::~WaitConditionBase() {
-  pthread_mutex_destroy(&mutex);
-  pthread_cond_destroy(&cond);
-}
-    
-void WaitConditionBase::wait() {
-  pthread_mutex_lock(&mutex);
-  while (!isTrue()) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "wait: condition is false for " << this;
-    }
-
-    pthread_cond_wait(&cond, &mutex); 
-  }
-  pthread_mutex_unlock(&mutex);
-}
-
-void WaitConditionBase::lock() {
-  pthread_mutex_lock(&mutex);
-}
-
-void WaitConditionBase::signalAndUnlock() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "signal: signal " << this;
-  }
-  
-  pthread_cond_signal(&cond);
-  
-  pthread_mutex_unlock(&mutex);
-}
-
-Mutex::Mutex() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Creating mutex " << this;
-  }
-  int error = pthread_mutex_init(&mutex, NULL);
-  if (error != 0) {
-    LOG.errorStream() << "Error initiating mutex " << error;
-  }
-}
-
-Mutex::~Mutex() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Destroying mutex " << this;
-  }
-
-  int error = pthread_mutex_destroy(&mutex);
-  if (error != 0) {
-    LOG.errorStream() << "Error destroying mutex " << this << " " << error;
-  }
-}
-
-void Mutex::lock() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Locking mutex " << this;
-  }
-    
-  int error = pthread_mutex_lock(&mutex);
-  if (error != 0) {
-    LOG.errorStream() << "Error locking mutex " << this << " " << error;
-  }
-}
-
-void Mutex::unlock() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Unlocking mutex " << this;
-  }
-
-  int error = pthread_mutex_unlock(&mutex);
-  if (error != 0) {
-    LOG.errorStream() << "Error unlocking mutex " << this << " " << error;
-  }
-}
-
-std::size_t std::tr1::hash<HostAddress>::operator()(const HostAddress& address) const {
-  return (address.ip() << 16) & (address.port());
-}
-
-std::size_t std::tr1::hash<DuplexChannel*>::operator()(const DuplexChannel* channel) const {
-  return reinterpret_cast<std::size_t>(channel);
-}
-
-std::size_t std::tr1::hash<TopicSubscriber>::operator()(const TopicSubscriber& topicsub) const {
-  std::string fullstr = topicsub.first + topicsub.second;
-  return std::tr1::hash<std::string>()(fullstr);
-}
-

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h Mon Oct 11 19:00:42 2010
@@ -60,63 +60,27 @@ namespace Hedwig {
     struct sockaddr_in socket_addr;
   };
 
-  class DuplexChannel;  
-  
-  class Mutex {
-  public:
-    Mutex();
-    ~Mutex();
-    
-    void lock();
-    void unlock();
-  private:
-    pthread_mutex_t mutex;
-  };
-
-  class WaitConditionBase {
-  public:
-    WaitConditionBase();
-    virtual ~WaitConditionBase();
-    
-    void wait(); 
-    void lock();
-    void signalAndUnlock();
-
-    virtual bool isTrue() = 0;
-  private:
-
-    pthread_mutex_t mutex;
-    pthread_cond_t cond;    
-  };
-
-};
-
-namespace std 
-{
-  namespace tr1 
-  {
   /**
      Hash a host address. Takes the least significant 16-bits of the address and the 16-bits of the
      port and packs them into one 32-bit number. While collisions are theoretically very possible, they
      shouldn't happen as the hedwig servers should be in the same subnet.
   */
-  template <> struct hash<Hedwig::HostAddress> : public unary_function<Hedwig::HostAddress, size_t> {
-    size_t operator()(const Hedwig::HostAddress& address) const;
+  struct HostAddressHash : public std::unary_function<Hedwig::HostAddress, size_t> {
+    size_t operator()(const Hedwig::HostAddress& address) const {
+        // Pack with OR, not AND: the shifted ip has its low 16 bits clear, so
+        // AND-ing it with a 16-bit port is always 0 and every address collides.
+        return (address.ip() << 16) | (address.port());
+    }
   };
 
-  /**
-     Hash a channel pointer, just returns the pointer.
-  */
-  template <> struct hash<Hedwig::DuplexChannel*> : public unary_function<Hedwig::DuplexChannel*, size_t> {
-    size_t operator()(const Hedwig::DuplexChannel* channel) const;
-  };
 
   /**
      Hash a topic/subscriber pair by hashing the concatenation of its two strings.
   */
-  template <> struct hash<Hedwig::TopicSubscriber> : public unary_function<Hedwig::TopicSubscriber, size_t> {
-    size_t operator()(const Hedwig::TopicSubscriber& topicsub) const;
+  struct TopicSubscriberHash : public std::unary_function<Hedwig::TopicSubscriber, size_t> {
+    // Concatenates both members of the pair and hashes the resulting string.
+    size_t operator()(const Hedwig::TopicSubscriber& topicsub) const {
+      return std::tr1::hash<std::string>()(topicsub.first + topicsub.second);
+    }
   };
-  }
-}
+};
+
 #endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_asio.m4
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_asio.m4?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_asio.m4 (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_asio.m4 Mon Oct 11 19:00:42 2010
@@ -0,0 +1,111 @@
+# ===========================================================================
+#       http://www.gnu.org/software/autoconf-archive/ax_boost_asio.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+#   AX_BOOST_ASIO
+#
+# DESCRIPTION
+#
+#   Test for Asio library from the Boost C++ libraries. The macro requires a
+#   preceding call to AX_BOOST_BASE. Further documentation is available at
+#   <http://randspringer.de/boost/index.html>.
+#
+#   This macro calls:
+#
+#     AC_SUBST(BOOST_ASIO_LIB)
+#
+#   And sets:
+#
+#     HAVE_BOOST_ASIO
+#
+# LICENSE
+#
+#   Copyright (c) 2008 Thomas Porschberg <thomas@randspringer.de>
+#   Copyright (c) 2008 Pete Greenwell <pete@mu.org>
+#
+#   Copying and distribution of this file, with or without modification, are
+#   permitted in any medium without royalty provided the copyright notice
+#   and this notice are preserved. This file is offered as-is, without any
+#   warranty.
+
+#serial 9
+
+AC_DEFUN([AX_BOOST_ASIO],
+[
+	AC_ARG_WITH([boost-asio],
+	AS_HELP_STRING([--with-boost-asio@<:@=special-lib@:>@],
+                   [use the ASIO library from boost - it is possible to specify a certain library for the linker
+                        e.g. --with-boost-asio=boost_system-gcc41-mt-1_34 ]),
+        [
+        if test "$withval" = "no"; then
+			want_boost="no"
+        elif test "$withval" = "yes"; then
+            want_boost="yes"
+            ax_boost_user_asio_lib=""
+        else
+		    want_boost="yes"
+        	ax_boost_user_asio_lib="$withval"
+		fi
+        ],
+        [want_boost="yes"]
+	)
+
+	if test "x$want_boost" = "xyes"; then
+        AC_REQUIRE([AC_PROG_CC])
+		CPPFLAGS_SAVED="$CPPFLAGS"
+		CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+		export CPPFLAGS
+
+		LDFLAGS_SAVED="$LDFLAGS"
+		LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+		export LDFLAGS
+
+        AC_CACHE_CHECK(whether the Boost::ASIO library is available,
+					   ax_cv_boost_asio,
+        [AC_LANG_PUSH([C++])
+		 AC_COMPILE_IFELSE(AC_LANG_PROGRAM([[ @%:@include <boost/asio.hpp>
+											]],
+                                  [[
+
+                                    boost::asio::io_service io;
+                                    boost::system::error_code timer_result;
+                                    boost::asio::deadline_timer t(io);
+                                    t.cancel();
+                                    io.run_one();
+									return 0;
+                                   ]]),
+                             ax_cv_boost_asio=yes, ax_cv_boost_asio=no)
+         AC_LANG_POP([C++])
+		])
+		if test "x$ax_cv_boost_asio" = "xyes"; then
+			AC_DEFINE(HAVE_BOOST_ASIO,,[define if the Boost::ASIO library is available])
+			BN=boost_system
+            dnl Record the outcome of either search in link_asio, the variable
+            dnl tested once the search is over. Statements in the found-action
+            dnl are separated with semicolons so the assignments do not depend
+            dnl on how the shell treats assignments prefixed to break.
+            if test "x$ax_boost_user_asio_lib" = "x"; then
+                for ax_lib in $BN $BN-$CC $BN-$CC-mt $BN-$CC-mt-s $BN-$CC-s \
+                              lib$BN lib$BN-$CC lib$BN-$CC-mt lib$BN-$CC-mt-s lib$BN-$CC-s \
+                              $BN-mgw $BN-mgw $BN-mgw-mt $BN-mgw-mt-s $BN-mgw-s ; do
+                    AC_CHECK_LIB($ax_lib, main,
+                                 [BOOST_ASIO_LIB="-l$ax_lib"; AC_SUBST(BOOST_ASIO_LIB) link_asio="yes"; break],
+                                 [link_asio="no"])
+                done
+            else
+                for ax_lib in $ax_boost_user_asio_lib $BN-$ax_boost_user_asio_lib; do
+                    AC_CHECK_LIB($ax_lib, main,
+                                 [BOOST_ASIO_LIB="-l$ax_lib"; AC_SUBST(BOOST_ASIO_LIB) link_asio="yes"; break],
+                                 [link_asio="no"])
+                done
+
+            fi
+            if test "x$ax_lib" = "x"; then
+                AC_MSG_ERROR(Could not find a version of the library!)
+            fi
+			if test "x$link_asio" = "xno"; then
+				AC_MSG_ERROR(Could not link against $ax_lib !)
+			fi
+		fi
+
+		CPPFLAGS="$CPPFLAGS_SAVED"
+    	LDFLAGS="$LDFLAGS_SAVED"
+	fi
+])

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_base.m4
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_base.m4?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_base.m4 (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_base.m4 Mon Oct 11 19:00:42 2010
@@ -0,0 +1,252 @@
+# ===========================================================================
+#       http://www.gnu.org/software/autoconf-archive/ax_boost_base.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+#   AX_BOOST_BASE([MINIMUM-VERSION], [ACTION-IF-FOUND], [ACTION-IF-NOT-FOUND])
+#
+# DESCRIPTION
+#
+#   Test for the Boost C++ libraries of a particular version (or newer)
+#
+#   If no path to the installed boost library is given the macro searches
+#   under /usr, /usr/local, /opt and /opt/local and evaluates the
+#   $BOOST_ROOT environment variable. Further documentation is available at
+#   <http://randspringer.de/boost/index.html>.
+#
+#   This macro calls:
+#
+#     AC_SUBST(BOOST_CPPFLAGS) / AC_SUBST(BOOST_LDFLAGS)
+#
+#   And sets:
+#
+#     HAVE_BOOST
+#
+# LICENSE
+#
+#   Copyright (c) 2008 Thomas Porschberg <thomas@randspringer.de>
+#   Copyright (c) 2009 Peter Adolphs
+#
+#   Copying and distribution of this file, with or without modification, are
+#   permitted in any medium without royalty provided the copyright notice
+#   and this notice are preserved. This file is offered as-is, without any
+#   warranty.
+
+#serial 17
+
+AC_DEFUN([AX_BOOST_BASE],
+[
+AC_ARG_WITH([boost],
+  [AS_HELP_STRING([--with-boost@<:@=ARG@:>@],
+    [use Boost library from a standard location (ARG=yes),
+     from the specified location (ARG=<path>),
+     or disable it (ARG=no)
+     @<:@ARG=yes@:>@ ])],
+    [
+    if test "$withval" = "no"; then
+        want_boost="no"
+    elif test "$withval" = "yes"; then
+        want_boost="yes"
+        ac_boost_path=""
+    else
+        want_boost="yes"
+        ac_boost_path="$withval"
+    fi
+    ],
+    [want_boost="yes"])
+
+
+AC_ARG_WITH([boost-libdir],
+        AS_HELP_STRING([--with-boost-libdir=LIB_DIR],
+        [Force given directory for boost libraries. Note that this will overwrite library path detection, so use this parameter only if default library detection fails and you know exactly where your boost libraries are located.]),
+        [
+        if test -d "$withval"
+        then
+                ac_boost_lib_path="$withval"
+        else
+                AC_MSG_ERROR(--with-boost-libdir expected directory name)
+        fi
+        ],
+        [ac_boost_lib_path=""]
+)
+
+if test "x$want_boost" = "xyes"; then
+    boost_lib_version_req=ifelse([$1], ,1.20.0,$1)
+    boost_lib_version_req_shorten=`expr $boost_lib_version_req : '\([[0-9]]*\.[[0-9]]*\)'`
+    boost_lib_version_req_major=`expr $boost_lib_version_req : '\([[0-9]]*\)'`
+    boost_lib_version_req_minor=`expr $boost_lib_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
+    boost_lib_version_req_sub_minor=`expr $boost_lib_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
+    if test "x$boost_lib_version_req_sub_minor" = "x" ; then
+        boost_lib_version_req_sub_minor="0"
+        fi
+    WANT_BOOST_VERSION=`expr $boost_lib_version_req_major \* 100000 \+  $boost_lib_version_req_minor \* 100 \+ $boost_lib_version_req_sub_minor`
+    AC_MSG_CHECKING(for boostlib >= $boost_lib_version_req)
+    succeeded=no
+
+    dnl On x86_64 systems check for system libraries in both lib64 and lib.
+    dnl The former is specified by FHS, but e.g. Debian does not adhere to
+    dnl this (as it raises problems for generic multi-arch support).
+    dnl The last entry in the list is chosen by default when no libraries
+    dnl are found, e.g. when only header-only libraries are installed!
+    libsubdirs="lib"
+    if test `uname -m` = x86_64; then
+        libsubdirs="lib64 lib lib64"
+    fi
+
+    dnl first we check the system location for boost libraries
+    dnl this location is chosen if boost libraries are installed with the --layout=system option
+    dnl or if you install boost with RPM
+    if test "$ac_boost_path" != ""; then
+        dnl libsubdir is not set before this point, so select it here the same
+        dnl way the standard-prefix search below does: the first entry of
+        dnl libsubdirs that contains boost libraries, else the last entry
+        for libsubdir in $libsubdirs ; do
+            if ls "$ac_boost_path/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
+        done
+        BOOST_LDFLAGS="-L$ac_boost_path/$libsubdir"
+        BOOST_CPPFLAGS="-I$ac_boost_path/include"
+    elif test "$cross_compiling" != yes; then
+        for ac_boost_path_tmp in /usr /usr/local /opt /opt/local ; do
+            if test -d "$ac_boost_path_tmp/include/boost" && test -r "$ac_boost_path_tmp/include/boost"; then
+                for libsubdir in $libsubdirs ; do
+                    if ls "$ac_boost_path_tmp/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
+                done
+                BOOST_LDFLAGS="-L$ac_boost_path_tmp/$libsubdir"
+                BOOST_CPPFLAGS="-I$ac_boost_path_tmp/include"
+                break;
+            fi
+        done
+    fi
+
+    dnl overwrite ld flags if we have required special directory with
+    dnl --with-boost-libdir parameter
+    if test "$ac_boost_lib_path" != ""; then
+       BOOST_LDFLAGS="-L$ac_boost_lib_path"
+    fi
+
+    CPPFLAGS_SAVED="$CPPFLAGS"
+    CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+    export CPPFLAGS
+
+    LDFLAGS_SAVED="$LDFLAGS"
+    LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+    export LDFLAGS
+
+    AC_REQUIRE([AC_PROG_CXX])
+    AC_LANG_PUSH(C++)
+        AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
+    @%:@include <boost/version.hpp>
+    ]], [[
+    #if BOOST_VERSION >= $WANT_BOOST_VERSION
+    // Everything is okay
+    #else
+    #  error Boost version is too old
+    #endif
+    ]])],[
+        AC_MSG_RESULT(yes)
+    succeeded=yes
+    found_system=yes
+        ],[
+        ])
+    AC_LANG_POP([C++])
+
+
+
+    dnl if we found no boost with system layout we search for boost libraries
+    dnl built and installed without the --layout=system option or for a staged(not installed) version
+    if test "x$succeeded" != "xyes"; then
+        _version=0
+        if test "$ac_boost_path" != ""; then
+            if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then
+                for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do
+                    _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'`
+                    V_CHECK=`expr $_version_tmp \> $_version`
+                    if test "$V_CHECK" = "1" ; then
+                        _version=$_version_tmp
+                    fi
+                    VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'`
+                    BOOST_CPPFLAGS="-I$ac_boost_path/include/boost-$VERSION_UNDERSCORE"
+                done
+            fi
+        else
+            if test "$cross_compiling" != yes; then
+                for ac_boost_path in /usr /usr/local /opt /opt/local ; do
+                    if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then
+                        for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do
+                            _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'`
+                            V_CHECK=`expr $_version_tmp \> $_version`
+                            if test "$V_CHECK" = "1" ; then
+                                _version=$_version_tmp
+                                best_path=$ac_boost_path
+                            fi
+                        done
+                    fi
+                done
+
+                VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'`
+                BOOST_CPPFLAGS="-I$best_path/include/boost-$VERSION_UNDERSCORE"
+                if test "$ac_boost_lib_path" = ""; then
+                    for libsubdir in $libsubdirs ; do
+                        if ls "$best_path/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
+                    done
+                    BOOST_LDFLAGS="-L$best_path/$libsubdir"
+                fi
+            fi
+
+            if test "x$BOOST_ROOT" != "x"; then
+                for libsubdir in $libsubdirs ; do
+                    if ls "$BOOST_ROOT/stage/$libsubdir/libboost_"* >/dev/null 2>&1 ; then break; fi
+                done
+                if test -d "$BOOST_ROOT" && test -r "$BOOST_ROOT" && test -d "$BOOST_ROOT/stage/$libsubdir" && test -r "$BOOST_ROOT/stage/$libsubdir"; then
+                    version_dir=`expr //$BOOST_ROOT : '.*/\(.*\)'`
+                    stage_version=`echo $version_dir | sed 's/boost_//' | sed 's/_/./g'`
+                        stage_version_shorten=`expr $stage_version : '\([[0-9]]*\.[[0-9]]*\)'`
+                    V_CHECK=`expr $stage_version_shorten \>\= $_version`
+                    if test "$V_CHECK" = "1" -a "$ac_boost_lib_path" = "" ; then
+                        AC_MSG_NOTICE(We will use a staged boost library from $BOOST_ROOT)
+                        BOOST_CPPFLAGS="-I$BOOST_ROOT"
+                        BOOST_LDFLAGS="-L$BOOST_ROOT/stage/$libsubdir"
+                    fi
+                fi
+            fi
+        fi
+
+        CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+        export CPPFLAGS
+        LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+        export LDFLAGS
+
+        AC_LANG_PUSH(C++)
+            AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
+        @%:@include <boost/version.hpp>
+        ]], [[
+        #if BOOST_VERSION >= $WANT_BOOST_VERSION
+        // Everything is okay
+        #else
+        #  error Boost version is too old
+        #endif
+        ]])],[
+            AC_MSG_RESULT(yes)
+        succeeded=yes
+        found_system=yes
+            ],[
+            ])
+        AC_LANG_POP([C++])
+    fi
+
+    if test "$succeeded" != "yes" ; then
+        if test "$_version" = "0" ; then
+            AC_MSG_NOTICE([[We could not detect the boost libraries (version $boost_lib_version_req_shorten or higher). If you have a staged boost library (still not installed) please specify \$BOOST_ROOT in your environment and do not give a PATH to --with-boost option.  If you are sure you have boost installed, then check your version number looking in <boost/version.hpp>. See http://randspringer.de/boost for more documentation.]])
+        else
+            AC_MSG_NOTICE([Your boost libraries seems to old (version $_version).])
+        fi
+        # execute ACTION-IF-NOT-FOUND (if present):
+        ifelse([$3], , :, [$3])
+    else
+        AC_SUBST(BOOST_CPPFLAGS)
+        AC_SUBST(BOOST_LDFLAGS)
+        AC_DEFINE(HAVE_BOOST,,[define if the Boost library is available])
+        # execute ACTION-IF-FOUND (if present):
+        ifelse([$2], , :, [$2])
+    fi
+
+    CPPFLAGS="$CPPFLAGS_SAVED"
+    LDFLAGS="$LDFLAGS_SAVED"
+fi
+
+])

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_thread.m4
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_thread.m4?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_thread.m4 (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_thread.m4 Mon Oct 11 19:00:42 2010
@@ -0,0 +1,149 @@
+# ===========================================================================
+#      http://www.gnu.org/software/autoconf-archive/ax_boost_thread.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+#   AX_BOOST_THREAD
+#
+# DESCRIPTION
+#
+#   Test for Thread library from the Boost C++ libraries. The macro requires
+#   a preceding call to AX_BOOST_BASE. Further documentation is available at
+#   <http://randspringer.de/boost/index.html>.
+#
+#   This macro calls:
+#
+#     AC_SUBST(BOOST_THREAD_LIB)
+#
+#   And sets:
+#
+#     HAVE_BOOST_THREAD
+#
+# LICENSE
+#
+#   Copyright (c) 2009 Thomas Porschberg <thomas@randspringer.de>
+#   Copyright (c) 2009 Michael Tindal
+#
+#   Copying and distribution of this file, with or without modification, are
+#   permitted in any medium without royalty provided the copyright notice
+#   and this notice are preserved. This file is offered as-is, without any
+#   warranty.
+
+#serial 17
+
+AC_DEFUN([AX_BOOST_THREAD],
+[
+	AC_ARG_WITH([boost-thread],
+	AS_HELP_STRING([--with-boost-thread@<:@=special-lib@:>@],
+                   [use the Thread library from boost - it is possible to specify a certain library for the linker
+                        e.g. --with-boost-thread=boost_thread-gcc-mt ]),
+        [
+        if test "$withval" = "no"; then
+			want_boost="no"
+        elif test "$withval" = "yes"; then
+            want_boost="yes"
+            ax_boost_user_thread_lib=""
+        else
+		    want_boost="yes"
+        	ax_boost_user_thread_lib="$withval"
+		fi
+        ],
+        [want_boost="yes"]
+	)
+
+	if test "x$want_boost" = "xyes"; then
+        AC_REQUIRE([AC_PROG_CC])
+        AC_REQUIRE([AC_CANONICAL_BUILD])
+		CPPFLAGS_SAVED="$CPPFLAGS"
+		CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+		export CPPFLAGS
+
+		LDFLAGS_SAVED="$LDFLAGS"
+		LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+		export LDFLAGS
+
+        AC_CACHE_CHECK(whether the Boost::Thread library is available,
+					   ax_cv_boost_thread,
+        [AC_LANG_PUSH([C++])
+			 CXXFLAGS_SAVE=$CXXFLAGS
+
+			 if test "x$build_os" = "xsolaris" ; then
+  				 CXXFLAGS="-pthreads $CXXFLAGS"
+			 elif test "x$build_os" = "xming32" ; then
+				 CXXFLAGS="-mthreads $CXXFLAGS"
+			 else
+				CXXFLAGS="-pthread $CXXFLAGS"
+			 fi
+			 AC_COMPILE_IFELSE(AC_LANG_PROGRAM([[@%:@include <boost/thread/thread.hpp>]],
+                                   [[boost::thread_group thrds;
+                                   return 0;]]),
+                   ax_cv_boost_thread=yes, ax_cv_boost_thread=no)
+			 CXXFLAGS=$CXXFLAGS_SAVE
+             AC_LANG_POP([C++])
+		])
+		if test "x$ax_cv_boost_thread" = "xyes"; then
+           if test "x$build_os" = "xsolaris" ; then
+			  BOOST_CPPFLAGS="-pthreads $BOOST_CPPFLAGS"
+		   elif test "x$build_os" = "xming32" ; then
+			  BOOST_CPPFLAGS="-mthreads $BOOST_CPPFLAGS"
+		   else
+			  BOOST_CPPFLAGS="-pthread $BOOST_CPPFLAGS"
+		   fi
+
+			AC_SUBST(BOOST_CPPFLAGS)
+
+			AC_DEFINE(HAVE_BOOST_THREAD,,[define if the Boost::Thread library is available])
+            BOOSTLIBDIR=`echo $BOOST_LDFLAGS | sed -e 's/@<:@^\/@:>@*//'`
+
+			LDFLAGS_SAVE=$LDFLAGS
+                        dnl On bsd build systems add -pthread to the link flags.
+                        dnl No break statement: this case has no enclosing loop
+                        dnl in this macro, and break outside a loop is not portable.
+                        case "x$build_os" in
+                          *bsd* )
+                               LDFLAGS="-pthread $LDFLAGS"
+                          ;;
+                        esac
+            if test "x$ax_boost_user_thread_lib" = "x"; then
+                for libextension in `ls $BOOSTLIBDIR/libboost_thread*.so* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_thread.*\)\.so.*$;\1;'` `ls $BOOSTLIBDIR/libboost_thread*.a* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_thread.*\)\.a*$;\1;'`; do
+                     ax_lib=${libextension}
+				    AC_CHECK_LIB($ax_lib, exit,
+                                 [BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
+                                 [link_thread="no"])
+  				done
+                if test "x$link_thread" != "xyes"; then
+                for libextension in `ls $BOOSTLIBDIR/boost_thread*.dll* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_thread.*\)\.dll.*$;\1;'` `ls $BOOSTLIBDIR/boost_thread*.a* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_thread.*\)\.a*$;\1;'` ; do
+                     ax_lib=${libextension}
+				    AC_CHECK_LIB($ax_lib, exit,
+                                 [BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
+                                 [link_thread="no"])
+  				done
+                fi
+
+            else
+               for ax_lib in $ax_boost_user_thread_lib boost_thread-$ax_boost_user_thread_lib; do
+				      AC_CHECK_LIB($ax_lib, exit,
+                                   [BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
+                                   [link_thread="no"])
+                  done
+
+            fi
+            if test "x$ax_lib" = "x"; then
+                AC_MSG_ERROR(Could not find a version of the library!)
+            fi
+			if test "x$link_thread" = "xno"; then
+				AC_MSG_ERROR(Could not link against $ax_lib !)
+                        else
+                           dnl On bsd build systems export -pthread through BOOST_LDFLAGS.
+                           dnl No break statement: this case has no enclosing loop
+                           dnl in this macro, and break outside a loop is not portable.
+                           case "x$build_os" in
+                              *bsd* )
+                                BOOST_LDFLAGS="-pthread $BOOST_LDFLAGS"
+                              ;;
+                           esac
+
+			fi
+		fi
+
+		CPPFLAGS="$CPPFLAGS_SAVED"
+    	LDFLAGS="$LDFLAGS_SAVED"
+	fi
+])

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/log4cpp.conf
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/log4cpp.conf?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/log4cpp.conf (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/log4cpp.conf Mon Oct 11 19:00:42 2010
@@ -0,0 +1,49 @@
+#
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# 
+#
+
+log4j.appender.rootAppender=org.apache.log4j.ConsoleAppender
+log4j.appender.rootAppender.layout=org.apache.log4j.BasicLayout
+
+#log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
+log4j.appender.hedwig=org.apache.log4j.ConsoleAppender
+#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwig.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %c %p - %m%n
+log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwig.layout.ConversionPattern=%.5m%n
+
+log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender
+#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwigtest.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %c %p - %m%n
+log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwigtest.layout.ConversionPattern=%.5m%n
+
+# category
+log4j.category.hedwig=DEBUG, hedwig
+log4j.rootCategory=DEBUG
+
+#log4j.category.hedwig.channel=ERROR
+log4j.category.hedwig.util=ERROR
+log4j.category.hedwigtest.servercontrol=ERROR
+
+log4j.category.hedwigtest=DEBUG, hedwigtest
+log4j.rootCategory=DEBUG

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/network-delays.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/network-delays.sh?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/network-delays.sh (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/network-delays.sh Mon Oct 11 19:00:42 2010
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+setup_delays() {
+
+    UNAME=`uname -s`
+
+    echo "Setting delay to ${1}ms"
+    case "$UNAME" in
+	Darwin|FreeBSD)
+	    sudo ipfw pipe 1 config delay ${1}ms
+	    sudo ipfw add pipe 1 dst-port 12349 
+	    sudo ipfw add pipe 1 dst-port 12350
+	    sudo ipfw add pipe 1 src-port 12349 
+	    sudo ipfw add pipe 1 src-port 12350 
+            ;;
+	Linux)
+	    sudo tc qdisc add dev lo root handle 1: prio
+	    sudo tc qdisc add dev lo parent 1:3 handle 30: netem delay ${1}ms 
+	    sudo tc filter add dev lo protocol ip parent 1:0 prio 3 u32 match ip dport 12349 0xffff flowid 1:3
+	    sudo tc filter add dev lo protocol ip parent 1:0 prio 3 u32 match ip dport 12350 0xffff flowid 1:3
+	    sudo tc filter add dev lo protocol ip parent 1:0 prio 3 u32 match ip sport 12349 0xffff flowid 1:3
+	    sudo tc filter add dev lo protocol ip parent 1:0 prio 3 u32 match ip sport 12350 0xffff flowid 1:3
+	    ;;
+	*)
+	    echo "Unknown system type, $UNAME, only Linux, Darwin & FreeBSD supported"
+	    ;;
+    esac
+}
+
+clear_delays() {
+    UNAME=`uname -s`
+
+    case "$UNAME" in
+	Darwin|FreeBSD)
+	    echo "Flushing ipfw"
+	    sudo ipfw -f -q flush
+            ;;
+	Linux)
+	    echo "Clearing delay"
+	    sudo tc qdisc del dev lo root
+	    ;;
+	*)
+	    echo "Unknown system type, $UNAME, only Linux, Darwin & FreeBSD supported"
+	    ;;
+    esac
+}
+

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/server-control.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/server-control.sh?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/server-control.sh (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/server-control.sh Mon Oct 11 19:00:42 2010
@@ -0,0 +1,49 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+HEDWIGBASE=../../../../..
+
+# Find the server jar; ls exits non-zero when the glob matches nothing.
+HEDWIGJAR=`ls $HEDWIGBASE/server/target/server-*-with-dependencies.jar`
+if [ ! $? -eq 0 ]; then
+    # printf rather than echo: bash's echo prints "\n" literally unless given -e.
+    printf "\n\nCould not find server-VERSION-with-dependencies.jar. \nYou need to build the java part of hedwig. \nRun mvn package in the toplevel hedwig directory.\n\n\n"
+    exit 1;
+fi
+
+HEDWIGSERVERTESTS=$HEDWIGBASE/server/target/test-classes/
+if [ ! -e $HEDWIGSERVERTESTS ]; then
+    printf "\n\nThe hedwig java server tests need to be build.\n\n\n"
+    exit 1;
+fi
+
+# Classpath passed to java via -cp by start_control_server.
+export CP=.:$HEDWIGJAR:$HEDWIGSERVERTESTS
+
+start_control_server() {
+    # Kill whatever process is recorded in server-control.pid, then launch a
+    # new ServerControlDaemon in the background and record its pid.
+    if test -e server-control.pid; then
+        kill -9 $(cat server-control.pid)
+        rm server-control.pid
+    fi
+
+    # stdin is closed; stdout and stderr both go to servercontrol.out
+    java -cp $CP -Dlog4j.configuration=log4j.properties org.apache.hedwig.ServerControlDaemon <&- > servercontrol.out 2>&1 &
+    echo $! > server-control.pid
+
+    # pause before returning (presumably to give the daemon time to start)
+    sleep 5
+}
+
+stop_control_server() {
+    kill -9 `cat server-control.pid`
+    rm server-control.pid
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/tester.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/tester.sh?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/tester.sh (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/tester.sh Mon Oct 11 19:00:42 2010
@@ -0,0 +1,95 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cd "$(dirname "$0")" || exit 1;  # run from the script's own directory (quoted: the path may contain spaces); abort if that fails
+
+source network-delays.sh  # presumably defines setup_delays / clear_delays, which are called below
+source server-control.sh  # presumably defines start_control_server / stop_control_server, which are called below
+
+all() {  # Full run: optional delay setup, start the control daemon, run hedwigtest, clean up, exit with the test status.
+    if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then  # true when HEDWIG_NETWORK_DELAY is non-empty
+	setup_delays $HEDWIG_NETWORK_DELAY
+    fi
+
+    start_control_server;
+
+    ../test/hedwigtest 
+    RESULT=$?  # save the test binary's exit status now; the cleanup commands below overwrite $?
+    stop_control_server;
+
+    if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
+	clear_delays
+    else  # no delay was configured: print a hint about HEDWIG_NETWORK_DELAY instead
+	cat <<EOF
+
+The environment variable HEDWIG_NETWORK_DELAY is not set, so the tests were run directly 
+with a localhost server. This isn't quite realistic as usually there will be some delay between 
+the client and the hedwig server. Set HEDWIG_NETWORK_DELAY to the number of milliseconds you want
+to delay the packets between the server and client. 
+
+ $ export HEDWIG_NETWORK_DELAY=500
+
+Requires root privileges.
+
+WARNING!!! This will modify your traffic shaping and firewall rules. If you do run with delays, 
+check your firewall rules afterwards.
+
+EOF
+    fi
+
+    exit $RESULT  # terminates the whole script with the saved test status
+}
+
+case "$1" in  # dispatch on the first command-line argument
+    start-control-server)
+	start_control_server
+	;;
+    stop-control-server)
+	stop_control_server
+	;;
+    setup-delays)
+	setup_delays $2
+	;;
+    clear-delays)
+	clear_delays
+	;;
+    all)
+	all
+	;;
+    *)  # unknown or missing command: print usage
+	cat <<EOF
+Usage: tester.sh [command]
+
+tester.sh all
+   Run through the tests, setting up and cleaning up all prerequisites.
+
+tester.sh start-control-server
+   Starts the daemon which the tests use to start and stop hedwig/zookeeper/bookkeeper servers
+
+tester.sh stop-control-server
+   Stops the aforementioned daemon
+
+tester.sh setup-delays <delay>
+   Set the millisecond delay for accessing the hedwig servers for the tests.
+
+tester.sh clear-delays
+   Clear the delay for accessing the hedwig servers.
+EOF
+	;;
+esac

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am Mon Oct 11 19:00:42 2010
@@ -1,6 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 bin_PROGRAMS = hedwigtest
 hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp servercontrol.cpp pubsubtest.cpp
-hedwigtest_CPPFLAGS = -I../inc $(DEPS_CFLAGS)
-hedwigtest_LDADD = $(DEPS_LIBS) -L../lib -lhedwig01
-hedwigtest_LDFLAGS = -no-undefined
+hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(BOOST_CPPFLAGS) 
+hedwigtest_LDADD = $(DEPS_LIBS) -L$(top_builddir)/lib -lhedwig01 
+hedwigtest_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
 
+check: hedwigtest
+	bash ../scripts/tester.sh all

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp Mon Oct 11 19:00:42 2010
@@ -23,14 +23,24 @@
 #include <log4cpp/PropertyConfigurator.hh>
 #include <log4cpp/Category.hh>
 #include "servercontrol.h"
+#include "util.h"
 
 #include <cppunit/extensions/TestFactoryRegistry.h>
 #include <cppunit/ui/text/TextTestRunner.h>
 
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/TestResult.h>
+
+HedwigCppTextTestProgressListener gprogress;
+
 int main( int argc, char **argv)
 {
   try {
-    log4cpp::PropertyConfigurator::configure("../log4cpp.conf");
+    if (getenv("LOG4CPP_CONF") == NULL) {
+      std::cerr << "Set LOG4CPP_CONF in your environment to get logging." << std::endl;
+    } else {
+      log4cpp::PropertyConfigurator::configure(getenv("LOG4CPP_CONF"));
+    }
   } catch (log4cpp::ConfigureFailure &e) {
     std::cerr << "log4cpp configuration failure while loading : " << e.what() << std::endl;
   } catch (std::exception &e) {
@@ -55,10 +65,13 @@ int main( int argc, char **argv)
     
     runner.addTest( registry.makeTest() );
   }
-  int ret =  runner.run(testPath);
+  
+  runner.eventManager().addListener( &gprogress );
+
+  bool ret = runner.run(testPath);
   google::protobuf::ShutdownProtobufLibrary();
   
   log4cpp::Category::shutdown();
   
-  return ret;
+  return (ret == true) ? 0 : 1;
 }

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp Mon Oct 11 19:00:42 2010
@@ -54,12 +54,10 @@ private:
   CPPUNIT_TEST_SUITE_END();
 
 public:
-  PublishTestSuite() {
-
+  PublishTestSuite() : control(NULL) {
   }
 
   ~PublishTestSuite() {
-
   }
 
   void setUp()
@@ -77,15 +75,29 @@ public:
   
   void tearDown() 
   {
-    hw2->kill();
-    hw1->kill();
+    if (hw2.get()) {
+      hw2->kill();
+    }
+    if (hw1.get()) {
+      hw1->kill();
+    }
     
-    bk1->kill();
-    bk2->kill();
-    bk3->kill();
+    if (bk1.get()) {
+      bk1->kill();
+    }
+    if (bk2.get()) {
+      bk2->kill();
+    }
+    if (bk3.get()) {
+      bk3->kill();
+    }
     
-    zk->kill();
-    delete control;
+    if (zk.get()) {
+      zk->kill();
+    }
+    if (control) {
+      delete control;
+    }
   }
 
   void testSyncPublish() {
@@ -111,6 +123,9 @@ public:
     pub.asyncPublish("testTopic", "async test message", testcb);
     
     cond->wait();
+
+    CPPUNIT_ASSERT(cond->wasSuccess());
+
     delete cond;
     delete client;
     delete conf;
@@ -134,9 +149,12 @@ public:
     pub.asyncPublish("testTopic", "async test message #3", testcb3);
     
     cond3->wait();
+    CPPUNIT_ASSERT(cond3->wasSuccess());
     cond2->wait();
+    CPPUNIT_ASSERT(cond2->wasSuccess());
     cond1->wait();
-
+    CPPUNIT_ASSERT(cond1->wasSuccess());
+    
     delete cond3; delete cond2; delete cond1;
     delete client;
     delete conf;

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp Mon Oct 11 19:00:42 2010
@@ -18,6 +18,7 @@
 #include <cppunit/Test.h>
 #include <cppunit/TestSuite.h>
 #include <cppunit/extensions/HelperMacros.h>
+#include <boost/thread/mutex.hpp>
 
 #include "../lib/clientimpl.h"
 #include <hedwig/exceptions.h>
@@ -50,7 +51,7 @@ private:
   CPPUNIT_TEST_SUITE_END();
 
 public:
-  PubSubTestSuite() {
+  PubSubTestSuite() : control(NULL) {
     
   }
 
@@ -72,17 +73,29 @@ public:
   void tearDown() 
   {
     try {
-      hw1->kill();
-    
-      bk1->kill();
-      bk2->kill();
-      bk3->kill();
+      if (hw1.get()) {
+	hw1->kill();
+      }
       
-      zk->kill();
+      if (bk1.get()) {
+	bk1->kill();
+      }
+      if (bk2.get()) {
+	bk2->kill();
+      }
+      if (bk3.get()) {
+	bk3->kill();
+      }
+      
+      if (zk.get()) {
+	zk->kill();
+      }
     } catch (std::exception& e) {
       // don't allow an exception to break everything, we're going to delete the control no matter what
     }
-    delete control;
+    if (control) {
+      delete control;
+    }
   }
 
   class MyMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
@@ -93,30 +106,28 @@ public:
 
     virtual void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
       if (topic == this->topic && subscriberId == this->subscriberId) {
-	mutex.lock();
+	boost::lock_guard<boost::mutex> lock(mutex);
+      
 	messagesReceived++;
 	lastMessage = msg.body();
 	callback->operationComplete();
-	mutex.unlock();
       }
     }
     
     std::string getLastMessage() {
-      mutex.lock();
+      boost::lock_guard<boost::mutex> lock(mutex);
       std::string s = lastMessage;
-      mutex.unlock();
       return s;
     }
 
     int numMessagesReceived() {
-      mutex.lock();
+      boost::lock_guard<boost::mutex> lock(mutex);
       int i = messagesReceived;
-      mutex.unlock();
       return i;
     }    
     
   protected:
-    Hedwig::Mutex mutex;
+    boost::mutex mutex;
     int messagesReceived;
     std::string lastMessage;
     std::string topic;

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp Mon Oct 11 19:00:42 2010
@@ -27,11 +27,18 @@
 #include <stdlib.h>
 #include "servercontrol.h"
 
+
 #include <log4cpp/Category.hh>
+
+#include "util.h"
+
 #include <sstream>   
+#include <time.h>
 
 static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
 
+extern HedwigCppTextTestProgressListener gprogress;
+
 using namespace HedwigTest;
 
 const int MAX_COMMAND_LN = 256;
@@ -86,6 +93,8 @@ ServerControl::ServerControl(int port) {
     close(socketfd);
     throw CantConnectToServerControlDaemonException();
   }
+  
+  requestResponse("TEST " + gprogress.getTestName() + "\n");
 }
 
 ServerControl::~ServerControl() {

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h Mon Oct 11 19:00:42 2010
@@ -21,10 +21,11 @@
 
 #include <tr1/memory>
 #include <exception>
+#include <boost/thread/mutex.hpp>
 #include "../lib/util.h"
 
 namespace HedwigTest {
-    const int DEFAULT_CONTROLSERVER_PORT = 5672;
+  const int DEFAULT_CONTROLSERVER_PORT = 5672;
 
   class TestException : public std::exception {};
   class CantConnectToServerControlDaemonException : public TestException {};
@@ -36,6 +37,7 @@ namespace HedwigTest {
   public:
     virtual void kill() = 0;
     virtual std::string& getAddress() = 0;
+    virtual ~TestServer() {}
   };
   
   typedef std::tr1::shared_ptr<TestServer> TestServerPtr;
@@ -57,7 +59,7 @@ namespace HedwigTest {
 
   public:
     int socketfd;
-    Hedwig::Mutex socketlock;
+    boost::mutex socketlock;
   };
 };
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp Mon Oct 11 19:00:42 2010
@@ -55,7 +55,7 @@ private:
   CPPUNIT_TEST_SUITE_END();
 
 public:
-  SubscribeTestSuite() {
+  SubscribeTestSuite() : control(NULL) {
     
   }
 
@@ -78,17 +78,29 @@ public:
   void tearDown() 
   {
     try {
-      hw1->kill();
-    
-      bk1->kill();
-      bk2->kill();
-      bk3->kill();
+      if (hw1.get()) {
+	hw1->kill();
+      }
+    
+      if (bk1.get()) {
+	bk1->kill();
+      }
+      if (bk2.get()) {
+	bk2->kill();
+      }
+      if (bk3.get()) {
+	bk3->kill();
+      }
       
-      zk->kill();
+      if (zk.get()) {
+	zk->kill();
+      }
     } catch (std::exception& e) {
       // don't allow an exception to break everything, we're going to delete the control no matter what
     }
-    delete control;
+    if (control) {
+      delete control;
+    }
   }
 
   void testSyncSubscribe() {
@@ -132,6 +144,7 @@ public:
     sub.asyncSubscribe("testTopic", "mySubscriberId-3", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
     
     cond1->wait();
+    CPPUNIT_ASSERT(cond1->wasSuccess());
   }
   
   void testAsyncSubcribeAndUnsubscribe() {
@@ -153,9 +166,11 @@ public:
 
     sub.asyncSubscribe("testTopic", "mySubscriberId-4", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
     cond1->wait();
+    CPPUNIT_ASSERT(cond1->wasSuccess());
     
     sub.asyncUnsubscribe("testTopic", "mySubscriberId-4", testcb2);
     cond2->wait();
+    CPPUNIT_ASSERT(cond2->wasSuccess());
   }
 
   void testAsyncSubcribeAndSyncUnsubscribe() {
@@ -174,6 +189,7 @@ public:
     
     sub.asyncSubscribe("testTopic", "mySubscriberId-5", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1);
     cond1->wait();
+    CPPUNIT_ASSERT(cond1->wasSuccess());
 
     sub.unsubscribe("testTopic", "mySubscriberId-5");
   }

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h Mon Oct 11 19:00:42 2010
@@ -20,22 +20,50 @@
 #include <hedwig/callback.h>
 #include <stdexcept>
 #include <pthread.h>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
 
-static log4cpp::Category &UTILLOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
 
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/Test.h>
+
+static log4cpp::Category &UTILLOG = log4cpp::Category::getInstance("hedwigtest."__FILE__);
 
-class SimpleWaitCondition : public Hedwig::WaitConditionBase {
+class SimpleWaitCondition {
 public:
-  SimpleWaitCondition() : flag(false) {};
+ SimpleWaitCondition() : flag(false), success(false) {};
   ~SimpleWaitCondition() { wait(); }
 
-  void setTrue() { UTILLOG.debugStream() << "Setting flag " << &flag << " to true"; flag=true; UTILLOG.debugStream() << "Flag now " << flag; }
-  bool isTrue() {
-    UTILLOG.debugStream() << &flag << " isTrue? " << flag;
-    return flag;
+  void wait() {
+    boost::unique_lock<boost::mutex> lock(mut);
+    while(!flag)
+    {
+        cond.wait(lock);
+    }
+  }
+
+  void notify() {
+    {
+      boost::lock_guard<boost::mutex> lock(mut);
+      flag = true;
+    }
+    cond.notify_all();
+  }
+
+  void setSuccess(bool s) {
+    success = s;
   }
+
+  bool wasSuccess() {
+    return success;
+  }
+
 private:
   bool flag;
+  boost::condition_variable cond;
+  boost::mutex mut;
+  bool success;
 };
 
 class TestCallback : public Hedwig::OperationCallback {
@@ -46,17 +74,18 @@ public:
 
   virtual void operationComplete() {
     UTILLOG.debugStream() << "operationComplete";
-    cond->lock();
-    cond->setTrue();
-    cond->signalAndUnlock();
+    cond->setSuccess(true);
+    cond->notify();
+
   }
   
   virtual void operationFailed(const std::exception& exception) {
     UTILLOG.debugStream() << "operationFailed: " << exception.what();
-    cond->lock();
-    cond->setTrue();
-    cond->signalAndUnlock();
+    cond->setSuccess(false);
+    cond->notify();
   }    
+  
+
 private:
   SimpleWaitCondition *cond;
 };
@@ -66,11 +95,49 @@ class TestServerConfiguration : public H
 public:
   TestServerConfiguration(HedwigTest::TestServerPtr& server) : server(server), address(server->getAddress()) {}
   
-  virtual const std::string& getDefaultServer() const {
-    return address;
+  virtual int getInt(const std::string& /*key*/, int defaultVal) const {
+    return defaultVal;
+  }
+
+  virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
+    if (key == Configuration::DEFAULT_SERVER) {
+      return address;
+    } else {
+      return defaultVal;
+    }
+  }
+
+  virtual bool getBool(const std::string& /*key*/, bool defaultVal) const {
+    return defaultVal;
   }
   
 private:
   HedwigTest::TestServerPtr server;
   const std::string address;
 };
+
+
+class HedwigCppTextTestProgressListener : public CppUnit::TextTestProgressListener  // prints banners and remembers the running test's name
+{
+ public:
+  void startTest( CppUnit::Test *test ) {  // announce the test on stdout and record its name for getTestName()
+    std::cout << "\n****\n\nStarting " << test->getName() << "\n\n****" << std::endl;
+    current_test = test->getName();
+  }
+  // Unused parameter names below are commented out so they do not trigger unused-parameter warnings.
+  void addFailure( const CppUnit::TestFailure & /*failure*/ ) {
+    std::cout << "\n!!!!!\n\nFailed\n\n!!!!!" << std::endl;
+    // only a banner is printed; the failure details are not inspected here
+  }
+
+  void endTestRun( CppUnit::Test * /*test*/, 
+		   CppUnit::TestResult * /*eventManager*/ ) {
+  }
+
+  std::string& getTestName() {  // name recorded by the most recent startTest() call (empty before any test has started)
+    return current_test;
+  }
+
+private:
+  std::string current_test;
+};

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java Mon Oct 11 19:00:42 2010
@@ -28,7 +28,6 @@ import org.jboss.netty.channel.socket.ni
 
 import org.apache.log4j.Logger;
 
-
 import org.jboss.netty.channel.Channel;  
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelEvent;
@@ -74,7 +73,12 @@ public class ServerControlDaemon {
 	    HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
 	    ServerControl.TestServer t = map.get(name);
 	    map.remove(name);
-	    t.kill();
+	    try {
+		t.kill();
+	    } catch (Exception e) {
+		e.printStackTrace();
+		// do nothing, should be killed, we won't use it again anyhow
+	    }
 	}
 
 	private ServerControl.TestServer lookupServer(Channel c, String name) {
@@ -86,10 +90,12 @@ public class ServerControlDaemon {
 	    HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
 	    serverMap.remove(map);
 	    
-	    for (ServerControl.TestServer t : map.values()) {
-		t.kill();
+	    if (map != null) {
+		for (ServerControl.TestServer t : map.values()) {
+		    t.kill();
+		}
+		map.clear();
 	    }
-	    map.clear();
 	}
 
 	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
@@ -116,11 +122,16 @@ public class ServerControlDaemon {
 		    ctx.getChannel().write("OK " + t.getAddress() + "\n");
 		} else if (args[0].equals("KILL")) {
 		    killServerForChannel(ctx.getChannel(), args[1]);
+		    
 		    ctx.getChannel().write("OK Killed " + args[1] + "\n");
+		} else if (args[0].equals("TEST")) {
+		    LOG.info("\n******\n\n" + args[1] + "\n\n******");
+		    ctx.getChannel().write("OK Test Noted\n");
 		} else {
 		    ctx.getChannel().write("ERR Bad Command\n");
 		}
 	    } catch (Exception ex) {
+		ex.printStackTrace();
 		ctx.getChannel().write("ERR " + ex.toString() + "\n");
 	    }
 	}



Mime
View raw message