hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r1021463 [1/2] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cpp/inc/hedwig/ src/contrib/hedwig/client/src/main/cpp/lib/ src/contrib/hedwig/client/src/main/cpp/m4/ src/contrib/he...
Date Mon, 11 Oct 2010 19:00:43 GMT
Author: breed
Date: Mon Oct 11 19:00:42 2010
New Revision: 1021463

URL: http://svn.apache.org/viewvc?rev=1021463&view=rev
Log:
ZOOKEEPER-864. Hedwig C++ client improvements

Added:
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_asio.m4
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_base.m4
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/m4/ax_boost_thread.m4
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/log4cpp.conf
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/network-delays.sh
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/server-control.sh
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/scripts/tester.sh
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/Makefile.am
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/configure.ac
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/hedwig-0.1.pc.in
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/client.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/exceptions.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/publish.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/subscribe.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/Makefile.am
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/util.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/Makefile.am
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/main.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/publishtest.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/servercontrol.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/subscribetest.cpp
    hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/util.h
    hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Mon Oct 11 19:00:42 2010
@@ -144,6 +144,8 @@ IMPROVEMENTS:
   ZOOKEEPER-853. Make zookeeper.is_unrecoverable return True or False
   in zkpython (Andrei Savu via henryr)
 
+  ZOOKEEPER-864. Hedwig C++ client improvements (Ivan Kelly via breed)
+
 NEW FEATURES:
   ZOOKEEPER-729. Java client API to recursively delete a subtree.
   (Kay Kay via henry)

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/Makefile.am?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/Makefile.am (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/Makefile.am Mon Oct 11 19:00:42 2010
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 ACLOCAL_AMFLAGS = -I m4
 
 SUBDIRS = lib test
@@ -8,6 +26,4 @@ library_include_HEADERS = inc/hedwig/cal
 pkgconfigdir = $(libdir)/pkgconfig
 nodist_pkgconfig_DATA = hedwig-0.1.pc
 
-include aminclude.am
-
 EXTRA_DIST = $(DX_CONFIG) doc/html

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/configure.ac?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/configure.ac (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/configure.ac Mon Oct 11 19:00:42 2010
@@ -1,18 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 AC_INIT([Hedwig C++ Client], [0.1], [zookeeper-dev@hadoop.apache.org], [hedwig-cpp], [http://hadoop.apache.org/zookeeper//])
 
 AC_PREREQ([2.59])
-AM_INIT_AUTOMAKE([1.10 no-define foreign])
+AM_INIT_AUTOMAKE([1.9 no-define foreign])
 AC_CONFIG_HEADERS([config.h])
 AC_PROG_CXX
+AC_LANG([C++])
 AC_CONFIG_FILES([Makefile lib/Makefile test/Makefile hedwig-0.1.pc])
 AC_PROG_LIBTOOL
 AC_CONFIG_MACRO_DIR([m4])
-PKG_CHECK_MODULES([DEPS], [log4cpp >= 0.23 protobuf >= 2.3.0 cppunit])
+PKG_CHECK_MODULES([DEPS], [log4cpp protobuf cppunit])
+AX_BOOST_BASE
+AX_BOOST_ASIO	  
+AX_BOOST_THREAD
 
 DX_HTML_FEATURE(ON)
 DX_INIT_DOXYGEN(hedwig-c++, c-doc.Doxyfile, doc)
 
-CXXFLAGS="$CXXFLAGS -fno-inline"
+CXXFLAGS="$CXXFLAGS -Wall"
 
 AC_OUTPUT
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/hedwig-0.1.pc.in
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/hedwig-0.1.pc.in?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/hedwig-0.1.pc.in (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/hedwig-0.1.pc.in Mon Oct 11 19:00:42 2010
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 prefix=@prefix@
 exec_prefix=@exec_prefix@
 libdir=@libdir@

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/client.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/client.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/client.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/client.h Mon Oct 11 19:00:42 2010
@@ -25,17 +25,29 @@
 #include <hedwig/publish.h>
 #include <hedwig/exceptions.h>
 #include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace Hedwig {
 
   class ClientImpl;
-  typedef std::tr1::shared_ptr<ClientImpl> ClientImplPtr;
+  typedef boost::shared_ptr<ClientImpl> ClientImplPtr;
 
   class Configuration {
   public:
+    static const std::string DEFAULT_SERVER;
+    static const std::string MESSAGE_CONSUME_RETRY_WAIT_TIME;
+    static const std::string SUBSCRIBER_CONSUME_RETRY_WAIT_TIME;
+    static const std::string MAX_MESSAGE_QUEUE_SIZE;
+    static const std::string RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME;
+    static const std::string SYNC_REQUEST_TIMEOUT;
+
+  public:
     Configuration() {};
+    virtual int getInt(const std::string& key, int defaultVal) const = 0;
+    virtual const std::string get(const std::string& key, const std::string& defaultVal) const = 0;
+    virtual bool getBool(const std::string& key, bool defaultVal) const = 0;
 
-    virtual const std::string& getDefaultServer() const;    
+    virtual ~Configuration() {}
   };
 
   /** 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/exceptions.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/exceptions.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/exceptions.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/exceptions.h Mon Oct 11 19:00:42 2010
@@ -24,6 +24,8 @@ namespace Hedwig {
 
   class ClientException : public std::exception { };
 
+  class ClientTimeoutException : public ClientException {};
+
   class ServiceDownException : public ClientException {};
   class CannotConnectException : public ClientException {};
   class UnexpectedResponseException : public ClientException {};

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/publish.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/publish.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/publish.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/publish.h Mon Oct 11 19:00:42 2010
@@ -53,6 +53,8 @@ namespace Hedwig {
 	@param callback Callback which will be used to report success or failure. Success is only reported once the server replies with an ACK response to the publication.
     */
     virtual void asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) = 0;
+    
+    virtual ~Publisher() {}
   };
 };
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/subscribe.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/inc/hedwig/subscribe.h Mon Oct 11 19:00:42 2010
@@ -44,6 +44,8 @@ namespace Hedwig {
     virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
 
     virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
+
+    virtual ~Subscriber() {}
   };
 };
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/Makefile.am?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/Makefile.am (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/Makefile.am Mon Oct 11 19:00:42 2010
@@ -1,14 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 PROTODEF = ../../../../../protocol/src/main/protobuf/PubSubProtocol.proto
 
 lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp exceptions.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp
-libhedwig01_la_CPPFLAGS = -I../inc $(DEPS_CFLAGS)
-libhedwig01_la_LIBADD = $(DEPS_LIBS) 
-libhedwig01_la_LDFLAGS = -no-undefined
+libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp
+libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
+libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS) 
+libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
 
 protocol.cpp: $(PROTODEF)
 	protoc --cpp_out=. -I`dirname $(PROTODEF)` $(PROTODEF)
-	mv PubSubProtocol.pb.cc protocol.cpp
-	sed -i "s/PubSubProtocol.pb.h/hedwig\/protocol.h/" protocol.cpp
-	mv PubSubProtocol.pb.h ../inc/hedwig/protocol.h
+	sed "s/PubSubProtocol.pb.h/hedwig\/protocol.h/" PubSubProtocol.pb.cc > protocol.cpp
+	rm PubSubProtocol.pb.cc
+	mv PubSubProtocol.pb.h $(top_srcdir)/inc/hedwig/protocol.h
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp Mon Oct 11 19:00:42 2010
@@ -20,6 +20,8 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
+#include <poll.h>
+#include <iostream>
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -34,403 +36,409 @@
 #include "clientimpl.h"
 
 #include <log4cpp/Category.hh>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
 
 static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
 
-const int MAX_MESSAGE_SIZE = 2*1024*1024; // 2 Meg
-
 using namespace Hedwig;
 
-namespace Hedwig {
+DuplexChannel::DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr, 
+			     const Configuration& cfg, const ChannelHandlerPtr& handler)
+  : dispatcher(dispatcher), address(addr), handler(handler), 
+    socket(dispatcher.getService()), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
+    state(UNINITIALISED), receiving(false), sending(false)
+{
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Creating DuplexChannel(" << this << ")";
+  }
+}
 
-  class RunnableThread {  
-  public:
-    RunnableThread(DuplexChannel& channel, const ChannelHandlerPtr& handler);
-    virtual ~RunnableThread();
-    virtual void entryPoint() = 0;
-    
-    void run();
-    virtual void kill();
-    
-  protected:
-    DuplexChannel& channel;
-    ChannelHandlerPtr handler;
-    pthread_t thread;
-    pthread_attr_t attr;
-  };
+/*static*/ void DuplexChannel::connectCallbackHandler(DuplexChannelPtr channel,
+						      const boost::system::error_code& error) {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "DuplexChannel::connectCallbackHandler error(" << error 
+		      << ") channel(" << channel.get() << ")";
+  }
+
+  if (error) {
+    channel->channelDisconnected(ChannelConnectException());
+    channel->setState(DEAD);
+    return;
+  }
+
+  channel->setState(CONNECTED);
+
+  boost::system::error_code ec;
+  boost::asio::ip::tcp::no_delay option(true);
+
+  channel->socket.set_option(option, ec);
+  if (ec) {
+    channel->channelDisconnected(ChannelSetupException());
+    channel->setState(DEAD);
+    return;
+  } 
   
