zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1369778 [2/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apa...
Date Mon, 06 Aug 2012 11:01:37 GMT
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java?rev=1369778&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java Mon Aug  6 11:01:36 2012
@@ -0,0 +1,353 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.TopicOwnershipManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.zookeeper.ZooKeeper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+/**
+ * Topic manager that elects topic owners through a {@link TopicOwnershipManager}.
+ *
+ * <p>Owner records are read and written together with their versions: a hub assigns
+ * a topic (to itself or to the least loaded hub) with a conditional write, and
+ * re-reads the owner whenever that write loses a race against another hub. Hub
+ * discovery, liveness checks and load reporting go through a ZooKeeper based
+ * {@link HubServerManager}.
+ */
+public class MMTopicManager extends AbstractTopicManager implements TopicManager {
+
+    static Logger logger = LoggerFactory.getLogger(MMTopicManager.class);
+
+    // topic ownership manager
+    private final TopicOwnershipManager mm;
+    // hub server manager
+    private final HubServerManager hubManager;
+
+    // identity of this hub, as returned by HubServerManager#registerSelf
+    private final HubInfo myHubInfo;
+    // load of this hub; re-uploaded every time a topic is claimed
+    private final HubLoad myHubLoad;
+
+    // Boolean flag indicating if we should suspend activity. If this is true,
+    // realGetOwner fails every request right away with a ServiceDownException.
+    // It is toggled by the listener registered with the hub server manager.
+    protected volatile boolean isSuspended = false;
+
+    /**
+     * Creates the topic manager and registers this hub with the hub server manager.
+     *
+     * <p>The constructor blocks until the registration has either completed or failed.
+     *
+     * @throws UnknownHostException presumably raised by the superclass constructor
+     *         (TODO confirm)
+     * @throws PubSubException if this hub could not be registered
+     */
+    public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk,
+                          MetadataManagerFactory mmFactory,
+                          ScheduledExecutorService scheduler)
+            throws UnknownHostException, PubSubException {
+        super(cfg, scheduler);
+        // initialize topic ownership manager
+        this.mm = mmFactory.newTopicOwnershipManager();
+        this.hubManager = new ZkHubServerManager(cfg, zk, addr);
+
+        // hands the asynchronous registration result back to this constructor
+        final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
+            new SynchronousQueue<Either<HubInfo, PubSubException>>();
+
+        myHubLoad = new HubLoad(topics.size());
+        this.hubManager.registerListener(new HubServerManager.ManagerListener() {
+            @Override
+            public void onSuspend() {
+                isSuspended = true;
+            }
+            @Override
+            public void onResume() {
+                isSuspended = false;
+            }
+            @Override
+            public void onShutdown() {
+                // if the hub server manager can't work any more, we have to quit the process
+                Runtime.getRuntime().exit(1);
+            }
+        });
+        this.hubManager.registerSelf(myHubLoad, new Callback<HubInfo>() {
+            @Override
+            public void operationFinished(final Object ctx, final HubInfo resultOfOperation) {
+                logger.info("Successfully registered hub {} with zookeeper", resultOfOperation);
+                ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                logger.error("Failed to register hub with zookeeper", exception);
+                ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception));
+            }
+        }, null);
+        Either<HubInfo, PubSubException> result = ConcurrencyUtils.take(queue);
+        PubSubException pse = result.right();
+        if (pse != null) {
+            throw pse;
+        }
+        myHubInfo = result.left();
+        logger.info("Start metadata manager based topic manager with hub id : " + myHubInfo);
+    }
+
+    /**
+     * Resolves the owner of {@code topic}, electing one if necessary.
+     *
+     * <p>Fails immediately while suspended, answers with this hub's address when the
+     * topic is already owned locally, and otherwise starts an {@link MMGetOwnerOp}.
+     * NOTE(review): {@code shouldClaim} is not consulted by this implementation.
+     */
+    @Override
+    protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
+                                final Callback<HedwigSocketAddress> cb, final Object ctx) {
+        // If operations are suspended due to a ZK client disconnect, just error
+        // out this call and return.
+        if (isSuspended) {
+            cb.operationFailed(ctx, new PubSubException.ServiceDownException(
+                                    "MMTopicManager service is temporarily suspended!"));
+            return;
+        }
+
+        if (topics.contains(topic)) {
+            cb.operationFinished(ctx, addr);
+            return;
+        }
+
+        new MMGetOwnerOp(topic, cb, ctx).read();
+    }
+
+    /**
+     * One owner lookup / election for a single topic, performed with versioned
+     * reads and writes of the topic's owner record.
+     *
+     * <p>Every outcome is reported to {@link #cb} with the caller's {@link #ctx}.
+     */
+    class MMGetOwnerOp {
+        ByteString topic;
+        Callback<HedwigSocketAddress> cb;
+        Object ctx;
+
+        public MMGetOwnerOp(ByteString topic,
+                            Callback<HedwigSocketAddress> cb, Object ctx) {
+            this.topic = topic;
+            this.cb = cb;
+            this.ctx = ctx;
+        }
+
+        /**
+         * Reads the current owner record and decides what to do with it:
+         * elect a new owner when there is none, when the record holds no valid hub,
+         * when it names a previous incarnation of this hub, or when the named hub is
+         * no longer alive; otherwise report (or claim) the recorded owner.
+         */
+        protected void read() {
+            mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
+                @Override
+                public void operationFinished(final Object ctx, final Versioned<HubInfo> owner) {
+                    if (null == owner) {
+                        logger.info("{} : No owner found for topic {}",
+                                    new Object[] { addr, topic.toStringUtf8() });
+                        // no data found
+                        choose(null);
+                        return;
+                    }
+                    final Version ownerVersion = owner.getVersion();
+                    if (null == owner.getValue()) {
+                        logger.info("{} : Invalid owner found for topic {}",
+                                    new Object[] { addr, topic.toStringUtf8() });
+                        choose(ownerVersion);
+                        return;
+                    }
+                    final HubInfo hub = owner.getValue();
+                    logger.info("{} : Read owner of topic {} : {}",
+                                new Object[] { addr, topic.toStringUtf8(), hub });
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("{} : comparing owner {} of topic {} with my hub info {}",
+                                     new Object[] { addr, hub, topic.toStringUtf8(), myHubInfo });
+                    }
+
+                    if (hub.getAddress().equals(addr)) {
+                        // Same address: it is ours only if it was written by this
+                        // registration (same zxid); otherwise it is a stale record
+                        // left behind by an earlier incarnation of this hub.
+                        if (myHubInfo.getZxid() == hub.getZxid()) {
+                            claimTopic(ctx);
+                            return;
+                        } else {
+                            choose(ownerVersion);
+                            return;
+                        }
+                    }
+
+                    logger.info("{} : Check whether owner {} for topic {} is still alive.",
+                                new Object[] { addr, hub, topic.toStringUtf8() });
+                    hubManager.isHubAlive(hub, new Callback<Boolean>() {
+                        @Override
+                        public void operationFinished(Object ctx, Boolean isAlive) {
+                            if (isAlive) {
+                                cb.operationFinished(ctx, hub.getAddress());
+                            } else {
+                                choose(ownerVersion);
+                            }
+                        }
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException pse) {
+                            cb.operationFailed(ctx, pse);
+                        }
+                    }, ctx);
+                }
+
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(
+                                       "Could not read ownership for topic " + topic.toStringUtf8() + " : "
+                                       + exception.getMessage()));
+                }
+            }, ctx);
+        }
+
+        /**
+         * Writes this hub as the topic owner, conditional on {@code prevOwnerVersion}.
+         * A lost race (no owner info / bad version) restarts the lookup with {@link #read()}.
+         */
+        public void claim(final Version prevOwnerVersion) {
+            logger.info("{} : claiming topic {} 's owner to be {}",
+                        new Object[] { addr, topic.toStringUtf8(), myHubInfo });
+            mm.writeOwnerInfo(topic, myHubInfo, prevOwnerVersion, new Callback<Version>() {
+                @Override
+                public void operationFinished(Object ctx, Version newVersion) {
+                    claimTopic(ctx);
+                }
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    if (exception instanceof PubSubException.NoTopicOwnerInfoException ||
+                        exception instanceof PubSubException.BadVersionException) {
+                        // someone else has updated the owner in the meantime
+                        logger.info("{} : Some one has claimed topic {} 's owner. Try to read the owner again.",
+                                    new Object[] { addr, topic.toStringUtf8() });
+                        read();
+                        return;
+                    }
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(
+                                       "Exception when writing owner info to claim ownership of topic "
+                                       + topic.toStringUtf8() + " : " + exception.getMessage()));
+                }
+            }, ctx);
+        }
+
+        /**
+         * Takes local ownership of the topic and reports the updated topic count
+         * as this hub's load.
+         */
+        protected void claimTopic(Object ctx) {
+            logger.info("{} : claimed topic {} 's owner to be {}",
+                        new Object[] { addr, topic.toStringUtf8(), myHubInfo });
+            notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
+            hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
+        }
+
+        /**
+         * Picks the least loaded hub and makes it the owner: claims the topic when
+         * that hub is this one, otherwise records the other hub as the owner.
+         */
+        public void choose(final Version prevOwnerVersion) {
+            hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() {
+                @Override
+                public void operationFinished(Object chooseCtx, HubInfo owner) {
+                    logger.info("{} : Least loaded owner {} is chosen for topic {}",
+                                new Object[] { addr, owner, topic.toStringUtf8() });
+                    if (owner.getAddress().equals(addr)) {
+                        claim(prevOwnerVersion);
+                    } else {
+                        setOwner(owner, prevOwnerVersion);
+                    }
+                }
+                @Override
+                public void operationFailed(Object chooseCtx, PubSubException pse) {
+                    logger.error("Failed to choose least loaded hub server for topic "
+                               + topic.toStringUtf8() + " : ", pse);
+                    // Always answer with the caller's own context, never with whatever
+                    // context the hub server manager hands back.
+                    cb.operationFailed(MMGetOwnerOp.this.ctx, pse);
+                }
+            }, ctx);
+        }
+
+        /**
+         * Records {@code ownerHubInfo} (another hub) as the topic owner, conditional
+         * on {@code prevOwnerVersion}, and reports that hub's address on success.
+         * A lost race (no owner info / bad version) restarts the lookup with {@link #read()}.
+         */
+        public void setOwner(final HubInfo ownerHubInfo, final Version prevOwnerVersion) {
+            logger.info("{} : setting topic {} 's owner to be {}",
+                        new Object[] { addr, topic.toStringUtf8(), ownerHubInfo });
+            mm.writeOwnerInfo(topic, ownerHubInfo, prevOwnerVersion, new Callback<Version>() {
+                @Override
+                public void operationFinished(Object ctx, Version newVersion) {
+                    logger.info("{} : Set topic {} 's owner to be {}",
+                                new Object[] { addr, topic.toStringUtf8(), ownerHubInfo });
+                    cb.operationFinished(ctx, ownerHubInfo.getAddress());
+                }
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    if (exception instanceof PubSubException.NoTopicOwnerInfoException ||
+                        exception instanceof PubSubException.BadVersionException) {
+                        // someone else has updated the owner in the meantime
+                        logger.info("{} : Some one has set topic {} 's owner. Try to read the owner again.",
+                                    new Object[] { addr, topic.toStringUtf8() });
+                        read();
+                        return;
+                    }
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(
+                                       "Exception when writing owner info to set ownership of topic "
+                                       + topic.toStringUtf8() + " : " + exception.getMessage()));
+                }
+            }, ctx);
+        }
+    }
+
+    /**
+     * Deletes the topic's owner record after release, but only when it still names
+     * this hub. A missing or foreign record is left alone and treated as success.
+     */
+    @Override
+    protected void postReleaseCleanup(final ByteString topic,
+                                      final Callback<Void> cb, final Object ctx) {
+        mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
+            @Override
+            public void operationFinished(Object ctx, Versioned<HubInfo> owner) {
+                if (null == owner) {
+                    // Node has somehow disappeared from under us, live with it
+                    logger.warn("No owner info found when cleaning up topic " + topic.toStringUtf8());
+                    cb.operationFinished(ctx, null);
+                    return;
+                }
+                // no valid hub info found, just return
+                if (null == owner.getValue()) {
+                    logger.warn("No valid owner info found when cleaning up topic " + topic.toStringUtf8());
+                    cb.operationFinished(ctx, null);
+                    return;
+                }
+                HedwigSocketAddress ownerAddr = owner.getValue().getAddress();
+                if (!ownerAddr.equals(addr)) {
+                    logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8()
+                                + " but owner " + owner + " found, leaving untouched");
+                    // Not our node, someone else's, leave it alone
+                    cb.operationFinished(ctx, null);
+                    return;
+                }
+
+                // delete conditionally on the version we just read, so a concurrent
+                // ownership change is not clobbered
+                mm.deleteOwnerInfo(topic, owner.getVersion(), new Callback<Void>() {
+                    @Override
+                    public void operationFinished(Object ctx, Void result) {
+                        cb.operationFinished(ctx, null);
+                    }
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        if (exception instanceof PubSubException.NoTopicOwnerInfoException) {
+                            logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8()
+                                      + " but it has been removed.");
+                            cb.operationFinished(ctx, null);
+                            return;
+                        }
+                        logger.error("Exception when deleting self-ownership metadata for topic "
+                                     + topic.toStringUtf8() + " : ", exception);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception));
+                    }
+                }, ctx);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                logger.error("Exception when cleaning up owner info of topic " + topic.toStringUtf8() + " : ", exception);
+                cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception));
+            }
+        }, ctx);
+    }
+
+    /**
+     * Unregisters this hub (best effort; failures are only logged) and then stops
+     * the base topic manager.
+     */
+    @Override
+    public void stop() {
+        // we just unregister it with zookeeper to make it unavailable from hub servers list
+        try {
+            hubManager.unregisterSelf();
+        } catch (IOException e) {
+            logger.error("Error unregistering hub server " + myHubInfo + " : ", e);
+        }
+        super.stop();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java?rev=1369778&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java Mon Aug  6 11:01:36 2012
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper based hub server manager.
+ *
+ * <p>Each hub registers an ephemeral znode under the configured hosts prefix whose
+ * data is the string form of its {@link HubLoad}. The hub address together with the
+ * znode's creation zxid forms the hub's {@link HubInfo}; comparing that zxid lets
+ * other hubs tell the current registration apart from a stale one for the same
+ * address.
+ */
+class ZkHubServerManager implements HubServerManager {
+
+    static Logger logger = LoggerFactory.getLogger(ZkHubServerManager.class);
+
+    // Charset for hub load data stored in znodes. It is fixed rather than taken from
+    // the platform default so that hubs on different hosts always agree on it.
+    private static final java.nio.charset.Charset LOAD_DATA_CHARSET =
+        java.nio.charset.Charset.forName("UTF-8");
+
+    // breaks ties between equally loaded hubs
+    final Random rand = new Random();
+
+    private final ServerConfiguration conf;
+    private final ZooKeeper zk;
+    private final HedwigSocketAddress addr;
+    private final String ephemeralNodePath;
+    private final String hubNodesPath;
+
+    // hub info structure representing this hub; set once registerSelf succeeds
+    protected HubInfo myHubInfo;
+    protected volatile boolean isSuspended = false;
+    protected ManagerListener listener = null;
+
+    // upload hub server load to zookeeper (best effort: failures are only logged)
+    StatCallback loadReportingStatCallback = new StatCallback() {
+        @Override
+        public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+            if (rc != KeeperException.Code.OK.intValue()) {
+                logger.warn("Failed to update load information of hub {} in zk", myHubInfo);
+            }
+        }
+    };
+
+    /**
+     * Watcher tracking the ZooKeeper connection state and forwarding it to the
+     * registered {@link ManagerListener}: disconnect suspends, (re)connect resumes,
+     * session expiry shuts down.
+     */
+    class ZkHubsWatcher implements Watcher {
+        @Override
+        public void process(WatchedEvent event) {
+            if (event.getType().equals(Watcher.Event.EventType.None)) {
+                if (event.getState().equals(
+                        Watcher.Event.KeeperState.Disconnected)) {
+                    logger.warn("ZK client has been disconnected to the ZK server!");
+                    isSuspended = true;
+                    if (null != listener) {
+                        listener.onSuspend();
+                    }
+                } else if (event.getState().equals(
+                        Watcher.Event.KeeperState.SyncConnected)) {
+                    if (isSuspended) {
+                        logger.info("ZK client has been reconnected to the ZK server!");
+                    }
+                    isSuspended = false;
+                    if (null != listener) {
+                        listener.onResume();
+                    }
+                }
+            }
+            if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
+                logger.error("ZK client connection to the ZK server has expired.!");
+                if (null != listener) {
+                    listener.onShutdown();
+                }
+            }
+        }
+    }
+
+    public ZkHubServerManager(ServerConfiguration conf,
+                              ZooKeeper zk,
+                              HedwigSocketAddress addr) {
+        this.conf = conf;
+        this.zk = zk;
+        this.addr = addr;
+
+        // znode path to store all available hub servers
+        this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString();
+        // the node's ephemeral node path
+        this.ephemeralNodePath = getHubZkNodePath(addr);
+        // register the connection state watcher.
+        // NOTE(review): ZooKeeper#register installs this as the client's default
+        // watcher, replacing the one given at construction -- confirm nothing else
+        // relies on that watcher.
+        zk.register(new ZkHubsWatcher());
+    }
+
+    @Override
+    public void registerListener(ManagerListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Get the znode path identifying a hub server.
+     *
+     * @param node
+     *          Hub Server Address
+     * @return znode path identifying the hub server.
+     */
+    private String getHubZkNodePath(HedwigSocketAddress node) {
+        String nodePath = this.conf.getZkHostsPrefix(new StringBuilder())
+                          .append("/").append(node).toString();
+        return nodePath;
+    }
+
+    /**
+     * Creates this hub's ephemeral znode holding {@code selfData}, then reads back
+     * its creation zxid to build this hub's {@link HubInfo}. A leftover node for the
+     * same address is deleted and the registration is retried.
+     */
+    @Override
+    public void registerSelf(final HubLoad selfData, final Callback<HubInfo> callback, Object ctx) {
+        byte[] loadDataBytes = selfData.toString().getBytes(LOAD_DATA_CHARSET);
+        ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, loadDataBytes, Ids.OPEN_ACL_UNSAFE,
+                                         CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                if (rc == Code.OK.intValue()) {
+                    // now we are here
+                    zk.exists(ephemeralNodePath, false, new SafeAsyncZKCallback.StatCallback() {
+                        @Override
+                        public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                            if (rc == Code.OK.intValue()) {
+                                myHubInfo = new HubInfo(addr, stat.getCzxid());
+                                callback.operationFinished(ctx, myHubInfo);
+                                return;
+                            } else {
+                                callback.operationFailed(ctx,
+                                    new PubSubException.ServiceDownException(
+                                        "I can't state my hub node after I created it : "
+                                        + ephemeralNodePath));
+                                return;
+                            }
+                        }
+                    }, ctx);
+                    return;
+                }
+                if (rc != Code.NODEEXISTS.intValue()) {
+                    KeeperException ke = ZkUtils .logErrorAndCreateZKException(
+                            "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                    return;
+                }
+
+                logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
+
+                // Node exists, lets try to delete it and retry
+                zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object ctx) {
+                        if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
+                            registerSelf(selfData, callback, ctx);
+                            return;
+                        }
+                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                        return;
+                    }
+                }, ctx);
+            }
+        }, ctx);
+    }
+
+    /**
+     * Synchronously deletes this hub's ephemeral znode.
+     *
+     * @throws IOException wrapping the ZooKeeper failure or an interruption
+     */
+    @Override
+    public void unregisterSelf() throws IOException {
+        try {
+            zk.delete(ephemeralNodePath, -1);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } catch (KeeperException e) {
+            throw new IOException(e);
+        }
+    }
+
+
+    /**
+     * Asynchronously overwrites this hub's znode data with {@code selfLoad};
+     * failures are only logged.
+     */
+    @Override
+    public void uploadSelfLoadData(HubLoad selfLoad) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Reporting hub load of {} : {}", myHubInfo, selfLoad);
+        }
+        byte[] loadDataBytes = selfLoad.toString().getBytes(LOAD_DATA_CHARSET);
+        zk.setData(ephemeralNodePath, loadDataBytes, -1,
+                   loadReportingStatCallback, null);
+    }
+
+    /**
+     * Reports whether {@code hub} is still registered: its znode must exist and must
+     * have been created at the zxid recorded in {@code hub}.
+     */
+    @Override
+    public void isHubAlive(final HubInfo hub, final Callback<Boolean> callback, Object ctx) {
+        zk.exists(getHubZkNodePath(hub.getAddress()), false, new SafeAsyncZKCallback.StatCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                if (rc == Code.NONODE.intValue()) {
+                    callback.operationFinished(ctx, false);
+                } else if (rc == Code.OK.intValue()) {
+                    if (hub.getZxid() == stat.getCzxid()) {
+                        callback.operationFinished(ctx, true);
+                    } else {
+                        callback.operationFinished(ctx, false);
+                    }
+                } else {
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(
+                        "Failed to check whether hub server " + hub + " is alive!"));
+                }
+            }
+        }, ctx);
+    }
+
+    /**
+     * Lists the registered hubs and reports the one with the smallest load
+     * (ties broken at random) to {@code callback}, together with {@code ctx}.
+     */
+    @Override
+    public void chooseLeastLoadedHub(final Callback<HubInfo> callback, Object ctx) {
+        // Get the list of existing hosts
+        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx,
+                                          List<String> children) {
+                if (rc != Code.OK.intValue()) {
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                        "Could not get list of available hubs", path, rc);
+                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                    return;
+                }
+                chooseLeastLoadedNode(children, callback, ctx);
+            }
+        }, ctx);
+    }
+
+    /**
+     * Reads the load of every hub in {@code children} and reports the least loaded
+     * one once all reads have answered. Hubs that cannot be read or whose load data
+     * is corrupted are skipped. The result is always delivered with the caller's
+     * {@code ctx} (each getData request carries the child name as its own context).
+     */
+    private void chooseLeastLoadedNode(final List<String> children,
+                                       final Callback<HubInfo> callback, final Object ctx) {
+        // With no children no getData request is issued and the aggregating callback
+        // would never fire, so fail right away instead of leaving the caller hanging.
+        if (children.isEmpty()) {
+            callback.operationFailed(ctx,
+                new PubSubException.ServiceDownException("No hub available"));
+            return;
+        }
+        SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
+            int numResponses = 0;
+            HubLoad minLoad = HubLoad.MAX_LOAD;
+            String leastLoaded = null;
+            long leastLoadedCzxid = 0;
+
+            @Override
+            public void safeProcessResult(int rc, String path, Object childCtx,
+                                          byte[] data, Stat stat) {
+                final String chosen;
+                final long chosenCzxid;
+                synchronized (this) {
+                    if (rc == KeeperException.Code.OK.intValue()) {
+                        try {
+                            HubLoad load = HubLoad.parse(new String(data, LOAD_DATA_CHARSET));
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Found server " + childCtx + " with load: " + load);
+                            }
+                            int compareRes = load.compareTo(minLoad);
+                            if (compareRes < 0 || (compareRes == 0 && rand.nextBoolean())) {
+                                minLoad = load;
+                                leastLoaded = (String) childCtx;
+                                leastLoadedCzxid = stat.getCzxid();
+                            }
+                        } catch (HubLoad.InvalidHubLoadException e) {
+                            logger.warn("Corrupted load information from hub : " + childCtx);
+                            // some corrupted data, we'll just ignore this hub
+                        }
+                    }
+                    numResponses++;
+
+                    if (numResponses != children.size()) {
+                        return;
+                    }
+                    chosen = leastLoaded;
+                    chosenCzxid = leastLoadedCzxid;
+                }
+                // the caller's callback runs outside the monitor, exactly once
+                reportLeastLoaded(chosen, chosenCzxid, callback, ctx);
+            }
+        };
+
+        for (String child : children) {
+            zk.getData(conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
+                       dataCallback, child);
+        }
+    }
+
+    /**
+     * Delivers the outcome of a least-loaded-hub selection: a failure when no hub
+     * qualified or its znode name is not a valid hub address, otherwise the chosen
+     * hub's {@link HubInfo}. The callback is invoked exactly once.
+     */
+    private void reportLeastLoaded(String leastLoaded, long leastLoadedCzxid,
+                                   Callback<HubInfo> callback, Object ctx) {
+        if (leastLoaded == null) {
+            callback.operationFailed(ctx,
+                new PubSubException.ServiceDownException("No hub available"));
+            return;
+        }
+        HedwigSocketAddress owner;
+        try {
+            owner = new HedwigSocketAddress(leastLoaded);
+        } catch (Exception e) {
+            logger.warn("Least loaded hub server " + leastLoaded + " is invalid.", e);
+            callback.operationFailed(ctx,
+                new PubSubException.ServiceDownException("Least loaded hub server "
+                                                       + leastLoaded + " is invalid."));
+            return;
+        }
+        callback.operationFinished(ctx, new HubInfo(owner, leastLoadedCzxid));
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1369778&r1=1369777&r2=1369778&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Mon Aug  6 11:01:36 2012
@@ -18,8 +18,7 @@
 package org.apache.hedwig.server.topics;
 
 import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Random;
+import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 
@@ -27,10 +26,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
@@ -53,22 +50,17 @@ import org.apache.hedwig.zookeeper.SafeA
 public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
 
     static Logger logger = LoggerFactory.getLogger(ZkTopicManager.class);
-    Random rand = new Random();
 
     /**
      * Persistent storage for topic metadata.
      */
     private ZooKeeper zk;
-    String ephemeralNodePath;
 
-    StatCallback loadReportingStatCallback = new StatCallback() {
-        @Override
-        public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-            if (rc != KeeperException.Code.OK.intValue()) {
-                logger.warn("Failed to update load information in zk");
-            }
-        }
-    };
+    // hub server manager
+    private final HubServerManager hubManager;
+
+    private final HubInfo myHubInfo;
+    private final HubLoad myHubLoad;
 
     // Boolean flag indicating if we should suspend activity. If this is true,
     // all of the Ops put into the queuer will fail automatically.
