zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1369767 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/test/
Date Mon, 06 Aug 2012 10:10:18 GMT
Author: ivank
Date: Mon Aug  6 10:10:17 2012
New Revision: 1369767

URL: http://svn.apache.org/viewvc?rev=1369767&view=rev
Log:
BOOKKEEPER-339: Let hedwig cpp client support returning message seq id for publish requests.
(sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/publish.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Aug  6 10:10:17 2012
@@ -6,9 +6,15 @@ Trunk (unreleased changes)
 
     IMPROVEMENTS:
 
-      BOOKKEEPER-203: improve ledger manager interface to remove zookeeper dependency on metadata operations. (sijie via ivank)
+      bookkeeper-server:
+
+        BOOKKEEPER-203: improve ledger manager interface to remove zookeeper dependency on metadata operations. (sijie via ivank)
+
+        BOOKKEEPER-303: LedgerMetadata should serialized using protobufs (ivank)
+
+      hedwig-client:
 
-      BOOKKEEPER-303: LedgerMetadata should serialized using protobufs (ivank)
+        BOOKKEEPER-339: Let hedwig cpp client support returning message seq id for publish requests. (sijie via ivank)
 
   Backward compatible changes:
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/callback.h Mon Aug  6 10:10:17 2012
@@ -29,6 +29,16 @@
 #endif
 
 namespace Hedwig {
+
+  template<class R>
+  class Callback {
+  public:
+    virtual void operationComplete(const R& result) = 0;
+    virtual void operationFailed(const std::exception& exception) = 0;
+
+    virtual ~Callback() {};
+  };
+
   class OperationCallback {
   public:
     virtual void operationComplete() = 0;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/publish.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/publish.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/publish.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/publish.h Mon Aug  6 10:10:17 2012
@@ -27,6 +27,10 @@
 
 namespace Hedwig {
 
+  typedef std::tr1::shared_ptr<PublishResponse> PublishResponsePtr;
+  typedef Callback<PublishResponsePtr> PublishResponseCallback;
+  typedef std::tr1::shared_ptr<PublishResponseCallback> PublishResponseCallbackPtr;
+
   /**
      Interface for publishing to a hedwig instance.
   */
@@ -38,9 +42,9 @@ namespace Hedwig {
        @param topic Topic to publish to.
        @param message Data to publish for topic.
     */
-    virtual void publish(const std::string& topic, const std::string& message) =
0;
+    virtual PublishResponsePtr publish(const std::string& topic, const std::string&
message) = 0;
     
-    virtual void publish(const std::string& topic, const Message& message) = 0;
+    virtual PublishResponsePtr publish(const std::string& topic, const Message& message)
= 0;
 
     /** 
 	Asynchronously publish message for topic. 
@@ -58,6 +62,9 @@ namespace Hedwig {
     
     virtual void asyncPublish(const std::string& topic, const Message& message, const
OperationCallbackPtr& callback) = 0;
 
+    virtual void asyncPublishWithResponse(const std::string& topic, const Message&
messsage,
+                                          const PublishResponseCallbackPtr& callback)
= 0;
+
     virtual ~Publisher() {}
   };
 };

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h Mon Aug  6 10:10:17 2012
@@ -44,9 +44,103 @@
 namespace Hedwig {
   const int DEFAULT_SYNC_REQUEST_TIMEOUT = 5000;
 
+  template<class R>
+  class SyncCallback : public Callback<R> {
+  public:
+    SyncCallback(int timeout) : response(PENDING), timeout(timeout) {}
+    virtual void operationComplete(const R& r) {
+      if (response == TIMEOUT) {
+        return;
+      }
+
+      {
+        boost::lock_guard<boost::mutex> lock(mut);
+        response = SUCCESS;
+        result = r;
+      }
+      cond.notify_all();
+    }
+
+    virtual void operationFailed(const std::exception& exception) {
+      if (response == TIMEOUT) {
+        return;
+      }
+
+      {
+        boost::lock_guard<boost::mutex> lock(mut);
+
+        if (typeid(exception) == typeid(ChannelConnectException)) {
+          response = NOCONNECT;
+        } else if (typeid(exception) == typeid(ServiceDownException)) {
+          response = SERVICEDOWN;
+        } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
+          response = ALREADY_SUBSCRIBED;
+        } else if (typeid(exception) == typeid(NotSubscribedException)) {
+          response = NOT_SUBSCRIBED;
+        } else {
+          response = UNKNOWN;
+        }
+      }
+      cond.notify_all();
+    }
+
+    void wait() {
+      boost::unique_lock<boost::mutex> lock(mut);
+      while(response==PENDING) {
+        if (cond.timed_wait(lock, boost::posix_time::milliseconds(timeout)) == false) {
+          response = TIMEOUT;
+        }
+      }
+    }
+
+    void throwExceptionIfNeeded() {
+      switch (response) {
+        case SUCCESS:
+          break;
+        case NOCONNECT:
+          throw CannotConnectException();
+          break;
+        case SERVICEDOWN:
+          throw ServiceDownException();
+          break;
+        case ALREADY_SUBSCRIBED:
+          throw AlreadySubscribedException();
+          break;
+        case NOT_SUBSCRIBED:
+          throw NotSubscribedException();
+          break;
+        case TIMEOUT:
+          throw ClientTimeoutException();
+          break;
+        default:
+          throw ClientException();
+          break;
+      }
+    }
+
+    R getResult() { return result; }
+    
+  private:
+    enum { 
+      PENDING, 
+      SUCCESS,
+      NOCONNECT,
+      SERVICEDOWN,
+      NOT_SUBSCRIBED,
+      ALREADY_SUBSCRIBED,
+      TIMEOUT,
+      UNKNOWN
+    } response;
+
+    boost::condition_variable cond;
+    boost::mutex mut;
+    int timeout;
+    R result;
+  };
+
   class SyncOperationCallback : public OperationCallback {
   public:
-  SyncOperationCallback(int timeout) : response(PENDING), timeout(timeout) {}
+    SyncOperationCallback(int timeout) : response(PENDING), timeout(timeout) {}
     virtual void operationComplete();
     virtual void operationFailed(const std::exception& exception);
     
@@ -64,7 +158,7 @@ namespace Hedwig {
       TIMEOUT,
       UNKNOWN
     } response;
-    
+
     boost::condition_variable cond;
     boost::mutex mut;
     int timeout;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Mon Aug  6 10:10:17 2012
@@ -28,7 +28,8 @@ static log4cxx::LoggerPtr logger(log4cxx
 
 using namespace Hedwig;
 
-PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const
Message& body, const OperationCallbackPtr& callback) {
+PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const
Message& body,
+                                            const ResponseCallbackPtr& callback) {
   PubSubDataPtr ptr(new PubSubData());
   ptr->type = PUBLISH;
   ptr->txnid = txnid;
@@ -38,7 +39,8 @@ PubSubDataPtr PubSubData::forPublishRequ
   return ptr;
 }
 
-PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic, const OperationCallbackPtr& callback, const SubscriptionOptions&
options) {
+PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic,
+                                              const ResponseCallbackPtr& callback, const
SubscriptionOptions& options) {
   PubSubDataPtr ptr(new PubSubData());
   ptr->type = SUBSCRIBE;
   ptr->txnid = txnid;
@@ -49,7 +51,8 @@ PubSubDataPtr PubSubData::forSubscribeRe
   return ptr;  
 }
 
-PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic, const OperationCallbackPtr& callback) {
+PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic,
+                                                const ResponseCallbackPtr& callback)
{
   PubSubDataPtr ptr(new PubSubData());
   ptr->type = UNSUBSCRIBE;
   ptr->txnid = txnid;
@@ -155,11 +158,11 @@ void PubSubData::clearTriedServers() {
   triedservers.clear();
 }
 
-OperationCallbackPtr& PubSubData::getCallback() {
+ResponseCallbackPtr& PubSubData::getCallback() {
   return callback;
 }
 
-void PubSubData::setCallback(const OperationCallbackPtr& callback) {
+void PubSubData::setCallback(const ResponseCallbackPtr& callback) {
   this->callback = callback;
 }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h Mon Aug  6 10:10:17 2012
@@ -49,6 +49,9 @@ namespace Hedwig {
     boost::mutex mutex;
   };
 
+  typedef Callback<ResponseBody> ResponseCallback;
+  typedef std::tr1::shared_ptr<ResponseCallback> ResponseCallbackPtr;
+
   class PubSubData;
   typedef boost::shared_ptr<PubSubData> PubSubDataPtr;
   typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr;
@@ -61,10 +64,12 @@ namespace Hedwig {
   class PubSubData {
   public:
     // to be used for publish
-    static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const
Message& body, const OperationCallbackPtr& callback);
+    static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const
Message& body,
+                                           const ResponseCallbackPtr& callback);
     static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic,
-					     const OperationCallbackPtr& callback, const SubscriptionOptions& options);
-    static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic, const OperationCallbackPtr& callback);
+                                             const ResponseCallbackPtr& callback, const
SubscriptionOptions& options);
+    static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid,
const std::string& topic,
+                                               const ResponseCallbackPtr& callback);
     static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid,
const std::string& topic, const MessageSeqId msgid);
 
     ~PubSubData();
@@ -80,8 +85,8 @@ namespace Hedwig {
     void setMessageBound(int messageBound);
 
     const PubSubRequestPtr getRequest();
-    void setCallback(const OperationCallbackPtr& callback);
-    OperationCallbackPtr& getCallback();
+    void setCallback(const ResponseCallbackPtr& callback);
+    ResponseCallbackPtr& getCallback();
     const SubscriptionOptions& getSubscriptionOptions() const;
 
     void addTriedServer(HostAddress& h);
@@ -98,7 +103,7 @@ namespace Hedwig {
     Message body;
     bool shouldClaim;
     int messageBound;
-    OperationCallbackPtr callback;
+    ResponseCallbackPtr callback;
     SubscriptionOptions options;
     MessageSeqId msgid;
     std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp Mon Aug  6 10:10:17 2012
@@ -28,6 +28,26 @@ static log4cxx::LoggerPtr logger(log4cxx
 
 using namespace Hedwig;
 
+PublishResponseAdaptor::PublishResponseAdaptor(const PublishResponseCallbackPtr& pubCallback)
+  : pubCallback(pubCallback) {
+}
+
+void PublishResponseAdaptor::operationComplete(const ResponseBody& result) {
+  if (result.has_publishresponse()) {
+    PublishResponse *resp = new PublishResponse();
+    resp->CopyFrom(result.publishresponse());
+    PublishResponsePtr respPtr(resp);
+    pubCallback->operationComplete(respPtr);
+  } else {
+    // return empty response
+    pubCallback->operationComplete(PublishResponsePtr());
+  }
+}
+
+void PublishResponseAdaptor::operationFailed(const std::exception& exception) {
+  pubCallback->operationFailed(exception);
+}
+
 PublishWriteCallback::PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr&
data) : client(client), data(data) {}
 
 void PublishWriteCallback::operationComplete() {
@@ -44,28 +64,28 @@ PublisherImpl::PublisherImpl(const Clien
   : client(client) {
 }
 
-void PublisherImpl::publish(const std::string& topic, const Message& message) {
-  SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,

-											  DEFAULT_SYNC_REQUEST_TIMEOUT));
-  OperationCallbackPtr callback(cb);
-  asyncPublish(topic, message, callback);
+PublishResponsePtr PublisherImpl::publish(const std::string& topic, const Message&
message) {
+  SyncCallback<PublishResponsePtr>* cb =
+    new SyncCallback<PublishResponsePtr>(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,

+											                                                     DEFAULT_SYNC_REQUEST_TIMEOUT));
+  PublishResponseCallbackPtr callback(cb);
+  asyncPublishWithResponse(topic, message, callback);
   cb->wait();
   
   cb->throwExceptionIfNeeded();  
+  return cb->getResult();
 }
 
-void PublisherImpl::publish(const std::string& topic, const std::string& message)
{
+PublishResponsePtr PublisherImpl::publish(const std::string& topic, const std::string&
message) {
   Message msg;
   msg.set_body(message);
-  publish(topic, msg);
+  return publish(topic, msg);
 }
 
 void PublisherImpl::asyncPublish(const std::string& topic, const Message& message,
const OperationCallbackPtr& callback) {
   // use release after callback to release the channel after the callback is called
-  PubSubDataPtr data = PubSubData::forPublishRequest(client->counter().next(), topic,
message, callback);
-  
-  DuplexChannelPtr channel = client->getChannel(topic);
-  doPublish(channel, data);
+  ResponseCallbackPtr respCallback(new ResponseCallbackAdaptor(callback));
+  doPublish(topic, message, respCallback);
 }
 
 void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message,
const OperationCallbackPtr& callback) {
@@ -74,6 +94,20 @@ void PublisherImpl::asyncPublish(const s
   asyncPublish(topic, msg, callback);
 }
 
+void PublisherImpl::asyncPublishWithResponse(const std::string& topic, const Message&
message,
+                                             const PublishResponseCallbackPtr& callback)
{
+  ResponseCallbackPtr respCallback(new PublishResponseAdaptor(callback));
+  doPublish(topic, message, respCallback);
+}
+
+void PublisherImpl::doPublish(const std::string& topic, const Message& message, const
ResponseCallbackPtr& callback) {
+  PubSubDataPtr data = PubSubData::forPublishRequest(client->counter().next(), topic,
message, callback);
+  
+  DuplexChannelPtr channel = client->getChannel(topic);
+
+  doPublish(channel, data);
+}
+
 void PublisherImpl::doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr&
data) {
   channel->storeTransaction(data);
   
@@ -84,7 +118,11 @@ void PublisherImpl::doPublish(const Dupl
 void PublisherImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr&
txn) {
   switch (m->statuscode()) {
   case SUCCESS:
-    txn->getCallback()->operationComplete();
+    if (m->has_responsebody()) {
+      txn->getCallback()->operationComplete(m->responsebody());
+    } else {
+      txn->getCallback()->operationComplete(ResponseBody());
+    }
     break;
   case SERVICE_DOWN:
     LOG4CXX_ERROR(logger, "Server responsed with SERVICE_DOWN for " << txn->getTxnId());

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h Mon Aug  6 10:10:17 2012
@@ -21,8 +21,19 @@
 #include <hedwig/publish.h>
 #include <hedwig/callback.h>
 #include "clientimpl.h"
+#include "data.h"
 
 namespace Hedwig {
+  class PublishResponseAdaptor : public ResponseCallback {
+  public:
+    PublishResponseAdaptor(const PublishResponseCallbackPtr& pubCallback);
+
+    void operationComplete(const ResponseBody & result);
+    void operationFailed(const std::exception& exception);
+  private:
+    PublishResponseCallbackPtr pubCallback;
+  };
+
   class PublishWriteCallback : public OperationCallback {
   public:
     PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
@@ -38,14 +49,17 @@ namespace Hedwig {
   public:
     PublisherImpl(const ClientImplPtr& client);
 
-    void publish(const std::string& topic, const std::string& message);
-    void publish(const std::string& topic, const Message& message);
+    PublishResponsePtr publish(const std::string& topic, const std::string& message);
+    PublishResponsePtr publish(const std::string& topic, const Message& message);
 
     void asyncPublish(const std::string& topic, const std::string& message, const
OperationCallbackPtr& callback);
     void asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr&
callback);
+    void asyncPublishWithResponse(const std::string& topic, const Message& messsage,
+                                  const PublishResponseCallbackPtr& callback);
     
     void messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn);
 
+    void doPublish(const std::string& topic, const Message& message, const ResponseCallbackPtr&
callback);
     void doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
 
   private:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Aug  6 10:10:17 2012
@@ -137,7 +137,7 @@ SubscriberReconnectCallback::SubscriberR
   : client(client), origData(origData) {
 }
 
-void SubscriberReconnectCallback::operationComplete() {
+void SubscriberReconnectCallback::operationComplete(const ResponseBody & resp) {
 }
 
 void SubscriberReconnectCallback::operationFailed(const std::exception& exception) {
@@ -217,7 +217,7 @@ void SubscriberClientChannelHandler::cha
 
   // setup pubsub data for reconnection attempt
   origData->clearTriedServers();
-  OperationCallbackPtr newcallback(new SubscriberReconnectCallback(client, origData));
+  ResponseCallbackPtr newcallback(new SubscriberReconnectCallback(client, origData));
   origData->setCallback(newcallback);
 
   // Create a new handler for the new channel
@@ -329,7 +329,9 @@ void SubscriberImpl::asyncSubscribe(cons
     options2.set_messagebound(messageBound);
   }
 
-  PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId,
topic, callback, options2);
+  ResponseCallbackPtr respCallback(new ResponseCallbackAdaptor(callback));
+  PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId,
topic,
+                                                       respCallback, options2);
 
   SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this,
data));
   ChannelHandlerPtr baseptr = handler;
@@ -369,7 +371,8 @@ void SubscriberImpl::unsubscribe(const s
 void SubscriberImpl::asyncUnsubscribe(const std::string& topic, const std::string&
subscriberId, const OperationCallbackPtr& callback) {
   closeSubscription(topic, subscriberId);
 
-  PubSubDataPtr data = PubSubData::forUnsubscribeRequest(client->counter().next(), subscriberId,
topic, callback);
+  ResponseCallbackPtr respCallback(new ResponseCallbackAdaptor(callback));
+  PubSubDataPtr data = PubSubData::forUnsubscribeRequest(client->counter().next(), subscriberId,
topic, respCallback);
   
   DuplexChannelPtr channel = client->getChannel(topic);
   doUnsubscribe(channel, data);
@@ -456,7 +459,11 @@ void SubscriberImpl::messageHandler(cons
 
   switch (m->statuscode()) {
   case SUCCESS:
-    txn->getCallback()->operationComplete();
+    if (m->has_responsebody()) {
+      txn->getCallback()->operationComplete(m->responsebody());
+    } else {
+      txn->getCallback()->operationComplete(ResponseBody());
+    }
     break;
   case SERVICE_DOWN:
     txn->getCallback()->operationFailed(ServiceDownException());

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Mon Aug  6 10:10:17 2012
@@ -72,11 +72,11 @@ namespace Hedwig {
     const PubSubDataPtr data;
     };
 
-  class SubscriberReconnectCallback : public OperationCallback {
+  class SubscriberReconnectCallback : public ResponseCallback {
   public: 
     SubscriberReconnectCallback(const ClientImplPtr& client, const PubSubDataPtr&
origData);
 
-    void operationComplete();
+    void operationComplete(const ResponseBody & resp);
     void operationFailed(const std::exception& exception);
   private:
     const ClientImplPtr client;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp Mon Aug  6 10:10:17 2012
@@ -143,3 +143,14 @@ HostAddress HostAddress::fromString(std:
   return h;
 }
 
+ResponseCallbackAdaptor::ResponseCallbackAdaptor(const OperationCallbackPtr& opCallbackPtr)
+  : opCallbackPtr(opCallbackPtr) {
+}
+
+void ResponseCallbackAdaptor::operationComplete(const ResponseBody& response) {
+  opCallbackPtr->operationComplete();
+}
+
+void ResponseCallbackAdaptor::operationFailed(const std::exception& exception) {
+  opCallbackPtr->operationFailed(exception);
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h Mon Aug  6 10:10:17 2012
@@ -67,6 +67,19 @@ namespace Hedwig {
   };
 
   /**
+   * An adaptor for OperationCallback
+   */
+  class ResponseCallbackAdaptor : public Callback<ResponseBody> {
+  public:
+    ResponseCallbackAdaptor(const OperationCallbackPtr& opCallbackPtr);
+
+    virtual void operationComplete(const ResponseBody& response);
+    virtual void operationFailed(const std::exception& exception);
+  private:
+    OperationCallbackPtr opCallbackPtr;
+  };
+
+  /**
     Hash a host address. Takes the least significant 16-bits of the address and the 16-bits of the
     port and packs them into one 32-bit number. While collisons are theoretically very possible, they
      shouldn't happen as the hedwig servers should be in the same subnet.

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp Mon Aug  6 10:10:17 2012
@@ -67,6 +67,22 @@ TEST(PublishTest, testSyncPublish) {
   delete conf;
 }
 
+TEST(PublishTest, testSyncPublishWithResponse) {
+  Hedwig::Configuration* conf = new TestServerConfiguration();
+  
+  Hedwig::Client* client = new Hedwig::Client(*conf);
+  Hedwig::Publisher& pub = client->getPublisher();
+
+  int numMsgs = 20;
+  for(int i=1; i<=numMsgs; i++) {
+    Hedwig::PublishResponsePtr pubResponse = pub.publish("testSyncPublishWithResponse", "testMessage
" + i);
+    ASSERT_EQ(i, (int)pubResponse->publishedmsgid().localcomponent());
+  }
+  
+  delete client;
+  delete conf;
+}
+
 TEST(PublishTest, testAsyncPublish) {
   SimpleWaitCondition* cond = new SimpleWaitCondition();
 
@@ -86,6 +102,32 @@ TEST(PublishTest, testAsyncPublish) {
   delete conf;
 }
 
+TEST(PublishTest, testAsyncPublishWithResponse) {
+  Hedwig::Configuration* conf = new TestServerConfiguration();
+  Hedwig::Client* client = new Hedwig::Client(*conf);
+  Hedwig::Publisher& pub = client->getPublisher();
+
+  int numMsgs = 20;
+  for (int i=1; i<=numMsgs; i++) {
+    SimpleWaitCondition* cond = new SimpleWaitCondition();
+    TestPublishResponseCallback* callback =
+      new TestPublishResponseCallback(cond);
+    Hedwig::PublishResponseCallbackPtr testcb(callback);
+    Hedwig::Message asyncMsg;
+    asyncMsg.set_body("testAsyncPublishWithResponse-" + i);
+    pub.asyncPublishWithResponse("testAsyncPublishWithResponse", asyncMsg, testcb);
+    
+    cond->wait();
+
+    ASSERT_TRUE(cond->wasSuccess());
+    ASSERT_EQ(i, (int)callback->getResponse()->publishedmsgid().localcomponent());
+
+    delete cond;
+  }
+  delete client;
+  delete conf;
+}
+
 TEST(PublishTest, testMultipleAsyncPublish) {
   SimpleWaitCondition* cond1 = new SimpleWaitCondition();
   SimpleWaitCondition* cond2 = new SimpleWaitCondition();

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1369767&r1=1369766&r2=1369767&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Mon Aug  6 10:10:17 2012
@@ -63,6 +63,32 @@ private:
   bool success;
 };
 
+class TestPublishResponseCallback : public Hedwig::PublishResponseCallback {
+public:
+  TestPublishResponseCallback(SimpleWaitCondition* cond) : cond(cond) {
+  }
+
+  virtual void operationComplete(const Hedwig::PublishResponsePtr & resp) {
+    LOG4CXX_DEBUG(utillogger, "operationComplete");
+    pubResp = resp;
+    cond->setSuccess(true);
+    cond->notify();
+  }
+  
+  virtual void operationFailed(const std::exception& exception) {
+    LOG4CXX_DEBUG(utillogger, "operationFailed: " << exception.what());
+    cond->setSuccess(false);
+    cond->notify();
+  }    
+
+  Hedwig::PublishResponsePtr getResponse() {
+    return pubResp;
+  }
+private:
+  SimpleWaitCondition *cond;
+  Hedwig::PublishResponsePtr pubResp;
+};
+
 class TestCallback : public Hedwig::OperationCallback {
 public:
   TestCallback(SimpleWaitCondition* cond) 



Mime
View raw message