-  typedef std::pair<const PubSubRequest*, OperationCallbackPtr> RequestPair;
+  channel->startSending();
+  channel->startReceiving();
+}
+
+void DuplexChannel::connect() {  
+  setState(CONNECTING);
+
+  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(address.ip()), address.port());
+  boost::system::error_code error = boost::asio::error::host_not_found;
+
+  socket.async_connect(endp, boost::bind(&DuplexChannel::connectCallbackHandler, 
+					 shared_from_this(), 
+					 boost::asio::placeholders::error)); 
+}
+
+/*static*/ void DuplexChannel::messageReadCallbackHandler(DuplexChannelPtr channel, 
+							  std::size_t message_size,
+							  const boost::system::error_code& error, 
+							  std::size_t bytes_transferred) {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "DuplexChannel::messageReadCallbackHandler " << error << ", " 
+		      << bytes_transferred << " channel(" << channel.get() << ")";
+  }
 
-  class PacketsAvailableCondition : public WaitConditionBase {
-  public:
-    PacketsAvailableCondition(std::deque<RequestPair>& queue) : queue(queue), dead(false) {
+  if (error) {
+    LOG.errorStream() << "Invalid read error (" << error << ") bytes_transferred (" 
+		      << bytes_transferred << ") channel(" << channel.get() << ")";
+    channel->channelDisconnected(ChannelReadException());
+    return;
+  }
+
+  if (channel->copy_buf_length < message_size) {
+    channel->copy_buf_length = message_size;
+    channel->copy_buf = (char*)realloc(channel->copy_buf, channel->copy_buf_length);
+    if (channel->copy_buf == NULL) {
+      LOG.errorStream() << "Error allocating buffer. channel(" << channel.get() << ")";
+      return;
     }
+  }
+  
+  channel->instream.read(channel->copy_buf, message_size);
+  PubSubResponsePtr response(new PubSubResponse());
+  bool err = response->ParseFromArray(channel->copy_buf, message_size);
 
-    ~PacketsAvailableCondition() { wait(); }
 
-    bool isTrue() { return dead || !queue.empty(); }
-    void kill() { dead = true; }
+  if (!err) {
+    LOG.errorStream() << "Error parsing message. channel(" << channel.get() << ")";
 
-  private:
-    std::deque<RequestPair>& queue;
-    bool dead;
-  };
-
-  class WriteThread : public RunnableThread {
-  public: 
-    WriteThread(DuplexChannel& channel, int socketfd, const ChannelHandlerPtr& handler);
-    
-    void entryPoint();
-    void writeRequest(const PubSubRequest& m, const OperationCallbackPtr& callback);
-    virtual void kill();
+    channel->channelDisconnected(ChannelReadException());
+    return;
+  } else if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "channel(" << channel.get() << ") : " << channel->in_buf.size() 
+		      << " bytes left in buffer";
+  }
 
-    ~WriteThread();
-    
-  private:
-    int socketfd;
+  ChannelHandlerPtr h;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(channel->destruction_lock);
+    if (channel->handler.get()) {
+      h = channel->handler;
+    }
+  }
+  if (h.get()) {
+    h->messageReceived(channel, response);
+  }
 
-    PacketsAvailableCondition packetsAvailableWaitCondition;
-    Mutex queueMutex;
-    std::deque<RequestPair> requestQueue;
-    bool dead;
-  };
-  
-  class ReadThread : public RunnableThread {
-  public:
-    ReadThread(DuplexChannel& channel, int socketfd, const ChannelHandlerPtr& handler);
-    
-    void entryPoint();
-    
-    ~ReadThread();
-    
-  private:    
-    int socketfd;
-  };
+  DuplexChannel::readSize(channel);
 }
 
-DuplexChannel::DuplexChannel(const HostAddress& addr, const Configuration& cfg, const ChannelHandlerPtr& handler)
-  : address(addr), handler(handler), writer(NULL), reader(NULL), socketfd(-1), state(UNINITIALISED), txnid2data_lock()
-{
+/*static*/ void DuplexChannel::sizeReadCallbackHandler(DuplexChannelPtr channel, 
+						       const boost::system::error_code& error, 
+						       std::size_t bytes_transferred) {
   if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Creating DuplexChannel(" << this << ")";
+    LOG.debugStream() << "DuplexChannel::sizeReadCallbackHandler " << error << ", " 
+		      << bytes_transferred << " channel(" << channel.get() << ")";
+  }
+
+  if (error) {
+    LOG.errorStream() << "Invalid read error (" << error << ") bytes_transferred (" 
+		      << bytes_transferred << ") channel(" << channel.get() << ")";
+    channel->channelDisconnected(ChannelReadException());
+    return;
+  }
+  
+  if (channel->in_buf.size() < sizeof(uint32_t)) {
+    LOG.errorStream() << "Not enough data in stream. Must have been an error reading. " 
+		      << " Closing channel(" << channel.get() << ")";
+    channel->channelDisconnected(ChannelReadException());
+    return;
+  }
+
+  uint32_t size;
+  std::istream is(&channel->in_buf);
+  is.read((char*)&size, sizeof(uint32_t));
+  size = ntohl(size);
+
+  int toread = size - channel->in_buf.size();
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << " size of incoming message " << size << ", currently in buffer " 
+		      << channel->in_buf.size() << " channel(" << channel.get() << ")";
+  }
+  if (toread <= 0) {
+    DuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
+  } else {
+    boost::asio::async_read(channel->socket, channel->in_buf,
+			    boost::asio::transfer_at_least(toread),
+			    boost::bind(&DuplexChannel::messageReadCallbackHandler, 
+					channel, size,
+					boost::asio::placeholders::error, 
+					boost::asio::placeholders::bytes_transferred));
   }
 }
 
-void DuplexChannel::connect() {
+/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
+  if (!channel->isReceiving()) {
+    return;
+  }
+
+  int toread = sizeof(uint32_t) - channel->in_buf.size();
   if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "DuplexChannel(" << this << ")::connect " << address.getAddressString();
+    LOG.debugStream() << " size of incoming message " << sizeof(uint32_t) 
+		      << ", currently in buffer " << channel->in_buf.size() 
+		      << " channel(" << channel.get() << ")";
   }
 
+  if (toread < 0) {
+    DuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
+  } else {
+    //  in_buf_size.prepare(sizeof(uint32_t));
+    boost::asio::async_read(channel->socket, channel->in_buf, 
+			    boost::asio::transfer_at_least(sizeof(uint32_t)),
+			    boost::bind(&DuplexChannel::sizeReadCallbackHandler, 
+					channel, 
+					boost::asio::placeholders::error, 
+					boost::asio::placeholders::bytes_transferred));
+  }
+}
+
+void DuplexChannel::startReceiving() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "DuplexChannel::startReceiving channel(" << this << ")";
+  }
 
-  socketfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+  boost::lock_guard<boost::mutex> lock(receiving_lock);
+  receiving = true;
   
-  if (-1 == socketfd) {
-    LOG.errorStream() << "DuplexChannel(" << this << ") Unable to create socket";
+  DuplexChannel::readSize(shared_from_this());
+}
+
+bool DuplexChannel::isReceiving() {
+  return receiving;
+}
 
-    throw CannotCreateSocketException();
+void DuplexChannel::stopReceiving() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "DuplexChannel::stopReceiving channel(" << this << ")";
   }
 
-  if (-1 == ::connect(socketfd, (const struct sockaddr *)&(address.socketAddress()), sizeof(struct sockaddr_in))) {
-    LOG.errorStream() << "DuplexChannel(" << this << ") Could not connect socket";
-    close(socketfd);
+  boost::lock_guard<boost::mutex> lock(receiving_lock);
+  receiving = false;
+}
 
-    throw CannotConnectException();
+void DuplexChannel::startSending() {
+  {
+    boost::shared_lock<boost::shared_mutex> lock(state_lock);
+    if (state != CONNECTED) {
+      return;
+    }
   }
 
+  boost::lock_guard<boost::mutex> lock(sending_lock);
+  if (sending) {
+    return;
+  }
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "DuplexChannel::startSending channel(" << this << ")";
+  }
 
-  int flag = 1;
-  int res = 0;
-  if ((res = setsockopt(socketfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int))) != 0){
-    close(socketfd);
-    LOG.errorStream() << "Error setting nodelay on (" << this << ") " << res;
-    throw ChannelSetupException();
+  WriteRequest w;
+  { 
+    boost::lock_guard<boost::mutex> lock(write_lock);
+    if (write_queue.empty()) {
+      return;
+    }
+    w = write_queue.front();
+    write_queue.pop_front();
   }
 
-  reader = new ReadThread(*this, socketfd, handler);
-  writer = new WriteThread(*this, socketfd, handler);
+  sending = true;
 
-  reader->run();
-  writer->run();
+  std::ostream os(&out_buf);
+  uint32_t size = htonl(w.first->ByteSize());
+  os.write((char*)&size, sizeof(uint32_t));
+  
+  bool err = w.first->SerializeToOstream(&os);
+  if (!err) {
+    w.second->operationFailed(ChannelWriteException());
+    channelDisconnected(ChannelWriteException());
+    return;
+  }
 
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "DuplexChannel(" << this << ")::connect successful. Notifying handler.";
-  }    
-  state = CONNECTED;
-  handler->channelConnected(this);
+  boost::asio::async_write(socket, out_buf, 
+			   boost::bind(&DuplexChannel::writeCallbackHandler, 
+				       shared_from_this(), 
+				       w.second,
+				       boost::asio::placeholders::error, 
+				       boost::asio::placeholders::bytes_transferred));
 }
 
+
 const HostAddress& DuplexChannel::getHostAddress() const {
   return address;
 }
 