@@ -84,94 +76,46 @@ public class ZkTopicManager extends Abst
 
         super(cfg, scheduler);
         this.zk = zk;
-        this.ephemeralNodePath = cfg.getZkHostsPrefix(new StringBuilder()).append("/").append(addr).toString();
+        this.hubManager = new ZkHubServerManager(cfg, zk, addr);
 
-        zk.register(new Watcher() {
+        myHubLoad = new HubLoad(topics.size());
+        this.hubManager.registerListener(new HubServerManager.ManagerListener() {
             @Override
-            public void process(WatchedEvent event) {
-                if (event.getType().equals(Watcher.Event.EventType.None)) {
-                    if (event.getState().equals(Watcher.Event.KeeperState.Disconnected)) {
-                        logger.warn("ZK client has been disconnected to the ZK server!");
-                        isSuspended = true;
-                    } else if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                        if (isSuspended) {
-                            logger.info("ZK client has been reconnected to the ZK server!");
-                        }
-                        isSuspended = false;
-                    }
-                }
-                // Check for expired connection.
-                if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
-                    logger.error("ZK client connection to the ZK server has expired!");
-                    Runtime.getRuntime().exit(1);
-                }
+            public void onSuspend() {
+                isSuspended = true;
             }
-        });
-        final SynchronousQueue<Either<Void, PubSubException>> queue = new SynchronousQueue<Either<Void, PubSubException>>();
-
-        registerWithZookeeper(new Callback<Void>() {
             @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                logger.error("Failed to register hub with zookeeper", exception);
-                ConcurrencyUtils.put(queue, Either.of((Void) null, exception));
+            public void onResume() {
+                isSuspended = false;
             }
+            @Override
+            public void onShutdown() {
+                // if hub server manager can't work, we had to quit
+                Runtime.getRuntime().exit(1);
+            }
+        });
 
+        final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
+            new SynchronousQueue<Either<HubInfo, PubSubException>>();
+        this.hubManager.registerSelf(myHubLoad, new Callback<HubInfo>() {
             @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                logger.info("Successfully registered hub with zookeeper");
+            public void operationFinished(final Object ctx, final HubInfo resultOfOperation) {
+                logger.info("Successfully registered hub {} with zookeeper", resultOfOperation);
                 ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
             }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                logger.error("Failed to register hub with zookeeper", exception);
+                ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception));
+            }
         }, null);
 
