zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1374321 [1/3] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/test/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-protocol/ hedwig-protocol/src/main/ja...
Date Fri, 17 Aug 2012 15:32:24 GMT
Author: ivank
Date: Fri Aug 17 15:32:23 2012
New Revision: 1374321

URL: http://svn.apache.org/viewvc?rev=1374321&view=rev
Log:
BOOKKEEPER-332: Add SubscriptionPreferences to record all preferences for a subscription (sijie
via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/pom.xml
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Aug 17 15:32:23 2012
@@ -98,6 +98,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-283: Improve Hedwig Console to use Hedwig Metadata Manager. (sijie via ivank)
 
+        BOOKKEEPER-332: Add SubscriptionPreferences to record all preferences for a subscription (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Fri Aug 17 15:32:23 2012
@@ -88,32 +88,32 @@
       <version>2.1</version>
     </dependency>
     <!--
-	Annoying dependency we need to include because
-	zookeeper uses log4j and so we transatively do, but
-	log4j has some dependencies which aren't in the 
-	default maven repositories
+        Annoying dependency we need to include because
+        zookeeper uses log4j and so we transitively do, but
+        log4j has some dependencies which aren't in the 
+        default maven repositories
     //-->
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.15</version>
       <exclusions>
-	<exclusion>
-	  <groupId>javax.mail</groupId>
-	  <artifactId>mail</artifactId>
-	</exclusion>
-	<exclusion>
-	  <groupId>javax.jms</groupId>
-	  <artifactId>jms</artifactId>
-	</exclusion>
-	<exclusion>
-	  <groupId>com.sun.jdmk</groupId>
-	  <artifactId>jmxtools</artifactId>
-	</exclusion>
-	<exclusion>
-	  <groupId>com.sun.jmx</groupId>
-	  <artifactId>jmxri</artifactId>
-	</exclusion>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -165,11 +165,23 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <version>2.2.1</version>
         <configuration>
           <descriptors>
-	    <descriptor>../src/assemble/bin.xml</descriptor>
+            <descriptor>../src/assemble/bin.xml</descriptor>
           </descriptors>
         </configuration>
       </plugin>
@@ -192,18 +204,18 @@
         </configuration>
       </plugin>
       <plugin>
-	<artifactId>maven-dependency-plugin</artifactId>
-	<executions>
-	  <execution>
-	    <phase>package</phase>
-	    <goals>
-	      <goal>copy-dependencies</goal>
-	    </goals>
-	    <configuration>
-	      <outputDirectory>${basedir}/lib</outputDirectory>
-	    </configuration>
-	  </execution>
-	</executions>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
     </plugins>
   </build>

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Fri Aug 17 15:32:23 2012
@@ -98,6 +98,17 @@ const MessageSeqId PubSubData::getMessag
   return msgid;
 }
 
+void PubSubData::setPreferencesForSubRequest(SubscribeRequest * subreq,
+                                             const SubscriptionOptions &options) {
+  Hedwig::SubscriptionPreferences* preferences = subreq->mutable_preferences();
+  if (options.messagebound() > 0) {
+    preferences->set_messagebound(options.messagebound());
+  }
+  if (options.has_options()) {
+    preferences->mutable_options()->CopyFrom(options.options());
+  }
+}
+
 const PubSubRequestPtr PubSubData::getRequest() {
   PubSubRequestPtr request(new Hedwig::PubSubRequest());
   request->set_protocolversion(Hedwig::VERSION_ONE);
@@ -120,9 +131,7 @@ const PubSubRequestPtr PubSubData::getRe
     Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
     subreq->set_subscriberid(subscriberid);
     subreq->set_createorattach(options.createorattach());
-    if (options.messagebound() > 0) {
-      subreq->set_messagebound(options.messagebound());
-    }
+    setPreferencesForSubRequest(subreq, options);
   } else if (type == CONSUME) {
     LOG4CXX_DEBUG(logger, "Creating consume request");
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h Fri Aug 17 15:32:23 2012
@@ -95,6 +95,9 @@ namespace Hedwig {
   private:
 
     PubSubData();
+
+    void setPreferencesForSubRequest(SubscribeRequest * subreq,
+                                     const SubscriptionOptions &options);
     
     OperationType type;
     long txnid;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp Fri Aug 17 15:32:23 2012
@@ -182,3 +182,45 @@ TEST(MessageBoundTest, testMultipleSubsc
   sub.unsubscribe(topic, subid5);
 }
 
+TEST(MessageBoundTest, testUpdateMessageBound) {
+  Hedwig::Configuration* conf = new TestServerConfiguration();
+  std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+  Hedwig::Client* client = new Hedwig::Client(*conf);
+  std::auto_ptr<Hedwig::Client> clientptr(client);
+
+  Hedwig::Subscriber& sub = client->getSubscriber();
+  Hedwig::Publisher& pub = client->getPublisher();
+
+  Hedwig::SubscriptionOptions options5;
+  options5.set_messagebound(5);
+  options5.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  Hedwig::SubscriptionOptions options20;
+  options20.set_messagebound(20);
+  options20.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  Hedwig::SubscriptionOptions options10;
+  options10.set_messagebound(10);
+  options10.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+
+  std::string topic = "testUpdateMessageBound";
+  std::string subid = "updateSubId";
+
+  sub.subscribe(topic, subid, options5);
+  sub.closeSubscription(topic, subid);
+  sendXExpectLastY(pub, sub, topic, subid, 50, 5);
+
+  // update bound to 20
+  sub.subscribe(topic, subid, options20);
+  sub.closeSubscription(topic, subid);
+  sendXExpectLastY(pub, sub, topic, subid, 50, 20);
+
+  // update bound to 10
+  sub.subscribe(topic, subid, options10);
+  sub.closeSubscription(topic, subid);
+  sendXExpectLastY(pub, sub, topic, subid, 50, 10);
+
+  // message bound is not provided, no update
+  sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+  sub.closeSubscription(topic, subid);
+  sendXExpectLastY(pub, sub, topic, subid, 50, 10);
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Fri Aug 17 15:32:23 2012
@@ -53,6 +53,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.util.Callback;
@@ -355,6 +356,31 @@ public class HedwigSubscriber implements
     }
 
     /**
+     * Convert client-side subscription options to subscription preferences
+     *
+     * @param options
+     *          Client-Side subscription options
+     */
+    protected SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
+        // prepare subscription preferences
+        SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder();
+
+        // set message bound
+        if (options.getMessageBound() > 0) {
+            preferencesBuilder.setMessageBound(options.getMessageBound());
+        } else if (cfg.getSubscriptionMessageBound() > 0) {
+            preferencesBuilder.setMessageBound(cfg.getSubscriptionMessageBound());
+        }
+
+        // set user options
+        if (options.hasOptions()) {
+            preferencesBuilder.setOptions(options.getOptions());
+        }
+
+        return preferencesBuilder;
+    }
+
+    /**
      * This is a helper method to write the actual subscribe/unsubscribe message
      * once the client is connected to the server and a Channel is available.
      *
@@ -388,12 +414,14 @@ public class HedwigSubscriber implements
             // For now, all subscribes should wait for all cross-regional
             // subscriptions to be established before returning.
             subscribeRequestBuilder.setSynchronous(true);
-
-            if (pubSubData.options.getMessageBound() > 0) {
-                subscribeRequestBuilder.setMessageBound(pubSubData.options.getMessageBound());
-            } else if (cfg.getSubscriptionMessageBound() > 0) {
-                subscribeRequestBuilder.setMessageBound(cfg.getSubscriptionMessageBound());
+            // set subscription preferences
+            SubscriptionPreferences.Builder preferencesBuilder =
+                options2Preferences(pubSubData.options);
+            // backward compatible with 4.1.0
+            if (preferencesBuilder.hasMessageBound()) {
+                subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound());
             }
+            subscribeRequestBuilder.setPreferences(preferencesBuilder);
 
             // Set the SubscribeRequest into the outer PubSubRequest
             pubsubRequestBuilder.setSubscribeRequest(subscribeRequestBuilder);

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/pom.xml?rev=1374321&r1=1374320&r2=1374321&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/pom.xml Fri Aug 17 15:32:23 2012
@@ -41,6 +41,16 @@
       <version>4.8.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.6.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.6.4</version>
+    </dependency>
   </dependencies>
   <repositories>
   </repositories>



Mime
View raw message