Return-Path: X-Original-To: apmail-bookkeeper-commits-archive@www.apache.org Delivered-To: apmail-bookkeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3AB081985D for ; Wed, 16 Mar 2016 03:44:15 +0000 (UTC) Received: (qmail 50636 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Delivered-To: apmail-bookkeeper-commits-archive@bookkeeper.apache.org Received: (qmail 50555 invoked by uid 500); 16 Mar 2016 03:44:14 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 49845 invoked by uid 99); 16 Mar 2016 03:44:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 03:44:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 25AC5E00C6; Wed, 16 Mar 2016 03:44:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sijie@apache.org To: commits@bookkeeper.apache.org Date: Wed, 16 Mar 2016 03:44:19 -0000 Message-Id: <48f2b90feaf84e24a46e4a66c77d6a5e@git.apache.org> In-Reply-To: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> References: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java deleted file mode 100644 index 9a4cb3d..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java +++ /dev/null @@ -1,162 +0,0 @@ -package org.apache.hedwig.server.topics; - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; - -import org.apache.hedwig.protocol.PubSubProtocol.HubInfoData; -import org.apache.hedwig.util.HedwigSocketAddress; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.TextFormat; - -/** - * Info identifies a hub server. - */ -public class HubInfo { - - public static class InvalidHubInfoException extends Exception { - public InvalidHubInfoException(String msg) { - super(msg); - } - - public InvalidHubInfoException(String msg, Throwable t) { - super(msg, t); - } - } - - // address identify a hub server - final HedwigSocketAddress addr; - // its znode czxid - final long czxid; - // protobuf encoded hub info data to be serialized - HubInfoData hubInfoData; - - public HubInfo(HedwigSocketAddress addr, long czxid) { - this(addr, czxid, null); - } - - protected HubInfo(HedwigSocketAddress addr, long czxid, - HubInfoData data) { - this.addr = addr; - this.czxid = czxid; - this.hubInfoData = data; - } - - public HedwigSocketAddress getAddress() { - return addr; - } - - public long getZxid() { - return czxid; - } - - private synchronized HubInfoData getHubInfoData() { - if (null == hubInfoData) { - hubInfoData = HubInfoData.newBuilder().setHostname(addr.toString()) - .setCzxid(czxid).build(); - } - return hubInfoData; - } - - @Override - public String toString() { - return TextFormat.printToString(getHubInfoData()); - } - - @Override - public boolean equals(Object o) { - if (null == o) { - return false; - } - if (!(o instanceof HubInfo)) { - return false; - } - HubInfo other = (HubInfo)o; - if (null == addr) { - if (null == other.addr) { - return true; - } else { - return czxid == other.czxid; - } - } else { - if (addr.equals(other.addr)) { - return czxid == other.czxid; - } else { - return false; - } - } - } - - @Override - public int hashCode() { - return addr.hashCode(); - } - - /** - * Parse hub info from a string. - * - * @param hubInfoStr - * String representation of hub info - * @return hub info - * @throws InvalidHubInfoException when hubInfoStr is not a valid - * string representation of hub info. - */ - public static HubInfo parse(String hubInfoStr) throws InvalidHubInfoException { - // it is not protobuf encoded hub info, it might be generated by ZkTopicManager - if (!hubInfoStr.startsWith("hostname")) { - final HedwigSocketAddress owner; - try { - owner = new HedwigSocketAddress(hubInfoStr); - } catch (Exception e) { - throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoStr, e); - } - return new HubInfo(owner, 0L); - } - - // it is a protobuf encoded hub info. - HubInfoData hubInfoData; - - try { - BufferedReader reader = new BufferedReader( - new StringReader(hubInfoStr)); - HubInfoData.Builder dataBuilder = HubInfoData.newBuilder(); - TextFormat.merge(reader, dataBuilder); - hubInfoData = dataBuilder.build(); - } catch (InvalidProtocolBufferException ipbe) { - throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ipbe); - } catch (IOException ie) { - throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ie); - } - - final HedwigSocketAddress owner; - try { - owner = new HedwigSocketAddress(hubInfoData.getHostname().trim()); - } catch (Exception e) { - throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoData.getHostname(), e); - } - long ownerZxid = hubInfoData.getCzxid(); - return new HubInfo(owner, ownerZxid, hubInfoData); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java deleted file mode 100644 index 2f76020..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.hedwig.server.topics; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; - -import org.apache.hedwig.protocol.PubSubProtocol.HubLoadData; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.TextFormat; - -/** - * This class encapsulates metrics for determining the load on a hub server. - */ -public class HubLoad implements Comparable { - - public static final HubLoad MAX_LOAD = new HubLoad(Long.MAX_VALUE); - public static final HubLoad MIN_LOAD = new HubLoad(0); - - public static class InvalidHubLoadException extends Exception { - private static final long serialVersionUID = 5870487176956413387L; - - public InvalidHubLoadException(String msg) { - super(msg); - } - - public InvalidHubLoadException(String msg, Throwable t) { - super(msg, t); - } - } - - // how many topics that a hub server serves - long numTopics; - - public HubLoad(long num) { - this.numTopics = num; - } - - public HubLoad(HubLoadData data) { - this.numTopics = data.getNumTopics(); - } - - // TODO: Make this threadsafe (BOOKKEEPER-379) - public HubLoad setNumTopics(long numTopics) { - this.numTopics = numTopics; - return this; - } - - public long getNumTopics() { - return this.numTopics; - } - - public HubLoadData toHubLoadData() { - return HubLoadData.newBuilder().setNumTopics(numTopics).build(); - } - - @Override - public String toString() { - return TextFormat.printToString(toHubLoadData()); - } - - @Override - public boolean equals(Object o) { - if (null == o || - !(o instanceof HubLoad)) { - return false; - } - return 0 == compareTo((HubLoad)o); - } - - @Override - public int compareTo(HubLoad other) { - return numTopics > other.numTopics ? - 1 : (numTopics < other.numTopics ? -1 : 0); - } - - @Override - public int hashCode() { - return (int)numTopics; - } - - /** - * Parse hub load from a string. - * - * @param hubLoadStr - * String representation of hub load - * @return hub load - * @throws InvalidHubLoadException when hubLoadStr is not a valid - * string representation of hub load. - */ - public static HubLoad parse(String hubLoadStr) throws InvalidHubLoadException { - // it is no protobuf encoded hub info, it might be generated by ZkTopicManager - if (!hubLoadStr.startsWith("numTopics")) { - try { - long numTopics = Long.parseLong(hubLoadStr, 10); - return new HubLoad(numTopics); - } catch (NumberFormatException nfe) { - throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, nfe); - } - } - // it it a protobuf encoded hub load data. - HubLoadData hubLoadData; - try { - BufferedReader reader = new BufferedReader( - new StringReader(hubLoadStr)); - HubLoadData.Builder dataBuilder = HubLoadData.newBuilder(); - TextFormat.merge(reader, dataBuilder); - hubLoadData = dataBuilder.build(); - } catch (InvalidProtocolBufferException ipbe) { - throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, ipbe); - } catch (IOException ie) { - throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, ie); - } - - return new HubLoad(hubLoadData); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java deleted file mode 100644 index 12524c9..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.io.IOException; - -import org.apache.hedwig.util.Callback; - -/** - * The HubServerManager class manages info about hub servers. - */ -interface HubServerManager { - - static interface ManagerListener { - - /** - * Server manager is suspended if encountering some transient errors. - * {@link #onResume()} would be called if those errors could be fixed. - * {@link #onShutdown()} would be called if those errors could not be fixed. - */ - public void onSuspend(); - - /** - * Server manager is resumed after fixing some transient errors. - */ - public void onResume(); - - /** - * Server manager had to shutdown due to unrecoverable errors. - */ - public void onShutdown(); - } - - /** - * Register a listener to listen events of server manager - * - * @param listener - * Server Manager Listener - */ - public void registerListener(ManagerListener listener); - - /** - * Register itself to the cluster. - * - * @param selfLoad - * Self load data - * @param callback - * Callback when itself registered. - * @param ctx - * Callback context. - */ - public void registerSelf(HubLoad selfLoad, Callback callback, Object ctx); - - /** - * Unregister itself from the cluster. - */ - public void unregisterSelf() throws IOException; - - /** - * Uploading self server load data. - * - * It is an asynchrounous call which should not block other operations. - * Currently we don't need to care about whether it succeed or not. - * - * @param selfLoad - * Hub server load data. - */ - public void uploadSelfLoadData(HubLoad selfLoad); - - /** - * Check whether a hub server is alive as the id - * - * @param hub - * Hub id to identify a lifecycle of a hub server - * @param callback - * Callback of check result. If the hub server is still - * alive as the provided id hub, return true. - * Otherwise return false. - * @param ctx - * Callback context - */ - public void isHubAlive(HubInfo hub, Callback callback, Object ctx); - - /** - * Choose a least loaded hub server from available hub servers. - * - * @param callback - * Callback to return least loaded hub server. - * @param ctx - * Callback context. - */ - public void chooseLeastLoadedHub(Callback callback, Object ctx); - - /** - * Try to rebalance the load within the cluster. This function will get - * the {@link HubLoad} from all available hubs within the cluster, and then - * shed additional load. - * - * @param tolerancePercentage - * the percentage of load above average that is permissible. - * @param maxLoadToShed - * the maximum amount of load to shed per call. - * @param callback - * Callback indicating whether we reduced load or not. - * @param ctx - */ - public void rebalanceCluster(double tolerancePercentage, HubLoad maxLoadToShed, - Callback callback, Object ctx); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java deleted file mode 100644 index 65cc9c4..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.hedwig.server.topics; - -import java.net.UnknownHostException; -import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.meta.TopicOwnershipManager; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.zookeeper.ZooKeeper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; -/** - * TopicOwnershipManager based topic manager - */ -public class MMTopicManager extends AbstractTopicManager implements TopicManager { - - private static final Logger logger = LoggerFactory.getLogger(MMTopicManager.class); - - // topic ownership manager - private final TopicOwnershipManager mm; - // hub server manager - private final HubServerManager hubManager; - - private final HubInfo myHubInfo; - private final HubLoad myHubLoad; - - // Boolean flag indicating if we should suspend activity. If this is true, - // all of the Ops put into the queuer will fail automatically. - protected volatile boolean isSuspended = false; - - public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk, - MetadataManagerFactory mmFactory, - ScheduledExecutorService scheduler) - throws UnknownHostException, PubSubException { - super(cfg, scheduler); - // initialize topic ownership manager - this.mm = mmFactory.newTopicOwnershipManager(); - this.hubManager = new ZkHubServerManager(cfg, zk, addr, this); - - final SynchronousQueue> queue = - new SynchronousQueue>(); - - myHubLoad = new HubLoad(topics.size()); - this.hubManager.registerListener(new HubServerManager.ManagerListener() { - @Override - public void onSuspend() { - isSuspended = true; - } - @Override - public void onResume() { - isSuspended = false; - } - @Override - public void onShutdown() { - // if hub server manager can't work, we had to quit - Runtime.getRuntime().exit(1); - } - }); - this.hubManager.registerSelf(myHubLoad, new Callback() { - @Override - public void operationFinished(final Object ctx, final HubInfo resultOfOperation) { - logger.info("Successfully registered hub {} with zookeeper", resultOfOperation); - ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null)); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("Failed to register hub with zookeeper", exception); - ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception)); - } - }, null); - Either result = ConcurrencyUtils.take(queue); - PubSubException pse = result.right(); - if (pse != null) { - throw pse; - } - myHubInfo = result.left(); - logger.info("Start metadata manager based topic manager with hub id : " + myHubInfo); - } - - @Override - protected void realGetOwner(final ByteString topic, final boolean shouldClaim, - final Callback cb, final Object ctx) { - // If operations are suspended due to a ZK client disconnect, just error - // out this call and return. - if (isSuspended) { - cb.operationFailed(ctx, new PubSubException.ServiceDownException( - "MMTopicManager service is temporarily suspended!")); - return; - } - - TopicStats stats = topics.getIfPresent(topic); - if (null != stats) { - cb.operationFinished(ctx, addr); - return; - } - - new MMGetOwnerOp(topic, cb, ctx).read(); - } - - /** - * MetadataManager do topic ledger election using versioned writes. - */ - class MMGetOwnerOp { - ByteString topic; - Callback cb; - Object ctx; - - public MMGetOwnerOp(ByteString topic, - Callback cb, Object ctx) { - this.topic = topic; - this.cb = cb; - this.ctx = ctx; - } - - protected void read() { - mm.readOwnerInfo(topic, new Callback>() { - @Override - public void operationFinished(final Object ctx, final Versioned owner) { - if (null == owner) { - logger.info("{} : No owner found for topic {}", - new Object[] { addr, topic.toStringUtf8() }); - // no data found - choose(Version.NEW); - return; - } - final Version ownerVersion = owner.getVersion(); - if (null == owner.getValue()) { - logger.info("{} : Invalid owner found for topic {}", - new Object[] { addr, topic.toStringUtf8() }); - choose(ownerVersion); - return; - } - final HubInfo hub = owner.getValue(); - logger.info("{} : Read owner of topic {} : {}", - new Object[] { addr, topic.toStringUtf8(), hub }); - - logger.info("{}, {}", new Object[] { hub, myHubInfo }); - - if (hub.getAddress().equals(addr)) { - if (myHubInfo.getZxid() == hub.getZxid()) { - claimTopic(ctx); - return; - } else { - choose(ownerVersion); - return; - } - } - - logger.info("{} : Check whether owner {} for topic {} is still alive.", - new Object[] { addr, hub, topic.toStringUtf8() }); - hubManager.isHubAlive(hub, new Callback() { - @Override - public void operationFinished(Object ctx, Boolean isAlive) { - if (isAlive) { - cb.operationFinished(ctx, hub.getAddress()); - } else { - choose(ownerVersion); - } - } - @Override - public void operationFailed(Object ctx, PubSubException pse) { - cb.operationFailed(ctx, pse); - } - }, ctx); - } - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, new PubSubException.ServiceDownException( - "Could not read ownership for topic " + topic.toStringUtf8() + " : " - + exception.getMessage())); - } - }, ctx); - } - - public void claim(final Version prevOwnerVersion) { - logger.info("{} : claiming topic {} 's owner to be {}", - new Object[] { addr, topic.toStringUtf8(), myHubInfo }); - mm.writeOwnerInfo(topic, myHubInfo, prevOwnerVersion, new Callback() { - @Override - public void operationFinished(Object ctx, Version newVersion) { - claimTopic(ctx); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - if (exception instanceof PubSubException.NoTopicOwnerInfoException || - exception instanceof PubSubException.BadVersionException) { - // some one has updated the owner - logger.info("{} : Some one has claimed topic {} 's owner. Try to read the owner again.", - new Object[] { addr, topic.toStringUtf8() }); - read(); - return; - } - cb.operationFailed(ctx, new PubSubException.ServiceDownException( - "Exception when writing owner info to claim ownership of topic " - + topic.toStringUtf8() + " : " + exception.getMessage())); - } - }, ctx); - } - - protected void claimTopic(Object ctx) { - logger.info("{} : claimed topic {} 's owner to be {}", - new Object[] { addr, topic.toStringUtf8(), myHubInfo }); - notifyListenersAndAddToOwnedTopics(topic, cb, ctx); - hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size())); - } - - public void choose(final Version prevOwnerVersion) { - hubManager.chooseLeastLoadedHub(new Callback() { - @Override - public void operationFinished(Object ctx, HubInfo owner) { - logger.info("{} : Least loaded owner {} is chosen for topic {}", - new Object[] { addr, owner, topic.toStringUtf8() }); - if (owner.getAddress().equals(addr)) { - claim(prevOwnerVersion); - } else { - setOwner(owner, prevOwnerVersion); - } - } - @Override - public void operationFailed(Object ctx, PubSubException pse) { - logger.error("Failed to choose least loaded hub server for topic " - + topic.toStringUtf8() + " : ", pse); - cb.operationFailed(ctx, pse); - } - }, null); - } - - public void setOwner(final HubInfo ownerHubInfo, final Version prevOwnerVersion) { - logger.info("{} : setting topic {} 's owner to be {}", - new Object[] { addr, topic.toStringUtf8(), ownerHubInfo }); - mm.writeOwnerInfo(topic, ownerHubInfo, prevOwnerVersion, new Callback() { - @Override - public void operationFinished(Object ctx, Version newVersion) { - logger.info("{} : Set topic {} 's owner to be {}", - new Object[] { addr, topic.toStringUtf8(), ownerHubInfo }); - cb.operationFinished(ctx, ownerHubInfo.getAddress()); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - if (exception instanceof PubSubException.NoTopicOwnerInfoException || - exception instanceof PubSubException.BadVersionException) { - // some one has updated the owner - logger.info("{} : Some one has set topic {} 's owner. Try to read the owner again.", - new Object[] { addr, topic.toStringUtf8() }); - read(); - return; - } - cb.operationFailed(ctx, new PubSubException.ServiceDownException( - "Exception when writing owner info to claim ownership of topic " - + topic.toStringUtf8() + " : " + exception.getMessage())); - } - }, ctx); - } - } - - @Override - protected void postReleaseCleanup(final ByteString topic, - final Callback cb, final Object ctx) { - - // Reduce load. We've removed the topic from our topic set, so do this as well. - // When we reclaim the topic, we will increment the load again. - hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size())); - - mm.readOwnerInfo(topic, new Callback>() { - @Override - public void operationFinished(Object ctx, Versioned owner) { - if (null == owner) { - // Node has somehow disappeared from under us, live with it - logger.warn("No owner info found when cleaning up topic " + topic.toStringUtf8()); - cb.operationFinished(ctx, null); - return; - } - // no valid hub info found, just return - if (null == owner.getValue()) { - logger.warn("No valid owner info found when cleaning up topic " + topic.toStringUtf8()); - cb.operationFinished(ctx, null); - return; - } - HedwigSocketAddress ownerAddr = owner.getValue().getAddress(); - if (!ownerAddr.equals(addr)) { - logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8() - + " but owner " + owner + " found, leaving untouched"); - // Not our node, someone else's, leave it alone - cb.operationFinished(ctx, null); - return; - } - - mm.deleteOwnerInfo(topic, owner.getVersion(), new Callback() { - @Override - public void operationFinished(Object ctx, Void result) { - cb.operationFinished(ctx, null); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - if (exception instanceof PubSubException.NoTopicOwnerInfoException) { - logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8() - + " but it has been removed."); - cb.operationFinished(ctx, null); - return; - } - logger.error("Exception when deleting self-ownership metadata for topic " - + topic.toStringUtf8() + " : ", exception); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception)); - } - }, ctx); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("Exception when cleaning up owner info of topic " + topic.toStringUtf8() + " : ", exception); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception)); - } - }, ctx); - } - - @Override - public void stop() { - // we just unregister it with zookeeper to make it unavailable from hub servers list - try { - hubManager.unregisterSelf(); - } catch (IOException e) { - logger.error("Error unregistering hub server " + myHubInfo + " : ", e); - } - super.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java deleted file mode 100644 index 2a0dcc0..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.util.List; -import java.util.Map; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -/** - * Shed load by releasing topics. - */ -public class TopicBasedLoadShedder { - private static final Logger logger = LoggerFactory.getLogger(TopicBasedLoadShedder.class); - private final double tolerancePercentage; - private final long maxLoadToShed; - private final TopicManager tm; - private final List topicList; - - /** - * @param tm The topic manager used to handle load shedding - * @param tolerancePercentage The tolerance percentage for shedding load - * @param maxLoadToShed The maximum amoung of load to shed in one call. - */ - public TopicBasedLoadShedder(TopicManager tm, double tolerancePercentage, - HubLoad maxLoadToShed) { - // Make sure that all functions in this class have a consistent view - // of the load. So, we use the same topic list throughout. - this(tm, tm.getTopicList(), tolerancePercentage, maxLoadToShed); - } - - /** - * This is public because it makes testing easier. - * @param tm The topic manager used to handle load shedding - * @param topicList The topic list representing topics owned by this hub. - * @param tolerancePercentage The tolerance percentage for shedding load - * @param maxLoadToShed The maximum amoung of load to shed in one call. - */ - TopicBasedLoadShedder(TopicManager tm, List topicList, - double tolerancePercentage, - HubLoad maxLoadToShed) { - this.tolerancePercentage = tolerancePercentage; - this.maxLoadToShed = maxLoadToShed.getNumTopics(); - this.tm = tm; - this.topicList = topicList; - } - - /** - * Reduce the load on the current hub so that it reaches the target load. - * We reduce load by releasing topics using the {@link TopicManager} passed - * to the constructor. We use {@link TopicManager#releaseTopics(int, org.apache.hedwig.util.Callback, Object)} - * to actually release topics. - * - * @param targetLoad - * @param callback - * a Callback that indicates how many topics we tried to release. - * @param ctx - */ - public void reduceLoadTo(HubLoad targetLoad, final Callback callback, final Object ctx) { - int targetTopics = (int)targetLoad.toHubLoadData().getNumTopics(); - int numTopicsToRelease = topicList.size() - targetTopics; - - // The number of topics we own is less than the target topic size. We don't release - // any topics in this case. - if (numTopicsToRelease <= 0) { - callback.operationFinished(ctx, 0L); - return; - } - // Call releaseTopics() on the topic manager to do this. We let the manager handle the release - // policy. - tm.releaseTopics(numTopicsToRelease, callback, ctx); - } - - /** - * Calculate the average number of topics on the currently active hubs and release topics - * if required. - * We shed topics if we currently hold topics greater than average + average * tolerancePercentage/100.0 - * We shed a maximum of maxLoadToShed topics - * We also hold on to at least one topic. - * @param loadMap - * @param callback - * A return value of true means we tried to rebalance. False means that there was - * no need to rebalance. - * @param ctx - */ - public void shedLoad(final Map loadMap, final Callback callback, - final Object ctx) { - - long totalTopics = 0L; - long myTopics = topicList.size(); - for (Map.Entry entry : loadMap.entrySet()) { - if (null == entry.getKey() || null == entry.getValue()) { - continue; - } - totalTopics += entry.getValue().toHubLoadData().getNumTopics(); - } - - double averageTopics = (double)totalTopics/loadMap.size(); - logger.info("Total topics in the cluster : {}. Average : {}.", totalTopics, averageTopics); - - // Handle the case when averageTopics == 0. We hold on to at least 1 topic. - long permissibleTopics = - Math.max(1L, (long) Math.ceil(averageTopics + averageTopics * tolerancePercentage / 100.0)); - logger.info("Permissible topics : {}. Number of topics this hub holds : {}.", permissibleTopics, myTopics); - if (myTopics <= permissibleTopics) { - // My owned topics are less than those permitted by the current tolerance level. No need to release - // any topics. - callback.operationFinished(ctx, false); - return; - } - - // The number of topics I own is more than what I should be holding. We shall now attempt to shed some load. - // We shed at most maxLoadToShed number of topics. We also hold on to at least 1 topic. - long targetNumTopics = Math.max(1L, Math.max((long)Math.ceil(averageTopics), myTopics - maxLoadToShed)); - - // Reduce the load on the current hub to the target load we calculated above. - logger.info("Reducing load on this hub to {} topics.", targetNumTopics); - reduceLoadTo(new HubLoad(targetNumTopics), new Callback() { - @Override - public void operationFinished(Object ctx, Long numReleased) { - logger.info("Released {} topics to shed load.", numReleased); - callback.operationFinished(ctx, true); - } - - @Override - public void operationFailed(Object ctx, PubSubException e) { - callback.operationFailed(ctx, e); - } - }, ctx); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java deleted file mode 100644 index 4ed2e59..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; - -import java.util.List; - -/** - * An implementor of this interface is basically responsible for ensuring that - * there is at most a single host responsible for a given topic at a given time. - * Also, it is desirable that on a host failure, some other hosts in the cluster - * claim responsibilities for the topics that were at the failed host. On - * claiming responsibility for a topic, a host should call its - * {@link TopicOwnershipChangeListener}. - * - */ - -public interface TopicManager { - /** - * Get the name of the host responsible for the given topic. - * - * @param topic - * The topic whose owner to get. - * @param cb - * Callback. - * @return The name of host responsible for the given topic - * @throws ServiceDownException - * If there is an error looking up the information - */ - public void getOwner(ByteString topic, boolean shouldClaim, - Callback cb, Object ctx); - - /** - * Increment the number of access times for a given topic. - * - * @param topic - * Topic Name. - */ - public void incrementTopicAccessTimes(ByteString topic); - - /** - * Whenever the topic manager finds out that the set of topics owned by this - * node has changed, it can notify a set of - * {@link TopicOwnershipChangeListener} objects. Any component of the system - * (e.g., the {@link PersistenceManager}) can listen for such changes by - * implementing the {@link TopicOwnershipChangeListener} interface and - * registering themselves with the {@link TopicManager} using this method. - * It is important that the {@link TopicOwnershipChangeListener} reacts - * immediately to such notifications, and with no blocking (because multiple - * listeners might need to be informed and they are all informed by the same - * thread). - * - * @param listener - */ - public void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener); - - /** - * Give up ownership of a topic. If I don't own it, do nothing. - * - * @throws ServiceDownException - * If there is an error in claiming responsibility for the topic - */ - public void releaseTopic(ByteString topic, Callback cb, Object ctx); - - /** - * Release numTopics topics. If you hold fewer, release all. - * @param numTopics - * Number of topics to release. - * @param callback - * The callback should be invoked with the number of topics the hub - * released successfully. - * @param ctx - */ - public void releaseTopics(int numTopics, Callback callback, Object ctx); - - /** - * Get the list of topics this hub believes it is responsible for. - * @return - */ - public List getTopicList(); - - /** - * Stop topic manager - */ - public void stop(); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java deleted file mode 100644 index b0fe2c9..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.util.Callback; - -public interface TopicOwnershipChangeListener { - - public void acquiredTopic(ByteString topic, Callback callback, Object ctx); - - public void lostTopic(ByteString topic); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java deleted file mode 100644 index 6b3a417..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.net.UnknownHostException; -import java.util.concurrent.ScheduledExecutorService; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; - -public class TrivialOwnAllTopicManager extends AbstractTopicManager { - - public TrivialOwnAllTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler) - throws UnknownHostException { - super(cfg, scheduler); - } - - @Override - protected void realGetOwner(ByteString topic, boolean shouldClaim, - Callback cb, Object ctx) { - - TopicStats stats = topics.getIfPresent(topic); - if (null != stats) { - cb.operationFinished(ctx, addr); - return; - } - - notifyListenersAndAddToOwnedTopics(topic, cb, ctx); - } - - @Override - protected void postReleaseCleanup(ByteString topic, Callback cb, Object ctx) { - // No cleanup to do - cb.operationFinished(ctx, null); - } - - @Override - public void stop() { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java deleted file mode 100644 index 9651058..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java +++ /dev/null @@ -1,470 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import static com.google.common.base.Charsets.UTF_8; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.zookeeper.SafeAsyncZKCallback; -import org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback; -import org.apache.hedwig.zookeeper.ZkUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ZooKeeper based hub server manager. - */ -class ZkHubServerManager implements HubServerManager { - - static Logger logger = LoggerFactory.getLogger(ZkHubServerManager.class); - - final Random rand = new Random(); - - private final ServerConfiguration conf; - private final ZooKeeper zk; - private final HedwigSocketAddress addr; - private final TopicManager tm; - private final String ephemeralNodePath; - private final String hubNodesPath; - - // hub info structure represent itself - protected HubInfo myHubInfo; - protected volatile boolean isSuspended = false; - protected ManagerListener listener = null; - protected final ScheduledExecutorService executor; - - // upload hub server load to zookeeper - StatCallback loadReportingStatCallback = new StatCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { - if (rc != KeeperException.Code.OK.intValue()) { - logger.warn("Failed to update load information of hub {} in zk", myHubInfo); - } - } - }; - - /** - * Watcher to monitor available hub server list. - */ - class ZkHubsWatcher implements Watcher { - @Override - public void process(WatchedEvent event) { - if (event.getType().equals(Watcher.Event.EventType.None)) { - if (event.getState().equals( - Watcher.Event.KeeperState.Disconnected)) { - logger.warn("ZK client has been disconnected to the ZK server!"); - isSuspended = true; - if (null != listener) { - listener.onSuspend(); - } - } else if (event.getState().equals( - Watcher.Event.KeeperState.SyncConnected)) { - if (isSuspended) { - logger.info("ZK client has been reconnected to the ZK server!"); - } - isSuspended = false; - if (null != listener) { - listener.onResume(); - } - } - } - if (event.getState().equals(Watcher.Event.KeeperState.Expired)) { - logger.error("ZK client connection to the ZK server has expired.!"); - if (null != listener) { - // Shutdown our executor NOW! - executor.shutdownNow(); - listener.onShutdown(); - } - } - } - } - - class RebalanceRunnable implements Runnable { - private final double tolerancePercentage; - private final HubLoad maxLoadToShed; - private final long delaySeconds; - - public RebalanceRunnable(double tolerancePercentage, - HubLoad maxLoadToShed, - long delaySeconds) { - this.tolerancePercentage = tolerancePercentage; - this.maxLoadToShed = maxLoadToShed; - this.delaySeconds = delaySeconds; - } - - @Override - public void run() { - // If we are in suspended state, don't attempt a rebalance. - if (isSuspended) { - executor.schedule(this, delaySeconds, TimeUnit.SECONDS); - return; - } - // We should attempt a rebalance. We reschedule the job at the tail so that - // two rebalances don't happen simultaneously. - rebalanceCluster(tolerancePercentage, maxLoadToShed, new Callback() { - private void reschedule(Runnable task) { - executor.schedule(task, delaySeconds, TimeUnit.SECONDS); - } - - @Override - public void operationFinished(Object ctx, Boolean didRebalance) { - if (didRebalance == true) { - logger.info("The attempt to rebalance the cluster was successful"); - } else { - logger.info("There was no need to rebalance."); - } - // Our original runnable was passed as the context. - reschedule((Runnable)ctx); - } - - @Override - public void operationFailed(Object ctx, PubSubException e) { - logger.error("The attempt to rebalance the cluster did not succeed.", e); - // Reschedule the job - reschedule((Runnable)ctx); - } - }, this); - } - - public void start() { - // Initiate only if delaySeconds > 0 - if (delaySeconds > 0) { - logger.info("Starting the rebalancer thread with tolerance={}, maxLoadToShed={} and delay={}", - new Object[] { tolerancePercentage, maxLoadToShed.getNumTopics(), delaySeconds }); - executor.schedule(this, delaySeconds, TimeUnit.SECONDS); - } - } - } - - public ZkHubServerManager(ServerConfiguration conf, - ZooKeeper zk, - HedwigSocketAddress addr, - TopicManager tm) { - this.conf = conf; - this.zk = zk; - this.addr = addr; - this.tm = tm; - // znode path to store all available hub servers - this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString(); - // the node's ephemeral node path - this.ephemeralNodePath = getHubZkNodePath(addr); - this.executor = Executors.newSingleThreadScheduledExecutor(); - // register available hub servers list watcher - zk.register(new ZkHubsWatcher()); - - // Start the rebalancer here. - new RebalanceRunnable(conf.getRebalanceTolerance(), conf.getRebalanceMaxShed(), - conf.getRebalanceInterval()).start(); - } - - @Override - public void registerListener(ManagerListener listener) { - this.listener = listener; - } - - /** - * Get the znode path identifying a hub server. - * - * @param node - * Hub Server Address - * @return znode path identifying the hub server. - */ - private String getHubZkNodePath(HedwigSocketAddress node) { - String nodePath = this.conf.getZkHostsPrefix(new StringBuilder()) - .append("/").append(node).toString(); - return nodePath; - } - - @Override - public void registerSelf(final HubLoad selfData, final Callback callback, Object ctx) { - byte[] loadDataBytes = selfData.toString().getBytes(UTF_8); - ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, loadDataBytes, Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - if (rc == Code.OK.intValue()) { - // now we are here - zk.exists(ephemeralNodePath, false, new SafeAsyncZKCallback.StatCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { - if (rc == Code.OK.intValue()) { - myHubInfo = new HubInfo(addr, stat.getCzxid()); - callback.operationFinished(ctx, myHubInfo); - return; - } else { - callback.operationFailed(ctx, - new PubSubException.ServiceDownException( - "I can't state my hub node after I created it : " - + ephemeralNodePath)); - return; - } - } - }, ctx); - return; - } - if (rc != Code.NODEEXISTS.intValue()) { - KeeperException ke = ZkUtils.logErrorAndCreateZKException( - "Could not create ephemeral node to register hub", ephemeralNodePath, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); - return; - } - - logger.info("Found stale ephemeral node while registering hub with ZK, deleting it"); - - // Node exists, lets try to delete it and retry - zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx) { - if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) { - registerSelf(selfData, callback, ctx); - return; - } - KeeperException ke = ZkUtils.logErrorAndCreateZKException( - "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); - return; - } - }, ctx); - } - }, ctx); - } - - @Override - public void unregisterSelf() throws IOException { - try { - zk.delete(ephemeralNodePath, -1); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (KeeperException e) { - throw new IOException(e); - } - } - - - @Override - public void uploadSelfLoadData(HubLoad selfLoad) { - logger.debug("Reporting hub load of {} : {}", myHubInfo, selfLoad); - byte[] loadDataBytes = selfLoad.toString().getBytes(UTF_8); - zk.setData(ephemeralNodePath, loadDataBytes, -1, - loadReportingStatCallback, null); - } - - @Override - public void isHubAlive(final HubInfo hub, final Callback callback, Object ctx) { - zk.exists(getHubZkNodePath(hub.getAddress()), false, new SafeAsyncZKCallback.StatCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { - if (rc == Code.NONODE.intValue()) { - callback.operationFinished(ctx, false); - } else if (rc == Code.OK.intValue()) { - if (hub.getZxid() == stat.getCzxid()) { - callback.operationFinished(ctx, true); - } else { - callback.operationFinished(ctx, false); - } - } else { - callback.operationFailed(ctx, new PubSubException.ServiceDownException( - "Failed to check whether hub server " + hub + " is alive!")); - } - } - }, ctx); - } - - @Override - public void chooseLeastLoadedHub(final Callback callback, Object ctx) { - // Get the list of existing hosts - zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, - List children) { - if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Could not get list of available hubs", path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - chooseLeastLoadedNode(children, callback, ctx); - } - }, ctx); - } - - private void chooseLeastLoadedNode(final List children, - final Callback callback, Object ctx) { - SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() { - int numResponses = 0; - HubLoad minLoad = HubLoad.MAX_LOAD; - String leastLoaded = null; - long leastLoadedCzxid = 0; - - @Override - public void safeProcessResult(int rc, String path, Object ctx, - byte[] data, Stat stat) { - synchronized (this) { - if (rc == KeeperException.Code.OK.intValue()) { - try { - HubLoad load = HubLoad.parse(new String(data, UTF_8)); - logger.debug("Found server {} with load: {}", ctx, load); - int compareRes = load.compareTo(minLoad); - if (compareRes < 0 || (compareRes == 0 && rand.nextBoolean())) { - minLoad = load; - leastLoaded = (String) ctx; - leastLoadedCzxid = stat.getCzxid(); - } - } catch (HubLoad.InvalidHubLoadException e) { - logger.warn("Corrupted load information from hub : " + ctx); - // some corrupted data, we'll just ignore this hub - } - } - numResponses++; - - if (numResponses == children.size()) { - if (leastLoaded == null) { - callback.operationFailed(ctx, - new PubSubException.ServiceDownException("No hub available")); - return; - } - try { - HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded); - callback.operationFinished(ctx, new HubInfo(owner, leastLoadedCzxid)); - } catch (Throwable t) { - callback.operationFailed(ctx, - new PubSubException.ServiceDownException("Least loaded hub server " - + leastLoaded + " is invalid.")); - } - } - } - } - }; - - for (String child : children) { - zk.getData(conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false, - dataCallback, child); - } - } - - /** - * Get a map of all currently active hubs with their advertised load. - * @param callback - * @param originalCtx - */ - private void getActiveHubsInfoWithLoad(final Callback> callback, - final Object originalCtx) { - // Get the list of children and then for each child, get the data. All asynchronously. - zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, final List children) { - if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Could not get children for given path", path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - - // The data callback for every child node - SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() { - Map loadMap = new HashMap(); - int numResponse = 0; - @Override - public void safeProcessResult(int rc, String path, Object dataCtx, - byte[] data, Stat stat) { - synchronized (this) { - if (rc == Code.OK.intValue()) { - // Put this load in the map. dataCtx is actually the child string which is the - // IP:PORT:SSL representation of the hub. - try { - HubInfo hubInfo = - new HubInfo(new HedwigSocketAddress((String)dataCtx), stat.getCzxid()); - HubLoad hubLoad = HubLoad.parse(new String(data, UTF_8)); - this.loadMap.put(hubInfo, hubLoad); - } catch (HubLoad.InvalidHubLoadException e) { - logger.warn("Corrupt data found for a hub. Ignoring."); - } - } - numResponse++; - if (numResponse == children.size()) { - // We got less number of valid responses than the hubs we saw previously. - // Signal an error. - if (loadMap.size() != numResponse) { - callback.operationFailed(originalCtx, - new PubSubException.UnexpectedConditionException( - "Fewer OK responses than the number of active hubs seen previously.")); - return; - } - // We've seen all responses. All OK. - callback.operationFinished(originalCtx, loadMap); - } - } - } - }; - - for (String child : children) { - String znode = conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(); - zk.getData(znode, false, dataCallback, child); - } - } - }, originalCtx); - } - - @Override - public void rebalanceCluster(final double tolerancePercentage, final HubLoad maxLoadToShed, - final Callback callback, final Object ctx) { - // Get the load on all active hubs and then shed load if required. - getActiveHubsInfoWithLoad(new Callback>() { - @Override - public void operationFinished(Object ctx, Map loadMap) { - if (null == tm) { - // No topic manager, so no load to shed. - callback.operationFinished(ctx, false); - return; - } - TopicBasedLoadShedder tbls = new TopicBasedLoadShedder(tm, - tolerancePercentage, maxLoadToShed); - tbls.shedLoad(loadMap, callback, ctx); - } - - @Override - public void operationFailed(Object ctx, PubSubException e) { - // Rebalance failed. Log this and signal failure on the callback. - logger.error("Failed to get active hubs. Cannot attempt a rebalance."); - callback.operationFailed(ctx, e); - } - }, ctx); - } - - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java deleted file mode 100644 index 2424d27..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java +++ /dev/null @@ -1,345 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.net.UnknownHostException; -import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; - -import static com.google.common.base.Charsets.UTF_8; -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.zookeeper.SafeAsyncZKCallback; -import org.apache.hedwig.zookeeper.ZkUtils; - -/** - * Topics are operated on in parallel as they are independent. - * - */ -public class ZkTopicManager extends AbstractTopicManager implements TopicManager { - - private static final Logger logger = LoggerFactory.getLogger(ZkTopicManager.class); - - /** - * Persistent storage for topic metadata. - */ - private ZooKeeper zk; - - // hub server manager - private final HubServerManager hubManager; - - private final HubInfo myHubInfo; - private final HubLoad myHubLoad; - - // Boolean flag indicating if we should suspend activity. If this is true, - // all of the Ops put into the queuer will fail automatically. - protected volatile boolean isSuspended = false; - - /** - * Create a new topic manager. Pass in an active ZooKeeper client object. - * - * @param zk - */ - public ZkTopicManager(final ZooKeeper zk, final ServerConfiguration cfg, ScheduledExecutorService scheduler) - throws UnknownHostException, PubSubException { - - super(cfg, scheduler); - this.zk = zk; - this.hubManager = new ZkHubServerManager(cfg, zk, addr, this); - - myHubLoad = new HubLoad(topics.size()); - this.hubManager.registerListener(new HubServerManager.ManagerListener() { - @Override - public void onSuspend() { - isSuspended = true; - } - @Override - public void onResume() { - isSuspended = false; - } - @Override - public void onShutdown() { - // if hub server manager can't work, we had to quit - Runtime.getRuntime().exit(1); - } - }); - - final SynchronousQueue> queue = - new SynchronousQueue>(); - this.hubManager.registerSelf(myHubLoad, new Callback() { - @Override - public void operationFinished(final Object ctx, final HubInfo resultOfOperation) { - logger.info("Successfully registered hub {} with zookeeper", resultOfOperation); - ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null)); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("Failed to register hub with zookeeper", exception); - ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception)); - } - }, null); - - Either result = ConcurrencyUtils.take(queue); - PubSubException pse = result.right(); - if (pse != null) { - throw pse; - } - myHubInfo = result.left(); - } - - String hubPath(ByteString topic) { - return cfg.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString(); - } - - @Override - protected void realGetOwner(final ByteString topic, final boolean shouldClaim, - final Callback cb, final Object ctx) { - // If operations are suspended due to a ZK client disconnect, just error - // out this call and return. - if (isSuspended) { - cb.operationFailed(ctx, new PubSubException.ServiceDownException( - "ZKTopicManager service is temporarily suspended!")); - return; - } - - TopicStats stats = topics.getIfPresent(topic); - if (null != stats) { - cb.operationFinished(ctx, addr); - return; - } - - new ZkGetOwnerOp(topic, shouldClaim, cb, ctx).read(); - } - - // Recursively call each other. - class ZkGetOwnerOp { - ByteString topic; - boolean shouldClaim; - Callback cb; - Object ctx; - String hubPath; - - public ZkGetOwnerOp(ByteString topic, boolean shouldClaim, Callback cb, Object ctx) { - this.topic = topic; - this.shouldClaim = shouldClaim; - this.cb = cb; - this.ctx = ctx; - hubPath = hubPath(topic); - - } - - public void choose() { - hubManager.chooseLeastLoadedHub(new Callback() { - @Override - public void operationFinished(Object ctx, HubInfo owner) { - logger.info("{} : Least loaded owner {} is chosen for topic {}", - new Object[] { addr, owner, topic.toStringUtf8() }); - if (owner.getAddress().equals(addr)) { - claim(); - } else { - cb.operationFinished(ZkGetOwnerOp.this.ctx, owner.getAddress()); - } - } - @Override - public void operationFailed(Object ctx, PubSubException pse) { - logger.error("Failed to choose least loaded hub server for topic " - + topic.toStringUtf8() + " : ", pse); - cb.operationFailed(ctx, pse); - } - }, null); - } - - public void claimOrChoose() { - if (shouldClaim) - claim(); - else - choose(); - } - - public void read() { - zk.getData(hubPath, false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - - if (rc == Code.NONODE.intValue()) { - claimOrChoose(); - return; - } - - if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: " - + topic.toStringUtf8(), path, rc); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - - // successfully did a read - try { - HubInfo ownerHubInfo = HubInfo.parse(new String(data, UTF_8)); - HedwigSocketAddress owner = ownerHubInfo.getAddress(); - if (!owner.equals(addr)) { - if (logger.isDebugEnabled()) { - logger.debug("topic: " + topic.toStringUtf8() + " belongs to someone else: " + owner); - } - cb.operationFinished(ctx, owner); - return; - } - logger.info("Discovered stale self-node for topic: " + topic.toStringUtf8() + ", will delete it"); - } catch (HubInfo.InvalidHubInfoException ihie) { - logger.info("Discovered invalid hub info for topic: " + topic.toStringUtf8() + ", will delete it : ", ihie); - } - - // we must have previously failed and left a - // residual ephemeral node here, so we must - // delete it (clean it up) and then - // re-create/re-acquire the topic. - zk.delete(hubPath, stat.getVersion(), new VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (Code.OK.intValue() == rc || Code.NONODE.intValue() == rc) { - claimOrChoose(); - } else { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - } - } - }, ctx); - } - }, ctx); - } - - public void claim() { - if (logger.isDebugEnabled()) { - logger.debug("claiming topic: " + topic.toStringUtf8()); - } - - ZkUtils.createFullPathOptimistic(zk, hubPath, - myHubInfo.toString().getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() { - - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - if (rc == Code.OK.intValue()) { - if (logger.isDebugEnabled()) { - logger.debug("claimed topic: " + topic.toStringUtf8()); - } - notifyListenersAndAddToOwnedTopics(topic, cb, ctx); - hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size())); - } else if (rc == Code.NODEEXISTS.intValue()) { - read(); - } else { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Failed to create ephemeral node to claim ownership of topic: " - + topic.toStringUtf8(), path, rc); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - } - } - }, ctx); - } - - } - - @Override - protected void postReleaseCleanup(final ByteString topic, final Callback cb, Object ctx) { - - // Reduce load. We've removed the topic from our topic set, so do this as well. - // When we reclaim the topic, we will increment the load again. - hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size())); - - zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (rc == Code.NONODE.intValue()) { - // Node has somehow disappeared from under us, live with it - // since its a transient node - logger.warn("While deleting self-node for topic: " + topic.toStringUtf8() + ", node not found"); - cb.operationFinished(ctx, null); - return; - } - - if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - - String hubInfoStr = new String(data, UTF_8); - try { - HubInfo ownerHubInfo = HubInfo.parse(hubInfoStr); - HedwigSocketAddress owner = ownerHubInfo.getAddress(); - if (!owner.equals(addr)) { - logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for " - + owner + " found, leaving untouched"); - // Not our node, someone else's, leave it alone - cb.operationFinished(ctx, null); - return; - } - } catch (HubInfo.InvalidHubInfoException ihie) { - logger.info("Invalid hub info " + hubInfoStr + " found when release topic " - + topic.toStringUtf8() + ". Leaving untouched until next acquire action."); - cb.operationFinished(ctx, null); - return; - } - - zk.delete(path, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx) { - if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) { - KeeperException e = ZkUtils - .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: " - + topic.toStringUtf8(), path, rc); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - - cb.operationFinished(ctx, null); - } - }, ctx); - } - }, ctx); - } - - @Override - public void stop() { - // we just unregister it with zookeeper to make it unavailable from hub servers list - try { - hubManager.unregisterSelf(); - } catch (IOException e) { - logger.error("Error unregistering hub server :", e); - } - super.stop(); - } - -}