-        PubSubException pse = ConcurrencyUtils.take(queue).right();
-
+        Either<HubInfo, PubSubException> result = ConcurrencyUtils.take(queue);
+        PubSubException pse = result.right();
         if (pse != null) {
             throw pse;
         }
-    }
-
-    void registerWithZookeeper(final Callback<Void> callback, Object ctx) {
-
-        ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, getCurrentLoadData(), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
-
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                if (rc == Code.OK.intValue()) {
-                    callback.operationFinished(ctx, null);
-                    return;
-                }
-                if (rc != Code.NODEEXISTS.intValue()) {
-                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                             "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                    return;
-                }
-
-                logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
-
-                // Node exists, lets try to delete it and retry
-                zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object ctx) {
-                        if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
-                            registerWithZookeeper(callback, ctx);
-                            return;
-                        }
-                        KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                                 "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                        return;
-
-                    }
-                }, ctx);
-
-            }
-        }, null);
-    }
-
-    void unregisterWithZookeeper() throws InterruptedException, KeeperException {
-        zk.delete(ephemeralNodePath, -1);
+        myHubInfo = result.left();
     }
 
     String hubPath(ByteString topic) {
@@ -215,71 +159,24 @@ public class ZkTopicManager extends Abst
         }
 
         public void choose() {
-            // Get the list of existing hosts
-            String registeredHostsPath = cfg.getZkHostsPrefix(new StringBuilder()).toString();
-            zk.getChildren(registeredHostsPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+            hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() {
                 @Override
-                public void safeProcessResult(int rc, String path, Object ctx, List<String> children) {
-                    if (rc != Code.OK.intValue()) {
-                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                "Could not get list of available hubs", path, rc);
-                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                        return;
+                public void operationFinished(Object ctx, HubInfo owner) {
+                    logger.info("{} : Least loaded owner {} is chosen for topic {}",
+                                new Object[] { addr, owner, topic.toStringUtf8() });
+                    if (owner.getAddress().equals(addr)) {
+                        claim();
+                    } else {
+                        cb.operationFinished(ZkGetOwnerOp.this.ctx, owner.getAddress());
                     }
-                    chooseLeastLoadedNode(children);
                 }
-            }, null);
-        }
-
-        public void chooseLeastLoadedNode(final List<String> children) {
-            DataCallback dataCallback = new DataCallback() {
-                int numResponses = 0;
-                int minLoad = Integer.MAX_VALUE;
-                String leastLoaded = null;
-
                 @Override
-                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    synchronized (this) {
-                        if (rc == KeeperException.Code.OK.intValue()) {
-                            try {
-                                int load = Integer.parseInt(new String(data));
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("Found server: " + ctx + " with load: " + load);
-                                }
-                                if (load < minLoad  || (load == minLoad && rand.nextBoolean())) {
-                                    minLoad = load;
-                                    leastLoaded = (String) ctx;
-                                }
-                            } catch (NumberFormatException e) {
-                                logger.warn("Corrupted load information from hub:" + ctx);
-                                // some corrupted data, we'll just ignore this
-                                // hub
-                            }
-                        }
-                        numResponses++;
-
-                        if (numResponses == children.size()) {
-                            if (leastLoaded == null) {
-                                cb.operationFailed(ZkGetOwnerOp.this.ctx, new PubSubException.ServiceDownException(
-                                                       "No hub available"));
-                                return;
-                            }
-                            HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded);
-                            if (owner.equals(addr)) {
-                                claim();
-                            } else {
-                                cb.operationFinished(ZkGetOwnerOp.this.ctx, owner);
-                            }
-                        }
-                    }
-
+                public void operationFailed(Object ctx, PubSubException pse) {
+                    logger.error("Failed to choose least loaded hub server for topic "
+                               + topic.toStringUtf8() + " : ", pse);
+                    cb.operationFailed(ctx, pse);
                 }
-            };
-
-            for (String child : children) {
-                zk.getData(cfg.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
-                           dataCallback, child);
-            }
+            }, null);
         }
 
         public void claimOrChoose() {
@@ -307,17 +204,21 @@ public class ZkTopicManager extends Abst
                     }
 
                     // successfully did a read
