zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1360300 [2/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apa...
Date Wed, 11 Jul 2012 17:11:07 GMT
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Wed Jul 11 17:11:06 2012
@@ -63,6 +63,7 @@ import org.apache.hedwig.server.handlers
 import org.apache.hedwig.server.handlers.SubscribeHandler;
 import org.apache.hedwig.server.handlers.UnsubscribeHandler;
 import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager;
 import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
 import org.apache.hedwig.server.persistence.PersistenceManager;
@@ -73,7 +74,7 @@ import org.apache.hedwig.server.regions.
 import org.apache.hedwig.server.ssl.SslServerContextFactory;
 import org.apache.hedwig.server.subscriptions.InMemorySubscriptionManager;
 import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.subscriptions.ZkSubscriptionManager;
+import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.server.topics.ZkTopicManager;
@@ -98,6 +99,9 @@ public class PubSubServer {
     SubscriptionManager sm;
     RegionManager rm;
 
+    // Metadata Manager Factory
+    MetadataManagerFactory mm;
+
     ZooKeeper zk; // null if we are in standalone mode
     BookKeeper bk; // null if we are in standalone mode
 
@@ -127,7 +131,7 @@ public class PubSubServer {
                 logger.error("Could not instantiate bookkeeper client", e);
                 throw new IOException(e);
             }
-            underlyingPM = new BookkeeperPersistenceManager(bk, zk, topicMgr, conf, scheduler);
+            underlyingPM = new BookkeeperPersistenceManager(bk, mm, topicMgr, conf, scheduler);
 
         }
 
@@ -144,7 +148,7 @@ public class PubSubServer {
         if (conf.isStandalone()) {
             return new InMemorySubscriptionManager(tm, pm, conf, scheduler);
         } else {
-            return new ZkSubscriptionManager(zk, tm, pm, conf, scheduler);
+            return new MMSubscriptionManager(mm, tm, pm, conf, scheduler);
         }
 
     }
@@ -175,6 +179,13 @@ public class PubSubServer {
         }
     }
 
