hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r987314 [3/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp...
Date Thu, 19 Aug 2010 21:25:22 GMT
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hedwig/client.h>
+#include <memory>
+
+#include "clientimpl.h"
+
+using namespace Hedwig;
+
+// Server address ("host:port") handed out by the base Configuration.
+const std::string DEFAULT_SERVER = "localhost:4080";
+
+/**
+   @returns a reference to the file-level DEFAULT_SERVER string
+*/
+const std::string& Configuration::getDefaultServer() const {
+  return DEFAULT_SERVER;
+}
+
+/**
+   Build a client facade backed by a ClientImpl created from the given configuration.
+
+   @param conf configuration forwarded to ClientImpl::Create
+*/
+Client::Client(const Configuration& conf) {
+  clientimpl = ClientImpl::Create( conf );
+}
+
+/**
+   @returns the subscriber interface of the underlying implementation
+*/
+Subscriber& Client::getSubscriber() {
+  return clientimpl->getSubscriber();
+}
+
+/**
+   @returns the publisher interface of the underlying implementation
+*/
+Publisher& Client::getPublisher() {
+  return clientimpl->getPublisher();
+}
+
+/**
+   Ask the implementation to shut itself down (see ClientImpl::Destroy).
+*/
+Client::~Client() {
+  clientimpl->Destroy();
+}
+
+

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "clientimpl.h"
+#include "channel.h"
+#include "publisherimpl.h"
+#include "subscriberimpl.h"
+#include <log4cpp/Category.hh>
+
+// Logger for this translation unit. The space between the literal and __FILE__ is
+// required: without it C++11 and later parse `"hedwig."__FILE__` as a string literal
+// with a user-defined-literal suffix instead of two adjacent string literals.
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig." __FILE__);
+
+using namespace Hedwig;
+
+
+/**
+   Record that the operation succeeded, then signal and release the lock.
+*/
+void SyncOperationCallback::operationComplete() {
+  lock();
+  response = SUCCESS;
+  signalAndUnlock();
+}
+
+/**
+   Record that the operation failed. The exception's dynamic type is compared
+   with typeid (an exact match, subclasses do not count) and mapped onto a
+   response code; any other type is stored as UNKNOWN.
+
+   @param exception the error reported for the operation
+*/
+void SyncOperationCallback::operationFailed(const std::exception& exception) {
+  lock();
+  if (typeid(exception) == typeid(ChannelConnectException)) {
+    response = NOCONNECT;
+  } else if (typeid(exception) == typeid(ServiceDownException)) {
+    response = SERVICEDOWN;
+  } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
+    response = ALREADY_SUBSCRIBED;
+  } else if (typeid(exception) == typeid(NotSubscribedException)) {
+    response = NOT_SUBSCRIBED;
+  } else {
+    response = UNKNOWN;
+  }
+  signalAndUnlock();
+}
+    
+/**
+   @returns true once a result has been recorded, i.e. response is no longer PENDING
+*/
+bool SyncOperationCallback::isTrue() {
+  return response != PENDING;
+}
+
+/**
+   Turn the recorded response into an exception. Returns normally only when
+   the response is SUCCESS; every value without a dedicated exception
+   (including PENDING and UNKNOWN) raises ClientException.
+*/
+void SyncOperationCallback::throwExceptionIfNeeded() {
+  if (response == SUCCESS) {
+    return;
+  }
+  if (response == NOCONNECT) {
+    throw CannotConnectException();
+  }
+  if (response == SERVICEDOWN) {
+    throw ServiceDownException();
+  }
+  if (response == ALREADY_SUBSCRIBED) {
+    throw AlreadySubscribedException();
+  }
+  if (response == NOT_SUBSCRIBED) {
+    throw NotSubscribedException();
+  }
+  throw ClientException();
+}
+
+/**
+   @param client the client implementation this handler reports to; a copy of the pointer is kept
+*/
+HedwigClientChannelHandler::HedwigClientChannelHandler(ClientImplPtr& client) 
+  : client(client){
+}
+
+/**
+   Route a response read from a channel to whoever issued the matching request.
+
+   Responses that carry a message are logged and ignored. Otherwise the pending
+   request is looked up by transaction id; a NOT_RESPONSIBLE_FOR_TOPIC status is
+   handed to the client for redirection, and everything else goes to the
+   publisher or subscriber depending on the request type.
+
+   @param channel the channel the response arrived on
+   @param m the response received from the server
+*/
+void HedwigClientChannelHandler::messageReceived(DuplexChannel* channel, const PubSubResponse& m) {
+  LOG.debugStream() << "Message received";
+  if (m.has_message()) {
+    LOG.errorStream() << "Subscription response, ignore for now";
+    return;
+  }
+  
+  /* Fetch the request this response answers from the channel; from here on it
+     is passed to the redirect logic or to a message handler below. */
+  PubSubDataPtr data = channel->retrieveTransaction(m.txnid()); 
+
+  if (data == NULL) {
+    LOG.errorStream() << "Transaction " << m.txnid() << " doesn't exist in channel " << channel;
+    return;
+  }
+
+  if (m.statuscode() == NOT_RESPONSIBLE_FOR_TOPIC) {
+    client->redirectRequest(channel, data, m);
+    return;
+  }
+
+  switch (data->getType()) {
+  case PUBLISH:
+    client->getPublisherImpl().messageHandler(m, data);
+    break;
+  case SUBSCRIBE:
+  case UNSUBSCRIBE:
+    client->getSubscriberImpl().messageHandler(m, data);
+    break;
+  default:
+    LOG.errorStream() << "Unimplemented request type " << data->getType();
+    break;
+  }
+}
+
+
+/**
+   Connection notification; this handler takes no action on it.
+*/
+void HedwigClientChannelHandler::channelConnected(DuplexChannel* channel) {
+  // do nothing 
+}
+
+/**
+   Log the disconnect and tell the client that the channel has died.
+
+   @param channel the channel that was disconnected
+   @param e the error associated with the disconnect (not used here)
+*/
+void HedwigClientChannelHandler::channelDisconnected(DuplexChannel* channel, const std::exception& e) {
+  LOG.errorStream() << "Channel disconnected";
+
+  client->channelDied(channel);
+}
+
+/**
+   Log an error reported for a channel. No recovery is attempted here.
+
+   @param channel the channel the error was reported on (not used here)
+   @param e the error; its what() text is written to the log
+*/
+void HedwigClientChannelHandler::exceptionOccurred(DuplexChannel* channel, const std::exception& e) {
+  // Separate the fixed text from the exception message so the log line is readable.
+  LOG.errorStream() << "Exception occurred: " << e.what();
+}
+
+/**
+   Start the counter at zero, so the first id returned by next() is 1.
+*/
+ClientTxnCounter::ClientTxnCounter() : counter(0) 
+{
+}
+
+/**
+   Nothing to release explicitly.
+*/
+ClientTxnCounter::~ClientTxnCounter() {
+}
+
+/**
+Advance the transaction counter by one and hand back the updated value.
+The update is performed while holding the counter's mutex.
+
+@returns the next transaction id
+*/
+long ClientTxnCounter::next() {  // would be nice to remove lock from here, look more into it
+  mutex.lock();
+  counter += 1;
+  const long txnid = counter;
+  mutex.unlock();
+  return txnid;
+}
+
+
+
+/**
+   Build the bookkeeping record for a PUBLISH request.
+
+   @param txnid transaction id of the request
+   @param topic topic to publish to
+   @param body message payload
+   @param callback callback stored on the record
+   @returns a shared pointer owning the new record
+*/
+PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const std::string& body, const OperationCallbackPtr& callback) {
+  PubSubDataPtr record(new PubSubData());
+  PubSubData& fields = *record;
+  fields.type = PUBLISH;
+  fields.txnid = txnid;
+  fields.topic = topic;
+  fields.body = body;
+  fields.callback = callback;
+  return record;
+}
+
+/**
+   Build the bookkeeping record for a SUBSCRIBE request.
+
+   @param txnid transaction id of the request
+   @param subscriberid id of the subscriber
+   @param topic topic to subscribe to
+   @param callback callback stored on the record
+   @param mode create/attach mode copied into the subscribe request
+   @returns a shared pointer owning the new record
+*/
+PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode) {
+  PubSubDataPtr record(new PubSubData());
+  PubSubData& fields = *record;
+  fields.type = SUBSCRIBE;
+  fields.txnid = txnid;
+  fields.subscriberid = subscriberid;
+  fields.topic = topic;
+  fields.callback = callback;
+  fields.mode = mode;
+  return record;
+}
+
+/**
+   Build the bookkeeping record for an UNSUBSCRIBE request.
+
+   @param txnid transaction id of the request
+   @param subscriberid id of the subscriber
+   @param topic topic to unsubscribe from
+   @param callback callback stored on the record
+   @returns a shared pointer owning the new record
+*/
+PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback) {
+  PubSubDataPtr record(new PubSubData());
+  PubSubData& fields = *record;
+  fields.type = UNSUBSCRIBE;
+  fields.txnid = txnid;
+  fields.subscriberid = subscriberid;
+  fields.topic = topic;
+  fields.callback = callback;
+  return record;
+}
+
+/**
+   Build the bookkeeping record for a CONSUME request. No callback is stored
+   on the record.
+
+   @param txnid transaction id of the request
+   @param subscriberid id of the subscriber
+   @param topic topic the consumed message belongs to
+   @param msgid sequence id copied into the record
+   @returns a shared pointer owning the new record
+*/
+PubSubDataPtr PubSubData::forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid) {
+  PubSubDataPtr record(new PubSubData());
+  PubSubData& fields = *record;
+  fields.type = CONSUME;
+  fields.txnid = txnid;
+  fields.subscriberid = subscriberid;
+  fields.topic = topic;
+  fields.msgid = msgid;
+  return record;
+}
+
+/**
+   Create an empty record. No request message exists yet, and shouldClaim
+   starts out false so that getRequest() never reads an indeterminate value
+   when setShouldClaim() was not called.
+*/
+PubSubData::PubSubData() : request(NULL), shouldClaim(false) {  
+}
+
+/**
+   Release the request message built by getRequest(), if there is one.
+*/
+PubSubData::~PubSubData() {
+  // delete on a null pointer is a no-op, so no explicit check is needed
+  delete request;
+}
+
+/** @returns the operation type this record was created for */
+OperationType PubSubData::getType() const {
+  return type;
+}
+
+/** @returns the transaction id assigned to this request */
+long PubSubData::getTxnId() const {
+  return txnid;
+}
+
+/** @returns the topic of this request */
+const std::string& PubSubData::getTopic() const {
+  return topic;
+}
+
+/** @returns the message body; only the publish factory sets it */
+const std::string& PubSubData::getBody() const {
+  return body;
+}
+
+/**
+   Build the wire request for this record. Any message built by an earlier
+   call is discarded and a fresh one is created, so the returned reference is
+   valid only until the next call or until this record is destroyed.
+
+   @returns the newly built request message, owned by this record
+   @throws UnknownRequestException if the record's type is not publish,
+           subscribe, consume or unsubscribe
+*/
+const PubSubRequest& PubSubData::getRequest() {
+  // Drop the previous message first; the pointer is cleared before allocating
+  // so it never dangles if the allocation throws.
+  delete request;
+  request = NULL;
+  request = new Hedwig::PubSubRequest();
+
+  // Fields common to every request type.
+  request->set_protocolversion(Hedwig::VERSION_ONE);
+  request->set_type(type);
+  request->set_txnid(txnid);
+  request->set_shouldclaim(shouldClaim);
+  request->set_topic(topic);
+
+  // Type-specific payload.
+  switch (type) {
+  case PUBLISH: {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating publish request";
+    }
+    Hedwig::Message* payload = request->mutable_publishrequest()->mutable_msg();
+    payload->set_body(body);
+    break;
+  }
+  case SUBSCRIBE: {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating subscribe request";
+    }
+    Hedwig::SubscribeRequest* subscribe = request->mutable_subscriberequest();
+    subscribe->set_subscriberid(subscriberid);
+    subscribe->set_createorattach(mode);
+    break;
+  }
+  case CONSUME: {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating consume request";
+    }
+    Hedwig::ConsumeRequest* consume = request->mutable_consumerequest();
+    consume->set_subscriberid(subscriberid);
+    consume->mutable_msgid()->CopyFrom(msgid);
+    break;
+  }
+  case UNSUBSCRIBE: {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating unsubscribe request";
+    }
+    Hedwig::UnsubscribeRequest* unsubscribe = request->mutable_unsubscriberequest();
+    unsubscribe->set_subscriberid(subscriberid);
+    break;
+  }
+  default:
+    LOG.errorStream() << "Tried to create a request message for the wrong type [" << type << "]";
+    throw UnknownRequestException();
+  }
+
+  return *request;
+}
+
+/**
+   Set the flag that getRequest() copies into the request's shouldclaim field.
+
+   @param shouldClaim new value of the flag
+*/
+void PubSubData::setShouldClaim(bool shouldClaim) {
+  // The parameter shadows the member, so the member must be named through this.
+  this->shouldClaim = shouldClaim;
+}
+
+/**
+   Remember that this request has already been sent to the given server.
+
+   @param h address of the server that was tried
+*/
+void PubSubData::addTriedServer(HostAddress& h) {
+  triedservers.insert(h);
+}
+
+/**
+   @param h address of the server to check
+   @returns true if the server was recorded with addTriedServer()
+*/
+bool PubSubData::hasTriedServer(HostAddress& h) {
+  return triedservers.count(h) > 0;
+}
+
+/**
+   Forget every server recorded so far.
+*/
+void PubSubData::clearTriedServers() {
+  triedservers.clear();
+}
+
+/**
+   @returns a reference to the stored callback pointer; the consume factory
+            never sets it, so it may be empty
+*/
+OperationCallbackPtr& PubSubData::getCallback() {
+  return callback;
+}
+
+/**
+   Replace the stored callback.
+
+   @param callback the new callback
+*/
+void PubSubData::setCallback(const OperationCallbackPtr& callback) {
+  this->callback = callback;
+}
+
+/** @returns the subscriber id; the publish factory leaves it empty */
+const std::string& PubSubData::getSubscriberId() const {
+  return subscriberid;
+}
+
+/** @returns the create/attach mode; only the subscribe factory sets it */
+SubscribeRequest::CreateOrAttach PubSubData::getMode() const {
+  return mode;
+}
+
+/**
+   Allocate a new ClientImpl. The object holds a shared pointer to itself
+   (selfptr, set in the constructor); a reference to that pointer is returned.
+
+   @param conf configuration for the new client
+   @returns reference to the new object's self pointer
+*/
+ClientImplPtr& ClientImpl::Create(const Configuration& conf) {
+  ClientImpl* created = new ClientImpl(conf);
+
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Creating Clientimpl " << created;
+  }
+  return created->selfptr;
+}
+
+/**
+   Shut the client down: mark it as shutting down, kill and forget every
+   channel, delete the subscriber and publisher, and finally drop the self
+   pointer.
+*/
+void ClientImpl::Destroy() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "destroying Clientimpl " << this;
+  }
+
+  // The flag is set under the channel lock, the same lock under which
+  // createChannelForTopic() checks it before registering a channel.
+  allchannels_lock.lock();
+  shuttingDownFlag = true;
+  ChannelMap::iterator current = allchannels.begin();
+  while (current != allchannels.end()) {
+    current->second->kill();
+    ++current;
+  }
+  allchannels.clear();
+  allchannels_lock.unlock();
+  /* destruction of the maps will clean up any items they hold */
+
+  // delete on a null pointer is a no-op, so no explicit checks are needed
+  delete subscriber;
+  subscriber = NULL;
+  delete publisher;
+  publisher = NULL;
+
+  selfptr.reset(); // clear the self pointer
+}
+
+/**
+   Set up an idle client: no publisher or subscriber yet, not shutting down.
+   selfptr takes ownership of this object. The configuration is kept by
+   reference.
+
+   The initializers are listed in member declaration order (publisher is
+   declared before subscriber), which is the order they actually run in.
+
+   @param conf configuration; NOTE(review): it presumably has to outlive this
+          object since only a reference is stored, confirm against callers
+*/
+ClientImpl::ClientImpl(const Configuration& conf) 
+  : selfptr(this), conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
+{
+}
+
+/** @returns the subscriber implementation through its public interface, creating it on first use */
+Subscriber& ClientImpl::getSubscriber() {
+  return getSubscriberImpl();
+}
+
+/** @returns the publisher implementation through its public interface, creating it on first use */
+Publisher& ClientImpl::getPublisher() {
+  return getPublisherImpl();
+}
+    
+/**
+   Return the subscriber, creating it on first use. The pointer is checked
+   once without the lock and again while holding subscribercreate_lock.
+
+   @returns the subscriber implementation
+*/
+SubscriberImpl& ClientImpl::getSubscriberImpl() {
+  if (subscriber != NULL) {
+    return *subscriber;
+  }
+
+  subscribercreate_lock.lock();
+  if (subscriber == NULL) {
+    subscriber = new SubscriberImpl(selfptr);
+  }
+  subscribercreate_lock.unlock();
+
+  return *subscriber;
+}
+
+/**
+   Return the publisher, creating it on first use. The pointer is checked
+   once without the lock and again while holding publishercreate_lock.
+
+   @returns the publisher implementation
+*/
+PublisherImpl& ClientImpl::getPublisherImpl() {
+  if (publisher != NULL) {
+    return *publisher;
+  }
+
+  publishercreate_lock.lock();
+  if (publisher == NULL) {
+    publisher = new PublisherImpl(selfptr);
+  }
+  publishercreate_lock.unlock();
+
+  return *publisher;
+}
+
+/** @returns the transaction id counter owned by this client */
+ClientTxnCounter& ClientImpl::counter() {
+  return counterobj;
+}
+
+/**
+   Resend a request to the server named in a redirect response.
+
+   The server that answered is recorded as tried. If the redirect target has
+   already been tried the operation is failed with InvalidRedirectException.
+   Otherwise the topic is mapped to the new host and the request is resent:
+   subscribe requests on a freshly created channel with their own handler,
+   publish requests via doPublish, and any other type via doUnsubscribe.
+
+   @param channel channel the redirect arrived on
+   @param data the request being redirected
+   @param response the redirect response; its status message is parsed as the new host
+*/
+void ClientImpl::redirectRequest(DuplexChannel* channel, PubSubDataPtr& data, const PubSubResponse& response) {
+  HostAddress previous = channel->getHostAddress();
+  data->addTriedServer(previous);
+  
+  HostAddress target = HostAddress::fromString(response.statusmsg());
+  if (data->hasTriedServer(target)) {
+    LOG.errorStream() << "We've been told to try request [" << data->getTxnId() << "] with [" << target.getAddressString()<< "] by " << channel->getHostAddress().getAddressString() << " but we've already tried that. Failing operation";
+    data->getCallback()->operationFailed(InvalidRedirectException());
+    return;
+  }
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "We've been told  [" << data->getTopic() << "] is on [" << target.getAddressString() << "] by [" << previous.getAddressString() << "]. Redirecting request " << data->getTxnId();
+  }
+  data->setShouldClaim(true);
+
+  setHostForTopic(data->getTopic(), target);
+  DuplexChannelPtr newchannel;
+  try {
+    if (data->getType() != SUBSCRIBE) {
+      // Publish and unsubscribe go out on the channel for the topic's host.
+      newchannel = getChannelForTopic(data->getTopic());
+
+      if (data->getType() == PUBLISH) {
+        getPublisherImpl().doPublish(newchannel, data);
+      } else {
+        getSubscriberImpl().doUnsubscribe(newchannel, data);
+      }
+      return;
+    }
+
+    // Subscriptions get a new channel with a subscriber-specific handler.
+    SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(selfptr, this->getSubscriberImpl(), data));
+    ChannelHandlerPtr basehandler = handler;
+
+    newchannel = createChannelForTopic(data->getTopic(), basehandler);
+    handler->setChannel(newchannel);
+
+    getSubscriberImpl().doSubscribe(newchannel, data, handler);
+  } catch (ShuttingDownException&) {
+    return; // no point in redirecting if we're shutting down
+  }
+}
+
+/**
+   Only logs the deletion here; subscriber, publisher and channels are
+   released in Destroy().
+*/
+ClientImpl::~ClientImpl() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "deleting Clientimpl " << this;
+  }
+}
+
+/**
+   Open a new channel to the host recorded for a topic, falling back to the
+   configured default server when no host is known, and register it in
+   allchannels.
+
+   @param topic topic the channel is for
+   @param handler handler attached to the new channel
+   @returns the connected and registered channel
+   @throws ShuttingDownException if the client is shutting down; the new
+           channel is killed before the exception is thrown
+*/
+DuplexChannelPtr ClientImpl::createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler) {
+  // get the host address
+  // operator[] may insert into the map, so the lookup needs the same lock
+  // that setHostForTopic() and channelDied() hold while modifying it.
+  topic2host_lock.lock();
+  HostAddress addr = topic2host[topic];
+  topic2host_lock.unlock();
+  if (addr.isNullHost()) {
+    addr = HostAddress::fromString(conf.getDefaultServer());
+  }
+
+  // create a channel to the host
+  DuplexChannelPtr channel(new DuplexChannel(addr, conf, handler));
+  channel->connect();
+
+  allchannels_lock.lock();
+  if (shuttingDownFlag) {
+    channel->kill();
+    allchannels_lock.unlock();
+    throw ShuttingDownException();
+  }
+  allchannels[channel.get()] = channel;
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "(create) All channels size: " << allchannels.size();
+  }
+  allchannels_lock.unlock();
+
+  return channel;
+}
+
+/**
+   Return the channel cached for the host of a topic. A new channel is created
+   and cached when none is stored for that host or when the topic has no
+   known host yet.
+
+   @param topic topic to find a channel for
+   @returns a channel for the topic
+   @throws ShuttingDownException propagated from createChannelForTopic()
+*/
+DuplexChannelPtr ClientImpl::getChannelForTopic(const std::string& topic) {
+  // Both lookups use operator[], which may insert into the map, so each one
+  // holds the lock that guards writes to that map. The locks are taken one
+  // at a time and released before createChannelForTopic() is called.
+  topic2host_lock.lock();
+  HostAddress addr = topic2host[topic];
+  topic2host_lock.unlock();
+
+  host2channel_lock.lock();
+  DuplexChannelPtr channel = host2channel[addr];
+  host2channel_lock.unlock();
+
+  if (channel.get() == 0 || addr.isNullHost()) {
+    ChannelHandlerPtr handler(new HedwigClientChannelHandler(selfptr));
+    channel = createChannelForTopic(topic, handler);
+    host2channel_lock.lock();
+    host2channel[addr] = channel;
+    host2channel_lock.unlock();
+    return channel;
+  }
+
+  return channel;
+}
+
+/**
+   Record which host serves a topic, replacing any previous entry. The update
+   is made while holding topic2host_lock. Only topic2host is updated here;
+   host2topics is not touched.
+
+   @param topic the topic
+   @param host the host now recorded for that topic
+*/
+void ClientImpl::setHostForTopic(const std::string& topic, const HostAddress& host) {
+  topic2host_lock.lock();
+  topic2host[topic] = host;
+  topic2host_lock.unlock();
+}
+
+/** @returns true once Destroy() has set the shutting-down flag (read without a lock) */
+bool ClientImpl::shuttingDown() const {
+  return shuttingDownFlag;
+}
+
+/**
+   A channel has just died. Remove it so we never give it to any other publisher or subscriber.
+   
+   This does not delete the channel. Some publishers or subscribers will still hold it and will be errored
+   when they try to do anything with it. 
+
+   Only the topics recorded for the dead channel's host are forgotten; the
+   topic mappings of other hosts are left alone. Nothing is done once the
+   client is shutting down.
+
+   @param channel the channel that died
+*/
+void ClientImpl::channelDied(DuplexChannel* channel) {
+  if (shuttingDownFlag) {
+    return;
+  }
+
+  host2topics_lock.lock();
+  host2channel_lock.lock();
+  topic2host_lock.lock();
+  allchannels_lock.lock();
+  // get host
+  HostAddress addr = channel->getHostAddress();
+  
+  // equal_range limits the walk to entries keyed by this host. Iterating from
+  // find() to end() would continue into entries that belong to other hosts.
+  std::pair<Host2TopicsMap::iterator, Host2TopicsMap::iterator> topics = host2topics.equal_range(addr);
+  for (Host2TopicsMap::iterator iter = topics.first; iter != topics.second; ++iter) {
+    topic2host.erase(iter->second);
+  }
+  host2topics.erase(addr);
+  host2channel.erase(addr);
+
+  allchannels.erase(channel); // channel should be deleted here
+
+  // release in the reverse order of acquisition
+  allchannels_lock.unlock();
+  topic2host_lock.unlock();
+  host2channel_lock.unlock();
+  host2topics_lock.unlock();
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HEDWIG_CLIENT_IMPL_H
+#define HEDWIG_CLIENT_IMPL_H
+
+#include <hedwig/client.h>
+#include <hedwig/protocol.h>
+
+#include <tr1/unordered_map>
+#include <list>
+#include "util.h"
+#include <pthread.h>
+#include "channel.h"
+#include "data.h"
+
+namespace Hedwig {
+  /**
+     Operation callback that records the outcome of an operation so a caller
+     can wait for it and then turn a failure back into an exception. Used by
+     PublisherImpl::publish to make asyncPublish synchronous.
+  */
+  class SyncOperationCallback : public OperationCallback, public WaitConditionBase {
+  public:
+    SyncOperationCallback() : response(PENDING) {}
+    virtual void operationComplete();
+    virtual void operationFailed(const std::exception& exception);
+    
+    // True once an outcome has been recorded (response != PENDING).
+    virtual bool isTrue();
+
+    // Throws the exception matching the recorded outcome; returns normally on SUCCESS.
+    void throwExceptionIfNeeded();
+    
+  private:
+    // Outcome of the operation; starts as PENDING.
+    enum { 
+      PENDING, 
+      SUCCESS,
+      NOCONNECT,
+      SERVICEDOWN,
+      NOT_SUBSCRIBED,
+      ALREADY_SUBSCRIBED,
+      UNKNOWN
+    } response;
+  };
+
+  /**
+     Channel handler that routes responses to the client's publisher or
+     subscriber and reports dead channels back to the client.
+  */
+  class HedwigClientChannelHandler : public ChannelHandler {
+  public:
+    HedwigClientChannelHandler(ClientImplPtr& client);
+    
+    virtual void messageReceived(DuplexChannel* channel, const PubSubResponse& m);
+    virtual void channelConnected(DuplexChannel* channel);
+    virtual void channelDisconnected(DuplexChannel* channel, const std::exception& e);
+    virtual void exceptionOccurred(DuplexChannel* channel, const std::exception& e);
+    
+  protected:
+    // Shared pointer to the owning client; keeps the client alive while the handler exists.
+    ClientImplPtr client;
+  };
+  
+  class PublisherImpl;
+  class SubscriberImpl;
+  
+  /**
+     Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter.
+
+     Instances are created with Create() and shut down with Destroy(). The
+     object holds a shared pointer to itself (selfptr) until Destroy() clears it.
+  */
+  class ClientImpl {
+  public:
+    static ClientImplPtr& Create(const Configuration& conf);
+    void Destroy();
+
+    Subscriber& getSubscriber();
+    Publisher& getPublisher();
+
+    ClientTxnCounter& counter();
+
+    void redirectRequest(DuplexChannel* channel, PubSubDataPtr& data, const PubSubResponse& response);
+
+    // NOTE(review): no definition is visible in clientimpl.cpp; confirm it exists before calling.
+    const HostAddress& getHostForTopic(const std::string& topic);
+
+    DuplexChannelPtr createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler);
+    DuplexChannelPtr getChannelForTopic(const std::string& topic);
+    
+    void setHostForTopic(const std::string& topic, const HostAddress& host);
+
+    // NOTE(review): no definition is visible in clientimpl.cpp; confirm it exists before calling.
+    void setChannelForHost(const HostAddress& address, DuplexChannel* channel);
+    void channelDied(DuplexChannel* channel);
+    bool shuttingDown() const;
+    
+    SubscriberImpl& getSubscriberImpl();
+    PublisherImpl& getPublisherImpl();
+
+    ~ClientImpl();
+  private:
+    ClientImpl(const Configuration& conf);
+
+    // Self reference set in the constructor and cleared by Destroy().
+    ClientImplPtr selfptr;
+
+    // Stored by reference, not copied.
+    const Configuration& conf;
+    // Created on first use by getPublisherImpl()/getSubscriberImpl(); deleted in Destroy().
+    PublisherImpl* publisher;
+    SubscriberImpl* subscriber;
+    ClientTxnCounter counterobj;
+
+
+    // Each map below is paired with the mutex declared right after it.
+    typedef std::tr1::unordered_multimap<HostAddress, std::string> Host2TopicsMap;
+    Host2TopicsMap host2topics;
+    Mutex host2topics_lock;
+
+    std::tr1::unordered_map<HostAddress, DuplexChannelPtr> host2channel;
+    Mutex host2channel_lock;
+    std::tr1::unordered_map<std::string, HostAddress> topic2host;
+    Mutex topic2host_lock;
+
+    Mutex publishercreate_lock;
+    Mutex subscribercreate_lock;
+
+    // Every channel created through createChannelForTopic(), keyed by its raw pointer.
+    typedef std::tr1::unordered_map<DuplexChannel*, DuplexChannelPtr> ChannelMap;
+    ChannelMap allchannels;
+    Mutex allchannels_lock;
+
+    // Set under allchannels_lock by Destroy().
+    bool shuttingDownFlag;
+  };
+};
+#endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DATA_H
+#define DATA_H
+
+#include <hedwig/protocol.h>
+#include <hedwig/callback.h>
+
+#include <pthread.h>
+#include <tr1/unordered_set>
+#include "util.h"
+
+namespace Hedwig {
+  /**
+     Simple counter for transaction ids from the client
+  */
+  class ClientTxnCounter {
+  public:
+    ClientTxnCounter();
+    ~ClientTxnCounter();
+    // Increments the counter under the mutex and returns the new value.
+    long next();
+    
+  private:
+    long counter;
+    // Held while next() increments and reads counter.
+    Mutex mutex;
+  };
+
+  class PubSubData;
+  typedef std::tr1::shared_ptr<PubSubData> PubSubDataPtr;
+
+  /**
+     Data structure to hold information about requests and build request messages.
+     Used to store requests which may need to be resent to another server. 
+
+     Instances are created only through the static for...Request factories,
+     which return a shared pointer.
+   */
+  class PubSubData {
+  public:
+    // One factory per request type; each fills in only the fields that type uses.
+    static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const std::string& body, const OperationCallbackPtr& callback);
+    static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode);
+    static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback);
+    static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid);
+
+    ~PubSubData();
+
+    OperationType getType() const;
+    long getTxnId() const;
+    const std::string& getSubscriberId() const;
+    const std::string& getTopic() const;
+    const std::string& getBody() const;
+
+    void setShouldClaim(bool shouldClaim);
+
+    // Rebuilds the request message on every call; the reference is owned by this object.
+    const PubSubRequest& getRequest();
+    void setCallback(const OperationCallbackPtr& callback);
+    OperationCallbackPtr& getCallback();
+    SubscribeRequest::CreateOrAttach getMode() const;
+
+    // Servers this request has already been sent to, used to detect redirect loops.
+    void addTriedServer(HostAddress& h);
+    bool hasTriedServer(HostAddress& h);
+    void clearTriedServers();
+  private:
+    PubSubData();
+    // Most recently built request message, or NULL; deleted in the destructor.
+    PubSubRequest* request;
+    
+    OperationType type;
+    long txnid;
+    std::string subscriberid;
+    std::string topic;
+    std::string body;
+    bool shouldClaim;
+    OperationCallbackPtr callback;
+    SubscribeRequest::CreateOrAttach mode;
+    MessageSeqId msgid;
+    std::tr1::unordered_set<HostAddress> triedservers;
+  };
+  
+
+};
+#endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/exceptions.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/exceptions.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/exceptions.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/exceptions.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hedwig/exceptions.h>
+#include <stdlib.h>
+#include <string.h>
+
+using namespace Hedwig;
+
+
+
+  

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,87 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "publisherimpl.h"
+#include "channel.h"
+
+#include <log4cpp/Category.hh>
+
+// Logger for this translation unit. The space between the literal and __FILE__ is
+// required: without it C++11 and later parse `"hedwig."__FILE__` as a string literal
+// with a user-defined-literal suffix instead of two adjacent string literals.
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig." __FILE__);
+
+using namespace Hedwig;
+
+/**
+   @param client the owning client; a copy of the pointer is kept
+   @param data the request whose write this callback reports on
+*/
+PublishWriteCallback::PublishWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
+
+/**
+   The write succeeded; this is only logged at debug level.
+*/
+void PublishWriteCallback::operationComplete() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Successfully wrote transaction: " << data->getTxnId();
+  }
+}
+
+/**
+   The write failed. The failure is only logged; the request's callback is
+   not notified and the transaction is not removed from the channel (see the
+   #warning below).
+
+   @param exception the write error
+*/
+void PublishWriteCallback::operationFailed(const std::exception& exception) {
+  LOG.errorStream() << "Error writing to publisher " << exception.what();
+  
+  //remove txn from channel pending list
+  #warning "Actually do something here"
+}
+
+/**
+   @param client the owning client; a copy of the pointer is kept
+*/
+PublisherImpl::PublisherImpl(ClientImplPtr& client) 
+  : client(client) {
+}
+
+/**
+   Publish a message and wait for the outcome. Implemented as asyncPublish()
+   with a SyncOperationCallback that is waited on afterwards.
+
+   @param topic topic to publish to
+   @param message message payload
+   @throws whatever SyncOperationCallback::throwExceptionIfNeeded raises for
+           a failed operation
+*/
+void PublisherImpl::publish(const std::string& topic, const std::string& message) {
+  // The shared pointer owns the callback; the raw pointer is kept only to
+  // reach the SyncOperationCallback-specific methods.
+  SyncOperationCallback* sync = new SyncOperationCallback();
+  OperationCallbackPtr owner(sync);
+
+  asyncPublish(topic, message, owner);
+
+  sync->wait();
+  sync->throwExceptionIfNeeded();  
+}
+
+/**
+   Start publishing a message without waiting for the result.
+
+   @param topic topic to publish to
+   @param message message payload
+   @param callback callback stored on the request for its outcome
+*/
+void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) {
+  // The channel is looked up before the transaction id is drawn.
+  DuplexChannelPtr target = client->getChannelForTopic(topic);
+
+  const long txnid = client->counter().next();
+  PubSubDataPtr request = PubSubData::forPublishRequest(txnid, topic, message, callback);
+
+  doPublish(target, request);
+}
+
+/**
+   Send a publish request on a channel. The request is stored on the channel
+   first (messageReceived later retrieves it by transaction id), then written
+   with a PublishWriteCallback.
+
+   @param channel channel to send on
+   @param data the publish request
+*/
+void PublisherImpl::doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
+  channel->storeTransaction(data);
+
+  PublishWriteCallback* writer = new PublishWriteCallback(client, data);
+  OperationCallbackPtr writecb(writer);
+
+  LOG.debugStream() << "dopublish";
+  const PubSubRequest& wire = data->getRequest();
+  channel->writeRequest(wire, writecb);
+}
+
+/**
+   Handle the server's response to a publish request by completing or
+   failing the request's callback.
+
+   @param m the response from the server
+   @param txn the publish request the response belongs to
+*/
+void PublisherImpl::messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn) {
+  if (m.statuscode() == SUCCESS) {
+    txn->getCallback()->operationComplete();
+    return;
+  }
+
+  if (m.statuscode() == SERVICE_DOWN) {
+    LOG.errorStream() << "Server responsed with SERVICE_DOWN for " << txn->getTxnId();
+    txn->getCallback()->operationFailed(ServiceDownException());
+    return;
+  }
+
+  // Any other status is unexpected for a publish.
+  LOG.errorStream() << "Unexpected response " << m.statuscode() << " for " << txn->getTxnId();
+  txn->getCallback()->operationFailed(UnexpectedResponseException());
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef PUBLISHER_IMPL_H
+#define PUBLISHER_IMPL_H
+
+#include <hedwig/publish.h>
+#include <hedwig/callback.h>
+#include "clientimpl.h"
+
+namespace Hedwig {
+  class PublishWriteCallback : public OperationCallback { // callback that PublisherImpl::doPublish attaches to the write of a publish request
+  public:
+    PublishWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data); // data: the publish transaction whose request is being written
+
+    void operationComplete(); // presumably called when the write succeeds (definition not shown in this header)
+    void operationFailed(const std::exception& exception); // presumably called when the write fails (definition not shown in this header)
+  private:
+    ClientImplPtr client;
+    PubSubDataPtr data;
+  };
+
+  class PublisherImpl : public Publisher { // Publisher implementation that sends publish requests over the client's per-topic channels
+  public:
+    PublisherImpl(ClientImplPtr& client);
+
+    void publish(const std::string& topic, const std::string& message); // blocking variant: waits on a SyncOperationCallback and rethrows any failure
+    void asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback); // non-blocking variant: result is reported through callback
+    
+    void messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn); // maps the response status code onto txn's callback
+
+    void doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data); // stores data on the channel and writes its request
+
+  private:
+    ClientImplPtr client;
+  };
+
+};
+
+#endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "subscriberimpl.h"
+#include "util.h"
+#include "channel.h"
+
+#include <log4cpp/Category.hh>
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__); // file-local logger; category name is "hedwig." followed by __FILE__
+const int SUBSCRIBER_RECONNECT_TIME = 3000; // 3 seconds, expressed in milliseconds; convert before handing to a sleep call that uses another unit
+using namespace Hedwig;
+
+// Keeps the client and the subscribe transaction whose write this callback tracks.
+SubscriberWriteCallback::SubscriberWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data)
+  : client(client),
+    data(data)
+{
+}
+
+// Write succeeded: only traces the transaction id when debug logging is on.
+void SubscriberWriteCallback::operationComplete() {
+  if (!LOG.isDebugEnabled()) {
+    return;
+  }
+  LOG.debugStream() << "Successfully wrote subscribe transaction: " << data->getTxnId();
+}
+
+// Write failed: logs the error text; the transaction is not cleaned up yet (see #warning).
+void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
+  LOG.errorStream() << "Error writing to publisher "
+                    << exception.what();
+
+  //remove txn from channel pending list
+  #warning "Actually do something here"
+}
+
+UnsubscribeWriteCallback::UnsubscribeWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {} // stores the client and the unsubscribe transaction
+
+void UnsubscribeWriteCallback::operationComplete() { // empty: nothing is done when the unsubscribe write succeeds
+  
+}
+
+void UnsubscribeWriteCallback::operationFailed(const std::exception& exception) { // currently a no-op: the failure is neither logged nor passed on
+  #warning "Actually do something here"
+}
+  
+// Keeps the consume transaction so its id can be reported when the write finishes.
+ConsumeWriteCallback::ConsumeWriteCallback(const PubSubDataPtr& data) : data(data) {}
+
+ConsumeWriteCallback::~ConsumeWriteCallback() {}
+
+// Write succeeded: traces the transaction id when debug logging is on.
+void ConsumeWriteCallback::operationComplete() {
+  if (!LOG.isDebugEnabled()) {
+    return;
+  }
+  LOG.debugStream() << "Successfully wrote consume transaction: " << data->getTxnId();
+}
+
+// Write failed: logs the transaction id together with the error text.
+void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
+  LOG.errorStream() << "Error writing consume transaction: " << data->getTxnId()
+                    << " error: " << exception.what();
+}
+
+// Remembers which message (topic, subscriber id, sequence id) to acknowledge
+// once the application's message handler reports completion.
+SubscriberConsumeCallback::SubscriberConsumeCallback(ClientImplPtr& client, const std::string& topic, const std::string& subscriberid, const MessageSeqId& msgid) 
+  : client(client), topic(topic), subscriberid(subscriberid), msgid(msgid)
+{
+}
+
+// The handler finished with the message: send the consume request for it.
+void SubscriberConsumeCallback::operationComplete() {
+  // Success path, so log at debug level rather than error level.
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "ConsumeCallback::operationComplete";
+  }
+  client->getSubscriber().consume(topic, subscriberid, msgid);
+}
+
+// The handler failed: no consume is sent; the reason is logged.
+void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
+  LOG.errorStream() << "ConsumeCallback::operationFailed" << ": " << exception.what();
+}
+
+SubscriberReconnectCallback::SubscriberReconnectCallback(ClientImplPtr& client, const PubSubDataPtr& origData) // installed by channelDisconnected as the callback of the resubscribe request
+  : client(client), origData(origData) {
+}
+
+void SubscriberReconnectCallback::operationComplete() { // empty: a successful resubscribe needs no further action here
+}
+
+void SubscriberReconnectCallback::operationFailed(const std::exception& exception) { // empty: a failed resubscribe is currently ignored
+  
+}
+
+// Creates a handler for one subscription request; it starts out not closed.
+SubscriberClientChannelHandler::SubscriberClientChannelHandler(ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
+  : HedwigClientChannelHandler(client),
+    subscriber(subscriber),
+    origData(data),
+    closed(false)
+{
+  if (!LOG.isDebugEnabled()) {
+    return;
+  }
+  LOG.debugStream() << "Creating SubscriberClientChannelHandler " << this;
+}
+
+// Only traces destruction; members release themselves.
+SubscriberClientChannelHandler::~SubscriberClientChannelHandler() {
+  if (!LOG.isDebugEnabled()) {
+    return;
+  }
+  LOG.debugStream() << "Cleaning up SubscriberClientChannelHandler " << this;
+}
+
+/**
+   Handles a response arriving on the subscription channel. Responses without
+   a message go to the base class handler. Delivered messages are passed to
+   the registered message handler, or queued while no handler is registered.
+*/
+void SubscriberClientChannelHandler::messageReceived(DuplexChannel* channel, const PubSubResponse& m) {
+  if (!m.has_message()) {
+    HedwigClientChannelHandler::messageReceived(channel, m);
+    return;
+  }
+
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")";
+  }
+
+  if (!this->handler.get()) {
+    // delivery not started yet: keep the message until startDelivery() drains the queue
+    LOG.debugStream() << "putting in queue";
+    queue.push_back(m.message());
+    return;
+  }
+
+  OperationCallbackPtr consumeCb(new SubscriberConsumeCallback(client, origData->getTopic(), origData->getSubscriberId(), m.message().msgid()));
+  this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m.message(), consumeCb);
+}
+
+// Marks the subscription as closed (so channelDisconnected will not reconnect)
+// and kills the channel if one is attached.
+void SubscriberClientChannelHandler::close() {
+  closed = true;
+  if (!channel) {
+    return;
+  }
+  channel->kill();
+}
+
+/**
+   Called when the subscription channel drops. Does nothing if the
+   subscription was closed on purpose; otherwise removes the dead channel from
+   the client and, unless the client is shutting down, re-establishes the
+   subscription: a new handler and channel are created for the topic (retrying
+   every SUBSCRIBER_RECONNECT_TIME milliseconds while channel creation fails),
+   delivery is handed over to the new handler, and the original subscribe
+   request is sent again.
+*/
+void SubscriberClientChannelHandler::channelDisconnected(DuplexChannel* channel, const std::exception& e) {
+  // has subscription been closed
+  if (closed) {
+    return;
+  }
+
+  // Clean up the channel from all maps
+  client->channelDied(channel);
+  if (client->shuttingDown()) {
+    return;
+  }
+  
+  // setup pubsub data for reconnection attempt
+  origData->clearTriedServers();
+  OperationCallbackPtr newcallback(new SubscriberReconnectCallback(client, origData));
+  origData->setCallback(newcallback);
+
+  // Create a new handler for the new channel
+  SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, subscriber, origData));  
+  ChannelHandlerPtr baseptr = handler;
+  // if there is an error creating the channel, sleep for SUBSCRIBER_RECONNECT_TIME and try again
+  DuplexChannelPtr newchannel;
+  while (true) {
+    try {
+      newchannel = client->createChannelForTopic(origData->getTopic(), baseptr);
+      handler->setChannel(newchannel);
+      break;
+    } catch (const ShuttingDownException&) {
+      LOG.errorStream() << "Shutting down, don't try to reconnect";
+      return; 
+    } catch (const ChannelException&) {
+      LOG.errorStream() << "Couldn't acquire channel, sleeping for " << SUBSCRIBER_RECONNECT_TIME << "ms before trying again";
+      // The constant is in milliseconds. usleep() takes microseconds (and may
+      // reject values of one second or more), so sleep in whole seconds instead.
+      sleep(SUBSCRIBER_RECONNECT_TIME / 1000);
+    }
+  } 
+  handoverDelivery(handler.get());
+  
+  // remove record of the failed channel from the subscriber
+  subscriber.closeSubscription(origData->getTopic(), origData->getSubscriberId());
+
+  // resubscribe on the new channel with the original request data
+  subscriber.doSubscribe(newchannel, origData, handler);
+}
+
+/**
+   Registers the message handler and immediately delivers every message that
+   was queued while no handler was registered, oldest first.
+*/
+void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
+  this->handler = handler;
+
+  while (!queue.empty()) {
+    LOG.debugStream() << "Taking from queue";
+
+    // copy the message out before popping so it outlives its queue slot
+    Message pending = queue.front();
+    queue.pop_front();
+
+    OperationCallbackPtr consumeCb(new SubscriberConsumeCallback(client, origData->getTopic(), origData->getSubscriberId(), pending.msgid()));
+    this->handler->consume(origData->getTopic(), origData->getSubscriberId(), pending, consumeCb);
+  }
+}
+
+// Drops the registered message handler; messages received afterwards are queued.
+void SubscriberClientChannelHandler::stopDelivery() {
+  MessageHandlerCallbackPtr none;
+  this->handler = none;
+}
+
+
+void SubscriberClientChannelHandler::handoverDelivery(SubscriberClientChannelHandler* newHandler) { // moves the registered message handler from this channel handler to newHandler
+  LOG.debugStream() << "Messages in queue " << queue.size();
+  MessageHandlerCallbackPtr handler = this->handler; // keep a copy: stopDelivery() below clears the member
+  stopDelivery(); // resets old handler
+  newHandler->startDelivery(handler); // NOTE(review): messages still in this object's queue are not transferred - confirm that is intended
+}
+
+void SubscriberClientChannelHandler::setChannel(const DuplexChannelPtr& channel) { // records the channel this handler is attached to
+  this->channel = channel;
+}
+
+DuplexChannelPtr& SubscriberClientChannelHandler::getChannel() { // returns a reference to the stored channel pointer (empty until setChannel is called)
+  return channel;
+}
+
+SubscriberImpl::SubscriberImpl(ClientImplPtr& client) // keeps the client used for channels and transaction ids
+  : client(client) 
+{
+}
+
+SubscriberImpl::~SubscriberImpl() // only logs; members clean up themselves
+{
+  LOG.debugStream() << "deleting subscriber" << this;
+}
+
+
+/**
+   Blocking subscribe: issues asyncSubscribe() with a SyncOperationCallback,
+   waits for it to finish and rethrows the failure, if any.
+*/
+void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
+  // the shared pointer owns the callback; the raw pointer is kept for the sync-only calls
+  SyncOperationCallback* const waiter = new SyncOperationCallback();
+  OperationCallbackPtr owner(waiter);
+
+  asyncSubscribe(topic, subscriberId, mode, owner);
+
+  waiter->wait();
+  waiter->throwExceptionIfNeeded();
+}
+
+/**
+   Non-blocking subscribe. Builds the subscribe transaction, creates a
+   dedicated channel for the topic with a new SubscriberClientChannelHandler
+   attached, and sends the request through doSubscribe().
+*/
+void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
+  PubSubDataPtr subscribeTxn = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, mode);
+
+  SubscriberClientChannelHandlerPtr subHandler(new SubscriberClientChannelHandler(client, *this, subscribeTxn));
+  ChannelHandlerPtr asBase = subHandler;
+
+  DuplexChannelPtr subChannel = client->createChannelForTopic(topic, asBase);
+  subHandler->setChannel(subChannel);
+
+  doSubscribe(subChannel, subscribeTxn, subHandler);
+}
+
+/**
+   Sends the subscribe request on the channel and records the handler as the
+   current one for the (topic, subscriber id) pair. If a handler was already
+   recorded for that pair, its delivery is handed over to the new handler.
+*/
+void SubscriberImpl::doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler) {
+  LOG.debugStream() << "doSubscribe";
+  channel->storeTransaction(data);
+
+  OperationCallbackPtr onWritten(new SubscriberWriteCallback(client, data));
+  channel->writeRequest(data->getRequest(), onWritten);
+
+  topicsubscriber2handler_lock.lock();
+  TopicSubscriber key(data->getTopic(), data->getSubscriberId());
+  SubscriberClientChannelHandlerPtr previous = topicsubscriber2handler[key];
+  if (previous.get() != 0) {
+    previous->handoverDelivery(handler.get());
+  }
+  topicsubscriber2handler[key] = handler;
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")";
+  }
+  topicsubscriber2handler_lock.unlock();
+}
+
+/**
+   Blocking unsubscribe: issues asyncUnsubscribe() with a
+   SyncOperationCallback, waits for it and rethrows the failure, if any.
+*/
+void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
+  SyncOperationCallback* const waiter = new SyncOperationCallback();
+  OperationCallbackPtr owner(waiter);
+
+  asyncUnsubscribe(topic, subscriberId, owner);
+
+  waiter->wait();
+  waiter->throwExceptionIfNeeded();
+}
+
+/**
+   Non-blocking unsubscribe. Closes the local subscription first, then sends an
+   unsubscribe request on the topic's channel. If no channel is available the
+   callback fails with InvalidStateException.
+*/
+void SubscriberImpl::asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) {
+  closeSubscription(topic, subscriberId);
+
+  PubSubDataPtr unsubscribeTxn = PubSubData::forUnsubscribeRequest(client->counter().next(), subscriberId, topic, callback);
+
+  DuplexChannelPtr topicChannel = client->getChannelForTopic(topic);
+  if (topicChannel.get() != 0) {
+    doUnsubscribe(topicChannel, unsubscribeTxn);
+    return;
+  }
+
+  LOG.errorStream() << "Trying to unsubscribe from (" << topic << ", " << subscriberId << ") but channel is dead";
+  callback->operationFailed(InvalidStateException());
+}
+
+// Stores the unsubscribe transaction on the channel and writes its request,
+// with an UnsubscribeWriteCallback attached to the write.
+void SubscriberImpl::doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
+  channel->storeTransaction(data);
+
+  OperationCallbackPtr onWritten(new UnsubscribeWriteCallback(client, data));
+  channel->writeRequest(data->getRequest(), onWritten);
+}
+
+/**
+   Sends a consume request for messageSeqId on the channel of the given
+   (topic, subscriber id) subscription. If no handler or no channel is
+   recorded for the pair, the problem is logged and nothing is sent.
+*/
+void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) {
+  TopicSubscriber t(topic, subscriberId);
+
+  topicsubscriber2handler_lock.lock();
+  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+  topicsubscriber2handler_lock.unlock();
+
+  if (handler.get() == 0) {
+    LOG.errorStream() << "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")";
+    return;
+  }
+
+  DuplexChannelPtr channel = handler->getChannel();
+  if (channel.get() == 0) {
+    LOG.errorStream() << "Trying to consume a message on a topic/subscriber pair that don't have a channel. Something fishy going on. Topic: " << topic << " SubscriberId: " << subscriberId << " MessageSeqId: " << messageSeqId.localcomponent();
+    // Without a channel there is nothing to write to; bail out instead of
+    // dereferencing the empty pointer below.
+    return;
+  }
+  
+  PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);  
+  OperationCallbackPtr writecb(new ConsumeWriteCallback(data));
+  channel->writeRequest(data->getRequest(), writecb);
+}
+
+/**
+   Starts delivering messages of the (topic, subscriber id) subscription to
+   callback. If no handler is recorded for the pair the call is logged and
+   ignored.
+*/
+void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) {
+  TopicSubscriber t(topic, subscriberId);
+
+  topicsubscriber2handler_lock.lock();
+  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+  topicsubscriber2handler_lock.unlock();
+
+  if (handler.get() == 0) {
+    LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
+    // no handler to start: return rather than dereference the empty pointer
+    return;
+  }
+  handler->startDelivery(callback);
+}
+
+/**
+   Stops delivering messages of the (topic, subscriber id) subscription. If no
+   handler is recorded for the pair the call is logged and ignored.
+*/
+void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
+  TopicSubscriber t(topic, subscriberId);
+
+  topicsubscriber2handler_lock.lock();
+  SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
+  topicsubscriber2handler_lock.unlock();
+
+  if (handler.get() == 0) {
+    // message used to say "start deliver", copied from startDelivery()
+    LOG.errorStream() << "Trying to stop delivery on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
+    // no handler to stop: return rather than dereference the empty pointer
+    return;
+  }
+  handler->stopDelivery();
+}
+
+/**
+   Removes the handler recorded for (topic, subscriber id) and, if there was
+   one, closes it. The close happens after the map lock is released.
+*/
+void SubscriberImpl::closeSubscription(const std::string& topic, const std::string& subscriberId) {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "closeSubscription (" << topic << ",  " << subscriberId << ")";
+  }
+  TopicSubscriber key(topic, subscriberId);
+
+  topicsubscriber2handler_lock.lock();
+  SubscriberClientChannelHandlerPtr removed = topicsubscriber2handler[key];
+  topicsubscriber2handler.erase(key);
+  topicsubscriber2handler_lock.unlock();
+
+  if (!removed) {
+    return;
+  }
+  removed->close();
+}
+
+/**
+   Dispatches a server response to the callback stored in its transaction.
+   The response status code selects operationComplete() or operationFailed()
+   with a matching exception. txn is only borrowed (const reference).
+*/
+void SubscriberImpl::messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn) {
+  if (!txn.get()) { // no transaction means no callback to notify
+    LOG.errorStream() << "Invalid transaction";
+    return;
+  }
+
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "message received with status " << m.statuscode();
+  }
+  switch (m.statuscode()) {
+  case SUCCESS:
+    txn->getCallback()->operationComplete();
+    break;
+  case SERVICE_DOWN:
+    txn->getCallback()->operationFailed(ServiceDownException());
+    break;
+  case CLIENT_ALREADY_SUBSCRIBED: // falls through: both statuses are reported as AlreadySubscribedException
+  case TOPIC_BUSY:
+    txn->getCallback()->operationFailed(AlreadySubscribedException());
+    break;
+  case CLIENT_NOT_SUBSCRIBED:
+    txn->getCallback()->operationFailed(NotSubscribedException());
+    break;
+  default: // any other status code
+    txn->getCallback()->operationFailed(UnexpectedResponseException());
+    break;
+  }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SUBSCRIBE_IMPL_H
+#define SUBSCRIBE_IMPL_H
+
+#include <hedwig/subscribe.h>
+#include <hedwig/callback.h>
+#include "clientimpl.h"
+#include <utility>
+#include <tr1/memory>
+#include <deque>
+
+namespace Hedwig {
+  class SubscriberWriteCallback : public OperationCallback { // callback SubscriberImpl::doSubscribe attaches to the write of a subscribe request
+  public:
+    SubscriberWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data); // data: the subscribe transaction being written
+
+    void operationComplete(); // logs the transaction id at debug level
+    void operationFailed(const std::exception& exception); // logs the error; no cleanup is done yet
+  private:
+    ClientImplPtr client;
+    PubSubDataPtr data;
+  };
+  
+  class UnsubscribeWriteCallback : public OperationCallback { // callback SubscriberImpl::doUnsubscribe attaches to the write of an unsubscribe request
+  public:
+    UnsubscribeWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data); // data: the unsubscribe transaction being written
+
+    void operationComplete(); // currently empty
+    void operationFailed(const std::exception& exception); // currently empty
+  private:
+    ClientImplPtr client;
+    PubSubDataPtr data;
+  };
+
+  class ConsumeWriteCallback : public OperationCallback { // callback SubscriberImpl::consume attaches to the write of a consume request
+  public:
+    ConsumeWriteCallback(const PubSubDataPtr& data); // data: the consume transaction being written
+    ~ConsumeWriteCallback();
+
+    void operationComplete(); // logs the transaction id at debug level
+    void operationFailed(const std::exception& exception); // logs the transaction id and the error text
+  private:
+    PubSubDataPtr data;
+    };
+
+  class SubscriberReconnectCallback : public OperationCallback { // callback installed on the original request when a dropped subscription is re-sent
+  public: 
+    SubscriberReconnectCallback(ClientImplPtr& client, const PubSubDataPtr& origData); // origData: the original subscribe transaction
+
+    void operationComplete(); // currently empty
+    void operationFailed(const std::exception& exception); // currently empty
+  private:
+    ClientImplPtr client;
+    PubSubDataPtr origData;
+  };
+
+  class SubscriberClientChannelHandler; // forward declaration so the pointer typedef below can name the class
+  typedef std::tr1::shared_ptr<SubscriberClientChannelHandler> SubscriberClientChannelHandlerPtr; // shared-ownership handle to a subscription channel handler
+
+  class SubscriberConsumeCallback : public OperationCallback { // callback passed to the message handler along with each delivered message
+  public: 
+    SubscriberConsumeCallback(ClientImplPtr& client, const std::string& topic, const std::string& subscriberid, const MessageSeqId& msgid); // identifies the message to consume
+
+    void operationComplete(); // sends a consume request for msgid through the client's subscriber
+    void operationFailed(const std::exception& exception); // only logs; no consume request is sent
+  private:
+    ClientImplPtr client;
+    const std::string topic;
+    const std::string subscriberid;
+    MessageSeqId msgid;
+  };
+
+  class SubscriberClientChannelHandler : public HedwigClientChannelHandler { // channel handler for one subscription: delivers or queues messages and resubscribes on disconnect
+  public: 
+    SubscriberClientChannelHandler(ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data); // data: the subscribe transaction, reused on reconnect
+    ~SubscriberClientChannelHandler();
+
+    void messageReceived(DuplexChannel* channel, const PubSubResponse& m); // delivers or queues messages; other responses go to the base class
+    void channelDisconnected(DuplexChannel* channel, const std::exception& e); // reconnects and resubscribes unless closed or shutting down
+
+    void startDelivery(const MessageHandlerCallbackPtr& handler); // registers handler and drains the queue into it
+    void stopDelivery(); // clears the registered handler
+
+    void handoverDelivery(SubscriberClientChannelHandler* newHandler); // moves the registered handler to newHandler
+
+    void setChannel(const DuplexChannelPtr& channel);
+    DuplexChannelPtr& getChannel();
+
+    void close(); // marks the handler closed and kills its channel, if any
+  private:
+
+    SubscriberImpl& subscriber; // reference only: the subscriber must outlive this handler
+#warning "put some limit on this to stop it growing forever"
+    std::deque<Message> queue; // messages received while no handler is registered
+    MessageHandlerCallbackPtr handler; // empty while delivery is stopped
+    PubSubDataPtr origData; // the subscribe transaction this handler was created for
+    DuplexChannelPtr channel; // set via setChannel()
+    bool closed; // true once close() was called; suppresses reconnection
+  };
+
+  class SubscriberImpl : public Subscriber { // Subscriber implementation; tracks one channel handler per (topic, subscriber id)
+  public:
+    SubscriberImpl(ClientImplPtr& client);
+    ~SubscriberImpl();
+
+    void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode); // blocking; rethrows the failure, if any
+    void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback); // result reported through callback
+    
+    void unsubscribe(const std::string& topic, const std::string& subscriberId); // blocking; rethrows the failure, if any
+    void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback); // result reported through callback
+
+    void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId); // sends a consume request for messageSeqId
+
+    void startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback); // forwards to the pair's channel handler
+    void stopDelivery(const std::string& topic, const std::string& subscriberId); // forwards to the pair's channel handler
+
+    void closeSubscription(const std::string& topic, const std::string& subscriberId); // removes and closes the pair's channel handler
+
+    void messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn); // maps the response status code onto txn's callback
+
+    void doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler); // writes the request and records handler for the pair
+    void doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data); // stores the transaction and writes the request
+
+  private:
+    ClientImplPtr client;
+    
+    std::tr1::unordered_map<TopicSubscriber, SubscriberClientChannelHandlerPtr> topicsubscriber2handler; // current handler per (topic, subscriber id)
+    Mutex topicsubscriber2handler_lock;	    // locked around every access to topicsubscriber2handler
+  };
+
+};
+
+#endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include <netdb.h>
+#include <errno.h>
+#include "util.h"
+#include "channel.h"
+#include <log4cpp/Category.hh>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
+
+using namespace Hedwig;
+
+#define MAX_HOSTNAME_LENGTH 256 // NOTE(review): not referenced anywhere in the visible part of util.cpp
+const std::string UNITIALISED_HOST("UNINITIALISED HOST"); // returned by getAddressString() for a HostAddress that was never initialised
+
+const int DEFAULT_PORT = 4080; // port used when the address string carries none
+const int DEFAULT_SSL_PORT = 9876; // SSL port used when the address string carries none
+
+// Creates an uninitialised ("null") host: empty address string, zeroed socket address.
+HostAddress::HostAddress()
+  : initialised(false),
+    address_str()
+{
+  memset(&socket_addr, 0, sizeof(socket_addr));
+}
+
+HostAddress::~HostAddress() {}
+
+// True until the address has been initialised (see fromString()).
+bool HostAddress::isNullHost() const {
+  return initialised == false;
+}
+
+// Two addresses are equal when both IP and port match.
+bool HostAddress::operator==(const HostAddress& other) const {
+  if (other.ip() != ip()) {
+    return false;
+  }
+  return other.port() == port();
+}
+
+// The original address string, or a placeholder for an uninitialised host.
+const std::string& HostAddress::getAddressString() const {
+  if (isNullHost()) {
+    return UNITIALISED_HOST;
+  }
+  return address_str;
+}
+
+// IPv4 address in host byte order.
+uint32_t HostAddress::ip() const {
+  const uint32_t hostOrder = ntohl(socket_addr.sin_addr.s_addr);
+  return hostOrder;
+}
+
+// Port in host byte order.
+uint16_t HostAddress::port() const {
+  const uint16_t hostOrder = ntohs(socket_addr.sin_port);
+  return hostOrder;
+}
+
+// The resolved socket address (fields in network byte order).
+const struct sockaddr_in& HostAddress::socketAddress() const {
+  return socket_addr;
+}
+
+
+/**
+   Parses address_str, of the form "host[:port[:sslport]]", resolves the host
+   as an IPv4 address and stores it in socket_addr together with the port
+   (DEFAULT_PORT when none is given).
+
+   Throws OomException if the working copy of the string cannot be allocated,
+   InvalidPortException if a port is not a number in 1..65535, and
+   HostResolutionException if the host cannot be resolved.
+*/
+void HostAddress::parse_string() {
+  char* url = strdup(address_str.c_str());
+
+  if (url == NULL) {
+    LOG.errorStream() << "You seems to be out of memory";
+    throw OomException();
+  }
+  int port = DEFAULT_PORT;
+  int sslport = DEFAULT_SSL_PORT;
+
+  // Split "host:port:sslport" in place by overwriting the colons with NULs.
+  char *colon = strchr(url, ':');
+  if (colon) {
+    *colon = 0;
+    colon++;
+    
+    char* sslcolon = strchr(colon, ':');
+    if (sslcolon) {
+      *sslcolon = 0;
+      sslcolon++;
+      
+      // Range-check as long before narrowing; strtol yields 0 for non-numeric input.
+      const long parsedSsl = strtol(sslcolon, NULL, 10);
+      if (parsedSsl <= 0 || parsedSsl > 65535) {
+	LOG.errorStream() << "Invalid SSL port given: [" << sslcolon << "]";
+	free((void*)url);
+	throw InvalidPortException();
+      }
+      sslport = static_cast<int>(parsedSsl); // validated, but not stored anywhere yet
+    }
+    
+    const long parsedPort = strtol(colon, NULL, 10);
+    if (parsedPort <= 0 || parsedPort > 65535) {
+      LOG.errorStream() << "Invalid port given: [" << colon << "]";
+      free((void*)url);
+      throw InvalidPortException();
+    }
+    port = static_cast<int>(parsedPort);
+  }
+  (void)sslport;
+
+  int err = 0;
+  
+  struct addrinfo *addr;
+  struct addrinfo hints;
+
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_INET;
+
+  err = getaddrinfo(url, NULL, &hints, &addr);
+  if (err != 0) {
+    // getaddrinfo() error codes are decoded by gai_strerror(), not hstrerror()
+    LOG.errorStream() << "Couldn't resolve host [" << url << "]:" << gai_strerror(err);
+    free((void*)url);
+    throw HostResolutionException();
+  }
+
+  // hints.ai_family restricts results to AF_INET, so ai_addr is a sockaddr_in
+  sockaddr_in* sa_ptr = (sockaddr_in*)addr->ai_addr;
+  socket_addr = *sa_ptr;
+  socket_addr.sin_port = htons(port); 
+
+  free((void*)url);
+  // the result list is owned by the resolver and must be released with freeaddrinfo()
+  freeaddrinfo(addr);
+}
+
+// Builds an initialised HostAddress from "host[:port[:sslport]]".
+// Exceptions from parse_string() propagate to the caller.
+HostAddress HostAddress::fromString(std::string str) {
+  HostAddress parsed;
+
+  parsed.address_str = str;
+  parsed.parse_string();
+  parsed.initialised = true; // only reached when parsing did not throw
+
+  return parsed;
+}
+
+WaitConditionBase::WaitConditionBase() { // initialises the mutex and condition variable with default attributes; return codes are not checked
+  pthread_mutex_init(&mutex, NULL);
+  pthread_cond_init(&cond, NULL);  
+}
+
+WaitConditionBase::~WaitConditionBase() { // destroys the mutex and condition variable; return codes are not checked
+  pthread_mutex_destroy(&mutex);
+  pthread_cond_destroy(&cond);
+}
+    
+// Blocks until isTrue() holds. The predicate is re-checked under the mutex
+// every time the condition variable wakes this thread.
+void WaitConditionBase::wait() {
+  pthread_mutex_lock(&mutex);
+  for (;;) {
+    if (isTrue()) {
+      break;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "wait: condition is false for " << this;
+    }
+    pthread_cond_wait(&cond, &mutex);
+  }
+  pthread_mutex_unlock(&mutex);
+}
+
+// Takes the mutex; pair with signalAndUnlock().
+void WaitConditionBase::lock() {
+  pthread_mutex_lock(&mutex);
+}
+
+// Signals the condition variable, then releases the mutex.
+void WaitConditionBase::signalAndUnlock() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "signal: signal " << this;
+  }
+
+  pthread_cond_signal(&cond);
+  pthread_mutex_unlock(&mutex);
+}
+
+// Initialises the pthread mutex with default attributes; failures are only logged.
+Mutex::Mutex() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Creating mutex " << this;
+  }
+  const int rc = pthread_mutex_init(&mutex, NULL);
+  if (rc == 0) {
+    return;
+  }
+  LOG.errorStream() << "Error initiating mutex " << rc;
+}
+
+// Destroys the pthread mutex; failures are only logged.
+Mutex::~Mutex() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Destroying mutex " << this;
+  }
+
+  const int rc = pthread_mutex_destroy(&mutex);
+  if (rc == 0) {
+    return;
+  }
+  LOG.errorStream() << "Error destroying mutex " << this << " " << rc;
+}
+
+// Acquires the mutex; failures are only logged.
+void Mutex::lock() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Locking mutex " << this;
+  }
+
+  const int rc = pthread_mutex_lock(&mutex);
+  if (rc == 0) {
+    return;
+  }
+  LOG.errorStream() << "Error locking mutex " << this << " " << rc;
+}
+
+// Releases the mutex; failures are only logged.
+void Mutex::unlock() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Unlocking mutex " << this;
+  }
+
+  const int rc = pthread_mutex_unlock(&mutex);
+  if (rc == 0) {
+    return;
+  }
+  LOG.errorStream() << "Error unlocking mutex " << this << " " << rc;
+}
+
+/**
+   Hashes a host address by packing the least significant 16 bits of the IP
+   into the upper half of a 32-bit value and the port into the lower half.
+*/
+std::size_t std::tr1::hash<HostAddress>::operator()(const HostAddress& address) const {
+  // Combine with OR: the low 16 bits of the shifted IP are zero, so the
+  // previous AND always produced 0 and every address hashed to the same bucket.
+  const uint32_t ipPart = (address.ip() & 0xFFFFu) << 16;
+  const uint32_t portPart = address.port();
+  return static_cast<std::size_t>(ipPart | portPart);
+}
+
+// Hashes a channel pointer by its address value.
+std::size_t std::tr1::hash<DuplexChannel*>::operator()(const DuplexChannel* channel) const {
+  const std::size_t asInteger = reinterpret_cast<std::size_t>(channel);
+  return asInteger;
+}
+
+// Hashes a (topic, subscriber id) pair as the string hash of topic followed by subscriber id.
+std::size_t std::tr1::hash<TopicSubscriber>::operator()(const TopicSubscriber& topicsub) const {
+  std::tr1::hash<std::string> stringHasher;
+  return stringHasher(topicsub.first + topicsub.second);
+}
+

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h Thu Aug 19 21:25:13 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HEDWIG_UTIL_H
+#define HEDWIG_UTIL_H
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <hedwig/exceptions.h>
+#include <hedwig/callback.h>
+#include <list>
+#include <iostream>
+#include <utility>
+#include <tr1/functional>
+#include <semaphore.h>
+#include <pthread.h>
+
+namespace Hedwig {
+  typedef std::pair<const std::string, const std::string> TopicSubscriber; // (topic, subscriber id) pair; used as the key of the subscriber's handler map
+
+  /**
+     Representation of a host's address: the address string it was built from
+     plus the IPv4 socket address that string resolved to.
+  */
+  class HostAddress {
+  public:
+    HostAddress(); // creates an uninitialised ("null") host
+    ~HostAddress();
+
+    bool operator==(const HostAddress& other) const; // equal when ip() and port() both match
+    
+    bool isNullHost() const; // true until the address has been initialised
+    const std::string& getAddressString() const; // original string, or a placeholder for a null host
+    uint32_t ip() const; // host byte order
+    uint16_t port() const; // host byte order
+    const sockaddr_in& socketAddress() const;
+
+    static HostAddress fromString(std::string host); // parses "host[:port[:sslport]]" and resolves the host
+
+  private:
+
+    void parse_string(); // fills socket_addr from address_str
+    
+    bool initialised;
+    std::string address_str;
+    struct sockaddr_in socket_addr;
+  };
+
+  class DuplexChannel;  
+  
+  class Mutex { // thin wrapper around pthread_mutex_t; NOTE(review): copying is not disabled - confirm no caller copies it
+  public:
+    Mutex();
+    ~Mutex();
+    
+    void lock();
+    void unlock();
+  private:
+    pthread_mutex_t mutex;
+  };
+
+  class WaitConditionBase { // mutex + condition variable; subclasses supply the predicate via isTrue()
+  public:
+    WaitConditionBase();
+    virtual ~WaitConditionBase();
+    
+    void wait(); // blocks until isTrue() holds
+    void lock(); // takes the internal mutex; pair with signalAndUnlock()
+    void signalAndUnlock(); // signals the condition variable, then releases the mutex
+
+    virtual bool isTrue() = 0; // predicate checked by wait() while the mutex is held
+  private:
+
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;    
+  };
+
+};
+
+namespace std 
+{
+  namespace tr1 
+  {
+  /**
+     Hash a host address. Intended to take the least significant 16 bits of the address and the 16 bits of the
+     port and pack them into one 32-bit number. While collisions are theoretically very possible, they
+     shouldn't happen as the hedwig servers should be in the same subnet.
+  */
+  template <> struct hash<Hedwig::HostAddress> : public unary_function<Hedwig::HostAddress, size_t> {
+    size_t operator()(const Hedwig::HostAddress& address) const;
+  };
+
+  /**
+     Hash a channel pointer, just returns the pointer.
+  */
+  template <> struct hash<Hedwig::DuplexChannel*> : public unary_function<Hedwig::DuplexChannel*, size_t> {
+    size_t operator()(const Hedwig::DuplexChannel* channel) const;
+  };
+
+  /**
+     Hash a (topic, subscriber id) pair by hashing the two strings concatenated.
+  */
+  template <> struct hash<Hedwig::TopicSubscriber> : public unary_function<Hedwig::TopicSubscriber, size_t> {
+    size_t operator()(const Hedwig::TopicSubscriber& topicsub) const;
+  };
+  }
+}
+#endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/log4cpp.conf
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/log4cpp.conf?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/log4cpp.conf (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/log4cpp.conf Thu Aug 19 21:25:13 2010
@@ -0,0 +1,49 @@
+#
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# 
+#
+
+log4j.appender.rootAppender=org.apache.log4j.ConsoleAppender
+log4j.appender.rootAppender.layout=org.apache.log4j.BasicLayout
+
+#log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
+log4j.appender.hedwig=org.apache.log4j.ConsoleAppender
+#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwig.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %c %p - %m%n
+log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwig.layout.ConversionPattern=%.5m%n
+
+log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender
+#log4j.appender.hedwigtest.fileName=./testLog.log
+log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwigtest.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %c %p - %m%n
+log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout
+log4j.appender.hedwigtest.layout.ConversionPattern=%.5m%n
+
+# category
+log4j.category.hedwig=DEBUG, hedwig
+log4j.rootCategory=DEBUG
+
+log4j.category.hedwig.channel=ERROR
+log4j.category.hedwig.util=ERROR
+log4j.category.hedwigtest.servercontrol=ERROR
+
+log4j.category.hedwigtest=DEBUG, hedwigtest
+log4j.rootCategory=DEBUG

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_doxygen.m4
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_doxygen.m4?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_doxygen.m4 (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_doxygen.m4 Thu Aug 19 21:25:13 2010
@@ -0,0 +1,533 @@
+# ===========================================================================
+#      http://www.gnu.org/software/autoconf-archive/ax_prog_doxygen.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+#   DX_INIT_DOXYGEN(PROJECT-NAME, DOXYFILE-PATH, [OUTPUT-DIR])
+#   DX_DOXYGEN_FEATURE(ON|OFF)
+#   DX_DOT_FEATURE(ON|OFF)
+#   DX_HTML_FEATURE(ON|OFF)
+#   DX_CHM_FEATURE(ON|OFF)
+#   DX_CHI_FEATURE(ON|OFF)
+#   DX_MAN_FEATURE(ON|OFF)
+#   DX_RTF_FEATURE(ON|OFF)
+#   DX_XML_FEATURE(ON|OFF)
+#   DX_PDF_FEATURE(ON|OFF)
+#   DX_PS_FEATURE(ON|OFF)
+#
+# DESCRIPTION
+#
+#   The DX_*_FEATURE macros control the default setting for the given
+#   Doxygen feature. Supported features are 'DOXYGEN' itself, 'DOT' for
+#   generating graphics, 'HTML' for plain HTML, 'CHM' for compressed HTML
+#   help (for MS users), 'CHI' for generating a separate .chi file by the
+#   .chm file, and 'MAN', 'RTF', 'XML', 'PDF' and 'PS' for the appropriate
+#   output formats. The environment variable DOXYGEN_PAPER_SIZE may be
+#   specified to override the default 'a4wide' paper size.
+#
+#   By default, HTML, PDF and PS documentation is generated as this seems to
+#   be the most popular and portable combination. MAN pages created by
+#   Doxygen are usually problematic, though by picking an appropriate subset
+#   and doing some massaging they might be better than nothing. CHM and RTF
+#   are specific for MS (note that you can't generate both HTML and CHM at
+#   the same time). The XML is rather useless unless you apply specialized
+#   post-processing to it.
+#
+#   The macros mainly control the default state of the feature. The user can
+#   override the default by specifying --enable or --disable. The macros
+#   ensure that contradictory flags are not given (e.g.,
+#   --enable-doxygen-html and --enable-doxygen-chm,
+#   --enable-doxygen-anything with --disable-doxygen, etc.) Finally, each
+#   feature will be automatically disabled (with a warning) if the required
+#   programs are missing.
+#
+#   Once all the feature defaults have been specified, call DX_INIT_DOXYGEN
+#   with the following parameters: a one-word name for the project for use
+#   as a filename base etc., an optional configuration file name (the
+#   default is 'Doxyfile', the same as Doxygen's default), and an optional
+#   output directory name (the default is 'doxygen-doc').
+#
+#   Automake Support
+#
+#   The following is a template aminclude.am file for use with Automake.
+#   Make targets and variables values are controlled by the various
+#   DX_COND_* conditionals set by autoconf.
+#
+#   The provided targets are:
+#
+#     doxygen-doc: Generate all doxygen documentation.
+#
+#     doxygen-run: Run doxygen, which will generate some of the
+#                  documentation (HTML, CHM, CHI, MAN, RTF, XML)
+#                  but will not do the post processing required
+#                  for the rest of it (PS, PDF, and some MAN).
+#
+#     doxygen-man: Rename some doxygen generated man pages.
+#
+#     doxygen-ps:  Generate doxygen PostScript documentation.
+#
+#     doxygen-pdf: Generate doxygen PDF documentation.
+#
+#   Note that by default these are not integrated into the automake targets.
+#   If doxygen is used to generate man pages, you can achieve this
+#   integration by setting man3_MANS to the list of man pages generated and
+#   then adding the dependency:
+#
+#     $(man3_MANS): doxygen-doc
+#
+#   This will cause make to run doxygen and generate all the documentation.
+#
+#   The following variable is intended for use in Makefile.am:
+#
+#     DX_CLEANFILES = everything to clean.
+#
+#   Then add this variable to MOSTLYCLEANFILES.
+#
+#     ----- begin aminclude.am -------------------------------------
+#
+#     ## --------------------------------- ##
+#     ## Format-independent Doxygen rules. ##
+#     ## --------------------------------- ##
+#
+#     if DX_COND_doc
+#
+#     ## ------------------------------- ##
+#     ## Rules specific for HTML output. ##
+#     ## ------------------------------- ##
+#
+#     if DX_COND_html
+#
+#     DX_CLEAN_HTML = @DX_DOCDIR@/html
+#
+#     endif DX_COND_html
+#
+#     ## ------------------------------ ##
+#     ## Rules specific for CHM output. ##
+#     ## ------------------------------ ##
+#
+#     if DX_COND_chm
+#
+#     DX_CLEAN_CHM = @DX_DOCDIR@/chm
+#
+#     if DX_COND_chi
+#
+#     DX_CLEAN_CHI = @DX_DOCDIR@/@PACKAGE@.chi
+#
+#     endif DX_COND_chi
+#
+#     endif DX_COND_chm
+#
+#     ## ------------------------------ ##
+#     ## Rules specific for MAN output. ##
+#     ## ------------------------------ ##
+#
+#     if DX_COND_man
+#
+#     DX_CLEAN_MAN = @DX_DOCDIR@/man
+#
+#     endif DX_COND_man
+#
+#     ## ------------------------------ ##
+#     ## Rules specific for RTF output. ##
+#     ## ------------------------------ ##
+#
+#     if DX_COND_rtf
+#
+#     DX_CLEAN_RTF = @DX_DOCDIR@/rtf
+#
+#     endif DX_COND_rtf
+#
+#     ## ------------------------------ ##
+#     ## Rules specific for XML output. ##
+#     ## ------------------------------ ##
+#
+#     if DX_COND_xml
+#
+#     DX_CLEAN_XML = @DX_DOCDIR@/xml
+#
+#     endif DX_COND_xml
+#
+#     ## ----------------------------- ##
+#     ## Rules specific for PS output. ##
+#     ## ----------------------------- ##
+#
+#     if DX_COND_ps
+#
+#     DX_CLEAN_PS = @DX_DOCDIR@/@PACKAGE@.ps
+#
+#     DX_PS_GOAL = doxygen-ps
+#
+#     doxygen-ps: @DX_DOCDIR@/@PACKAGE@.ps
+#
+#     @DX_DOCDIR@/@PACKAGE@.ps: @DX_DOCDIR@/@PACKAGE@.tag
+#         cd @DX_DOCDIR@/latex; \
+#         rm -f *.aux *.toc *.idx *.ind *.ilg *.log *.out; \
+#         $(DX_LATEX) refman.tex; \
+#         $(MAKEINDEX_PATH) refman.idx; \
+#         $(DX_LATEX) refman.tex; \
+#         countdown=5; \
+#         while $(DX_EGREP) 'Rerun (LaTeX|to get cross-references right)' \
+#                           refman.log > /dev/null 2>&1 \
+#            && test $$countdown -gt 0; do \
+#             $(DX_LATEX) refman.tex; \
+#             countdown=`expr $$countdown - 1`; \
+#         done; \
+#         $(DX_DVIPS) -o ../@PACKAGE@.ps refman.dvi
+#
+#     endif DX_COND_ps
+#
+#     ## ------------------------------ ##
+#     ## Rules specific for PDF output. ##
+#     ## ------------------------------ ##
+#
+#     if DX_COND_pdf
+#
+#     DX_CLEAN_PDF = @DX_DOCDIR@/@PACKAGE@.pdf
+#
+#     DX_PDF_GOAL = doxygen-pdf
+#
+#     doxygen-pdf: @DX_DOCDIR@/@PACKAGE@.pdf
+#
+#     @DX_DOCDIR@/@PACKAGE@.pdf: @DX_DOCDIR@/@PACKAGE@.tag
+#         cd @DX_DOCDIR@/latex; \
+#         rm -f *.aux *.toc *.idx *.ind *.ilg *.log *.out; \
+#         $(DX_PDFLATEX) refman.tex; \
+#         $(DX_MAKEINDEX) refman.idx; \
+#         $(DX_PDFLATEX) refman.tex; \
+#         countdown=5; \
+#         while $(DX_EGREP) 'Rerun (LaTeX|to get cross-references right)' \
+#                           refman.log > /dev/null 2>&1 \
+#            && test $$countdown -gt 0; do \
+#             $(DX_PDFLATEX) refman.tex; \
+#             countdown=`expr $$countdown - 1`; \
+#         done; \
+#         mv refman.pdf ../@PACKAGE@.pdf
+#
+#     endif DX_COND_pdf
+#
+#     ## ------------------------------------------------- ##
+#     ## Rules specific for LaTeX (shared for PS and PDF). ##
+#     ## ------------------------------------------------- ##
+#
+#     if DX_COND_latex
+#
+#     DX_CLEAN_LATEX = @DX_DOCDIR@/latex
+#
+#     endif DX_COND_latex
+#
+#     .PHONY: doxygen-run doxygen-doc $(DX_PS_GOAL) $(DX_PDF_GOAL)
+#
+#     .INTERMEDIATE: doxygen-run $(DX_PS_GOAL) $(DX_PDF_GOAL)
+#
+#     doxygen-run: @DX_DOCDIR@/@PACKAGE@.tag
+#
+#     doxygen-doc: doxygen-run $(DX_PS_GOAL) $(DX_PDF_GOAL)
+#
+#     @DX_DOCDIR@/@PACKAGE@.tag: $(DX_CONFIG) $(pkginclude_HEADERS)
+#         rm -rf @DX_DOCDIR@
+#         $(DX_ENV) $(DX_DOXYGEN) $(srcdir)/$(DX_CONFIG)
+#
+#     DX_CLEANFILES = \
+#         @DX_DOCDIR@/@PACKAGE@.tag \
+#         -r \
+#         $(DX_CLEAN_HTML) \
+#         $(DX_CLEAN_CHM) \
+#         $(DX_CLEAN_CHI) \
+#         $(DX_CLEAN_MAN) \
+#         $(DX_CLEAN_RTF) \
+#         $(DX_CLEAN_XML) \
+#         $(DX_CLEAN_PS) \
+#         $(DX_CLEAN_PDF) \
+#         $(DX_CLEAN_LATEX)
+#
+#     endif DX_COND_doc
+#
+#     ----- end aminclude.am ---------------------------------------
+#
+# LICENSE
+#
+#   Copyright (c) 2009 Oren Ben-Kiki <oren@ben-kiki.org>
+#
+#   Copying and distribution of this file, with or without modification, are
+#   permitted in any medium without royalty provided the copyright notice
+#   and this notice are preserved. This file is offered as-is, without any
+#   warranty.
+
+#serial 10
+
+## ----------##
+## Defaults. ##
+## ----------##
+
+DX_ENV=""
+AC_DEFUN([DX_FEATURE_doc],  ON)
+AC_DEFUN([DX_FEATURE_dot],  ON)
+AC_DEFUN([DX_FEATURE_man],  OFF)
+AC_DEFUN([DX_FEATURE_html], ON)
+AC_DEFUN([DX_FEATURE_chm],  OFF)
+AC_DEFUN([DX_FEATURE_chi],  OFF)
+AC_DEFUN([DX_FEATURE_rtf],  OFF)
+AC_DEFUN([DX_FEATURE_xml],  OFF)
+AC_DEFUN([DX_FEATURE_pdf],  ON)
+AC_DEFUN([DX_FEATURE_ps],   ON)
+
+## --------------- ##
+## Private macros. ##
+## --------------- ##
+
+# DX_ENV_APPEND(VARIABLE, VALUE)
+# ------------------------------
+# Append VARIABLE="VALUE" to DX_ENV for invoking doxygen.
+AC_DEFUN([DX_ENV_APPEND], [AC_SUBST([DX_ENV], ["$DX_ENV $1='$2'"])])
+
+# DX_DIRNAME_EXPR
+# ---------------
+# Expand into a shell expression that prints the directory part of a path.
+AC_DEFUN([DX_DIRNAME_EXPR],
+         [[expr ".$1" : '\(\.\)[^/]*$' \| "x$1" : 'x\(.*\)/[^/]*$']])
+
+# DX_IF_FEATURE(FEATURE, IF-ON, IF-OFF)
+# -------------------------------------
+# Expands according to the M4 (static) status of the feature.
+AC_DEFUN([DX_IF_FEATURE], [ifelse(DX_FEATURE_$1, ON, [$2], [$3])])
+
+# DX_REQUIRE_PROG(VARIABLE, PROGRAM)
+# ----------------------------------
+# Require the specified program to be found for the DX_CURRENT_FEATURE to work.
+AC_DEFUN([DX_REQUIRE_PROG], [
+AC_PATH_TOOL([$1], [$2])
+if test "$DX_FLAG_[]DX_CURRENT_FEATURE$$1" = 1; then
+    AC_MSG_WARN([$2 not found - will not DX_CURRENT_DESCRIPTION])
+    AC_SUBST(DX_FLAG_[]DX_CURRENT_FEATURE, 0)
+fi
+])
+
+# DX_TEST_FEATURE(FEATURE)
+# ------------------------
+# Expand to a shell expression testing whether the feature is active.
+AC_DEFUN([DX_TEST_FEATURE], [test "$DX_FLAG_$1" = 1])
+
+# DX_CHECK_DEPEND(REQUIRED_FEATURE, REQUIRED_STATE)
+# -------------------------------------------------
+# Verify that a required feature has the right state before trying to turn on
+# the DX_CURRENT_FEATURE. On mismatch, configure aborts with a message naming
+# both the current feature and the required feature ($1).
+AC_DEFUN([DX_CHECK_DEPEND], [
+test "$DX_FLAG_$1" = "$2" \
+|| AC_MSG_ERROR([doxygen-DX_CURRENT_FEATURE ifelse([$2], 1,
+                            requires, contradicts) doxygen-$1])
+])
+
+# DX_CLEAR_DEPEND(REQUIRED_FEATURE, REQUIRED_STATE)
+# -------------------------------------------------
+# Turn off the DX_CURRENT_FEATURE unless the required feature has that state.
+AC_DEFUN([DX_CLEAR_DEPEND], [
+test "$DX_FLAG_$1" = "$2" || AC_SUBST(DX_FLAG_[]DX_CURRENT_FEATURE, 0)
+])
+
+# DX_ARG_ABLE(FEATURE, DESCRIPTION,
+#             CHECK_DEPEND, CLEAR_DEPEND,
+#             REQUIRE, DO-IF-ON, DO-IF-OFF)
+# -----------------------------------------
+# Parse the command-line option controlling a feature. CHECK_DEPEND is called
+# if the user explicitly turns the feature on (and invokes DX_CHECK_DEPEND),
+# otherwise CLEAR_DEPEND is called to turn off the default state if a required
+# feature is disabled (using DX_CLEAR_DEPEND). REQUIRE performs additional
+# requirement tests (DX_REQUIRE_PROG). Finally, an automake flag is set and
+# DO-IF-ON or DO-IF-OFF are called according to the final state of the feature.
+AC_DEFUN([DX_ARG_ABLE], [
+    AC_DEFUN([DX_CURRENT_FEATURE], [$1])
+    AC_DEFUN([DX_CURRENT_DESCRIPTION], [$2])
+    AC_ARG_ENABLE(doxygen-$1,
+                  [AS_HELP_STRING(DX_IF_FEATURE([$1], [--disable-doxygen-$1],
+                                                      [--enable-doxygen-$1]),
+                                  DX_IF_FEATURE([$1], [don't $2], [$2]))],
+                  [
+case "$enableval" in
+#(
+y|Y|yes|Yes|YES)
+    AC_SUBST([DX_FLAG_$1], 1)
+    $3
+;; #(
+n|N|no|No|NO)
+    AC_SUBST([DX_FLAG_$1], 0)
+;; #(
+*)
+    AC_MSG_ERROR([invalid value '$enableval' given to doxygen-$1])
+;;
+esac
+], [
+AC_SUBST([DX_FLAG_$1], [DX_IF_FEATURE([$1], 1, 0)])
+$4
+])
+if DX_TEST_FEATURE([$1]); then
+    $5
+    :
+fi
+if DX_TEST_FEATURE([$1]); then
+    AM_CONDITIONAL(DX_COND_$1, :)
+    $6
+    :
+else
+    AM_CONDITIONAL(DX_COND_$1, false)
+    $7
+    :
+fi
+])
+
+## -------------- ##
+## Public macros. ##
+## -------------- ##
+
+# DX_XXX_FEATURE(DEFAULT_STATE)
+# -----------------------------
+# Each macro redefines the matching DX_FEATURE_xxx default to DEFAULT_STATE.
+AC_DEFUN([DX_DOXYGEN_FEATURE], [AC_DEFUN([DX_FEATURE_doc],  [$1])])
+AC_DEFUN([DX_DOT_FEATURE],     [AC_DEFUN([DX_FEATURE_dot],  [$1])])
+AC_DEFUN([DX_MAN_FEATURE],     [AC_DEFUN([DX_FEATURE_man],  [$1])])
+AC_DEFUN([DX_HTML_FEATURE],    [AC_DEFUN([DX_FEATURE_html], [$1])])
+AC_DEFUN([DX_CHM_FEATURE],     [AC_DEFUN([DX_FEATURE_chm],  [$1])])
+AC_DEFUN([DX_CHI_FEATURE],     [AC_DEFUN([DX_FEATURE_chi],  [$1])])
+AC_DEFUN([DX_RTF_FEATURE],     [AC_DEFUN([DX_FEATURE_rtf],  [$1])])
+AC_DEFUN([DX_XML_FEATURE],     [AC_DEFUN([DX_FEATURE_xml],  [$1])])
+AC_DEFUN([DX_PDF_FEATURE],     [AC_DEFUN([DX_FEATURE_pdf],  [$1])])
+AC_DEFUN([DX_PS_FEATURE],      [AC_DEFUN([DX_FEATURE_ps],   [$1])])
+
+# DX_INIT_DOXYGEN(PROJECT, [CONFIG-FILE], [OUTPUT-DOC-DIR])
+# ---------------------------------------------------------
+# PROJECT also serves as the base name for the documentation files.
+# The default CONFIG-FILE is "Doxyfile" and OUTPUT-DOC-DIR is "doxygen-doc".
+AC_DEFUN([DX_INIT_DOXYGEN], [
+
+# Files:
+AC_SUBST([DX_PROJECT], [$1])
+AC_SUBST([DX_CONFIG], [ifelse([$2], [], Doxyfile, [$2])])
+AC_SUBST([DX_DOCDIR], [ifelse([$3], [], doxygen-doc, [$3])])
+
+# Environment variables used inside doxygen.cfg:
+DX_ENV_APPEND(SRCDIR, $srcdir)
+DX_ENV_APPEND(PROJECT, $DX_PROJECT)
+DX_ENV_APPEND(DOCDIR, $DX_DOCDIR)
+DX_ENV_APPEND(VERSION, $PACKAGE_VERSION)
+
+# Doxygen itself:
+DX_ARG_ABLE(doc, [generate any doxygen documentation],
+            [],
+            [],
+            [DX_REQUIRE_PROG([DX_DOXYGEN], doxygen)
+             DX_REQUIRE_PROG([DX_PERL], perl)],
+            [DX_ENV_APPEND(PERL_PATH, $DX_PERL)])
+
+# Dot for graphics:
+DX_ARG_ABLE(dot, [generate graphics for doxygen documentation],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [DX_REQUIRE_PROG([DX_DOT], dot)],
+            [DX_ENV_APPEND(HAVE_DOT, YES)
+             DX_ENV_APPEND(DOT_PATH, [`DX_DIRNAME_EXPR($DX_DOT)`])],
+            [DX_ENV_APPEND(HAVE_DOT, NO)])
+
+# Man pages generation:
+DX_ARG_ABLE(man, [generate doxygen manual pages],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [],
+            [DX_ENV_APPEND(GENERATE_MAN, YES)],
+            [DX_ENV_APPEND(GENERATE_MAN, NO)])
+
+# RTF file generation:
+DX_ARG_ABLE(rtf, [generate doxygen RTF documentation],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [],
+            [DX_ENV_APPEND(GENERATE_RTF, YES)],
+            [DX_ENV_APPEND(GENERATE_RTF, NO)])
+
+# XML file generation:
+DX_ARG_ABLE(xml, [generate doxygen XML documentation],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [],
+            [DX_ENV_APPEND(GENERATE_XML, YES)],
+            [DX_ENV_APPEND(GENERATE_XML, NO)])
+
+# (Compressed) HTML help generation:
+DX_ARG_ABLE(chm, [generate doxygen compressed HTML help documentation],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [DX_REQUIRE_PROG([DX_HHC], hhc)],
+            [DX_ENV_APPEND(HHC_PATH, $DX_HHC)
+             DX_ENV_APPEND(GENERATE_HTML, YES)
+             DX_ENV_APPEND(GENERATE_HTMLHELP, YES)],
+            [DX_ENV_APPEND(GENERATE_HTMLHELP, NO)])
+
+# Separate CHI file generation.
+DX_ARG_ABLE(chi, [generate doxygen seperate compressed HTML help index file],
+            [DX_CHECK_DEPEND(chm, 1)],
+            [DX_CLEAR_DEPEND(chm, 1)],
+            [],
+            [DX_ENV_APPEND(GENERATE_CHI, YES)],
+            [DX_ENV_APPEND(GENERATE_CHI, NO)])
+
+# Plain HTML pages generation:
+DX_ARG_ABLE(html, [generate doxygen plain HTML documentation],
+            [DX_CHECK_DEPEND(doc, 1) DX_CHECK_DEPEND(chm, 0)],
+            [DX_CLEAR_DEPEND(doc, 1) DX_CLEAR_DEPEND(chm, 0)],
+            [],
+            [DX_ENV_APPEND(GENERATE_HTML, YES)],
+            [DX_TEST_FEATURE(chm) || DX_ENV_APPEND(GENERATE_HTML, NO)])
+
+# PostScript file generation:
+DX_ARG_ABLE(ps, [generate doxygen PostScript documentation],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [DX_REQUIRE_PROG([DX_LATEX], latex)
+             DX_REQUIRE_PROG([DX_MAKEINDEX], makeindex)
+             DX_REQUIRE_PROG([DX_DVIPS], dvips)
+             DX_REQUIRE_PROG([DX_EGREP], egrep)])
+
+# PDF file generation:
+DX_ARG_ABLE(pdf, [generate doxygen PDF documentation],
+            [DX_CHECK_DEPEND(doc, 1)],
+            [DX_CLEAR_DEPEND(doc, 1)],
+            [DX_REQUIRE_PROG([DX_PDFLATEX], pdflatex)
+             DX_REQUIRE_PROG([DX_MAKEINDEX], makeindex)
+             DX_REQUIRE_PROG([DX_EGREP], egrep)])
+
+# LaTeX generation for PS and/or PDF:
+if DX_TEST_FEATURE(ps) || DX_TEST_FEATURE(pdf); then
+    AM_CONDITIONAL(DX_COND_latex, :)
+    DX_ENV_APPEND(GENERATE_LATEX, YES)
+else
+    AM_CONDITIONAL(DX_COND_latex, false)
+    DX_ENV_APPEND(GENERATE_LATEX, NO)
+fi
+
+# Paper size for PS and/or PDF:
+AC_ARG_VAR(DOXYGEN_PAPER_SIZE,
+           [a4wide (default), a4, letter, legal or executive])
+case "$DOXYGEN_PAPER_SIZE" in
+#(
+"")
+    AC_SUBST(DOXYGEN_PAPER_SIZE, "")
+;; #(
+a4wide|a4|letter|legal|executive)
+    DX_ENV_APPEND(PAPER_SIZE, $DOXYGEN_PAPER_SIZE)
+;; #(
+*)
+    AC_MSG_ERROR([unknown DOXYGEN_PAPER_SIZE='$DOXYGEN_PAPER_SIZE'])
+;;
+esac
+
+#For debugging:
+#echo DX_FLAG_doc=$DX_FLAG_doc
+#echo DX_FLAG_dot=$DX_FLAG_dot
+#echo DX_FLAG_man=$DX_FLAG_man
+#echo DX_FLAG_html=$DX_FLAG_html
+#echo DX_FLAG_chm=$DX_FLAG_chm
+#echo DX_FLAG_chi=$DX_FLAG_chi
+#echo DX_FLAG_rtf=$DX_FLAG_rtf
+#echo DX_FLAG_xml=$DX_FLAG_xml
+#echo DX_FLAG_pdf=$DX_FLAG_pdf
+#echo DX_FLAG_ps=$DX_FLAG_ps
+#echo DX_ENV=$DX_ENV
+])

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am Thu Aug 19 21:25:13 2010
@@ -0,0 +1,6 @@
+bin_PROGRAMS = hedwigtest
+hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp servercontrol.cpp pubsubtest.cpp
+hedwigtest_CPPFLAGS = -I../inc $(DEPS_CFLAGS)
+hedwigtest_LDADD = $(DEPS_LIBS) -L../lib -lhedwig01
+hedwigtest_LDFLAGS = -no-undefined
+

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../lib/channel.h"
+#include "../lib/util.h"
+#include <hedwig/protocol.h>
+#include <hedwig/callback.h>
+#include <iostream>
+#include <log4cpp/PropertyConfigurator.hh>
+#include <log4cpp/Category.hh>
+#include "servercontrol.h"
+
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/ui/text/TextTestRunner.h>
+
+int main( int argc, char **argv)
+{
+  // Logging setup is best-effort: a configuration problem is reported and the tests still run.
+  const std::string logConfig("../log4cpp.conf");
+  try {
+    log4cpp::PropertyConfigurator::configure(logConfig);
+  } catch (const log4cpp::ConfigureFailure &e) {
+    std::cerr << "log4cpp configuration failure while loading '" << logConfig << "' : " << e.what() << std::endl;
+  } catch (const std::exception &e) {
+    std::cerr << "exception caught while configuring log4cpp via '" << logConfig << "' : " << e.what() << std::endl;
+  } catch (...) {
+    std::cerr << "unknown exception while configuring log4cpp via '" << logConfig << "'." << std::endl;
+  }
+  // argv[1] (optional) names the registry to run; argv[2] (optional) is the test path handed to the runner.
+  std::string testPath = (argc > 2) ? std::string(argv[2]) : "";
+  CppUnit::TextTestRunner runner;
+
+  if (argc > 1) {
+    CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry(argv[1]);
+    runner.addTest( registry.makeTest() );
+  } else {
+    // No registry requested: collect the four known suites under the "*" registry.
+    CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("*");
+    registry.addRegistry("Util");
+    registry.addRegistry("Subscribe");
+    registry.addRegistry("Publish"); 
+    registry.addRegistry("PubSub");
+    runner.addTest( registry.makeTest() );
+  }
+  // run() reports success as true; map it to the usual exit status (0 = success, 1 = failure).
+  const bool wasSuccessful = runner.run(testPath);
+  google::protobuf::ShutdownProtobufLibrary();
+  log4cpp::Category::shutdown();
+  return wasSuccessful ? 0 : 1;
+}



Mime
View raw message