-                    HedwigSocketAddress owner = new HedwigSocketAddress(new String(data));
-                    if (!owner.equals(addr)) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("topic: " + topic.toStringUtf8() + " belongs to someone else: " + owner);
+                    try {
+                        HubInfo ownerHubInfo = HubInfo.parse(new String(data));
+                        HedwigSocketAddress owner = ownerHubInfo.getAddress();
+                        if (!owner.equals(addr)) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("topic: " + topic.toStringUtf8() + " belongs to someone else: " + owner);
+                            }
+                            cb.operationFinished(ctx, owner);
+                            return;
                         }
-                        cb.operationFinished(ctx, owner);
-                        return;
+                        logger.info("Discovered stale self-node for topic: " + topic.toStringUtf8() + ", will delete it");
+                    } catch (HubInfo.InvalidHubInfoException ihie) {
+                        logger.info("Discovered invalid hub info for topic: " + topic.toStringUtf8() + ", will delete it : ", ihie);
                     }
 
-                    logger.info("Discovered stale self-node for topic: " + topic.toStringUtf8() + ", will delete it");
-
                     // we must have previously failed and left a
                     // residual ephemeral node here, so we must
                     // delete it (clean it up) and then
@@ -343,7 +244,7 @@ public class ZkTopicManager extends Abst
                 logger.debug("claiming topic: " + topic.toStringUtf8());
             }
 