+void DuplexChannel::channelDisconnected(const std::exception& e) {
+  setState(DEAD);
+  
+  {
+    boost::lock_guard<boost::mutex> lock(write_lock);
+    while (!write_queue.empty()) {
+      WriteRequest w = write_queue.front();
+      write_queue.pop_front();
+      w.second->operationFailed(e);
+    }
+  }
+
+  ChannelHandlerPtr h;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(destruction_lock);
+    if (handler.get()) {
+      h = handler;
+    }
+  }
+  if (h.get()) {
+    h->channelDisconnected(shared_from_this(), e);
+  }
+}
+
 void DuplexChannel::kill() {
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "Killing duplex channel (" << this << ")";
   }    
+  
+  bool connected = false;
+  {
+    boost::shared_lock<boost::shared_mutex> statelock(state_lock);
+    connected = (state == CONNECTING || state == CONNECTED);
+  }
 
-  destruction_lock.lock();
-  if (state == CONNECTED) {
-    state = DEAD;
-    
-    destruction_lock.unlock();
+  boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
+  if (connected) {
+    setState(DEAD);
     
-    if (socketfd != -1) {
-      shutdown(socketfd, SHUT_RDWR);
-    }
-    
-    if (writer) {
-      writer->kill();
-      delete writer;
-    }
-    if (reader) {
-      reader->kill();
-      delete reader;
-    }
-    if (socketfd != -1) {
-      close(socketfd);
-    }
-  } else {
-    destruction_lock.unlock();
+    socket.cancel();
+    socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
+    socket.close();
   }
-  handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel
+  handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
 }
 
 DuplexChannel::~DuplexChannel() {
   /** If we are going away, fail all transactions that haven't been completed */
   failAllTransactions();
   kill();
-
+  free(copy_buf);
+  copy_buf = NULL;
+  copy_buf_length = 0;
 
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "Destroying DuplexChannel(" << this << ")";
-  }    
+  }
 }
 
-void DuplexChannel::writeRequest(const PubSubRequest& m, const OperationCallbackPtr& callback) {
-  if (state != CONNECTED) {
-    LOG.errorStream() << "Tried to write transaction [" << m.txnid() << "] to a channel [" << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED");
-    callback->operationFailed(UninitialisedChannelException());
+/*static*/ void DuplexChannel::writeCallbackHandler(DuplexChannelPtr channel, OperationCallbackPtr callback,
+						    const boost::system::error_code& error, 
+						    std::size_t bytes_transferred) {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "DuplexChannel::writeCallbackHandler " << error << ", " 
+		      << bytes_transferred << " channel(" << channel.get() << ")";
   }
-			      
-  writer->writeRequest(m, callback);
-}
 
-/**
-   Store the transaction data for a request.
-*/
-void DuplexChannel::storeTransaction(const PubSubDataPtr& data) {
-  txnid2data_lock.lock();
-  txnid2data[data->getTxnId()] = data;
-  txnid2data_lock.unlock();;
-}
+  if (error) {
+    callback->operationFailed(ChannelWriteException());
+    channel->channelDisconnected(ChannelWriteException());
+    return;
+  }
 
-/**
-   Give the transaction back to the caller. 
-*/
-PubSubDataPtr DuplexChannel::retrieveTransaction(long txnid) {
-  txnid2data_lock.lock();
-  PubSubDataPtr data = txnid2data[txnid];
-  txnid2data.erase(txnid);
-  txnid2data_lock.unlock();
-  return data;
-}
+  callback->operationComplete();
 
-void DuplexChannel::failAllTransactions() {
-  txnid2data_lock.lock();
-  for (TransactionMap::iterator iter = txnid2data.begin(); iter != txnid2data.end(); ++iter) {
-    PubSubDataPtr& data = (*iter).second;
-    data->getCallback()->operationFailed(ChannelDiedException());
-  }
-  txnid2data.clear();
-  txnid2data_lock.unlock();
-}
+  channel->out_buf.consume(bytes_transferred);
 
-/** 
-Entry point for pthread initialisation
-*/
-void* ThreadEntryPoint(void *obj) {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Thread entered (" << obj << ")";
+  {
+    boost::lock_guard<boost::mutex> lock(channel->sending_lock);
+    channel->sending = false;
   }
 
-  RunnableThread* thread = (RunnableThread*) obj;
-  thread->entryPoint();
-}
- 
-RunnableThread::RunnableThread(DuplexChannel& channel, const ChannelHandlerPtr& handler) 
-  : channel(channel), handler(handler)
-{
-  //  pthread_cond_init(&deathlock, NULL);
+  channel->startSending();
 }
 
-void RunnableThread::run() {
-  int ret;
-
+void DuplexChannel::writeRequest(const PubSubRequestPtr& m, const OperationCallbackPtr& callback) {
   if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Running thread (" << this << ")";
-  }    
-  
-  pthread_attr_init(&attr);
-  ret = pthread_create(&thread, &attr, ThreadEntryPoint, this);
-  if (ret != 0) {
-    LOG.errorStream() << "Error creating thread (" << this << "). Notifying handler.";
-    handler->exceptionOccurred(&channel, ChannelThreadException());
+    LOG.debugStream() << "DuplexChannel::writeRequest channel(" << this << ") txnid(" 
+		      << m->txnid() << ") shouldClaim("<< m->has_shouldclaim() << ", " 
+		      << m->shouldclaim() << ")";
   }
-}
 
-void RunnableThread::kill() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Killing thread (" << this << ")";
-  }    
+  {
+    boost::shared_lock<boost::shared_mutex> lock(state_lock);
+    if (state != CONNECTED && state != CONNECTING) {
+      LOG.errorStream() << "Tried to write transaction [" << m->txnid() << "] to a channel [" 
+			<< this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED");
+      callback->operationFailed(UninitialisedChannelException());
+    }
+  }
 
-  pthread_cancel(thread);
-  pthread_join(thread, NULL);
+  { 
+    boost::lock_guard<boost::mutex> lock(write_lock);
+    WriteRequest w(m, callback);
+    write_queue.push_back(w);
+  }
 
-  pthread_attr_destroy(&attr);
+  startSending();
 }
 
-RunnableThread::~RunnableThread() {
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Deleting thread (" << this << ")";
-  }    
-}
 /**
-Writer thread
+   Store the transaction data for a request.
 */
