Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1DACC9AB5 for ; Wed, 11 Jul 2012 17:11:40 +0000 (UTC) Received: (qmail 27404 invoked by uid 500); 11 Jul 2012 17:11:40 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 27373 invoked by uid 500); 11 Jul 2012 17:11:40 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 27365 invoked by uid 99); 11 Jul 2012 17:11:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Jul 2012 17:11:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Jul 2012 17:11:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 92E5823889EC for ; Wed, 11 Jul 2012 17:11:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1360300 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apa... 
Date: Wed, 11 Jul 2012 17:11:07 -0000 To: commits@zookeeper.apache.org From: ivank@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120711171108.92E5823889EC@eris.apache.org> Author: ivank Date: Wed Jul 11 17:11:06 2012 New Revision: 1360300 URL: http://svn.apache.org/viewvc?rev=1360300&view=rev Log: BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank) Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java - copied, changed from r1358895, 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Removed: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1360300&r1=1360299&r2=1360300&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jul 11 17:11:06 2012 @@ -54,6 +54,10 @@ Trunk (unreleased changes) BOOKKEEPER-328: Bookie DeathWatcher is missing thread name (Rakesh via sijie) + hedwig-server: + + BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank) + Release 4.1.0 - 2012-06-07 Non-backward compatible changes: Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java?rev=1360300&r1=1360299&r2=1360300&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java Wed Jul 11 17:11:06 2012 @@ -50,6 +50,18 @@ public abstract class PubSubException ex return new ServiceDownException(msg); } else if (code == StatusCode.COULD_NOT_CONNECT) { return new CouldNotConnectException(msg); + } else if (code == StatusCode.TOPIC_BUSY) { + return new TopicBusyException(msg); + } else if (code == StatusCode.BAD_VERSION) { + return new BadVersionException(msg); + } else if (code == StatusCode.NO_TOPIC_PERSISTENCE_INFO) { + return new NoTopicPersistenceInfoException(msg); + } else if (code == StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS) { + return 
new TopicPersistenceInfoExistsException(msg); + } else if (code == StatusCode.NO_SUBSCRIPTION_STATE) { + return new NoSubscriptionStateException(msg); + } else if (code == StatusCode.SUBSCRIPTION_STATE_EXISTS) { + return new SubscriptionStateExistsException(msg); } /* * Insert new ones here @@ -120,6 +132,36 @@ public abstract class PubSubException ex } } + public static class BadVersionException extends PubSubException { + public BadVersionException(String msg) { + super(StatusCode.BAD_VERSION, msg); + } + } + + public static class NoTopicPersistenceInfoException extends PubSubException { + public NoTopicPersistenceInfoException(String msg) { + super(StatusCode.NO_TOPIC_PERSISTENCE_INFO, msg); + } + } + + public static class TopicPersistenceInfoExistsException extends PubSubException { + public TopicPersistenceInfoExistsException(String msg) { + super(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, msg); + } + } + + public static class NoSubscriptionStateException extends PubSubException { + public NoSubscriptionStateException(String msg) { + super(StatusCode.NO_SUBSCRIPTION_STATE, msg); + } + } + + public static class SubscriptionStateExistsException extends PubSubException { + public SubscriptionStateExistsException(String msg) { + super(StatusCode.SUBSCRIPTION_STATE_EXISTS, msg); + } + } + /* * Insert new ones here */ Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1360300&r1=1360299&r2=1360300&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Wed Jul 11 17:11:06 2012 @@ -167,8 +167,13 @@ public final 
class PubSubProtocol { NOT_RESPONSIBLE_FOR_TOPIC(7, 501), SERVICE_DOWN(8, 502), UNCERTAIN_STATE(9, 503), - UNEXPECTED_CONDITION(10, 600), - COMPOSITE(11, 700), + BAD_VERSION(10, 520), + NO_TOPIC_PERSISTENCE_INFO(11, 521), + TOPIC_PERSISTENCE_INFO_EXISTS(12, 522), + NO_SUBSCRIPTION_STATE(13, 523), + SUBSCRIPTION_STATE_EXISTS(14, 524), + UNEXPECTED_CONDITION(15, 600), + COMPOSITE(16, 700), ; public static final int SUCCESS_VALUE = 0; @@ -181,6 +186,11 @@ public final class PubSubProtocol { public static final int NOT_RESPONSIBLE_FOR_TOPIC_VALUE = 501; public static final int SERVICE_DOWN_VALUE = 502; public static final int UNCERTAIN_STATE_VALUE = 503; + public static final int BAD_VERSION_VALUE = 520; + public static final int NO_TOPIC_PERSISTENCE_INFO_VALUE = 521; + public static final int TOPIC_PERSISTENCE_INFO_EXISTS_VALUE = 522; + public static final int NO_SUBSCRIPTION_STATE_VALUE = 523; + public static final int SUBSCRIPTION_STATE_EXISTS_VALUE = 524; public static final int UNEXPECTED_CONDITION_VALUE = 600; public static final int COMPOSITE_VALUE = 700; @@ -199,6 +209,11 @@ public final class PubSubProtocol { case 501: return NOT_RESPONSIBLE_FOR_TOPIC; case 502: return SERVICE_DOWN; case 503: return UNCERTAIN_STATE; + case 520: return BAD_VERSION; + case 521: return NO_TOPIC_PERSISTENCE_INFO; + case 522: return TOPIC_PERSISTENCE_INFO_EXISTS; + case 523: return NO_SUBSCRIPTION_STATE; + case 524: return SUBSCRIPTION_STATE_EXISTS; case 600: return UNEXPECTED_CONDITION; case 700: return COMPOSITE; default: return null; @@ -231,7 +246,7 @@ public final class PubSubProtocol { } private static final StatusCode[] VALUES = { - SUCCESS, MALFORMED_REQUEST, NO_SUCH_TOPIC, CLIENT_ALREADY_SUBSCRIBED, CLIENT_NOT_SUBSCRIBED, COULD_NOT_CONNECT, TOPIC_BUSY, NOT_RESPONSIBLE_FOR_TOPIC, SERVICE_DOWN, UNCERTAIN_STATE, UNEXPECTED_CONDITION, COMPOSITE, + SUCCESS, MALFORMED_REQUEST, NO_SUCH_TOPIC, CLIENT_ALREADY_SUBSCRIBED, CLIENT_NOT_SUBSCRIBED, COULD_NOT_CONNECT, TOPIC_BUSY, 
NOT_RESPONSIBLE_FOR_TOPIC, SERVICE_DOWN, UNCERTAIN_STATE, BAD_VERSION, NO_TOPIC_PERSISTENCE_INFO, TOPIC_PERSISTENCE_INFO_EXISTS, NO_SUBSCRIPTION_STATE, SUBSCRIPTION_STATE_EXISTS, UNEXPECTED_CONDITION, COMPOSITE, }; public static StatusCode valueOf( @@ -8933,6 +8948,454 @@ public final class PubSubProtocol { // @@protoc_insertion_point(class_scope:Hedwig.LedgerRanges) } + public interface ManagerMetaOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string managerImpl = 2; + boolean hasManagerImpl(); + String getManagerImpl(); + + // required uint32 managerVersion = 3; + boolean hasManagerVersion(); + int getManagerVersion(); + } + public static final class ManagerMeta extends + com.google.protobuf.GeneratedMessage + implements ManagerMetaOrBuilder { + // Use ManagerMeta.newBuilder() to construct. + private ManagerMeta(Builder builder) { + super(builder); + } + private ManagerMeta(boolean noInit) {} + + private static final ManagerMeta defaultInstance; + public static ManagerMeta getDefaultInstance() { + return defaultInstance; + } + + public ManagerMeta getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_fieldAccessorTable; + } + + private int bitField0_; + // required string managerImpl = 2; + public static final int MANAGERIMPL_FIELD_NUMBER = 2; + private java.lang.Object managerImpl_; + public boolean hasManagerImpl() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getManagerImpl() { + java.lang.Object ref = managerImpl_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + 
(com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + managerImpl_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getManagerImplBytes() { + java.lang.Object ref = managerImpl_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + managerImpl_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required uint32 managerVersion = 3; + public static final int MANAGERVERSION_FIELD_NUMBER = 3; + private int managerVersion_; + public boolean hasManagerVersion() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getManagerVersion() { + return managerVersion_; + } + + private void initFields() { + managerImpl_ = ""; + managerVersion_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasManagerImpl()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasManagerVersion()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(2, getManagerImplBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt32(3, managerVersion_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getManagerImplBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += 
com.google.protobuf.CodedOutputStream + .computeUInt32Size(3, managerVersion_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static 
org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hedwig.protocol.PubSubProtocol.ManagerMetaOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return 
org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_ManagerMeta_fieldAccessorTable; + } + + // Construct using org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + managerImpl_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + managerVersion_ = 0; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.getDescriptor(); + } + + public org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta getDefaultInstanceForType() { + return org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.getDefaultInstance(); + } + + public org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta build() { + org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return 
result; + } + + public org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta buildPartial() { + org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta result = new org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.managerImpl_ = managerImpl_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.managerVersion_ = managerVersion_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta) { + return mergeFrom((org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta other) { + if (other == org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.getDefaultInstance()) return this; + if (other.hasManagerImpl()) { + setManagerImpl(other.getManagerImpl()); + } + if (other.hasManagerVersion()) { + setManagerVersion(other.getManagerVersion()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasManagerImpl()) { + + return false; + } + if (!hasManagerVersion()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if 
(!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 18: { + bitField0_ |= 0x00000001; + managerImpl_ = input.readBytes(); + break; + } + case 24: { + bitField0_ |= 0x00000002; + managerVersion_ = input.readUInt32(); + break; + } + } + } + } + + private int bitField0_; + + // required string managerImpl = 2; + private java.lang.Object managerImpl_ = ""; + public boolean hasManagerImpl() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getManagerImpl() { + java.lang.Object ref = managerImpl_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + managerImpl_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setManagerImpl(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + managerImpl_ = value; + onChanged(); + return this; + } + public Builder clearManagerImpl() { + bitField0_ = (bitField0_ & ~0x00000001); + managerImpl_ = getDefaultInstance().getManagerImpl(); + onChanged(); + return this; + } + void setManagerImpl(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000001; + managerImpl_ = value; + onChanged(); + } + + // required uint32 managerVersion = 3; + private int managerVersion_ ; + public boolean hasManagerVersion() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getManagerVersion() { + return managerVersion_; + } + public Builder setManagerVersion(int value) { + bitField0_ |= 0x00000002; + managerVersion_ = value; + onChanged(); + return this; + } + public Builder clearManagerVersion() { + bitField0_ = (bitField0_ & ~0x00000002); + managerVersion_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:Hedwig.ManagerMeta) + } + + static { + defaultInstance = new ManagerMeta(true); + defaultInstance.initFields(); + } + + 
// @@protoc_insertion_point(class_scope:Hedwig.ManagerMeta) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_Hedwig_Message_descriptor; private static @@ -9008,6 +9471,11 @@ public final class PubSubProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_Hedwig_LedgerRanges_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_Hedwig_ManagerMeta_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_Hedwig_ManagerMeta_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -9062,19 +9530,24 @@ public final class PubSubProtocol { "Id\022\024\n\014messageBound\030\002 \001(\r\"O\n\013LedgerRange\022" + "\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 " + "\001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRange" + - "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange*\"" + - "\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOp" + - "erationType\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022" + - "\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_D" + - "ELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*\236\002\n\nStatusC", - "ode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003" + - "\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_S" + - "UBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003" + - "\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226" + - "\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SER" + - "VICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\031\n\024UN" + - "EXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n" + - "\032org.apache.hedwig.protocolH\001" + "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange\":" + + 
"\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016ma" + + "nagerVersion\030\003 \002(\r*\"\n\017ProtocolVersion\022\017\n" + + "\013VERSION_ONE\020\001*p\n\rOperationType\022\013\n\007PUBLI" + + "SH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSU", + "BSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DE" + + "LIVERY\020\005*\260\003\n\nStatusCode\022\013\n\007SUCCESS\020\000\022\026\n\021" + + "MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC\020\222\003" + + "\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025CLIE" + + "NT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CONNEC" + + "T\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSIBLE" + + "_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017UNCE" + + "RTAIN_STATE\020\367\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_T" + + "OPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSIS" + + "TENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_", + "STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004" + + "\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE" + + "\020\274\005B\036\n\032org.apache.hedwig.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -9201,6 +9674,14 @@ public final class PubSubProtocol { new java.lang.String[] { "Ranges", }, org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.class, org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.Builder.class); + internal_static_Hedwig_ManagerMeta_descriptor = + getDescriptor().getMessageTypes().get(15); + internal_static_Hedwig_ManagerMeta_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_Hedwig_ManagerMeta_descriptor, + new java.lang.String[] { "ManagerImpl", "ManagerVersion", 
}, + org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.class, + org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.Builder.class); return null; } }; Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1360300&r1=1360299&r2=1360300&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original) +++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Wed Jul 11 17:11:06 2012 @@ -156,7 +156,14 @@ enum StatusCode{ NOT_RESPONSIBLE_FOR_TOPIC = 501; SERVICE_DOWN = 502; UNCERTAIN_STATE = 503; - + + //server-side meta manager errors (52x) + BAD_VERSION = 520; + NO_TOPIC_PERSISTENCE_INFO = 521; + TOPIC_PERSISTENCE_INFO_EXISTS = 522; + NO_SUBSCRIPTION_STATE = 523; + SUBSCRIPTION_STATE_EXISTS = 524; + //For all unexpected error conditions UNEXPECTED_CONDITION = 600; @@ -180,3 +187,7 @@ message LedgerRanges{ repeated LedgerRange ranges = 1; } +message ManagerMeta { + required string managerImpl = 2; + required uint32 managerVersion = 3; +} Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1360300&r1=1360299&r2=1360300&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Wed Jul 11 17:11:06 2012 @@ -30,7 +30,9 @@ import java.util.List; import 
org.apache.commons.configuration.ConfigurationException; import com.google.protobuf.ByteString; +import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.hedwig.conf.AbstractConfiguration; +import org.apache.hedwig.server.meta.MetadataManagerFactory; import org.apache.hedwig.util.HedwigSocketAddress; public class ServerConfiguration extends AbstractConfiguration { @@ -60,6 +62,17 @@ public class ServerConfiguration extends protected final static String BK_QUORUM_SIZE = "bk_quorum_size"; protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval"; + // manager related settings + protected final static String METADATA_MANAGER_FACTORY_CLASS = "metadata_manager_factory_class"; + + private static ClassLoader defaultLoader; + static { + defaultLoader = Thread.currentThread().getContextClassLoader(); + if (null == defaultLoader) { + defaultLoader = ServerConfiguration.class.getClassLoader(); + } + } + // these are the derived attributes protected ByteString myRegionByteString = null; protected HedwigSocketAddress myServerAddress = null; @@ -153,6 +166,17 @@ public class ServerConfiguration extends return sb.append(getZkPrefix()).append("/").append(getMyRegion()); } + /** + * Get znode path to store manager layouts. + * + * @param sb + * StringBuilder to store znode path to store manager layouts. + * @return znode path to store manager layouts. + */ + public StringBuilder getZkManagersPrefix(StringBuilder sb) { + return getZkRegionPrefix(sb).append("/managers"); + } + public StringBuilder getZkTopicsPrefix(StringBuilder sb) { return getZkRegionPrefix(sb).append("/topics"); } @@ -296,4 +320,28 @@ public class ServerConfiguration extends // add other checks here } + + /** + * Get metadata manager factory class. 
+ * + * @return manager class + */ + public Class getMetadataManagerFactoryClass() + throws ConfigurationException { + return ReflectionUtils.getClass(conf, METADATA_MANAGER_FACTORY_CLASS, + null, MetadataManagerFactory.class, + defaultLoader); + } + + /** + * Set metadata manager factory class name + * + * @param managerClsName + * Manager Class Name + * @return server configuration + */ + public ServerConfiguration setMetadataManagerFactoryName(String managerClsName) { + conf.setProperty(METADATA_MANAGER_FACTORY_CLASS, managerClsName); + return this; + } } Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java?rev=1360300&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java Wed Jul 11 17:11:06 2012 @@ -0,0 +1,149 @@ +package org.apache.hedwig.server.meta; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.TextFormat; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta; +import org.apache.hedwig.server.common.ServerConfiguration; +import org.apache.hedwig.zookeeper.ZkUtils; + +/** + * This class encapsulates metadata manager layout information + * that is persistently stored in zookeeper. + * It provides parsing and serialization methods of such information. + * + */ +public class FactoryLayout { + static final Logger logger = LoggerFactory.getLogger(FactoryLayout.class); + + // metadata manager name + public static final String NAME = "METADATA"; + // Znode name to store layout information + public static final String LAYOUT_ZNODE = "LAYOUT"; + public static final String LSEP = "\n"; + + private ManagerMeta managerMeta; + + /** + * Construct metadata manager factory layout. + * + * @param meta + * Meta describes what kind of factory used. 
+ */ + public FactoryLayout(ManagerMeta meta) { + this.managerMeta = meta; + } + + public static String getFactoryLayoutPath(StringBuilder sb, ServerConfiguration cfg) { + return cfg.getZkManagersPrefix(sb).append("/").append(NAME) + .append("/").append(LAYOUT_ZNODE).toString(); + } + + public ManagerMeta getManagerMeta() { + return managerMeta; + } + + /** + * Store the factory layout into zookeeper + * + * @param zk + * ZooKeeper Handle + * @param cfg + * Server Configuration Object + * @throws KeeperException + * @throws IOException + * @throws InterruptedException + */ + public void store(ZooKeeper zk, ServerConfiguration cfg) + throws KeeperException, IOException, InterruptedException { + String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg); + + byte[] layoutData = TextFormat.printToString(managerMeta).getBytes(); + ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, layoutData, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + @Override + public int hashCode() { + return managerMeta.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (null == o || + !(o instanceof FactoryLayout)) { + return false; + } + FactoryLayout other = (FactoryLayout)o; + return managerMeta.equals(other.managerMeta); + } + + @Override + public String toString() { + return TextFormat.printToString(managerMeta); + } + + /** + * Read factory layout from zookeeper + * + * @param zk + * ZooKeeper Client + * @param cfg + * Server configuration object + * @return Factory layout, or null if none set in zookeeper + */ + public static FactoryLayout readLayout(final ZooKeeper zk, + final ServerConfiguration cfg) + throws IOException, KeeperException { + String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg); + byte[] layoutData; + try { + layoutData = zk.getData(factoryLayoutPath, false, null); + } catch (KeeperException.NoNodeException nne) { + return null; + } catch (InterruptedException ie) { + throw new IOException(ie); + } + 
ManagerMeta meta; + try { + BufferedReader reader = new BufferedReader( + new StringReader(new String(layoutData))); + ManagerMeta.Builder metaBuilder = ManagerMeta.newBuilder(); + TextFormat.merge(reader, metaBuilder); + meta = metaBuilder.build(); + } catch (InvalidProtocolBufferException ipbe) { + throw new IOException("Corrupted factory layout : ", ipbe); + } + + return new FactoryLayout(meta); + } +} Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java?rev=1360300&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java Wed Jul 11 17:11:06 2012 @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hedwig.server.meta; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta; +import org.apache.hedwig.server.common.ServerConfiguration; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; + +/** + * Metadata Manager used to manage metadata used by hedwig. + */ +public abstract class MetadataManagerFactory { + + static final Logger LOG = LoggerFactory.getLogger(MetadataManagerFactory.class); + + /** + * Return current factory version. + * + * @return current version used by factory. + */ + public abstract int getCurrentVersion(); + + /** + * Initialize the metadata manager factory with given + * configuration and version. + * + * @param cfg + * Server configuration object + * @param zk + * ZooKeeper handler + * @param version + * Manager version + * @return metadata manager factory + * @throws IOException when fail to initialize the manager. + */ + protected abstract MetadataManagerFactory initialize( + ServerConfiguration cfg, ZooKeeper zk, int version) + throws IOException; + + /** + * Uninitialize the factory. + * + * @throws IOException when fail to shutdown the factory. + */ + public abstract void shutdown() throws IOException; + + /** + * Create topic persistence manager. + * + * @return topic persistence manager + */ + public abstract TopicPersistenceManager newTopicPersistenceManager(); + + /** + * Create subscription data manager. + * + * @return subscription data manager. + */ + public abstract SubscriptionDataManager newSubscriptionDataManager(); + + /** + * Create new Metadata Manager Factory. + * + * @param conf + * Configuration Object. + * @param zk + * ZooKeeper Client Handle, talk to zk to know which manager factory is used. + * @return new manager factory. 
+ * @throws IOException + */ + public static MetadataManagerFactory newMetadataManagerFactory( + final ServerConfiguration conf, final ZooKeeper zk) + throws IOException, KeeperException, InterruptedException { + Class factoryClass; + try { + factoryClass = conf.getMetadataManagerFactoryClass(); + } catch (Exception e) { + throw new IOException("Failed to get metadata manager factory class from configuration : ", e); + } + + MetadataManagerFactory managerFactory; + + // check that the configured manager is + // compatible with the existing layout + FactoryLayout layout = FactoryLayout.readLayout(zk, conf); + if (layout == null) { // no existing layout + // use default manager if no one provided + if (factoryClass == null) { + factoryClass = ZkMetadataManagerFactory.class; + } + try { + managerFactory = ReflectionUtils.newInstance(factoryClass); + } catch (Throwable t) { + throw new IOException("Fail to instantiate metadata manager factory : " + factoryClass, t); + } + ManagerMeta managerMeta = ManagerMeta.newBuilder() + .setManagerImpl(factoryClass.getName()) + .setManagerVersion(managerFactory.getCurrentVersion()) + .build(); + layout = new FactoryLayout(managerMeta); + try { + layout.store(zk, conf); + } catch (KeeperException.NodeExistsException nee) { + FactoryLayout layout2 = FactoryLayout.readLayout(zk, conf); + if (!layout2.equals(layout)) { + throw new IOException("Contention writing to layout to zookeeper, " + + " other layout " + layout2 + " is incompatible with our " + + "layout " + layout); + } + } + return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion()); + } + LOG.debug("read meta layout {}", layout); + + if (factoryClass != null && + !layout.getManagerMeta().getManagerImpl().equals(factoryClass.getName())) { + throw new IOException("Configured metadata manager factory " + factoryClass.getName() + + " does not match existing factory " + layout.getManagerMeta().getManagerImpl()); + } + if (factoryClass == null) { + // no 
factory specified in configuration + String factoryClsName = layout.getManagerMeta().getManagerImpl(); + try { + Class theCls = Class.forName(factoryClsName); + if (!MetadataManagerFactory.class.isAssignableFrom(theCls)) { + throw new IOException("Wrong metadata manager factory " + factoryClsName); + } + factoryClass = theCls.asSubclass(MetadataManagerFactory.class); + } catch (ClassNotFoundException cnfe) { + throw new IOException("No class found to instantiate metadata manager factory " + factoryClsName); + } + } + // instantiate the metadata manager factory + try { + managerFactory = ReflectionUtils.newInstance(factoryClass); + } catch (Throwable t) { + throw new IOException("Failed to instantiate metadata manager factory : " + factoryClass, t); + } + return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion()); + } +} Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java?rev=1360300&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java Wed Jul 11 17:11:06 2012 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hedwig.server.meta; + +import java.io.Closeable; +import java.util.Map; + +import com.google.protobuf.ByteString; + +import org.apache.hedwig.exceptions.PubSubException; +import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; +import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState; +import org.apache.hedwig.util.Callback; + +/** + * Manage subscription data. + */ +public interface SubscriptionDataManager extends Closeable { + + /** + * Create subscription state. + * + * @param topic + * Topic name + * @param subscriberId + * Subscriber id + * @param state + * Subscription state + * @param callback + * Callback when subscription state created. + * {@link PubSubException.SubscriptionStateExistsException} is returned when subscription state + * existed before. + * @param ctx + * Context of the callback + */ + public void createSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state, + Callback callback, Object ctx); + + /** + * Update subscription state. + * + * @param topic + * Topic name + * @param subscriberId + * Subscriber id + * @param state + * Subscription state + * @param callback + * Callback when subscription state updated. + * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state + * is found. + * @param ctx + * Context of the callback + */ + public void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state, + Callback callback, Object ctx); + + /** + * Remove subscription state. 
+ * + * @param topic + * Topic name + * @param subscriberId + * Subscriber id + * @param callback + * Callback when subscription state deleted + * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state + * is found. + * @param ctx + * Context of the callback + */ + public void deleteSubscriptionState(ByteString topic, ByteString subscriberId, + Callback callback, Object ctx); + + /** + * Read subscription state. + * + * @param topic + * Topic Name + * @param subscriberId + * Subscriber id + * @param callback + * Callback when subscription state read. + * Null is returned when no subscription state is found. + * @param ctx + * Context of the callback + */ + public void readSubscriptionState(ByteString topic, ByteString subscriberId, + Callback callback, Object ctx); + + /** + * Read all subscriptions of a topic. + * + * @param topic + * Topic name + * @param cb + * Callback to return subscriptions + * @param ctx + * Context of the callback + */ + public void readSubscriptions(ByteString topic, Callback> cb, + Object ctx); +} Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java?rev=1360300&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java Wed Jul 11 17:11:06 2012 @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hedwig.server.meta; + +import java.io.Closeable; + +import com.google.protobuf.ByteString; + +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.hedwig.exceptions.PubSubException; +import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; +import org.apache.hedwig.util.Callback; + +/** + * Manage topic persistence metadata. + */ +public interface TopicPersistenceManager extends Closeable { + + /** + * Read persistence info of a specified topic. + * + * @param topic + * Topic Name + * @param callback + * Callback when read persistence info. + * If no persistence info found, return null. + * @param ctx + * Context of the callback + */ + public void readTopicPersistenceInfo(ByteString topic, + Callback> callback, Object ctx); + + /** + * Update persistence info of a specified topic. + * + * @param topic + * Topic name + * @param ranges + * Persistence info + * @param version + * Current version of persistence info. + * If version is null, create persistence info; + * {@link PubSubException.TopicPersistenceInfoExistsException} is returned when + * persistence info existed before. + * If version is not null, the persistence info is updated only when + * the provided version equals its current version. 
+ * {@link PubSubException.BadVersionException} is returned when version doesn't match, + * {@link PubSubException.NoTopicPersistenceInfoException} is returned when no + * persistence info found to update. + * @param callback + * Callback when persistence info updated. New version would be returned. + * @param ctx + * Context of the callback + */ + public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version, + Callback callback, Object ctx); + + /** + * Delete persistence info of a specified topic. + * Currently used in test cases. + * + * @param topic + * Topic name + * @param version + * Current version of persistence info + * If version is null, delete persistence info no matter its current version. + * If version is not null, the persistence info is deleted only when + * the provided version equals its current version. + * @param callback + * Callback returning whether the deletion succeeded. + * {@link PubSubException.NoTopicPersistenceInfoException} is returned when no persistence + * info is found to delete. + * {@link PubSubException.BadVersionException} is returned when version doesn't match. 
+ * @param ctx + * Context of the callback + */ + public void deleteTopicPersistenceInfo(ByteString topic, Version version, + Callback callback, Object ctx); + +} Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java?rev=1360300&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java Wed Jul 11 17:11:06 2012 @@ -0,0 +1,559 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hedwig.server.meta; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.bookkeeper.meta.ZkVersion; +import org.apache.hedwig.exceptions.PubSubException; +import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; +import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; +import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; +import org.apache.hedwig.protoextensions.SubscriptionStateUtils; +import org.apache.hedwig.server.common.ServerConfiguration; +import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState; +import org.apache.hedwig.util.Callback; +import org.apache.hedwig.zookeeper.SafeAsyncZKCallback; +import org.apache.hedwig.zookeeper.ZkUtils; + +/** + * ZooKeeper-based Metadata Manager. 
+ */ +public class ZkMetadataManagerFactory extends MetadataManagerFactory { + protected final static Logger logger = LoggerFactory.getLogger(ZkMetadataManagerFactory.class); + + static final int CUR_VERSION = 1; + + ZooKeeper zk; + ServerConfiguration cfg; + + @Override + public int getCurrentVersion() { + return CUR_VERSION; + } + + @Override + public MetadataManagerFactory initialize(ServerConfiguration cfg, + ZooKeeper zk, + int version) + throws IOException { + if (CUR_VERSION != version) { + throw new IOException("Incompatible ZkMetadataManagerFactory version " + version + + " found, expected version " + CUR_VERSION); + } + this.cfg = cfg; + this.zk = zk; + return this; + } + + @Override + public void shutdown() { + // do nothing here, because zookeeper handle is passed from outside + // we don't need to stop it. + } + + @Override + public TopicPersistenceManager newTopicPersistenceManager() { + return new ZkTopicPersistenceManagerImpl(cfg, zk); + } + + @Override + public SubscriptionDataManager newSubscriptionDataManager() { + return new ZkSubscriptionDataManagerImpl(cfg, zk); + } + + /** + * ZooKeeper based topic persistence manager. + */ + static class ZkTopicPersistenceManagerImpl implements TopicPersistenceManager { + + ZooKeeper zk; + ServerConfiguration cfg; + + ZkTopicPersistenceManagerImpl(ServerConfiguration conf, ZooKeeper zk) { + this.cfg = conf; + this.zk = zk; + } + + @Override + public void close() throws IOException { + // do nothing in zookeeper based impl + } + + /** + * Get znode path to store persistence info of a topic. + * + * @param topic + * Topic name + * @return znode path to store persistence info. + */ + private String ledgersPath(ByteString topic) { + return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString(); + } + + /** + * Parse ledger ranges data and return it thru callback. 
+ * + * @param topic + * Topic name + * @param data + * Topic Ledger Ranges data + * @param version + * Version of the topic ledger ranges data + * @param callback + * Callback to return ledger ranges + * @param ctx + * Context of the callback + */ + private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, int version, + Callback> callback, Object ctx) { + try { + Versioned ranges = new Versioned(LedgerRanges.parseFrom(data), + new ZkVersion(version)); + callback.operationFinished(ctx, ranges); + return; + } catch (InvalidProtocolBufferException e) { + String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized"; + logger.error(msg, e); + callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); + return; + } + } + + @Override + public void readTopicPersistenceInfo(final ByteString topic, + final Callback> callback, + Object ctx) { + // read topic ledgers node data + final String zNodePath = ledgersPath(topic); + + zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() { + @Override + public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (rc == Code.OK.intValue()) { + parseAndReturnTopicLedgerRanges(topic, data, stat.getVersion(), callback, ctx); + return; + } + + if (rc == Code.NONODE.intValue()) { + // we don't create the znode until we first write it. 
+ callback.operationFinished(ctx, null); + return; + } + + // otherwise some other error + KeeperException ke = + ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: " + + topic.toStringUtf8(), path, rc); + callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); + } + }, ctx); + } + + private void createTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, + final Callback callback, Object ctx) { + final String zNodePath = ledgersPath(topic); + final byte[] data = ranges.toByteArray(); + // create it + ZkUtils.createFullPathOptimistic(zk, zNodePath, data, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { + @Override + public void safeProcessResult(int rc, String path, Object ctx, String name) { + if (rc == Code.NODEEXISTS.intValue()) { + callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, + "Persistence info of topic " + topic.toStringUtf8() + " existed.")); + return; + } + if (rc != Code.OK.intValue()) { + KeeperException ke = ZkUtils.logErrorAndCreateZKException( + "Could not create ledgers node for topic: " + topic.toStringUtf8(), + path, rc); + callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); + return; + } + // initial version is version 0 + callback.operationFinished(ctx, new ZkVersion(0)); + } + }, ctx); + return; + } + + @Override + public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version, + final Callback callback, Object ctx) { + if (null == version) { + createTopicPersistenceInfo(topic, ranges, callback, ctx); + return; + } + + final String zNodePath = ledgersPath(topic); + final byte[] data = ranges.toByteArray(); + + if (!(version instanceof ZkVersion)) { + callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( + "Invalid version provided to update persistence info for topic " + topic.toStringUtf8())); + return; + } 
+ + int znodeVersion = ((ZkVersion)version).getZnodeVersion(); + zk.setData(zNodePath, data, znodeVersion, new SafeAsyncZKCallback.StatCallback() { + @Override + public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { + if (rc == Code.NONODE.intValue()) { + // no node + callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO, + "No persistence info found for topic " + topic.toStringUtf8())); + return; + } else if (rc == Code.BadVersion) { + // bad version + callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, + "Bad version provided to update persistence info of topic " + topic.toStringUtf8())); + return; + } else if (rc == Code.OK.intValue()) { + callback.operationFinished(ctx, new ZkVersion(stat.getVersion())); + return; + } else { + KeeperException ke = ZkUtils.logErrorAndCreateZKException( + "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc); + callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); + return; + } + } + }, ctx); + } + + @Override + public void deleteTopicPersistenceInfo(final ByteString topic, final Version version, + final Callback callback, Object ctx) { + final String zNodePath = ledgersPath(topic); + + int znodeVersion = -1; + if (null != version) { + if (!(version instanceof ZkVersion)) { + callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( + "Invalid version provided to delete persistence info for topic " + topic.toStringUtf8())); + return; + } else { + znodeVersion = ((ZkVersion)version).getZnodeVersion(); + } + } + zk.delete(zNodePath, znodeVersion, new SafeAsyncZKCallback.VoidCallback() { + @Override + public void safeProcessResult(int rc, String path, Object ctx) { + if (rc == Code.OK.intValue()) { + callback.operationFinished(ctx, null); + return; + } else if (rc == Code.NONODE.intValue()) { + // no node + callback.operationFailed(ctx, 
PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
                                                 "No persistence info found for topic " + topic.toStringUtf8()));
                        return;
                    } else if (rc == Code.BadVersion) {
                        // bad version
                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
                                                 "Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
                        return;
                    }

                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
                                        + " failed to delete persistence info @version " + version + " : ", path, rc);
                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                }
            }, ctx);
        }
    }