-            ZkUtils.createFullPathOptimistic(zk, hubPath, addr.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
+            ZkUtils.createFullPathOptimistic(zk, hubPath, myHubInfo.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
             CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
 
                 @Override
@@ -353,7 +254,7 @@ public class ZkTopicManager extends Abst
                             logger.debug("claimed topic: " + topic.toStringUtf8());
                         }
                         notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
-                        updateLoadInformation();
+                        hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
                     } else if (rc == Code.NODEEXISTS.intValue()) {
                         read();
                     } else {
@@ -368,20 +269,6 @@ public class ZkTopicManager extends Abst
 
     }
 
-    byte[] getCurrentLoadData() {
-        // For now, using the number of topics as an indicator of load
-        // information
-        return (topics.size() + "").getBytes();
-    }
-
-    void updateLoadInformation() {
-        byte[] currentLoad = getCurrentLoadData();
-        if (logger.isDebugEnabled()) {
-            logger.debug("Reporting load of " + new String(currentLoad));
-        }
-        zk.setData(ephemeralNodePath, currentLoad, -1, loadReportingStatCallback, null);
-    }
-
     @Override
     protected void postReleaseCleanup(final ByteString topic, final Callback<Void> cb, Object ctx) {
 
@@ -403,11 +290,20 @@ public class ZkTopicManager extends Abst
                     return;
                 }
 
