hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r987314 [13/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...
Date Thu, 19 Aug 2010 21:25:22 GMT
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.persistence.PersistRequest;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.subscriptions.SubscriptionEventListener;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class RegionManager implements SubscriptionEventListener {
+
+    protected static final Logger LOGGER = Logger.getLogger(RegionManager.class);
+
+    private final ByteString mySubId;
+    private final PersistenceManager pm;
+    private final ArrayList<HedwigHubClient> clients = new ArrayList<HedwigHubClient>();
+    private final TopicOpQueuer queue;
+
+    public RegionManager(final PersistenceManager pm, final ServerConfiguration cfg, final ZooKeeper zk,
+            ScheduledExecutorService scheduler, HedwigHubClientFactory hubClientFactory) {
+        this.pm = pm;
+        mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + cfg.getMyRegion());
+        queue = new TopicOpQueuer(scheduler);
+        for (final String hub : cfg.getRegions()) {
+            clients.add(hubClientFactory.create(new HedwigSocketAddress(hub)));
+        }
+    }
+
+    @Override
+    public void onFirstLocalSubscribe(final ByteString topic, final boolean synchronous, final Callback<Void> cb) {
+        // Whenever we acquire a topic due to a (local) subscribe, subscribe on
+        // it to all the other regions (currently using simple all-to-all
+        // topology).
+        queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, cb, null) {
+            @Override
+            public void run() {
+                Callback<Void> postCb = synchronous ? cb : CallbackUtils.logger(LOGGER, Level.DEBUG, Level.ERROR,
+                        "all cross-region subscriptions succeeded", "at least one cross-region subscription failed");
+                final Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), postCb, ctx);
+                for (final HedwigHubClient client : clients) {
+                    final HedwigSubscriber sub = client.getSubscriber();
+                    sub.asyncSubscribe(topic, mySubId, CreateOrAttach.CREATE_OR_ATTACH, new Callback<Void>() {
+                        @Override
+                        public void operationFinished(Object ctx, Void resultOfOperation) {
+                            if (LOGGER.isDebugEnabled())
+                                LOGGER.debug("cross-region subscription done for topic " + topic.toStringUtf8());
+                            try {
+                                sub.startDelivery(topic, mySubId, new MessageHandler() {
+                                    @Override
+                                    public void consume(final ByteString topic, ByteString subscriberId, Message msg,
+                                            final Callback<Void> callback, final Object context) {
+                                        // When messages are first published
+                                        // locally, the PublishHandler sets the
+                                        // source region in the Message.
+                                        if (msg.hasSrcRegion()) {
+                                            Message.newBuilder(msg).setMsgId(
+                                                    MessageSeqId.newBuilder(msg.getMsgId()).addRemoteComponents(
+                                                            RegionSpecificSeqId.newBuilder().setRegion(
+                                                                    msg.getSrcRegion()).setSeqId(
+                                                                    msg.getMsgId().getLocalComponent())));
+                                        }
+                                        pm.persistMessage(new PersistRequest(topic, msg, new Callback<Long>() {
+                                            @Override
+                                            public void operationFinished(Object ctx, Long resultOfOperation) {
+                                                if (LOGGER.isDebugEnabled())
+                                                    LOGGER.debug("cross-region recv-fwd succeeded for topic "
+                                                            + topic.toStringUtf8());
+                                                callback.operationFinished(context, null);
+                                            }
+
+                                            @Override
+                                            public void operationFailed(Object ctx, PubSubException exception) {
+                                                if (LOGGER.isDebugEnabled())
+                                                    LOGGER.error("cross-region recv-fwd failed for topic "
+                                                            + topic.toStringUtf8(), exception);
+                                                callback.operationFailed(context, exception);
+                                            }
+                                        }, null));
+                                    }
+                                });
+                                if (LOGGER.isDebugEnabled())
+                                    LOGGER.debug("cross-region start-delivery succeeded for topic "
+                                            + topic.toStringUtf8());
+                                mcb.operationFinished(ctx, null);
+                            } catch (PubSubException ex) {
+                                if (LOGGER.isDebugEnabled())
+                                    LOGGER.error(
+                                            "cross-region start-delivery failed for topic " + topic.toStringUtf8(), ex);
+                                mcb.operationFailed(ctx, ex);
+                            }
+                        }
+
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException exception) {
+                            if (LOGGER.isDebugEnabled())
+                                LOGGER.error("cross-region subscribe failed for topic " + topic.toStringUtf8(),
+                                        exception);
+                            mcb.operationFailed(ctx, exception);
+                        }
+                    }, null);
+                }
+                if (!synchronous)
+                    cb.operationFinished(null, null);
+            }
+        });
+
+    }
+
+    @Override
+    public void onLastLocalUnsubscribe(final ByteString topic) {
+        // TODO may want to ease up on the eager unsubscribe; this is dropping
+        // cross-region subscriptions ASAP
+        queue.pushAndMaybeRun(topic, queue.new AsynchronousOp<Void>(topic, new Callback<Void>() {
+
+            @Override
+            public void operationFinished(Object ctx, Void result) {
+                if (LOGGER.isDebugEnabled())
+                    LOGGER.debug("cross-region unsubscribes succeeded for topic " + topic.toStringUtf8());
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                if (LOGGER.isDebugEnabled())
+                    LOGGER.error("cross-region unsubscribes failed for topic " + topic.toStringUtf8(), exception);
+            }
+
+        }, null) {
+            @Override
+            public void run() {
+                Callback<Void> mcb = CallbackUtils.multiCallback(clients.size(), cb, ctx);
+                for (final HedwigHubClient client : clients) {
+                    client.getSubscriber().asyncUnsubscribe(topic, mySubId, mcb, null);
+                }
+            }
+        });
+    }
+
+    // Method to shutdown and stop all of the cross-region Hedwig clients.
+    public void stop() {
+        for (HedwigHubClient client : clients) {
+            client.stop();
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/ssl/SslServerContextFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.ssl;
+
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+
+import org.apache.hedwig.client.ssl.SslContextFactory;
+import org.apache.hedwig.server.common.ServerConfiguration;
+
+public class SslServerContextFactory extends SslContextFactory {
+
+    public SslServerContextFactory(ServerConfiguration cfg) {
+        try {
+            // Load our Java key store.
+            KeyStore ks = KeyStore.getInstance("pkcs12");
+            ks.load(cfg.getCertStream(), cfg.getPassword().toCharArray());
+
+            // Like ssh-agent.
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+            kmf.init(ks, cfg.getPassword().toCharArray());
+
+            // Create the SSL context.
+            ctx = SSLContext.getInstance("TLS");
+            ctx.init(kmf.getKeyManagers(), getTrustManagers(), null);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    protected boolean isClient() {
+        return false;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+
+public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
+
+    ServerConfiguration cfg;
+    ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq = new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
+    static Logger logger = Logger.getLogger(AbstractSubscriptionManager.class);
+
+    TopicOpQueuer queuer;
+    private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
+    private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<ByteString, AtomicInteger>();
+
+    // Handle to the PersistenceManager for the server so we can pass along the
+    // message consume pointers for each topic.
+    private final PersistenceManager pm;
+    // Timer for running a recurring thread task to get the minimum message
+    // sequence ID for each topic that all subscribers for it have consumed
+    // already. With that information, we can call the PersistenceManager to
+    // update it on the messages that are safe to be garbage collected.
+    private final Timer timer = new Timer(true);
+    // In memory mapping of topics to the minimum consumed message sequence ID
+    // for all subscribers to the topic.
+    private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
+
+    public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm, PersistenceManager pm,
+            ScheduledExecutorService scheduler) {
+        this.cfg = cfg;
+        queuer = new TopicOpQueuer(scheduler);
+        tm.addTopicOwnershipChangeListener(this);
+        this.pm = pm;
+        // Schedule the recurring MessagesConsumedTask only if a
+        // PersistenceManager is passed.
+        if (pm != null) {
+            timer.schedule(new MessagesConsumedTask(), 0, cfg.getMessagesConsumedThreadRunInterval());
+        }
+    }
+
+    /**
+     * This is the Timer Task for finding out for each topic, what the minimum
+     * consumed message by the subscribers are. This information is used to pass
+     * along to the server's PersistenceManager so it can garbage collect older
+     * topic messages that are no longer needed by the subscribers.
+     */
+    class MessagesConsumedTask extends TimerTask {
+        /**
+         * Implement the TimerTask's abstract run method.
+         */
+        @Override
+        public void run() {
+            // We are looping through relatively small in memory data structures
+            // so it should be safe to run this fairly often.
+            for (ByteString topic : top2sub2seq.keySet()) {
+                final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+                long minConsumedMessage = Long.MAX_VALUE;
+                // Loop through all subscribers to the current topic to find the
+                // minimum consumed message id. The consume pointers are
+                // persisted lazily so we'll use the stale in-memory value
+                // instead. This keeps things consistent in case of a server
+                // crash.
+                for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
+                    if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage)
+                        minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
+                }
+                boolean callPersistenceManager = true;
+                // Don't call the PersistenceManager if nobody is subscribed to
+                // the topic yet, or the consume pointer has not changed since
+                // the last time, or if this is the initial subscription.
+                if (topicSubscriptions.isEmpty()
+                        || (topic2MinConsumedMessagesMap.containsKey(topic) && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
+                        || minConsumedMessage == 0) {
+                    callPersistenceManager = false;
+                }
+                // Pass the new consume pointers to the PersistenceManager.
+                if (callPersistenceManager) {
+                    topic2MinConsumedMessagesMap.put(topic, minConsumedMessage);
+                    pm.consumedUntil(topic, minConsumedMessage);
+                }
+            }
+        }
+    }
+
+    private class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        public AcquireOp(ByteString topic, Callback<Void> callback, Object ctx) {
+            queuer.super(topic, callback, ctx);
+        }
+
+        @Override
+        public void run() {
+            if (top2sub2seq.containsKey(topic)) {
+                cb.operationFinished(ctx, null);
+            }
+
+            readSubscriptions(topic, new Callback<Map<ByteString, InMemorySubscriptionState>>() {
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    cb.operationFailed(ctx, exception);
+                }
+
+                @Override
+                public void operationFinished(final Object ctx,
+                        final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
+                    // We've just inherited a bunch of subscriber for this
+                    // topic, some of which may be local. If they are, then we
+                    // need to (1) notify listeners of this and (2) record the
+                    // number for bookkeeping so that future
+                    // subscribes/unsubscribes can efficiently notify listeners.
+
+                    // Count the number of local subscribers we just inherited.
+                    // This loop is OK since the number of subscribers per topic
+                    // is expected to be small.
+                    int localCount = 0;
+                    for (ByteString subscriberId : resultOfOperation.keySet())
+                        if (!SubscriptionStateUtils.isHubSubscriber(subscriberId))
+                            localCount++;
+                    topic2LocalCounts.put(topic, new AtomicInteger(localCount));
+
+                    // The final "commit" (and "abort") operations.
+                    final Callback<Void> cb2 = new Callback<Void>() {
+
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException exception) {
+                            logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(),
+                                    exception);
+                            cb.operationFailed(ctx, null);
+                        }
+
+                        @Override
+                        public void operationFinished(Object ctx, Void voidObj) {
+                            top2sub2seq.put(topic, resultOfOperation);
+                            logger.info("Subscription manager successfully acquired topic: " + topic.toStringUtf8());
+                            cb.operationFinished(ctx, null);
+                        }
+
+                    };
+
+                    // Notify listeners if necessary.
+                    if (localCount > 0) {
+                        notifySubscribe(topic, false, cb2, ctx);
+                    } else {
+                        cb2.operationFinished(ctx, null);
+                    }
+                }
+
+            }, ctx);
+
+        }
+
+    }
+
+    private void notifySubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
+        Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx);
+        for (SubscriptionEventListener listener : listeners) {
+            listener.onFirstLocalSubscribe(topic, synchronous, mcb);
+        }
+    }
+
+    /**
+     * Figure out who is subscribed. Do nothing if already acquired. If there's
+     * an error reading the subscribers' sequence IDs, then the topic is not
+     * acquired.
+     * 
+     * @param topic
+     * @param callback
+     * @param ctx
+     */
+    @Override
+    public void acquiredTopic(final ByteString topic, final Callback<Void> callback, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
+    }
+
+    /**
+     * Remove the local mapping.
+     */
+    @Override
+    public void lostTopic(ByteString topic) {
+        top2sub2seq.remove(topic);
+        // Notify listeners if necessary.
+        if (topic2LocalCounts.remove(topic).get() > 0)
+            notifyUnsubcribe(topic);
+    }
+
+    private void notifyUnsubcribe(ByteString topic) {
+        for (SubscriptionEventListener listener : listeners)
+            listener.onLastLocalUnsubscribe(topic);
+    }
+
+    protected abstract void readSubscriptions(final ByteString topic,
+            final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx);
+
+    private class SubscribeOp extends TopicOpQueuer.AsynchronousOp<MessageSeqId> {
+        SubscribeRequest subRequest;
+        MessageSeqId consumeSeqId;
+
+        public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+                Callback<MessageSeqId> callback, Object ctx) {
+            queuer.super(topic, callback, ctx);
+            this.subRequest = subRequest;
+            this.consumeSeqId = consumeSeqId;
+        }
+
+        @Override
+        public void run() {
+
+            final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+            if (topicSubscriptions == null) {
+                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+                return;
+            }
+
+            final ByteString subscriberId = subRequest.getSubscriberId();
+            InMemorySubscriptionState subscriptionState = topicSubscriptions.get(subscriberId);
+            CreateOrAttach createOrAttach = subRequest.getCreateOrAttach();
+
+            if (subscriptionState != null) {
+
+                if (createOrAttach.equals(CreateOrAttach.CREATE)) {
+                    String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+                            + " requested creating a subscription but it is already subscribed with state: "
+                            + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
+                    logger.debug(msg);
+                    cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
+                    return;
+                }
+
+                // otherwise just attach
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+                            + " attaching to subscription with state: "
+                            + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()));
+                }
+
+                cb.operationFinished(ctx, subscriptionState.getLastConsumeSeqId());
+                return;
+            }
+
+            // we don't have a mapping for this subscriber
+            if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
+                String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+                        + " requested attaching to an existing subscription but it is not subscribed";
+                logger.debug(msg);
+                cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
+                return;
+            }
+
+            // now the hard case, this is a brand new subscription, must record
+            final SubscriptionState newState = SubscriptionState.newBuilder().setMsgId(consumeSeqId).build();
+            createSubscriptionState(topic, subscriberId, newState, new Callback<Void>() {
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    cb.operationFailed(ctx, exception);
+                }
+
+                @Override
+                public void operationFinished(Object ctx, Void resultOfOperation) {
+                    Callback<Void> cb2 = new Callback<Void>() {
+
+                        @Override
+                        public void operationFailed(Object ctx, PubSubException exception) {
+                            logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
+                                    + topic.toStringUtf8() + " failed due to failed listener callback", exception);
+                            cb.operationFailed(ctx, exception);
+                        }
+
+                        @Override
+                        public void operationFinished(Object ctx, Void resultOfOperation) {
+                            topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(newState));
+                            cb.operationFinished(ctx, consumeSeqId);
+                        }
+
+                    };
+
+                    if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
+                            && topic2LocalCounts.get(topic).incrementAndGet() == 1)
+                        notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
+                    else
+                        cb2.operationFinished(ctx, resultOfOperation);
+                }
+            }, ctx);
+        }
+    }
+
+    @Override
+    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+            Callback<MessageSeqId> callback, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx));
+    }
+
+    private class ConsumeOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        ByteString subscriberId;
+        MessageSeqId consumeSeqId;
+
+        public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback<Void> callback,
+                Object ctx) {
+            queuer.super(topic, callback, ctx);
+            this.subscriberId = subscriberId;
+            this.consumeSeqId = consumeSeqId;
+        }
+
+        @Override
+        public void run() {
+            Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
+            if (topicSubs == null) {
+                cb.operationFinished(ctx, null);
+                return;
+            }
+
+            InMemorySubscriptionState subState = topicSubs.get(subscriberId);
+            if (subState == null) {
+                cb.operationFinished(ctx, null);
+                return;
+            }
+
+            if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
+                updateSubscriptionState(topic, subscriberId, subState.getSubscriptionState(), cb, ctx);
+            } else {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Only advanced consume pointer in memory, will persist later, topic: "
+                            + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
+                            + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
+                            + " in-memory consume-id: "
+                            + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
+                }
+                cb.operationFinished(ctx, null);
+            }
+
+        }
+    }
+
+    @Override
+    public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
+            Callback<Void> callback, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx));
+    }
+
+    private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        ByteString subscriberId;
+
+        public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
+            queuer.super(topic, callback, ctx);
+            this.subscriberId = subscriberId;
+        }
+
+        @Override
+        public void run() {
+            final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+            if (topicSubscriptions == null) {
+                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
+                return;
+            }
+
+            if (!topicSubscriptions.containsKey(subscriberId)) {
+                cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(""));
+                return;
+            }
+
+            deleteSubscriptionState(topic, subscriberId, new Callback<Void>() {
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    cb.operationFailed(ctx, exception);
+                }
+
+                @Override
+                public void operationFinished(Object ctx, Void resultOfOperation) {
+                    topicSubscriptions.remove(subscriberId);
+                    // Notify listeners if necessary.
+                    if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
+                            && topic2LocalCounts.get(topic).decrementAndGet() == 0)
+                        notifyUnsubcribe(topic);
+                    cb.operationFinished(ctx, null);
+                }
+            }, ctx);
+
+        }
+
+    }
+
+    @Override
+    public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new UnsubscribeOp(topic, subscriberId, callback, ctx));
+    }
+
+    /**
+     * Not thread-safe.
+     */
+    @Override
+    public void addListener(SubscriptionEventListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Method to stop this class gracefully including releasing any resources
+     * used and stopping all threads spawned.
+     */
+    public void stop() {
+        timer.cancel();
+    }
+
+    protected abstract void createSubscriptionState(final ByteString topic, ByteString subscriberId,
+            SubscriptionState state, Callback<Void> callback, Object ctx);
+
+    protected abstract void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+            Callback<Void> callback, Object ctx);
+
+    protected abstract void deleteSubscriptionState(ByteString topic, ByteString subscriberId, Callback<Void> callback,
+            Object ctx);
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public class AllToAllTopologyFilter implements MessageFilter {
+
+    ByteString subscriberRegion;
+
+    public AllToAllTopologyFilter(ByteString subscriberRegion) {
+        this.subscriberRegion = subscriberRegion;
+    }
+
+    public boolean testMessage(Message message) {
+        if (message.getSrcRegion().equals(subscriberRegion)) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
+
+    public InMemorySubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
+        super(conf, tm, pm, scheduler);
+    }
+
+    @Override
+    protected void createSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+            Callback<Void> callback, Object ctx) {
+        // nothing to do, in-memory info is already recorded by base class
+        callback.operationFinished(ctx, null);
+    }
+
+    @Override
+    protected void deleteSubscriptionState(ByteString topic, ByteString subscriberId, Callback<Void> callback,
+            Object ctx) {
+        // nothing to do, in-memory info is already deleted by base class
+        callback.operationFinished(ctx, null);
+    }
+
+    @Override
+    protected void updateSubscriptionState(ByteString topic, ByteString subscriberId, SubscriptionState state,
+            Callback<Void> callback, Object ctx) {
+        // nothing to do, in-memory info is already updated by base class
+        callback.operationFinished(ctx, null);
+    }
+
+    @Override
+    public void lostTopic(ByteString topic) {
+        // Intentionally do nothing, so that we dont lose in-memory information
+    }
+
+    @Override
+    protected void readSubscriptions(ByteString topic,
+            Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
+        // Since we don't lose in-memory information on lostTopic, we can just
+        // return that back
+        Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
+
+        if (topicSubs != null) {
+            cb.operationFinished(ctx, topicSubs);
+        } else {
+            cb.operationFinished(ctx, new ConcurrentHashMap<ByteString, InMemorySubscriptionState>());
+        }
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+
+public class InMemorySubscriptionState {
+    SubscriptionState subscriptionState;
+    MessageSeqId lastConsumeSeqId;
+
+    public InMemorySubscriptionState(SubscriptionState subscriptionState, MessageSeqId lastConsumeSeqId) {
+        this.subscriptionState = subscriptionState;
+        this.lastConsumeSeqId = lastConsumeSeqId;
+    }
+
+    public InMemorySubscriptionState(SubscriptionState subscriptionState) {
+        this(subscriptionState, subscriptionState.getMsgId());
+    }
+
+    public SubscriptionState getSubscriptionState() {
+        return subscriptionState;
+    }
+
+    public MessageSeqId getLastConsumeSeqId() {
+        return lastConsumeSeqId;
+    }
+
+    /**
+     * 
+     * @param lastConsumeSeqId
+     * @param consumeInterval
+     *            The amount of laziness we want in persisting the consume
+     *            pointers
+     * @return true if the resulting structure needs to be persisted, false
+     *         otherwise
+     */
+    public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
+        this.lastConsumeSeqId = lastConsumeSeqId;
+
+        if (lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent() < consumeInterval) {
+            return false;
+        }
+
+        subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build();
+        return true;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/MessageFilter.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public interface MessageFilter {
+
+    /**
+     * Tests whether a particular message passes the filter or not
+     * 
+     * @param message
+     * @return
+     */
+    public boolean testMessage(Message message);
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * For listening to events that are issued by a SubscriptionManager.
+ * 
+ */
+public interface SubscriptionEventListener {
+
+    /**
+     * Called by the subscription manager when it previously had zero local
+     * subscribers for a topic and is currently accepting its first local
+     * subscriber.
+     * 
+     * @param topic
+     *            The topic of interest.
+     * @param synchronous
+     *            Whether this request was actually initiated by a new local
+     *            subscriber, or whether it was an existing subscription
+     *            inherited by the hub (e.g. when recovering the state from ZK).
+     * @param cb
+     *            The subscription will not complete until success is called on
+     *            this callback. An error on cb will result in a subscription
+     *            error.
+     */
+    public void onFirstLocalSubscribe(ByteString topic, boolean synchronous, Callback<Void> cb);
+
+    /**
+     * Called by the SubscriptionManager when it previously had non-zero local
+     * subscribers for a topic and is currently dropping its last local
+     * subscriber. This is fully asynchronous so there is no callback.
+     * 
+     * @param topic
+     *            The topic of interest.
+     */
+    public void onLastLocalUnsubscribe(ByteString topic);
+    
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * All methods are thread-safe.
+ */
+public interface SubscriptionManager {
+
+    /**
+     * 
+     * Register a new subscription for the given subscriber for the given topic.
+     * This method should reliably persist the existence of the subscription in
+     * a way that it can't be lost. If the subscription already exists,
+     * depending on the create or attach flag in the subscribe request, an
+     * exception may be returned.
+     * 
+     * This is an asynchronous method.
+     * 
+     * @param topic
+     * @param subRequest
+     * @param consumeSeqId
+     *            The seqId to start serving the subscription from, if this is a
+     *            brand new subscription
+     * @param callback
+     *            The seq id returned by the callback is where serving should
+     *            start from
+     * @param ctx
+     */
+    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+            Callback<MessageSeqId> callback, Object ctx);
+
+    /**
+     * Set the consume position of a given subscriber on a given topic. Note
+     * that this method need not persist the consume position immediately but
+     * can be lazy and persist it later asynchronously, if that is more
+     * efficient.
+     * 
+     * @param topic
+     * @param subscriberId
+     * @param consumeSeqId
+     */
+    public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
+            Callback<Void> callback, Object ctx);
+
+    /**
+     * Delete a particular subscription
+     * 
+     * @param topic
+     * @param subscriberId
+     */
+    public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx);
+
+    // Management API methods that we will fill in later
+    // /**
+    // * Get the ids of all subscribers for a given topic
+    // *
+    // * @param topic
+    // * @return A list of subscriber ids that are currently subscribed to the
+    // * given topic
+    // */
+    // public List<ByteString> getSubscriptionsForTopic(ByteString topic);
+    //
+    // /**
+    // * Get the topics to which a given subscriber is subscribed to
+    // *
+    // * @param subscriberId
+    // * @return A list of the topics to which the given subscriber is
+    // subscribed
+    // * to
+    // * @throws ServiceDownException
+    // * If there is an error in looking up the subscription
+    // * information
+    // */
+    // public List<ByteString> getTopicsForSubscriber(ByteString subscriberId)
+    // throws ServiceDownException;
+
+    /**
+     * Add a listener that is notified when topic-subscription pairs are added
+     * or removed.
+     */
+    public void addListener(SubscriptionEventListener listener);
+    
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public class TrueFilter implements MessageFilter {
+    protected static TrueFilter instance = new TrueFilter();
+
+    public static TrueFilter instance() {
+        return instance;
+    }
+
+    public boolean testMessage(Message message) {
+        return true;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/subscriptions/ZkSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+
+public class ZkSubscriptionManager extends AbstractSubscriptionManager {
+
+    ZooKeeper zk;
+
+    protected final static Logger logger = Logger.getLogger(ZkSubscriptionManager.class);
+
+    public ZkSubscriptionManager(ZooKeeper zk, TopicManager topicMgr, PersistenceManager pm, ServerConfiguration cfg,
+            ScheduledExecutorService scheduler) {
+        super(cfg, topicMgr, pm, scheduler);
+        this.zk = zk;
+    }
+
+    private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
+        return cfg.getZkTopicPath(sb, topic).append("/subscribers");
+    }
+
+    private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
+        return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
+                .toString();
+    }
+
+    @Override
+    protected void readSubscriptions(final ByteString topic,
+            final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
+
+        String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
+        zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, final Object ctx, final List<String> children) {
+
+                if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
+                            + topic.toStringUtf8(), path, rc);
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                    return;
+                }
+
+                final Map<ByteString, InMemorySubscriptionState> topicSubs = new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
+
+                if (rc == Code.NONODE.intValue() || children.size() == 0) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8());
+                    }
+                    cb.operationFinished(ctx, topicSubs);
+                    return;
+                }
+
+                final AtomicBoolean failed = new AtomicBoolean();
+                final AtomicInteger count = new AtomicInteger();
+
+                for (final String child : children) {
+
+                    final ByteString subscriberId = ByteString.copyFromUtf8(child);
+                    final String childPath = path + "/" + child;
+
+                    zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() {
+                        @Override
+                        public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+
+                            if (rc != Code.OK.intValue()) {
+                                KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                        "Could not read subscription data for topic: " + topic.toStringUtf8()
+                                                + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+                                reportFailure(new PubSubException.ServiceDownException(e));
+                                return;
+                            }
+
+                            if (failed.get()) {
+                                return;
+                            }
+
+                            SubscriptionState state;
+
+                            try {
+                                state = SubscriptionState.parseFrom(data);
+                            } catch (InvalidProtocolBufferException ex) {
+                                String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
+                                        + " subscriberId: " + subscriberId.toStringUtf8();
+                                logger.error(msg, ex);
+                                reportFailure(new PubSubException.UnexpectedConditionException(msg));
+                                return;
+                            }
+
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
+                                        + " subscriberId: " + child + "state: "
+                                        + SubscriptionStateUtils.toString(state));
+                            }
+
+                            topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
+                            if (count.incrementAndGet() == children.size()) {
+                                assert topicSubs.size() == count.get();
+                                cb.operationFinished(ctx, topicSubs);
+                            }
+                        }
+
+                        private void reportFailure(PubSubException e) {
+                            if (failed.compareAndSet(false, true))
+                                cb.operationFailed(ctx, e);
+                        }
+                    }, ctx);
+                }
+            }
+        }, ctx);
+    }
+
+    @Override
+    protected void createSubscriptionState(final ByteString topic, final ByteString subscriberId,
+            final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+        ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), state.toByteArray(),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                        if (rc == Code.OK.intValue()) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
+                                        + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+                                        + SubscriptionStateUtils.toString(state));
+                            }
+                            callback.operationFinished(ctx, null);
+                        } else {
+                            KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                    "Could not record new subscription for topic: " + topic.toStringUtf8()
+                                            + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
+                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                        }
+                    }
+                }, ctx);
+    }
+
+    @Override
+    protected void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
+            final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
+        zk.setData(topicSubscriberPath(topic, subscriberId), state.toByteArray(), -1,
+                new SafeAsyncZKCallback.StatCallback() {
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+                        if (rc != Code.OK.intValue()) {
+                            KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+                                    + " subscriberId: " + subscriberId.toStringUtf8()
+                                    + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
+                                    path, rc);
+                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                        } else {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
+                                        + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
+                                        + SubscriptionStateUtils.toString(state));
+                            }
+
+                            callback.operationFinished(ctx, null);
+                        }
+                    }
+                }, ctx);
+    }
+
+    @Override
+    protected void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
+            final Callback<Void> callback, final Object ctx) {
+        zk.delete(topicSubscriberPath(topic, subscriberId), -1, new SafeAsyncZKCallback.VoidCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx) {
+                if (rc == Code.OK.intValue()) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
+                                + " subscriberId: " + subscriberId.toStringUtf8());
+                    }
+
+                    callback.operationFinished(ctx, null);
+                    return;
+                }
+
+                KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+                        + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
+                callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+            }
+        }, ctx);
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public abstract class AbstractTopicManager implements TopicManager {
+    /**
+     * My name.
+     */
+    protected HedwigSocketAddress addr;
+
+    /**
+     * Topic change listeners.
+     */
+    protected ArrayList<TopicOwnershipChangeListener> listeners = new ArrayList<TopicOwnershipChangeListener>();
+
+    /**
+     * List of topics I believe I am responsible for.
+     */
+    protected Set<ByteString> topics = Collections.synchronizedSet(new HashSet<ByteString>());
+
+    protected TopicOpQueuer queuer;
+    protected ServerConfiguration cfg;
+    protected ScheduledExecutorService scheduler;
+
+    private static final Logger logger = Logger.getLogger(AbstractTopicManager.class);
+
+    private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp<HedwigSocketAddress> {
+        public boolean shouldClaim;
+
+        public GetOwnerOp(final ByteString topic, boolean shouldClaim, 
+                final Callback<HedwigSocketAddress> cb, Object ctx) {
+            queuer.super(topic, cb, ctx);
+            this.shouldClaim = shouldClaim;
+        }
+
+        @Override
+        public void run() {
+            realGetOwner(topic, shouldClaim, cb, ctx);
+        }
+    }
+
+    private class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        public ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx) {
+            queuer.super(topic, cb, ctx);
+        }
+
+        @Override
+        public void run() {
+            if (!topics.contains(topic)) {
+                cb.operationFinished(ctx, null);
+                return;
+            }
+            realReleaseTopic(topic, cb, ctx);
+        }
+    }
+
+    public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
+            throws UnknownHostException {
+        this.cfg = cfg;
+        this.queuer = new TopicOpQueuer(scheduler);
+        this.scheduler = scheduler;
+        addr = cfg.getServerAddr();
+    }
+
+    @Override
+    public synchronized void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener) {
+        listeners.add(listener);
+    }
+
+    protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
+            final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
+
+        Callback<Void> postCb = new Callback<Void>() {
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                topics.add(topic);
+                if (cfg.getRetentionSecs() > 0) {
+                    scheduler.schedule(new Runnable() {
+                        @Override
+                        public void run() {
+                            // Enqueue a release operation. (Recall that release
+                            // doesn't "fail" even if the topic is missing.)
+                            releaseTopic(topic, new Callback<Void>() {
+
+                                @Override
+                                public void operationFailed(Object ctx, PubSubException exception) {
+                                    logger.error("failure that should never happen when periodically releasing topic "
+                                            + topic, exception);
+                                }
+
+                                @Override
+                                public void operationFinished(Object ctx, Void resultOfOperation) {
+                                    logger.debug("successful periodic release of topic " + topic);
+                                }
+
+                            }, null);
+                        }
+                    }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
+                }
+                originalCallback.operationFinished(originalContext, addr);
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                // TODO: optimization: we can release this as soon as we experience the first error.
+                realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
+                originalCallback.operationFailed(ctx, exception);
+            }
+        };
+
+        Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null);
+        for (TopicOwnershipChangeListener listener : listeners) {
+            listener.acquiredTopic(topic, mcb, null);
+        }
+    }
+
+    private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
+        for (TopicOwnershipChangeListener listener : listeners)
+            listener.lostTopic(topic);
+        topics.remove(topic);
+        postReleaseCleanup(topic, callback, ctx);
+    }
+
+    @Override
+    public final void getOwner(ByteString topic, boolean shouldClaim,
+            Callback<HedwigSocketAddress> cb, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx));
+    }
+
+    @Override
+    public final void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx));
+    }
+
+    /**
+     * This method should "return" the owner of the topic if one has been chosen
+     * already. If there is no pre-chosen owner, either this hub or some other
+     * should be chosen based on the shouldClaim parameter. If its ends up
+     * choosing this hub as the owner, the {@code
+     * AbstractTopicManager#notifyListenersAndAddToOwnedTopics(ByteString,
+     * OperationCallback, Object)} method must be called.
+     * 
+     */
+    protected abstract void realGetOwner(ByteString topic, boolean shouldClaim,
+            Callback<HedwigSocketAddress> cb, Object ctx);
+
+    /**
+     * The method should do any cleanup necessary to indicate to other hubs that
+     * this topic has been released
+     */
+    protected abstract void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx);
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+/**
+ * An implementor of this interface is basically responsible for ensuring that
+ * there is at most a single host responsible for a given topic at a given time.
+ * Also, it is desirable that on a host failure, some other hosts in the cluster
+ * claim responsibilities for the topics that were at the failed host. On
+ * claiming responsibility for a topic, a host should call its
+ * {@link TopicOwnershipChangeListener}.
+ * 
+ */
+
+public interface TopicManager {
+    /**
+     * Get the name of the host responsible for the given topic.
+     * 
+     * @param topic
+     *            The topic whose owner to get.
+     * @param cb
+     *            Callback.
+     * @return The name of host responsible for the given topic
+     * @throws ServiceDownException
+     *             If there is an error looking up the information
+     */
+    public void getOwner(ByteString topic, boolean shouldClaim, 
+            Callback<HedwigSocketAddress> cb, Object ctx);
+
+    /**
+     * Whenever the topic manager finds out that the set of topics owned by this
+     * node has changed, it can notify a set of
+     * {@link TopicOwnershipChangeListener} objects. Any component of the system
+     * (e.g., the {@link PersistenceManager}) can listen for such changes by
+     * implementing the {@link TopicOwnershipChangeListener} interface and
+     * registering themselves with the {@link TopicManager} using this method.
+     * It is important that the {@link TopicOwnershipChangeListener} reacts
+     * immediately to such notifications, and with no blocking (because multiple
+     * listeners might need to be informed and they are all informed by the same
+     * thread).
+     * 
+     * @param listener
+     */
+    public void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener);
+
+    /**
+     * Give up ownership of a topic. If I don't own it, do nothing.
+     * 
+     * @throws ServiceDownException
+     *             If there is an error in claiming responsibility for the topic
+     */
+    public void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx);
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.util.Callback;
+
+public interface TopicOwnershipChangeListener {
+
+    public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx);
+
+    public void lostTopic(ByteString topic);
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class TrivialOwnAllTopicManager extends AbstractTopicManager {
+
+    public TrivialOwnAllTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
+            throws UnknownHostException {
+        super(cfg, scheduler);
+    }
+
+    @Override
+    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
+            Callback<HedwigSocketAddress> cb, Object ctx) {
+
+        if (topics.contains(topic)) {
+            cb.operationFinished(ctx, addr);
+            return;
+        }
+
+        notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
+    }
+
+    @Override
+    protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) {
+        // No cleanup to do
+        cb.operationFinished(ctx, null);
+    }
+}



Mime
View raw message