+    protected void instantiateMetadataManagerFactory() throws Exception {
+        if (conf.isStandalone()) {
+            return;
+        }
+        mm = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+    }
+
     protected TopicManager instantiateTopicManager() throws IOException {
         TopicManager tm;
 
@@ -233,10 +244,20 @@ public class PubSubServer {
         // Stop the DeliveryManager and ReadAheadCache threads (if
         // applicable).
         dm.stop();
+        pm.stop();
 
         // Stop the SubscriptionManager if needed.
         sm.stop();
 
+        // Shutdown metadata manager if needed
+        if (null != mm) {
+            try {
+                mm.shutdown();
+            } catch (IOException ie) {
+                logger.error("Error while shutdown metadata manager factory!", ie);
+            }
+        }
+
         // Shutdown the ZooKeeper and BookKeeper clients only if we are
         // not in stand-alone mode.
         try {
@@ -245,9 +266,9 @@ public class PubSubServer {
             if (zk != null)
                 zk.close();
         } catch (InterruptedException e) {
-            logger.error("Error while closing ZooKeeper client!");
+            logger.error("Error while closing ZooKeeper client : ", e);
         } catch (BKException bke) {
-            logger.error("Error while closing BookKeeper client");
+            logger.error("Error while closing BookKeeper client : ", bke);
         }
 
         // Close and release the Netty channels and resources
@@ -350,6 +371,7 @@ public class PubSubServer {
                             .newCachedThreadPool());
 
                     instantiateZookeeperClient();
+                    instantiateMetadataManagerFactory();
                     tm = instantiateTopicManager();
                     pm = instantiatePersistenceManager(tm);
                     dm = new FIFODeliveryManager(pm, conf);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Wed Jul 11 17:11:06 2012
@@ -33,14 +33,10 @@ import org.apache.bookkeeper.client.Book
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.data.Stat;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -54,13 +50,13 @@ import org.apache.hedwig.protoextensions
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.TopicOpQueuer;
 import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.TopicPersistenceManager;
 import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-import org.apache.hedwig.zookeeper.ZkUtils;
 
 /**
  * This persistence manager uses zookeeper and bookkeeper to store messages.
@@ -78,7 +74,7 @@ public class BookkeeperPersistenceManage
     static Logger logger = LoggerFactory.getLogger(BookkeeperPersistenceManager.class);
     static byte[] passwd = "sillysecret".getBytes();
     private BookKeeper bk;
-    private ZooKeeper zk;
+    private TopicPersistenceManager tpManager;
     private ServerConfiguration cfg;
     private TopicManager tm;
 
@@ -122,7 +118,7 @@ public class BookkeeperPersistenceManage
          * include the current ledger
          */
         TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
-        int ledgerRangesZnodeVersion = -1;
+        Version ledgerRangesVersion = null;
 
         /**
          * This is the handle of the current ledger that is being used to write
@@ -148,17 +144,20 @@ public class BookkeeperPersistenceManage
      *
      * @param bk
      *            a reference to bookkeeper to use.
-     * @param zk
-     *            a zookeeper handle to use.
-     * @param zkPrefix
-     *            the zookeeper subtree that stores the topic to ledger
-     *            information. if this prefix does not exist, it will be
-     *            created.
+     * @param metaManagerFactory
+     *            a metadata manager factory handle to use.
+     * @param tm
+     *            a reference to topic manager.
+     * @param cfg
+     *            Server configuration object
+     * @param executor
+     *            A executor
      */
-    public BookkeeperPersistenceManager(BookKeeper bk, ZooKeeper zk, TopicManager tm, ServerConfiguration cfg,
+    public BookkeeperPersistenceManager(BookKeeper bk, MetadataManagerFactory metaManagerFactory,
+                                        TopicManager tm, ServerConfiguration cfg,
                                         ScheduledExecutorService executor) {
         this.bk = bk;
-        this.zk = zk;
+        this.tpManager = metaManagerFactory.newTopicPersistenceManager();
         this.cfg = cfg;
         this.tm = tm;
         queuer = new TopicOpQueuer(executor);
@@ -357,20 +356,20 @@ public class BookkeeperPersistenceManage
 
             if (needsUpdate) {
                 final LedgerRanges newRanges = builder.build();
-                updateLedgerRangesNode(topic, newRanges, topicInfo.ledgerRangesZnodeVersion,
-                                       new Callback<Integer>() {
-                                           public void operationFinished(Object ctx, Integer newVersion) {
-                                               // Finally, all done
-                                               for (Long k : keysToRemove) {
-                                                   topicInfo.ledgerRanges.remove(k);
-                                               }
-                                               topicInfo.ledgerRangesZnodeVersion = newVersion;
-                                               cb.operationFinished(ctx, null);
-                                           }
-                                           public void operationFailed(Object ctx, PubSubException exception) {
-                                               cb.operationFailed(ctx, exception);
-                                           }
-                                       }, ctx);
+                tpManager.writeTopicPersistenceInfo(
+                topic, newRanges, topicInfo.ledgerRangesVersion, new Callback<Version>() {
+                    public void operationFinished(Object ctx, Version newVersion) {
+                        // Finally, all done
+                        for (Long k : keysToRemove) {
+                            topicInfo.ledgerRanges.remove(k);
+                        }
+                        topicInfo.ledgerRangesVersion = newVersion;
+                        cb.operationFinished(ctx, null);
+                    }
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        cb.operationFailed(ctx, exception);
+                    }
+                }, ctx);
             } else {
                 cb.operationFinished(ctx, null);
             }
@@ -558,10 +557,6 @@ public class BookkeeperPersistenceManage
         };
     };
 
-    String ledgersPath(ByteString topic) {
-        return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
-    }
-
     class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
         public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
             queuer.super(topic, cb, ctx);
@@ -574,60 +569,25 @@ public class BookkeeperPersistenceManage
                 cb.operationFinished(ctx, null);
                 return;
             }
-            // read topic ledgers node data
-            final String zNodePath = ledgersPath(topic);
 
-            zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() {
+            // read persistence info
+            tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
                 @Override
-                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (rc == Code.OK.intValue()) {
-                        processTopicLedgersNodeData(data, stat.getVersion());
-                        return;
-                    }
-
-                    if (rc == Code.NONODE.intValue()) {
-                        // create it
-                        final byte[] initialData = LedgerRanges.getDefaultInstance().toByteArray();
-                        ZkUtils.createFullPathOptimistic(zk, zNodePath, initialData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
-                            @Override
-                            public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                                if (rc != Code.OK.intValue()) {
-                                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                                             "Could not create ledgers node for topic: " + topic.toStringUtf8(),
-                                                             path, rc);
-                                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                                    return;
-                                }
-                                // initial version is version 1
-                                // (guessing)
-                                processTopicLedgersNodeData(initialData, 0);
-                            }
-                        }, ctx);
-                        return;
+                public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
+                    if (null != ranges) {
+                        processTopicLedgerRanges(ranges.getValue(), ranges.getVersion());
+                    } else {
+                        processTopicLedgerRanges(LedgerRanges.getDefaultInstance(), null);
                     }
-
-                    // otherwise some other error
-                    KeeperException ke = ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: "
-                                         + topic.toStringUtf8(), path, rc);
-                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-
+                }
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    cb.operationFailed(ctx, exception);
                 }
             }, ctx);
         }
 