-                HedwigSocketAddress owner = new HedwigSocketAddress(new String(data));
-                if (!owner.equals(addr)) {
-                    logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
-                                + owner + " found, leaving untouched");
-                    // Not our node, someone else's, leave it alone
+                String hubInfoStr = new String(data);
+                try {
+                    HubInfo ownerHubInfo = HubInfo.parse(hubInfoStr);
+                    HedwigSocketAddress owner = ownerHubInfo.getAddress();
+                    if (!owner.equals(addr)) {
+                        logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
+                                    + owner + " found, leaving untouched");
+                        // Not our node, someone else's, leave it alone
+                        cb.operationFinished(ctx, null);
+                        return;
+                    }
+                } catch (HubInfo.InvalidHubInfoException ihie) {
+                    logger.info("Invalid hub info " + hubInfoStr + " found when release topic "
+                              + topic.toStringUtf8() + ". Leaving untouched until next acquire action.");
                     cb.operationFinished(ctx, null);
                     return;
                 }
@@ -434,10 +330,8 @@ public class ZkTopicManager extends Abst
     public void stop() {
         // we just unregister it with zookeeper to make it unavailable from hub servers list
         try {
-            unregisterWithZookeeper();
-        } catch (InterruptedException e) {
-            logger.error("Error unregistering hub server :", e);
-        } catch (KeeperException e) {
+            hubManager.unregisterSelf();
+        } catch (IOException e) {
             logger.error("Error unregistering hub server :", e);
         }
         super.stop();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java?rev=1369778&r1=1369777&r2=1369778&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java Mon Aug  6 11:01:36 2012
@@ -34,8 +34,10 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
 import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
+import org.apache.hedwig.server.topics.HubInfo;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
 import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.HedwigSocketAddress;
 
 import org.junit.Test;
 import org.junit.Assert;
@@ -47,6 +49,92 @@ public class TestMetadataManager extends
     }
 
     @Test