-WriteThread::WriteThread(DuplexChannel& channel, int socketfd, const ChannelHandlerPtr& handler) 
-  : RunnableThread(channel, handler), socketfd(socketfd), packetsAvailableWaitCondition(requestQueue), queueMutex(), dead(false) {
-  
-}
-
-// should probably be using a queue here.
-void WriteThread::writeRequest(const PubSubRequest& m, const OperationCallbackPtr& callback) {
-  #warning "you should validate these inputs"
-  if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "Adding message to queue " << &m;
-  }
-  packetsAvailableWaitCondition.lock();
-  queueMutex.lock();
-  requestQueue.push_back(RequestPair(&m, callback));
-  queueMutex.unlock();;
-
-  packetsAvailableWaitCondition.signalAndUnlock();
-}
-  
-void WriteThread::entryPoint() {
-  while (true) {
-    packetsAvailableWaitCondition.wait();
-
-    if (dead) {
-      if (LOG.isDebugEnabled()) {
-	LOG.debugStream() << "returning from thread " << this;
-      }
-      return;
-    }
-    while (!requestQueue.empty()) { 
-      queueMutex.lock();;
-      RequestPair currentRequest = requestQueue.front();;
-      requestQueue.pop_front();
-      queueMutex.unlock();
-      if (LOG.isDebugEnabled()) {
-	LOG.debugStream() << "Writing message to socket " << currentRequest.first;
-      }
-      
-      uint32_t size = htonl(currentRequest.first->ByteSize());
-      write(socketfd, &size, sizeof(size));
-      
-      bool res = currentRequest.first->SerializeToFileDescriptor(socketfd);
-      
-      if (!res || errno != 0) {
-	LOG.errorStream() << "Error writing to socket (" << this << ") errno(" << errno << ") res(" << res << "). Disconnected.";
-	ChannelWriteException e;
-	
-	currentRequest.second->operationFailed(e);
-	channel.kill(); // make sure it's dead
-	handler->channelDisconnected(&channel, e);
-	
-	return;
-      } else {
-	currentRequest.second->operationComplete();
-      }
-    }  
+void DuplexChannel::storeTransaction(const PubSubDataPtr& data) {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Storing txnid(" << data->getTxnId() << ") for channel(" << this << ")";
   }
-}
-
-void WriteThread::kill() {
-  dead = true;
-  packetsAvailableWaitCondition.lock();
-  packetsAvailableWaitCondition.kill();
-  packetsAvailableWaitCondition.signalAndUnlock();
-  
-  RunnableThread::kill();
-}
-
-WriteThread::~WriteThread() {
-  queueMutex.unlock();
+  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
+  txnid2data[data->getTxnId()] = data;
 }
 
 /**
-Reader Thread
+   Give the transaction back to the caller. 
 */
+PubSubDataPtr DuplexChannel::retrieveTransaction(long txnid) {
+  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
 
-ReadThread::ReadThread(DuplexChannel& channel, int socketfd, const ChannelHandlerPtr& handler) 
-  : RunnableThread(channel, handler), socketfd(socketfd) {
-}
-  
-void ReadThread::entryPoint() {
-  PubSubResponse* response = new PubSubResponse();
-  uint8_t* dataarray = NULL;//(uint8_t*)malloc(MAX_MESSAGE_SIZE); // shouldn't be allocating every time. check that there's a max size
-  int currentbufsize = 0;
-  
-  while (true) {
-    uint32_t size = 0;
-    int bytesread = 0;
-
-    bytesread = read(socketfd, &size, sizeof(size));
-    size = ntohl(size);
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "Start reading packet of size: " << size;
-    }
-    if (bytesread < 1 || size > MAX_MESSAGE_SIZE) {
-      LOG.errorStream() << "Zero read from socket or unreasonable size read, size(" << size << ") errno(" << errno << ") " << strerror(errno);
-      channel.kill(); // make sure it's dead
-      handler->channelDisconnected(&channel, ChannelReadException());
-      break;
-    }
-
-    if (currentbufsize < size) {
-      dataarray = (uint8_t*)realloc(dataarray, size);
-    }
-    if (dataarray == NULL) {
-      LOG.errorStream() << "Error allocating input buffer of size " << size << " errno(" << errno << ") " << strerror(errno);
-      channel.kill(); // make sure it's dead
-      handler->channelDisconnected(&channel, ChannelReadException());
-      
-      break;
-    }
-    
-    memset(dataarray, 0, size);
-    bytesread = read(socketfd, dataarray, size);
-    bool res = response->ParseFromArray(dataarray, size);
+  PubSubDataPtr data = txnid2data[txnid];
+  txnid2data.erase(txnid);
+  if (data == NULL) {
+    LOG.errorStream() << "Transaction txnid(" << txnid 
+		      << ") doesn't exist in channel (" << this << ")";
+  }
 
+  return data;
+}
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "Packet read ";
-    }
-    
-    if (!res && errno != 0 || bytesread < size) {
-      LOG.errorStream() << "Error reading from socket (" << this << ") errno(" << errno << ") res(" << res << "). Disconnected.";
-      channel.kill(); // make sure it's dead
-      handler->channelDisconnected(&channel, ChannelReadException());
-
-      break;
-    } else {
-      handler->messageReceived(&channel, *response);
-    }
+void DuplexChannel::failAllTransactions() {
+  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
+  for (TransactionMap::iterator iter = txnid2data.begin(); iter != txnid2data.end(); ++iter) {
+    PubSubDataPtr& data = (*iter).second;
+    data->getCallback()->operationFailed(ChannelDiedException());
   }
-  free(dataarray);
-  delete response;
+  txnid2data.clear();
 }
 
-ReadThread::~ReadThread() {
+void DuplexChannel::setState(State s) {
+  boost::lock_guard<boost::shared_mutex> lock(state_lock);
+  state = s;
 }

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.h Mon Oct 11 19:00:42 2010
@@ -23,9 +23,20 @@
 #include <hedwig/client.h>
 #include "util.h"
 #include "data.h"
+#include "eventdispatcher.h"
+
 #include <tr1/memory>
 #include <tr1/unordered_map>
 
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
 namespace Hedwig {
   class ChannelException : public std::exception { };
   class UninitialisedChannelException : public ChannelException {};
@@ -33,6 +44,7 @@ namespace Hedwig {
   class ChannelConnectException : public ChannelException {};
   class CannotCreateSocketException : public ChannelConnectException {};
   class ChannelSetupException : public ChannelConnectException {};
+  class ChannelNotConnectedException : public ChannelConnectException {};
 
   class ChannelDiedException : public ChannelException {};
 
@@ -40,66 +52,105 @@ namespace Hedwig {
   class ChannelReadException : public ChannelException {};
   class ChannelThreadException : public ChannelException {};
 
+  class DuplexChannel;
+  typedef boost::shared_ptr<DuplexChannel> DuplexChannelPtr;
 
   class ChannelHandler {
   public:
-    virtual void messageReceived(DuplexChannel* channel, const PubSubResponse& m) = 0;
-    virtual void channelConnected(DuplexChannel* channel) = 0;
+    virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) = 0;
+    virtual void channelConnected(const DuplexChannelPtr& channel) = 0;
 
-    virtual void channelDisconnected(DuplexChannel* channel, const std::exception& e) = 0;
-    virtual void exceptionOccurred(DuplexChannel* channel, const std::exception& e) = 0;
+    virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) = 0;
+    virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) = 0;
 
     virtual ~ChannelHandler() {}
   };
-  typedef std::tr1::shared_ptr<ChannelHandler> ChannelHandlerPtr;
 
-  class WriteThread;
-  class ReadThread;
+  typedef boost::shared_ptr<ChannelHandler> ChannelHandlerPtr;
 
-  class DuplexChannel {
-  public:
-    DuplexChannel(const HostAddress& addr, const Configuration& cfg, const ChannelHandlerPtr& handler);
 
+  class DuplexChannel : public boost::enable_shared_from_this<DuplexChannel> {
+  public:
+    DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr, 
+		  const Configuration& cfg, const ChannelHandlerPtr& handler);
+    static void connectCallbackHandler(DuplexChannelPtr channel, 
+				       const boost::system::error_code& error);
     void connect();
 
-    void writeRequest(const PubSubRequest& m, const OperationCallbackPtr& callback);
+    static void writeCallbackHandler(DuplexChannelPtr channel, OperationCallbackPtr callback, 
+				     const boost::system::error_code& error, 
+				     std::size_t bytes_transferred);
+    void writeRequest(const PubSubRequestPtr& m, const OperationCallbackPtr& callback);
     
     const HostAddress& getHostAddress() const;
 
     void storeTransaction(const PubSubDataPtr& data);
     PubSubDataPtr retrieveTransaction(long txnid);
     void failAllTransactions();
+
+    static void sizeReadCallbackHandler(DuplexChannelPtr channel, 
+					const boost::system::error_code& error, 
+					std::size_t bytes_transferred);
+    static void messageReadCallbackHandler(DuplexChannelPtr channel, std::size_t messagesize, 
+					   const boost::system::error_code& error, 
+					   std::size_t bytes_transferred);
+    static void readSize(DuplexChannelPtr channel);
+
+    void startReceiving();
+    bool isReceiving();
+    void stopReceiving();
     
+    void startSending();
+
+    void channelDisconnected(const std::exception& e);
     virtual void kill();
 
-    ~DuplexChannel();
+    virtual ~DuplexChannel();
   private:
+    enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
+
+    void setState(State s);
+
+    EventDispatcher& dispatcher;
+
     HostAddress address;
     ChannelHandlerPtr handler;
-    int socketfd;
-    WriteThread *writer;
-    ReadThread *reader;
+
+    boost::asio::ip::tcp::socket socket;
+    boost::asio::streambuf in_buf;
+    std::istream instream;
     
-    enum State { UNINITIALISED, CONNECTED, DEAD };
+    // only exists because protobufs can't play nice with streams (if there's more than message len in it, it tries to read all)
+    char* copy_buf;
+    std::size_t copy_buf_length;
+
+    boost::asio::streambuf out_buf;
+    
+    typedef std::pair<PubSubRequestPtr, OperationCallbackPtr> WriteRequest;
+    boost::mutex write_lock;
+    std::deque<WriteRequest> write_queue;
+
     State state;
+    boost::shared_mutex state_lock;
+
+    bool receiving;
+    boost::mutex receiving_lock;
     
+    bool sending;
+    boost::mutex sending_lock;
+
     typedef std::tr1::unordered_map<long, PubSubDataPtr> TransactionMap;
+
     TransactionMap txnid2data;
-    Mutex txnid2data_lock;
-    Mutex destruction_lock;
+    boost::mutex txnid2data_lock;
+    boost::shared_mutex destruction_lock;
   };
   
-  typedef std::tr1::shared_ptr<DuplexChannel> DuplexChannelPtr;
-};
 
-namespace std 
-{
-  namespace tr1 
-  {
-  // defined in util.cpp
-  template <> struct hash<Hedwig::DuplexChannelPtr> : public unary_function<Hedwig::DuplexChannelPtr, size_t> {
-    size_t operator()(const Hedwig::DuplexChannelPtr& channel) const;
+  struct DuplexChannelPtrHash : public std::unary_function<DuplexChannelPtr, size_t> {
+    size_t operator()(const Hedwig::DuplexChannelPtr& channel) const {
+      return reinterpret_cast<size_t>(channel.get());
+    }
   };
-  }
 };
 #endif

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/client.cpp Mon Oct 11 19:00:42 2010
@@ -20,15 +20,23 @@
 #include <memory>
 
 #include "clientimpl.h"
+#include <log4cpp/Category.hh>
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
 
 using namespace Hedwig;
 
-const std::string DEFAULT_SERVER = "localhost:4080";
-const std::string& Configuration::getDefaultServer() const {
-  return DEFAULT_SERVER;
-}
+const std::string Configuration::DEFAULT_SERVER = "hedwig.cpp.default_server";
+const std::string Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.message_consume_retry_wait_time";
+const std::string Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.subscriber_consume_retry_wait_time";
+const std::string Configuration::MAX_MESSAGE_QUEUE_SIZE = "hedwig.cpp.max_msgqueue_size";
+const std::string Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = "hedwig.cpp.reconnect_subscribe_retry_wait_time";
+const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
 
 Client::Client(const Configuration& conf) {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Client::Client (" << this << ")";
+  }
   clientimpl = ClientImpl::Create( conf );
 }
 
@@ -41,6 +49,10 @@ Publisher& Client::getPublisher() {
 }
 
 Client::~Client() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Client::~Client (" << this << ")";
+  }
+
   clientimpl->Destroy();
 }
 

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.cpp Mon Oct 11 19:00:42 2010
@@ -25,31 +25,54 @@ static log4cpp::Category &LOG = log4cpp:
 
 using namespace Hedwig;
 