-        void processTopicLedgersNodeData(byte[] data, int version) {
-
-            final LedgerRanges ranges;
-            try {
-                ranges = LedgerRanges.parseFrom(data);
-            } catch (InvalidProtocolBufferException e) {
-                String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized";
-                logger.error(msg, e);
-                cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                return;
-            }
-
+        void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
             Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
             TopicInfo topicInfo = new TopicInfo();
 
@@ -670,7 +630,7 @@ public class BookkeeperPersistenceManage
          * @param ledgerId
          *            Ledger to be recovered
          */
-        private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final int expectedVersionOfLedgerNode,
+        private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final Version expectedVersionOfLedgerNode,
                 final TopicInfo topicInfo) {
 
             bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
@@ -748,7 +708,7 @@ public class BookkeeperPersistenceManage
          *            The version of the ledgers node when we read it, should be
          *            the same when we try to write
          */
-        private void openNewTopicLedger(final int expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
+        private void openNewTopicLedger(final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
             bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
             new SafeAsynBKCallback.CreateCallback() {
                 boolean processed = false;
@@ -786,42 +746,26 @@ public class BookkeeperPersistenceManage
                     }
                     builder.addRanges(lastRange);
 
-                    updateLedgerRangesNode(topic, builder.build(), expectedVersionOfLedgersNode,
-                                           new Callback<Integer>() {
-                                               public void operationFinished(Object ctx, Integer newVersion) {
-                                                   // Finally, all done
-                                                   topicInfo.ledgerRangesZnodeVersion = newVersion;
-                                                   topicInfos.put(topic, topicInfo);
-                                                   cb.operationFinished(ctx, null);
-                                               }
-                                               public void operationFailed(Object ctx, PubSubException exception) {
-                                                   cb.operationFailed(ctx, exception);
-                                               }
-                                           }, ctx);
+                    tpManager.writeTopicPersistenceInfo(
+                    topic, builder.build(), expectedVersionOfLedgersNode, new Callback<Version>() {
+                        @Override
+                        public void operationFinished(Object ctx, Version newVersion) {
+                            // Finally, all done
+                            topicInfo.ledgerRangesVersion = newVersion;
+                            topicInfos.put(topic, topicInfo);
+                            cb.operationFinished(ctx, null);
+                        }
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException exception) {
+                            cb.operationFailed(ctx, exception);
+                        }
+                    }, ctx);
                     return;
                 }
             }, ctx);
         }
     }
 