+    public void testOwnerInfo() throws Exception {
+        TopicOwnershipManager toManager = metadataManagerFactory.newTopicOwnershipManager();
+
+        ByteString topic = ByteString.copyFromUtf8("testOwnerInfo");
+        StubCallback<Versioned<HubInfo>> readCallback = new StubCallback<Versioned<HubInfo>>();
+        StubCallback<Version> writeCallback = new StubCallback<Version>();
+        StubCallback<Void> deleteCallback = new StubCallback<Void>();
+
+        Either<Version, PubSubException> res;
+        HubInfo owner = new HubInfo(new HedwigSocketAddress("127.0.0.1", 8008), 999);
+
+        // Write non-existed owner info
+        toManager.writeOwnerInfo(topic, owner, null, writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertEquals(null, res.right());
+        Version v1 = res.left();
+
+        // read owner info
+        toManager.readOwnerInfo(topic, readCallback, null);
+        Versioned<HubInfo> hubInfo = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v1.compare(hubInfo.getVersion()));
+        Assert.assertEquals(owner, hubInfo.getValue());
+
+        HubInfo newOwner = new HubInfo(new HedwigSocketAddress("127.0.0.1", 8008), 1000);
+
+        // write exsited owner info with null version
+        toManager.writeOwnerInfo(topic, newOwner, null, writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertNotNull(res.right());
+        Assert.assertTrue(res.right() instanceof PubSubException.TopicOwnerInfoExistsException);
+
+        // write existed owner info with right version
+        toManager.writeOwnerInfo(topic, newOwner, v1, writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertEquals(null, res.right());
+        Version v2 = res.left();
+        Assert.assertEquals(Version.Occurred.AFTER, v2.compare(v1));
+
+        // read owner info
+        toManager.readOwnerInfo(topic, readCallback, null);
+        hubInfo = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(hubInfo.getVersion()));
+        Assert.assertEquals(newOwner, hubInfo.getValue());
+
+        HubInfo newOwner2 = new HubInfo(new HedwigSocketAddress("127.0.0.1", 8008), 1001);
+
+        // write existed owner info with bad version
+        toManager.writeOwnerInfo(topic, newOwner2, v1,
+                                 writeCallback, null);
+        res = writeCallback.queue.take();
+        Assert.assertNotNull(res.right());
+        Assert.assertTrue(res.right() instanceof PubSubException.BadVersionException);
+
+        // read owner info
+        toManager.readOwnerInfo(topic, readCallback, null);
+        hubInfo = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(hubInfo.getVersion()));
+        Assert.assertEquals(newOwner, hubInfo.getValue());
+
+        // delete existed owner info with bad version
+        toManager.deleteOwnerInfo(topic, v1, deleteCallback, null);
+        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
+                          PubSubException.BadVersionException);
+
+        // read owner info
+        toManager.readOwnerInfo(topic, readCallback, null);
+        hubInfo = readCallback.queue.take().left();
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, v2.compare(hubInfo.getVersion()));
+
+        // delete existed owner info with right version
+        toManager.deleteOwnerInfo(topic, v2, deleteCallback, null);
+        Assert.assertEquals(null, deleteCallback.queue.take().right());
+
+        // Empty owner info
+        toManager.readOwnerInfo(topic, readCallback, null);
+        Assert.assertEquals(null, readCallback.queue.take().left());
+
+        // delete non-existed owner info
+        toManager.deleteOwnerInfo(topic, null, deleteCallback, null);
+        Assert.assertTrue(deleteCallback.queue.take().right() instanceof
+                          PubSubException.NoTopicOwnerInfoException);
+
+        toManager.close();
+    }
+
+    @Test
     public void testPersistenceInfo() throws Exception {
         TopicPersistenceManager tpManager = metadataManagerFactory.newTopicPersistenceManager();
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java?rev=1369778&r1=1369777&r2=1369778&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java Mon Aug  6 11:01:36 2012
@@ -85,6 +85,10 @@ public class TestMetadataManagerFactory 
         public SubscriptionDataManager newSubscriptionDataManager() {
             return null;
         }
+
+        public TopicOwnershipManager newTopicOwnershipManager() {
+            return null;
+        }
     }
 
     private void writeFactoryLayout(ServerConfiguration conf,

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java?rev=1369778&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java Mon Aug  6 11:01:36 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.topics;
+
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+public class TestHubInfo {
+
+    @Test
+    public void testParseHubInfo() throws Exception {
+        HedwigSocketAddress addr = new HedwigSocketAddress("localhost", 9086, 9087);
+        HubInfo hubInfo1 = new HubInfo(addr, 9999);
+
+        String strHubInfo1 = hubInfo1.toString();
+        HubInfo parsedHubInfo1 = HubInfo.parse(strHubInfo1);
+        Assert.assertEquals("Hub infos should be same", hubInfo1, parsedHubInfo1);
+
+        HubInfo hubInfo2 = new HubInfo(addr, 0);
+        HubInfo parsedHubInfo2 = HubInfo.parse("localhost:9086:9087");
+        Assert.assertEquals("Hub infos w/o zxid should be same", hubInfo2, parsedHubInfo2);
+
+        // parse empty string
+        try {
+            HubInfo.parse("");
+            Assert.fail("Should throw InvalidHubInfoException parsing empty string.");
+        } catch (HubInfo.InvalidHubInfoException ihie) {
+        }
+
+        // parse corrupted hostname
+        try {
+            HubInfo.parse("localhost,a,b,c");
+            Assert.fail("Should throw InvalidHubInfoException parsing corrupted hostname.");
+        } catch (HubInfo.InvalidHubInfoException ihie) {
+        }
+
+        // parse corrupted string
+        try {
+            HubInfo.parse("hostname: localhost:9086:9087");
+            Assert.fail("Should throw InvalidHubInfoException parsing corrupted string.");
+        } catch (HubInfo.InvalidHubInfoException ihie) {
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java?rev=1369778&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java Mon Aug  6 11:01:36 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.topics;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+public class TestHubLoad {
+
+    @Test
+    public void testParseHubLoad() throws Exception {
+        HubLoad hubLoad1 = new HubLoad(9999);
+
+        String strHubLoad1 = hubLoad1.toString();
+        HubLoad parsedHubLoad1 = HubLoad.parse(strHubLoad1);
+        Assert.assertEquals("Hub load data should be same", hubLoad1, parsedHubLoad1);
+
+        final int numTopics = 9998;
+        HubLoad hubLoad2 = new HubLoad(numTopics);
+        HubLoad parsedHubLoad2 = HubLoad.parse(numTopics + "");
+        Assert.assertEquals("Hub load data not protobuf encoded should be same", hubLoad2, parsedHubLoad2);
+
+        // parse empty string
+        try {
+            HubLoad.parse("");
+            Assert.fail("Should throw InvalidHubLoadException parsing empty string.");
+        } catch (HubLoad.InvalidHubLoadException ihie) {
+        }
+
+        // parse corrupted numTopics
+        try {
+            HubLoad.parse("9998_x");
+            Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data.");
+        } catch (HubLoad.InvalidHubLoadException ihie) {
+        }
+
+        // parse corrupted string
+        try {
+            HubLoad.parse("hostname: 9998_x");
+            Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data.");
+        } catch (HubLoad.InvalidHubLoadException ihie) {
+        }
+    }
+}

Copied: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java (from r1369767, zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java?p2=zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java&p1=zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java&r1=1369767&r2=1369778&rev=1369778&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java Mon Aug  6 11:01:36 2012
@@ -22,14 +22,20 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.zookeeper.KeeperException;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.protobuf.ByteString;
+import org.apache.hedwig.StubCallback;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.CompositeException;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactoryTestCase;
+import org.apache.hedwig.server.meta.TopicOwnershipManager;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.ConcurrencyUtils;
 import org.apache.hedwig.util.Either;
@@ -39,11 +45,12 @@ import org.apache.hedwig.zookeeper.ZooKe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestZkTopicManager extends ZooKeeperTestBase {
+public class TestMMTopicManager extends MetadataManagerFactoryTestCase {
 
-    static Logger LOG = LoggerFactory.getLogger(TestZkTopicManager.class);
+    static Logger LOG = LoggerFactory.getLogger(TestMMTopicManager.class);
 
-    protected ZkTopicManager tm;
+    protected MMTopicManager tm;
+    protected TopicOwnershipManager tom;
 
     protected class CallbackQueue<T> implements Callback<T> {
         SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
@@ -83,18 +90,29 @@ public class TestZkTopicManager extends 
     protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
 
     protected ByteString topic = ByteString.copyFromUtf8("topic");
-    protected ServerConfiguration cfg;
     protected HedwigSocketAddress me;
     protected ScheduledExecutorService scheduler;
 
+    public TestMMTopicManager(String metaManagerCls) {
+        super(metaManagerCls);
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        cfg = new ServerConfiguration();
-        me = cfg.getServerAddr();
+        me = conf.getServerAddr();
         scheduler = Executors.newSingleThreadScheduledExecutor();
-        tm = new ZkTopicManager(zk, cfg, scheduler);
+        tom = metadataManagerFactory.newTopicOwnershipManager();
+        tm = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        tom.close();
+        tm.stop();
+        super.tearDown();
     }
 
     @Test
@@ -129,23 +147,14 @@ public class TestZkTopicManager extends 
 
     @Test
     public void testGetOwnerMulti() throws Exception {
-        ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration(
-            cfg.getServerPort() + 2);
-        // TODO change cfg1 cfg2 params
-        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler), tm2 = new ZkTopicManager(zk, cfg2, scheduler);
+        ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1),
+                            conf2 = new CustomServerConfiguration(conf.getServerPort() + 2);
+        MMTopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler),
+                       tm2 = new MMTopicManager(conf2, zk, metadataManagerFactory, scheduler);
 
         tm.getOwner(topic, false, addrCbq, null);
         HedwigSocketAddress owner = check(addrCbq.take());
 
-        // If we were told to have another person claim the topic, make them
-        // claim the topic.
-        if (owner.getPort() == cfg1.getServerPort())
-            tm1.getOwner(topic, true, addrCbq, null);
-        else if (owner.getPort() == cfg2.getServerPort())
-            tm2.getOwner(topic, true, addrCbq, null);
-        if (owner.getPort() != cfg.getServerPort())
-            Assert.assertEquals(owner, check(addrCbq.take()));
-
         for (int i = 0; i < 100; ++i) {
             tm.getOwner(topic, false, addrCbq, null);
             Assert.assertEquals(owner, check(addrCbq.take()));
@@ -157,7 +166,6 @@ public class TestZkTopicManager extends 
             Assert.assertEquals(owner, check(addrCbq.take()));
         }
 
-        // Give us 100 chances to choose another owner if not shouldClaim.
         for (int i = 0; i < 100; ++i) {
             if (!owner.equals(me))
                 break;
@@ -167,11 +175,8 @@ public class TestZkTopicManager extends 
                 Assert.fail("Never chose another owner");
         }
 
-        // Make sure we always choose ourselves if shouldClaim.
-        for (int i = 0; i < 100; ++i) {
-            tm.getOwner(mkTopic(100), true, addrCbq, null);
-            Assert.assertEquals(me, check(addrCbq.take()));
-        }
+        tm1.stop();
+        tm2.stop();
     }
 
     @Test
@@ -180,13 +185,14 @@ public class TestZkTopicManager extends 
 
         Assert.assertEquals(me, check(addrCbq.take()));
 
-        ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
-        new ZkTopicManager(zk, cfg1, scheduler);
+        ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1);
+        TopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler);
 
         ByteString topic1 = mkTopic(1);
         tm.getOwner(topic1, false, addrCbq, null);
-        Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take()));
+        Assert.assertEquals(conf1.getServerAddr(), check(addrCbq.take()));
 
+        tm1.stop();
     }
 
     class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