    /**
     * ZooKeeper based subscription data manager.
     *
     * <p>Each subscription state is stored as the data of one znode located at
     * <code>&lt;topic path&gt;/subscribers/&lt;subscriber id&gt;</code>. All operations
     * are asynchronous: the outcome is delivered through the supplied callback,
     * never through a return value or a thrown exception.
     */
    static class ZkSubscriptionDataManagerImpl implements SubscriptionDataManager {

        ZooKeeper zk;
        ServerConfiguration cfg;

        ZkSubscriptionDataManagerImpl(ServerConfiguration conf, ZooKeeper zk) {
            this.cfg = conf;
            this.zk = zk;
        }

        @Override
        public void close() throws IOException {
            // do nothing in zookeeper based impl
        }

        /**
         * Get znode path to store subscription states.
         *
         * @param sb
         *          String builder to store the znode path.
         * @param topic
         *          Topic name.
         *
         * @return string builder holding the topic path followed by "/subscribers".
         */
        private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
            return cfg.getZkTopicPath(sb, topic).append("/subscribers");
        }

        /**
         * Get znode path to store subscription state for a specified subscriber.
         *
         * @param topic
         *          Topic name.
         * @param subscriber
         *          Subscriber id.
         * @return znode path to store subscription state.
         */
        private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
            return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
                   .toString();
        }