+const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
+
+void SyncOperationCallback::wait() {
+  boost::unique_lock<boost::mutex> lock(mut);
+  while(response==PENDING) {
+    if (cond.timed_wait(lock, boost::posix_time::milliseconds(timeout)) == false) {
+      LOG.errorStream() << "Timeout waiting for operation to complete " << this;
+
+      response = TIMEOUT;
+    }
+  }
+}
 
 void SyncOperationCallback::operationComplete() {
-  lock();
-  response = SUCCESS;
-  signalAndUnlock();
+  if (response == TIMEOUT) {
+    LOG.errorStream() << "operationCompleted successfully after timeout " << this;
+    return;
+  }
+
+  {
+    boost::lock_guard<boost::mutex> lock(mut);
+    response = SUCCESS;
+  }
+  cond.notify_all();
 }
 
 void SyncOperationCallback::operationFailed(const std::exception& exception) {
-  lock();
-  if (typeid(exception) == typeid(ChannelConnectException)) {
-    response = NOCONNECT;
-  } else if (typeid(exception) == typeid(ServiceDownException)) {
-    response = SERVICEDOWN;
-  } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
-    response = ALREADY_SUBSCRIBED;
-  } else if (typeid(exception) == typeid(NotSubscribedException)) {
-    response = NOT_SUBSCRIBED;
-  } else {
-    response = UNKNOWN;
+  if (response == TIMEOUT) {
+    LOG.errorStream() << "operationCompleted unsuccessfully after timeout " << this;
+    return;
   }
-  signalAndUnlock();
-}
+
+  {
+    boost::lock_guard<boost::mutex> lock(mut);
     
-bool SyncOperationCallback::isTrue() {
-  return response != PENDING;
+    if (typeid(exception) == typeid(ChannelConnectException)) {
+      response = NOCONNECT;
+    } else if (typeid(exception) == typeid(ServiceDownException)) {
+      response = SERVICEDOWN;
+    } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
+      response = ALREADY_SUBSCRIBED;
+    } else if (typeid(exception) == typeid(NotSubscribedException)) {
+      response = NOT_SUBSCRIBED;
+    } else {
+      response = UNKNOWN;
+    }
+  }
+  cond.notify_all();
 }
 
 void SyncOperationCallback::throwExceptionIfNeeded() {
@@ -68,34 +91,36 @@ void SyncOperationCallback::throwExcepti
   case NOT_SUBSCRIBED:
     throw NotSubscribedException();
     break;
+  case TIMEOUT:
+    throw ClientTimeoutException();
+    break;
   default:
     throw ClientException();
     break;
   }
 }
 
-HedwigClientChannelHandler::HedwigClientChannelHandler(ClientImplPtr& client) 
+HedwigClientChannelHandler::HedwigClientChannelHandler(const ClientImplPtr& client) 
   : client(client){
 }
 
-void HedwigClientChannelHandler::messageReceived(DuplexChannel* channel, const PubSubResponse& m) {
-  LOG.debugStream() << "Message received";
-  if (m.has_message()) {
+void HedwigClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
+  LOG.debugStream() << "Message received txnid(" << m->txnid() << ") status(" 
+		    << m->statuscode() << ")";
+  if (m->has_message()) {
     LOG.errorStream() << "Subscription response, ignore for now";
     return;
   }
   
-  long txnid = m.txnid();
-  PubSubDataPtr data = channel->retrieveTransaction(m.txnid()); 
+  PubSubDataPtr data = channel->retrieveTransaction(m->txnid()); 
   /* you now have ownership of data, don't leave this funciton without deleting it or 
      palming it off to someone else */
 
   if (data == NULL) {
-    LOG.errorStream() << "Transaction " << m.txnid() << " doesn't exist in channel " << channel;
     return;
   }
 
-  if (m.statuscode() == NOT_RESPONSIBLE_FOR_TOPIC) {
+  if (m->statuscode() == NOT_RESPONSIBLE_FOR_TOPIC) {
     client->redirectRequest(channel, data, m);
     return;
   }
@@ -115,17 +140,17 @@ void HedwigClientChannelHandler::message
 }
 
 
-void HedwigClientChannelHandler::channelConnected(DuplexChannel* channel) {
+void HedwigClientChannelHandler::channelConnected(const DuplexChannelPtr& channel) {
   // do nothing 
 }
 
-void HedwigClientChannelHandler::channelDisconnected(DuplexChannel* channel, const std::exception& e) {
+void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
   LOG.errorStream() << "Channel disconnected";
 
   client->channelDied(channel);
 }
 
-void HedwigClientChannelHandler::exceptionOccurred(DuplexChannel* channel, const std::exception& e) {
+void HedwigClientChannelHandler::exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) {
   LOG.errorStream() << "Exception occurred" << e.what();
 }
 
@@ -142,185 +167,39 @@ Increment the transaction counter and re
 @returns the next transaction id
 */
 long ClientTxnCounter::next() {  // would be nice to remove lock from here, look more into it
-  mutex.lock();
-  long next= ++counter; 
-  mutex.unlock();
-  return next;
-}
-
-
-
-PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const std::string& body, const OperationCallbackPtr& callback) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = PUBLISH;
-  ptr->txnid = txnid;
-  ptr->topic = topic;
-  ptr->body = body;
-  ptr->callback = callback;
-  return ptr;
-}
-
-PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = SUBSCRIBE;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->callback = callback;
-  ptr->mode = mode;
-  return ptr;  
-}
-
-PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = UNSUBSCRIBE;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->callback = callback;
-  return ptr;  
-}
-
-PubSubDataPtr PubSubData::forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = CONSUME;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->msgid = msgid;
-  return ptr;  
-}
-
-PubSubData::PubSubData() : request(NULL) {  
-}
-
-PubSubData::~PubSubData() {
-  if (request != NULL) {
-    delete request;
-  }
-}
-
-OperationType PubSubData::getType() const {
-  return type;
-}
+  boost::lock_guard<boost::mutex> lock(mutex);
 
-long PubSubData::getTxnId() const {
-  return txnid;
-}
-
-const std::string& PubSubData::getTopic() const {
-  return topic;
-}
-
-const std::string& PubSubData::getBody() const {
-  return body;
-}
-
-const PubSubRequest& PubSubData::getRequest() {
-  if (request != NULL) {
-    delete request;
-    request = NULL;
-  }
-  request = new Hedwig::PubSubRequest();
-  request->set_protocolversion(Hedwig::VERSION_ONE);
-  request->set_type(type);
-  request->set_txnid(txnid);
-  request->set_shouldclaim(shouldClaim);
-  request->set_topic(topic);
-    
-  if (type == PUBLISH) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "Creating publish request";
-    }
-    Hedwig::PublishRequest* pubreq = request->mutable_publishrequest();
-    Hedwig::Message* msg = pubreq->mutable_msg();
-    msg->set_body(body);
-  } else if (type == SUBSCRIBE) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "Creating subscribe request";
-    }
-
-    Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
-    subreq->set_subscriberid(subscriberid);
-    subreq->set_createorattach(mode);
-  } else if (type == CONSUME) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "Creating consume request";
-    }
-
-    Hedwig::ConsumeRequest* conreq = request->mutable_consumerequest();
-    conreq->set_subscriberid(subscriberid);
-    conreq->mutable_msgid()->CopyFrom(msgid);
-  } else if (type == UNSUBSCRIBE) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debugStream() << "Creating unsubscribe request";
-    }
-    
-    Hedwig::UnsubscribeRequest* unsubreq = request->mutable_unsubscriberequest();
-    unsubreq->set_subscriberid(subscriberid);    
-  } else {
-    LOG.errorStream() << "Tried to create a request message for the wrong type [" << type << "]";
-    throw UnknownRequestException();
-  }
-
-
-
-  return *request;
-}
-
-void PubSubData::setShouldClaim(bool shouldClaim) {
-  shouldClaim = shouldClaim;
-}
-
-void PubSubData::addTriedServer(HostAddress& h) {
-  triedservers.insert(h);
-}
-
-bool PubSubData::hasTriedServer(HostAddress& h) {
-  return triedservers.count(h) > 0;
-}
-
-void PubSubData::clearTriedServers() {
-  triedservers.clear();
-}
-
-OperationCallbackPtr& PubSubData::getCallback() {
-  return callback;
-}
-
-void PubSubData::setCallback(const OperationCallbackPtr& callback) {
-  this->callback = callback;
-}
-
-const std::string& PubSubData::getSubscriberId() const {
-  return subscriberid;
-}
+  long next= ++counter; 
 
-SubscribeRequest::CreateOrAttach PubSubData::getMode() const {
-  return mode;
+  return next;
 }
 
-ClientImplPtr& ClientImpl::Create(const Configuration& conf) {
-  ClientImpl* impl = new ClientImpl(conf);
-    if (LOG.isDebugEnabled()) {
+ClientImplPtr ClientImpl::Create(const Configuration& conf) {
+  ClientImplPtr impl(new ClientImpl(conf));
+  if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "Creating Clientimpl " << impl;
   }
+  impl->dispatcher.start();
 
-  return impl->selfptr;
+  return impl;
 }
 
 void ClientImpl::Destroy() {
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "destroying Clientimpl " << this;
   }
-  allchannels_lock.lock();
 
-  shuttingDownFlag = true;
-  for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
-    (*iter).second->kill();
-  }  
-  allchannels.clear();
-  allchannels_lock.unlock();
+  dispatcher.stop();
+  {
+    boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
+    
+    shuttingDownFlag = true;
+    for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
+      (*iter)->kill();
+    }  
+    allchannels.clear();
+  }
+
   /* destruction of the maps will clean up any items they hold */
   
   if (subscriber != NULL) {
@@ -331,12 +210,10 @@ void ClientImpl::Destroy() {
     delete publisher;
     publisher = NULL;
   }
-
-  selfptr = ClientImplPtr(); // clear the self pointer
 }
 
 ClientImpl::ClientImpl(const Configuration& conf) 