@@ -277,17 +283,17 @@ public class TestZkTopicManager extends 
     }
 
     public void assertOwnershipNodeExists() throws Exception {
-        byte[] data = zk.getData(tm.hubPath(topic), false, null);
-        Assert.assertEquals(new HedwigSocketAddress(new String(data)), tm.addr);
+        StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>();
+        tom.readOwnerInfo(topic, callback, null);
+        Versioned<HubInfo> hubInfo = callback.queue.take().left();
+        Assert.assertEquals(tm.addr, hubInfo.getValue().getAddress());
     }
 
     public void assertOwnershipNodeDoesntExist() throws Exception {
-        try {
-            zk.getData(tm.hubPath(topic), false, null);
-            Assert.assertTrue(false);
-        } catch (KeeperException e) {
-            Assert.assertEquals(e.code(), KeeperException.Code.NONODE);
-        }
+        StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>();
+        tom.readOwnerInfo(topic, callback, null);
+        Versioned<HubInfo> hubInfo = callback.queue.take().left();
+        Assert.assertEquals(null, hubInfo);
     }
 
     @Test

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=1369778&r1=1369777&r2=1369778&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Mon Aug  6 11:01:36 2012
@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.zookeeper.KeeperException;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -97,6 +98,13 @@ public class TestZkTopicManager extends 
         tm = new ZkTopicManager(zk, cfg, scheduler);
     }
 
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        tm.stop();
+        super.tearDown();
+    }
+
     @Test
     public void testGetOwnerSingle() throws Exception {
         tm.getOwner(topic, false, addrCbq, null);
@@ -132,7 +140,8 @@ public class TestZkTopicManager extends 
         ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration(
             cfg.getServerPort() + 2);
         // TODO change cfg1 cfg2 params
-        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler), tm2 = new ZkTopicManager(zk, cfg2, scheduler);
+        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler),
+                       tm2 = new ZkTopicManager(zk, cfg2, scheduler);
 
         tm.getOwner(topic, false, addrCbq, null);
         HedwigSocketAddress owner = check(addrCbq.take());
@@ -172,6 +181,9 @@ public class TestZkTopicManager extends 
             tm.getOwner(mkTopic(100), true, addrCbq, null);
             Assert.assertEquals(me, check(addrCbq.take()));
         }
+
+        tm1.stop();
+        tm2.stop();
     }
 
     @Test
@@ -181,12 +193,13 @@ public class TestZkTopicManager extends 
         Assert.assertEquals(me, check(addrCbq.take()));
 
         ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
-        new ZkTopicManager(zk, cfg1, scheduler);
+        TopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler);
 
         ByteString topic1 = mkTopic(1);
         tm.getOwner(topic1, false, addrCbq, null);
         Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take()));
 
+        tm1.stop();
     }
 
     class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
@@ -278,7 +291,8 @@ public class TestZkTopicManager extends 
 
     public void assertOwnershipNodeExists() throws Exception {
         byte[] data = zk.getData(tm.hubPath(topic), false, null);
-        Assert.assertEquals(new HedwigSocketAddress(new String(data)), tm.addr);
+        Assert.assertEquals(HubInfo.parse(new String(data)).getAddress(),
+                            tm.addr);
     }
 
     public void assertOwnershipNodeDoesntExist() throws Exception {



Mime
View raw message