        /**
         * Create the znode holding the state of a new subscription.
         *
         * <p>Fails with {@code SUBSCRIPTION_STATE_EXISTS} when the znode already exists and
         * with a {@code ServiceDownException} on any other ZooKeeper error.
         *
         * @param topic
         *          Topic name.
         * @param subscriberId
         *          Subscriber id.
         * @param state
         *          Subscription state, stored in its serialized (byte array) form.
         * @param callback
         *          Callback notified of the outcome; on success the result is null.
         * @param ctx
         *          Opaque context handed back to the callback.
         */
        @Override
        public void createSubscriptionState(final ByteString topic, final ByteString subscriberId, final SubscriptionState state,
                                            final Callback callback, final Object ctx) {
            ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), state.toByteArray(),
            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {

                @Override
                public void safeProcessResult(int rc, String path, Object ctx, String name) {

                    if (rc == Code.NODEEXISTS.intValue()) {
                        callback.operationFailed(ctx, PubSubException.create(StatusCode.SUBSCRIPTION_STATE_EXISTS,
                                                 "Subscription state for (topic:" + topic.toStringUtf8() + ", subscriber:"
                                                 + subscriberId.toStringUtf8() + ") existed."));
                        return;
                    } else if (rc == Code.OK.intValue()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
                                         + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
                                         + SubscriptionStateUtils.toString(state));
                        }
                        callback.operationFinished(ctx, null);
                    } else {
                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
                                                 "Could not record new subscription for topic: " + topic.toStringUtf8()
                                                 + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
                    }
                }
            }, ctx);
        }

        /**
         * Overwrite the stored state of an existing subscription.
         *
         * <p>The write passes version -1 to ZooKeeper, i.e. it is not conditioned on the
         * znode's current version. Fails with {@code NO_SUBSCRIPTION_STATE} when the znode
         * does not exist and with a {@code ServiceDownException} on any other ZooKeeper error.
         *
         * @param topic
         *          Topic name.
         * @param subscriberId
         *          Subscriber id.
         * @param state
         *          New subscription state.
         * @param callback
         *          Callback notified of the outcome; on success the result is null.
         * @param ctx
         *          Opaque context handed back to the callback.
         */
        @Override
        public void updateSubscriptionState(final ByteString topic, final ByteString subscriberId, final SubscriptionState state,
                                            final Callback callback, final Object ctx) {
            zk.setData(topicSubscriberPath(topic, subscriberId), state.toByteArray(), -1,
            new SafeAsyncZKCallback.StatCallback() {
                @Override
                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
                    if (rc == Code.NONODE.intValue()) {
                        // no node
                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
                                                 "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
                                                 + subscriberId.toStringUtf8() + ")."));
                        return;
                    } else if (rc != Code.OK.intValue()) {
                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
                                            + " subscriberId: " + subscriberId.toStringUtf8()
                                            + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
                                            path, rc);
                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
                                         + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
                                         + SubscriptionStateUtils.toString(state));
                        }

                        callback.operationFinished(ctx, null);
                    }
                }
            }, ctx);
        }

        /**
         * Delete the stored state of a subscription.
         *
         * <p>The delete passes version -1 to ZooKeeper, i.e. it is not conditioned on the
         * znode's current version. Fails with {@code NO_SUBSCRIPTION_STATE} when the znode
         * does not exist and with a {@code ServiceDownException} on any other ZooKeeper error.
         *
         * @param topic
         *          Topic name.
         * @param subscriberId
         *          Subscriber id.
         * @param callback
         *          Callback notified of the outcome; on success the result is null.
         * @param ctx
         *          Opaque context handed back to the callback.
         */
        @Override
        public void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
                                            final Callback callback, final Object ctx) {
            zk.delete(topicSubscriberPath(topic, subscriberId), -1, new SafeAsyncZKCallback.VoidCallback() {
                @Override
                public void safeProcessResult(int rc, String path, Object ctx) {
                    if (rc == Code.NONODE.intValue()) {
                        // no node
                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
                                                 "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
                                                 + subscriberId.toStringUtf8() + ")."));
                        return;
                    } else if (rc == Code.OK.intValue()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
                                         + " subscriberId: " + subscriberId.toStringUtf8());
                        }

                        callback.operationFinished(ctx, null);
                        return;
                    }

                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
                                        + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription",
                                        path, rc);
                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                }
            }, ctx);
        }

        /**
         * Read the stored state of a single subscription.
         *
         * <p>A missing znode is not an error: the callback completes successfully with a
         * null result. Other ZooKeeper errors are reported as {@code ServiceDownException};
         * data that cannot be parsed is reported as {@code UnexpectedConditionException}.
         *
         * @param topic
         *          Topic name.
         * @param subscriberId
         *          Subscriber id.
         * @param callback
         *          Callback receiving the parsed state, or null if none is stored.
         * @param ctx
         *          Opaque context handed back to the callback.
         */
        @Override
        public void readSubscriptionState(final ByteString topic, final ByteString subscriberId,
                                          final Callback callback, final Object ctx) {
            zk.getData(topicSubscriberPath(topic, subscriberId), false, new SafeAsyncZKCallback.DataCallback() {
                @Override
                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                    if (rc == Code.NONODE.intValue()) {
                        callback.operationFinished(ctx, null);
                        return;
                    }
                    if (rc != Code.OK.intValue()) {
                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
                                                "Could not read subscription data for topic: " + topic.toStringUtf8()
                                                + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                        return;
                    }

                    SubscriptionState state;
                    try {
                        state = SubscriptionState.parseFrom(data);
                    } catch (InvalidProtocolBufferException ex) {
                        String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
                                     + " subscriberId: " + subscriberId.toStringUtf8();
                        logger.error(msg, ex);
                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
                        return;
                    }

                    if (logger.isDebugEnabled()) {
                        logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
                                     + " subscriberId: " + subscriberId.toStringUtf8()
                                     + " state: " + SubscriptionStateUtils.toString(state));
                    }
                    callback.operationFinished(ctx, state);
                }
            }, ctx);
        }

        /**
         * Read the states of all subscriptions of a topic.
         *
         * <p>The children of the topic's subscribers znode are listed and the data of every
         * child is then fetched with its own asynchronous read. The callback completes
         * successfully once every child has been read and parsed; a missing subscribers
         * znode or an empty child list yields an empty map. The first child read or parse
         * failure is reported to the callback, and later failures are only logged.
         *
         * @param topic
         *          Topic name.
         * @param cb
         *          Callback receiving the map from subscriber id to its in-memory state.
         * @param ctx
         *          Opaque context handed back to the callback.
         */
        @Override
        public void readSubscriptions(final ByteString topic,
                                      final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
            String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
            zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
                @Override
                public void safeProcessResult(int rc, String path, final Object ctx, final List<String> children) {

                    if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
                                            + topic.toStringUtf8(), path, rc);
                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                        return;
                    }

                    final Map<ByteString, InMemorySubscriptionState> topicSubs =
                        new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();

                    // rc is tested first so children is never dereferenced on the NONODE path.
                    if (rc == Code.NONODE.intValue() || children.isEmpty()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8());
                        }
                        cb.operationFinished(ctx, topicSubs);
                        return;
                    }

                    // failed: set once by the first child that fails, so cb sees one failure at most.
                    // count: number of children read and parsed successfully so far.
                    final AtomicBoolean failed = new AtomicBoolean();
                    final AtomicInteger count = new AtomicInteger();

                    for (final String child : children) {

                        final ByteString subscriberId = ByteString.copyFromUtf8(child);
                        final String childPath = path + "/" + child;

                        zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() {
                            @Override
                            public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {

                                if (rc != Code.OK.intValue()) {
                                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
                                                            "Could not read subscription data for topic: " + topic.toStringUtf8()
                                                            + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
                                    reportFailure(new PubSubException.ServiceDownException(e));
                                    return;
                                }

                                // Another child already failed; the overall result is decided.
                                if (failed.get()) {
                                    return;
                                }

                                SubscriptionState state;

                                try {
                                    state = SubscriptionState.parseFrom(data);
                                } catch (InvalidProtocolBufferException ex) {
                                    String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
                                                 + " subscriberId: " + subscriberId.toStringUtf8();
                                    logger.error(msg, ex);
                                    reportFailure(new PubSubException.UnexpectedConditionException(msg));
                                    return;
                                }

                                if (logger.isDebugEnabled()) {
                                    logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
                                                 + " subscriberId: " + child + " state: "
                                                 + SubscriptionStateUtils.toString(state));
                                }

                                topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
                                // Only the callback that completes the last child reports success.
                                if (count.incrementAndGet() == children.size()) {
                                    assert topicSubs.size() == count.get();
                                    cb.operationFinished(ctx, topicSubs);
                                }
                            }

                            /** Report a failure to the caller unless one was already reported. */
                            private void reportFailure(PubSubException e) {
                                if (failed.compareAndSet(false, true)) {
                                    cb.operationFailed(ctx, e);
                                }
                            }
                        }, ctx);
                    }
                }
            }, ctx);
        }
    }
}