-  : selfptr(this), conf(conf), subscriber(NULL), publisher(NULL), counterobj(), shuttingDownFlag(false)
+  : conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
 {
 }
 
@@ -350,22 +227,20 @@ Publisher& ClientImpl::getPublisher() {
     
 SubscriberImpl& ClientImpl::getSubscriberImpl() {
   if (subscriber == NULL) {
-    subscribercreate_lock.lock();
+    boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
     if (subscriber == NULL) {
-      subscriber = new SubscriberImpl(selfptr);
+      subscriber = new SubscriberImpl(shared_from_this());
     }
-    subscribercreate_lock.unlock();
   }
   return *subscriber;
 }
 
 PublisherImpl& ClientImpl::getPublisherImpl() {
   if (publisher == NULL) { 
-    publishercreate_lock.lock();
+    boost::lock_guard<boost::mutex> lock(publishercreate_lock);
     if (publisher == NULL) {
-      publisher = new PublisherImpl(selfptr);
+      publisher = new PublisherImpl(shared_from_this());
     }
-    publishercreate_lock.unlock();
   }
   return *publisher;
 }
@@ -374,18 +249,21 @@ ClientTxnCounter& ClientImpl::counter() 
   return counterobj;
 }
 
-void ClientImpl::redirectRequest(DuplexChannel* channel, PubSubDataPtr& data, const PubSubResponse& response) {
+void ClientImpl::redirectRequest(const DuplexChannelPtr& channel, PubSubDataPtr& data, const PubSubResponsePtr& response) {
   HostAddress oldhost = channel->getHostAddress();
   data->addTriedServer(oldhost);
   
-  HostAddress h = HostAddress::fromString(response.statusmsg());
+  HostAddress h = HostAddress::fromString(response->statusmsg());
   if (data->hasTriedServer(h)) {
-    LOG.errorStream() << "We've been told to try request [" << data->getTxnId() << "] with [" << h.getAddressString()<< "] by " << channel->getHostAddress().getAddressString() << " but we've already tried that. Failing operation";
+    LOG.errorStream() << "We've been told to try request [" << data->getTxnId() << "] with [" 
+		      << h.getAddressString()<< "] by " << oldhost.getAddressString() 
+		      << " but we've already tried that. Failing operation";
     data->getCallback()->operationFailed(InvalidRedirectException());
     return;
   }
   if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "We've been told  [" << data->getTopic() << "] is on [" << h.getAddressString() << "] by [" << oldhost.getAddressString() << "]. Redirecting request " << data->getTxnId();
+    LOG.debugStream() << "We've been told  [" << data->getTopic() << "] is on [" << h.getAddressString() 
+		      << "] by [" << oldhost.getAddressString() << "]. Redirecting request " << data->getTxnId();
   }
   data->setShouldClaim(true);
 
@@ -393,21 +271,17 @@ void ClientImpl::redirectRequest(DuplexC
   DuplexChannelPtr newchannel;
   try {
     if (data->getType() == SUBSCRIBE) {
-      SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(selfptr, this->getSubscriberImpl(), data));
-      ChannelHandlerPtr basehandler = handler;
-      
-      newchannel = createChannelForTopic(data->getTopic(), basehandler);
+      SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(shared_from_this(), 
+										   this->getSubscriberImpl(), data));
+      newchannel = createChannel(data->getTopic(), handler);
       handler->setChannel(newchannel);
-      
       getSubscriberImpl().doSubscribe(newchannel, data, handler);
+    } else if (data->getType() == PUBLISH) {
+      newchannel = getChannel(data->getTopic());
+      getPublisherImpl().doPublish(newchannel, data);
     } else {
-      newchannel = getChannelForTopic(data->getTopic());
-      
-      if (data->getType() == PUBLISH) {
-	getPublisherImpl().doPublish(newchannel, data);
-      } else {
-	getSubscriberImpl().doUnsubscribe(newchannel, data);
-      }
+      newchannel = getChannel(data->getTopic());
+      getSubscriberImpl().doUnsubscribe(newchannel, data);
     }
   } catch (ShuttingDownException& e) {
     return; // no point in redirecting if we're shutting down
@@ -420,52 +294,57 @@ ClientImpl::~ClientImpl() {
   }
 }
 
-DuplexChannelPtr ClientImpl::createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler) {
+DuplexChannelPtr ClientImpl::createChannel(const std::string& topic, const ChannelHandlerPtr& handler) {
   // get the host address
   // create a channel to the host
   HostAddress addr = topic2host[topic];
   if (addr.isNullHost()) {
-    addr = HostAddress::fromString(conf.getDefaultServer());
+    addr = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER, DEFAULT_SERVER_DEFAULT_VAL));
+    setHostForTopic(topic, addr);
   }
 
-  DuplexChannelPtr channel(new DuplexChannel(addr, conf, handler));
-  channel->connect();
+  DuplexChannelPtr channel(new DuplexChannel(dispatcher, addr, conf, handler));
 
-  allchannels_lock.lock();
+  boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
   if (shuttingDownFlag) {
     channel->kill();
-    allchannels_lock.unlock();
     throw ShuttingDownException();
   }
-  allchannels[channel.get()] = channel;
+  channel->connect();
+
+  allchannels.insert(channel);
   if (LOG.isDebugEnabled()) {
     LOG.debugStream() << "(create) All channels size: " << allchannels.size();
   }
-  allchannels_lock.unlock();
 
   return channel;
 }
 
-DuplexChannelPtr ClientImpl::getChannelForTopic(const std::string& topic) {
+DuplexChannelPtr ClientImpl::getChannel(const std::string& topic) {
   HostAddress addr = topic2host[topic];
+  if (addr.isNullHost()) {
+    addr = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER, DEFAULT_SERVER_DEFAULT_VAL));
+    setHostForTopic(topic, addr);
+  }  
   DuplexChannelPtr channel = host2channel[addr];
 
-  if (channel.get() == 0 || addr.isNullHost()) {
-    ChannelHandlerPtr handler(new HedwigClientChannelHandler(selfptr));
-    channel = createChannelForTopic(topic, handler);
-    host2channel_lock.lock();
+  if (channel.get() == 0) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << " No channel for topic, creating new channel.get() " << channel.get() << " addr " << addr.getAddressString();
+    }
+    ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this()));
+    channel = createChannel(topic, handler);
+
+    boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
     host2channel[addr] = channel;
