zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1203380 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/ hedwig-client/src/main/java/org/apache/hedwig/client/api/ hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/ hedwig-client/src/...
Date Thu, 17 Nov 2011 21:38:51 GMT
Author: fpj
Date: Thu Nov 17 21:38:49 2011
New Revision: 1203380

URL: http://svn.apache.org/viewvc?rev=1203380&view=rev
Log:
BOOKKEEPER-90: Hedwig API changes for initial Bookkeeper release (ivank via fpj)



Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
Removed:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
    zookeeper/bookkeeper/trunk/pom.xml

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Nov 17 21:38:49 2011
@@ -115,3 +115,5 @@ IMPROVEMENTS:
   BOOKKEEPER-44: Reuse publish channel to default server to avoid too many connect requests to default server when lots of producers came in same time (Sijie Guo via breed)
 
   BOOKKEEPER-109: Add documentation to describe how bookies flushes data (Sijie Guo via fpj)
+
+  BOOKKEEPER-90: Hedwig API changes for initial Bookkeeper release (ivank via fpj)

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java?rev=1203380&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java Thu Nov 17 21:38:49 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client;
+
+import org.apache.hedwig.client.api.Client;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.jboss.netty.channel.ChannelFactory;
+
+/**
+ * Hedwig client uses as starting point for all communications with the Hedwig service.
+ * 
+ * @see Publisher
+ * @see Subscriber
+ */
+public class HedwigClient implements Client {
+    private final Client impl;
+
+    /**
+     * Construct a hedwig client object. The configuration object
+     * should be an instance of a class which implements ClientConfiguration.
+     *
+     * @param cfg The client configuration.
+     */
+    public HedwigClient(ClientConfiguration cfg) {
+        impl = HedwigClientImpl.create(cfg);
+    }
+
+    /**
+     * Construct a hedwig client object, using a preexisting socket factory.
+     * This is useful if you need to create many hedwig client instances.
+     *
+     * @param cfg The client configuration
+     * @param socketFactory A netty socket factory.
+     */
+    public HedwigClient(ClientConfiguration cfg, ChannelFactory socketFactory) {
+        impl = HedwigClientImpl.create(cfg, socketFactory);
+    }
+
+    @Override
+    public Publisher getPublisher() {
+        return impl.getPublisher();
+    }
+
+    @Override
+    public Subscriber getSubscriber() {
+        return impl.getSubscriber();
+    }
+
+    @Override
+    public void close() {
+        impl.close();
+    }
+}
\ No newline at end of file

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java?rev=1203380&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java Thu Nov 17 21:38:49 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.api;
+
+/**
+ * Interface defining the client API for Hedwig
+ */
+public interface Client {
+    /**
+     * Retrieve the Publisher object for the client.
+     * This object can be used to publish messages to a topic on Hedwig.
+     * @see Publisher
+     */
+    public Publisher getPublisher();
+    
+    /**
+     * Retrieve the Subscriber object for the client.
+     * This object can be used to subscribe for messages from a topic.
+     * @see Subscriber
+     */
+    public Subscriber getSubscriber();
+
+    /**
+     * Close the client and free all associated resources.
+     */
+    public void close();
+}
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java Thu Nov 17 21:38:49 2011
@@ -22,27 +22,27 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.util.Callback;
 
 /**
- * Interface to define the client handler logic to consume messages it is
+ * Interface to define the client handler logic to deliver messages it is
  * subscribed to.
  *
  */
 public interface MessageHandler {
 
     /**
-     * Consumes a message it is subscribed to and has been delivered to it.
+     * Delivers a message which has been published for topic. 
      *
      * @param topic
      *            The topic name where the message came from.
      * @param subscriberId
      *            ID of the subscriber.
      * @param msg
-     *            The message object to consume.
+     *            The message object to deliver.
      * @param callback
-     *            Callback to invoke when the message consumption has been done.
+     *            Callback to invoke when the message delivery has been done.
      * @param context
      *            Calling context that the Callback needs since this is done
      *            asynchronously.
      */
-    public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, Object context);
+    public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, Object context);
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Thu Nov 17 21:38:49 2011
@@ -52,7 +52,7 @@ public class BenchmarkPublisher extends 
 
         subscriber.startDelivery(topic, subId, new MessageHandler() {
             @Override
-            public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
+            public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
             Object context) {
                 // noop
                 callback.operationFinished(context, null);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Thu Nov 17 21:38:49 2011
@@ -73,7 +73,7 @@ public class BenchmarkSubscriber extends
             subscriber.startDelivery(ByteString.copyFromUtf8(topic), subId, new MessageHandler() {
 
                 @Override
-                public void consume(ByteString thisTopic, ByteString subscriberId, Message msg,
+                public void deliver(ByteString thisTopic, ByteString subscriberId, Message msg,
                 Callback<Void> callback, Object context) {
                     if (logger.isDebugEnabled())
                         logger.debug("Got message from src-region: " + msg.getSrcRegion() + " with seq-id: "

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java Thu Nov 17 21:38:49 2011
@@ -28,9 +28,9 @@ import org.jboss.netty.logging.Log4JLogg
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClient;
-import org.apache.hedwig.client.netty.HedwigPublisher;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
 
 public class HedwigBenchmark implements Callable<Void> {
     protected static final Logger logger = LoggerFactory.getLogger(HedwigBenchmark.class);
@@ -38,8 +38,8 @@ public class HedwigBenchmark implements 
     static final String TOPIC_PREFIX = "topic";
 
     private final HedwigClient client;
-    private final HedwigPublisher publisher;
-    private final HedwigSubscriber subscriber;
+    private final Publisher publisher;
+    private final Subscriber subscriber;
 
     public HedwigBenchmark(ClientConfiguration cfg) {
         client = new HedwigClient(cfg);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java Thu Nov 17 21:38:49 2011
@@ -25,7 +25,7 @@ import org.jboss.netty.channel.Channel;
 
 import org.apache.hedwig.client.data.MessageConsumeData;
 import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.util.Callback;
 
@@ -42,9 +42,9 @@ public class MessageConsumeCallback impl
 
     private static Logger logger = LoggerFactory.getLogger(MessageConsumeCallback.class);
 
-    private final HedwigClient client;
+    private final HedwigClientImpl client;
 
-    public MessageConsumeCallback(HedwigClient client) {
+    public MessageConsumeCallback(HedwigClientImpl client) {
         this.client = client;
     }
 
@@ -61,7 +61,7 @@ public class MessageConsumeCallback impl
         public void run() {
             // Try to consume the message again
             Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
-            HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+            HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
             .asyncMessageConsume(messageConsumeData.msg);
         }
     }
@@ -72,7 +72,7 @@ public class MessageConsumeCallback impl
         // Message has been successfully consumed by the client app so callback
         // to the ResponseHandler indicating that the message is consumed.
         Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
-        HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+        HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
         .messageConsumed(messageConsumeData.msg);
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java Thu Nov 17 21:38:49 2011
@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
 
 import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.client.netty.ResponseHandler;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
@@ -41,7 +41,7 @@ public class PublishResponseHandler {
     public void handlePublishResponse(PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
         if (logger.isDebugEnabled())
             logger.debug("Handling a Publish response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                         + HedwigClient.getHostFromChannel(channel));
+                         + HedwigClientImpl.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
             // Response was success so invoke the callback's operationFinished

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java Thu Nov 17 21:38:49 2011
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.client.netty.HedwigSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -45,13 +45,13 @@ public class SubscribeReconnectCallback 
 
     // Private member variables
     private final PubSubData origSubData;
-    private final HedwigClient client;
+    private final HedwigClientImpl client;
     private final HedwigSubscriber sub;
     private final ClientConfiguration cfg;
     private final MessageHandler messageHandler;
 
     // Constructor
-    public SubscribeReconnectCallback(PubSubData origSubData, HedwigClient client, MessageHandler messageHandler) {
+    public SubscribeReconnectCallback(PubSubData origSubData, HedwigClientImpl client, MessageHandler messageHandler) {
         this.origSubData = origSubData;
         this.client = client;
         this.sub = client.getSubscriber();

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu Nov 17 21:38:49 2011
@@ -30,7 +30,7 @@ import org.apache.hedwig.client.api.Mess
 import org.apache.hedwig.client.data.MessageConsumeData;
 import org.apache.hedwig.client.data.PubSubData;
 import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.client.netty.ResponseHandler;
 import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
@@ -88,13 +88,13 @@ public class SubscribeResponseHandler {
         // If this was not a successful response to the Subscribe request, we
         // won't be using the Netty Channel created so just close it.
         if (!response.getStatusCode().equals(StatusCode.SUCCESS)) {
-            HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
             channel.close();
         }
 
         if (logger.isDebugEnabled())
             logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                         + HedwigClient.getHostFromChannel(channel));
+                         + HedwigClientImpl.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
             // For successful Subscribe requests, store this Channel locally
@@ -217,8 +217,8 @@ public class SubscribeResponseHandler {
         }
         MessageConsumeData messageConsumeData = new MessageConsumeData(origSubData.topic, origSubData.subscriberId,
                 message);
-        messageHandler.consume(origSubData.topic, origSubData.subscriberId, message, responseHandler.getClient()
-                               .getConsumeCallback(), messageConsumeData);
+        messageHandler.deliver(origSubData.topic, origSubData.subscriberId, message, responseHandler.getClient()
+                .getConsumeCallback(), messageConsumeData);
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java Thu Nov 17 21:38:49 2011
@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
 import org.jboss.netty.channel.Channel;
 
 import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.client.netty.ResponseHandler;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
@@ -43,7 +43,7 @@ public class UnsubscribeResponseHandler 
             throws Exception {
         if (logger.isDebugEnabled())
             logger.debug("Handling an Unsubscribe response: " + response + ", pubSubData: " + pubSubData + ", host: "
-                         + HedwigClient.getHostFromChannel(channel));
+                         + HedwigClientImpl.getHostFromChannel(channel));
         switch (response.getStatusCode()) {
         case SUCCESS:
             // For successful Unsubscribe requests, we can now safely close the

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java Thu Nov 17 21:38:49 2011
@@ -30,9 +30,9 @@ import org.apache.hedwig.protocol.PubSub
 
 public class ClientChannelPipelineFactory implements ChannelPipelineFactory {
 
-    private HedwigClient client;
+    private HedwigClientImpl client;
 
-    public ClientChannelPipelineFactory(HedwigClient client) {
+    public ClientChannelPipelineFactory(HedwigClientImpl client) {
         this.client = client;
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java Thu Nov 17 21:38:49 2011
@@ -39,13 +39,13 @@ public class ConnectCallback implements 
     // Private member variables
     private PubSubData pubSubData;
     private InetSocketAddress host;
-    private final HedwigClient client;
+    private final HedwigClientImpl client;
     private final HedwigPublisher pub;
     private final HedwigSubscriber sub;
     private final ClientConfiguration cfg;
 
     // Constructor
-    public ConnectCallback(PubSubData pubSubData, InetSocketAddress host, HedwigClient client) {
+    public ConnectCallback(PubSubData pubSubData, InetSocketAddress host, HedwigClientImpl client) {
         super();
         this.pubSubData = pubSubData;
         this.host = host;
@@ -101,14 +101,14 @@ public class ConnectCallback implements 
             // this channel will be closed but we'll always publish on the
             // cached channel in the HedwigPublisher.host2Channel map.
             pub.storeHost2ChannelMapping(future.getChannel());
-            pub.doPublish(pubSubData, pub.host2Channel.get(HedwigClient.getHostFromChannel(future.getChannel())));
+            pub.doPublish(pubSubData, pub.host2Channel.get(HedwigClientImpl.getHostFromChannel(future.getChannel())));
         } else if (pubSubData.operationType.equals(OperationType.UNSUBSCRIBE)) {
             // Unsubscribe Request so store this Channel connection in the
             // HedwigPublisher Map (if it doesn't exist yet) and then do the
             // unsubscribe. Unsubscribe requests will share and reuse
             // the netty Channel connections that Publish requests use.
             pub.storeHost2ChannelMapping(future.getChannel());
-            sub.doSubUnsub(pubSubData, pub.host2Channel.get(HedwigClient.getHostFromChannel(future.getChannel())));
+            sub.doSubUnsub(pubSubData, pub.host2Channel.get(HedwigClientImpl.getHostFromChannel(future.getChannel())));
         } else {
             // Subscribe Request. We do not store the Channel connection yet for
             // Subscribes here. This will be done only when we've found the

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1203380&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Thu Nov 17 21:38:49 2011
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.netty;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Client;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.handlers.MessageConsumeCallback;
+import org.apache.hedwig.client.ssl.SslClientContextFactory;
+import org.apache.hedwig.exceptions.PubSubException.UncertainStateException;
+
+/**
+ * This is a top level Hedwig Client class that encapsulates the common
+ * functionality needed for both Publish and Subscribe operations.
+ *
+ */
+public class HedwigClientImpl implements Client {
+
+    private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class);
+
+    // Global counter used for generating unique transaction ID's for
+    // publish and subscribe requests
+    protected final AtomicLong globalCounter = new AtomicLong();
+    // Static String constants
+    protected static final String COLON = ":";
+
+    // The Netty socket factory for making connections to the server.
+    protected final ChannelFactory socketFactory;
+    // Whether the socket factory is one we created or is owned by whoever
+    // instantiated us.
+    protected boolean ownChannelFactory = false;
+
+    // PipelineFactory to create netty client channels to the appropriate server
+    private ClientChannelPipelineFactory pipelineFactory;
+
+    // Concurrent Map to store the mapping from the Topic to the Host.
+    // This could change over time since servers can drop mastership of topics
+    // for load balancing or failover. If a server host ever goes down, we'd
+    // also want to remove all topic mappings the host was responsible for.
+    // The second Map is used as the inverted version of the first one.
+    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
+    private final ConcurrentMap<InetSocketAddress, List<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, List<ByteString>>();
+
+    // Each client instantiation will have a Timer for running recurring
+    // threads. One such timer task thread to is to timeout long running
+    // PubSubRequests that are waiting for an ack response from the server.
+    private final Timer clientTimer = new Timer(true);
+
+    // Boolean indicating if the client is running or has stopped.
+    // Once we stop the client, we should sidestep all of the connect,
+    // write callback and channel disconnected logic.
+    private boolean isStopped = false;
+
+    private HedwigSubscriber sub;
+    private final HedwigPublisher pub;
+    private final ClientConfiguration cfg;
+    private final MessageConsumeCallback consumeCb;
+    private SslClientContextFactory sslFactory = null;
+
+    public static Client create(ClientConfiguration cfg) {
+        return new HedwigClientImpl(cfg);
+    }
+
+    public static Client create(ClientConfiguration cfg, ChannelFactory socketFactory) {
+        return new HedwigClientImpl(cfg, socketFactory);
+    }
+
+    // Base constructor that takes in a Configuration object.
+    // This will create its own client socket channel factory.
+    protected HedwigClientImpl(ClientConfiguration cfg) {
+        this(cfg, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+        ownChannelFactory = true;
+    }
+
+    // Constructor that takes in a Configuration object and a ChannelFactory
+    // that has already been instantiated by the caller.
+    protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) {
+        this.cfg = cfg;
+        this.socketFactory = socketFactory;
+        pub = new HedwigPublisher(this);
+        sub = new HedwigSubscriber(this);
+        pipelineFactory = new ClientChannelPipelineFactory(this);
+        consumeCb = new MessageConsumeCallback(this);
+        if (cfg.isSSLEnabled()) {
+            sslFactory = new SslClientContextFactory(cfg);
+        }
+        // Schedule all of the client timer tasks. Currently we only have the
+        // Request Timeout task.
+        clientTimer.schedule(new PubSubRequestTimeoutTask(), 0, cfg.getTimeoutThreadRunInterval());
+    }
+
+    // Public getters for the various components of a client.
+    public ClientConfiguration getConfiguration() {
+        return cfg;
+    }
+
+    public HedwigSubscriber getSubscriber() {
+        return sub;
+    }
+
+    // Protected method to set the subscriber. This is needed currently for hub
+    // versions of the client subscriber.
+    protected void setSubscriber(HedwigSubscriber sub) {
+        this.sub = sub;
+    }
+
+    public HedwigPublisher getPublisher() {
+        return pub;
+    }
+
+    public MessageConsumeCallback getConsumeCallback() {
+        return consumeCb;
+    }
+
+    public SslClientContextFactory getSslFactory() {
+        return sslFactory;
+    }
+
+    // We need to deal with the possible problem of a PubSub request being
+    // written to successfully to the server host but for some reason, the
+    // ack message back never comes. What could happen is that the VoidCallback
+    // stored in the ResponseHandler.txn2PublishData map will never be called.
+    // We should have a configured timeout so if that passes from the time a
+    // write was successfully done to the server, we can fail this async PubSub
+    // transaction. The caller could possibly redo the transaction if needed at
+    // a later time. Creating a timeout cleaner TimerTask to do this here.
+    class PubSubRequestTimeoutTask extends TimerTask {
+        /**
+         * Implement the TimerTask's abstract run method.
+         */
+        @Override
+        public void run() {
+            if (logger.isDebugEnabled())
+                logger.debug("Running the PubSubRequest Timeout Task");
+            // Loop through all outstanding PubSubData requests and check if
+            // the requestWriteTime has timed out compared to the current time.
+            long curTime = System.currentTimeMillis();
+            long timeoutInterval = cfg.getServerAckResponseTimeout();
+
+            // First check the ResponseHandlers associated with cached
+            // channels in HedwigPublisher.host2Channel. This stores the
+            // channels used for Publish and Unsubscribe requests.
+            for (Channel channel : pub.host2Channel.values()) {
+                ResponseHandler responseHandler = getResponseHandlerFromChannel(channel);
+                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
+                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
+                }
+            }
+            // Now do the same for the cached channels in
+            // HedwigSubscriber.topicSubscriber2Channel. This stores the
+            // channels used exclusively for Subscribe requests.
+            for (Channel channel : sub.topicSubscriber2Channel.values()) {
+                ResponseHandler responseHandler = getResponseHandlerFromChannel(channel);
+                for (PubSubData pubSubData : responseHandler.txn2PubSubData.values()) {
+                    checkPubSubDataToTimeOut(pubSubData, responseHandler, curTime, timeoutInterval);
+                }
+            }
+        }
+
+        private void checkPubSubDataToTimeOut(PubSubData pubSubData, ResponseHandler responseHandler, long curTime,
+                                              long timeoutInterval) {
+            if (curTime > pubSubData.requestWriteTime + timeoutInterval) {
+                // Current PubSubRequest has timed out so remove it from the
+                // ResponseHandler's map and invoke the VoidCallback's
+                // operationFailed method.
+                logger.error("Current PubSubRequest has timed out for pubSubData: " + pubSubData);
+                responseHandler.txn2PubSubData.remove(pubSubData.txnId);
+                pubSubData.callback.operationFailed(pubSubData.context, new UncertainStateException(
+                                                        "Server ack response never received so PubSubRequest has timed out!"));
+            }
+        }
+    }
+
+    // When we are done with the client, this is a clean way to gracefully close
+    // all channels/sockets created by the client and to also release all
+    // resources used by netty.
+    public void close() {
+        logger.info("Stopping the client!");
+        // Set the client boolean flag to indicate the client has stopped.
+        isStopped = true;
+        // Stop the timer and all timer task threads.
+        clientTimer.cancel();
+        // Close all of the open Channels.
+        for (Channel channel : pub.host2Channel.values()) {
+            getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close().awaitUninterruptibly();
+        }
+        for (Channel channel : sub.topicSubscriber2Channel.values()) {
+            getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            channel.close().awaitUninterruptibly();
+        }
+        // Clear out all Maps.
+        topic2Host.clear();
+        host2Topics.clear();
+        pub.host2Channel.clear();
+        sub.topicSubscriber2Channel.clear();
+        // Release resources used by the ChannelFactory on the client if we are
+        // the owner that created it.
+        if (ownChannelFactory) {
+            socketFactory.releaseExternalResources();
+        }
+        logger.info("Completed stopping the client!");
+    }
+
+    /**
+     * This is a helper method to do the connect attempt to the server given the
+     * inputted host/port. This can be used to connect to the default server
+     * host/port which is the VIP. That will pick a server in the cluster at
+     * random to connect to for the initial PubSub attempt (with redirect logic
+     * being done at the server side). Additionally, this could be called after
+     * the client makes an initial PubSub attempt at a server, and is redirected
+     * to the one that is responsible for the topic. Once the connect to the
+     * server is done, we will perform the corresponding PubSub write on that
+     * channel.
+     *
+     * @param pubSubData
+     *            PubSub call's data wrapper object
+     * @param serverHost
+     *            Input server host to connect to of type InetSocketAddress
+     */
+    public void doConnect(PubSubData pubSubData, InetSocketAddress serverHost) {
+        if (logger.isDebugEnabled())
+            logger.debug("Connecting to host: " + serverHost + " with pubSubData: " + pubSubData);
+        // Set up the ClientBootStrap so we can create a new Channel connection
+        // to the server.
+        ClientBootstrap bootstrap = new ClientBootstrap(socketFactory);
+        bootstrap.setPipelineFactory(pipelineFactory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+
+        // Start the connection attempt to the input server host.
+        ChannelFuture future = bootstrap.connect(serverHost);
+        future.addListener(new ConnectCallback(pubSubData, serverHost, this));
+    }
+
+    /**
+     * Helper method to store the topic2Host mapping in the HedwigClient cache
+     * map. This method is assumed to be called when we've done a successful
+     * connection to the correct server topic master.
+     *
+     * @param pubSubData
+     *            PubSub wrapper data
+     * @param channel
+     *            Netty Channel
+     */
+    protected void storeTopic2HostMapping(PubSubData pubSubData, Channel channel) {
+        // Retrieve the server host that we've connected to and store the
+        // mapping from the topic to this host. For all other non-redirected
+        // server statuses, we consider that as a successful connection to the
+        // correct topic master.
+        InetSocketAddress host = getHostFromChannel(channel);
+        if (topic2Host.containsKey(pubSubData.topic) && topic2Host.get(pubSubData.topic).equals(host)) {
+            // Entry in map exists for the topic but it is the same as the
+            // current host. In this case there is nothing to do.
+            return;
+        }
+
+        // Store the relevant mappings for this topic and host combination.
+        if (logger.isDebugEnabled())
+            logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
+                         + topic2Host.get(pubSubData.topic) + ", new host: " + host);
+        topic2Host.put(pubSubData.topic, host);
+        if (host2Topics.containsKey(host)) {
+            host2Topics.get(host).add(pubSubData.topic);
+        } else {
+            LinkedList<ByteString> topicsList = new LinkedList<ByteString>();
+            topicsList.add(pubSubData.topic);
+            host2Topics.put(host, topicsList);
+        }
+    }
+
+    /**
+     * Helper static method to get the String Hostname:Port from a netty
+     * Channel. Assumption is that the netty Channel was originally created with
+     * an InetSocketAddress. This is true with the Hedwig netty implementation.
+     *
+     * @param channel
+     *            Netty channel to extract the hostname and port from.
+     * @return String representation of the Hostname:Port from the Netty Channel
+     */
+    public static InetSocketAddress getHostFromChannel(Channel channel) {
+        return (InetSocketAddress) channel.getRemoteAddress();
+    }
+
+    /**
+     * Helper static method to get the ResponseHandler instance from a Channel
+     * via the ChannelPipeline it is associated with. The assumption is that the
+     * last ChannelHandler tied to the ChannelPipeline is the ResponseHandler.
+     *
+     * @param channel
+     *            Channel we are retrieving the ResponseHandler instance for
+     * @return ResponseHandler Instance tied to the Channel's Pipeline
+     */
+    public static ResponseHandler getResponseHandlerFromChannel(Channel channel) {
+        return (ResponseHandler) channel.getPipeline().getLast();
+    }
+
+    // Public getter for entries in the topic2Host Map.
+    public InetSocketAddress getHostForTopic(ByteString topic) {
+        return topic2Host.get(topic);
+    }
+
+    // If a server host goes down or the channel to it gets disconnected,
+    // we want to clear out all relevant cached information. We'll
+    // need to remove all of the topic mappings that the host was
+    // responsible for.
+    public void clearAllTopicsForHost(InetSocketAddress host) {
+        if (logger.isDebugEnabled())
+            logger.debug("Clearing all topics for host: " + host);
+        // For each of the topics that the host was responsible for,
+        // remove it from the topic2Host mapping.
+        if (host2Topics.containsKey(host)) {
+            for (ByteString topic : host2Topics.get(host)) {
+                if (logger.isDebugEnabled())
+                    logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
+                topic2Host.remove(topic);
+            }
+            // Now it is safe to remove the host2Topics mapping entry.
+            host2Topics.remove(host);
+        }
+    }
+
+    // Public getter to see if the client has been stopped.
+    public boolean hasStopped() {
+        return isStopped;
+    }
+
+    // Public getter to get the client's Timer object.
+    // This is so we can reuse this and not have to create multiple Timer
+    // objects.
+    public Timer getClientTimer() {
+        return clientTimer;
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Thu Nov 17 21:38:49 2011
@@ -56,10 +56,10 @@ public class HedwigPublisher implements 
     // unsubscribe requests.
     protected final ConcurrentMap<InetSocketAddress, Channel> host2Channel = new ConcurrentHashMap<InetSocketAddress, Channel>();
 
-    private final HedwigClient client;
+    private final HedwigClientImpl client;
     private final ClientConfiguration cfg;
 
-    protected HedwigPublisher(HedwigClient client) {
+    protected HedwigPublisher(HedwigClientImpl client) {
         this.client = client;
         this.cfg = client.getConfiguration();
     }
@@ -179,11 +179,11 @@ public class HedwigPublisher implements 
         // Before we do the write, store this information into the
         // ResponseHandler so when the server responds, we know what
         // appropriate Callback Data to invoke for the given txn ID.
-        HedwigClient.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
+        HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
 
         // Finally, write the Publish request through the Channel.
         if (logger.isDebugEnabled())
-            logger.debug("Writing a Publish request to host: " + HedwigClient.getHostFromChannel(channel)
+            logger.debug("Writing a Publish request to host: " + HedwigClientImpl.getHostFromChannel(channel)
                          + " for pubSubData: " + pubSubData);
         ChannelFuture future = channel.write(pubsubRequestBuilder.build());
         future.addListener(new WriteCallback(pubSubData, client));
@@ -193,7 +193,7 @@ public class HedwigPublisher implements 
     // exist yet). Retrieve the hostname info from the Channel created via the
     // RemoteAddress tied to it.
     protected synchronized void storeHost2ChannelMapping(Channel channel) {
-        InetSocketAddress host = HedwigClient.getHostFromChannel(channel);
+        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
         if (!host2Channel.containsKey(host)) {
             if (logger.isDebugEnabled())
                 logger.debug("Storing a new Channel mapping for host: " + host);
@@ -216,7 +216,7 @@ public class HedwigPublisher implements 
             // topic. Close these redundant channels as they won't be used.
             if (logger.isDebugEnabled())
                 logger.debug("Channel mapping to host: " + host + " already exists so no need to store it.");
-            HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
             channel.close();
         }
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Thu Nov 17 21:38:49 2011
@@ -67,10 +67,10 @@ public class HedwigSubscriber implements
     // Channel Pipeline.
     protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
 
-    protected final HedwigClient client;
+    protected final HedwigClientImpl client;
     protected final ClientConfiguration cfg;
 
-    public HedwigSubscriber(HedwigClient client) {
+    public HedwigSubscriber(HedwigClientImpl client) {
         this.client = client;
         this.cfg = client.getConfiguration();
     }
@@ -356,11 +356,11 @@ public class HedwigSubscriber implements
         // Before we do the write, store this information into the
         // ResponseHandler so when the server responds, we know what
         // appropriate Callback Data to invoke for the given txn ID.
-        HedwigClient.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
+        HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
 
         // Finally, write the Subscribe request through the Channel.
         if (logger.isDebugEnabled())
-            logger.debug("Writing a SubUnsub request to host: " + HedwigClient.getHostFromChannel(channel)
+            logger.debug("Writing a SubUnsub request to host: " + HedwigClientImpl.getHostFromChannel(channel)
                          + " for pubSubData: " + pubSubData);
         ChannelFuture future = channel.write(pubsubRequestBuilder.build());
         future.addListener(new WriteCallback(pubSubData, client));
@@ -405,14 +405,14 @@ public class HedwigSubscriber implements
         // action. Instead, just have a future listener that will log an error
         // message if there was a problem writing the consume request.
         if (logger.isDebugEnabled())
-            logger.debug("Writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
+            logger.debug("Writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel)
                          + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
         ChannelFuture future = channel.write(pubsubRequestBuilder.build());
         future.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (!future.isSuccess()) {
-                    logger.error("Error writing a Consume request to host: " + HedwigClient.getHostFromChannel(channel)
+                    logger.error("Error writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel)
                                  + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
                 }
             }
@@ -460,7 +460,7 @@ public class HedwigSubscriber implements
         // Register the MessageHandler with the subscribe Channel's
         // Response Handler.
         Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
-        HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+        HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
         .setMessageHandler(messageHandler);
         // Now make the TopicSubscriber Channel readable (it is set to not be
         // readable when the initial subscription is done). Note that this is an
@@ -497,7 +497,7 @@ public class HedwigSubscriber implements
         // Unregister the MessageHandler for the subscribe Channel's
         // Response Handler.
         Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
-        HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
+        HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
         .setMessageHandler(null);
         // Now make the TopicSubscriber channel not-readable. This will buffer
         // up messages if any are sent from the server. Note that this is an
@@ -545,7 +545,7 @@ public class HedwigSubscriber implements
             Channel channel = topicSubscriber2Channel.get(topicSubscriber);
             topicSubscriber2Channel.remove(topicSubscriber);
             // Close the subscribe channel asynchronously.
-            HedwigClient.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
+            HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
             ChannelFuture future = channel.close();
             future.addListener(new ChannelFutureListener() {
                 @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Thu Nov 17 21:38:49 2011
@@ -66,7 +66,7 @@ public class ResponseHandler extends Sim
     // channel disconnected logic here.
     public boolean channelClosedExplicitly = false;
 
-    private final HedwigClient client;
+    private final HedwigClientImpl client;
     private final HedwigPublisher pub;
     private final HedwigSubscriber sub;
     private final ClientConfiguration cfg;
@@ -75,7 +75,7 @@ public class ResponseHandler extends Sim
     private final SubscribeResponseHandler subHandler;
     private final UnsubscribeResponseHandler unsubHandler;
 
-    public ResponseHandler(HedwigClient client) {
+    public ResponseHandler(HedwigClientImpl client) {
         this.client = client;
         this.sub = client.getSubscriber();
         this.pub = client.getPublisher();
@@ -86,7 +86,7 @@ public class ResponseHandler extends Sim
     }
 
     // Public getters needed for the private members
-    public HedwigClient getClient() {
+    public HedwigClientImpl getClient() {
         return client;
     }
 
@@ -113,7 +113,7 @@ public class ResponseHandler extends Sim
         // server.
         PubSubResponse response = (PubSubResponse) e.getMessage();
         if (logger.isDebugEnabled())
-            logger.debug("Response received from host: " + HedwigClient.getHostFromChannel(ctx.getChannel())
+            logger.debug("Response received from host: " + HedwigClientImpl.getHostFromChannel(ctx.getChannel())
                          + ", response: " + response);
 
         // Determine if this PubSubResponse is an ack response for a PubSub
@@ -185,7 +185,7 @@ public class ResponseHandler extends Sim
     public void handleRedirectResponse(PubSubResponse response, PubSubData pubSubData, Channel channel)
             throws Exception {
         if (logger.isDebugEnabled())
-            logger.debug("Handling a redirect from host: " + HedwigClient.getHostFromChannel(channel) + ", response: "
+            logger.debug("Handling a redirect from host: " + HedwigClientImpl.getHostFromChannel(channel) + ", response: "
                          + response + ", pubSubData: " + pubSubData);
         // In this case, the PubSub request was done to a server that is not
         // responsible for the topic. First make sure that we haven't
@@ -206,7 +206,7 @@ public class ResponseHandler extends Sim
         // We will redirect and try to connect to the correct server
         // stored in the StatusMsg of the response. First store the
         // server that we sent the PubSub request to for the topic.
-        ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClient
+        ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClientImpl
                                  .getHostFromChannel(channel)));
         if (pubSubData.triedServers == null)
             pubSubData.triedServers = new LinkedList<ByteString>();
@@ -277,7 +277,7 @@ public class ResponseHandler extends Sim
         // Make sure the host retrieved is not null as there could be some weird
         // channel disconnect events happening during a client shutdown.
         // If it is, just return as there shouldn't be anything we need to do.
-        InetSocketAddress host = HedwigClient.getHostFromChannel(ctx.getChannel());
+        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(ctx.getChannel());
         logger.warn("Channel was disconnected to host: " + host);
         if (host == null)
             return;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java Thu Nov 17 21:38:49 2011
@@ -37,11 +37,11 @@ public class WriteCallback implements Ch
 
     // Private member variables
     private PubSubData pubSubData;
-    private final HedwigClient client;
+    private final HedwigClientImpl client;
     private final ClientConfiguration cfg;
 
     // Constructor
-    public WriteCallback(PubSubData pubSubData, HedwigClient client) {
+    public WriteCallback(PubSubData pubSubData, HedwigClientImpl client) {
         super();
         this.pubSubData = pubSubData;
         this.client = client;
@@ -56,14 +56,14 @@ public class WriteCallback implements Ch
 
         // When the write operation to the server is done, we just need to check
         // if it was successful or not.
-        InetSocketAddress host = HedwigClient.getHostFromChannel(future.getChannel());
+        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(future.getChannel());
         if (!future.isSuccess()) {
             logger.error("Error writing on channel to host: " + host);
             // On a write failure for a PubSubRequest, we also want to remove
             // the saved txnId to PubSubData in the ResponseHandler. These
             // requests will not receive an ack response from the server
             // so there is no point storing that information there anymore.
-            HedwigClient.getResponseHandlerFromChannel(future.getChannel()).txn2PubSubData.remove(pubSubData.txnId);
+            HedwigClientImpl.getResponseHandlerFromChannel(future.getChannel()).txn2PubSubData.remove(pubSubData.txnId);
 
             // If we were not able to write on the channel to the server host,
             // the host could have died or something is wrong with the channel

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Thu Nov 17 21:38:49 2011
@@ -36,7 +36,7 @@ import org.jboss.netty.channel.socket.ni
 import org.jboss.netty.logging.InternalLoggerFactory;
 import org.jboss.netty.logging.Log4JLoggerFactory;
 
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
 import org.apache.hedwig.server.handlers.Handler;
@@ -120,7 +120,7 @@ public class HedwigProxy {
 
     public void shutdown() {
         allChannels.close().awaitUninterruptibly();
-        client.stop();
+        client.close();
         serverSocketChannelFactory.releaseExternalResources();
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Thu Nov 17 21:38:49 2011
@@ -82,7 +82,7 @@ public class ProxyStartDeliveryHandler i
 
             MessageHandler handler = new MessageHandler() {
                 @Override
-                public void consume(ByteString topic, ByteString subscriberId, Message msg,
+                public void deliver(ByteString topic, ByteString subscriberId, Message msg,
                 final Callback<Void> callback, final Object context) {
 
                     PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java Thu Nov 17 21:38:49 2011
@@ -20,7 +20,7 @@ package org.apache.hedwig.server.regions
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 
 /**
  * This is a hub specific implementation of the HedwigClient. All this does
@@ -28,7 +28,7 @@ import org.apache.hedwig.client.netty.He
  * Creating this class so we can call the protected method in the parent to set
  * the subscriber since we don't want to expose that API to the public.
  */
-public class HedwigHubClient extends HedwigClient {
+public class HedwigHubClient extends HedwigClientImpl {
 
     // Constructor when we already have a ChannelFactory instantiated.
     public HedwigHubClient(ClientConfiguration cfg, ClientSocketChannelFactory channelFactory) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Thu Nov 17 21:38:49 2011
@@ -19,7 +19,7 @@ package org.apache.hedwig.server.regions
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigClientImpl;
 import org.apache.hedwig.client.netty.HedwigSubscriber;
 import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -38,7 +38,7 @@ import org.apache.hedwig.util.Callback;
  */
 public class HedwigHubSubscriber extends HedwigSubscriber {
 
-    public HedwigHubSubscriber(HedwigClient client) {
+    public HedwigHubSubscriber(HedwigClientImpl client) {
         super(client);
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Thu Nov 17 21:38:49 2011
@@ -85,7 +85,7 @@ public class RegionManager implements Su
                             try {
                                 sub.startDelivery(topic, mySubId, new MessageHandler() {
                                     @Override
-                                    public void consume(final ByteString topic, ByteString subscriberId, Message msg,
+                                    public void deliver(final ByteString topic, ByteString subscriberId, Message msg,
                                     final Callback<Void> callback, final Object context) {
                                         // When messages are first published
                                         // locally, the PublishHandler sets the
@@ -176,7 +176,7 @@ public class RegionManager implements Su
     // Method to shutdown and stop all of the cross-region Hedwig clients.
     public void stop() {
         for (HedwigHubClient client : clients) {
-            client.stop();
+            client.close();
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Thu Nov 17 21:38:49 2011
@@ -26,9 +26,9 @@ import org.junit.Test;
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClient;
-import org.apache.hedwig.client.netty.HedwigPublisher;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -41,8 +41,8 @@ public class TestPubSubClient extends Pu
 
     // Client side variables
     protected HedwigClient client;
-    protected HedwigPublisher publisher;
-    protected HedwigSubscriber subscriber;
+    protected Publisher publisher;
+    protected Subscriber subscriber;
 
     // SynchronousQueues to verify async calls
     private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
@@ -77,7 +77,7 @@ public class TestPubSubClient extends Pu
 
     // Test implementation of subscriber's message handler.
     class TestMessageHandler implements MessageHandler {
-        public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
+        public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
                             Object context) {
             new Thread(new Runnable() {
                 @Override
@@ -103,7 +103,7 @@ public class TestPubSubClient extends Pu
     @Override
     @After
     public void tearDown() throws Exception {
-        client.stop();
+        client.close();
         super.tearDown();
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Thu Nov 17 21:38:49 2011
@@ -30,7 +30,7 @@ import org.junit.After;
 import org.junit.Before;
 
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.netty.PubSubServer;
 import org.apache.hedwig.server.persistence.BookKeeperTestBase;
@@ -203,7 +203,7 @@ public abstract class HedwigRegionTestBa
         logger.info("tearDown starting");
         // Stop all of the HedwigClients for all regions
         for (HedwigClient client : regionClientsMap.values()) {
-            client.stop();
+            client.close();
         }
         regionClientsMap.clear();
         // Shutdown all of the PubSubServers in all regions

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu Nov 17 21:38:49 2011
@@ -35,9 +35,10 @@ import org.apache.hedwig.client.api.Mess
 import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.netty.HedwigClient;
-import org.apache.hedwig.client.netty.HedwigPublisher;
-import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Client;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -63,8 +64,8 @@ public class TestHedwigHub extends Hedwi
 
     // Client side variables
     protected HedwigClient client;
-    protected HedwigPublisher publisher;
-    protected HedwigSubscriber subscriber;
+    protected Publisher publisher;
+    protected Subscriber subscriber;
 
     // Common ByteStrings used in tests.
     private final ByteString localSubscriberId = ByteString.copyFromUtf8("LocalSubscriber");
@@ -139,7 +140,7 @@ public class TestHedwigHub extends Hedwi
             this.consumeQueue = consumeQueue;
         }
 
-        public void consume(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
+        public void deliver(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
                             Object context) {
             if (!consumedMessages.contains(msg.getMsgId())) {
                 // New message to consume. Add it to the Set of consumed
@@ -218,7 +219,7 @@ public class TestHedwigHub extends Hedwi
     @Override
     @After
     public void tearDown() throws Exception {
-        client.stop();
+        client.close();
         if (mode == Mode.PROXY) {
             proxy.shutdown();
         }
@@ -322,8 +323,8 @@ public class TestHedwigHub extends Hedwi
             }
 
         });
-        HedwigSubscriber mySubscriber = myClient.getSubscriber();
-        HedwigPublisher myPublisher = myClient.getPublisher();
+        Subscriber mySubscriber = myClient.getSubscriber();
+        Publisher myPublisher = myClient.getPublisher();
         ByteString myTopic = getTopic(0);
         // Subscribe to a topic and start delivery on it
         mySubscriber.asyncSubscribe(myTopic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
@@ -357,7 +358,7 @@ public class TestHedwigHub extends Hedwi
         } catch (InterruptedException e) {
             logger.error("Thread was interrupted while waiting to stop client for manual consume test!!", e);
         }
-        myClient.stop();
+        myClient.close();
     }
 
     @Test
@@ -640,8 +641,8 @@ public class TestHedwigHub extends Hedwi
     // subscriberId to be in the "hub" specific format.
     @Test
     public void testSyncHubSubscribeWithInvalidSubscriberId() throws Exception {
-        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
-        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        Client hubClient = new HedwigHubClient(new ClientConfiguration());
+        Subscriber hubSubscriber = hubClient.getSubscriber();
         boolean subscribeSuccess = false;
         try {
             hubSubscriber.subscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH);
@@ -651,23 +652,23 @@ public class TestHedwigHub extends Hedwi
             subscribeSuccess = false;
         }
         assertTrue(subscribeSuccess);
-        hubClient.stop();
+        hubClient.close();
     }
 
     @Test
     public void testAsyncHubSubscribeWithInvalidSubscriberId() throws Exception {
-        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
-        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        Client hubClient = new HedwigHubClient(new ClientConfiguration());
+        Subscriber hubSubscriber = hubClient.getSubscriber();
         hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(
                                          queue), null);
         assertFalse(queue.take());
-        hubClient.stop();
+        hubClient.close();
     }
 
     @Test
     public void testSyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
-        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
-        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        Client hubClient = new HedwigHubClient(new ClientConfiguration());
+        Subscriber hubSubscriber = hubClient.getSubscriber();
         boolean unsubscribeSuccess = false;
         try {
             hubSubscriber.unsubscribe(getTopic(0), localSubscriberId);
@@ -677,16 +678,16 @@ public class TestHedwigHub extends Hedwi
             unsubscribeSuccess = false;
         }
         assertTrue(unsubscribeSuccess);
-        hubClient.stop();
+        hubClient.close();
     }
 
     @Test
     public void testAsyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
-        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
-        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        Client hubClient = new HedwigHubClient(new ClientConfiguration());
+        Subscriber hubSubscriber = hubClient.getSubscriber();
         hubSubscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
         assertFalse(queue.take());
-        hubClient.stop();
+        hubClient.close();
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java Thu Nov 17 21:38:49 2011
@@ -24,8 +24,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.netty.HedwigClient;
-import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.server.HedwigRegionTestBase;
@@ -76,7 +76,7 @@ public class TestHedwigRegion extends He
         // Now start publishing messages for the subscribed topics in one of the
         // regions and verify that it gets delivered and consumed in all of the
         // other ones.
-        HedwigPublisher publisher = regionClientsMap.values().iterator().next().getPublisher();
+        Publisher publisher = regionClientsMap.values().iterator().next().getPublisher();
         for (int i = 0; i < batchSize; i++) {
             publisher.asyncPublish(ByteString.copyFromUtf8("Topic" + i), Message.newBuilder().setBody(
                                        ByteString.copyFromUtf8("Message" + i)).build(), new TestCallback(queue), null);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Thu Nov 17 21:38:49 2011
@@ -33,8 +33,8 @@ import org.junit.Test;
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.netty.HedwigClient;
-import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
@@ -97,7 +97,7 @@ public class TestPubSubServer extends Pu
     }
 
     public void runPublishRequest(final int port) throws Exception {
-        HedwigPublisher publisher = new HedwigClient(new ClientConfiguration() {
+        Publisher publisher = new HedwigClient(new ClientConfiguration() {
             @Override
             public InetSocketAddress getDefaultServerHost() {
                 return new InetSocketAddress("localhost", port);

Modified: zookeeper/bookkeeper/trunk/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/pom.xml?rev=1203380&r1=1203379&r2=1203380&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/pom.xml Thu Nov 17 21:38:49 2011
@@ -72,7 +72,8 @@
         <artifactId>maven-javadoc-plugin</artifactId>
         <version>2.8</version>
 	<configuration>
-	  <subpackages>org.apache.bookkeeper.client</subpackages>
+	  <additionalparam>-exclude org.apache.hedwig.client.netty:org.apache.hedwig.client.benchmark:org.apache.hedwig.client.data:org.apache.hedwig.client.exceptions:org.apache.hedwig.client.handlers:org.apache.hedwig.client.ssl</additionalparam>
+	  <subpackages>org.apache.bookkeeper.client:org.apache.hedwig.client:org.apache.hedwig.util:org.apache.hedwig.protocol:org.apache.hedwig.exceptions</subpackages>
           <groups>
 	    <group>
               <title>Bookkeeper</title>



Mime
View raw message