Return-Path: X-Original-To: apmail-bookkeeper-commits-archive@www.apache.org Delivered-To: apmail-bookkeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 453E41985E for ; Wed, 16 Mar 2016 03:44:15 +0000 (UTC) Received: (qmail 50679 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Delivered-To: apmail-bookkeeper-commits-archive@bookkeeper.apache.org Received: (qmail 50601 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 49850 invoked by uid 99); 16 Mar 2016 03:44:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 03:44:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B645E01BA; Wed, 16 Mar 2016 03:44:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sijie@apache.org To: commits@bookkeeper.apache.org Date: Wed, 16 Mar 2016 03:44:20 -0000 Message-Id: <447626ca44f149a38b9a078f74b8ff46@git.apache.org> In-Reply-To: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> References: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java deleted file mode 100644 index eaed39d..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java +++ /dev/null @@ -1,798 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.util.ArrayList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.protobuf.ByteString; -import org.apache.bookkeeper.versioning.Version; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.common.TopicOpQueuer; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.server.topics.TopicOwnershipChangeListener; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.CallbackUtils; -import org.apache.hedwig.util.ConcurrencyUtils; - -public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener { - - private static final Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class); - - protected final ServerConfiguration cfg; - protected final ConcurrentHashMap> top2sub2seq = - new ConcurrentHashMap>(); - protected final TopicOpQueuer queuer; - private final ArrayList listeners = new ArrayList(); - - // Handle to the DeliveryManager for the server so we can stop serving subscribers - // when losing topics - private final DeliveryManager dm; - // Handle to the PersistenceManager for the server so we can pass along the - // message consume pointers for each topic. - private final PersistenceManager pm; - // Timer for running a recurring thread task to get the minimum message - // sequence ID for each topic that all subscribers for it have consumed - // already. With that information, we can call the PersistenceManager to - // update it on the messages that are safe to be garbage collected. - private final Timer timer = new Timer(true); - // In memory mapping of topics to the minimum consumed message sequence ID - // for all subscribers to the topic. - private final ConcurrentHashMap topic2MinConsumedMessagesMap = new ConcurrentHashMap(); - - protected final Callback noopCallback = new NoopCallback(); - - static class NoopCallback implements Callback { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.warn("Exception found in AbstractSubscriptionManager : ", exception); - } - - public void operationFinished(Object ctx, T resultOfOperation) { - }; - } - - public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, - PersistenceManager pm, DeliveryManager dm, - ScheduledExecutorService scheduler) { - this.cfg = cfg; - queuer = new TopicOpQueuer(scheduler); - tm.addTopicOwnershipChangeListener(this); - this.pm = pm; - this.dm = dm; - // Schedule the recurring MessagesConsumedTask only if a - // PersistenceManager is passed. - if (pm != null) { - timer.schedule(new MessagesConsumedTask(), 0, cfg.getMessagesConsumedThreadRunInterval()); - } - } - - /** - * This is the Timer Task for finding out for each topic, what the minimum - * consumed message by the subscribers are. This information is used to pass - * along to the server's PersistenceManager so it can garbage collect older - * topic messages that are no longer needed by the subscribers. - */ - class MessagesConsumedTask extends TimerTask { - /** - * Implement the TimerTask's abstract run method. - */ - @Override - public void run() { - // We are looping through relatively small in memory data structures - // so it should be safe to run this fairly often. - for (ByteString topic : top2sub2seq.keySet()) { - final Map topicSubscriptions = top2sub2seq.get(topic); - if (topicSubscriptions == null) { - continue; - } - - long minConsumedMessage = Long.MAX_VALUE; - boolean hasBound = true; - // Loop through all subscribers on the current topic to find the - // minimum persisted message id. The reason not using in-memory - // consumed message id is LedgerRangs and InMemorySubscriptionState - // may be inconsistent in case of a server crash. - for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) { - if (curSubscription.getLastPersistedSeqId() < minConsumedMessage) { - minConsumedMessage = curSubscription.getLastPersistedSeqId(); - } - hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound(); - } - boolean callPersistenceManager = true; - // Call the PersistenceManager if nobody subscribes to the topic - // yet, or the consume pointer has moved ahead since the last - // time, or if this is the initial subscription. - Long minConsumedFromMap = topic2MinConsumedMessagesMap.get(topic); - if (topicSubscriptions.isEmpty() - || (minConsumedFromMap != null && minConsumedFromMap < minConsumedMessage) - || (minConsumedFromMap == null && minConsumedMessage != 0)) { - // Replace or put the new min consumed value. If it has changed - // do nothing, as another thread has updated the min consumed message - if ((minConsumedFromMap != null - && (topic2MinConsumedMessagesMap.replace(topic, minConsumedFromMap, - minConsumedMessage))) - || (topic2MinConsumedMessagesMap.putIfAbsent(topic, minConsumedMessage) == null)) { - pm.consumedUntil(topic, minConsumedMessage); - } - } else if (hasBound) { - pm.consumeToBound(topic); - } - } - } - } - - private class AcquireOp extends TopicOpQueuer.AsynchronousOp { - public AcquireOp(ByteString topic, Callback callback, Object ctx) { - queuer.super(topic, callback, ctx); - } - - @Override - public void run() { - if (top2sub2seq.containsKey(topic)) { - cb.operationFinished(ctx, null); - return; - } - - readSubscriptions(topic, new Callback>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - - @Override - public void operationFinished(final Object ctx, - final Map resultOfOperation) { - // We've just inherited a bunch of subscriber for this - // topic, some of which may be local. If they are, then we - // need to (1) notify listeners of this and (2) record the - // number for bookkeeping so that future - // subscribes/unsubscribes can efficiently notify listeners. - - // The final "commit" (and "abort") operations. - final Callback cb2 = new Callback() { - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(), - exception); - cb.operationFailed(ctx, null); - } - - @Override - public void operationFinished(Object ctx, Void voidObj) { - top2sub2seq.put(topic, resultOfOperation); - logger.info("Subscription manager successfully acquired topic: " + topic.toStringUtf8()); - cb.operationFinished(ctx, null); - } - - }; - - // Notify listeners if necessary. - if (hasLocalSubscriptions(resultOfOperation)) { - notifyFirstLocalSubscribe(topic, false, cb2, ctx); - } else { - cb2.operationFinished(ctx, null); - } - - updateMessageBound(topic); - } - - }, ctx); - - } - - } - - private void notifyFirstLocalSubscribe(ByteString topic, boolean synchronous, final Callback cb, final Object ctx) { - Callback mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx); - for (SubscriptionEventListener listener : listeners) { - listener.onFirstLocalSubscribe(topic, synchronous, mcb); - } - } - - /** - * Figure out who is subscribed. Do nothing if already acquired. If there's - * an error reading the subscribers' sequence IDs, then the topic is not - * acquired. - * - * @param topic - * @param callback - * @param ctx - */ - @Override - public void acquiredTopic(final ByteString topic, final Callback callback, Object ctx) { - queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx)); - } - - class ReleaseOp extends TopicOpQueuer.AsynchronousOp { - - public ReleaseOp(final ByteString topic, final Callback cb, Object ctx) { - queuer.super(topic, cb, ctx); - } - - @Override - public void run() { - Callback finalCb = new Callback() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - logger.info("Finished update subscription states when losting topic " - + topic.toStringUtf8()); - finish(); - } - - @Override - public void operationFailed(Object ctx, - PubSubException exception) { - logger.warn("Error when releasing topic : " + topic.toStringUtf8(), exception); - finish(); - } - - private void finish() { - // tell delivery manager to stop delivery for subscriptions of this topic - final Map topicSubscriptions = top2sub2seq.remove(topic); - // no subscriptions now, it may be removed by other release ops - if (null != topicSubscriptions) { - for (ByteString subId : topicSubscriptions.keySet()) { - if (logger.isDebugEnabled()) { - logger.debug("Stop serving subscriber (" + topic.toStringUtf8() + ", " - + subId.toStringUtf8() + ") when losing topic"); - } - if (null != dm) { - dm.stopServingSubscriber(topic, subId, SubscriptionEvent.TOPIC_MOVED, - noopCallback, null); - } - } - } - if (logger.isDebugEnabled()) { - logger.debug("Stop serving topic " + topic.toStringUtf8()); - } - // Since we decrement local count when some of remote subscriptions failed, - // while we don't unsubscribe those succeed subscriptions. so we can't depends - // on local count, just try to notify unsubscribe. - notifyLastLocalUnsubscribe(topic); - cb.operationFinished(ctx, null); - } - }; - if (logger.isDebugEnabled()) { - logger.debug("Try to update subscription states when losing topic " + topic.toStringUtf8()); - } - updateSubscriptionStates(topic, finalCb, ctx); - } - } - - void updateSubscriptionStates(ByteString topic, Callback finalCb, Object ctx) { - // Try to update subscription states of a specified topic - Map states = top2sub2seq.get(topic); - if (null == states) { - finalCb.operationFinished(ctx, null); - } else { - Callback mcb = CallbackUtils.multiCallback(states.size(), finalCb, ctx); - for (Entry entry : states.entrySet()) { - InMemorySubscriptionState memState = entry.getValue(); - if (memState.setLastConsumeSeqIdImmediately()) { - updateSubscriptionState(topic, entry.getKey(), memState, mcb, ctx); - } else { - mcb.operationFinished(ctx, null); - } - } - } - } - - /** - * Remove the local mapping. - */ - @Override - public void lostTopic(ByteString topic) { - queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null)); - } - - private void notifyLastLocalUnsubscribe(ByteString topic) { - for (SubscriptionEventListener listener : listeners) - listener.onLastLocalUnsubscribe(topic); - } - - protected abstract void readSubscriptions(final ByteString topic, - final Callback> cb, final Object ctx); - - protected abstract void readSubscriptionData(final ByteString topic, final ByteString subscriberId, - final Callback cb, Object ctx); - - private class SubscribeOp extends TopicOpQueuer.AsynchronousOp { - SubscribeRequest subRequest; - MessageSeqId consumeSeqId; - - public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId, - Callback callback, Object ctx) { - queuer.super(topic, callback, ctx); - this.subRequest = subRequest; - this.consumeSeqId = consumeSeqId; - } - - @Override - public void run() { - - final Map topicSubscriptions = top2sub2seq.get(topic); - if (topicSubscriptions == null) { - cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException("")); - return; - } - - final ByteString subscriberId = subRequest.getSubscriberId(); - final InMemorySubscriptionState subscriptionState = topicSubscriptions.get(subscriberId); - CreateOrAttach createOrAttach = subRequest.getCreateOrAttach(); - - if (subscriptionState != null) { - - if (createOrAttach.equals(CreateOrAttach.CREATE)) { - String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() - + " requested creating a subscription but it is already subscribed with state: " - + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()); - logger.error(msg); - cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg)); - return; - } - - // Subscription existed before, check whether new preferences provided - // if new preferences provided, merged the subscription data and updated them - // TODO: needs ACL mechanism when changing preferences - if (subRequest.hasPreferences() && - subscriptionState.updatePreferences(subRequest.getPreferences())) { - updateSubscriptionPreferences(topic, subscriberId, subscriptionState, new Callback() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - if (logger.isDebugEnabled()) { - logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() - + " attaching to subscription with state: " - + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()) - + ", with preferences: " - + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionPreferences())); - } - // update message bound if necessary - updateMessageBound(topic); - cb.operationFinished(ctx, subscriptionState.toSubscriptionData()); - } - }, ctx); - return; - } - - // otherwise just attach - if (logger.isDebugEnabled()) { - logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() - + " attaching to subscription with state: " - + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()) - + ", with preferences: " - + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionPreferences())); - } - - cb.operationFinished(ctx, subscriptionState.toSubscriptionData()); - return; - } - - // we don't have a mapping for this subscriber - if (createOrAttach.equals(CreateOrAttach.ATTACH)) { - String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() - + " requested attaching to an existing subscription but it is not subscribed"; - logger.error(msg); - cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg)); - return; - } - - // now the hard case, this is a brand new subscription, must record - SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder().setMsgId(consumeSeqId); - - SubscriptionPreferences.Builder preferencesBuilder; - if (subRequest.hasPreferences()) { - preferencesBuilder = SubscriptionPreferences.newBuilder(subRequest.getPreferences()); - } else { - preferencesBuilder = SubscriptionPreferences.newBuilder(); - } - - // backward compability - if (subRequest.hasMessageBound()) { - preferencesBuilder = preferencesBuilder.setMessageBound(subRequest.getMessageBound()); - } - - SubscriptionData.Builder subDataBuilder = - SubscriptionData.newBuilder().setState(stateBuilder).setPreferences(preferencesBuilder); - final SubscriptionData subData = subDataBuilder.build(); - - createSubscriptionData(topic, subscriberId, subData, new Callback() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - - @Override - public void operationFinished(Object ctx, final Version version) { - Callback cb2 = new Callback() { - @Override - public void operationFailed(final Object ctx, final PubSubException exception) { - logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic " - + topic.toStringUtf8() + " failed due to failed listener callback", exception); - // should remove subscription when synchronized cross-region subscription failed - deleteSubscriptionData(topic, subscriberId, version, new Callback() { - @Override - public void operationFinished(Object context, - Void resultOfOperation) { - finish(); - } - @Override - public void operationFailed(Object context, - PubSubException ex) { - logger.error("Remove subscription for subscriber " + subscriberId.toStringUtf8() + " to topic " - + topic.toStringUtf8() + " failed : ", ex); - finish(); - } - private void finish() { - cb.operationFailed(ctx, exception); - } - }, ctx); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(subData, version)); - - updateMessageBound(topic); - - cb.operationFinished(ctx, subData); - } - - }; - - // if this will be the first local subscription, notifyFirstLocalSubscribe - if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId()) - && !hasLocalSubscriptions(topicSubscriptions)) - notifyFirstLocalSubscribe(topic, subRequest.getSynchronous(), cb2, ctx); - else - cb2.operationFinished(ctx, null); - } - }, ctx); - } - } - - /** - * @return True if the given subscriberId-to-subscriberState map contains a local subscription: - * the vast majority of subscriptions are local, so we will quickly encounter one if it exists. - */ - private static boolean hasLocalSubscriptions(Map topicSubscriptions) { - for (ByteString subId : topicSubscriptions.keySet()) - if (!SubscriptionStateUtils.isHubSubscriber(subId)) - return true; - return false; - } - - public void updateMessageBound(ByteString topic) { - final Map topicSubscriptions = top2sub2seq.get(topic); - if (topicSubscriptions == null) { - return; - } - int maxBound = Integer.MIN_VALUE; - for (Map.Entry e : topicSubscriptions.entrySet()) { - if (!e.getValue().getSubscriptionPreferences().hasMessageBound()) { - maxBound = Integer.MIN_VALUE; - break; - } else { - maxBound = Math.max(maxBound, e.getValue().getSubscriptionPreferences().getMessageBound()); - } - } - if (maxBound == Integer.MIN_VALUE) { - pm.clearMessageBound(topic); - } else { - pm.setMessageBound(topic, maxBound); - } - } - - @Override - public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId, - Callback callback, Object ctx) { - queuer.pushAndMaybeRun(topic, new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx)); - } - - private class ConsumeOp extends TopicOpQueuer.AsynchronousOp { - ByteString subscriberId; - MessageSeqId consumeSeqId; - - public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback callback, - Object ctx) { - queuer.super(topic, callback, ctx); - this.subscriberId = subscriberId; - this.consumeSeqId = consumeSeqId; - } - - @Override - public void run() { - Map topicSubs = top2sub2seq.get(topic); - if (topicSubs == null) { - cb.operationFinished(ctx, null); - return; - } - - final InMemorySubscriptionState subState = topicSubs.get(subscriberId); - if (subState == null) { - cb.operationFinished(ctx, null); - return; - } - - if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) { - updateSubscriptionState(topic, subscriberId, subState, new Callback() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - subState.setLastPersistedSeqId(consumeSeqId.getLocalComponent()); - cb.operationFinished(ctx, resultOfOperation); - } - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - }, ctx); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Only advanced consume pointer in memory, will persist later, topic: " - + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() - + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState()) - + " in-memory consume-id: " - + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId())); - } - cb.operationFinished(ctx, null); - } - // tell delivery manage about the consume event - if (null != dm) { - dm.messageConsumed(topic, subscriberId, consumeSeqId); - } - } - } - - @Override - public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, - Callback callback, Object ctx) { - queuer.pushAndMaybeRun(topic, new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx)); - } - - private class CloseSubscriptionOp extends TopicOpQueuer.AsynchronousOp { - - public CloseSubscriptionOp(ByteString topic, ByteString subscriberId, - Callback callback, Object ctx) { - queuer.super(topic, callback, ctx); - } - - @Override - public void run() { - // TODO: BOOKKEEPER-412: we might need to move the loaded subscription - // to reclaim memory - // But for now we do nothing - cb.operationFinished(ctx, null); - } - } - - @Override - public void closeSubscription(ByteString topic, ByteString subscriberId, - Callback callback, Object ctx) { - queuer.pushAndMaybeRun(topic, new CloseSubscriptionOp(topic, subscriberId, callback, ctx)); - } - - private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp { - ByteString subscriberId; - - public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback callback, Object ctx) { - queuer.super(topic, callback, ctx); - this.subscriberId = subscriberId; - } - - @Override - public void run() { - final Map topicSubscriptions = top2sub2seq.get(topic); - if (topicSubscriptions == null) { - cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException("")); - return; - } - - if (!topicSubscriptions.containsKey(subscriberId)) { - cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException("")); - return; - } - - deleteSubscriptionData(topic, subscriberId, topicSubscriptions.get(subscriberId).getVersion(), - new Callback() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - topicSubscriptions.remove(subscriberId); - // Notify listeners if necessary. - if (!SubscriptionStateUtils.isHubSubscriber(subscriberId) - && !hasLocalSubscriptions(topicSubscriptions)) - notifyLastLocalUnsubscribe(topic); - - updateMessageBound(topic); - cb.operationFinished(ctx, null); - } - }, ctx); - - } - - } - - @Override - public void unsubscribe(ByteString topic, ByteString subscriberId, Callback callback, Object ctx) { - queuer.pushAndMaybeRun(topic, new UnsubscribeOp(topic, subscriberId, callback, ctx)); - } - - /** - * Not thread-safe. - */ - @Override - public void addListener(SubscriptionEventListener listener) { - listeners.add(listener); - } - - /** - * Method to stop this class gracefully including releasing any resources - * used and stopping all threads spawned. - */ - public void stop() { - timer.cancel(); - try { - final LinkedBlockingQueue queue = new LinkedBlockingQueue(); - // update dirty subscriptions - for (ByteString topic : top2sub2seq.keySet()) { - Callback finalCb = new Callback() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - ConcurrencyUtils.put(queue, true); - } - @Override - public void operationFailed(Object ctx, - PubSubException exception) { - ConcurrencyUtils.put(queue, false); - } - }; - updateSubscriptionStates(topic, finalCb, null); - queue.take(); - } - } catch (InterruptedException ie) { - logger.warn("Error during updating subscription states : ", ie); - } - } - - private void updateSubscriptionState(final ByteString topic, final ByteString subscriberId, - final InMemorySubscriptionState state, - final Callback callback, Object ctx) { - SubscriptionData subData; - Callback cb = new Callback() { - @Override - public void operationFinished(Object ctx, Version version) { - state.setVersion(version); - callback.operationFinished(ctx, null); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - if (exception instanceof PubSubException.BadVersionException) { - readSubscriptionData(topic, subscriberId, new Callback() { - @Override - public void operationFinished(Object ctx, - InMemorySubscriptionState resultOfOperation) { - state.setVersion(resultOfOperation.getVersion()); - updateSubscriptionState(topic, subscriberId, state, callback, ctx); - } - @Override - public void operationFailed(Object ctx, - PubSubException exception) { - callback.operationFailed(ctx, exception); - } - }, ctx); - - return; - } - callback.operationFailed(ctx, exception); - } - }; - if (isPartialUpdateSupported()) { - subData = SubscriptionData.newBuilder().setState(state.getSubscriptionState()).build(); - updateSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx); - } else { - subData = state.toSubscriptionData(); - replaceSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx); - } - } - - private void updateSubscriptionPreferences(final ByteString topic, final ByteString subscriberId, - final InMemorySubscriptionState state, - final Callback callback, Object ctx) { - SubscriptionData subData; - Callback cb = new Callback() { - @Override - public void operationFinished(Object ctx, Version version) { - state.setVersion(version); - callback.operationFinished(ctx, null); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - if (exception instanceof PubSubException.BadVersionException) { - readSubscriptionData(topic, subscriberId, new Callback() { - @Override - public void operationFinished(Object ctx, - InMemorySubscriptionState resultOfOperation) { - state.setVersion(resultOfOperation.getVersion()); - updateSubscriptionPreferences(topic, subscriberId, state, callback, ctx); - } - @Override - public void operationFailed(Object ctx, - PubSubException exception) { - callback.operationFailed(ctx, exception); - } - }, ctx); - - return; - } - callback.operationFailed(ctx, exception); - } - }; - if (isPartialUpdateSupported()) { - subData = SubscriptionData.newBuilder().setPreferences(state.getSubscriptionPreferences()).build(); - updateSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx); - } else { - subData = state.toSubscriptionData(); - replaceSubscriptionData(topic, subscriberId, subData, state.getVersion(), cb, ctx); - } - } - - protected abstract boolean isPartialUpdateSupported(); - - protected abstract void createSubscriptionData(final ByteString topic, ByteString subscriberId, - SubscriptionData data, Callback callback, Object ctx); - - protected abstract void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data, - Version version, Callback callback, Object ctx); - - protected abstract void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data, - Version version, Callback callback, Object ctx); - - protected abstract void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, Callback callback, - Object ctx); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java deleted file mode 100644 index 389ccc9..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.io.IOException; - -import com.google.protobuf.ByteString; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.hedwig.filter.MessageFilterBase; -import org.apache.hedwig.filter.ServerMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.common.ServerConfiguration; - -public class AllToAllTopologyFilter implements ServerMessageFilter { - - ByteString subscriberRegion; - boolean isHubSubscriber; - - @Override - public ServerMessageFilter initialize(Configuration conf) - throws ConfigurationException, IOException { - String region = conf.getString(ServerConfiguration.REGION, "standalone"); - if (null == region) { - throw new IOException("No region found to run " + getClass().getName()); - } - subscriberRegion = ByteString.copyFromUtf8(region); - return this; - } - - @Override - public void uninitialize() { - // do nothing now - } - - @Override - public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId, - SubscriptionPreferences preferences) { - isHubSubscriber = SubscriptionStateUtils.isHubSubscriber(subscriberId); - return this; - } - - @Override - public boolean testMessage(Message message) { - // We're using a simple all-to-all network topology, so no region - // should ever need to forward messages to any other region. - // Otherwise, with the current logic, messages will end up - // ping-pong-ing back and forth between regions with subscriptions - // to each other without termination (or in any other cyclic - // configuration). - if (isHubSubscriber && !message.getSrcRegion().equals(subscriberRegion)) { - return false; - } else { - return true; - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java deleted file mode 100644 index 4adbf1c..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class InMemorySubscriptionManager extends AbstractSubscriptionManager { - private static final Logger logger = LoggerFactory.getLogger(InMemorySubscriptionManager.class); - // Backup for top2sub2seq - final ConcurrentHashMap> top2sub2seqBackup = - new ConcurrentHashMap>(); - - public InMemorySubscriptionManager(ServerConfiguration conf, - TopicManager tm, PersistenceManager pm, - DeliveryManager dm, - ScheduledExecutorService scheduler) { - super(conf, tm, pm, dm, scheduler); - } - - @Override - protected void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData subData, - Callback callback, Object ctx) { - // nothing to do, in-memory info is already recorded by base class - callback.operationFinished(ctx, null); - } - - @Override - protected void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, Callback callback, - Object ctx) { - // nothing to do, in-memory info is already deleted by base class - callback.operationFinished(ctx, null); - } - - @Override - protected boolean isPartialUpdateSupported() { - return false; - } - - @Override - protected void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data, - Version version, Callback callback, Object ctx) { - throw new UnsupportedOperationException("Doesn't support partial update"); - } - - @Override - protected void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data, - Version version, Callback callback, Object ctx) { - // nothing to do, in-memory info is already updated by base class - callback.operationFinished(ctx, null); - } - - @Override - public void lostTopic(ByteString topic) { - // Backup topic-sub2seq map for readSubscriptions - final Map sub2seq = top2sub2seq.get(topic); - if (null != sub2seq) - top2sub2seqBackup.put(topic, sub2seq); - - if (logger.isDebugEnabled()) { - logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8()); - } - queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null)); - } - - @Override - protected void readSubscriptions(ByteString topic, - Callback> cb, Object ctx) { - // Since we backed up in-memory information on lostTopic, we can just return that back - Map topicSubs = top2sub2seqBackup.remove(topic); - - if (topicSubs != null) { - cb.operationFinished(ctx, topicSubs); - } else { - cb.operationFinished(ctx, new ConcurrentHashMap()); - } - - } - - @Override - protected void readSubscriptionData(ByteString topic, - ByteString subscriberId, Callback cb, Object ctx) { - // Since we backed up in-memory information on lostTopic, we can just return that back - Map sub2seqBackup = top2sub2seqBackup.get(topic); - if (sub2seqBackup == null) { - cb.operationFinished(ctx, new InMemorySubscriptionState( - SubscriptionData.getDefaultInstance(), Version.NEW)); - return; - } - InMemorySubscriptionState subState = sub2seqBackup.remove(subscriberId); - - if (subState != null) { - cb.operationFinished(ctx, subState); - } else { - cb.operationFinished(ctx, new InMemorySubscriptionState( - SubscriptionData.getDefaultInstance(), Version.NEW)); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java deleted file mode 100644 index ea74599..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - - -import java.util.Map; -import com.google.protobuf.ByteString; -import org.apache.bookkeeper.versioning.Version; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; -import org.apache.hedwig.protoextensions.MapUtils; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; - -public class InMemorySubscriptionState { - SubscriptionState subscriptionState; - SubscriptionPreferences subscriptionPreferences; - MessageSeqId lastConsumeSeqId; - Version version; - long lastPersistedSeqId; - - public InMemorySubscriptionState(SubscriptionData subscriptionData, Version version, MessageSeqId lastConsumeSeqId) { - this.subscriptionState = subscriptionData.getState(); - if (subscriptionData.hasPreferences()) { - this.subscriptionPreferences = subscriptionData.getPreferences(); - } else { - // set initial subscription preferences - SubscriptionPreferences.Builder prefsBuilder = SubscriptionPreferences.newBuilder(); - // progate the old system preferences from subscription state to preferences - prefsBuilder.setMessageBound(subscriptionState.getMessageBound()); - this.subscriptionPreferences = prefsBuilder.build(); - - } - this.lastConsumeSeqId = lastConsumeSeqId; - this.version = version; - this.lastPersistedSeqId = subscriptionState.getMsgId().getLocalComponent(); - } - - public InMemorySubscriptionState(SubscriptionData subscriptionData, Version version) { - this(subscriptionData, version, subscriptionData.getState().getMsgId()); - } - - public SubscriptionData toSubscriptionData() { - SubscriptionState.Builder stateBuilder = - SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId); - return SubscriptionData.newBuilder().setState(stateBuilder) - .setPreferences(subscriptionPreferences) - .build(); - } - - public SubscriptionState getSubscriptionState() { - return subscriptionState; - } - - public SubscriptionPreferences getSubscriptionPreferences() { - return subscriptionPreferences; - } - - public MessageSeqId getLastConsumeSeqId() { - return lastConsumeSeqId; - } - - public Version getVersion() { - return version; - } - - public void setVersion(Version version) { - this.version = version; - } - - /** - * - * @param lastConsumeSeqId - * @param consumeInterval - * The amount of laziness we want in persisting the consume - * pointers - * @return true if the resulting structure needs to be persisted, false - * otherwise - */ - public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) { - long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent(); - if (interval <= 0) { - return false; - } - - // set consume seq id when it is larger - this.lastConsumeSeqId = lastConsumeSeqId; - if (interval < consumeInterval) { - return false; - } - - // subscription state will be updated, marked it as clean - subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build(); - return true; - } - - /** - * Set lastConsumeSeqId Immediately - * - * @return true if the resulting structure needs to be persisted, false otherwise - */ - public boolean setLastConsumeSeqIdImmediately() { - long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent(); - // no need to set - if (interval <= 0) { - return false; - } - subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build(); - return true; - } - - public long getLastPersistedSeqId() { - return lastPersistedSeqId; - } - - public void setLastPersistedSeqId(long lastPersistedSeqId) { - this.lastPersistedSeqId = lastPersistedSeqId; - } - - /** - * Update preferences. - * - * @return true if preferences is updated, which needs to be persisted, false otherwise. - */ - public boolean updatePreferences(SubscriptionPreferences preferences) { - boolean changed = false; - SubscriptionPreferences.Builder newPreferencesBuilder = SubscriptionPreferences.newBuilder(subscriptionPreferences); - if (preferences.hasMessageBound()) { - if (!subscriptionPreferences.hasMessageBound() || - subscriptionPreferences.getMessageBound() != preferences.getMessageBound()) { - newPreferencesBuilder.setMessageBound(preferences.getMessageBound()); - changed = true; - } - } - if (preferences.hasMessageFilter()) { - if (!subscriptionPreferences.hasMessageFilter() || - !subscriptionPreferences.getMessageFilter().equals(preferences.getMessageFilter())) { - newPreferencesBuilder.setMessageFilter(preferences.getMessageFilter()); - changed = true; - } - } - if (preferences.hasMessageWindowSize()) { - if (!subscriptionPreferences.hasMessageWindowSize() || - subscriptionPreferences.getMessageWindowSize() != - preferences.getMessageWindowSize()) { - newPreferencesBuilder.setMessageWindowSize(preferences.getMessageWindowSize()); - changed = true; - } - } - if (preferences.hasOptions()) { - Map userOptions = SubscriptionStateUtils.buildUserOptions(subscriptionPreferences); - Map optUpdates = SubscriptionStateUtils.buildUserOptions(preferences); - boolean optChanged = false; - for (Map.Entry entry : optUpdates.entrySet()) { - String key = entry.getKey(); - if (userOptions.containsKey(key)) { - if (null == entry.getValue()) { - userOptions.remove(key); - optChanged = true; - } else { - if (!entry.getValue().equals(userOptions.get(key))) { - userOptions.put(key, entry.getValue()); - optChanged = true; - } - } - } else { - userOptions.put(key, entry.getValue()); - optChanged = true; - } - } - if (optChanged) { - changed = true; - newPreferencesBuilder.setOptions(MapUtils.buildMapBuilder(userOptions)); - } - } - if (changed) { - subscriptionPreferences = newPreferencesBuilder.build(); - } - return changed; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java deleted file mode 100644 index 47fdfd2..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.meta.SubscriptionDataManager; -import org.apache.hedwig.server.persistence.PersistenceManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * MetaManager-based subscription manager. - */ -public class MMSubscriptionManager extends AbstractSubscriptionManager { - - private static final Logger logger = LoggerFactory.getLogger(MMSubscriptionManager.class); - SubscriptionDataManager subManager; - - public MMSubscriptionManager(ServerConfiguration cfg, - MetadataManagerFactory metaManagerFactory, - TopicManager topicMgr, PersistenceManager pm, - DeliveryManager dm, - ScheduledExecutorService scheduler) { - super(cfg, topicMgr, pm, dm, scheduler); - this.subManager = metaManagerFactory.newSubscriptionDataManager(); - } - - @Override - protected void readSubscriptions(final ByteString topic, - final Callback> cb, final Object ctx) { - subManager.readSubscriptions(topic, new Callback>>() { - @Override - public void operationFailed(Object ctx, PubSubException pse) { - cb.operationFailed(ctx, pse); - } - @Override - public void operationFinished(Object ctx, Map> subs) { - Map results = new ConcurrentHashMap(); - for (Map.Entry> subEntry : subs.entrySet()) { - Versioned vv = subEntry.getValue(); - results.put(subEntry.getKey(), new InMemorySubscriptionState(vv.getValue(), vv.getVersion())); - } - cb.operationFinished(ctx, results); - } - }, ctx); - } - - @Override - protected void readSubscriptionData(final ByteString topic, final ByteString subscriberId, - final Callback cb, final Object ctx) { - subManager.readSubscriptionData(topic, subscriberId, new Callback>() { - @Override - public void operationFinished(Object ctx, - Versioned subData) { - if (null != subData) { - cb.operationFinished(ctx, - new InMemorySubscriptionState(subData.getValue(), subData.getVersion())); - } else { - cb.operationFinished(ctx, new InMemorySubscriptionState( - SubscriptionData.getDefaultInstance(), Version.NEW)); - } - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - }, ctx); - } - - @Override - protected boolean isPartialUpdateSupported() { - return subManager.isPartialUpdateSupported(); - } - - @Override - protected void createSubscriptionData(final ByteString topic, final ByteString subscriberId, - final SubscriptionData subData, final Callback callback, final Object ctx) { - subManager.createSubscriptionData(topic, subscriberId, subData, callback, ctx); - } - - @Override - protected void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData subData, - final Version version, final Callback callback, final Object ctx) { - subManager.replaceSubscriptionData(topic, subscriberId, subData, version, callback, ctx); - } - - @Override - protected void updateSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData subData, - final Version version, final Callback callback, final Object ctx) { - subManager.updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx); - } - - @Override - protected void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version, - final Callback callback, final Object ctx) { - subManager.deleteSubscriptionData(topic, subscriberId, version, callback, ctx); - } - - @Override - public void stop() { - super.stop(); - try { - subManager.close(); - } catch (IOException ioe) { - logger.warn("Exception closing subscription data manager : ", ioe); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java deleted file mode 100644 index 6c6e626..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.util.Callback; - -/** - * For listening to events that are issued by a SubscriptionManager. - * - */ -public interface SubscriptionEventListener { - - /** - * Called by the subscription manager when it previously had zero local - * subscribers for a topic and is currently accepting its first local - * subscriber. - * - * @param topic - * The topic of interest. - * @param synchronous - * Whether this request was actually initiated by a new local - * subscriber, or whether it was an existing subscription - * inherited by the hub (e.g. when recovering the state from ZK). - * @param cb - * The subscription will not complete until success is called on - * this callback. An error on cb will result in a subscription - * error. - */ - public void onFirstLocalSubscribe(ByteString topic, boolean synchronous, Callback cb); - - /** - * Called by the SubscriptionManager when it previously had non-zero local - * subscribers for a topic and is currently dropping its last local - * subscriber. This is fully asynchronous so there is no callback. - * - * @param topic - * The topic of interest. - */ - public void onLastLocalUnsubscribe(ByteString topic); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java deleted file mode 100644 index eadebcb..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.subscriptions; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.util.Callback; - -/** - * All methods are thread-safe. - */ -public interface SubscriptionManager { - - /** - * - * Register a new subscription for the given subscriber for the given topic. - * This method should reliably persist the existence of the subscription in - * a way that it can't be lost. If the subscription already exists, - * depending on the create or attach flag in the subscribe request, an - * exception may be returned. - * - * This is an asynchronous method. - * - * @param topic - * @param subRequest - * @param consumeSeqId - * The seqId to start serving the subscription from, if this is a - * brand new subscription - * @param callback - * The subscription data returned by the callback. - * @param ctx - */ - public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId, - Callback callback, Object ctx); - - /** - * Set the consume position of a given subscriber on a given topic. Note - * that this method need not persist the consume position immediately but - * can be lazy and persist it later asynchronously, if that is more - * efficient. - * - * @param topic - * @param subscriberId - * @param consumeSeqId - */ - public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, - Callback callback, Object ctx); - - /** - * Close a particular subscription - * - * @param topic - * Topic Name - * @param subscriberId - * Subscriber Id - * @param callback - * Callback - * @param ctx - * Callback context - */ - public void closeSubscription(ByteString topic, ByteString subscriberId, - Callback callback, Object ctx); - - /** - * Delete a particular subscription - * - * @param topic - * @param subscriberId - */ - public void unsubscribe(ByteString topic, ByteString subscriberId, Callback callback, Object ctx); - - // Management API methods that we will fill in later - // /** - // * Get the ids of all subscribers for a given topic - // * - // * @param topic - // * @return A list of subscriber ids that are currently subscribed to the - // * given topic - // */ - // public List getSubscriptionsForTopic(ByteString topic); - // - // /** - // * Get the topics to which a given subscriber is subscribed to - // * - // * @param subscriberId - // * @return A list of the topics to which the given subscriber is - // subscribed - // * to - // * @throws ServiceDownException - // * If there is an error in looking up the subscription - // * information - // */ - // public List getTopicsForSubscriber(ByteString subscriberId) - // throws ServiceDownException; - - /** - * Add a listener that is notified when topic-subscription pairs are added - * or removed. - */ - public void addListener(SubscriptionEventListener listener); - - /** - * Stop Subscription Manager - */ - public void stop(); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java deleted file mode 100644 index 2d9aba2..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.topics; - -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.common.TopicOpQueuer; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.CallbackUtils; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; - -public abstract class AbstractTopicManager implements TopicManager { - - /** - * Stats for a topic. For now it just an empty stub class. - */ - static class TopicStats { - } - - final static TopicStats STUB_TOPIC_STATS = new TopicStats(); - - /** - * My name. - */ - protected HedwigSocketAddress addr; - - /** - * Topic change listeners. - */ - protected ArrayList listeners = new ArrayList(); - - /** - * List of topics I believe I am responsible for. - */ - protected Cache topics; - - protected TopicOpQueuer queuer; - protected ServerConfiguration cfg; - protected ScheduledExecutorService scheduler; - - private static final Logger logger = LoggerFactory.getLogger(AbstractTopicManager.class); - - private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp { - public boolean shouldClaim; - - public GetOwnerOp(final ByteString topic, boolean shouldClaim, - final Callback cb, Object ctx) { - queuer.super(topic, cb, ctx); - this.shouldClaim = shouldClaim; - } - - @Override - public void run() { - realGetOwner(topic, shouldClaim, cb, ctx); - } - } - - private class ReleaseOp extends TopicOpQueuer.AsynchronousOp { - final boolean checkExistence; - - public ReleaseOp(ByteString topic, Callback cb, Object ctx) { - this(topic, cb, ctx, true); - } - - ReleaseOp(ByteString topic, Callback cb, Object ctx, - boolean checkExistence) { - queuer.super(topic, cb, ctx); - this.checkExistence = checkExistence; - } - - @Override - public void run() { - if (checkExistence) { - TopicStats stats = topics.getIfPresent(topic); - if (null == stats) { - cb.operationFinished(ctx, null); - return; - } - } - realReleaseTopic(topic, cb, ctx); - } - } - - /** - * Release topic when the topic is removed from topics cache. - */ - class ReleaseTopicListener implements RemovalListener { - @Override - public void onRemoval(RemovalNotification notification) { - if (notification.wasEvicted()) { - logger.info("topic {} is evicted", notification.getKey().toStringUtf8()); - // if the topic is evicted, we need to release the topic. - releaseTopicInternally(notification.getKey(), false); - } - } - } - - public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler) - throws UnknownHostException { - this.cfg = cfg; - this.queuer = new TopicOpQueuer(scheduler); - this.scheduler = scheduler; - addr = cfg.getServerAddr(); - - // build the topic cache - CacheBuilder cacheBuilder = CacheBuilder.newBuilder() - .maximumSize(cfg.getMaxNumTopics()) - .initialCapacity(cfg.getInitNumTopics()) - // TODO: change to same number as topic op queuer threads - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .removalListener(new ReleaseTopicListener()); - if (cfg.getRetentionSecsAfterAccess() > 0) { - cacheBuilder.expireAfterAccess(cfg.getRetentionSecsAfterAccess(), TimeUnit.SECONDS); - } - topics = cacheBuilder.build(); - } - - @Override - public void incrementTopicAccessTimes(ByteString topic) { - // let guava cache handle hits counting - topics.getIfPresent(topic); - } - - @Override - public synchronized void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener) { - listeners.add(listener); - } - - private void releaseTopicInternally(final ByteString topic, boolean checkExistence) { - // Enqueue a release operation. (Recall that release - // doesn't "fail" even if the topic is missing.) - queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, new Callback() { - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("failure that should never happen when releasing topic " - + topic, exception); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - logger.info("successfully release of topic " - + topic.toStringUtf8()); - if (logger.isDebugEnabled()) { - logger.debug("successfully release of topic " - + topic.toStringUtf8()); - } - } - - }, null, checkExistence)); - } - - protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic, - final Callback originalCallback, final Object originalContext) { - - Callback postCb = new Callback() { - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - topics.put(topic, STUB_TOPIC_STATS); - if (cfg.getRetentionSecs() > 0) { - scheduler.schedule(new Runnable() { - @Override - public void run() { - releaseTopicInternally(topic, true); - } - }, cfg.getRetentionSecs(), TimeUnit.SECONDS); - } - originalCallback.operationFinished(originalContext, addr); - } - - @Override - public void operationFailed(final Object ctx, final PubSubException exception) { - // TODO: optimization: we can release this as soon as we experience the first error. - Callback cb = new Callback() { - @Override - public void operationFinished(Object _ctx, Void _resultOfOperation) { - originalCallback.operationFailed(ctx, exception); - } - @Override - public void operationFailed(Object _ctx, PubSubException _exception) { - logger.error("Exception releasing topic", _exception); - originalCallback.operationFailed(ctx, exception); - } - }; - - realReleaseTopic(topic, cb, originalContext); - } - }; - - Callback mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null); - for (TopicOwnershipChangeListener listener : listeners) { - listener.acquiredTopic(topic, mcb, null); - } - } - - private void realReleaseTopic(ByteString topic, Callback callback, Object ctx) { - for (TopicOwnershipChangeListener listener : listeners) - listener.lostTopic(topic); - topics.invalidate(topic); - postReleaseCleanup(topic, callback, ctx); - } - - @Override - public final void getOwner(ByteString topic, boolean shouldClaim, - Callback cb, Object ctx) { - queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx)); - } - - @Override - public final void releaseTopic(ByteString topic, Callback cb, Object ctx) { - queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx)); - } - - @Override - public final void releaseTopics(int numTopics, final Callback callback, final Object ctx) { - // This is a best effort function. We sacrifice accuracy to not hold a lock on the topics set. - List topicList = getTopicList(); - // Make sure we release only as many topics as we own. - final long numTopicsToRelease = Math.min(topicList.size(), numTopics); - // Shuffle the list of topics we own, so that we release a random subset. - Collections.shuffle(topicList); - Callback mcb = CallbackUtils.multiCallback((int)numTopicsToRelease, new Callback() { - @Override - public void operationFinished(Object ctx, Void ignoreVal) { - callback.operationFinished(ctx, numTopicsToRelease); - } - - @Override - public void operationFailed(Object ctx, PubSubException e) { - long notReleased = 0; - if (e instanceof PubSubException.CompositeException) { - notReleased = ((PubSubException.CompositeException)e).getExceptions().size(); - } - callback.operationFinished(ctx, numTopicsToRelease - notReleased); - } - }, ctx); - - // Try to release "numTopicsToRelease" topics. It's okay if we're not - // able to release some topics. We signal that we tried by invoking the callback's - // operationFinished() with the actual number of topics released. - logger.info("This hub is releasing {} topics", numTopicsToRelease); - long releaseCount = 0; - for (ByteString topic : topicList) { - if (++releaseCount > numTopicsToRelease) { - break; - } - releaseTopic(topic, mcb, ctx); - } - } - - @Override - public List getTopicList() { - List topicList; - synchronized (this.topics) { - topicList = Lists.newArrayList(this.topics.asMap().keySet()); - } - return topicList; - } - - /** - * This method should "return" the owner of the topic if one has been chosen - * already. If there is no pre-chosen owner, either this hub or some other - * should be chosen based on the shouldClaim parameter. If its ends up - * choosing this hub as the owner, the {@code - * AbstractTopicManager#notifyListenersAndAddToOwnedTopics(ByteString, - * OperationCallback, Object)} method must be called. - * - */ - protected abstract void realGetOwner(ByteString topic, boolean shouldClaim, - Callback cb, Object ctx); - - /** - * The method should do any cleanup necessary to indicate to other hubs that - * this topic has been released - */ - protected abstract void postReleaseCleanup(ByteString topic, Callback cb, Object ctx); - - @Override - public void stop() { - // do nothing now - } -}