Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.persistence.PersistRequest;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.subscriptions.SubscriptionEventListener;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class RegionManager implements SubscriptionEventListener {
+
+ protected static final Logger LOGGER = Logger.getLogger(RegionManager.class);
+
+ private final ByteString mySubId;
+ private final PersistenceManager pm;
+ private final ArrayList<HedwigHubClient> clients = new ArrayList<HedwigHubClient>();
+ private final TopicOpQueuer queue;
+
+ public RegionManager(final PersistenceManager pm, final ServerConfiguration cfg, final ZooKeeper zk,
+ ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
+ this.pm = pm;
+ mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + cfg.getMyRegion());
+ queue = new TopicOpQueuer(scheduler);
+ for (final String hub : cfg.getRegions()) {
+ clients.add(hubClientFactory.create(new HedwigSocketAddress(hub)));
+ }
+ }
+
+ @Override
+ public void onFirstLocalSubscribe(final ByteString topic, final boolean synchronous, final Callback<Void> cb) {
+ // Whenever we acquire a topic due to a (local) subscribe, subscribe to
+ // it in all of the other regions (currently using a simple all-to-all
+ // topology).
+ queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, cb, null) {
+ @Override
+ public void run() {
+ Callback<Void> postCb = synchronous ? cb : CallbackUtils.logger(LOGGER, Level.DEBUG, Level.ERROR,
+ "all cross-region subscriptions succeeded", "at least one cross-region subscription failed");
+ final Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), postCb, ctx);
+ for (final HedwigHubClient client : clients) {
+ final HedwigSubscriber sub = client.getSubscriber();
+ sub.asyncSubscribe(topic, mySubId, CreateOrAttach.CREATE_OR_ATTACH, new Callback<Void>() {
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("cross-region subscription done for topic " + topic.toStringUtf8());
+ try {
+ sub.startDelivery(topic, mySubId, new MessageHandler() {
+ @Override
+ public void consume(final ByteString topic, ByteString subscriberId, Message msg,
+ final Callback<Void> callback, final Object context) {
+ // When messages are first published
+ // locally, the PublishHandler sets the
+ // source region in the Message. Stamp that
+ // region's local sequence number onto the
+ // message id as a remote component before
+ // persisting the forwarded message.
+ if (msg.hasSrcRegion()) {
+ msg = Message.newBuilder(msg).setMsgId(
+ MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
+ RegionSpecificSeqId.newBuilder().setRegion(
+ msg.getSrcRegion()).setSeqId(
+ msg.getMsgId().getLocalComponent()))).build();
+ }
+ pm.persistMessage(new PersistRequest(topic, msg, new Callback<Long>() {
+ @Override
+ public void operationFinished(Object ctx, Long resultOfOperation) {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("cross-region recv-fwd succeeded for topic "
+ + topic.toStringUtf8());
+ callback.operationFinished(context, null);
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ LOGGER.error("cross-region recv-fwd failed for topic "
+ + topic.toStringUtf8(), exception);
+ callback.operationFailed(context, exception);
+ }
+ }, null));
+ }
+ });
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("cross-region start-delivery succeeded for topic "
+ + topic.toStringUtf8());
+ mcb.operationFinished(ctx, null);
+ } catch (PubSubException ex) {
+ LOGGER.error(
+ "cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
+ mcb.operationFailed(ctx, ex);
+ }
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ LOGGER.error("cross-region subscribe failed for topic " + topic.toStringUtf8(),
+ exception);
+ mcb.operationFailed(ctx, exception);
+ }
+ }, null);
+ }
+ if (!synchronous)
+ cb.operationFinished(null, null);
+ }
+ });
+
+ }
+
+ @Override
+ public void onLastLocalUnsubscribe(final ByteString topic) {
+ // TODO may want to ease up on the eager unsubscribe; this is dropping
+ // cross-region subscriptions ASAP
+ queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, new Callback<Void>() {
+
+ @Override
+ public void operationFinished(Object ctx, Void result) {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("cross-region unsubscribes succeeded for topic " + topic.toStringUtf8());
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ LOGGER.error("cross-region unsubscribes failed for topic " + topic.toStringUtf8(), exception);
+ }
+
+ }, null) {
+ @Override
+ public void run() {
+ Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), cb, ctx);
+ for (final HedwigHubClient client : clients) {
+ client.getSubscriber().asyncUnsubscribe(topic, mySubId, mcb, null);
+ }
+ }
+ });
+ }
+
+ // Shut down all of the cross-region Hedwig clients.
+ public void stop() {
+ for (HedwigHubClient client : clients) {
+ client.stop();
+ }
+ }
+
+}
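
The consume handler above stamps the originating region's local sequence number onto the forwarded message's id as a remote component before persisting it locally. The following standalone sketch (illustrative only, not part of this changeset; the class and method names are invented) shows that stamping in isolation, using only the PubSubProtocol builders already referenced in this file.

    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
    import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;

    public class RemoteComponentStampingSketch {

        // Returns a copy of msg whose sequence id also carries a remote component
        // mapping the source region to that region's local sequence number.
        static Message stampRemoteComponent(Message msg) {
            if (!msg.hasSrcRegion()) {
                return msg;
            }
            RegionSpecificSeqId remote = RegionSpecificSeqId.newBuilder()
                    .setRegion(msg.getSrcRegion())
                    .setSeqId(msg.getMsgId().getLocalComponent())
                    .build();
            MessageSeqId stampedId = MessageSeqId.newBuilder(msg.getMsgId())
                    .addRemoteComponents(remote)
                    .build();
            return Message.newBuilder(msg).setMsgId(stampedId).build();
        }
    }
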
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.ssl;
+
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+
+import org.apache.hedwig.client.ssl.SslContextFactory;
+import org.apache.hedwig.server.common.ServerConfiguration;
+
+public class SslServerContextFactory extends SslContextFactory {
+
+ public SslServerContextFactory(ServerConfiguration cfg) {
+ try {
+ // Load the PKCS12 key store containing the server's certificate and key.
+ KeyStore ks = KeyStore.getInstance("pkcs12");
+ ks.load(cfg.getCertStream(), cfg.getPassword().toCharArray());
+
+ // Set up the key manager factory with the server's key material.
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(ks, cfg.getPassword().toCharArray());
+
+ // Create the SSL context.
+ ctx = SSLContext.getInstance("TLS");
+ ctx.init(kmf.getKeyManagers(), getTrustManagers(), null);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ protected boolean isClient() {
+ return false;
+ }
+
+}
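
The constructor above is plain JSSE: load a PKCS12 key store, initialize a SunX509 KeyManagerFactory with it, and build a TLS SSLContext. The sketch below (illustrative; the key-store path and password are placeholders, and how the context is consumed is an assumption rather than something shown in this commit) repeats those steps and derives a server-mode SSLEngine, mirroring isClient() returning false.

    import java.io.FileInputStream;
    import java.security.KeyStore;

    import javax.net.ssl.KeyManagerFactory;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLEngine;

    public class SslServerContextSketch {
        public static void main(String[] args) throws Exception {
            char[] password = "changeit".toCharArray();                 // placeholder
            KeyStore ks = KeyStore.getInstance("pkcs12");
            ks.load(new FileInputStream("server-cert.p12"), password);  // placeholder path

            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
            kmf.init(ks, password);

            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(kmf.getKeyManagers(), null, null);

            // A server hands out engines in server mode, e.g. to a Netty
            // SslHandler; this mirrors isClient() returning false above.
            SSLEngine engine = ctx.createSSLEngine();
            engine.setUseClientMode(false);
        }
    }
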
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+
+public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
+
+ ServerConfiguration cfg;
+ ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq = new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
+ static Logger logger = Logger.getLogger(AbstractSubscriptionManager.class);
+
+ TopicOpQueuer queuer;
+ private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
+ private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<ByteString, AtomicInteger>();
+
+ // Handle to the PersistenceManager for the server so we can pass along the
+ // message consume pointers for each topic.
+ private final PersistenceManager pm;
+ // Timer for a recurring task that computes, for each topic, the minimum
+ // message sequence ID that all of its subscribers have already consumed.
+ // With that information, we can tell the PersistenceManager which
+ // messages are safe to garbage collect.
+ private final Timer timer = new Timer(true);
+ // In memory mapping of topics to the minimum consumed message sequence ID
+ // for all subscribers to the topic.
+ private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
+
+ public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, PersistenceManager pm,
+ ScheduledExecutorService scheduler) {
+ this.cfg = cfg;
+ queuer = new TopicOpQueuer(scheduler);
+ tm.addTopicOwnershipChangeListener(this);
+ this.pm = pm;
+ // Schedule the recurring MessagesConsumedTask only if a
+ // PersistenceManager is passed.
+ if (pm != null) {
+ timer.schedule(new MessagesConsumedTask(), 0, cfg.getMessagesConsumedThreadRunInterval());
+ }
+ }
+
+ /**
+ * TimerTask that finds, for each topic, the minimum message sequence ID
+ * consumed by its subscribers. This information is passed along to the
+ * server's PersistenceManager so it can garbage collect older topic
+ * messages that are no longer needed by any subscriber.
+ */
+ class MessagesConsumedTask extends TimerTask {
+ /**
+ * Implement the TimerTask's abstract run method.
+ */
+ @Override
+ public void run() {
+ // We are looping through relatively small in memory data structures
+ // so it should be safe to run this fairly often.
+ for (ByteString topic : top2sub2seq.keySet()) {
+ final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+ // The topic may have been released concurrently (e.g. by lostTopic).
+ if (topicSubscriptions == null) {
+ continue;
+ }
+ long minConsumedMessage = Long.MAX_VALUE;
+ // Loop through all subscribers to the current topic to find the
+ // minimum consumed message id. The consume pointers are persisted
+ // lazily, so we use the last persisted (possibly stale) value rather
+ // than the latest in-memory one. That way we never garbage collect
+ // messages a subscriber would still need after a server crash.
+ for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
+ if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage)
+ minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
+ }
+ boolean callPersistenceManager = true;
+ // Don't call the PersistenceManager if nobody is subscribed to
+ // the topic yet, or the consume pointer has not changed since
+ // the last time, or if this is the initial subscription.
+ if (topicSubscriptions.isEmpty()
+ || (topic2MinConsumedMessagesMap.containsKey(topic) && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
+ || minConsumedMessage == 0) {
+ callPersistenceManager = false;
+ }
+ // Pass the new consume pointers to the PersistenceManager.
+ if (callPersistenceManager) {
+ topic2MinConsumedMessagesMap.put(topic, minConsumedMessage);
+ pm.consumedUntil(topic, minConsumedMessage);
+ }
+ }
+ }
+ }
+
+ private class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ public AcquireOp(ByteString topic, Callback<Void> callback, Object ctx) {
+ queuer.super(topic, callback, ctx);
+ }
+
+ @Override
+ public void run() {
+ // Do nothing if the topic is already acquired.
+ if (top2sub2seq.containsKey(topic)) {
+ cb.operationFinished(ctx, null);
+ return;
+ }
+
+ readSubscriptions(topic, new Callback<Map<ByteString, InMemorySubscriptionState>>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ cb.operationFailed(ctx, exception);
+ }
+
+ @Override
+ public void operationFinished(final Object ctx,
+ final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
+ // We've just inherited a bunch of subscribers for this
+ // topic, some of which may be local. If they are, then we
+ // need to (1) notify listeners of this and (2) record the
+ // number for bookkeeping so that future
+ // subscribes/unsubscribes can efficiently notify listeners.
+
+ // Count the number of local subscribers we just inherited.
+ // This loop is OK since the number of subscribers per topic
+ // is expected to be small.
+ int localCount = 0;
+ for (ByteString subscriberId : resultOfOperation.keySet())
+ if (!SubscriptionStateUtils.isHubSubscriber(subscriberId))
+ localCount++;
+ topic2LocalCounts.put(topic, new AtomicInteger(localCount));
+
+ // The final "commit" (and "abort") operations.
+ final Callback<Void> cb2 = new Callback<Void>() {
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(),
+ exception);
+ cb.operationFailed(ctx, null);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void voidObj) {
+ top2sub2seq.put(topic, resultOfOperation);
+ logger.info("Subscription manager successfully acquired topic: " + topic.toStringUtf8());
+ cb.operationFinished(ctx, null);
+ }
+
+ };
+
+ // Notify listeners if necessary.
+ if (localCount > 0) {
+ notifySubscribe(topic, false, cb2, ctx);
+ } else {
+ cb2.operationFinished(ctx, null);
+ }
+ }
+
+ }, ctx);
+
+ }
+
+ }
+
+ private void notifySubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
+ Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx);
+ for (SubscriptionEventListener listener : listeners) {
+ listener.onFirstLocalSubscribe(topic, synchronous, mcb);
+ }
+ }
+
+ /**
+ * Figure out who is subscribed. Do nothing if already acquired. If there's
+ * an error reading the subscribers' sequence IDs, then the topic is not
+ * acquired.
+ *
+ * @param topic
+ * @param callback
+ * @param ctx
+ */
+ @Override
+ public void acquiredTopic(final ByteString topic, final Callback<Void> callback, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
+ }
+
+ /**
+ * Remove the local mapping.
+ */
+ @Override
+ public void lostTopic(ByteString topic) {
+ top2sub2seq.remove(topic);
+ // Notify listeners if we were tracking local subscribers for this topic.
+ AtomicInteger localCount = topic2LocalCounts.remove(topic);
+ if (localCount != null && localCount.get() > 0)
+ notifyUnsubscribe(topic);
+ }
+
+ private void notifyUnsubscribe(ByteString topic) {
+ for (SubscriptionEventListener listener : listeners)
+ listener.onLastLocalUnsubscribe(topic);
+ }
+
+ protected abstract void readSubscriptions(final ByteString topic,
+ final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx);
+
+ private class SubscribeOp extends TopicOpQueuer.AsynchronousOp<MessageSeqId> {
+ SubscribeRequest subRequest;
+ MessageSeqId consumeSeqId;
+
+ public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+ Callback<MessageSeqId> callback, Object ctx) {
+ queuer.super(topic, callback, ctx);
+ this.subRequest = subRequest;
+ this.consumeSeqId = consumeSeqId;
+ }
+
+ @Override
+ public void run() {
+
+ final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+ if (topicSubscriptions == null) {
+ cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+ return;
+ }
+
+ final ByteString subscriberId = subRequest.getSubscriberId();
+ InMemorySubscriptionState subscriptionState = topicSubscriptions.get(subscriberId);
+ CreateOrAttach createOrAttach = subRequest.getCreateOrAttach();
+
+ if (subscriptionState != null) {
+
+ if (createOrAttach.equals(CreateOrAttach.CREATE)) {
+ String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ + " requested creating a subscription but it is already subscribed with state: "
+ + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
+ logger.debug(msg);
+ cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
+ return;
+ }
+
+ // otherwise just attach
+ if (logger.isDebugEnabled()) {
+ logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ + " attaching to subscription with state: "
+ + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()));
+ }
+
+ cb.operationFinished(ctx, subscriptionState.getLastConsumeSeqId());
+ return;
+ }
+
+ // we don't have a mapping for this subscriber
+ if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
+ String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ + " requested attaching to an existing subscription but it is not subscribed";
+ logger.debug(msg);
+ cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
+ return;
+ }
+
+ // now the hard case, this is a brand new subscription, must record
+ final SubscriptionState newState = SubscriptionState.newBuilder().setMsgId(consumeSeqId).build();
+ createSubscriptionState(topic, subscriberId, newState, new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ cb.operationFailed(ctx, exception);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ Callback<Void> cb2 = new Callback<Void>() {
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
+ + topic.toStringUtf8() + " failed due to failed listener callback", exception);
+ cb.operationFailed(ctx, exception);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(newState));
+ cb.operationFinished(ctx, consumeSeqId);
+ }
+
+ };
+
+ if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
+ && topic2LocalCounts.get(topic).incrementAndGet() == 1)
+ notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
+ else
+ cb2.operationFinished(ctx, resultOfOperation);
+ }
+ }, ctx);
+ }
+ }
+
+ @Override
+ public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+ Callback<MessageSeqId> callback, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx));
+ }
+
+ private class ConsumeOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ ByteString subscriberId;
+ MessageSeqId consumeSeqId;
+
+ public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback<Void> callback,
+ Object ctx) {
+ queuer.super(topic, callback, ctx);
+ this.subscriberId = subscriberId;
+ this.consumeSeqId = consumeSeqId;
+ }
+
+ @Override
+ public void run() {
+ Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
+ if (topicSubs == null) {
+ cb.operationFinished(ctx, null);
+ return;
+ }
+
+ InMemorySubscriptionState subState = topicSubs.get(subscriberId);
+ if (subState == null) {
+ cb.operationFinished(ctx, null);
+ return;
+ }
+
+ if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
+ updateSubscriptionState(topic, subscriberId, subState.getSubscriptionState(), cb, ctx);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Only advanced consume pointer in memory, will persist later, topic: "
+ + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+ + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
+ + " in-memory consume-id: "
+ + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
+ }
+ cb.operationFinished(ctx, null);
+ }
+
+ }
+ }
+
+ @Override
+ public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
+ Callback<Void> callback, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx));
+ }
+
+ private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ ByteString subscriberId;
+
+ public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
+ queuer.super(topic, callback, ctx);
+ this.subscriberId = subscriberId;
+ }
+
+ @Override
+ public void run() {
+ final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+ if (topicSubscriptions == null) {
+ cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+ return;
+ }
+
+ if (!topicSubscriptions.containsKey(subscriberId)) {
+ cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(""));
+ return;
+ }
+
+ deleteSubscriptionState(topic, subscriberId, new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ cb.operationFailed(ctx, exception);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ topicSubscriptions.remove(subscriberId);
+ // Notify listeners if necessary.
+ if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
+ && topic2LocalCounts.get(topic).decrementAndGet() == 0)
+ notifyUnsubscribe(topic);
+ cb.operationFinished(ctx, null);
+ }
+ }, ctx);
+
+ }
+
+ }
+
+ @Override
+ public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new UnsubscribeOp(topic, subscriberId, callback, ctx));
+ }
+
+ /**
+ * Not thread-safe.
+ */
+ @Override
+ public void addListener(SubscriptionEventListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Stop this manager gracefully, releasing any resources it holds and
+ * stopping any threads it has spawned.
+ */
+ public void stop() {
+ timer.cancel();
+ }
+
+ protected abstract void createSubscriptionState(final ByteString topic, ByteString subscriberId,
+ SubscriptionState state, Callback<Void> callback, Object ctx);
+
+ protected abstract void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+ Callback<Void> callback, Object ctx);
+
+ protected abstract void deleteSubscriptionState(ByteString topic, ByteString subscriberId, Callback<Void> callback,
+ Object ctx);
+
+}
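
Several places in this class (and in RegionManager) fan one logical operation out to a set of listeners or clients and join the results with CallbackUtils.multiCallback, as in notifySubscribe above. The sketch below illustrates that pattern; the precise semantics of multiCallback (the parent callback fires once all n children have finished, and fails if any child fails) are inferred from its usage here and should be treated as an assumption.

    import org.apache.hedwig.exceptions.PubSubException;
    import org.apache.hedwig.util.Callback;
    import org.apache.hedwig.util.CallbackUtils;

    public class MultiCallbackSketch {

        // Fan an operation out to n participants; the parent callback is
        // completed once every participant has reported in.
        static void fanOut(int n, Callback<Void> parent, Object ctx) {
            Callback<Void> mcb = CallbackUtils.multiCallback(n, parent, ctx);
            for (int i = 0; i < n; i++) {
                // Each (normally asynchronous) participant reports into the same
                // joined callback; a failure would go through operationFailed.
                mcb.operationFinished(ctx, null);
            }
        }

        public static void main(String[] args) {
            fanOut(3, new Callback<Void>() {
                @Override
                public void operationFinished(Object ctx, Void result) {
                    System.out.println("all participants finished");
                }

                @Override
                public void operationFailed(Object ctx, PubSubException e) {
                    System.err.println("at least one participant failed: " + e);
                }
            }, null);
        }
    }
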
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public class AllToAllTopologyFilter implements MessageFilter {
+
+ ByteString subscriberRegion;
+
+ public AllToAllTopologyFilter(ByteString subscriberRegion) {
+ this.subscriberRegion = subscriberRegion;
+ }
+
+ public boolean testMessage(Message message) {
+ // Drop messages that originated in the subscriber's own region so
+ // that cross-region forwarding does not loop messages back.
+ return !message.getSrcRegion().equals(subscriberRegion);
+ }
+
+}
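
A short usage sketch of the filter follows (illustrative only; it assumes that body is the only other field that must be set when building a Message, which is not shown in this diff).

    import com.google.protobuf.ByteString;

    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;

    public class AllToAllTopologyFilterSketch {
        public static void main(String[] args) {
            AllToAllTopologyFilter filter =
                    new AllToAllTopologyFilter(ByteString.copyFromUtf8("us-west"));

            // Assumption: body is the only other field that must be set here.
            Message local = Message.newBuilder()
                    .setBody(ByteString.copyFromUtf8("payload"))
                    .setSrcRegion(ByteString.copyFromUtf8("us-west"))
                    .build();
            // A message that originated in the subscriber's own region is
            // dropped, so cross-region forwarding cannot loop it back.
            System.out.println(filter.testMessage(local));   // false

            Message remote = Message.newBuilder()
                    .setBody(ByteString.copyFromUtf8("payload"))
                    .setSrcRegion(ByteString.copyFromUtf8("us-east"))
                    .build();
            // A message from a different region passes the filter.
            System.out.println(filter.testMessage(remote));  // true
        }
    }
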
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
+
+ public InMemorySubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
+ super(conf, tm, pm, scheduler);
+ }
+
+ @Override
+ protected void createSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+ Callback<Void> callback, Object ctx) {
+ // nothing to do, in-memory info is already recorded by base class
+ callback.operationFinished(ctx, null);
+ }
+
+ @Override
+ protected void deleteSubscriptionState(ByteString topic, ByteString subscriberId, Callback<Void> callback,
+ Object ctx) {
+ // nothing to do, in-memory info is already deleted by base class
+ callback.operationFinished(ctx, null);
+ }
+
+ @Override
+ protected void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+ Callback<Void> callback, Object ctx) {
+ // nothing to do, in-memory info is already updated by base class
+ callback.operationFinished(ctx, null);
+ }
+
+ @Override
+ public void lostTopic(ByteString topic) {
+ // Intentionally do nothing, so that we don't lose in-memory information.
+ }
+
+ @Override
+ protected void readSubscriptions(ByteString topic,
+ Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
+ // Since we don't lose in-memory information on lostTopic, we can just
+ // return that back
+ Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
+
+ if (topicSubs != null) {
+ cb.operationFinished(ctx, topicSubs);
+ } else {
+ cb.operationFinished(ctx, new ConcurrentHashMap<ByteString, InMemorySubscriptionState>());
+ }
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+
+public class InMemorySubscriptionState {
+ SubscriptionState subscriptionState;
+ MessageSeqId lastConsumeSeqId;
+
+ public InMemorySubscriptionState(SubscriptionState subscriptionState, MessageSeqId lastConsumeSeqId) {
+ this.subscriptionState = subscriptionState;
+ this.lastConsumeSeqId = lastConsumeSeqId;
+ }
+
+ public InMemorySubscriptionState(SubscriptionState subscriptionState) {
+ this(subscriptionState, subscriptionState.getMsgId());
+ }
+
+ public SubscriptionState getSubscriptionState() {
+ return subscriptionState;
+ }
+
+ public MessageSeqId getLastConsumeSeqId() {
+ return lastConsumeSeqId;
+ }
+
+ /**
+ * Advance the in-memory consume pointer for this subscription.
+ *
+ * @param lastConsumeSeqId
+ * The latest consume sequence ID reported by the subscriber
+ * @param consumeInterval
+ * The amount of laziness we want in persisting the consume
+ * pointers
+ * @return true if the resulting structure needs to be persisted, false
+ * otherwise
+ */
+ public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
+ this.lastConsumeSeqId = lastConsumeSeqId;
+
+ if (lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent() < consumeInterval) {
+ return false;
+ }
+
+ subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build();
+ return true;
+ }
+
+}
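
setLastConsumeSeqId implements the lazy persistence of consume pointers: the in-memory pointer always advances, but the persisted SubscriptionState is only folded forward, and the method only returns true, once the pointer has moved at least consumeInterval messages past the last persisted position. A small illustrative sketch follows (the class name and sequence numbers are invented; it assumes a MessageSeqId can be built from just its local component).

    import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
    import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
    import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;

    public class ConsumeIntervalSketch {

        static MessageSeqId seqId(long local) {
            return MessageSeqId.newBuilder().setLocalComponent(local).build();
        }

        public static void main(String[] args) {
            // The persisted consume position starts at local sequence number 100.
            SubscriptionState persisted =
                    SubscriptionState.newBuilder().setMsgId(seqId(100)).build();
            InMemorySubscriptionState state = new InMemorySubscriptionState(persisted);

            int consumeInterval = 50;

            // Advancing by less than the interval only moves the in-memory
            // pointer; nothing needs to be persisted yet.
            System.out.println(state.setLastConsumeSeqId(seqId(120), consumeInterval)); // false

            // Once the pointer is at least consumeInterval ahead of the last
            // persisted position, the state is folded forward and should be
            // persisted by the caller.
            System.out.println(state.setLastConsumeSeqId(seqId(150), consumeInterval)); // true
        }
    }
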
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public interface MessageFilter {
+
+ /**
+ * Tests whether a particular message passes the filter or not.
+ *
+ * @param message
+ * The message to test.
+ * @return true if the message passes the filter, false otherwise
+ */
+ public boolean testMessage(Message message);
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * For listening to events that are issued by a SubscriptionManager.
+ *
+ */
+public interface SubscriptionEventListener {
+
+ /**
+ * Called by the subscription manager when it previously had zero local
+ * subscribers for a topic and is currently accepting its first local
+ * subscriber.
+ *
+ * @param topic
+ * The topic of interest.
+ * @param synchronous
+ * Whether this request was actually initiated by a new local
+ * subscriber, or whether it was an existing subscription
+ * inherited by the hub (e.g. when recovering the state from ZK).
+ * @param cb
+ * The subscription will not complete until success is called on
+ * this callback. An error on cb will result in a subscription
+ * error.
+ */
+ public void onFirstLocalSubscribe(ByteString topic, boolean synchronous, Callback<Void> cb);
+
+ /**
+ * Called by the SubscriptionManager when it previously had non-zero local
+ * subscribers for a topic and is currently dropping its last local
+ * subscriber. This is fully asynchronous so there is no callback.
+ *
+ * @param topic
+ * The topic of interest.
+ */
+ public void onLastLocalUnsubscribe(ByteString topic);
+
+}
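
For reference, a minimal listener implementation sketch (the class is invented for illustration; RegionManager above is the real implementation in this commit): complete the callback to let the first local subscribe proceed, or fail it to abort the subscription, while the last-unsubscribe notification is fire-and-forget.

    import com.google.protobuf.ByteString;

    import org.apache.hedwig.server.subscriptions.SubscriptionEventListener;
    import org.apache.hedwig.util.Callback;

    public class LoggingSubscriptionEventListener implements SubscriptionEventListener {

        @Override
        public void onFirstLocalSubscribe(ByteString topic, boolean synchronous, Callback<Void> cb) {
            System.out.println("first local subscriber for " + topic.toStringUtf8()
                    + (synchronous ? " (new local subscription)" : " (inherited by the hub)"));
            // The subscribe operation is gated on this callback: failing it here
            // would surface as a subscription error to the client.
            cb.operationFinished(null, null);
        }

        @Override
        public void onLastLocalUnsubscribe(ByteString topic) {
            // Fire-and-forget: there is no callback to complete.
            System.out.println("last local subscriber gone for " + topic.toStringUtf8());
        }
    }
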
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * All methods are thread-safe.
+ */
+public interface SubscriptionManager {
+
+ /**
+ *
+ * Register a new subscription for the given subscriber for the given topic.
+ * This method should reliably persist the existence of the subscription in
+ * a way that it can't be lost. If the subscription already exists,
+ * depending on the create or attach flag in the subscribe request, an
+ * exception may be returned.
+ *
+ * This is an asynchronous method.
+ *
+ * @param topic
+ * @param subRequest
+ * @param consumeSeqId
+ * The seqId to start serving the subscription from, if this is a
+ * brand new subscription
+ * @param callback
+ * The seq id returned by the callback is where serving should
+ * start from
+ * @param ctx
+ */
+ public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+ Callback<MessageSeqId> callback, Object ctx);
+
+ /**
+ * Set the consume position of a given subscriber on a given topic. Note
+ * that this method need not persist the consume position immediately but
+ * can be lazy and persist it later asynchronously, if that is more
+ * efficient.
+ *
+ * @param topic
+ * @param subscriberId
+ * @param consumeSeqId
+ */
+ public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
+ Callback<Void> callback, Object ctx);
+
+ /**
+ * Delete a particular subscription
+ *
+ * @param topic
+ * @param subscriberId
+ */
+ public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx);
+
+ // Management API methods that we will fill in later
+ // /**
+ // * Get the ids of all subscribers for a given topic
+ // *
+ // * @param topic
+ // * @return A list of subscriber ids that are currently subscribed to the
+ // * given topic
+ // */
+ // public List<ByteString> getSubscriptionsForTopic(ByteString topic);
+ //
+ // /**
+ // * Get the topics to which a given subscriber is subscribed to
+ // *
+ // * @param subscriberId
+ // * @return A list of the topics to which the given subscriber is
+ // subscribed
+ // * to
+ // * @throws ServiceDownException
+ // * If there is an error in looking up the subscription
+ // * information
+ // */
+ // public List<ByteString> getTopicsForSubscriber(ByteString subscriberId)
+ // throws ServiceDownException;
+
+ /**
+ * Add a listener that is notified when topic-subscription pairs are added
+ * or removed.
+ */
+ public void addListener(SubscriptionEventListener listener);
+
+}
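
A sketch of how the pieces fit together (illustrative; where the listener registration and request construction actually happen is not shown in this diff): register the RegionManager as a SubscriptionEventListener so the first local subscribe on a topic triggers cross-region subscriptions, then serve a subscribe request asynchronously.

    import com.google.protobuf.ByteString;

    import org.apache.hedwig.exceptions.PubSubException;
    import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
    import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
    import org.apache.hedwig.server.regions.RegionManager;
    import org.apache.hedwig.server.subscriptions.SubscriptionManager;
    import org.apache.hedwig.util.Callback;

    public class SubscriptionManagerUsageSketch {

        // The caller supplies already-constructed objects; how they are built
        // is outside the scope of this sketch.
        static void subscribe(SubscriptionManager sm, RegionManager rm, ByteString topic,
                SubscribeRequest request, MessageSeqId startSeqId) {
            // Register the RegionManager so the first local subscribe on a topic
            // triggers cross-region subscriptions (see RegionManager above).
            sm.addListener(rm);

            sm.serveSubscribeRequest(topic, request, startSeqId, new Callback<MessageSeqId>() {
                @Override
                public void operationFinished(Object ctx, MessageSeqId serveFrom) {
                    // Delivery for this subscriber should start from serveFrom.
                }

                @Override
                public void operationFailed(Object ctx, PubSubException e) {
                    // e.g. ClientAlreadySubscribedException when CREATE is
                    // requested for an existing subscription.
                }
            }, null);
        }
    }
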
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public class TrueFilter implements MessageFilter {
+ protected static TrueFilter instance = new TrueFilter();
+
+ public static TrueFilter instance() {
+ return instance;
+ }
+
+ public boolean testMessage(Message message) {
+ return true;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+
+public class ZkSubscriptionManager extends AbstractSubscriptionManager {
+
+ ZooKeeper zk;
+
+ protected final static Logger logger = Logger.getLogger(ZkSubscriptionManager.class);
+
+ public ZkSubscriptionManager(ZooKeeper zk, TopicManager topicMgr, PersistenceManager pm, ServerConfiguration cfg,
+ ScheduledExecutorService scheduler) {
+ super(cfg, topicMgr, pm, scheduler);
+ this.zk = zk;
+ }
+
+ private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
+ return cfg.getZkTopicPath(sb, topic).append("/subscribers");
+ }
+
+ private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
+ return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
+ .toString();
+ }
+
+ @Override
+ protected void readSubscriptions(final ByteString topic,
+ final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
+
+ String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
+ zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, final Object ctx, final List<String> children) {
+
+ if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
+ KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
+ + topic.toStringUtf8(), path, rc);
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+ return;
+ }
+
+ final Map<ByteString, InMemorySubscriptionState> topicSubs = new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
+
+ if (rc == Code.NONODE.intValue() || children.size() == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8());
+ }
+ cb.operationFinished(ctx, topicSubs);
+ return;
+ }
+
+ final AtomicBoolean failed = new AtomicBoolean();
+ final AtomicInteger count = new AtomicInteger();
+
+ for (final String child : children) {
+
+ final ByteString subscriberId = ByteString.copyFromUtf8(child);
+ final String childPath = path + "/" + child;
+
+ zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+
+ if (rc != Code.OK.intValue()) {
+ KeeperException e = ZkUtils.logErrorAndCreateZKException(
+ "Could not read subscription data for topic: " + topic.toStringUtf8()
+ + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+ reportFailure(new PubSubException.ServiceDownException(e));
+ return;
+ }
+
+ if (failed.get()) {
+ return;
+ }
+
+ SubscriptionState state;
+
+ try {
+ state = SubscriptionState.parseFrom(data);
+ } catch (InvalidProtocolBufferException ex) {
+ String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8();
+ logger.error(msg, ex);
+ reportFailure(new PubSubException.UnexpectedConditionException(msg));
+ return;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
+ + " subscriberId: " + child + "state: "
+ + SubscriptionStateUtils.toString(state));
+ }
+
+ topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
+ if (count.incrementAndGet() == children.size()) {
+ assert topicSubs.size() == count.get();
+ cb.operationFinished(ctx, topicSubs);
+ }
+ }
+
+ private void reportFailure(PubSubException e) {
+ if (failed.compareAndSet(false, true))
+ cb.operationFailed(ctx, e);
+ }
+ }, ctx);
+ }
+ }
+ }, ctx);
+ }
+
+ @Override
+ protected void createSubscriptionState(final ByteString topic, final ByteString subscriberId,
+ final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+ ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), state.toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, String name) {
+ if (rc == Code.OK.intValue()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+ + SubscriptionStateUtils.toString(state));
+ }
+ callback.operationFinished(ctx, null);
+ } else {
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+ "Could not record new subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+ }
+ }
+ }, ctx);
+ }
+
+ @Override
+ protected void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
+ final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+ zk.setData(topicSubscriberPath(topic, subscriberId), state.toByteArray(), -1,
+ new SafeAsyncZKCallback.StatCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+ if (rc != Code.OK.intValue()) {
+ KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8()
+ + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
+ path, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+ + SubscriptionStateUtils.toString(state));
+ }
+
+ callback.operationFinished(ctx, null);
+ }
+ }
+ }, ctx);
+ }
+
+ @Override
+ protected void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
+ final Callback<Void> callback, final Object ctx) {
+ zk.delete(topicSubscriberPath(topic, subscriberId), -1, new SafeAsyncZKCallback.VoidCallback() {
+ @Override
+ public void safeProcessResult(int rc, String path, Object ctx) {
+ if (rc == Code.OK.intValue()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8());
+ }
+
+ callback.operationFinished(ctx, null);
+ return;
+ }
+
+ KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+ + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+ }
+ }, ctx);
+ }
+
+}
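
Subscription state is kept under a per-topic "subscribers" znode, one child per subscriber id, each holding the serialized SubscriptionState protobuf. The sketch below (illustrative; the path prefix is an assumption, since the real topic root comes from ServerConfiguration.getZkTopicPath) reads one such record back with a synchronous ZooKeeper call.

    import org.apache.zookeeper.ZooKeeper;

    import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;

    public class ZkSubscriptionReadSketch {

        // Reads one subscriber's persisted state. The "<topicRoot>/subscribers/<id>"
        // layout mirrors topicSubscriberPath above.
        static SubscriptionState readState(ZooKeeper zk, String topicRoot, String subscriberId)
                throws Exception {
            byte[] data = zk.getData(topicRoot + "/subscribers/" + subscriberId, false, null);
            return SubscriptionState.parseFrom(data);
        }
    }
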
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public abstract class AbstractTopicManager implements TopicManager {
+ /**
+ * The address of this hub (i.e., "my name").
+ */
+ protected HedwigSocketAddress addr;
+
+ /**
+ * Topic change listeners.
+ */
+ protected ArrayList<TopicOwnershipChangeListener> listeners = new ArrayList<TopicOwnershipChangeListener>();
+
+ /**
+ * Set of topics this hub believes it is responsible for.
+ */
+ protected Set<ByteString> topics = Collections.synchronizedSet(new HashSet<ByteString>());
+
+ protected TopicOpQueuer queuer;
+ protected ServerConfiguration cfg;
+ protected ScheduledExecutorService scheduler;
+
+ private static final Logger logger = Logger.getLogger(AbstractTopicManager.class);
+
+ private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp<HedwigSocketAddress> {
+ public boolean shouldClaim;
+
+ public GetOwnerOp(final ByteString topic, boolean shouldClaim,
+ final Callback<HedwigSocketAddress> cb, Object ctx) {
+ queuer.super(topic, cb, ctx);
+ this.shouldClaim = shouldClaim;
+ }
+
+ @Override
+ public void run() {
+ realGetOwner(topic, shouldClaim, cb, ctx);
+ }
+ }
+
+ private class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ public ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx) {
+ queuer.super(topic, cb, ctx);
+ }
+
+ @Override
+ public void run() {
+ if (!topics.contains(topic)) {
+ cb.operationFinished(ctx, null);
+ return;
+ }
+ realReleaseTopic(topic, cb, ctx);
+ }
+ }
+
+ public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
+ throws UnknownHostException {
+ this.cfg = cfg;
+ this.queuer = new TopicOpQueuer(scheduler);
+ this.scheduler = scheduler;
+ addr = cfg.getServerAddr();
+ }
+
+ @Override
+ public synchronized void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener) {
+ listeners.add(listener);
+ }
+
+ protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
+ final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
+
+ Callback<Void> postCb = new Callback<Void>() {
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ topics.add(topic);
+ if (cfg.getRetentionSecs() > 0) {
+ scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ // Enqueue a release operation. (Recall that release
+ // doesn't "fail" even if the topic is missing.)
+ releaseTopic(topic, new Callback<Void>() {
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("failure that should never happen when periodically releasing topic "
+ + topic, exception);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ logger.debug("successful periodic release of topic " + topic);
+ }
+
+ }, null);
+ }
+ }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
+ }
+ originalCallback.operationFinished(originalContext, addr);
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ // TODO: optimization: we can release this as soon as we experience the first error.
+ realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
+ originalCallback.operationFailed(ctx, exception);
+ }
+ };
+
+ Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null);
+ for (TopicOwnershipChangeListener listener : listeners) {
+ listener.acquiredTopic(topic, mcb, null);
+ }
+ }
+
+ private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
+ for (TopicOwnershipChangeListener listener : listeners)
+ listener.lostTopic(topic);
+ topics.remove(topic);
+ postReleaseCleanup(topic, callback, ctx);
+ }
+
+ @Override
+ public final void getOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx));
+ }
+
+ @Override
+ public final void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx) {
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx));
+ }
+
+ /**
+ * This method should "return" (via the callback) the owner of the topic if
+ * one has already been chosen. If there is no pre-chosen owner, either this
+ * hub or some other hub should be chosen, based on the shouldClaim parameter.
+ * If this hub ends up being chosen as the owner, the
+ * {@link AbstractTopicManager#notifyListenersAndAddToOwnedTopics(ByteString,
+ * Callback, Object)} method must be called.
+ */
+ protected abstract void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx);
+
+ /**
+ * This method should do any cleanup necessary to indicate to other hubs
+ * that this topic has been released.
+ */
+ protected abstract void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx);
+
+}
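
notifyListenersAndAddToOwnedTopics fans the acquiredTopic notification out to every registered listener through CallbackUtils.multiCallback, and only runs the post-callback (which records ownership and schedules the retention-based release) once all listeners have responded. As a rough illustration of that "barrier" callback idea — not CallbackUtils' actual implementation — the sketch below aggregates a fixed number of sub-operation results with an atomic counter and reports at most one failure. The class name is hypothetical.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.util.Callback;

// Hypothetical barrier callback: completes "done" after "expected" successes,
// or fails it once on the first reported error.
public class BarrierCallbackExample implements Callback<Void> {

    private final int expected;
    private final Callback<Void> done;
    private final Object doneCtx;
    private final AtomicInteger finished = new AtomicInteger(0);
    private final AtomicBoolean failed = new AtomicBoolean(false);

    public BarrierCallbackExample(int expected, Callback<Void> done, Object doneCtx) {
        this.expected = expected;
        this.done = done;
        this.doneCtx = doneCtx;
    }

    @Override
    public void operationFinished(Object ctx, Void result) {
        if (finished.incrementAndGet() == expected && !failed.get()) {
            done.operationFinished(doneCtx, null);
        }
    }

    @Override
    public void operationFailed(Object ctx, PubSubException exception) {
        if (failed.compareAndSet(false, true)) {
            done.operationFailed(doneCtx, exception);
        }
    }
}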
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+/**
+ * An implementor of this interface is responsible for ensuring that there is
+ * at most a single host responsible for a given topic at any given time. It is
+ * also desirable that, on a host failure, other hosts in the cluster claim
+ * responsibility for the topics that were owned by the failed host. On
+ * claiming responsibility for a topic, a host should notify its registered
+ * {@link TopicOwnershipChangeListener}s.
+ *
+ */
+
+public interface TopicManager {
+ /**
+ * Get the address of the host responsible for the given topic.
+ *
+ * @param topic
+ * The topic whose owner to look up.
+ * @param shouldClaim
+ * Whether this hub should claim the topic if it has no owner yet.
+ * @param cb
+ * Callback through which the owner's address is returned; failures
+ * (e.g., {@link ServiceDownException}) are reported via the callback.
+ * @param ctx
+ * Opaque context passed back to the callback.
+ */
+ public void getOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx);
+
+ /**
+ * Whenever the topic manager finds out that the set of topics owned by this
+ * node has changed, it can notify a set of
+ * {@link TopicOwnershipChangeListener} objects. Any component of the system
+ * (e.g., the {@link PersistenceManager}) can listen for such changes by
+ * implementing the {@link TopicOwnershipChangeListener} interface and
+ * registering itself with the {@link TopicManager} using this method.
+ * It is important that the {@link TopicOwnershipChangeListener} reacts to
+ * such notifications immediately and without blocking, because multiple
+ * listeners might need to be informed and they are all informed by the same
+ * thread.
+ *
+ * @param listener
+ * The listener to register.
+ */
+ public void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener);
+
+ /**
+ * Give up ownership of a topic. If this hub does not own it, do nothing.
+ *
+ * @param cb
+ * Callback through which failures (e.g., {@link ServiceDownException}
+ * while releasing the topic) are reported.
+ * @param ctx
+ * Opaque context passed back to the callback.
+ */
+ public void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx);
+
+}
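
A caller typically treats getOwner as an asynchronous lookup: if the returned address is this hub, the request is served locally; otherwise the client is redirected to the owning hub. Below is a hedged sketch of that calling pattern; the OwnerLookupExample class and its serveLocally/redirectTo/failRequest stubs are illustrative only and not part of this patch.

import com.google.protobuf.ByteString;

import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;

// Hypothetical request handler showing how getOwner() results are consumed.
public class OwnerLookupExample {

    private final TopicManager tm;
    private final HedwigSocketAddress myAddr;

    public OwnerLookupExample(TopicManager tm, HedwigSocketAddress myAddr) {
        this.tm = tm;
        this.myAddr = myAddr;
    }

    public void handle(final ByteString topic) {
        // shouldClaim = true: claim the topic if nobody owns it yet.
        tm.getOwner(topic, true, new Callback<HedwigSocketAddress>() {
            @Override
            public void operationFinished(Object ctx, HedwigSocketAddress owner) {
                if (myAddr.equals(owner)) {
                    serveLocally(topic);
                } else {
                    redirectTo(owner, topic);
                }
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                // e.g., ServiceDownException: the ownership lookup itself failed.
                failRequest(topic, exception);
            }
        }, null);
    }

    private void serveLocally(ByteString topic) { /* illustrative stub */ }
    private void redirectTo(HedwigSocketAddress owner, ByteString topic) { /* illustrative stub */ }
    private void failRequest(ByteString topic, PubSubException cause) { /* illustrative stub */ }
}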
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.util.Callback;
+
+public interface TopicOwnershipChangeListener {
+
+ public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx);
+
+ public void lostTopic(ByteString topic);
+}
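
The TopicManager javadoc requires listeners to react immediately and without blocking, since all listeners are notified from the same thread. One simple way to meet that requirement is to keep the acquiredTopic work trivially cheap (an in-memory update) so the callback can be completed inline; the sketch below does exactly that. The OwnedTopicTracker class name is hypothetical.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import com.google.protobuf.ByteString;

import org.apache.hedwig.util.Callback;

// Hypothetical listener that just tracks the set of currently owned topics.
// acquiredTopic() does only a cheap in-memory update, so it is safe to
// complete the callback inline without blocking the notifying thread.
public class OwnedTopicTracker implements TopicOwnershipChangeListener {

    private final Set<ByteString> owned = Collections.synchronizedSet(new HashSet<ByteString>());

    @Override
    public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx) {
        owned.add(topic);
        callback.operationFinished(ctx, null);
    }

    @Override
    public void lostTopic(ByteString topic) {
        owned.remove(topic);
    }

    public boolean owns(ByteString topic) {
        return owned.contains(topic);
    }
}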
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class TrivialOwnAllTopicManager extends AbstractTopicManager {
+
+ public TrivialOwnAllTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
+ throws UnknownHostException {
+ super(cfg, scheduler);
+ }
+
+ @Override
+ protected void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
+
+ if (topics.contains(topic)) {
+ cb.operationFinished(ctx, addr);
+ return;
+ }
+
+ notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
+ }
+
+ @Override
+ protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) {
+ // No cleanup to do
+ cb.operationFinished(ctx, null);
+ }
+}
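
In a single-hub deployment the trivial manager claims every topic it is asked about, so getOwner always resolves to the local address. A minimal hedged usage sketch follows; it takes an already-built ServerConfiguration rather than constructing one (configuration setup is outside this patch), and the class name is illustrative.

import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import com.google.protobuf.ByteString;

import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;

public class TrivialTopicManagerExample {

    // Every getOwner() call on TrivialOwnAllTopicManager resolves to this hub's own address.
    public static void demo(ServerConfiguration cfg) throws UnknownHostException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        TrivialOwnAllTopicManager tm = new TrivialOwnAllTopicManager(cfg, scheduler);

        tm.getOwner(ByteString.copyFromUtf8("some-topic"), true,
                new Callback<HedwigSocketAddress>() {
                    @Override
                    public void operationFinished(Object ctx, HedwigSocketAddress owner) {
                        // Expected to be cfg.getServerAddr(), i.e. this hub.
                        System.out.println("Owner: " + owner);
                    }

                    @Override
                    public void operationFailed(Object ctx, PubSubException exception) {
                        System.err.println("Lookup failed: " + exception);
                    }
                }, null);
    }
}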