-    public void updateLedgerRangesNode(final ByteString topic, LedgerRanges ranges,
-                                       int version, final Callback<Integer> callback, Object ctx) {
-        final String zNodePath = ledgersPath(topic);
-
-        zk.setData(zNodePath, ranges.toByteArray(), version, new SafeAsyncZKCallback.StatCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                    if (rc != KeeperException.Code.OK.intValue()) {
-                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                        return;
-                    }
-                    callback.operationFinished(ctx, stat.getVersion());
-                }
-            }, ctx);
-    }
-
     /**
      * acquire ownership of a topic, doing whatever is needed to be able to
      * perform reads and writes on that topic from here on
@@ -905,4 +849,13 @@ public class BookkeeperPersistenceManage
     public void clearMessageBound(ByteString topic) {
         queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, TopicInfo.UNLIMITED));
     }
+
+    @Override
+    public void stop() {
+        try {
+            tpManager.close();
+        } catch (IOException ioe) {
+            logger.warn("Exception closing topic persistence manager : ", ioe);
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Wed Jul 11 17:11:06 2012
@@ -128,6 +128,11 @@ public class LocalDBPersistenceManager i
         }
     }
 
+    @Override
+    public void stop() {
+        // do nothing
+    }
+
     /**
      * Ensures that at least the default seq-id exists in the map for the given
      * topic. Checks for race conditions (.e.g, another thread inserts the

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/PersistenceManager.java Wed Jul 11 17:11:06 2012
@@ -91,4 +91,9 @@ public interface PersistenceManager {
     public void setMessageBound(ByteString topic, Integer bound);
     public void clearMessageBound(ByteString topic);
     public void consumeToBound(ByteString topic);
+
+    /**
+     * Stop persistence manager.
+     */
+    public void stop();
 }

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.SubscriptionDataManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * MetaManager-based subscription manager.
+ */
+public class MMSubscriptionManager extends AbstractSubscriptionManager {
+
+    SubscriptionDataManager subManager;
+
+    public MMSubscriptionManager(MetadataManagerFactory metaManagerFactory,
+                                 TopicManager topicMgr, PersistenceManager pm,
+                                 ServerConfiguration cfg,
+                                 ScheduledExecutorService scheduler) {
+        super(cfg, topicMgr, pm, scheduler);
+        this.subManager = metaManagerFactory.newSubscriptionDataManager();
+    }
+
+    @Override
+    protected void readSubscriptions(final ByteString topic,
+                                     final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
+        subManager.readSubscriptions(topic, cb, ctx);
+    }
+
+    @Override
+    protected void createSubscriptionState(final ByteString topic, final ByteString subscriberId,
+                                           final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+        subManager.createSubscriptionState(topic, subscriberId, state, callback, ctx);
+    }
+
+    @Override
+    protected void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
+                                           final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+        subManager.updateSubscriptionState(topic, subscriberId, state, callback, ctx);
+    }
+
+    @Override
+    protected void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
+                                           final Callback<Void> callback, final Object ctx) {
+        subManager.deleteSubscriptionState(topic, subscriberId, callback, ctx);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        try {
+            subManager.close();
+        } catch (IOException ioe) {
+            logger.warn("Exception closing subscription data manager : ", ioe);
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java Wed Jul 11 17:11:06 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hedwig.zookeeper;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -34,6 +35,43 @@ public class ZkUtils {
 
     static Logger logger = LoggerFactory.getLogger(ZkUtils.class);
 
+    static class SyncObject {
+        int rc;
+        String path;
+        boolean called = false;
+    }
+
+    public static void createFullPathOptimistic(final ZooKeeper zk, final String originalPath, final byte[] data,
+            final List<ACL> acl, final CreateMode createMode)
+    throws KeeperException, IOException, InterruptedException {
+        final SyncObject syncObj = new SyncObject();
+
+        createFullPathOptimistic(
+            zk, originalPath, data, acl, createMode,
+            new SafeAsyncZKCallback.StringCallback() {
+                @Override
+                public void safeProcessResult(final int rc, String path, Object ctx, String name) {
+                    synchronized (syncObj) {
+                        syncObj.rc = rc;
+                        syncObj.path = path;
+                        syncObj.called = true;
+                        syncObj.notify();
+                    }
+                }
+            }, syncObj
+        );
+
+        synchronized (syncObj) {
+            while (!syncObj.called) {
+                syncObj.wait();
+            }
+        }
+
+        if (Code.OK.intValue() != syncObj.rc) {
+            throw KeeperException.create(syncObj.rc, syncObj.path);
+        }
+    }
+
     public static void createFullPathOptimistic(final ZooKeeper zk, final String originalPath, final byte[] data,
             final List<ACL> acl, final CreateMode createMode, final AsyncCallback.StringCallback callback,
             final Object ctx) {

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/MetadataManagerFactoryTestCase.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hedwig.server.meta;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.ZkMetadataManagerFactory;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public abstract class MetadataManagerFactoryTestCase extends ZooKeeperTestBase {
+    static Logger LOG = LoggerFactory.getLogger(MetadataManagerFactoryTestCase.class);
+
+    protected MetadataManagerFactory metadataManagerFactory;
+    protected ServerConfiguration conf;
+
+    public MetadataManagerFactoryTestCase(String metadataManagerFactoryCls) {
+        super();
+        conf = new ServerConfiguration();
+        conf.setMetadataManagerFactoryName(metadataManagerFactoryCls);
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { ZkMetadataManagerFactory.class.getName() }
+        });
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        metadataManagerFactory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        metadataManagerFactory.shutdown();
+        super.tearDown();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestFactoryLayout.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hedwig.server.meta;
+
+import java.io.IOException;
+
+import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+import org.apache.hedwig.zookeeper.ZkUtils;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+public class TestFactoryLayout extends ZooKeeperTestBase {
+
+    @Test
+    public void testFactoryLayout() throws Exception {
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setMetadataManagerFactoryName(
+            "org.apache.hedwig.server.meta.ZkMetadataManager");
+
+        FactoryLayout layout = FactoryLayout.readLayout(zk, conf);
+        Assert.assertTrue("Layout should be null", layout == null);
+
+        String testName = "foobar";
+        int testVersion = 0xdeadbeef;
+        // use layout defined in configuration also create it in zookeeper
+        writeFactoryLayout(conf, testName, testVersion);
+
+        layout = FactoryLayout.readLayout(zk, conf);
+        Assert.assertEquals(testName, layout.getManagerMeta().getManagerImpl());
+        Assert.assertEquals(testVersion, layout.getManagerMeta().getManagerVersion());
+    }
+
+    private void writeFactoryLayout(ServerConfiguration conf, String managerCls,
+                                    int managerVersion)
+        throws Exception {
+        ManagerMeta managerMeta = ManagerMeta.newBuilder()
+                                             .setManagerImpl(managerCls)
+                                             .setManagerVersion(managerVersion)
+                                             .build();
+        FactoryLayout layout = new FactoryLayout(managerMeta);
+        layout.store(zk, conf);
+    }
+
+    @Test
+    public void testCorruptedFactoryLayout() throws Exception {
+        ServerConfiguration conf = new ServerConfiguration();
+        StringBuilder msb = new StringBuilder();
+        String factoryLayoutPath = FactoryLayout.getFactoryLayoutPath(msb, conf);
+        // write corrupted manager layout
+        ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, "BadLayout".getBytes(),
+                                         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        try {
+            FactoryLayout.readLayout(zk, conf);
+            Assert.fail("Shouldn't reach here!");
+        } catch (IOException ie) {
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,237 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hedwig.server.meta;
+
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.hedwig.StubCallback;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.util.Either;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+public class TestMetadataManager extends MetadataManagerFactoryTestCase {
+
+    public TestMetadataManager(String metadataManagerFactoryCls) {
+        super(metadataManagerFactoryCls);
+    }
+
+    @Test
+    public void testPersistenceInfo() throws Exception {
+        TopicPersistenceManager tpManager = metadataManagerFactory.newTopicPersistenceManager();
+
+        ByteString topic = ByteString.copyFromUtf8("testPersistenceInfo");
+        StubCallback<Versioned<LedgerRanges>> readCallback = new StubCallback<Versioned<LedgerRanges>>();
+        StubCallback<Version> writeCallback = new StubCallback<Version>();
+        StubCallback<Void> deleteCallback = new StubCallback<Void>();
+
+        // Write non-existed persistence info
+        tpManager.writeTopicPersistenceInfo(topic, LedgerRanges.getDefaultInstance(), null,
+                                            writeCallback, null);
+        Either<Version, PubSubException> res = writeCallback.queue.take();
+        Assert.assertEquals(null, res.right());
+        Version v1 = res.left();
+
+        // read persistence info
+        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
+        Versioned<LedgerRanges> ranges = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v1.compare(ranges.getVersion()));
+        Assert.assertEquals(LedgerRanges.getDefaultInstance(), ranges.getValue());
+
+        LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(1).build();
+        LedgerRanges.Builder builder = LedgerRanges.newBuilder();
+        builder.addRanges(lastRange);
+        LedgerRanges newRanges = builder.build();
+
+        // write existed persistence info with null version
+        tpManager.writeTopicPersistenceInfo(topic, newRanges, null, writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertNotNull(res.right());
+        Assert.assertTrue(res.right() instanceof PubSubException.TopicPersistenceInfoExistsException);
+
+        // write existed persistence info with right version
+        tpManager.writeTopicPersistenceInfo(topic, newRanges, v1,
+                                            writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertEquals(null, res.right());
+        Version v2 = res.left();
+        Assert.assertEquals(Version.Occurred.AFTER, v2.compare(v1));
+
+        // read persistence info
+        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
+        ranges = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(ranges.getVersion()));
+        Assert.assertEquals(newRanges, ranges.getValue());
+
+        lastRange = LedgerRange.newBuilder().setLedgerId(2).build();
+        builder = LedgerRanges.newBuilder();
+        builder.addRanges(lastRange);
+        LedgerRanges newRanges2 = builder.build();
+
+        // write existed persistence info with bad version
+        tpManager.writeTopicPersistenceInfo(topic, newRanges2, v1,
+                                            writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertNotNull(res.right());
+        Assert.assertTrue(res.right() instanceof PubSubException.BadVersionException);
+
+        // read persistence info
+        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
+        ranges = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(ranges.getVersion()));
+        Assert.assertEquals(newRanges, ranges.getValue());
+
+        // delete with bad version
+        tpManager.deleteTopicPersistenceInfo(topic, v1, deleteCallback, null);
+        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
+                          PubSubException.BadVersionException);
+
+        // read persistence info
+        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
+        ranges = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(ranges.getVersion()));
+        Assert.assertEquals(newRanges, ranges.getValue());
+
+        // delete existed persistence info with right version
+        tpManager.deleteTopicPersistenceInfo(topic, v2, deleteCallback, null);
+        Assert.assertEquals(null, deleteCallback.queue.take().right());
+
+        // read empty persistence info
+        tpManager.readTopicPersistenceInfo(topic, readCallback, null);
+        Assert.assertEquals(null, readCallback.queue.take().left());
+
+        // delete non-existed persistence info
+        tpManager.deleteTopicPersistenceInfo(topic, null, deleteCallback, null);
+        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
+                          PubSubException.NoTopicPersistenceInfoException);
+
+        tpManager.close();
+    }
+
+    @Test
+    public void testSubscriptionState() throws Exception {
+        SubscriptionDataManager subManager = metadataManagerFactory.newSubscriptionDataManager();
+
+        ByteString topic = ByteString.copyFromUtf8("testSubscriptionState");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+
+        StubCallback<Void> callback = new StubCallback<Void>();
+        StubCallback<SubscriptionState> readCallback = new StubCallback<SubscriptionState>();
+        StubCallback<Map<ByteString, InMemorySubscriptionState>> subsCallback
+            = new StubCallback<Map<ByteString, InMemorySubscriptionState>>();
+
+        subManager.readSubscriptionState(topic, subid, readCallback, null);
+        Either<SubscriptionState, PubSubException> readRes = readCallback.queue.take();
+        Assert.assertEquals("Found inconsistent subscription state", null, readRes.left());
+        Assert.assertEquals("Should not fail with PubSubException", null, readRes.right());
+
+        // read non-existed subscription state
+        subManager.readSubscriptions(topic, subsCallback, null);
+        Either<Map<ByteString, InMemorySubscriptionState>, PubSubException> res = subsCallback.queue.take();
+        Assert.assertEquals("Found more than 0 subscribers", 0, res.left().size());
+        Assert.assertEquals("Should not fail with PubSubException", null, res.right());
+
+        // update non-existed subscription state
+        subManager.updateSubscriptionState(topic, subid, SubscriptionState.getDefaultInstance(),
+                                            callback, null);
+        Assert.assertTrue("Should fail to update a non-existed subscriber with PubSubException",
+                          callback.queue.take().right() instanceof PubSubException.NoSubscriptionStateException);
+
+        // delete non-existed subscription state
+        subManager.deleteSubscriptionState(topic, subid, callback, null);
+        Assert.assertTrue("Should fail to delete a non-existed subscriber with PubSubException",
+                          callback.queue.take().right() instanceof PubSubException.NoSubscriptionStateException);
+
+        long seqId = 10;
+        MessageSeqId.Builder builder = MessageSeqId.newBuilder();
+        builder.setLocalComponent(seqId);
+        MessageSeqId msgId = builder.build();
+
+        SubscriptionState state = SubscriptionState.newBuilder(SubscriptionState.getDefaultInstance()).setMsgId(msgId).build();
+
+        // create a subscription state
+        subManager.createSubscriptionState(topic, subid, state, callback, null);
+        Assert.assertEquals("Should not fail with PubSubException",
+                            null, callback.queue.take().right());
+
+        // read subscriptions
+        subManager.readSubscriptions(topic, subsCallback, null);
+        res = subsCallback.queue.take();
+        Assert.assertEquals("Should find just 1 subscriber", 1, res.left().size());
+        Assert.assertEquals("Should not fail with PubSubException", null, res.right());
+        InMemorySubscriptionState imss = res.left().get(subid);
+        Assert.assertEquals("Found inconsistent subscription state",
+                            state, imss.getSubscriptionState());
+        Assert.assertEquals("Found inconsistent last consumed seq id",
+                            seqId, imss.getLastConsumeSeqId().getLocalComponent());
+
+        // move consume seq id
+        seqId = 99;
+        builder = MessageSeqId.newBuilder();
+        builder.setLocalComponent(seqId);
+        msgId = builder.build();
+
+        state = SubscriptionState.newBuilder(state).setMsgId(msgId).build();
+
+        // update subscription state
+        subManager.updateSubscriptionState(topic, subid, state, callback, null);
+        Assert.assertEquals("Fail to update a subscription state", null, callback.queue.take().right());
+
+        // read subscription state
+        subManager.readSubscriptionState(topic, subid, readCallback, null);
+        Assert.assertEquals("Found inconsistent subscription state",
+                            state, readCallback.queue.take().left());
+
+        // read subscriptions again
+        subManager.readSubscriptions(topic, subsCallback, null);
+        res = subsCallback.queue.take();
+        Assert.assertEquals("Should find just 1 subscriber", 1, res.left().size());
+        Assert.assertEquals("Should not fail with PubSubException", null, res.right());
+        imss = res.left().get(subid);
+        Assert.assertEquals("Found inconsistent subscription state",
+                            state, imss.getSubscriptionState());
+        Assert.assertEquals("Found inconsistent last consumed seq id",
+                            seqId, imss.getLastConsumeSeqId().getLocalComponent());
+
+        subManager.deleteSubscriptionState(topic, subid, callback, null);
+        Assert.assertEquals("Fail to delete an existed subscriber", null, callback.queue.take().right());
+
+        // read subscription states again
+        subManager.readSubscriptions(topic, subsCallback, null);
+        res = subsCallback.queue.take();
+        Assert.assertEquals("Found more than 0 subscribers", 0, res.left().size());
+        Assert.assertEquals("Should not fail with PubSubException", null, res.right());
+
+        subManager.close();
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java?rev=1360300&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java Wed Jul 11 17:11:06 2012
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hedwig.server.meta;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import java.io.IOException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.CountDownLatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMetadataManagerFactory extends ZooKeeperTestBase {
+    static Logger LOG = LoggerFactory.getLogger(TestMetadataManagerFactory.class);
+
+    static class TestServerConfiguration extends ServerConfiguration {
+        String hedwigPrefix = "/hedwig";
+
+        @Override
+        public String getZkPrefix() {
+            return hedwigPrefix;
+        }
+
+        public void setZkPrefix(String prefix) {
+            this.hedwigPrefix = prefix;
+        }
+    }
+
+    static class DummyMetadataManagerFactory extends MetadataManagerFactory {
+        static int VERSION = 10;
+
+        public int getCurrentVersion() { return VERSION; }
+
+
+        public MetadataManagerFactory initialize(ServerConfiguration cfg,
+                                                 ZooKeeper zk,
+                                                 int version)
+        throws IOException {
+            if (version != VERSION) {
+                throw new IOException("unmatched manager version");
+            }
+            // do nothing
+            return this;
+        }
+
+        public void shutdown() {}
+
+        public TopicPersistenceManager newTopicPersistenceManager() {
+            return null;
+        }
+
+        public SubscriptionDataManager newSubscriptionDataManager() {
+            return null;
+        }
+    }
+
+    private void writeFactoryLayout(ServerConfiguration conf,
+                                    String factoryCls,
+                                    int factoryVersion)
+        throws Exception {
+        ManagerMeta meta = ManagerMeta.newBuilder()
+                                      .setManagerImpl(factoryCls)
+                                      .setManagerVersion(factoryVersion).build();
+        new FactoryLayout(meta).store(zk, conf);
+    }
+
+    /**
+     * Test bad server configuration
+     */
+    @Test
+    public void testBadConf() throws Exception {
+        TestServerConfiguration conf = new TestServerConfiguration();
+
+        String root0 = "/goodconf";
+        conf.setZkPrefix(root0);
+
+        MetadataManagerFactory m =
+            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+        Assert.assertTrue("MetadataManagerFactory is unexpected type",
+                          (m instanceof ZkMetadataManagerFactory));
+
+        // mismatching conf
+        conf.setMetadataManagerFactoryName(DummyMetadataManagerFactory.class.getName());
+        try {
+            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+            Assert.fail("Shouldn't reach here");
+        } catch (Exception e) {
+            Assert.assertTrue("Invalid exception",
+                              e.getMessage().contains("does not match existing factory"));
+        }
+
+        // invalid metadata manager
+        String root1 = "/badconf1";
+        conf.setZkPrefix(root1);
+        conf.setMetadataManagerFactoryName("DoesNotExist");
+        try {
+            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+            Assert.fail("Shouldn't reach here");
+        } catch (Exception e) {
+            Assert.assertTrue("Invalid exception",
+                              e.getMessage().contains("Failed to get metadata manager factory class from configuration"));
+        }
+    }
+
+    /**
+     * Test bad zk configuration
+     */
+    @Test
+    public void testBadZkContents() throws Exception {
+        TestServerConfiguration conf = new TestServerConfiguration();
+
+        // bad type in zookeeper
+        String root0 = "/badzk0";
+        conf.setZkPrefix(root0);
+
+        writeFactoryLayout(conf, "DoesNotExist", 0xdeadbeef);
+        try {
+            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+            Assert.fail("Shouldn't reach here");
+        } catch (Exception e) {
+            Assert.assertTrue("Invalid exception",
+                              e.getMessage().contains("No class found to instantiate metadata manager factory"));
+        }
+
+        // bad version in zookeeper
+        String root1 = "/badzk1";
+        conf.setZkPrefix(root1);
+
+        writeFactoryLayout(conf, ZkMetadataManagerFactory.class.getName(), 0xdeadbeef);
+        try {
+            MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
+            Assert.fail("Shouldn't reach here");
+        } catch (Exception e) {
+            Assert.assertTrue("Invalid exception",
+                              e.getMessage().contains("Incompatible ZkMetadataManagerFactory version"));
+        }
+    }
+
+    private class CreateMMThread extends Thread {
+        private boolean success = false;
+        private final String factoryCls;
+        private final String root;
+        private final CyclicBarrier barrier;
+        private ZooKeeper zkc;
+
+        CreateMMThread(String root, String factoryCls, CyclicBarrier barrier) throws Exception {
+            this.factoryCls = factoryCls;
+            this.barrier = barrier;
+            this.root = root;
+            final CountDownLatch latch = new CountDownLatch(1);
+            zkc = new ZooKeeper(hostPort, 10000, new Watcher() {
+                public void process(WatchedEvent event) {
+                    latch.countDown();
+                }
+            });
+            latch.await();
+        }
+
+        public void run() {
+            TestServerConfiguration conf = new TestServerConfiguration();
+            conf.setZkPrefix(root);
+            conf.setMetadataManagerFactoryName(factoryCls);
+
+            try {
+                barrier.await();
+                MetadataManagerFactory.newMetadataManagerFactory(conf, zkc);
+                success = true;
+            } catch (Exception e) {
+                LOG.error("Failed to create metadata manager factory", e);
+            }
+        }
+
+        public boolean isSuccessful() {
+            return success;
+        }
+
+        public void close() throws Exception {
+            zkc.close();
+        }
+    }
+
+    // test concurrent
+    @Test
+    public void testConcurrent1() throws Exception {
+        /// everyone creates the same
+        int numThreads = 50;
+
+        // bad version in zookeeper
+        String root0 = "/lmroot0";
+
+        CyclicBarrier barrier = new CyclicBarrier(numThreads+1);
+        List<CreateMMThread> threads = new ArrayList<CreateMMThread>(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            CreateMMThread t = new CreateMMThread(root0, ZkMetadataManagerFactory.class.getName(), barrier);
+            t.start();
+            threads.add(t);
+        }
+
+        barrier.await();
+
+        boolean success = true;
+        for (CreateMMThread t : threads) {
+            t.join();
+            t.close();
+            success = t.isSuccessful() && success;
+        }
+        Assert.assertTrue("Not all metadata manager factories created", success);
+    }
+
+    @Test
+    public void testConcurrent2() throws Exception {
+        /// odd create different
+        int numThreadsEach = 25;
+
+        // bad version in zookeeper
+        String root0 = "/lmroot0";
+
+        CyclicBarrier barrier = new CyclicBarrier(numThreadsEach*2+1);
+        List<CreateMMThread> threadsA = new ArrayList<CreateMMThread>(numThreadsEach);
+        for (int i = 0; i < numThreadsEach; i++) {
+            CreateMMThread t = new CreateMMThread(root0, ZkMetadataManagerFactory.class.getName(), barrier);
+            t.start();
+            threadsA.add(t);
+        }
+        List<CreateMMThread> threadsB = new ArrayList<CreateMMThread>(numThreadsEach);
+        for (int i = 0; i < numThreadsEach; i++) {
+            CreateMMThread t = new CreateMMThread(root0, DummyMetadataManagerFactory.class.getName(), barrier);
+            t.start();
+            threadsB.add(t);
+        }
+
+        barrier.await();
+
+        int numSuccess = 0;
+        int numFails = 0;
+        for (CreateMMThread t : threadsA) {
+            t.join();
+            t.close();
+            if (t.isSuccessful()) {
+                numSuccess++;
+            } else {
+                numFails++;
+            }
+        }
+
+        for (CreateMMThread t : threadsB) {
+            t.join();
+            t.close();
+            if (t.isSuccessful()) {
+                numSuccess++;
+            } else {
+                numFails++;
+            }
+        }
+        Assert.assertEquals("Incorrect number of successes", numThreadsEach, numSuccess);
+        Assert.assertEquals("Incorrect number of failures", numThreadsEach, numFails);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Wed Jul 11 17:11:06 2012
@@ -121,4 +121,9 @@ public class StubPersistenceManager impl
         request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
 
     }
+
+    @Override
+    public void stop() {
+        // do nothing
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java Wed Jul 11 17:11:06 2012
@@ -32,6 +32,7 @@ import org.apache.hedwig.HelperMethods;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.util.Callback;
@@ -56,6 +57,7 @@ public class TestBookKeeperPersistenceMa
     TopicManager tm;
     BookkeeperPersistenceManager manager;
     PubSubException failureException = null;
+    MetadataManagerFactory metadataManagerFactory;
 
     @Override
     @Before
@@ -73,15 +75,21 @@ public class TestBookKeeperPersistenceMa
         .setThrottleValue(3);
         conf.addConf(bkClientConf);
 
+        metadataManagerFactory =
+            MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+
         scheduler = Executors.newScheduledThreadPool(1);
         tm = new TrivialOwnAllTopicManager(conf, scheduler);
-        manager = new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), tm, conf, scheduler);
+        manager = new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory,
+                                                   tm, conf, scheduler);
     }
 
     @Override
     @After
     protected void tearDown() throws Exception {
         tm.stop();
+        manager.stop();
+        metadataManagerFactory.shutdown();
         scheduler.shutdown();
         bktb.tearDown();
         super.tearDown();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java Wed Jul 11 17:11:06 2012
@@ -27,12 +27,15 @@ import org.junit.After;
 import org.junit.Before;
 
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 
 public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
     BookKeeperTestBase bktb;
     private final int numBookies = 3;
 
+    MetadataManagerFactory metadataManagerFactory = null;
+
     @Override
     @Before
     protected void setUp() throws Exception {
@@ -48,6 +51,9 @@ public class TestBookKeeperPersistenceMa
     protected void tearDown() throws Exception {
         bktb.tearDown();
         super.tearDown();
+        if (null != metadataManagerFactory) {
+            metadataManagerFactory.shutdown();
+        }
     }
 
     @Override
@@ -60,8 +66,12 @@ public class TestBookKeeperPersistenceMa
         ServerConfiguration conf = new ServerConfiguration();
         ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
-        return new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), new TrivialOwnAllTopicManager(conf,
-                                                scheduler), conf, scheduler);
+        metadataManagerFactory =
+            MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+
+        return new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory,
+                                                new TrivialOwnAllTopicManager(conf, scheduler),
+                                                conf, scheduler);
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java Wed Jul 11 17:11:06 2012
@@ -35,6 +35,7 @@ import org.apache.hedwig.HelperMethods;
 import org.apache.hedwig.StubCallback;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.util.ConcurrencyUtils;
@@ -44,6 +45,7 @@ public class TestBookkeeperPersistenceMa
     BookKeeperTestBase bktb;
     private final int numBookies = 3;
     BookkeeperPersistenceManager bkpm;
+    MetadataManagerFactory mm;
     ServerConfiguration conf;
     ScheduledExecutorService scheduler;
     TopicManager tm;
@@ -60,12 +62,15 @@ public class TestBookkeeperPersistenceMa
         scheduler = Executors.newScheduledThreadPool(1);
         tm = new TrivialOwnAllTopicManager(conf, scheduler);
 
-        bkpm = new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), tm, conf, scheduler);
+        mm = MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
+
+        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, conf, scheduler);
     }
 
     @Override
     @After
     protected void tearDown() throws Exception {
+        mm.shutdown();
         bktb.tearDown();
         super.tearDown();
     }
