Return-Path: X-Original-To: apmail-bookkeeper-commits-archive@www.apache.org Delivered-To: apmail-bookkeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7A89819862 for ; Wed, 16 Mar 2016 03:44:15 +0000 (UTC) Received: (qmail 50853 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Delivered-To: apmail-bookkeeper-commits-archive@bookkeeper.apache.org Received: (qmail 50775 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 49877 invoked by uid 99); 16 Mar 2016 03:44:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 03:44:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 447C2DFA42; Wed, 16 Mar 2016 03:44:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sijie@apache.org To: commits@bookkeeper.apache.org Date: Wed, 16 Mar 2016 03:44:24 -0000 Message-Id: <2ea7732de0ad4f47b754d0e0e95f8327@git.apache.org> In-Reply-To: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> References: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java deleted file mode 100644 index e50bc7f..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java +++ /dev/null @@ -1,840 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.meta; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.topics.HubInfo; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.zookeeper.SafeAsyncZKCallback; -import org.apache.hedwig.zookeeper.ZkUtils; -import static com.google.common.base.Charsets.UTF_8; - -/** - * ZooKeeper-based Metadata Manager. - */ -public class ZkMetadataManagerFactory extends MetadataManagerFactory { - protected final static Logger logger = LoggerFactory.getLogger(ZkMetadataManagerFactory.class); - - static final int CUR_VERSION = 1; - - ZooKeeper zk; - ServerConfiguration cfg; - - @Override - public int getCurrentVersion() { - return CUR_VERSION; - } - - @Override - public MetadataManagerFactory initialize(ServerConfiguration cfg, - ZooKeeper zk, - int version) - throws IOException { - if (CUR_VERSION != version) { - throw new IOException("Incompatible ZkMetadataManagerFactory version " + version - + " found, expected version " + CUR_VERSION); - } - this.cfg = cfg; - this.zk = zk; - return this; - } - - @Override - public void shutdown() { - // do nothing here, because zookeeper handle is passed from outside - // we don't need to stop it. - } - - @Override - public Iterator getTopics() throws IOException { - List topics; - try { - topics = zk.getChildren(cfg.getZkTopicsPrefix(new StringBuilder()).toString(), false); - } catch (KeeperException ke) { - throw new IOException("Failed to get topics list : ", ke); - } catch (InterruptedException ie) { - throw new IOException("Interrupted on getting topics list : ", ie); - } - final Iterator iter = topics.iterator(); - return new Iterator() { - @Override - public boolean hasNext() { - return iter.hasNext(); - } - @Override - public ByteString next() { - String t = iter.next(); - return ByteString.copyFromUtf8(t); - } - @Override - public void remove() { - iter.remove(); - } - }; - } - - @Override - public TopicPersistenceManager newTopicPersistenceManager() { - return new ZkTopicPersistenceManagerImpl(cfg, zk); - } - - @Override - public SubscriptionDataManager newSubscriptionDataManager() { - return new ZkSubscriptionDataManagerImpl(cfg, zk); - } - - @Override - public TopicOwnershipManager newTopicOwnershipManager() { - return new ZkTopicOwnershipManagerImpl(cfg, zk); - } - - /** - * ZooKeeper based topic persistence manager. - */ - static class ZkTopicPersistenceManagerImpl implements TopicPersistenceManager { - - ZooKeeper zk; - ServerConfiguration cfg; - - ZkTopicPersistenceManagerImpl(ServerConfiguration conf, ZooKeeper zk) { - this.cfg = conf; - this.zk = zk; - } - - @Override - public void close() throws IOException { - // do nothing in zookeeper based impl - } - - /** - * Get znode path to store persistence info of a topic. - * - * @param topic - * Topic name - * @return znode path to store persistence info. - */ - private String ledgersPath(ByteString topic) { - return cfg.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString(); - } - - /** - * Parse ledger ranges data and return it thru callback. - * - * @param topic - * Topic name - * @param data - * Topic Ledger Ranges data - * @param version - * Version of the topic ledger ranges data - * @param callback - * Callback to return ledger ranges - * @param ctx - * Context of the callback - */ - private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, int version, - Callback> callback, Object ctx) { - try { - Versioned ranges = new Versioned(LedgerRanges.parseFrom(data), - new ZkVersion(version)); - callback.operationFinished(ctx, ranges); - return; - } catch (InvalidProtocolBufferException e) { - String msg = "Ledger ranges for topic:" + topic.toStringUtf8() + " could not be deserialized"; - logger.error(msg, e); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - return; - } - } - - @Override - public void readTopicPersistenceInfo(final ByteString topic, - final Callback> callback, - Object ctx) { - // read topic ledgers node data - final String zNodePath = ledgersPath(topic); - - zk.getData(zNodePath, false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (rc == Code.OK.intValue()) { - parseAndReturnTopicLedgerRanges(topic, data, stat.getVersion(), callback, ctx); - return; - } - - if (rc == Code.NONODE.intValue()) { - // we don't create the znode until we first write it. - callback.operationFinished(ctx, null); - return; - } - - // otherwise some other error - KeeperException ke = - ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: " - + topic.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); - } - }, ctx); - } - - private void createTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, - final Callback callback, Object ctx) { - final String zNodePath = ledgersPath(topic); - final byte[] data = ranges.toByteArray(); - // create it - ZkUtils.createFullPathOptimistic(zk, zNodePath, data, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - if (rc == Code.NODEEXISTS.intValue()) { - callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, - "Persistence info of topic " + topic.toStringUtf8() + " existed.")); - return; - } - if (rc != Code.OK.intValue()) { - KeeperException ke = ZkUtils.logErrorAndCreateZKException( - "Could not create ledgers node for topic: " + topic.toStringUtf8(), - path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); - return; - } - // initial version is version 0 - callback.operationFinished(ctx, new ZkVersion(0)); - } - }, ctx); - return; - } - - @Override - public void writeTopicPersistenceInfo(final ByteString topic, LedgerRanges ranges, final Version version, - final Callback callback, Object ctx) { - if (Version.NEW == version) { - createTopicPersistenceInfo(topic, ranges, callback, ctx); - return; - } - - final String zNodePath = ledgersPath(topic); - final byte[] data = ranges.toByteArray(); - - if (!(version instanceof ZkVersion)) { - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( - "Invalid version provided to update persistence info for topic " + topic.toStringUtf8())); - return; - } - - int znodeVersion = ((ZkVersion)version).getZnodeVersion(); - zk.setData(zNodePath, data, znodeVersion, new SafeAsyncZKCallback.StatCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { - if (rc == Code.NONODE.intValue()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO, - "No persistence info found for topic " + topic.toStringUtf8())); - return; - } else if (rc == Code.BADVERSION.intValue()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to update persistence info of topic " + topic.toStringUtf8())); - return; - } else if (rc == Code.OK.intValue()) { - callback.operationFinished(ctx, new ZkVersion(stat.getVersion())); - return; - } else { - KeeperException ke = ZkUtils.logErrorAndCreateZKException( - "Could not write ledgers node for topic: " + topic.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); - return; - } - } - }, ctx); - } - - @Override - public void deleteTopicPersistenceInfo(final ByteString topic, final Version version, - final Callback callback, Object ctx) { - final String zNodePath = ledgersPath(topic); - - int znodeVersion = -1; - if (Version.ANY != version) { - if (!(version instanceof ZkVersion)) { - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( - "Invalid version provided to delete persistence info for topic " + topic.toStringUtf8())); - return; - } else { - znodeVersion = ((ZkVersion)version).getZnodeVersion(); - } - } - zk.delete(zNodePath, znodeVersion, new SafeAsyncZKCallback.VoidCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx) { - if (rc == Code.OK.intValue()) { - callback.operationFinished(ctx, null); - return; - } else if (rc == Code.NONODE.intValue()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO, - "No persistence info found for topic " + topic.toStringUtf8())); - return; - } else if (rc == Code.BADVERSION.intValue()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to delete persistence info of topic " + topic.toStringUtf8())); - return; - } - - KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8() - + " failed to delete persistence info @version " + version + " : ", path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - } - }, ctx); - } - } - - /** - * ZooKeeper based subscription data manager. - */ - static class ZkSubscriptionDataManagerImpl implements SubscriptionDataManager { - - ZooKeeper zk; - ServerConfiguration cfg; - - ZkSubscriptionDataManagerImpl(ServerConfiguration conf, ZooKeeper zk) { - this.cfg = conf; - this.zk = zk; - } - - @Override - public void close() throws IOException { - // do nothing in zookeeper based impl - } - - /** - * Get znode path to store subscription states. - * - * @param sb - * String builder to store the znode path. - * @param topic - * Topic name. - * - * @return string builder to store znode path. - */ - private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) { - return cfg.getZkTopicPath(sb, topic).append("/subscribers"); - } - - /** - * Get znode path to store subscription state for a specified subscriber. - * - * @param topic - * Topic name. - * @param subscriber - * Subscriber id. - * @return znode path to store subscription state. - */ - private String topicSubscriberPath(ByteString topic, ByteString subscriber) { - return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8()) - .toString(); - } - - @Override - public boolean isPartialUpdateSupported() { - return false; - } - - @Override - public void createSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData data, - final Callback callback, final Object ctx) { - ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), data.toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { - - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - - if (rc == Code.NODEEXISTS.intValue()) { - callback.operationFailed(ctx, PubSubException.create(StatusCode.SUBSCRIPTION_STATE_EXISTS, - "Subscription state for (topic:" + topic.toStringUtf8() + ", subscriber:" - + subscriberId.toStringUtf8() + ") existed.")); - return; - } else if (rc == Code.OK.intValue()) { - if (logger.isDebugEnabled()) { - logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8() + " data: " - + SubscriptionStateUtils.toString(data)); - } - callback.operationFinished(ctx, new ZkVersion(0)); - } else { - KeeperException ke = ZkUtils.logErrorAndCreateZKException( - "Could not record new subscription for topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); - } - } - }, ctx); - } - - @Override - public void updateSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData data, - final Version version, final Callback callback, final Object ctx) { - throw new UnsupportedOperationException("ZooKeeper based metadata manager doesn't support partial update!"); - } - - @Override - public void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData data, - final Version version, final Callback callback, final Object ctx) { - int znodeVersion = -1; - if (Version.NEW == version) { - callback.operationFailed(ctx, - new PubSubException.BadVersionException("Can not replace Version.New subscription data")); - return; - } else if (Version.ANY != version) { - if (!(version instanceof ZkVersion)) { - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( - "Invalid version provided to replace subscription data for topic " - + topic.toStringUtf8() + " subscribe id: " + subscriberId)); - return; - } else { - znodeVersion = ((ZkVersion)version).getZnodeVersion(); - } - } - zk.setData(topicSubscriberPath(topic, subscriberId), data.toByteArray(), - znodeVersion, new SafeAsyncZKCallback.StatCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { - if (rc == Code.NONODE.intValue()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE, - "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:" - + subscriberId.toStringUtf8() + ").")); - return; - } else if (rc == Code.BADVERSION.intValue()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to replace subscription data of topic " - + topic.toStringUtf8() + " subscriberId " + subscriberId)); - return; - } else if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8() - + " could not set subscription data: " + SubscriptionStateUtils.toString(data), - path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8() + " data: " - + SubscriptionStateUtils.toString(data)); - } - - callback.operationFinished(ctx, new ZkVersion(stat.getVersion())); - } - } - }, ctx); - } - - @Override - public void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version, - final Callback callback, Object ctx) { - - int znodeVersion = -1; - if (Version.NEW == version) { - callback.operationFailed(ctx, - new PubSubException.BadVersionException("Can not delete Version.New subscription data")); - return; - } else if (Version.ANY != version) { - if (!(version instanceof ZkVersion)) { - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( - "Invalid version provided to delete subscription data for topic " - + topic.toStringUtf8() + " subscribe id: " + subscriberId)); - return; - } else { - znodeVersion = ((ZkVersion)version).getZnodeVersion(); - } - } - - zk.delete(topicSubscriberPath(topic, subscriberId), znodeVersion, new SafeAsyncZKCallback.VoidCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx) { - if (rc == Code.NONODE.intValue()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE, - "No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:" - + subscriberId.toStringUtf8() + ").")); - return; - } else if (rc == Code.BADVERSION.intValue()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to delete subscription data of topic " - + topic.toStringUtf8() + " subscriberId " + subscriberId)); - return; - } else if (rc == Code.OK.intValue()) { - if (logger.isDebugEnabled()) { - logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8()); - } - - callback.operationFinished(ctx, null); - return; - } - - KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - } - }, ctx); - } - - @Override - public void readSubscriptionData(final ByteString topic, final ByteString subscriberId, - final Callback> callback, final Object ctx) { - zk.getData(topicSubscriberPath(topic, subscriberId), false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (rc == Code.NONODE.intValue()) { - callback.operationFinished(ctx, null); - return; - } - if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Could not read subscription data for topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - - Versioned subData; - try { - subData = new Versioned( - SubscriptionStateUtils.parseSubscriptionData(data), - new ZkVersion(stat.getVersion())); - } catch (InvalidProtocolBufferException ex) { - String msg = "Failed to deserialize subscription data for topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8(); - logger.error(msg, ex); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8() - + " data: " + SubscriptionStateUtils.toString(subData.getValue())); - } - callback.operationFinished(ctx, subData); - } - }, ctx); - } - - @Override - public void readSubscriptions(final ByteString topic, - final Callback>> cb, final Object ctx) { - String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString(); - zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() { - @Override - public void safeProcessResult(int rc, String path, final Object ctx, final List children) { - - if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic " - + topic.toStringUtf8(), path, rc); - cb.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - - final Map> topicSubs = - new ConcurrentHashMap>(); - - if (rc == Code.NONODE.intValue() || children.size() == 0) { - if (logger.isDebugEnabled()) { - logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8()); - } - cb.operationFinished(ctx, topicSubs); - return; - } - - final AtomicBoolean failed = new AtomicBoolean(); - final AtomicInteger count = new AtomicInteger(); - - for (final String child : children) { - - final ByteString subscriberId = ByteString.copyFromUtf8(child); - final String childPath = path + "/" + child; - - zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - - if (rc != Code.OK.intValue()) { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Could not read subscription data for topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc); - reportFailure(new PubSubException.ServiceDownException(e)); - return; - } - - if (failed.get()) { - return; - } - - Versioned subData; - try { - subData = new Versioned( - SubscriptionStateUtils.parseSubscriptionData(data), - new ZkVersion(stat.getVersion())); - } catch (InvalidProtocolBufferException ex) { - String msg = "Failed to deserialize subscription data for topic: " + topic.toStringUtf8() - + " subscriberId: " + subscriberId.toStringUtf8(); - logger.error(msg, ex); - reportFailure(new PubSubException.UnexpectedConditionException(msg)); - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8() - + " subscriberId: " + child + "state: " - + SubscriptionStateUtils.toString(subData.getValue())); - } - - topicSubs.put(subscriberId, subData); - if (count.incrementAndGet() == children.size()) { - assert topicSubs.size() == count.get(); - cb.operationFinished(ctx, topicSubs); - } - } - - private void reportFailure(PubSubException e) { - if (failed.compareAndSet(false, true)) - cb.operationFailed(ctx, e); - } - }, ctx); - } - } - }, ctx); - } - } - - /** - * ZooKeeper base topic ownership manager. - */ - static class ZkTopicOwnershipManagerImpl implements TopicOwnershipManager { - - ZooKeeper zk; - ServerConfiguration cfg; - - ZkTopicOwnershipManagerImpl(ServerConfiguration conf, ZooKeeper zk) { - this.cfg = conf; - this.zk = zk; - } - - @Override - public void close() throws IOException { - // do nothing in zookeeper based impl - } - - /** - * Return znode path to store topic owner. - * - * @param topic - * Topic Name - * @return znode path to store topic owner. - */ - String hubPath(ByteString topic) { - return cfg.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString(); - } - - @Override - public void readOwnerInfo(final ByteString topic, final Callback> callback, Object ctx) { - String ownerPath = hubPath(topic); - zk.getData(ownerPath, false, new SafeAsyncZKCallback.DataCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (Code.NONODE.intValue() == rc) { - callback.operationFinished(ctx, null); - return; - } - - if (Code.OK.intValue() != rc) { - KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: " - + topic.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - HubInfo owner = null; - try { - owner = HubInfo.parse(new String(data, UTF_8)); - } catch (HubInfo.InvalidHubInfoException ihie) { - logger.warn("Failed to parse hub info for topic " + topic.toStringUtf8() + " : ", ihie); - } - int version = stat.getVersion(); - callback.operationFinished(ctx, new Versioned(owner, new ZkVersion(version))); - return; - } - }, ctx); - } - - @Override - public void writeOwnerInfo(final ByteString topic, final HubInfo owner, final Version version, - final Callback callback, Object ctx) { - if (Version.NEW == version) { - createOwnerInfo(topic, owner, callback, ctx); - return; - } - - if (!(version instanceof ZkVersion)) { - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( - "Invalid version provided to update owner info for topic " + topic.toStringUtf8())); - return; - } - - int znodeVersion = ((ZkVersion)version).getZnodeVersion(); - zk.setData(hubPath(topic), owner.toString().getBytes(UTF_8), znodeVersion, - new SafeAsyncZKCallback.StatCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, Stat stat) { - if (rc == Code.NONODE.intValue()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, - "No owner info found for topic " + topic.toStringUtf8())); - return; - } else if (rc == Code.BADVERSION.intValue()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to update owner info of topic " + topic.toStringUtf8())); - return; - } else if (Code.OK.intValue() == rc) { - callback.operationFinished(ctx, new ZkVersion(stat.getVersion())); - return; - } else { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Failed to update ownership of topic " + topic.toStringUtf8() + - " to " + owner, path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - } - }, ctx); - } - - protected void createOwnerInfo(final ByteString topic, final HubInfo owner, - final Callback callback, Object ctx) { - String ownerPath = hubPath(topic); - ZkUtils.createFullPathOptimistic(zk, ownerPath, owner.toString().getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - if (Code.OK.intValue() == rc) { - // assume the initial version is 0 - callback.operationFinished(ctx, new ZkVersion(0)); - return; - } else if (Code.NODEEXISTS.intValue() == rc) { - // node existed - callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_OWNER_INFO_EXISTS, - "Owner info of topic " + topic.toStringUtf8() + " existed.")); - return; - } else { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Failed to create znode for ownership of topic: " - + topic.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - return; - } - } - }, ctx); - } - - @Override - public void deleteOwnerInfo(final ByteString topic, final Version version, - final Callback callback, Object ctx) { - int znodeVersion = -1; - if (Version.ANY != version) { - if (!(version instanceof ZkVersion)) { - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException( - "Invalid version provided to delete owner info for topic " + topic.toStringUtf8())); - return; - } else { - znodeVersion = ((ZkVersion)version).getZnodeVersion(); - } - } - - zk.delete(hubPath(topic), znodeVersion, new SafeAsyncZKCallback.VoidCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx) { - if (Code.OK.intValue() == rc) { - if (logger.isDebugEnabled()) { - logger.debug("Successfully deleted owner info for topic " + topic.toStringUtf8() + "."); - } - callback.operationFinished(ctx, null); - return; - } else if (Code.NONODE.intValue() == rc) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, - "No owner info found for topic " + topic.toStringUtf8())); - return; - } else if (Code.BADVERSION.intValue() == rc) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to delete owner info of topic " + topic.toStringUtf8())); - return; - } else { - KeeperException e = ZkUtils.logErrorAndCreateZKException( - "Failed to delete owner info for topic " - + topic.toStringUtf8(), path, rc); - callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); - } - } - }, ctx); - } - } - - @Override - public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException { - try { - ZKUtil.deleteRecursive(zk, cfg.getZkTopicsPrefix(new StringBuilder()).toString()); - } catch (KeeperException.NoNodeException e) { - logger.debug("Hedwig root node doesn't exist in zookeeper to delete"); - } catch (KeeperException ke) { - throw new IOException(ke); - } catch (InterruptedException ie) { - throw new IOException(ie); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java deleted file mode 100644 index 23e04e3..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java +++ /dev/null @@ -1,535 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.netty; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BKException; -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.ServerSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.logging.Log4JLoggerFactory; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.common.TerminateJVMExceptionHandler; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.delivery.FIFODeliveryManager; -import org.apache.hedwig.server.handlers.CloseSubscriptionHandler; -import org.apache.hedwig.server.handlers.ConsumeHandler; -import org.apache.hedwig.server.handlers.Handler; -import org.apache.hedwig.server.handlers.NettyHandlerBean; -import org.apache.hedwig.server.handlers.PublishHandler; -import org.apache.hedwig.server.handlers.SubscribeHandler; -import org.apache.hedwig.server.handlers.SubscriptionChannelManager; -import org.apache.hedwig.server.handlers.SubscriptionChannelManager.SubChannelDisconnectedListener; -import org.apache.hedwig.server.handlers.UnsubscribeHandler; -import org.apache.hedwig.server.jmx.HedwigMBeanRegistry; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.meta.ZkMetadataManagerFactory; -import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager; -import org.apache.hedwig.server.persistence.LocalDBPersistenceManager; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan; -import org.apache.hedwig.server.persistence.ReadAheadCache; -import org.apache.hedwig.server.regions.HedwigHubClientFactory; -import org.apache.hedwig.server.regions.RegionManager; -import org.apache.hedwig.server.ssl.SslServerContextFactory; -import org.apache.hedwig.server.subscriptions.InMemorySubscriptionManager; -import org.apache.hedwig.server.subscriptions.SubscriptionManager; -import org.apache.hedwig.server.subscriptions.MMSubscriptionManager; -import org.apache.hedwig.server.topics.MMTopicManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager; -import org.apache.hedwig.server.topics.ZkTopicManager; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.zookeeper.SafeAsyncCallback; - -public class PubSubServer { - - private static final Logger logger = LoggerFactory.getLogger(PubSubServer.class); - - private static final String JMXNAME_PREFIX = "PubSubServer_"; - - // Netty related variables - ServerSocketChannelFactory serverChannelFactory; - ClientSocketChannelFactory clientChannelFactory; - ServerConfiguration conf; - org.apache.hedwig.client.conf.ClientConfiguration clientConfiguration; - ChannelGroup allChannels; - - // Manager components that make up the PubSubServer - PersistenceManager pm; - DeliveryManager dm; - TopicManager tm; - SubscriptionManager sm; - RegionManager rm; - - // Metadata Manager Factory - MetadataManagerFactory mm; - - ZooKeeper zk; // null if we are in standalone mode - BookKeeper bk; // null if we are in standalone mode - - // we use this to prevent long stack chains from building up in callbacks - ScheduledExecutorService scheduler; - - // JMX Beans - NettyHandlerBean jmxNettyBean; - PubSubServerBean jmxServerBean; - final ThreadGroup tg; - - protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException, - InterruptedException { - - PersistenceManagerWithRangeScan underlyingPM; - - if (conf.isStandalone()) { - - underlyingPM = LocalDBPersistenceManager.instance(); - - } else { - try { - ClientConfiguration bkConf = new ClientConfiguration(); - bkConf.addConfiguration(conf.getConf()); - bk = new BookKeeper(bkConf, zk, clientChannelFactory); - } catch (KeeperException e) { - logger.error("Could not instantiate bookkeeper client", e); - throw new IOException(e); - } - underlyingPM = new BookkeeperPersistenceManager(bk, mm, topicMgr, conf, scheduler); - - } - - PersistenceManager pm = underlyingPM; - - if (conf.getReadAheadEnabled()) { - pm = new ReadAheadCache(underlyingPM, conf).start(); - } - - return pm; - } - - protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm, - DeliveryManager dm) { - if (conf.isStandalone()) { - return new InMemorySubscriptionManager(conf, tm, pm, dm, scheduler); - } else { - return new MMSubscriptionManager(conf, mm, tm, pm, dm, scheduler); - } - - } - - protected RegionManager instantiateRegionManager(PersistenceManager pm, ScheduledExecutorService scheduler) { - return new RegionManager(pm, conf, zk, scheduler, new HedwigHubClientFactory(conf, clientConfiguration, - clientChannelFactory)); - } - - protected void instantiateZookeeperClient() throws Exception { - if (!conf.isStandalone()) { - final CountDownLatch signalZkReady = new CountDownLatch(1); - - zk = new ZooKeeper(conf.getZkHost(), conf.getZkTimeout(), new Watcher() { - @Override - public void process(WatchedEvent event) { - if(Event.KeeperState.SyncConnected.equals(event.getState())) { - signalZkReady.countDown(); - } - } - }); - // wait until connection is effective - if (!signalZkReady.await(conf.getZkTimeout()*2, TimeUnit.MILLISECONDS)) { - logger.error("Could not establish connection with ZooKeeper after zk_timeout*2 = " + - conf.getZkTimeout()*2 + " ms. (Default value for zk_timeout is 2000)."); - throw new Exception("Could not establish connection with ZooKeeper after zk_timeout*2 = " + - conf.getZkTimeout()*2 + " ms. (Default value for zk_timeout is 2000)."); - } - } - } - - protected void instantiateMetadataManagerFactory() throws Exception { - if (conf.isStandalone()) { - return; - } - mm = MetadataManagerFactory.newMetadataManagerFactory(conf, zk); - } - - protected TopicManager instantiateTopicManager() throws IOException { - TopicManager tm; - - if (conf.isStandalone()) { - tm = new TrivialOwnAllTopicManager(conf, scheduler); - } else { - try { - if (conf.isMetadataManagerBasedTopicManagerEnabled()) { - tm = new MMTopicManager(conf, zk, mm, scheduler); - } else { - if (!(mm instanceof ZkMetadataManagerFactory)) { - throw new IOException("Uses " + mm.getClass().getName() + " to store hedwig metadata, " - + "but uses zookeeper ephemeral znodes to store topic ownership. " - + "Check your configuration as this could lead to scalability issues."); - } - tm = new ZkTopicManager(zk, conf, scheduler); - } - } catch (PubSubException e) { - logger.error("Could not instantiate TopicOwnershipManager based topic manager", e); - throw new IOException(e); - } - } - return tm; - } - - protected Map initializeNettyHandlers( - TopicManager tm, DeliveryManager dm, - PersistenceManager pm, SubscriptionManager sm, - SubscriptionChannelManager subChannelMgr) { - Map handlers = new HashMap(); - handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf)); - handlers.put(OperationType.SUBSCRIBE, - new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr)); - handlers.put(OperationType.UNSUBSCRIBE, - new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr)); - handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf)); - handlers.put(OperationType.CLOSESUBSCRIPTION, - new CloseSubscriptionHandler(conf, tm, sm, dm, subChannelMgr)); - handlers = Collections.unmodifiableMap(handlers); - return handlers; - } - - protected void initializeNetty(SslServerContextFactory sslFactory, - Map handlers, - SubscriptionChannelManager subChannelMgr) { - boolean isSSLEnabled = (sslFactory != null) ? true : false; - InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory()); - ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory); - UmbrellaHandler umbrellaHandler = - new UmbrellaHandler(allChannels, handlers, subChannelMgr, isSSLEnabled); - PubSubServerPipelineFactory pipeline = - new PubSubServerPipelineFactory(umbrellaHandler, sslFactory, - conf.getMaximumMessageSize()); - - bootstrap.setPipelineFactory(pipeline); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("reuseAddress", true); - - // Bind and start to accept incoming connections. - allChannels.add(bootstrap.bind(isSSLEnabled ? new InetSocketAddress(conf.getSSLServerPort()) - : new InetSocketAddress(conf.getServerPort()))); - logger.info("Going into receive loop"); - } - - public void shutdown() { - // TODO: tell bk to close logs - - // Stop topic manager first since it is core of Hub server - tm.stop(); - - // Stop the RegionManager. - rm.stop(); - - // Stop the DeliveryManager and ReadAheadCache threads (if - // applicable). - dm.stop(); - pm.stop(); - - // Stop the SubscriptionManager if needed. - sm.stop(); - - // Shutdown metadata manager if needed - if (null != mm) { - try { - mm.shutdown(); - } catch (IOException ie) { - logger.error("Error while shutdown metadata manager factory!", ie); - } - } - - // Shutdown the ZooKeeper and BookKeeper clients only if we are - // not in stand-alone mode. - try { - if (bk != null) - bk.close(); - if (zk != null) - zk.close(); - } catch (InterruptedException e) { - logger.error("Error while closing ZooKeeper client : ", e); - } catch (BKException bke) { - logger.error("Error while closing BookKeeper client : ", bke); - } - - // Close and release the Netty channels and resources - allChannels.close().awaitUninterruptibly(); - serverChannelFactory.releaseExternalResources(); - clientChannelFactory.releaseExternalResources(); - scheduler.shutdown(); - - // unregister jmx - unregisterJMX(); - } - - protected void registerJMX(SubscriptionChannelManager subChannelMgr) { - try { - String jmxName = JMXNAME_PREFIX + conf.getServerPort() + "_" - + conf.getSSLServerPort(); - jmxServerBean = new PubSubServerBean(jmxName); - HedwigMBeanRegistry.getInstance().register(jmxServerBean, null); - try { - jmxNettyBean = new NettyHandlerBean(subChannelMgr); - HedwigMBeanRegistry.getInstance().register(jmxNettyBean, jmxServerBean); - } catch (Exception e) { - logger.warn("Failed to register with JMX", e); - jmxNettyBean = null; - } - } catch (Exception e) { - logger.warn("Failed to register with JMX", e); - jmxServerBean = null; - } - if (pm instanceof ReadAheadCache) { - ((ReadAheadCache)pm).registerJMX(jmxServerBean); - } - } - - protected void unregisterJMX() { - if (pm != null && pm instanceof ReadAheadCache) { - ((ReadAheadCache)pm).unregisterJMX(); - } - try { - if (jmxNettyBean != null) { - HedwigMBeanRegistry.getInstance().unregister(jmxNettyBean); - } - } catch (Exception e) { - logger.warn("Failed to unregister with JMX", e); - } - try { - if (jmxServerBean != null) { - HedwigMBeanRegistry.getInstance().unregister(jmxServerBean); - } - } catch (Exception e) { - logger.warn("Failed to unregister with JMX", e); - } - jmxNettyBean = null; - jmxServerBean = null; - } - - /** - * Starts the hedwig server on the given port - * - * @param port - * @throws ConfigurationException - * if there is something wrong with the given configuration - * @throws IOException - * @throws InterruptedException - * @throws ConfigurationException - */ - public PubSubServer(final ServerConfiguration serverConfiguration, - final org.apache.hedwig.client.conf.ClientConfiguration clientConfiguration, - final Thread.UncaughtExceptionHandler exceptionHandler) - throws ConfigurationException { - - // First validate the serverConfiguration - this.conf = serverConfiguration; - serverConfiguration.validate(); - - // Validate the client configuration - this.clientConfiguration = clientConfiguration; - clientConfiguration.validate(); - - // We need a custom thread group, so that we can override the uncaught - // exception method - tg = new ThreadGroup("hedwig") { - @Override - public void uncaughtException(Thread t, Throwable e) { - exceptionHandler.uncaughtException(t, e); - } - }; - // ZooKeeper threads register their own handler. But if some work that - // we do in ZK threads throws an exception, we want our handler to be - // called, not theirs. - SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler); - } - - public void start() throws Exception { - final SynchronousQueue> queue = new SynchronousQueue>(); - - new Thread(tg, new Runnable() { - @Override - public void run() { - try { - // Since zk is needed by almost everyone,try to see if we - // need that first - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - scheduler = Executors.newSingleThreadScheduledExecutor(tfb - .setNameFormat("PubSubServerScheduler-%d").build()); - serverChannelFactory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(tfb.setNameFormat( - "PubSub-Server-NIOBoss-%d").build()), - Executors.newCachedThreadPool(tfb.setNameFormat( - "PubSub-Server-NIOWorker-%d").build())); - clientChannelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(tfb.setNameFormat( - "PubSub-Client-NIOBoss-%d").build()), - Executors.newCachedThreadPool(tfb.setNameFormat( - "PubSub-Client-NIOWorker-%d").build())); - - instantiateZookeeperClient(); - instantiateMetadataManagerFactory(); - tm = instantiateTopicManager(); - pm = instantiatePersistenceManager(tm); - dm = new FIFODeliveryManager(tm, pm, conf); - dm.start(); - - sm = instantiateSubscriptionManager(tm, pm, dm); - rm = instantiateRegionManager(pm, scheduler); - sm.addListener(rm); - - allChannels = new DefaultChannelGroup("hedwig"); - // Initialize the Netty Handlers (used by the - // UmbrellaHandler) once so they can be shared by - // both the SSL and non-SSL channels. - SubscriptionChannelManager subChannelMgr = new SubscriptionChannelManager(); - subChannelMgr.addSubChannelDisconnectedListener((SubChannelDisconnectedListener) dm); - Map handlers = - initializeNettyHandlers(tm, dm, pm, sm, subChannelMgr); - // Initialize Netty for the regular non-SSL channels - initializeNetty(null, handlers, subChannelMgr); - if (conf.isSSLEnabled()) { - initializeNetty(new SslServerContextFactory(conf), - handlers, subChannelMgr); - } - // register jmx - registerJMX(subChannelMgr); - } catch (Exception e) { - ConcurrencyUtils.put(queue, Either.right(e)); - return; - } - - ConcurrencyUtils.put(queue, Either.of(new Object(), (Exception) null)); - } - - }).start(); - - Either either = ConcurrencyUtils.take(queue); - if (either.left() == null) { - throw either.right(); - } - } - - public PubSubServer(ServerConfiguration serverConfiguration, - org.apache.hedwig.client.conf.ClientConfiguration clientConfiguration) throws Exception { - this(serverConfiguration, clientConfiguration, new TerminateJVMExceptionHandler()); - } - - public PubSubServer(ServerConfiguration serverConfiguration) throws Exception { - this(serverConfiguration, new org.apache.hedwig.client.conf.ClientConfiguration()); - } - - @VisibleForTesting - public DeliveryManager getDeliveryManager() { - return dm; - } - - /** - * - * @param msg - * @param rc - * : code to exit with - */ - public static void errorMsgAndExit(String msg, Throwable t, int rc) { - logger.error(msg, t); - System.err.println(msg); - System.exit(rc); - } - - public final static int RC_INVALID_CONF_FILE = 1; - public final static int RC_MISCONFIGURED = 2; - public final static int RC_OTHER = 3; - - /** - * @param args - */ - public static void main(String[] args) { - - logger.info("Attempting to start Hedwig"); - ServerConfiguration serverConfiguration = new ServerConfiguration(); - // The client configuration for the hedwig client in the region manager. - org.apache.hedwig.client.conf.ClientConfiguration regionMgrClientConfiguration - = new org.apache.hedwig.client.conf.ClientConfiguration(); - if (args.length > 0) { - String confFile = args[0]; - try { - serverConfiguration.loadConf(new File(confFile).toURI().toURL()); - } catch (MalformedURLException e) { - String msg = "Could not open server configuration file: " + confFile; - errorMsgAndExit(msg, e, RC_INVALID_CONF_FILE); - } catch (ConfigurationException e) { - String msg = "Malformed server configuration file: " + confFile; - errorMsgAndExit(msg, e, RC_MISCONFIGURED); - } - logger.info("Using configuration file " + confFile); - } - if (args.length > 1) { - // args[1] is the client configuration file. - String confFile = args[1]; - try { - regionMgrClientConfiguration.loadConf(new File(confFile).toURI().toURL()); - } catch (MalformedURLException e) { - String msg = "Could not open client configuration file: " + confFile; - errorMsgAndExit(msg, e, RC_INVALID_CONF_FILE); - } catch (ConfigurationException e) { - String msg = "Malformed client configuration file: " + confFile; - errorMsgAndExit(msg, e, RC_MISCONFIGURED); - } - } - try { - new PubSubServer(serverConfiguration, regionMgrClientConfiguration).start(); - } catch (Throwable t) { - errorMsgAndExit("Error during startup", t, RC_OTHER); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java deleted file mode 100644 index c1acbbc..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.netty; - -import org.apache.hedwig.server.jmx.HedwigMBeanInfo; -import org.apache.hedwig.server.netty.ServerStats.OpStatData; - -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; - -/** - * PubSub Server Bean - */ -public class PubSubServerBean implements PubSubServerMXBean, HedwigMBeanInfo { - - private final String name; - - public PubSubServerBean(String jmxName) { - this.name = jmxName; - } - - @Override - public String getName() { - return name; - } - - @Override - public boolean isHidden() { - return false; - } - - @Override - public OpStatData getPubStats() { - return ServerStats.getInstance().getOpStats(OperationType.PUBLISH).toOpStatData(); - } - - @Override - public OpStatData getSubStats() { - return ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE).toOpStatData(); - } - - @Override - public OpStatData getUnsubStats() { - return ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE).toOpStatData(); - } - - @Override - public OpStatData getConsumeStats() { - return ServerStats.getInstance().getOpStats(OperationType.CONSUME).toOpStatData(); - } - - @Override - public long getNumRequestsReceived() { - return ServerStats.getInstance().getNumRequestsReceived(); - } - - @Override - public long getNumRequestsRedirect() { - return ServerStats.getInstance().getNumRequestsRedirect(); - } - - @Override - public long getNumMessagesDelivered() { - return ServerStats.getInstance().getNumMessagesDelivered(); - } - - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java deleted file mode 100644 index 15e860f..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.netty; - -import org.apache.hedwig.server.netty.ServerStats.OpStatData; - -/** - * PubSub Server MBean - */ -public interface PubSubServerMXBean { - - /** - * @return publish stats - */ - public OpStatData getPubStats(); - - /** - * @return subscription stats - */ - public OpStatData getSubStats(); - - /** - * @return unsub stats - */ - public OpStatData getUnsubStats(); - - /** - * @return consume stats - */ - public OpStatData getConsumeStats(); - - /** - * @return number of requests received - */ - public long getNumRequestsReceived(); - - /** - * @return number of requests redirect - */ - public long getNumRequestsRedirect(); - - /** - * @return number of messages delivered - */ - public long getNumMessagesDelivered(); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java deleted file mode 100644 index c96f438..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.netty; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; -import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder; -import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder; -import org.jboss.netty.handler.ssl.SslHandler; - -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.server.ssl.SslServerContextFactory; - -public class PubSubServerPipelineFactory implements ChannelPipelineFactory { - - // TODO: make these conf settings - final static int MAX_WORKER_THREADS = 32; - final static int MAX_CHANNEL_MEMORY_SIZE = 10 * 1024 * 1024; - final static int MAX_TOTAL_MEMORY_SIZE = 100 * 1024 * 1024; - - private UmbrellaHandler uh; - private SslServerContextFactory sslFactory; - private int maxMessageSize; - - /** - * - * @param uh - * @param sslFactory - * may be null if ssl is disabled - * @param cfg - */ - public PubSubServerPipelineFactory(UmbrellaHandler uh, SslServerContextFactory sslFactory, int maxMessageSize) { - this.uh = uh; - this.sslFactory = sslFactory; - this.maxMessageSize = maxMessageSize; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - if (sslFactory != null) { - pipeline.addLast("ssl", new SslHandler(sslFactory.getEngine())); - } - pipeline.addLast("lengthbaseddecoder", - new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - - pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubRequest.getDefaultInstance())); - pipeline.addLast("protobufencoder", new ProtobufEncoder()); - - // pipeline.addLast("executor", new ExecutionHandler( - // new OrderedMemoryAwareThreadPoolExecutor(MAX_WORKER_THREADS, - // MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE))); - // - // Dependency injection. - pipeline.addLast("umbrellahandler", uh); - return pipeline; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java deleted file mode 100644 index 69ee6ef..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.netty; - -import java.util.HashMap; -import java.util.Map; - -import java.beans.ConstructorProperties; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Server Stats - */ -public class ServerStats { - private static final Logger LOG = LoggerFactory.getLogger(ServerStats.class); - static ServerStats instance = new ServerStats(); - - /** - * A read view of stats, also used in CompositeViewData to expose to JMX - */ - public static class OpStatData { - private final long maxLatency, minLatency; - private final double avgLatency; - private final long numSuccessOps, numFailedOps; - private final String latencyHist; - - @ConstructorProperties({"maxLatency", "minLatency", "avgLatency", - "numSuccessOps", "numFailedOps", "latencyHist"}) - public OpStatData(long maxLatency, long minLatency, double avgLatency, - long numSuccessOps, long numFailedOps, String latencyHist) { - this.maxLatency = maxLatency; - this.minLatency = minLatency == Long.MAX_VALUE ? 0 : minLatency; - this.avgLatency = avgLatency; - this.numSuccessOps = numSuccessOps; - this.numFailedOps = numFailedOps; - this.latencyHist = latencyHist; - } - - public long getMaxLatency() { - return maxLatency; - } - - public long getMinLatency() { - return minLatency; - } - - public double getAvgLatency() { - return avgLatency; - } - - public long getNumSuccessOps() { - return numSuccessOps; - } - - public long getNumFailedOps() { - return numFailedOps; - } - - public String getLatencyHist() { - return latencyHist; - } - } - - /** - * Operation Statistics - */ - public static class OpStats { - static final int NUM_BUCKETS = 3*9 + 2; - - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - double totalLatency = 0.0f; - long numSuccessOps = 0; - long numFailedOps = 0; - long[] latencyBuckets = new long[NUM_BUCKETS]; - - OpStats() {} - - /** - * Increment number of failed operations - */ - synchronized public void incrementFailedOps() { - ++numFailedOps; - } - - /** - * Update Latency - */ - synchronized public void updateLatency(long latency) { - if (latency < 0) { - // less than 0ms . Ideally this should not happen. - // We have seen this latency negative in some cases due to the - // behaviors of JVM. Ignoring the statistics updation for such - // cases. - LOG.warn("Latency time coming negative"); - return; - } - totalLatency += latency; - ++numSuccessOps; - if (latency < minLatency) { - minLatency = latency; - } - if (latency > maxLatency) { - maxLatency = latency; - } - int bucket; - if (latency <= 100) { // less than 100ms - bucket = (int)(latency / 10); - } else if (latency <= 1000) { // 100ms ~ 1000ms - bucket = 1 * 9 + (int)(latency / 100); - } else if (latency <= 10000) { // 1s ~ 10s - bucket = 2 * 9 + (int)(latency / 1000); - } else { // more than 10s - bucket = 3 * 9 + 1; - } - ++latencyBuckets[bucket]; - } - - synchronized public OpStatData toOpStatData() { - double avgLatency = numSuccessOps > 0 ? totalLatency / numSuccessOps : 0.0f; - StringBuilder sb = new StringBuilder(); - for (int i=0; i(); - for (OperationType type : OperationType.values()) { - stats.put(type, new OpStats()); - } - } - Map stats; - - - AtomicLong numRequestsReceived = new AtomicLong(0); - AtomicLong numRequestsRedirect = new AtomicLong(0); - AtomicLong numMessagesDelivered = new AtomicLong(0); - - /** - * Stats of operations - * - * @param type - * Operation Type - * @return op stats - */ - public OpStats getOpStats(OperationType type) { - return stats.get(type); - } - - public void incrementRequestsReceived() { - numRequestsReceived.incrementAndGet(); - } - - public void incrementRequestsRedirect() { - numRequestsRedirect.incrementAndGet(); - } - - public void incrementMessagesDelivered() { - numMessagesDelivered.incrementAndGet(); - } - - public long getNumRequestsReceived() { - return numRequestsReceived.get(); - } - - public long getNumRequestsRedirect() { - return numRequestsRedirect.get(); - } - - public long getNumMessagesDelivered() { - return numMessagesDelivered.get(); - } -}