zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1360300 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apa...
Date Wed, 11 Jul 2012 17:11:07 GMT
Author: ivank
Date: Wed Jul 11 17:11:06 2012
New Revision: 1360300

URL: http://svn.apache.org/viewvc?rev=1360300&view=rev
Log:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
      - copied, changed from r1358895, zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
Removed:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jul 11 17:11:06 2012
@@ -54,6 +54,10 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-328: Bookie DeathWatcher is missing thread name (Rakesh via sijie)
 
+      hedwig-server:
+
+        BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java Wed Jul 11 17:11:06 2012
@@ -50,6 +50,18 @@ public abstract class PubSubException ex
             return new ServiceDownException(msg);
         } else if (code == StatusCode.COULD_NOT_CONNECT) {
             return new CouldNotConnectException(msg);
+        } else if (code == StatusCode.TOPIC_BUSY) {
+            return new TopicBusyException(msg);
+        } else if (code == StatusCode.BAD_VERSION) {
+            return new BadVersionException(msg);
+        } else if (code == StatusCode.NO_TOPIC_PERSISTENCE_INFO) {
+            return new NoTopicPersistenceInfoException(msg);
+        } else if (code == StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS) {
+            return new TopicPersistenceInfoExistsException(msg);
+        } else if (code == StatusCode.NO_SUBSCRIPTION_STATE) {
+            return new NoSubscriptionStateException(msg);
+        } else if (code == StatusCode.SUBSCRIPTION_STATE_EXISTS) {
+            return new SubscriptionStateExistsException(msg);
         }
         /*
          * Insert new ones here
@@ -120,6 +132,36 @@ public abstract class PubSubException ex
         }
     }
 
+    public static class BadVersionException extends PubSubException {
+        public BadVersionException(String msg) {
+            super(StatusCode.BAD_VERSION, msg);
+        }
+    }
+
+    public static class NoTopicPersistenceInfoException extends PubSubException {
+        public NoTopicPersistenceInfoException(String msg) {
+            super(StatusCode.NO_TOPIC_PERSISTENCE_INFO, msg);
+        }
+    }
+
+    public static class TopicPersistenceInfoExistsException extends PubSubException {
+        public TopicPersistenceInfoExistsException(String msg) {
+            super(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, msg);
+        }
+    }
+
+    public static class NoSubscriptionStateException extends PubSubException {
+        public NoSubscriptionStateException(String msg) {
+            super(StatusCode.NO_SUBSCRIPTION_STATE, msg);
+        }
+    }
+
+    public static class SubscriptionStateExistsException extends PubSubException {
+        public SubscriptionStateExistsException(String msg) {
+            super(StatusCode.SUBSCRIPTION_STATE_EXISTS, msg);
+        }
+    }
+
     /*
      * Insert new ones here
      */

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Wed Jul 11 17:11:06 2012
@@ -167,8 +167,13 @@ public final class PubSubProtocol {
     NOT_RESPONSIBLE_FOR_TOPIC(7, 501),
     SERVICE_DOWN(8, 502),
     UNCERTAIN_STATE(9, 503),
-    UNEXPECTED_CONDITION(10, 600),
-    COMPOSITE(11, 700),
+    BAD_VERSION(10, 520),
+    NO_TOPIC_PERSISTENCE_INFO(11, 521),
+    TOPIC_PERSISTENCE_INFO_EXISTS(12, 522),
+    NO_SUBSCRIPTION_STATE(13, 523),
+    SUBSCRIPTION_STATE_EXISTS(14, 524),
+    UNEXPECTED_CONDITION(15, 600),
+    COMPOSITE(16, 700),
     ;
     
     public static final int SUCCESS_VALUE = 0;
@@ -181,6 +186,11 @@ public final class PubSubProtocol {
     public static final int NOT_RESPONSIBLE_FOR_TOPIC_VALUE = 501;
     public static final int SERVICE_DOWN_VALUE = 502;
     public static final int UNCERTAIN_STATE_VALUE = 503;
+    public static final int BAD_VERSION_VALUE = 520;
+    public static final int NO_TOPIC_PERSISTENCE_INFO_VALUE = 521;
+    public static final int TOPIC_PERSISTENCE_INFO_EXISTS_VALUE = 522;
+    public static final int NO_SUBSCRIPTION_STATE_VALUE = 523;
+    public static final int SUBSCRIPTION_STATE_EXISTS_VALUE = 524;
     public static final int UNEXPECTED_CONDITION_VALUE = 600;
     public static final int COMPOSITE_VALUE = 700;
     
@@ -199,6 +209,11 @@ public final class PubSubProtocol {
         case 501: return NOT_RESPONSIBLE_FOR_TOPIC;
         case 502: return SERVICE_DOWN;
         case 503: return UNCERTAIN_STATE;
+        case 520: return BAD_VERSION;
+        case 521: return NO_TOPIC_PERSISTENCE_INFO;
+        case 522: return TOPIC_PERSISTENCE_INFO_EXISTS;
+        case 523: return NO_SUBSCRIPTION_STATE;
+        case 524: return SUBSCRIPTION_STATE_EXISTS;
         case 600: return UNEXPECTED_CONDITION;
         case 700: return COMPOSITE;
         default: return null;
@@ -231,7 +246,7 @@ public final class PubSubProtocol {
     }
     
     private static final StatusCode[] VALUES = {
-      SUCCESS, MALFORMED_REQUEST, NO_SUCH_TOPIC, CLIENT_ALREADY_SUBSCRIBED, CLIENT_NOT_SUBSCRIBED, COULD_NOT_CONNECT, TOPIC_BUSY, NOT_RESPONSIBLE_FOR_TOPIC, SERVICE_DOWN, UNCERTAIN_STATE, UNEXPECTED_CONDITION, COMPOSITE, 
+      SUCCESS, MALFORMED_REQUEST, NO_SUCH_TOPIC, CLIENT_ALREADY_SUBSCRIBED, CLIENT_NOT_SUBSCRIBED, COULD_NOT_CONNECT, TOPIC_BUSY, NOT_RESPONSIBLE_FOR_TOPIC, SERVICE_DOWN, UNCERTAIN_STATE, BAD_VERSION, NO_TOPIC_PERSISTENCE_INFO, TOPIC_PERSISTENCE_INFO_EXISTS, NO_SUBSCRIPTION_STATE, SUBSCRIPTION_STATE_EXISTS, UNEXPECTED_CONDITION, COMPOSITE, 
     };
     
     public static StatusCode valueOf(
@@ -8933,6 +8948,454 @@ public final class PubSubProtocol {
     // @@protoc_insertion_point(class_scope:Hedwig.LedgerRanges)
   }
   
+  public interface ManagerMetaOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // required string managerImpl = 2;
+    boolean hasManagerImpl();
+    String getManagerImpl();
+    
+    // required uint32 managerVersion = 3;
+    boolean hasManagerVersion();
+    int getManagerVersion();
+  }
+  public static final class ManagerMeta extends
+      com.google.protobuf.GeneratedMessage
+      implements ManagerMetaOrBuilder {
+    // Use ManagerMeta.newBuilder() to construct.
+    private ManagerMeta(Builder builder) {
+      super(builder);
+    }
+    private ManagerMeta(boolean noInit) {}
+    
+    private static final ManagerMeta defaultInstance;
+    public static ManagerMeta getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public ManagerMeta getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_fieldAccessorTable;
+    }
+    
+    private int bitField0_;
+    // required string managerImpl = 2;
+    public static final int MANAGERIMPL_FIELD_NUMBER = 2;
+    private java.lang.Object managerImpl_;
+    public boolean hasManagerImpl() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getManagerImpl() {
+      java.lang.Object ref = managerImpl_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          managerImpl_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getManagerImplBytes() {
+      java.lang.Object ref = managerImpl_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        managerImpl_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required uint32 managerVersion = 3;
+    public static final int MANAGERVERSION_FIELD_NUMBER = 3;
+    private int managerVersion_;
+    public boolean hasManagerVersion() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public int getManagerVersion() {
+      return managerVersion_;
+    }
+    
+    private void initFields() {
+      managerImpl_ = "";
+      managerVersion_ = 0;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasManagerImpl()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasManagerVersion()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(2, getManagerImplBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt32(3, managerVersion_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getManagerImplBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(3, managerVersion_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hedwig.protocol.PubSubProtocol.ManagerMetaOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        managerImpl_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        managerVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.getDescriptor();
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta getDefaultInstanceForType() {
+        return org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.getDefaultInstance();
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta build() {
+        org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta buildPartial() {
+        org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta result = new org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.managerImpl_ = managerImpl_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.managerVersion_ = managerVersion_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta) {
+          return mergeFrom((org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta other) {
+        if (other == org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.getDefaultInstance()) return this;
+        if (other.hasManagerImpl()) {
+          setManagerImpl(other.getManagerImpl());
+        }
+        if (other.hasManagerVersion()) {
+          setManagerVersion(other.getManagerVersion());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasManagerImpl()) {
+          
+          return false;
+        }
+        if (!hasManagerVersion()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000001;
+              managerImpl_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000002;
+              managerVersion_ = input.readUInt32();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required string managerImpl = 2;
+      private java.lang.Object managerImpl_ = "";
+      public boolean hasManagerImpl() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getManagerImpl() {
+        java.lang.Object ref = managerImpl_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          managerImpl_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setManagerImpl(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        managerImpl_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearManagerImpl() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        managerImpl_ = getDefaultInstance().getManagerImpl();
+        onChanged();
+        return this;
+      }
+      void setManagerImpl(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        managerImpl_ = value;
+        onChanged();
+      }
+      
+      // required uint32 managerVersion = 3;
+      private int managerVersion_ ;
+      public boolean hasManagerVersion() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public int getManagerVersion() {
+        return managerVersion_;
+      }
+      public Builder setManagerVersion(int value) {
+        bitField0_ |= 0x00000002;
+        managerVersion_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearManagerVersion() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        managerVersion_ = 0;
+        onChanged();
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:Hedwig.ManagerMeta)
+    }
+    
+    static {
+      defaultInstance = new ManagerMeta(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:Hedwig.ManagerMeta)
+  }
+  
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_Hedwig_Message_descriptor;
   private static
@@ -9008,6 +9471,11 @@ public final class PubSubProtocol {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_Hedwig_LedgerRanges_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_Hedwig_ManagerMeta_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_Hedwig_ManagerMeta_fieldAccessorTable;
   
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -9062,19 +9530,24 @@ public final class PubSubProtocol {
       "Id\022\024\n\014messageBound\030\002 \001(\r\"O\n\013LedgerRange\022" +
       "\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 " +
       "\001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRange" +
-      "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange*\"" +
-      "\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOp" +
-      "erationType\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022" +
-      "\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_D" +
-      "ELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*\236\002\n\nStatusC",
-      "ode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003" +
-      "\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_S" +
-      "UBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003" +
-      "\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226" +
-      "\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SER" +
-      "VICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\031\n\024UN" +
-      "EXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n" +
-      "\032org.apache.hedwig.protocolH\001"
+      "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange\":" +
+      "\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016ma" +
+      "nagerVersion\030\003 \002(\r*\"\n\017ProtocolVersion\022\017\n" +
+      "\013VERSION_ONE\020\001*p\n\rOperationType\022\013\n\007PUBLI" +
+      "SH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSU",
+      "BSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DE" +
+      "LIVERY\020\005*\260\003\n\nStatusCode\022\013\n\007SUCCESS\020\000\022\026\n\021" +
+      "MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC\020\222\003" +
+      "\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025CLIE" +
+      "NT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CONNEC" +
+      "T\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSIBLE" +
+      "_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017UNCE" +
+      "RTAIN_STATE\020\367\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_T" +
+      "OPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSIS" +
+      "TENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_",
+      "STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004" +
+      "\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE" +
+      "\020\274\005B\036\n\032org.apache.hedwig.protocolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9201,6 +9674,14 @@ public final class PubSubProtocol {
               new java.lang.String[] { "Ranges", },
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.class,
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.Builder.class);
+          internal_static_Hedwig_ManagerMeta_descriptor =
+            getDescriptor().getMessageTypes().get(15);
+          internal_static_Hedwig_ManagerMeta_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_Hedwig_ManagerMeta_descriptor,
+              new java.lang.String[] { "ManagerImpl", "ManagerVersion", },
+              org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.class,
+              org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.Builder.class);
           return null;
         }
       };

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Wed Jul 11 17:11:06 2012
@@ -156,7 +156,14 @@ enum StatusCode{
     NOT_RESPONSIBLE_FOR_TOPIC = 501;
     SERVICE_DOWN = 502;
     UNCERTAIN_STATE = 503;
-    
+
+    //server-side meta manager errors (52x)
+    BAD_VERSION = 520;
+    NO_TOPIC_PERSISTENCE_INFO = 521;
+    TOPIC_PERSISTENCE_INFO_EXISTS = 522;
+    NO_SUBSCRIPTION_STATE = 523;
+    SUBSCRIPTION_STATE_EXISTS = 524;
+
     //For all unexpected error conditions
     UNEXPECTED_CONDITION = 600;
     
@@ -180,3 +187,7 @@ message LedgerRanges{
     repeated LedgerRange ranges = 1;
 }
 
+message ManagerMeta {
+    required string managerImpl = 2;
+    required uint32 managerVersion = 3;
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Wed Jul 11 17:11:06 2012
@@ -30,7 +30,9 @@ import java.util.List;
 import org.apache.commons.configuration.ConfigurationException;
 
 import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.hedwig.conf.AbstractConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.util.HedwigSocketAddress;
 
 public class ServerConfiguration extends AbstractConfiguration {
@@ -60,6 +62,17 @@ public class ServerConfiguration extends
     protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
     protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
 
+    // manager related settings
+    protected final static String METADATA_MANAGER_FACTORY_CLASS = "metadata_manager_factory_class";
+
+    private static ClassLoader defaultLoader;
+    static {
+        defaultLoader = Thread.currentThread().getContextClassLoader();
+        if (null == defaultLoader) {
+            defaultLoader = ServerConfiguration.class.getClassLoader();
+        }
+    }
+
     // these are the derived attributes
     protected ByteString myRegionByteString = null;
     protected HedwigSocketAddress myServerAddress = null;
@@ -153,6 +166,17 @@ public class ServerConfiguration extends
         return sb.append(getZkPrefix()).append("/").append(getMyRegion());
     }
 
+    /**
+     * Get znode path to store manager layouts.
+     *
+     * @param sb
+     *          StringBuilder to store znode path to store manager layouts.
+     * @return znode path to store manager layouts.
+     */
+    public StringBuilder getZkManagersPrefix(StringBuilder sb) {
+        return getZkRegionPrefix(sb).append("/managers");
+    }
+
     public StringBuilder getZkTopicsPrefix(StringBuilder sb) {
         return getZkRegionPrefix(sb).append("/topics");
     }
@@ -296,4 +320,28 @@ public class ServerConfiguration extends
 
         // add other checks here
     }
+
+    /**
+     * Get metadata manager factory class.
+     *
+     * @return manager class
+     */
+    public Class<? extends MetadataManagerFactory> getMetadataManagerFactoryClass()
+    throws ConfigurationException {
+        return ReflectionUtils.getClass(conf, METADATA_MANAGER_FACTORY_CLASS,
+                                        null, MetadataManagerFactory.class,
+                                        defaultLoader);
+    }
+
+    /**
+     * Set metadata manager factory class name
+     *
+     * @param managerClsName
+     *          Manager Class Name
+     * @return server configuration
+     */
+    public ServerConfiguration setMetadataManagerFactoryName(String managerClsName) {
+        conf.setProperty(METADATA_MANAGER_FACTORY_CLASS, managerClsName);
+        return this;
+    }
 }

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,149 @@
+package org.apache.hedwig.server.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.zookeeper.ZkUtils;
+
+/**
+ * This class encapsulates metadata manager layout information
+ * that is persistently stored in zookeeper.
+ * It provides parsing and serialization methods of such information.
+ *
+ */
+public class FactoryLayout {
+    static final Logger logger = LoggerFactory.getLogger(FactoryLayout.class);
+
+    // metadata manager name
+    public static final String NAME = "METADATA";
+    // Znode name to store layout information
+    public static final String LAYOUT_ZNODE = "LAYOUT";
+    public static final String LSEP = "\n";
+
+    private ManagerMeta managerMeta;
+
+    /**
+     * Construct metadata manager factory layout.
+     *
+     * @param meta
+     *          Meta describes what kind of factory used.
+     */
+    public FactoryLayout(ManagerMeta meta) {
+        this.managerMeta = meta;
+    }
+
+    public static String getFactoryLayoutPath(StringBuilder sb, ServerConfiguration cfg) {
+        return cfg.getZkManagersPrefix(sb).append("/").append(NAME)
+               .append("/").append(LAYOUT_ZNODE).toString();
+    }
+
+    public ManagerMeta getManagerMeta() {
+        return managerMeta;
+    }
+
+    /**
+     * Store the factory layout into zookeeper
+     *
+     * @param zk
+     *          ZooKeeper Handle
+     * @param cfg
+     *          Server Configuration Object
+     * @throws KeeperException
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void store(ZooKeeper zk, ServerConfiguration cfg)
+    throws KeeperException, IOException, InterruptedException {
+        String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
+
+        byte[] layoutData = TextFormat.printToString(managerMeta).getBytes();
+        ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, layoutData,
+                                         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    @Override
+    public int hashCode() {
+        return managerMeta.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (null == o ||
+            !(o instanceof FactoryLayout)) {
+            return false;
+        }
+        FactoryLayout other = (FactoryLayout)o;
+        return managerMeta.equals(other.managerMeta);
+    }
+
+    @Override
+    public String toString() {
+        return TextFormat.printToString(managerMeta);
+    }
+
+    /**
+     * Read factory layout from zookeeper
+     *
+     * @param zk
+     *          ZooKeeper Client
+     * @param cfg
+     *          Server configuration object
+     * @return Factory layout, or null if none set in zookeeper
+     */
+    public static FactoryLayout readLayout(final ZooKeeper zk,
+                                           final ServerConfiguration cfg)
+    throws IOException, KeeperException {
+        String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg);
+        byte[] layoutData;
+        try {
+            layoutData = zk.getData(factoryLayoutPath, false, null);
+        } catch (KeeperException.NoNodeException nne) {
+            return null;
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+        ManagerMeta meta;
+        try {
+            BufferedReader reader = new BufferedReader(
+                new StringReader(new String(layoutData)));
+            ManagerMeta.Builder metaBuilder = ManagerMeta.newBuilder();
+            TextFormat.merge(reader, metaBuilder);
+            meta = metaBuilder.build();
+        } catch (InvalidProtocolBufferException ipbe) {
+            throw new IOException("Corrupted factory layout : ", ipbe);
+        }
+
+        return new FactoryLayout(meta);
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.meta;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Factory that creates the metadata managers used by Hedwig.
+ */
+public abstract class MetadataManagerFactory {
+
+    static final Logger LOG = LoggerFactory.getLogger(MetadataManagerFactory.class);
+
+    /**
+     * Return current factory version.
+     *
+     * @return current version used by factory.
+     */
+    public abstract int getCurrentVersion();
+
+    /**
+     * Initialize the metadata manager factory with given
+     * configuration and version.
+     *
+     * @param cfg
+     *          Server configuration object
+     * @param zk
+     *          ZooKeeper handler
+     * @param version
+     *          Manager version
+     * @return metadata manager factory
+     * @throws IOException when it fails to initialize the manager.
+     */
+    protected abstract MetadataManagerFactory initialize(
+        ServerConfiguration cfg, ZooKeeper zk, int version)
+    throws IOException;
+
+    /**
+     * Uninitialize the factory.
+     *
+     * @throws IOException when it fails to shut down the factory.
+     */
+    public abstract void shutdown() throws IOException;
+
+    /**
+     * Create topic persistence manager.
+     *
+     * @return topic persistence manager
+     */
+    public abstract TopicPersistenceManager newTopicPersistenceManager();
+
+    /**
+     * Create subscription data manager.
+     *
+     * @return subscription data manager.
+     */
+    public abstract SubscriptionDataManager newSubscriptionDataManager();
+
+    /**
+     * Create new Metadata Manager Factory.
+     *
+     * @param conf
+     *          Configuration Object.
+     * @param zk
+     *          ZooKeeper Client Handle, talk to zk to know which manager factory is used.
+     * @return new manager factory.
+     * @throws IOException
+     */
+    public static MetadataManagerFactory newMetadataManagerFactory(
+        final ServerConfiguration conf, final ZooKeeper zk)
+    throws IOException, KeeperException, InterruptedException {
+        Class<? extends MetadataManagerFactory> factoryClass;
+        try {
+            factoryClass = conf.getMetadataManagerFactoryClass();
+        } catch (Exception e) {
+            throw new IOException("Failed to get metadata manager factory class from configuration : ", e);
+        }
+
+        MetadataManagerFactory managerFactory;
+
+        // check that the configured manager is
+        // compatible with the existing layout
+        FactoryLayout layout = FactoryLayout.readLayout(zk, conf);
+        if (layout == null) { // no existing layout
+            // use the default manager if none is provided
+            if (factoryClass == null) {
+                factoryClass = ZkMetadataManagerFactory.class;
+            }
+            try {
+                managerFactory = ReflectionUtils.newInstance(factoryClass);
+            } catch (Throwable t) {
+                throw new IOException("Fail to instantiate metadata manager factory : " + factoryClass, t);
+            }
+            ManagerMeta managerMeta = ManagerMeta.newBuilder()
+                                      .setManagerImpl(factoryClass.getName())
+                                      .setManagerVersion(managerFactory.getCurrentVersion())
+                                      .build();
+            layout = new FactoryLayout(managerMeta);
+            try {
+                layout.store(zk, conf);
+            } catch (KeeperException.NodeExistsException nee) {
+                FactoryLayout layout2 = FactoryLayout.readLayout(zk, conf);
+                if (!layout2.equals(layout)) {
+                    throw new IOException("Contention writing to layout to zookeeper, "
+                            + " other layout " + layout2 + " is incompatible with our "
+                            + "layout " + layout);
+                }
+            }
+            return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion());
+        }
+        LOG.debug("read meta layout {}", layout);
+
+        if (factoryClass != null &&
+            !layout.getManagerMeta().getManagerImpl().equals(factoryClass.getName())) {
+            throw new IOException("Configured metadata manager factory " + factoryClass.getName()
+                                + " does not match existing factory "  + layout.getManagerMeta().getManagerImpl());
+        }
+        if (factoryClass == null) {
+            // no factory specified in configuration
+            String factoryClsName = layout.getManagerMeta().getManagerImpl();
+            try {
+                Class<?> theCls = Class.forName(factoryClsName);
+                if (!MetadataManagerFactory.class.isAssignableFrom(theCls)) {
+                    throw new IOException("Wrong metadata manager factory " + factoryClsName);
+                }
+                factoryClass = theCls.asSubclass(MetadataManagerFactory.class);
+            } catch (ClassNotFoundException cnfe) {
+                throw new IOException("No class found to instantiate metadata manager factory " + factoryClsName);
+            }
+        }
+        // instantiate the metadata manager factory
+        try {
+            managerFactory = ReflectionUtils.newInstance(factoryClass);
+        } catch (Throwable t) {
+            throw new IOException("Failed to instantiate metadata manager factory : " + factoryClass, t);
+        }
+        return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion());
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.meta;
+
+import java.io.Closeable;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Manage subscription data.
+ */
+public interface SubscriptionDataManager extends Closeable {
+
+    /**
+     * Create subscription state.
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriberId
+     *          Subscriber id
+     * @param state
+     *          Subscription state
+     * @param callback
+     *          Callback when subscription state created.
+     *          {@link PubSubException.SubscriptionStateExistsException} is returned when subscription state
+     *          existed before.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void createSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+                                        Callback<Void> callback, Object ctx);
+
+    /**
+     * Update subscription state.
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriberId
+     *          Subscriber id
+     * @param state
+     *          Subscription state
+     * @param callback
+     *          Callback when subscription state updated.
+     *          {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
+     *          is found.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+                                        Callback<Void> callback, Object ctx);
+
+    /**
+     * Remove subscription state.
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriberId
+     *          Subscriber id
+     * @param callback
+     *          Callback when subscription state deleted.
+     *          {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state
+     *          is found.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void deleteSubscriptionState(ByteString topic, ByteString subscriberId,
+                                        Callback<Void> callback, Object ctx);
+
+    /**
+     * Read subscription state.
+     *
+     * @param topic
+     *          Topic Name
+     * @param subscriberId
+     *          Subscriber id
+     * @param callback
+     *          Callback when subscription state read.
+     *          Null is returned when no subscription state is found.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void readSubscriptionState(ByteString topic, ByteString subscriberId,
+                                      Callback<SubscriptionState> callback, Object ctx);
+
+    /**
+     * Read all subscriptions of a topic.
+     *
+     * @param topic
+     *          Topic name
+     * @param cb
+     *          Callback to return subscriptions
+     * @param ctx
+     *          Context of the callback
+     */
+    public void readSubscriptions(ByteString topic, Callback<Map<ByteString, InMemorySubscriptionState>> cb,
+                                  Object ctx);
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.meta;
+
+import java.io.Closeable;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Manage topic persistence metadata.
+ */
+public interface TopicPersistenceManager extends Closeable {
+
+    /**
+     * Read persistence info of a specified topic.
+     *
+     * @param topic
+     *          Topic Name
+     * @param callback
+     *          Callback when read persistence info.
+     *          If no persistence info found, return null.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void readTopicPersistenceInfo(ByteString topic,
+                                         Callback<Versioned<LedgerRanges>> callback, Object ctx);
+
+    /**
+     * Update persistence info of a specified topic.
+     *
+     * @param topic
+     *          Topic name
+     * @param ranges
+     *          Persistence info
+     * @param version
+     *          Current version of persistence info.
+     *          If <code>version</code> is null, create persistence info;
+     *          {@link PubSubException.TopicPersistenceInfoExistsException} is returned when
+     *          persistence info existed before.
+     *          If <code>version</code> is not null, the persistence info is updated only when
+     *          provided version equals its current version.
+     *          {@link PubSubException.BadVersionException} is returned when version doesn't match,
+     *          {@link PubSubException.NoTopicPersistenceInfoException} is returned when no
+     *          persistence info found to update.
+     * @param callback
+     *          Callback when persistence info updated. New version would be returned.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
+                                          Callback<Version> callback, Object ctx);
+
+    /**
+     * Delete persistence info of a specified topic.
+     * Currently used in test cases.
+     *
+     * @param topic
+     *          Topic name
+     * @param version
+     *          Current version of persistence info
+     *          If <code>version</code> is null, delete persistence info regardless of its current version.
+     *          If <code>version</code> is not null, the persistence info is deleted only when
+     *          provided version equals its current version.
+     * @param callback
+     *          Callback reporting whether the deletion succeeded.
+     *          {@link PubSubException.NoTopicPersistenceInfoException} is returned when no persistence
+     *          info found to delete.
+     *          {@link PubSubException.BadVersionException} is returned when version doesn't match.
+     * @param ctx
+     *          Context of the callback
+     */
+    public void deleteTopicPersistenceInfo(ByteString topic, Version version,
+                                           Callback<Void> callback, Object ctx);
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.meta;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+
+/**
+ * ZooKeeper-based Metadata Manager.
+ */
+public class ZkMetadataManagerFactory extends MetadataManagerFactory {
+    protected final static Logger logger = LoggerFactory.getLogger(ZkMetadataManagerFactory.class);
+
+    static final int CUR_VERSION = 1;
+
+    ZooKeeper zk;
+    ServerConfiguration cfg;
+
+    /**
+     * Report the layout version implemented by this factory.
+     *
+     * @return the value of {@link #CUR_VERSION}
+     */
+    @Override
+    public int getCurrentVersion() {
+        return ZkMetadataManagerFactory.CUR_VERSION;
+    }
+
+    /**
+     * Bind this factory to a server configuration and a ZooKeeper handle.
+     *
+     * @param cfg
+     *          Server configuration passed on to the managers created by this factory
+     * @param zk
+     *          ZooKeeper handle passed on to the managers created by this factory
+     * @param version
+     *          Requested layout version; must be equal to {@link #CUR_VERSION}
+     * @return this factory
+     * @throws IOException
+     *          when <code>version</code> differs from {@link #CUR_VERSION}
+     */
+    @Override
+    public MetadataManagerFactory initialize(ServerConfiguration cfg,
+                                             ZooKeeper zk,
+                                             int version)
+    throws IOException {
+        final boolean compatible = (version == CUR_VERSION);
+        if (!compatible) {
+            final StringBuilder msg = new StringBuilder("Incompatible ZkMetadataManagerFactory version ");
+            msg.append(version).append(" found, expected version ").append(CUR_VERSION);
+            throw new IOException(msg.toString());
+        }
+        this.zk = zk;
+        this.cfg = cfg;
+        return this;
+    }
+
+    /**
+     * Shut down the factory. This implementation releases nothing.
+     */
+    @Override
+    public void shutdown() {
+        // Intentionally empty: the zookeeper handle is passed in from outside,
+        // so stopping it is not the responsibility of this factory.
+    }
+
+    /**
+     * Create a ZooKeeper based topic persistence manager.
+     *
+     * @return a new manager sharing this factory's configuration and ZooKeeper handle
+     */
+    @Override
+    public TopicPersistenceManager newTopicPersistenceManager() {
+        final TopicPersistenceManager manager = new ZkTopicPersistenceManagerImpl(cfg, zk);
+        return manager;
+    }
+
+    /**
+     * Create a ZooKeeper based subscription data manager.
+     *
+     * @return a new manager sharing this factory's configuration and ZooKeeper handle
+     */
+    @Override
+    public SubscriptionDataManager newSubscriptionDataManager() {
+        final SubscriptionDataManager manager = new ZkSubscriptionDataManagerImpl(cfg, zk);
+        return manager;
+    }
+
+    /**
+     * ZooKeeper based topic persistence manager.
+     */
+    static class ZkTopicPersistenceManagerImpl implements TopicPersistenceManager {
+
+        ZooKeeper zk;
+        ServerConfiguration cfg;
+
+        /**
+         * Build a topic persistence manager.
+         *
+         * @param conf
+         *          Server configuration used to derive znode paths
+         * @param zk
+         *          ZooKeeper handle used for all metadata operations
+         */
+        ZkTopicPersistenceManagerImpl(ServerConfiguration conf, ZooKeeper zk) {
+            this.zk = zk;
+            this.cfg = conf;
+        }
+
+        /**
+         * Close the manager. The ZooKeeper based implementation has nothing to release.
+         */
+        @Override
+        public void close() throws IOException {
+            // intentionally empty for the zookeeper based implementation
+        }
+
+        /**
+         * Get znode path to store persistence info of a topic.
+         *
+         * @param topic
+         *          Topic name
+         * @return znode path to store persistence info.
+         */
+        private String ledgersPath(ByteString topic) {
+            return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
+        }
+
+        /**
+         * Parse ledger ranges data and return it thru callback.
+         *
+         * @param topic
+         *          Topic name
+         * @param data
+         *          Topic Ledger Ranges data
+         * @param version
+         *          Version of the topic ledger ranges data
+         * @param callback
+         *          Callback to return ledger ranges
+         * @param ctx
+         *          Context of the callback
+         */
+        private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, int version,
+                                                     Callback<Versioned<LedgerRanges>> callback, Object ctx) {
+            try {
+                Versioned<LedgerRanges> ranges = new Versioned<LedgerRanges>(LedgerRanges.parseFrom(data),
+                                                                             new ZkVersion(version));
+                callback.operationFinished(ctx, ranges);
+                return;
+            } catch (InvalidProtocolBufferException e) {
+                String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized";
+                logger.error(msg, e);
+                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                return;
+            }
+        }
+
+        @Override
+        public void readTopicPersistenceInfo(final ByteString topic,
+                                             final Callback<Versioned<LedgerRanges>> callback,
+                                             Object ctx) {
+            // read topic ledgers node data
+            final String zNodePath = ledgersPath(topic);
+
+            zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                    if (rc == Code.OK.intValue()) {
+                        parseAndReturnTopicLedgerRanges(topic, data, stat.getVersion(), callback, ctx);
+                        return;
+                    }
+
+                    if (rc == Code.NONODE.intValue()) {
+                        // we don't create the znode until we first write it.
+                        callback.operationFinished(ctx, null);
+                        return;
+                    }
+
+                    // otherwise some other error
+                    KeeperException ke =
+                        ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: "
+                                                             + topic.toStringUtf8(), path, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                }
+            }, ctx);
+        }
+
+        private void createTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges,
+                                                final Callback<Version> callback, Object ctx) {
+            final String zNodePath = ledgersPath(topic);
+            final byte[] data = ranges.toByteArray();
+            // create it
+            ZkUtils.createFullPathOptimistic(zk, zNodePath, data, Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                    if (rc == Code.NODEEXISTS.intValue()) {
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS,
+                                                      "Persistence info of topic " + topic.toStringUtf8() + " existed."));
+                        return;
+                    }
+                    if (rc != Code.OK.intValue()) {
+                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                             "Could not create ledgers node for topic: " + topic.toStringUtf8(),
+                                             path, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                        return;
+                    }
+                    // initial version is version 0
+                    callback.operationFinished(ctx, new ZkVersion(0));
+                }
+            }, ctx);
+            return;
+        }
+
+        /**
+         * Write the ledger ranges of a topic.
+         *
+         * When <code>version</code> is null the ledgers znode is created; otherwise the existing
+         * znode is updated conditionally on the provided znode version.
+         *
+         * @param topic
+         *          Topic name
+         * @param ranges
+         *          Ledger ranges to store
+         * @param version
+         *          Null to create the persistence info, or the {@link ZkVersion} the update is
+         *          conditioned on. Any other {@link Version} implementation is rejected with
+         *          {@link PubSubException.UnexpectedConditionException}.
+         * @param callback
+         *          Receives the new version on success. Fails with status
+         *          <code>NO_TOPIC_PERSISTENCE_INFO</code> when the znode does not exist, with status
+         *          <code>BAD_VERSION</code> when the version doesn't match, and with
+         *          {@link PubSubException.ServiceDownException} on any other ZooKeeper error.
+         * @param ctx
+         *          Context of the callback
+         */
+        @Override
+        public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version,
+                                              final Callback<Version> callback, Object ctx) {
+            if (null == version) {
+                createTopicPersistenceInfo(topic, ranges, callback, ctx);
+                return;
+            }
+
+            // validate the version before doing any work: only a ZkVersion carries a znode version
+            if (!(version instanceof ZkVersion)) {
+                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
+                                              "Invalid version provided to update persistence info for topic " + topic.toStringUtf8()));
+                return;
+            }
+
+            final String zNodePath = ledgersPath(topic);
+            final byte[] data = ranges.toByteArray();
+            final int znodeVersion = ((ZkVersion)version).getZnodeVersion();
+
+            zk.setData(zNodePath, data, znodeVersion, new SafeAsyncZKCallback.StatCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                    if (rc == Code.OK.intValue()) {
+                        callback.operationFinished(ctx, new ZkVersion(stat.getVersion()));
+                    } else if (rc == Code.NONODE.intValue()) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
+                                                      "No persistence info found for topic " + topic.toStringUtf8()));
+                    } else if (rc == Code.BADVERSION.intValue()) {
+                        // bad version; compared through the enum constant like every other
+                        // return code here, not the deprecated int constant Code.BadVersion
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                                      "Bad version provided to update persistence info of topic " + topic.toStringUtf8()));
+                    } else {
+                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                    }
+                }
+            }, ctx);
+        }
+
+        /**
+         * Delete the persistence info (ledgers znode) of a topic.
+         *
+         * @param topic
+         *          Topic name
+         * @param version
+         *          Null to delete with znode version -1, or the {@link ZkVersion} the deletion
+         *          is conditioned on. Any other {@link Version} implementation is rejected with
+         *          {@link PubSubException.UnexpectedConditionException}.
+         * @param callback
+         *          Notified on success. Fails with status <code>NO_TOPIC_PERSISTENCE_INFO</code>
+         *          when the znode does not exist, with status <code>BAD_VERSION</code> when the
+         *          version doesn't match, and with {@link PubSubException.ServiceDownException}
+         *          on any other ZooKeeper error.
+         * @param ctx
+         *          Context of the callback
+         */
+        @Override
+        public void deleteTopicPersistenceInfo(final ByteString topic, final Version version,
+                                               final Callback<Void> callback, Object ctx) {
+            final String zNodePath = ledgersPath(topic);
+
+            // -1 is used when the caller provides no version
+            int znodeVersion = -1;
+            if (null != version) {
+                if (!(version instanceof ZkVersion)) {
+                    callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
+                                                  "Invalid version provided to delete persistence info for topic " + topic.toStringUtf8()));
+                    return;
+                }
+                znodeVersion = ((ZkVersion)version).getZnodeVersion();
+            }
+
+            zk.delete(zNodePath, znodeVersion, new SafeAsyncZKCallback.VoidCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx) {
+                    if (rc == Code.OK.intValue()) {
+                        callback.operationFinished(ctx, null);
+                    } else if (rc == Code.NONODE.intValue()) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
+                                                      "No persistence info found for topic " + topic.toStringUtf8()));
+                    } else if (rc == Code.BADVERSION.intValue()) {
+                        // bad version; compared through the enum constant like every other
+                        // return code here, not the deprecated int constant Code.BadVersion
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
+                                                      "Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
+                    } else {
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+                                            + " failed to delete persistence info @version " + version + " : ", path, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                    }
+                }
+            }, ctx);
+        }
+    }
+
+    /**
+     * ZooKeeper based subscription data manager.
+     */
+    static class ZkSubscriptionDataManagerImpl implements SubscriptionDataManager {
+
+        ZooKeeper zk;
+        ServerConfiguration cfg;
+
+        /**
+         * Build a subscription data manager.
+         *
+         * @param conf
+         *          Server configuration used to derive znode paths
+         * @param zk
+         *          ZooKeeper handle used for all metadata operations
+         */
+        ZkSubscriptionDataManagerImpl(ServerConfiguration conf, ZooKeeper zk) {
+            this.zk = zk;
+            this.cfg = conf;
+        }
+
+        /**
+         * Close the manager. The ZooKeeper based implementation has nothing to release.
+         */
+        @Override
+        public void close() throws IOException {
+            // intentionally empty for the zookeeper based implementation
+        }
+
+        /**
+         * Get znode path to store subscription states.
+         *
+         * @param sb
+         *          String builder to store the znode path.
+         * @param topic
+         *          Topic name.
+         *
+         * @return string builder to store znode path.
+         */
+        private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
+            return cfg.getZkTopicPath(sb, topic).append("/subscribers");
+        }
+
+        /**
+         * Get znode path to store subscription state for a specified subscriber.
+         *
+         * @param topic
+         *          Topic name.
+         * @param subscriber
+         *          Subscriber id.
+         * @return znode path to store subscription state.
+         */
+        private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
+            return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
+                   .toString();
+        }
+
+        /**
+         * Create the znode holding the subscription state of a subscriber.
+         *
+         * @param topic
+         *          Topic name
+         * @param subscriberId
+         *          Subscriber id
+         * @param state
+         *          Subscription state to store
+         * @param callback
+         *          Notified on success. Fails with status <code>SUBSCRIPTION_STATE_EXISTS</code>
+         *          when the znode already exists, and with
+         *          {@link PubSubException.ServiceDownException} on any other ZooKeeper error.
+         * @param ctx
+         *          Context of the callback
+         */
+        @Override
+        public void createSubscriptionState(final ByteString topic, final ByteString subscriberId, final SubscriptionState state,
+                                            final Callback<Void> callback, final Object ctx) {
+            final String subscriberZnode = topicSubscriberPath(topic, subscriberId);
+            final SafeAsyncZKCallback.StringCallback onCreated = new SafeAsyncZKCallback.StringCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                    if (rc == Code.OK.intValue()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
+                                         + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+                                         + SubscriptionStateUtils.toString(state));
+                        }
+                        callback.operationFinished(ctx, null);
+                        return;
+                    }
+                    if (rc == Code.NODEEXISTS.intValue()) {
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.SUBSCRIPTION_STATE_EXISTS,
+                                                      "Subscription state for (topic:" + topic.toStringUtf8() + ", subscriber:"
+                                                      + subscriberId.toStringUtf8() + ") existed."));
+                        return;
+                    }
+                    final String errMsg = "Could not record new subscription for topic: " + topic.toStringUtf8()
+                                          + " subscriberId: " + subscriberId.toStringUtf8();
+                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(errMsg, path, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                }
+            };
+            ZkUtils.createFullPathOptimistic(zk, subscriberZnode, state.toByteArray(), Ids.OPEN_ACL_UNSAFE,
+                                             CreateMode.PERSISTENT, onCreated, ctx);
+        }
+
+        /**
+         * Overwrite the stored subscription state of a subscriber (znode version -1 is passed
+         * to <code>setData</code>).
+         *
+         * @param topic
+         *          Topic name
+         * @param subscriberId
+         *          Subscriber id
+         * @param state
+         *          New subscription state
+         * @param callback
+         *          Notified on success. Fails with status <code>NO_SUBSCRIPTION_STATE</code>
+         *          when the znode does not exist, and with
+         *          {@link PubSubException.ServiceDownException} on any other ZooKeeper error.
+         * @param ctx
+         *          Context of the callback
+         */
+        @Override
+        public void updateSubscriptionState(final ByteString topic, final ByteString subscriberId, final SubscriptionState state,
+                                            final Callback<Void> callback, final Object ctx) {
+            final String subscriberZnode = topicSubscriberPath(topic, subscriberId);
+            final SafeAsyncZKCallback.StatCallback onUpdated = new SafeAsyncZKCallback.StatCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                    if (rc == Code.OK.intValue()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
+                                         + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+                                         + SubscriptionStateUtils.toString(state));
+                        }
+                        callback.operationFinished(ctx, null);
+                        return;
+                    }
+                    if (rc == Code.NONODE.intValue()) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
+                                                      "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+                                                      + subscriberId.toStringUtf8() + ")."));
+                        return;
+                    }
+                    final String errMsg = "Topic: " + topic.toStringUtf8()
+                                          + " subscriberId: " + subscriberId.toStringUtf8()
+                                          + " could not set subscription state: " + SubscriptionStateUtils.toString(state);
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException(errMsg, path, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                }
+            };
+            zk.setData(subscriberZnode, state.toByteArray(), -1, onUpdated, ctx);
+        }
+
+        /**
+         * Delete the stored subscription state of a subscriber (znode version -1 is passed
+         * to <code>delete</code>).
+         *
+         * @param topic
+         *          Topic name
+         * @param subscriberId
+         *          Subscriber id
+         * @param callback
+         *          Notified on success. Fails with status <code>NO_SUBSCRIPTION_STATE</code>
+         *          when the znode does not exist, and with
+         *          {@link PubSubException.ServiceDownException} on any other ZooKeeper error.
+         * @param ctx
+         *          Context of the callback
+         */
+        @Override
+        public void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
+                                            final Callback<Void> callback, Object ctx) {
+            final String subscriberZnode = topicSubscriberPath(topic, subscriberId);
+            zk.delete(subscriberZnode, -1, new SafeAsyncZKCallback.VoidCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx) {
+                    if (rc == Code.OK.intValue()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
+                                         + " subscriberId: " + subscriberId.toStringUtf8());
+                        }
+                        callback.operationFinished(ctx, null);
+                    } else if (rc == Code.NONODE.intValue()) {
+                        // no node
+                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
+                                                      "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+                                                      + subscriberId.toStringUtf8() + ")."));
+                    } else {
+                        final String errMsg = "Topic: " + topic.toStringUtf8()
+                                              + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription";
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException(errMsg, path, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                    }
+                }
+            }, ctx);
+        }
+
+        @Override
+        public void readSubscriptionState(final ByteString topic, final ByteString subscriberId,
+                                          final Callback<SubscriptionState> callback, final Object ctx) {
+            zk.getData(topicSubscriberPath(topic, subscriberId), false, new SafeAsyncZKCallback.DataCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                    if (rc == Code.NONODE.intValue()) {
+                        callback.operationFinished(ctx, null);
+                        return;
+                    }
+                    if (rc != Code.OK.intValue()) {
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                                "Could not read subscription data for topic: " + topic.toStringUtf8()
+                                                + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                        return;
+                    }
+
+                    SubscriptionState state;
+                    try {
+                        state = SubscriptionState.parseFrom(data);
+                    } catch (InvalidProtocolBufferException ex) {
+                        String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
+                                     + " subscriberId: " + subscriberId.toStringUtf8();
+                        logger.error(msg, ex);
+                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
+                        return;
+                    }
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
+                                     + " subscriberId: " + subscriberId.toStringUtf8()
+                                     + "state: " + SubscriptionStateUtils.toString(state));
+                    }
+                    callback.operationFinished(ctx, state);
+                }
+            }, ctx);
+        }
+
+        /**
+         * Asynchronously read the subscription states of all subscribers of a topic.
+         *
+         * The children of the topic's subscribers znode are listed first, then the data of
+         * every child is read with its own asynchronous request. The callback is completed
+         * once every child has been read, or failed once on the first error.
+         *
+         * @param topic
+         *          Topic name
+         * @param cb
+         *          Receives a map from subscriber id to subscription state; the map is empty
+         *          when the subscribers znode does not exist or has no children. Fails with
+         *          {@link PubSubException.UnexpectedConditionException} when a stored state
+         *          can't be deserialized, and with {@link PubSubException.ServiceDownException}
+         *          on any other ZooKeeper error.
+         * @param ctx
+         *          Context of the callback
+         */
+        @Override
+        public void readSubscriptions(final ByteString topic,
+                                      final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
+            String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
+            zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, final Object ctx, final List<String> children) {
+
+                    if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
+                                            + topic.toStringUtf8(), path, rc);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                        return;
+                    }
+
+                    final Map<ByteString, InMemorySubscriptionState> topicSubs = new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
+
+                    if (rc == Code.NONODE.intValue() || children.isEmpty()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8());
+                        }
+                        cb.operationFinished(ctx, topicSubs);
+                        return;
+                    }
+
+                    // shared by the per-child callbacks: 'failed' makes sure the failure is
+                    // reported only once, 'count' tracks how many children were read successfully
+                    final AtomicBoolean failed = new AtomicBoolean();
+                    final AtomicInteger count = new AtomicInteger();
+
+                    for (final String child : children) {
+
+                        final ByteString subscriberId = ByteString.copyFromUtf8(child);
+                        final String childPath = path + "/" + child;
+
+                        zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() {
+                            @Override
+                            public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+
+                                if (rc != Code.OK.intValue()) {
+                                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                                            "Could not read subscription data for topic: " + topic.toStringUtf8()
+                                                            + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+                                    reportFailure(new PubSubException.ServiceDownException(e));
+                                    return;
+                                }
+
+                                // another child already failed the whole read, nothing left to do
+                                if (failed.get()) {
+                                    return;
+                                }
+
+                                SubscriptionState state;
+
+                                try {
+                                    state = SubscriptionState.parseFrom(data);
+                                } catch (InvalidProtocolBufferException ex) {
+                                    String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
+                                                 + " subscriberId: " + subscriberId.toStringUtf8();
+                                    logger.error(msg, ex);
+                                    reportFailure(new PubSubException.UnexpectedConditionException(msg));
+                                    return;
+                                }
+
+                                if (logger.isDebugEnabled()) {
+                                    // " state: " needs its leading space so the subscriber id doesn't run into it
+                                    logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
+                                                 + " subscriberId: " + child + " state: "
+                                                 + SubscriptionStateUtils.toString(state));
+                                }
+
+                                topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
+                                // the callback that reads the last child completes the operation
+                                if (count.incrementAndGet() == children.size()) {
+                                    assert topicSubs.size() == count.get();
+                                    cb.operationFinished(ctx, topicSubs);
+                                }
+                            }
+
+                            private void reportFailure(PubSubException e) {
+                                if (failed.compareAndSet(false, true)) {
+                                    cb.operationFailed(ctx, e);
+                                }
+                            }
+                        }, ctx);
+                    }
+                }
+            }, ctx);
+        }
+    }
+}



Mime
View raw message