-    host2channel_lock.unlock();
-    return channel;
-  }
+  } 
 
   return channel;
 }
 
 void ClientImpl::setHostForTopic(const std::string& topic, const HostAddress& host) {
-  topic2host_lock.lock();
+  boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
   topic2host[topic] = host;
-  topic2host_lock.unlock();
 }
 
 bool ClientImpl::shuttingDown() const {
@@ -478,15 +357,15 @@ bool ClientImpl::shuttingDown() const {
    This does not delete the channel. Some publishers or subscribers will still hold it and will be errored
    when they try to do anything with it. 
 */
-void ClientImpl::channelDied(DuplexChannel* channel) {
+void ClientImpl::channelDied(const DuplexChannelPtr& channel) {
   if (shuttingDownFlag) {
     return;
   }
 
-  host2topics_lock.lock();
-  host2channel_lock.lock();
-  topic2host_lock.lock();
-  allchannels_lock.lock();
+  boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
+  boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
+  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+  boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
   // get host
   HostAddress addr = channel->getHostAddress();
   
@@ -497,9 +376,12 @@ void ClientImpl::channelDied(DuplexChann
   host2channel.erase(addr);
 
   allchannels.erase(channel); // channel should be deleted here
+}
+
+const Configuration& ClientImpl::getConfiguration() {
+  return conf;
+}
 
-  allchannels_lock.unlock();
-  host2topics_lock.unlock();
-  host2channel_lock.unlock();
-  topic2host_lock.unlock();
+boost::asio::io_service& ClientImpl::getService() {
+  return dispatcher.getService();
 }

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/clientimpl.h Mon Oct 11 19:00:42 2010
@@ -22,22 +22,30 @@
 #include <hedwig/client.h>
 #include <hedwig/protocol.h>
 
+#include <boost/asio.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+
 #include <tr1/unordered_map>
 #include <list>
+
 #include "util.h"
-#include <pthread.h>
 #include "channel.h"
 #include "data.h"
+#include "eventdispatcher.h"
 
 namespace Hedwig {
-  class SyncOperationCallback : public OperationCallback, public WaitConditionBase {
+  const int DEFAULT_SYNC_REQUEST_TIMEOUT = 5000;
+
+  class SyncOperationCallback : public OperationCallback {
   public:
-    SyncOperationCallback() : response(PENDING) {}
+  SyncOperationCallback(int timeout) : response(PENDING), timeout(timeout) {}
     virtual void operationComplete();
     virtual void operationFailed(const std::exception& exception);
     
-    virtual bool isTrue();
-
+    void wait();
     void throwExceptionIfNeeded();
     
   private:
@@ -48,21 +56,26 @@ namespace Hedwig {
       SERVICEDOWN,
       NOT_SUBSCRIBED,
       ALREADY_SUBSCRIBED,
+      TIMEOUT,
       UNKNOWN
     } response;
+    
+    boost::condition_variable cond;
+    boost::mutex mut;
+    int timeout;
   };
 
   class HedwigClientChannelHandler : public ChannelHandler {
   public:
-    HedwigClientChannelHandler(ClientImplPtr& client);
+    HedwigClientChannelHandler(const ClientImplPtr& client);
     
-    virtual void messageReceived(DuplexChannel* channel, const PubSubResponse& m);
-    virtual void channelConnected(DuplexChannel* channel);
-    virtual void channelDisconnected(DuplexChannel* channel, const std::exception& e);
-    virtual void exceptionOccurred(DuplexChannel* channel, const std::exception& e);
+    virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
+    virtual void channelConnected(const DuplexChannelPtr& channel);
+    virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e);
+    virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e);
     
   protected:
-    ClientImplPtr client;
+    const ClientImplPtr client;
   };
   
   class PublisherImpl;
@@ -71,9 +84,9 @@ namespace Hedwig {
   /**
      Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter.
   */
-  class ClientImpl {
+  class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
   public:
-    static ClientImplPtr& Create(const Configuration& conf);
+    static ClientImplPtr Create(const Configuration& conf);
     void Destroy();
 
     Subscriber& getSubscriber();
@@ -81,49 +94,55 @@ namespace Hedwig {
 
     ClientTxnCounter& counter();
 
-    void redirectRequest(DuplexChannel* channel, PubSubDataPtr& data, const PubSubResponse& response);
+    void redirectRequest(const DuplexChannelPtr& channel, PubSubDataPtr& data, const PubSubResponsePtr& response);
 
     const HostAddress& getHostForTopic(const std::string& topic);
 
-    DuplexChannelPtr createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler);
-    DuplexChannelPtr getChannelForTopic(const std::string& topic);
-    
+    //DuplexChannelPtr getChannelForTopic(const std::string& topic, OperationCallback& callback);
+    //DuplexChannelPtr createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler, OperationCallback& callback);
+    DuplexChannelPtr createChannel(const std::string& topic, const ChannelHandlerPtr& handler);    
+    DuplexChannelPtr getChannel(const std::string& topic);
+
     void setHostForTopic(const std::string& topic, const HostAddress& host);
 
-    void setChannelForHost(const HostAddress& address, DuplexChannel* channel);
-    void channelDied(DuplexChannel* channel);
+    void setChannelForHost(const HostAddress& address, const DuplexChannelPtr& channel);
+    void channelDied(const DuplexChannelPtr& channel);
     bool shuttingDown() const;
     
     SubscriberImpl& getSubscriberImpl();
     PublisherImpl& getPublisherImpl();
 
+    const Configuration& getConfiguration();
+    boost::asio::io_service& getService();
+
     ~ClientImpl();
   private:
     ClientImpl(const Configuration& conf);
 
-    ClientImplPtr selfptr;
-
     const Configuration& conf;
+
+    boost::mutex publishercreate_lock;
     PublisherImpl* publisher;
+
+    boost::mutex subscribercreate_lock;
     SubscriberImpl* subscriber;
-    ClientTxnCounter counterobj;
 
+    ClientTxnCounter counterobj;
 
-    typedef std::tr1::unordered_multimap<HostAddress, std::string> Host2TopicsMap;
+    EventDispatcher dispatcher;
+    
+    typedef std::tr1::unordered_multimap<HostAddress, std::string, HostAddressHash > Host2TopicsMap;
     Host2TopicsMap host2topics;
-    Mutex host2topics_lock;
+    boost::shared_mutex host2topics_lock;
 
-    std::tr1::unordered_map<HostAddress, DuplexChannelPtr> host2channel;
-    Mutex host2channel_lock;
+    std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
+    boost::shared_mutex host2channel_lock;
     std::tr1::unordered_map<std::string, HostAddress> topic2host;
-    Mutex topic2host_lock;
-
-    Mutex publishercreate_lock;
-    Mutex subscribercreate_lock;
+    boost::shared_mutex topic2host_lock;
 
-    typedef std::tr1::unordered_map<DuplexChannel*, DuplexChannelPtr> ChannelMap;
+    typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
     ChannelMap allchannels;
-    Mutex allchannels_lock;
+    boost::shared_mutex allchannels_lock;
 
     bool shuttingDownFlag;
   };

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.cpp?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.cpp Mon Oct 11 19:00:42 2010
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hedwig/protocol.h>
+#include "data.h"
+
+#include <log4cpp/Category.hh>
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig." __FILE__);
+
+using namespace Hedwig;
+
+/**
+ * Builds the bookkeeping record for a publish of `body` to `topic`.
+ * `callback` is stored on the record and returned by getCallback().
+ */
+PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const std::string& body, const OperationCallbackPtr& callback) {
+  PubSubDataPtr ptr(new PubSubData());
+  ptr->type = PUBLISH;
+  ptr->txnid = txnid;
+  ptr->topic = topic;
+  ptr->body = body;
+  ptr->callback = callback;
+  return ptr;
+}
+
+/**
+ * Builds the bookkeeping record for a subscribe request; `mode` is the
+ * create-or-attach value later written into the request by getRequest().
+ */
+PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback, SubscribeRequest::CreateOrAttach mode) {
+  PubSubDataPtr ptr(new PubSubData());
+  ptr->type = SUBSCRIBE;
+  ptr->txnid = txnid;
+  ptr->subscriberid = subscriberid;
+  ptr->topic = topic;
+  ptr->callback = callback;
+  ptr->mode = mode;
+  return ptr;
+}
+
+/** Builds the bookkeeping record for an unsubscribe request. */
+PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const OperationCallbackPtr& callback) {
+  PubSubDataPtr ptr(new PubSubData());
+  ptr->type = UNSUBSCRIBE;
+  ptr->txnid = txnid;
+  ptr->subscriberid = subscriberid;
+  ptr->topic = topic;
+  ptr->callback = callback;
+  return ptr;
+}
+
+/**
+ * Builds the bookkeeping record for a consume request carrying `msgid`.
+ * This factory does not assign a callback to the record.
+ */
+PubSubDataPtr PubSubData::forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid) {
+  PubSubDataPtr ptr(new PubSubData());
+  ptr->type = CONSUME;
+  ptr->txnid = txnid;
+  ptr->subscriberid = subscriberid;
+  ptr->topic = topic;
+  ptr->msgid = msgid;
+  return ptr;
+}
+
+/**
+ * Private constructor; records are created by the for*Request() factories.
+ * type, txnid and mode are value-initialized so that a getter never reads
+ * an indeterminate value for a field the chosen factory leaves unassigned
+ * (for example getMode() on a publish record).
+ */
+PubSubData::PubSubData() : type(), txnid(), shouldClaim(false), mode() {
+}
+
+PubSubData::~PubSubData() {
+}
+
+/** Returns the operation type assigned by the factory that built this record. */
+OperationType PubSubData::getType() const {
+  return type;
+}
+
+/** Returns the transaction id passed to the factory. */
+long PubSubData::getTxnId() const {
+  return txnid;
+}
+
+/** Returns the topic this request refers to. */
+const std::string& PubSubData::getTopic() const {
+  return topic;
+}
+
+/** Returns the message body; only forPublishRequest() assigns it. */
+const std::string& PubSubData::getBody() const {
+  return body;
+}
+
+/** Returns a copy of the message sequence id; only forConsumeRequest() assigns it. */
+const MessageSeqId PubSubData::getMessageSeqId() const {
+  return msgid;
+}
+
+/**
+ * Builds a new protocol request message from this record.
+ *
+ * The protocol version, type, transaction id and topic are always set; the
+ * should-claim field is only written when the flag is true.  The
+ * type-specific sub-message is then filled in for publish, subscribe,
+ * consume and unsubscribe records.
+ *
+ * @return a freshly allocated request; a new message is built on every call.
+ * @throws UnknownRequestException if the record's type is none of the above.
+ */
+const PubSubRequestPtr PubSubData::getRequest() {
+  PubSubRequestPtr request(new Hedwig::PubSubRequest());
+  request->set_protocolversion(Hedwig::VERSION_ONE);
+  request->set_type(type);
+  request->set_txnid(txnid);
+  if (shouldClaim) {
+    request->set_shouldclaim(shouldClaim);
+  }
+  request->set_topic(topic);
+
+  if (type == PUBLISH) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating publish request";
+    }
+    Hedwig::PublishRequest* pubreq = request->mutable_publishrequest();
+    Hedwig::Message* msg = pubreq->mutable_msg();
+    msg->set_body(body);
+  } else if (type == SUBSCRIBE) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating subscribe request";
+    }
+
+    Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
+    subreq->set_subscriberid(subscriberid);
+    subreq->set_createorattach(mode);
+  } else if (type == CONSUME) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating consume request";
+    }
+
+    Hedwig::ConsumeRequest* conreq = request->mutable_consumerequest();
+    conreq->set_subscriberid(subscriberid);
+    conreq->mutable_msgid()->CopyFrom(msgid);
+  } else if (type == UNSUBSCRIBE) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debugStream() << "Creating unsubscribe request";
+    }
+
+    Hedwig::UnsubscribeRequest* unsubreq = request->mutable_unsubscriberequest();
+    unsubreq->set_subscriberid(subscriberid);
+  } else {
+    LOG.errorStream() << "Tried to create a request message for the wrong type [" << type << "]";
+    throw UnknownRequestException();
+  }
+
+  return request;
+}
+
+/** Sets the flag that makes getRequest() write the should-claim field. */
+void PubSubData::setShouldClaim(bool shouldClaim) {
+  this->shouldClaim = shouldClaim;
+}
+
+/** Records `h` in the set of servers already tried for this request. */
+void PubSubData::addTriedServer(HostAddress& h) {
+  triedservers.insert(h);
+}
+
+/** Returns true if `h` was previously recorded with addTriedServer(). */
+bool PubSubData::hasTriedServer(HostAddress& h) {
+  return triedservers.count(h) > 0;
+}
+
+/** Empties the set of tried servers. */
+void PubSubData::clearTriedServers() {
+  triedservers.clear();
+}
+
+/** Returns a mutable reference to the stored completion callback. */
+OperationCallbackPtr& PubSubData::getCallback() {
+  return callback;
+}
+
+/** Replaces the stored completion callback. */
+void PubSubData::setCallback(const OperationCallbackPtr& callback) {
+  this->callback = callback;
+}
+
+/** Returns the subscriber id; the publish factory does not assign one. */
+const std::string& PubSubData::getSubscriberId() const {
+  return subscriberid;
+}
+
+/** Returns the create-or-attach mode; only forSubscribeRequest() assigns it. */
+SubscribeRequest::CreateOrAttach PubSubData::getMode() const {
+  return mode;
+}

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/data.h Mon Oct 11 19:00:42 2010
@@ -25,6 +25,8 @@
 #include <pthread.h>
 #include <tr1/unordered_set>
 #include "util.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
 
 namespace Hedwig {
   /**
@@ -38,11 +40,13 @@ namespace Hedwig {
     
   private:
     long counter;
-    Mutex mutex;
+    boost::mutex mutex;
   };
 
   class PubSubData;
-  typedef std::tr1::shared_ptr<PubSubData> PubSubDataPtr;
+  typedef boost::shared_ptr<PubSubData> PubSubDataPtr;
+  typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr;
+  typedef boost::shared_ptr<PubSubResponse> PubSubResponsePtr;
 
   /**
      Data structure to hold information about requests and build request messages.
@@ -63,10 +67,11 @@ namespace Hedwig {
     const std::string& getSubscriberId() const;
     const std::string& getTopic() const;
     const std::string& getBody() const;
+    const MessageSeqId getMessageSeqId() const;
 
     void setShouldClaim(bool shouldClaim);
 
-    const PubSubRequest& getRequest();
+    const PubSubRequestPtr getRequest();
     void setCallback(const OperationCallbackPtr& callback);
     OperationCallbackPtr& getCallback();
     SubscribeRequest::CreateOrAttach getMode() const;
@@ -75,8 +80,8 @@ namespace Hedwig {
     bool hasTriedServer(HostAddress& h);
     void clearTriedServers();
   private:
+
     PubSubData();
-    PubSubRequest* request;
     
     OperationType type;
     long txnid;
@@ -87,9 +92,8 @@ namespace Hedwig {
     OperationCallbackPtr callback;
     SubscribeRequest::CreateOrAttach mode;
     MessageSeqId msgid;
-    std::tr1::unordered_set<HostAddress> triedservers;
+    std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
   };
   
-
 };
 #endif

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.cpp?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.cpp (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.cpp Mon Oct 11 19:00:42 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "eventdispatcher.h"
+
+#include <boost/bind.hpp>
+#include <log4cpp/Category.hh>
+
+static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig." __FILE__);
+
+using namespace Hedwig;
+
+/** Creates an idle dispatcher; no thread exists until start() is called. */
+EventDispatcher::EventDispatcher() : service(), dummy_work(NULL), t(NULL) {
+}
+
+/**
+ * Thread body: runs the io_service until run() returns normally.  A
+ * std::exception escaping a handler is logged and run() is re-entered so
+ * that one failing handler does not end dispatching.
+ */
+void EventDispatcher::run_forever() {
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Starting event dispatcher";
+  }
+
+  while (true) {
+    try {
+      service.run();
+      break;
+    } catch (const std::exception& e) {
+      LOG.errorStream() << "Exception in dispatch handler. " << e.what();
+    }
+  }
+  if (LOG.isDebugEnabled()) {
+    LOG.debugStream() << "Event dispatcher done";
+  }
+}
+
+/**
+ * Starts the dispatcher thread; does nothing if it is already running.
+ * Not synchronized: concurrent start()/stop() calls must be serialized by
+ * the caller.
+ */
+void EventDispatcher::start() {
+  if (t) {
+    return;
+  }
+  // A stopped io_service makes run() return immediately until reset() is
+  // called, so reset here to allow start() after an earlier stop().
+  service.reset();
+  dummy_work = new boost::asio::io_service::work(service);
+  t = new boost::thread(boost::bind(&EventDispatcher::run_forever, this));
+}
+
+/**
+ * Stops the io_service and joins the dispatcher thread; does nothing if the
+ * dispatcher is not running.  Must not be called from the dispatcher thread
+ * itself, because it joins that thread.
+ */
+void EventDispatcher::stop() {
+  if (!t) {
+    return;
+  }
+  delete dummy_work;
+  dummy_work = NULL;
+
+  service.stop();
+
+  t->join();
+  delete t;
+  t = NULL;
+}
+
+/**
+ * Shuts the dispatcher down before the members are destroyed.  Without the
+ * stop() call, destroying a running dispatcher would leak the thread object
+ * and leave run_forever() executing against a destroyed io_service.
+ */
+EventDispatcher::~EventDispatcher() {
+  stop();
+  // Only non-NULL here if start() failed after allocating the work object.
+  delete dummy_work;
+}
+
+/** Returns the io_service that the dispatcher thread runs. */
+boost::asio::io_service& EventDispatcher::getService() {
+  return service;
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.h?rev=1021463&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/eventdispatcher.h Mon Oct 11 19:00:42 2010
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EVENTDISPATCHER_H
+#define EVENTDISPATCHER_H
+
+#include <boost/asio.hpp>
+#include <boost/thread.hpp>
+
+namespace Hedwig {
+  class EventDispatcher { // Runs a boost::asio::io_service on a single thread that it owns.
+  public:  
+    EventDispatcher();
+    ~EventDispatcher();
+    
+    void start(); // Creates the dispatcher thread; returns immediately if one already exists.
+    void stop(); // Stops the io_service and joins the thread; returns immediately if not started.
+    
+    boost::asio::io_service& getService(); // The io_service that run_forever() runs.
+    
+  private:
+    void run_forever(); // Thread body: calls service.run(), re-entering it after a logged std::exception.
+
+    boost::asio::io_service service;
+    boost::asio::io_service::work* dummy_work; // Created in start(), deleted in stop(); keeps service.run() from returning while no handlers are pending.
+    boost::thread* t; // Dispatcher thread; NULL before start() and after stop().
+  };
+}
+
+#endif

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp?rev=1021463&r1=1021462&r2=1021463&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/publisherimpl.cpp Mon Oct 11 19:00:42 2010
@@ -25,7 +25,7 @@ static log4cpp::Category &LOG = log4cpp:
 
 using namespace Hedwig;
 
-PublishWriteCallback::PublishWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
+PublishWriteCallback::PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
 
 void PublishWriteCallback::operationComplete() {
   if (LOG.isDebugEnabled()) {
@@ -36,16 +36,16 @@ void PublishWriteCallback::operationComp
 void PublishWriteCallback::operationFailed(const std::exception& exception) {
   LOG.errorStream() << "Error writing to publisher " << exception.what();
   
-  //remove txn from channel pending list
-  #warning "Actually do something here"
+  data->getCallback()->operationFailed(exception);
 }
 
-PublisherImpl::PublisherImpl(ClientImplPtr& client) 
+PublisherImpl::PublisherImpl(const ClientImplPtr& client) 
   : client(client) {
 }
 
 void PublisherImpl::publish(const std::string& topic, const std::string& message) {
-  SyncOperationCallback* cb = new SyncOperationCallback();
+  SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT, 
+											  DEFAULT_SYNC_REQUEST_TIMEOUT));
   OperationCallbackPtr callback(cb);
   asyncPublish(topic, message, callback);
   cb->wait();
@@ -54,11 +54,10 @@ void PublisherImpl::publish(const std::s
 }
 
 void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) {
-  DuplexChannelPtr channel = client->getChannelForTopic(topic);
-
   // use release after callback to release the channel after the callback is called
   PubSubDataPtr data = PubSubData::forPublishRequest(client->counter().next(), topic, message, callback);
   
+  DuplexChannelPtr channel = client->getChannel(topic);
   doPublish(channel, data);
 }
 
@@ -66,12 +65,11 @@ void PublisherImpl::doPublish(const Dupl
   channel->storeTransaction(data);
   
   OperationCallbackPtr writecb(new PublishWriteCallback(client, data));
-  LOG.debugStream() << "dopublish";
   channel->writeRequest(data->getRequest(), writecb);
 }
 
-void PublisherImpl::messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn) {
-  switch (m.statuscode()) {
+void PublisherImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
+  switch (m->statuscode()) {
   case SUCCESS:
     txn->getCallback()->operationComplete();
     break;
@@ -80,7 +78,7 @@ void PublisherImpl::messageHandler(const
     txn->getCallback()->operationFailed(ServiceDownException());
     break;
   default:
-    LOG.errorStream() << "Unexpected response " << m.statuscode() << " for " << txn->getTxnId();
+    LOG.errorStream() << "Unexpected response " << m->statuscode() << " for " << txn->getTxnId();
     txn->getCallback()->operationFailed(UnexpectedResponseException());
     break;
   }



Mime
View raw message