bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [14/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:24 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
deleted file mode 100644
index e50bc7f..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
+++ /dev/null
@@ -1,840 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.meta;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-import org.apache.hedwig.zookeeper.ZkUtils;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * ZooKeeper-based Metadata Manager.
- */
-public class ZkMetadataManagerFactory extends MetadataManagerFactory {
-    protected final static Logger logger = LoggerFactory.getLogger(ZkMetadataManagerFactory.class);
-
-    static final int CUR_VERSION = 1;
-
-    ZooKeeper zk;
-    ServerConfiguration cfg;
-
-    @Override
-    public int getCurrentVersion() {
-        return CUR_VERSION;
-    }
-
-    @Override
-    public MetadataManagerFactory initialize(ServerConfiguration cfg,
-                                             ZooKeeper zk,
-                                             int version)
-    throws IOException {
-        if (CUR_VERSION != version) {
-            throw new IOException("Incompatible ZkMetadataManagerFactory version " + version
-                                + " found, expected version " + CUR_VERSION);
-        }
-        this.cfg = cfg;
-        this.zk = zk;
-        return this;
-    }
-
-    @Override
-    public void shutdown() {
-        // do nothing here, because zookeeper handle is passed from outside
-        // we don't need to stop it.
-    }
-
-    @Override
-    public Iterator<ByteString> getTopics() throws IOException {
-        List<String> topics;
-        try {
-            topics = zk.getChildren(cfg.getZkTopicsPrefix(new StringBuilder()).toString(), false);
-        } catch (KeeperException ke) {
-            throw new IOException("Failed to get topics list : ", ke);
-        } catch (InterruptedException ie) {
-            throw new IOException("Interrupted on getting topics list : ", ie);
-        }
-        final Iterator<String> iter = topics.iterator();
-        return new Iterator<ByteString>() {
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-            @Override
-            public ByteString next() {
-                String t = iter.next();
-                return ByteString.copyFromUtf8(t);
-            }
-            @Override
-            public void remove() {
-                iter.remove();
-            }
-        };
-    }
-
-    @Override
-    public TopicPersistenceManager newTopicPersistenceManager() {
-        return new ZkTopicPersistenceManagerImpl(cfg, zk);
-    }
-
-    @Override
-    public SubscriptionDataManager newSubscriptionDataManager() {
-        return new ZkSubscriptionDataManagerImpl(cfg, zk);
-    }
-
-    @Override
-    public TopicOwnershipManager newTopicOwnershipManager() {
-        return new ZkTopicOwnershipManagerImpl(cfg, zk);
-    }
-
-    /**
-     * ZooKeeper based topic persistence manager.
-     */
-    static class ZkTopicPersistenceManagerImpl implements TopicPersistenceManager {
-
-        ZooKeeper zk;
-        ServerConfiguration cfg;
-
-        ZkTopicPersistenceManagerImpl(ServerConfiguration conf, ZooKeeper zk) {
-            this.cfg = conf;
-            this.zk = zk;
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do nothing in zookeeper based impl
-        }
-
-        /**
-         * Get znode path to store persistence info of a topic.
-         *
-         * @param topic
-         *          Topic name
-         * @return znode path to store persistence info.
-         */
-        private String ledgersPath(ByteString topic) {
-            return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
-        }
-
-        /**
-         * Parse ledger ranges data and return it thru callback.
-         *
-         * @param topic
-         *          Topic name
-         * @param data
-         *          Topic Ledger Ranges data
-         * @param version
-         *          Version of the topic ledger ranges data
-         * @param callback
-         *          Callback to return ledger ranges
-         * @param ctx
-         *          Context of the callback
-         */
-        private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, int version,
-                                                     Callback<Versioned<LedgerRanges>> callback, Object ctx) {
-            try {
-                Versioned<LedgerRanges> ranges = new Versioned<LedgerRanges>(LedgerRanges.parseFrom(data),
-                                                                             new ZkVersion(version));
-                callback.operationFinished(ctx, ranges);
-                return;
-            } catch (InvalidProtocolBufferException e) {
-                String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized";
-                logger.error(msg, e);
-                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                return;
-            }
-        }
-
-        @Override
-        public void readTopicPersistenceInfo(final ByteString topic,
-                                             final Callback<Versioned<LedgerRanges>> callback,
-                                             Object ctx) {
-            // read topic ledgers node data
-            final String zNodePath = ledgersPath(topic);
-
-            zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (rc == Code.OK.intValue()) {
-                        parseAndReturnTopicLedgerRanges(topic, data, stat.getVersion(), callback, ctx);
-                        return;
-                    }
-
-                    if (rc == Code.NONODE.intValue()) {
-                        // we don't create the znode until we first write it.
-                        callback.operationFinished(ctx, null);
-                        return;
-                    }
-
-                    // otherwise some other error
-                    KeeperException ke =
-                        ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: "
-                                                             + topic.toStringUtf8(), path, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                }
-            }, ctx);
-        }
-
-        private void createTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges,
-                                                final Callback<Version> callback, Object ctx) {
-            final String zNodePath = ledgersPath(topic);
-            final byte[] data = ranges.toByteArray();
-            // create it
-            ZkUtils.createFullPathOptimistic(zk, zNodePath, data, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                    if (rc == Code.NODEEXISTS.intValue()) {
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS,
-                                                      "Persistence info of topic " + topic.toStringUtf8() + " existed."));
-                        return;
-                    }
-                    if (rc != Code.OK.intValue()) {
-                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                             "Could not create ledgers node for topic: " + topic.toStringUtf8(),
-                                             path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                        return;
-                    }
-                    // initial version is version 0
-                    callback.operationFinished(ctx, new ZkVersion(0));
-                }
-            }, ctx);
-            return;
-        }
-
-        @Override
-        public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version,
-                                              final Callback<Version> callback, Object ctx) {
-            if (Version.NEW == version) {
-                createTopicPersistenceInfo(topic, ranges, callback, ctx);
-                return;
-            }
-
-            final String zNodePath = ledgersPath(topic);
-            final byte[] data = ranges.toByteArray();
-
-            if (!(version instanceof ZkVersion)) {
-                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
-                                              "Invalid version provided to update persistence info for topic " + topic.toStringUtf8()));
-                return;
-            }
-
-            int znodeVersion = ((ZkVersion)version).getZnodeVersion();
-            zk.setData(zNodePath, data, znodeVersion, new SafeAsyncZKCallback.StatCallback() {
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                        if (rc == Code.NONODE.intValue()) {
-                            // no node
-                            callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
-                                                          "No persistence info found for topic " + topic.toStringUtf8()));
-                            return;
-                        } else if (rc == Code.BADVERSION.intValue()) {
-                            // bad version
-                            callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                                          "Bad version provided to update persistence info of topic " + topic.toStringUtf8()));
-                            return;
-                        } else if (rc == Code.OK.intValue()) {
-                            callback.operationFinished(ctx, new ZkVersion(stat.getVersion()));
-                            return;
-                        } else {
-                            KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                    "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc);
-                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                            return;
-                        }
-                    }
-            }, ctx);
-        }
-
-        @Override
-        public void deleteTopicPersistenceInfo(final ByteString topic, final Version version,
-                                               final Callback<Void> callback, Object ctx) {
-            final String zNodePath = ledgersPath(topic);
-
-            int znodeVersion = -1;
-            if (Version.ANY != version) {
-                if (!(version instanceof ZkVersion)) {
-                    callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
-                                                  "Invalid version provided to delete persistence info for topic " + topic.toStringUtf8()));
-                    return;
-                } else {
-                    znodeVersion = ((ZkVersion)version).getZnodeVersion();
-                }
-            }
-            zk.delete(zNodePath, znodeVersion, new SafeAsyncZKCallback.VoidCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx) {
-                    if (rc == Code.OK.intValue()) {
-                        callback.operationFinished(ctx, null);
-                        return;
-                    } else if (rc == Code.NONODE.intValue()) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
-                                                      "No persistence info found for topic " + topic.toStringUtf8()));
-                        return;
-                    } else if (rc == Code.BADVERSION.intValue()) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                                      "Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
-                        return;
-                    }
-
-                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
-                                        + " failed to delete persistence info @version " + version + " : ", path, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                }
-            }, ctx);
-        }
-    }
-
-    /**
-     * ZooKeeper based subscription data manager.
-     */
-    static class ZkSubscriptionDataManagerImpl implements SubscriptionDataManager {
-
-        ZooKeeper zk;
-        ServerConfiguration cfg;
-
-        ZkSubscriptionDataManagerImpl(ServerConfiguration conf, ZooKeeper zk) {
-            this.cfg = conf;
-            this.zk = zk;
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do nothing in zookeeper based impl
-        }
-
-        /**
-         * Get znode path to store subscription states.
-         *
-         * @param sb
-         *          String builder to store the znode path.
-         * @param topic
-         *          Topic name.
-         *
-         * @return string builder to store znode path.
-         */
-        private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
-            return cfg.getZkTopicPath(sb, topic).append("/subscribers");
-        }
-
-        /**
-         * Get znode path to store subscription state for a specified subscriber.
-         *
-         * @param topic
-         *          Topic name.
-         * @param subscriber
-         *          Subscriber id.
-         * @return znode path to store subscription state.
-         */
-        private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
-            return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
-                   .toString();
-        }
-
-        @Override
-        public boolean isPartialUpdateSupported() {
-            return false;
-        }
-
-        @Override
-        public void createSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData data,
-                                           final Callback<Version> callback, final Object ctx) {
-            ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), data.toByteArray(),
-            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
-
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, String name) {
-
-                    if (rc == Code.NODEEXISTS.intValue()) {
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.SUBSCRIPTION_STATE_EXISTS,
-                                                      "Subscription state for (topic:" + topic.toStringUtf8() + ", subscriber:"
-                                                      + subscriberId.toStringUtf8() + ") existed."));
-                        return;
-                    } else if (rc == Code.OK.intValue()) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
-                                         + " subscriberId: " + subscriberId.toStringUtf8() + " data: "
-                                         + SubscriptionStateUtils.toString(data));
-                        }
-                        callback.operationFinished(ctx, new ZkVersion(0));
-                    } else {
-                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                                 "Could not record new subscription for topic: " + topic.toStringUtf8()
-                                                 + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void updateSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData data,
-                                           final Version version, final Callback<Version> callback, final Object ctx) {
-            throw new UnsupportedOperationException("ZooKeeper based metadata manager doesn't support partial update!");
-        }
-
-        @Override
-        public void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData data,
-                                            final Version version, final Callback<Version> callback, final Object ctx) {
-            int znodeVersion = -1;
-            if (Version.NEW == version) {
-                callback.operationFailed(ctx, 
-                        new PubSubException.BadVersionException("Can not replace Version.New subscription data"));
-                return;
-            } else if (Version.ANY != version) {
-                if (!(version instanceof ZkVersion)) {
-                    callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
-                                                  "Invalid version provided to replace subscription data for topic  " 
-                                                  + topic.toStringUtf8() + " subscribe id: " + subscriberId));
-                    return;
-                } else {
-                    znodeVersion = ((ZkVersion)version).getZnodeVersion();
-                }
-            }
-            zk.setData(topicSubscriberPath(topic, subscriberId), data.toByteArray(), 
-                    znodeVersion, new SafeAsyncZKCallback.StatCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                    if (rc == Code.NONODE.intValue()) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
-                                                      "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
-                                                      + subscriberId.toStringUtf8() + ")."));
-                        return;
-                    } else if (rc == Code.BADVERSION.intValue()) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                                      "Bad version provided to replace subscription data of topic " 
-                                                      + topic.toStringUtf8() + " subscriberId " + subscriberId));
-                        return;
-                    } else if (rc != Code.OK.intValue()) {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
-                                            + " subscriberId: " + subscriberId.toStringUtf8()
-                                            + " could not set subscription data: " + SubscriptionStateUtils.toString(data),
-                                            path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                    } else {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
-                                         + " subscriberId: " + subscriberId.toStringUtf8() + " data: "
-                                         + SubscriptionStateUtils.toString(data));
-                        }
-
-                        callback.operationFinished(ctx, new ZkVersion(stat.getVersion()));
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version,
-                                           final Callback<Void> callback, Object ctx) {
-            
-            int znodeVersion = -1;
-            if (Version.NEW == version) {
-                callback.operationFailed(ctx, 
-                        new PubSubException.BadVersionException("Can not delete Version.New subscription data"));
-                return;
-            } else if (Version.ANY != version) {
-                if (!(version instanceof ZkVersion)) {
-                    callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
-                                                  "Invalid version provided to delete subscription data for topic  " 
-                                                  + topic.toStringUtf8() + " subscribe id: " + subscriberId));
-                    return;
-                } else {
-                    znodeVersion = ((ZkVersion)version).getZnodeVersion();
-                }
-            }
-            
-            zk.delete(topicSubscriberPath(topic, subscriberId), znodeVersion, new SafeAsyncZKCallback.VoidCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx) {
-                    if (rc == Code.NONODE.intValue()) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE,
-                                                      "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
-                                                      + subscriberId.toStringUtf8() + ")."));
-                        return;
-                    } else if (rc == Code.BADVERSION.intValue()) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                                      "Bad version provided to delete subscription data of topic " 
-                                                      + topic.toStringUtf8() + " subscriberId " + subscriberId));
-                        return;
-                    } else if (rc == Code.OK.intValue()) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
-                                         + " subscriberId: " + subscriberId.toStringUtf8());
-                        }
-
-                        callback.operationFinished(ctx, null);
-                        return;
-                    }
-
-                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
-                                        + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
-                                         final Callback<Versioned<SubscriptionData>> callback, final Object ctx) {
-            zk.getData(topicSubscriberPath(topic, subscriberId), false, new SafeAsyncZKCallback.DataCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (rc == Code.NONODE.intValue()) {
-                        callback.operationFinished(ctx, null);
-                        return;
-                    }
-                    if (rc != Code.OK.intValue()) {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                "Could not read subscription data for topic: " + topic.toStringUtf8()
-                                                + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
-                    }
-                    
-                    Versioned<SubscriptionData> subData;
-                    try {
-                        subData = new Versioned<SubscriptionData>(
-                                        SubscriptionStateUtils.parseSubscriptionData(data), 
-                                        new ZkVersion(stat.getVersion()));
-                    } catch (InvalidProtocolBufferException ex) {
-                        String msg = "Failed to deserialize subscription data for topic: " + topic.toStringUtf8()
-                                     + " subscriberId: " + subscriberId.toStringUtf8();
-                        logger.error(msg, ex);
-                        callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
-                        return;
-                    }
-
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
-                                     + " subscriberId: " + subscriberId.toStringUtf8()
-                                     + " data: " + SubscriptionStateUtils.toString(subData.getValue()));
-                    }
-                    callback.operationFinished(ctx, subData);
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void readSubscriptions(final ByteString topic,
-                                      final Callback<Map<ByteString, Versioned<SubscriptionData>>> cb, final Object ctx) {
-            String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
-            zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, final Object ctx, final List<String> children) {
-
-                    if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
-                                            + topic.toStringUtf8(), path, rc);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
-                    }
-
-                    final Map<ByteString, Versioned<SubscriptionData>> topicSubs = 
-                            new ConcurrentHashMap<ByteString, Versioned<SubscriptionData>>();
-
-                    if (rc == Code.NONODE.intValue() || children.size() == 0) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8());
-                        }
-                        cb.operationFinished(ctx, topicSubs);
-                        return;
-                    }
-
-                    final AtomicBoolean failed = new AtomicBoolean();
-                    final AtomicInteger count = new AtomicInteger();
-
-                    for (final String child : children) {
-
-                        final ByteString subscriberId = ByteString.copyFromUtf8(child);
-                        final String childPath = path + "/" + child;
-
-                        zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() {
-                            @Override
-                            public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-
-                                if (rc != Code.OK.intValue()) {
-                                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                            "Could not read subscription data for topic: " + topic.toStringUtf8()
-                                                            + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
-                                    reportFailure(new PubSubException.ServiceDownException(e));
-                                    return;
-                                }
-
-                                if (failed.get()) {
-                                    return;
-                                }
-
-                                Versioned<SubscriptionData> subData;
-                                try {
-                                    subData = new Versioned<SubscriptionData>(
-                                            SubscriptionStateUtils.parseSubscriptionData(data), 
-                                            new ZkVersion(stat.getVersion()));
-                                } catch (InvalidProtocolBufferException ex) {
-                                    String msg = "Failed to deserialize subscription data for topic: " + topic.toStringUtf8()
-                                                 + " subscriberId: " + subscriberId.toStringUtf8();
-                                    logger.error(msg, ex);
-                                    reportFailure(new PubSubException.UnexpectedConditionException(msg));
-                                    return;
-                                }
-
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
-                                                 + " subscriberId: " + child + "state: "
-                                                 + SubscriptionStateUtils.toString(subData.getValue()));
-                                }
-
-                                topicSubs.put(subscriberId, subData);
-                                if (count.incrementAndGet() == children.size()) {
-                                    assert topicSubs.size() == count.get();
-                                    cb.operationFinished(ctx, topicSubs);
-                                }
-                            }
-
-                            private void reportFailure(PubSubException e) {
-                                if (failed.compareAndSet(false, true))
-                                    cb.operationFailed(ctx, e);
-                            }
-                        }, ctx);
-                    }
-                }
-            }, ctx);
-        }
-    }
-
-    /**
-     * ZooKeeper base topic ownership manager.
-     */
-    static class ZkTopicOwnershipManagerImpl implements TopicOwnershipManager {
-
-        ZooKeeper zk;
-        ServerConfiguration cfg;
-
-        ZkTopicOwnershipManagerImpl(ServerConfiguration conf, ZooKeeper zk) {
-            this.cfg = conf;
-            this.zk = zk;
-        }
-
-        @Override
-        public void close() throws IOException {
-            // do nothing in zookeeper based impl
-        }
-
-        /**
-         * Return znode path to store topic owner.
-         *
-         * @param topic
-         *          Topic Name
-         * @return znode path to store topic owner.
-         */
-        String hubPath(ByteString topic) {
-            return cfg.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
-        }
-
-        @Override
-        public void readOwnerInfo(final ByteString topic, final Callback<Versioned<HubInfo>> callback, Object ctx) {
-            String ownerPath = hubPath(topic);
-            zk.getData(ownerPath, false, new SafeAsyncZKCallback.DataCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    if (Code.NONODE.intValue() == rc) {
-                        callback.operationFinished(ctx, null);
-                        return;
-                    }
-
-                    if (Code.OK.intValue() != rc) {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: "
-                                            + topic.toStringUtf8(), path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
-                    }
-                    HubInfo owner = null;
-                    try {
-                        owner = HubInfo.parse(new String(data, UTF_8));
-                    } catch (HubInfo.InvalidHubInfoException ihie) {
-                        logger.warn("Failed to parse hub info for topic " + topic.toStringUtf8() + " : ", ihie);
-                    }
-                    int version = stat.getVersion();
-                    callback.operationFinished(ctx, new Versioned<HubInfo>(owner, new ZkVersion(version)));
-                    return;
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void writeOwnerInfo(final ByteString topic, final HubInfo owner, final Version version,
-                                   final Callback<Version> callback, Object ctx) {
-            if (Version.NEW == version) {
-                createOwnerInfo(topic, owner, callback, ctx);
-                return;
-            }
-
-            if (!(version instanceof ZkVersion)) {
-                callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
-                                              "Invalid version provided to update owner info for topic " + topic.toStringUtf8()));
-                return;
-            }
-
-            int znodeVersion = ((ZkVersion)version).getZnodeVersion();
-            zk.setData(hubPath(topic), owner.toString().getBytes(UTF_8), znodeVersion,
-                       new SafeAsyncZKCallback.StatCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                    if (rc == Code.NONODE.intValue()) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO,
-                                                      "No owner info found for topic " + topic.toStringUtf8()));
-                        return;
-                    } else if (rc == Code.BADVERSION.intValue()) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                                      "Bad version provided to update owner info of topic " + topic.toStringUtf8()));
-                        return;
-                    } else if (Code.OK.intValue() == rc) {
-                        callback.operationFinished(ctx, new ZkVersion(stat.getVersion()));
-                        return;
-                    } else {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                            "Failed to update ownership of topic " + topic.toStringUtf8() +
-                            " to " + owner, path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
-                    }
-                }
-            }, ctx);
-        }
-
-        protected void createOwnerInfo(final ByteString topic, final HubInfo owner,
-                                       final Callback<Version> callback, Object ctx) {
-            String ownerPath = hubPath(topic);
-            ZkUtils.createFullPathOptimistic(zk, ownerPath, owner.toString().getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE,
-                                             CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                    if (Code.OK.intValue() == rc) {
-                        // assume the initial version is 0
-                        callback.operationFinished(ctx, new ZkVersion(0));
-                        return;
-                    } else if (Code.NODEEXISTS.intValue() == rc) {
-                        // node existed
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_OWNER_INFO_EXISTS,
-                                                      "Owner info of topic " + topic.toStringUtf8() + " existed."));
-                        return;
-                    } else {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                "Failed to create znode for ownership of topic: "
-                                                + topic.toStringUtf8(), path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
-                    }
-                }
-            }, ctx);
-        }
-
-        @Override
-        public void deleteOwnerInfo(final ByteString topic, final Version version,
-                                    final Callback<Void> callback, Object ctx) {
-            int znodeVersion = -1;
-            if (Version.ANY != version) {
-                if (!(version instanceof ZkVersion)) {
-                    callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(
-                                                  "Invalid version provided to delete owner info for topic " + topic.toStringUtf8()));
-                    return;
-                } else {
-                    znodeVersion = ((ZkVersion)version).getZnodeVersion();
-                }
-            }
-
-            zk.delete(hubPath(topic), znodeVersion, new SafeAsyncZKCallback.VoidCallback() {
-                @Override
-                public void safeProcessResult(int rc, String path, Object ctx) {
-                    if (Code.OK.intValue() == rc) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Successfully deleted owner info for topic " + topic.toStringUtf8() + ".");
-                        }
-                        callback.operationFinished(ctx, null);
-                        return;
-                    } else if (Code.NONODE.intValue() == rc) {
-                        // no node
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO,
-                                                      "No owner info found for topic " + topic.toStringUtf8()));
-                        return;
-                    } else if (Code.BADVERSION.intValue() == rc) {
-                        // bad version
-                        callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
-                                                      "Bad version provided to delete owner info of topic " + topic.toStringUtf8()));
-                        return;
-                    } else {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                "Failed to delete owner info for topic "
-                                                + topic.toStringUtf8(), path, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                    }
-                }
-            }, ctx);
-        }
-    }
-
-    @Override
-    public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException {
-        try {
-            ZKUtil.deleteRecursive(zk, cfg.getZkTopicsPrefix(new StringBuilder()).toString());
-        } catch (KeeperException.NoNodeException e) {
-            logger.debug("Hedwig root node doesn't exist in zookeeper to delete");
-        } catch (KeeperException ke) {
-            throw new IOException(ke);
-        } catch (InterruptedException ie) {
-            throw new IOException(ie);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
deleted file mode 100644
index 23e04e3..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
+++ /dev/null
@@ -1,535 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.delivery.FIFODeliveryManager;
-import org.apache.hedwig.server.handlers.CloseSubscriptionHandler;
-import org.apache.hedwig.server.handlers.ConsumeHandler;
-import org.apache.hedwig.server.handlers.Handler;
-import org.apache.hedwig.server.handlers.NettyHandlerBean;
-import org.apache.hedwig.server.handlers.PublishHandler;
-import org.apache.hedwig.server.handlers.SubscribeHandler;
-import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
-import org.apache.hedwig.server.handlers.SubscriptionChannelManager.SubChannelDisconnectedListener;
-import org.apache.hedwig.server.handlers.UnsubscribeHandler;
-import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.ZkMetadataManagerFactory;
-import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager;
-import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan;
-import org.apache.hedwig.server.persistence.ReadAheadCache;
-import org.apache.hedwig.server.regions.HedwigHubClientFactory;
-import org.apache.hedwig.server.regions.RegionManager;
-import org.apache.hedwig.server.ssl.SslServerContextFactory;
-import org.apache.hedwig.server.subscriptions.InMemorySubscriptionManager;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
-import org.apache.hedwig.server.topics.MMTopicManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.server.topics.ZkTopicManager;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.zookeeper.SafeAsyncCallback;
-
-public class PubSubServer {
-
-    private static final Logger logger = LoggerFactory.getLogger(PubSubServer.class);
-
-    private static final String JMXNAME_PREFIX = "PubSubServer_";
-
-    // Netty related variables
-    ServerSocketChannelFactory serverChannelFactory;
-    ClientSocketChannelFactory clientChannelFactory;
-    ServerConfiguration conf;
-    org.apache.hedwig.client.conf.ClientConfiguration clientConfiguration;
-    ChannelGroup allChannels;
-
-    // Manager components that make up the PubSubServer
-    PersistenceManager pm;
-    DeliveryManager dm;
-    TopicManager tm;
-    SubscriptionManager sm;
-    RegionManager rm;
-
-    // Metadata Manager Factory
-    MetadataManagerFactory mm;
-
-    ZooKeeper zk; // null if we are in standalone mode
-    BookKeeper bk; // null if we are in standalone mode
-
-    // we use this to prevent long stack chains from building up in callbacks
-    ScheduledExecutorService scheduler;
-
-    // JMX Beans
-    NettyHandlerBean jmxNettyBean;
-    PubSubServerBean jmxServerBean;
-    final ThreadGroup tg;
-
-    protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException,
-        InterruptedException {
-
-        PersistenceManagerWithRangeScan underlyingPM;
-
-        if (conf.isStandalone()) {
-
-            underlyingPM = LocalDBPersistenceManager.instance();
-
-        } else {
-            try {
-                ClientConfiguration bkConf = new ClientConfiguration();
-                bkConf.addConfiguration(conf.getConf());
-                bk = new BookKeeper(bkConf, zk, clientChannelFactory);
-            } catch (KeeperException e) {
-                logger.error("Could not instantiate bookkeeper client", e);
-                throw new IOException(e);
-            }
-            underlyingPM = new BookkeeperPersistenceManager(bk, mm, topicMgr, conf, scheduler);
-
-        }
-
-        PersistenceManager pm = underlyingPM;
-
-        if (conf.getReadAheadEnabled()) {
-            pm = new ReadAheadCache(underlyingPM, conf).start();
-        }
-
-        return pm;
-    }
-
-    protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm,
-                                                                 DeliveryManager dm) {
-        if (conf.isStandalone()) {
-            return new InMemorySubscriptionManager(conf, tm, pm, dm, scheduler);
-        } else {
-            return new MMSubscriptionManager(conf, mm, tm, pm, dm, scheduler);
-        }
-
-    }
-
-    protected RegionManager instantiateRegionManager(PersistenceManager pm, ScheduledExecutorService scheduler) {
-        return new RegionManager(pm, conf, zk, scheduler, new HedwigHubClientFactory(conf, clientConfiguration,
-                clientChannelFactory));
-    }
-
-    protected void instantiateZookeeperClient() throws Exception {
-        if (!conf.isStandalone()) {
-            final CountDownLatch signalZkReady = new CountDownLatch(1);
-
-            zk = new ZooKeeper(conf.getZkHost(), conf.getZkTimeout(), new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    if(Event.KeeperState.SyncConnected.equals(event.getState())) {
-                        signalZkReady.countDown();
-                    }
-                }
-            });
-            // wait until connection is effective
-            if (!signalZkReady.await(conf.getZkTimeout()*2, TimeUnit.MILLISECONDS)) {
-                logger.error("Could not establish connection with ZooKeeper after zk_timeout*2 = " +
-                             conf.getZkTimeout()*2 + " ms. (Default value for zk_timeout is 2000).");
-                throw new Exception("Could not establish connection with ZooKeeper after zk_timeout*2 = " +
-                                    conf.getZkTimeout()*2 + " ms. (Default value for zk_timeout is 2000).");
-            }
-        }
-    }
-
-    protected void instantiateMetadataManagerFactory() throws Exception {
-        if (conf.isStandalone()) {
-            return;
-        }
-        mm = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-    }
-
-    protected TopicManager instantiateTopicManager() throws IOException {
-        TopicManager tm;
-
-        if (conf.isStandalone()) {
-            tm = new TrivialOwnAllTopicManager(conf, scheduler);
-        } else {
-            try {
-                if (conf.isMetadataManagerBasedTopicManagerEnabled()) {
-                    tm = new MMTopicManager(conf, zk, mm, scheduler);
-                } else {
-                    if (!(mm instanceof ZkMetadataManagerFactory)) {
-                        throw new IOException("Uses " + mm.getClass().getName() + " to store hedwig metadata, "
-                                            + "but uses zookeeper ephemeral znodes to store topic ownership. "
-                                            + "Check your configuration as this could lead to scalability issues.");
-                    }
-                    tm = new ZkTopicManager(zk, conf, scheduler);
-                }
-            } catch (PubSubException e) {
-                logger.error("Could not instantiate TopicOwnershipManager based topic manager", e);
-                throw new IOException(e);
-            }
-        }
-        return tm;
-    }
-
-   protected Map<OperationType, Handler> initializeNettyHandlers(
-           TopicManager tm, DeliveryManager dm,
-           PersistenceManager pm, SubscriptionManager sm,
-           SubscriptionChannelManager subChannelMgr) {
-        Map<OperationType, Handler> handlers = new HashMap<OperationType, Handler>();
-        handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
-        handlers.put(OperationType.SUBSCRIBE,
-                     new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr));
-        handlers.put(OperationType.UNSUBSCRIBE,
-                     new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr));
-        handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
-        handlers.put(OperationType.CLOSESUBSCRIPTION,
-                     new CloseSubscriptionHandler(conf, tm, sm, dm, subChannelMgr));
-        handlers = Collections.unmodifiableMap(handlers);
-        return handlers;
-    }
-
-    protected void initializeNetty(SslServerContextFactory sslFactory,
-                                   Map<OperationType, Handler> handlers,
-                                   SubscriptionChannelManager subChannelMgr) {
-        boolean isSSLEnabled = (sslFactory != null) ? true : false;
-        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
-        ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
-        UmbrellaHandler umbrellaHandler =
-            new UmbrellaHandler(allChannels, handlers, subChannelMgr, isSSLEnabled);
-        PubSubServerPipelineFactory pipeline =
-            new PubSubServerPipelineFactory(umbrellaHandler, sslFactory,
-                                            conf.getMaximumMessageSize());
-
-        bootstrap.setPipelineFactory(pipeline);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-
-        // Bind and start to accept incoming connections.
-        allChannels.add(bootstrap.bind(isSSLEnabled ? new InetSocketAddress(conf.getSSLServerPort())
-                                       : new InetSocketAddress(conf.getServerPort())));
-        logger.info("Going into receive loop");
-    }
-
-    public void shutdown() {
-        // TODO: tell bk to close logs
-
-        // Stop topic manager first since it is core of Hub server
-        tm.stop();
-
-        // Stop the RegionManager.
-        rm.stop();
-
-        // Stop the DeliveryManager and ReadAheadCache threads (if
-        // applicable).
-        dm.stop();
-        pm.stop();
-
-        // Stop the SubscriptionManager if needed.
-        sm.stop();
-
-        // Shutdown metadata manager if needed
-        if (null != mm) {
-            try {
-                mm.shutdown();
-            } catch (IOException ie) {
-                logger.error("Error while shutdown metadata manager factory!", ie);
-            }
-        }
-
-        // Shutdown the ZooKeeper and BookKeeper clients only if we are
-        // not in stand-alone mode.
-        try {
-            if (bk != null)
-                bk.close();
-            if (zk != null)
-                zk.close();
-        } catch (InterruptedException e) {
-            logger.error("Error while closing ZooKeeper client : ", e);
-        } catch (BKException bke) {
-            logger.error("Error while closing BookKeeper client : ", bke);
-        }
-
-        // Close and release the Netty channels and resources
-        allChannels.close().awaitUninterruptibly();
-        serverChannelFactory.releaseExternalResources();
-        clientChannelFactory.releaseExternalResources();
-        scheduler.shutdown();
-
-        // unregister jmx
-        unregisterJMX();
-    }
-
-    protected void registerJMX(SubscriptionChannelManager subChannelMgr) {
-        try {
-            String jmxName = JMXNAME_PREFIX + conf.getServerPort() + "_"
-                                            + conf.getSSLServerPort();
-            jmxServerBean = new PubSubServerBean(jmxName);
-            HedwigMBeanRegistry.getInstance().register(jmxServerBean, null);
-            try {
-                jmxNettyBean = new NettyHandlerBean(subChannelMgr);
-                HedwigMBeanRegistry.getInstance().register(jmxNettyBean, jmxServerBean);
-            } catch (Exception e) {
-                logger.warn("Failed to register with JMX", e);
-                jmxNettyBean = null;
-            }
-        } catch (Exception e) {
-            logger.warn("Failed to register with JMX", e);
-            jmxServerBean = null;
-        }
-        if (pm instanceof ReadAheadCache) {
-            ((ReadAheadCache)pm).registerJMX(jmxServerBean);
-        }
-    }
-
-    protected void unregisterJMX() {
-        if (pm != null && pm instanceof ReadAheadCache) {
-            ((ReadAheadCache)pm).unregisterJMX();
-        }
-        try {
-            if (jmxNettyBean != null) {
-                HedwigMBeanRegistry.getInstance().unregister(jmxNettyBean);
-            }
-        } catch (Exception e) {
-            logger.warn("Failed to unregister with JMX", e);
-        }
-        try {
-            if (jmxServerBean != null) {
-                HedwigMBeanRegistry.getInstance().unregister(jmxServerBean);
-            }
-        } catch (Exception e) {
-            logger.warn("Failed to unregister with JMX", e);
-        }
-        jmxNettyBean = null;
-        jmxServerBean = null;
-    }
-
-    /**
-     * Starts the hedwig server on the given port
-     *
-     * @param port
-     * @throws ConfigurationException
-     *             if there is something wrong with the given configuration
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ConfigurationException
-     */
-    public PubSubServer(final ServerConfiguration serverConfiguration,
-                        final org.apache.hedwig.client.conf.ClientConfiguration clientConfiguration,
-                        final Thread.UncaughtExceptionHandler exceptionHandler)
-            throws ConfigurationException {
-
-        // First validate the serverConfiguration
-        this.conf = serverConfiguration;
-        serverConfiguration.validate();
-
-        // Validate the client configuration
-        this.clientConfiguration = clientConfiguration;
-        clientConfiguration.validate();
-
-        // We need a custom thread group, so that we can override the uncaught
-        // exception method
-        tg = new ThreadGroup("hedwig") {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                exceptionHandler.uncaughtException(t, e);
-            }
-        };
-        // ZooKeeper threads register their own handler. But if some work that
-        // we do in ZK threads throws an exception, we want our handler to be
-        // called, not theirs.
-        SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler);
-    }
-
-    public void start() throws Exception {
-        final SynchronousQueue<Either<Object, Exception>> queue = new SynchronousQueue<Either<Object, Exception>>();
-
-        new Thread(tg, new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    // Since zk is needed by almost everyone,try to see if we
-                    // need that first
-                    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-                    scheduler = Executors.newSingleThreadScheduledExecutor(tfb
-                            .setNameFormat("PubSubServerScheduler-%d").build());
-                    serverChannelFactory = new NioServerSocketChannelFactory(
-                            Executors.newCachedThreadPool(tfb.setNameFormat(
-                                    "PubSub-Server-NIOBoss-%d").build()),
-                            Executors.newCachedThreadPool(tfb.setNameFormat(
-                                    "PubSub-Server-NIOWorker-%d").build()));
-                    clientChannelFactory = new NioClientSocketChannelFactory(
-                            Executors.newCachedThreadPool(tfb.setNameFormat(
-                                    "PubSub-Client-NIOBoss-%d").build()),
-                            Executors.newCachedThreadPool(tfb.setNameFormat(
-                                    "PubSub-Client-NIOWorker-%d").build()));
-
-                    instantiateZookeeperClient();
-                    instantiateMetadataManagerFactory();
-                    tm = instantiateTopicManager();
-                    pm = instantiatePersistenceManager(tm);
-                    dm = new FIFODeliveryManager(tm, pm, conf);
-                    dm.start();
-
-                    sm = instantiateSubscriptionManager(tm, pm, dm);
-                    rm = instantiateRegionManager(pm, scheduler);
-                    sm.addListener(rm);
-
-                    allChannels = new DefaultChannelGroup("hedwig");
-                    // Initialize the Netty Handlers (used by the
-                    // UmbrellaHandler) once so they can be shared by
-                    // both the SSL and non-SSL channels.
-                    SubscriptionChannelManager subChannelMgr = new SubscriptionChannelManager();
-                    subChannelMgr.addSubChannelDisconnectedListener((SubChannelDisconnectedListener) dm);
-                    Map<OperationType, Handler> handlers =
-                        initializeNettyHandlers(tm, dm, pm, sm, subChannelMgr);
-                    // Initialize Netty for the regular non-SSL channels
-                    initializeNetty(null, handlers, subChannelMgr);
-                    if (conf.isSSLEnabled()) {
-                        initializeNetty(new SslServerContextFactory(conf),
-                                        handlers, subChannelMgr);
-                    }
-                    // register jmx
-                    registerJMX(subChannelMgr);
-                } catch (Exception e) {
-                    ConcurrencyUtils.put(queue, Either.right(e));
-                    return;
-                }
-
-                ConcurrencyUtils.put(queue, Either.of(new Object(), (Exception) null));
-            }
-
-        }).start();
-
-        Either<Object, Exception> either = ConcurrencyUtils.take(queue);
-        if (either.left() == null) {
-            throw either.right();
-        }
-    }
-
-    public PubSubServer(ServerConfiguration serverConfiguration,
-                        org.apache.hedwig.client.conf.ClientConfiguration clientConfiguration) throws Exception {
-        this(serverConfiguration, clientConfiguration, new TerminateJVMExceptionHandler());
-    }
-
-    public PubSubServer(ServerConfiguration serverConfiguration) throws Exception {
-        this(serverConfiguration, new org.apache.hedwig.client.conf.ClientConfiguration());
-    }
-
-    @VisibleForTesting
-    public DeliveryManager getDeliveryManager() {
-        return dm;
-    }
-
-    /**
-     *
-     * @param msg
-     * @param rc
-     *            : code to exit with
-     */
-    public static void errorMsgAndExit(String msg, Throwable t, int rc) {
-        logger.error(msg, t);
-        System.err.println(msg);
-        System.exit(rc);
-    }
-
-    public final static int RC_INVALID_CONF_FILE = 1;
-    public final static int RC_MISCONFIGURED = 2;
-    public final static int RC_OTHER = 3;
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-
-        logger.info("Attempting to start Hedwig");
-        ServerConfiguration serverConfiguration = new ServerConfiguration();
-        // The client configuration for the hedwig client in the region manager.
-        org.apache.hedwig.client.conf.ClientConfiguration regionMgrClientConfiguration
-                = new org.apache.hedwig.client.conf.ClientConfiguration();
-        if (args.length > 0) {
-            String confFile = args[0];
-            try {
-                serverConfiguration.loadConf(new File(confFile).toURI().toURL());
-            } catch (MalformedURLException e) {
-                String msg = "Could not open server configuration file: " + confFile;
-                errorMsgAndExit(msg, e, RC_INVALID_CONF_FILE);
-            } catch (ConfigurationException e) {
-                String msg = "Malformed server configuration file: " + confFile;
-                errorMsgAndExit(msg, e, RC_MISCONFIGURED);
-            }
-            logger.info("Using configuration file " + confFile);
-        }
-        if (args.length > 1) {
-            // args[1] is the client configuration file.
-            String confFile = args[1];
-            try {
-                regionMgrClientConfiguration.loadConf(new File(confFile).toURI().toURL());
-            } catch (MalformedURLException e) {
-                String msg = "Could not open client configuration file: " + confFile;
-                errorMsgAndExit(msg, e, RC_INVALID_CONF_FILE);
-            } catch (ConfigurationException e) {
-                String msg = "Malformed client configuration file: " + confFile;
-                errorMsgAndExit(msg, e, RC_MISCONFIGURED);
-            }
-        }
-        try {
-            new PubSubServer(serverConfiguration, regionMgrClientConfiguration).start();
-        } catch (Throwable t) {
-            errorMsgAndExit("Error during startup", t, RC_OTHER);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java
deleted file mode 100644
index c1acbbc..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.netty;
-
-import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
-import org.apache.hedwig.server.netty.ServerStats.OpStatData;
-
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-
-/**
- * PubSub Server Bean
- */
-public class PubSubServerBean implements PubSubServerMXBean, HedwigMBeanInfo {
-
-    private final String name;
-
-    public PubSubServerBean(String jmxName) {
-        this.name = jmxName;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public boolean isHidden() {
-        return false;
-    }
-
-    @Override
-    public OpStatData getPubStats() {
-        return ServerStats.getInstance().getOpStats(OperationType.PUBLISH).toOpStatData();
-    }
-
-    @Override
-    public OpStatData getSubStats() {
-        return ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE).toOpStatData();
-    }
-
-    @Override
-    public OpStatData getUnsubStats() {
-        return ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE).toOpStatData();
-    }
-
-    @Override
-    public OpStatData getConsumeStats() {
-        return ServerStats.getInstance().getOpStats(OperationType.CONSUME).toOpStatData();
-    }
-
-    @Override
-    public long getNumRequestsReceived() {
-        return ServerStats.getInstance().getNumRequestsReceived();
-    }
-
-    @Override
-    public long getNumRequestsRedirect() {
-        return ServerStats.getInstance().getNumRequestsRedirect();
-    }
-
-    @Override
-    public long getNumMessagesDelivered() {
-        return ServerStats.getInstance().getNumMessagesDelivered();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java
deleted file mode 100644
index 15e860f..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.netty;
-
-import org.apache.hedwig.server.netty.ServerStats.OpStatData;
-
-/**
- * PubSub Server MBean
- */
-public interface PubSubServerMXBean {
-
-    /**
-     * @return publish stats
-     */
-    public OpStatData getPubStats();
-
-    /**
-     * @return subscription stats
-     */
-    public OpStatData getSubStats();
-
-    /**
-     * @return unsub stats
-     */
-    public OpStatData getUnsubStats();
-
-    /**
-     * @return consume stats
-     */
-    public OpStatData getConsumeStats();
-
-    /**
-     * @return number of requests received
-     */
-    public long getNumRequestsReceived();
-
-    /**
-     * @return number of requests redirect
-     */
-    public long getNumRequestsRedirect();
-
-    /**
-     * @return number of messages delivered
-     */
-    public long getNumMessagesDelivered();
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java
deleted file mode 100644
index c96f438..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.server.ssl.SslServerContextFactory;
-
-public class PubSubServerPipelineFactory implements ChannelPipelineFactory {
-
-    // TODO: make these conf settings
-    final static int MAX_WORKER_THREADS = 32;
-    final static int MAX_CHANNEL_MEMORY_SIZE = 10 * 1024 * 1024;
-    final static int MAX_TOTAL_MEMORY_SIZE = 100 * 1024 * 1024;
-
-    private UmbrellaHandler uh;
-    private SslServerContextFactory sslFactory;
-    private int maxMessageSize;
-
-    /**
-     *
-     * @param uh
-     * @param sslFactory
-     *            may be null if ssl is disabled
-     * @param cfg
-     */
-    public PubSubServerPipelineFactory(UmbrellaHandler uh, SslServerContextFactory sslFactory, int maxMessageSize) {
-        this.uh = uh;
-        this.sslFactory = sslFactory;
-        this.maxMessageSize = maxMessageSize;
-    }
-
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = Channels.pipeline();
-        if (sslFactory != null) {
-            pipeline.addLast("ssl", new SslHandler(sslFactory.getEngine()));
-        }
-        pipeline.addLast("lengthbaseddecoder",
-                         new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-
-        pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubRequest.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
-        // pipeline.addLast("executor", new ExecutionHandler(
-        // new OrderedMemoryAwareThreadPoolExecutor(MAX_WORKER_THREADS,
-        // MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE)));
-        //
-        // Dependency injection.
-        pipeline.addLast("umbrellahandler", uh);
-        return pipeline;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
deleted file mode 100644
index 69ee6ef..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.netty;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import java.beans.ConstructorProperties;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Server Stats
- */
-public class ServerStats {
-    private static final Logger LOG = LoggerFactory.getLogger(ServerStats.class);
-    static ServerStats instance = new ServerStats();
-
-    /**
-     * A read view of stats, also used in CompositeViewData to expose to JMX
-     */
-    public static class OpStatData {
-        private final long maxLatency, minLatency;
-        private final double avgLatency;
-        private final long numSuccessOps, numFailedOps;
-        private final String latencyHist;
-
-        @ConstructorProperties({"maxLatency", "minLatency", "avgLatency",
-                                "numSuccessOps", "numFailedOps", "latencyHist"})
-        public OpStatData(long maxLatency, long minLatency, double avgLatency,
-                          long numSuccessOps, long numFailedOps, String latencyHist) {
-            this.maxLatency = maxLatency;
-            this.minLatency = minLatency == Long.MAX_VALUE ? 0 : minLatency;
-            this.avgLatency = avgLatency;
-            this.numSuccessOps = numSuccessOps;
-            this.numFailedOps = numFailedOps;
-            this.latencyHist = latencyHist;
-        }
-
-        public long getMaxLatency() {
-            return maxLatency;
-        }
-
-        public long getMinLatency() {
-            return minLatency;
-        }
-
-        public double getAvgLatency() {
-            return avgLatency;
-        }
-
-        public long getNumSuccessOps() {
-            return numSuccessOps;
-        }
-
-        public long getNumFailedOps() {
-            return numFailedOps;
-        }
-
-        public String getLatencyHist() {
-            return latencyHist;
-        }
-    }
-
-    /**
-     * Operation Statistics
-     */
-    public static class OpStats {
-        static final int NUM_BUCKETS = 3*9 + 2;
-
-        long maxLatency = 0;
-        long minLatency = Long.MAX_VALUE;
-        double totalLatency = 0.0f;
-        long numSuccessOps = 0;
-        long numFailedOps = 0;
-        long[] latencyBuckets = new long[NUM_BUCKETS];
-
-        OpStats() {}
-
-        /**
-         * Increment number of failed operations
-         */
-        synchronized public void incrementFailedOps() {
-            ++numFailedOps;
-        }
-
-        /**
-         * Update Latency
-         */
-        synchronized public void updateLatency(long latency) {
-            if (latency < 0) {
-                // less than 0ms . Ideally this should not happen.
-                // We have seen this latency negative in some cases due to the
-                // behaviors of JVM. Ignoring the statistics updation for such
-                // cases.
-                LOG.warn("Latency time coming negative");
-                return;
-            }
-            totalLatency += latency;
-            ++numSuccessOps;
-            if (latency < minLatency) {
-                minLatency = latency;
-            }
-            if (latency > maxLatency) {
-                maxLatency = latency;
-            }
-            int bucket;
-            if (latency <= 100) { // less than 100ms
-                bucket = (int)(latency / 10);
-            } else if (latency <= 1000) { // 100ms ~ 1000ms
-                bucket = 1 * 9 + (int)(latency / 100);
-            } else if (latency <= 10000) { // 1s ~ 10s
-                bucket = 2 * 9 + (int)(latency / 1000);
-            } else { // more than 10s
-                bucket = 3 * 9 + 1;
-            }
-            ++latencyBuckets[bucket];
-        }
-
-        synchronized public OpStatData toOpStatData() {
-            double avgLatency = numSuccessOps > 0 ? totalLatency / numSuccessOps : 0.0f;
-            StringBuilder sb = new StringBuilder();
-            for (int i=0; i<NUM_BUCKETS; i++) {
-                sb.append(latencyBuckets[i]);
-                if (i != NUM_BUCKETS - 1) {
-                    sb.append(',');
-                }
-            }
-
-            return new OpStatData(maxLatency, minLatency, avgLatency,
-                                  numSuccessOps, numFailedOps, sb.toString());
-        }
-
-    }
-
-    public static ServerStats getInstance() {
-        return instance;
-    }
-
-    protected ServerStats() {
-        stats = new HashMap<OperationType, OpStats>();
-        for (OperationType type : OperationType.values()) {
-            stats.put(type, new OpStats());
-        }
-    }
-    Map<OperationType, OpStats> stats;
-
-
-    AtomicLong numRequestsReceived = new AtomicLong(0);
-    AtomicLong numRequestsRedirect = new AtomicLong(0);
-    AtomicLong numMessagesDelivered = new AtomicLong(0);
-
-    /**
-     * Stats of operations
-     *
-     * @param type
-     *          Operation Type
-     * @return op stats
-     */
-    public OpStats getOpStats(OperationType type) {
-        return stats.get(type);
-    }
-
-    public void incrementRequestsReceived() {
-        numRequestsReceived.incrementAndGet();
-    }
-
-    public void incrementRequestsRedirect() {
-        numRequestsRedirect.incrementAndGet();
-    }
-
-    public void incrementMessagesDelivered() {
-        numMessagesDelivered.incrementAndGet();
-    }
-
-    public long getNumRequestsReceived() {
-        return numRequestsReceived.get();
-    }
-
-    public long getNumRequestsRedirect() {
-        return numRequestsRedirect.get();
-    }
-
-    public long getNumMessagesDelivered() {
-        return numMessagesDelivered.get();
-    }
-}


Mime
View raw message