@@ -78,7 +83,7 @@ public class TestBookkeeperPersistenceMa
         assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
         // now abandon, and try another time, the prev ledger should be dirty
 
-        bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), bktb.getZooKeeperClient(), tm,
+        bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), mm, tm,
                                                 conf, scheduler);
         bkpm.acquiredTopic(topic, stubCallback, null);
         assertNull(ConcurrencyUtils.take(stubCallback.queue).right());

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=1360300&r1=1360299&r2=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Wed Jul 11 17:11:06 2012
@@ -229,6 +229,7 @@ public abstract class TestPersistenceMan
     @Override
     protected void tearDown() throws Exception {
         logger.info("tearDown starting");
+        persistenceManager.stop();
         super.tearDown();
         logger.info("FINISHED " + getName());
     }

Copied: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java (from r1358895, zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java?p2=zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java&p1=zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java&r1=1358895&r2=1360300&rev=1360300&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java Wed Jul 11 17:11:06 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.SynchronousQ
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.Before;
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.exceptions.PubSubException;
@@ -30,15 +31,17 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
 import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.util.ConcurrencyUtils;
 import org.apache.hedwig.util.Either;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
 
-public class TestZkSubscriptionManager extends ZooKeeperTestBase {
-    ZkSubscriptionManager sm;
+public class TestMMSubscriptionManager extends ZooKeeperTestBase {
+    MetadataManagerFactory mm;
+    MMSubscriptionManager sm;
     ServerConfiguration cfg = new ServerConfiguration();
     SynchronousQueue<Either<MessageSeqId, PubSubException>> msgIdCallbackQueue = new SynchronousQueue<Either<MessageSeqId, PubSubException>>();
     SynchronousQueue<Either<Boolean, PubSubException>> BooleanCallbackQueue = new SynchronousQueue<Either<Boolean, PubSubException>>();
@@ -46,12 +49,14 @@ public class TestZkSubscriptionManager e
     Callback<Void> voidCallback;
     Callback<MessageSeqId> msgIdCallback;
 
+    @Before
     @Override
     public void setUp() throws Exception {
         super.setUp();
         cfg = new ServerConfiguration();
         final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-        sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler),
+        mm = MetadataManagerFactory.newMetadataManagerFactory(cfg, zk);
+        sm = new MMSubscriptionManager(mm, new TrivialOwnAllTopicManager(cfg, scheduler),
                                        LocalDBPersistenceManager.instance(), cfg, scheduler);
         msgIdCallback = new Callback<MessageSeqId>() {
             @Override



Mime
View raw message