hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r987314 [15/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...
Date Thu, 19 Aug 2010 21:25:22 GMT
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import java.util.List;
+import junit.framework.TestCase;
+
+import org.jboss.netty.channel.Channel;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.WriteRecordingChannel;
+import org.apache.hedwig.server.topics.StubTopicManager;
+import org.apache.hedwig.server.topics.TopicManager;
+
+/**
+ * Unit tests for {@code BaseHandler#handleRequest}: exercises the redirect
+ * (not topic owner), owner, and error paths using a stubbed topic manager
+ * and a channel that records everything written to it.
+ */
+public class TestBaseHandler extends TestCase {
+
+    // Handler under test; captures the request routed to the owner path.
+    MyBaseHandler handler;
+    // Stub whose ownership / error behavior is configured per test.
+    StubTopicManager tm;
+    PubSubRequest request = PubSubRequest.getDefaultInstance();
+    WriteRecordingChannel channel = new WriteRecordingChannel();
+
+    // Minimal concrete BaseHandler: handleRequestAtOwner only records the
+    // request, so tests can assert whether the owner path was reached.
+    protected class MyBaseHandler extends BaseHandler {
+
+        public MyBaseHandler(TopicManager tm, ServerConfiguration conf) {
+            super(tm, conf);
+        }
+
+        // Last request delivered to the owner path; null if none was.
+        PubSubRequest request;
+
+        public PubSubRequest getRequest() {
+            return request;
+        }
+
+        @Override
+        public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
+            this.request = request;
+        }
+
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        ServerConfiguration conf = new ServerConfiguration();
+        tm = new StubTopicManager(conf);
+        handler = new MyBaseHandler(tm, conf);
+        request = PubSubRequest.getDefaultInstance();
+        channel = new WriteRecordingChannel();
+    }
+
+    /**
+     * Asserts exactly one PubSubResponse was written to the channel and
+     * returns it.
+     * NOTE(review): assertEquals arguments here are (actual, expected) —
+     * JUnit's convention is (expected, actual); failure messages will read
+     * backwards. Worth swapping in a follow-up.
+     */
+    public PubSubResponse getPubSubResponse(WriteRecordingChannel channel) {
+        List<Object> messages = channel.getMessagesWritten();
+        assertEquals(messages.size(), 1);
+
+        Object message = messages.get(0);
+        assertEquals(message.getClass(), PubSubResponse.class);
+
+        return (PubSubResponse) message;
+    }
+
+    // When the stub declines ownership, the handler must respond with a
+    // NOT_RESPONSIBLE_FOR_TOPIC redirect and never reach the owner path.
+    @Test
+    public void testHandleRequestOnRedirect() throws Exception {
+        tm.setShouldOwnEveryNewTopic(false);
+        handler.handleRequest(request, channel);
+
+        PubSubResponse response = getPubSubResponse(channel);
+        assertEquals(response.getStatusCode(), StatusCode.NOT_RESPONSIBLE_FOR_TOPIC);
+        assertEquals(request.getTxnId(), response.getTxnId());
+        assertNull(handler.getRequest());
+
+    }
+
+    // When the stub claims ownership, nothing is written back and the
+    // request is forwarded to handleRequestAtOwner.
+    @Test
+    public void testHandleRequestOnOwner() throws Exception {
+
+        tm.setShouldOwnEveryNewTopic(true);
+        handler.handleRequest(request, channel);
+        assertEquals(0, channel.getMessagesWritten().size());
+        assertEquals(handler.getRequest(), request);
+
+    }
+
+    // When topic-manager lookup errors, the handler must answer
+    // SERVICE_DOWN with the request's txn id and skip the owner path.
+    @Test
+    public void testHandleRequestOnError() throws Exception {
+
+        tm.setShouldError(true);
+        handler.handleRequest(request, channel);
+
+        PubSubResponse response = getPubSubResponse(channel);
+        assertEquals(response.getStatusCode(), StatusCode.SERVICE_DOWN);
+        assertEquals(request.getTxnId(), response.getTxnId());
+        assertNull(handler.getRequest());
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.StubCallback;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.ChannelEndPoint;
+import org.apache.hedwig.server.delivery.StubDeliveryManager;
+import org.apache.hedwig.server.delivery.StubDeliveryManager.StartServingRequest;
+import org.apache.hedwig.server.netty.WriteRecordingChannel;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.subscriptions.StubSubscriptionManager;
+import org.apache.hedwig.server.subscriptions.TrueFilter;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for {@code SubscribeHandler} and {@code UnsubscribeHandler},
+ * wired against stub delivery/subscription managers and a local-DB
+ * persistence manager. Covers malformed requests, the full
+ * subscribe -> duplicate-subscribe -> disconnect -> resubscribe -> unsubscribe
+ * lifecycle, and the side effects on the delivery and subscription managers.
+ */
+public class TestSubUnsubHandler extends TestCase {
+
+    SubscribeHandler sh;
+    StubDeliveryManager dm;
+    StubSubscriptionManager sm;
+    ByteString topic = ByteString.copyFromUtf8("topic");
+    WriteRecordingChannel channel;
+
+    // Reusable request templates; tests derive variants via newBuilder(...).
+    SubscribeRequest subRequestPrototype;
+    PubSubRequest pubSubRequestPrototype;
+    ByteString subscriberId;
+    UnsubscribeHandler ush;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        ServerConfiguration conf = new ServerConfiguration();
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+        // TrivialOwnAllTopicManager makes this hub the owner of every topic,
+        // so handleRequestAtOwner is always the right entry point to test.
+        TopicManager tm = new TrivialOwnAllTopicManager(conf, executor);
+        dm = new StubDeliveryManager();
+        PersistenceManager pm = LocalDBPersistenceManager.instance();
+        sm = new StubSubscriptionManager(tm, pm, conf, executor);
+        sh = new SubscribeHandler(tm, dm, pm, sm, conf);
+        channel = new WriteRecordingChannel();
+
+        subscriberId = ByteString.copyFromUtf8("subId");
+
+        subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build();
+        pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
+                OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
+
+        ush = new UnsubscribeHandler(tm, conf, sm, dm);
+    }
+
+    // A SUBSCRIBE request with no embedded SubscribeRequest must be rejected
+    // as MALFORMED_REQUEST.
+    @Test
+    public void testNoSubscribeRequest() {
+        sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(),
+                channel);
+        assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
+                .getStatusCode());
+    }
+
+    // End-to-end happy path plus the failure modes reachable from it.
+    @Test
+    public void testSuccessCase() {
+        // Acquire the topic first; a null "right" (error) side means success.
+        StubCallback<Void> callback = new StubCallback<Void>();
+        sm.acquiredTopic(topic, callback, null);
+        assertNull(ConcurrencyUtils.take(callback.queue).right());
+
+        sh.handleRequestAtOwner(pubSubRequestPrototype, channel);
+        assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
+
+        // make sure the channel was put in the maps
+        assertEquals(new TopicSubscriber(topic, subscriberId), sh.channel2sub.get(channel));
+        assertEquals(channel, sh.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
+
+        // make sure delivery was started
+        StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll();
+        assertEquals(channel, ((ChannelEndPoint) startRequest.endPoint).getChannel());
+        assertEquals(false, startRequest.isHubSubscriber);
+        assertEquals(TrueFilter.class, startRequest.filter.getClass());
+        assertEquals(1, startRequest.seqIdToStartFrom.getLocalComponent());
+        assertEquals(subscriberId, startRequest.subscriberId);
+        assertEquals(topic, startRequest.topic);
+
+        // make sure subscription was registered: a CREATE for the same
+        // subscriber must now fail with ClientAlreadySubscribedException.
+        StubCallback<MessageSeqId> callback1 = new StubCallback<MessageSeqId>();
+        sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
+                CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
+                null);
+
+        assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue)
+                .right().getClass());
+
+        // trying to subscribe again should throw an error
+        WriteRecordingChannel dupChannel = new WriteRecordingChannel();
+        sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
+        assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
+
+        // after disconnecting the channel, subscribe should work again
+        sh.channelDisconnected(channel);
+
+        dupChannel = new WriteRecordingChannel();
+        sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
+        assertEquals(StatusCode.SUCCESS, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
+
+        // test unsubscribe: first a request missing the UnsubscribeRequest
+        // payload, which must be rejected as malformed.
+        channel = new WriteRecordingChannel();
+        ush.handleRequestAtOwner(pubSubRequestPrototype, channel);
+        assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
+                .getStatusCode());
+
+        PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest(
+                UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
+        channel = new WriteRecordingChannel();
+        dm.lastRequest.clear();
+
+        ush.handleRequestAtOwner(unsubRequest, channel);
+        assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
+
+        // make sure delivery has been stopped
+        assertEquals(new TopicSubscriber(topic, subscriberId), dm.lastRequest.poll());
+
+        // make sure the info is gone from the sm: ATTACH must now fail with
+        // ClientNotSubscribedException.
+        StubCallback<MessageSeqId> callback2 = new StubCallback<MessageSeqId>();
+        sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
+                CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
+                null);
+        assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right()
+                .getClass());
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,692 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.integration;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.netty.WriteRecordingChannel;
+import org.apache.hedwig.server.proxy.HedwigProxy;
+import org.apache.hedwig.server.proxy.ProxyConfiguration;
+import org.apache.hedwig.server.regions.HedwigHubClient;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+@RunWith(Parameterized.class)
+public class TestHedwigHub extends HedwigHubTestBase {
+
+    // Client side variables
+    protected HedwigClient client;
+    protected HedwigPublisher publisher;
+    protected HedwigSubscriber subscriber;
+
+    // Common ByteStrings used in tests.
+    private final ByteString localSubscriberId = ByteString.copyFromUtf8("LocalSubscriber");
+    private final ByteString hubSubscriberId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX
+            + "HubSubcriber");
+
+    enum Mode {
+        REGULAR, PROXY, SSL
+    };
+
+    // Parameterized-runner configurations: the whole suite runs once per
+    // connection mode (proxy, direct, SSL).
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { Mode.PROXY }, { Mode.REGULAR }, { Mode.SSL }});
+    }
+
+    protected Mode mode;
+
+    // Invoked by the Parameterized runner with each Mode from configs().
+    public TestHedwigHub(Mode mode) {
+        this.mode = mode;
+    }
+
+    protected HedwigProxy proxy;
+    protected ProxyConfiguration proxyConf = new ProxyConfiguration();
+
+    // SynchronousQueues to verify async calls
+    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+    private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
+
+    // Test implementation of Callback for async client actions.
+    // Signals async-operation completion by putting true (success) or false
+    // (failure) on the given SynchronousQueue. The put happens on a fresh
+    // thread because SynchronousQueue.put blocks until a taker arrives.
+    static class TestCallback implements Callback<Void> {
+        private final SynchronousQueue<Boolean> queue;
+
+        public TestCallback(SynchronousQueue<Boolean> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void operationFinished(Object ctx, Void resultOfOperation) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Operation finished!");
+                    ConcurrencyUtils.put(queue, true);
+                }
+            }).start();
+        }
+
+        @Override
+        public void operationFailed(Object ctx, final PubSubException exception) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    logger.error("Operation failed!", exception);
+                    ConcurrencyUtils.put(queue, false);
+                }
+            }).start();
+        }
+    }
+
+    // Test implementation of subscriber's message handler.
+    // Test implementation of subscriber's message handler. Puts true on the
+    // consumeQueue when a new message arrives with the expected (previous+1)
+    // local sequence id, false when a message is out of order, and nothing
+    // for redelivered messages it has already seen.
+    static class TestMessageHandler implements MessageHandler {
+        // For subscribe reconnect testing, the server could send us back
+        // messages we've already processed and consumed. We need to keep
+        // track of the ones we've encountered so we only signal back to the
+        // consumeQueue once.
+        private HashSet<MessageSeqId> consumedMessages = new HashSet<MessageSeqId>();
+        private long largestMsgSeqIdConsumed = -1;
+        private final SynchronousQueue<Boolean> consumeQueue;
+
+        public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
+            this.consumeQueue = consumeQueue;
+        }
+
+        public void consume(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
+                Object context) {
+            if (!consumedMessages.contains(msg.getMsgId())) {
+                // New message to consume. Add it to the Set of consumed
+                // messages.
+                consumedMessages.add(msg.getMsgId());
+                // Check that the msg seq ID is incrementing by 1 compared to
+                // the last consumed message. Don't do this check if this is the
+                // initial message being consumed.
+                if (largestMsgSeqIdConsumed >= 0 && msg.getMsgId().getLocalComponent() != largestMsgSeqIdConsumed + 1) {
+                    // Signal on a separate thread: SynchronousQueue.put blocks
+                    // until the test thread takes the value.
+                    new Thread(new Runnable() {
+                        @Override
+                        public void run() {
+                            if (logger.isDebugEnabled())
+                                logger.debug("Consuming message that is out of order for msgId: "
+                                        + msg.getMsgId().getLocalComponent());
+                            ConcurrencyUtils.put(consumeQueue, false);
+                        }
+                    }).start();
+                } else {
+                    new Thread(new Runnable() {
+                        @Override
+                        public void run() {
+                            if (logger.isDebugEnabled())
+                                logger.debug("Consume operation finished successfully!");
+                            ConcurrencyUtils.put(consumeQueue, true);
+                        }
+                    }).start();
+                }
+                // Store the consumed message as the new last msg id consumed.
+                largestMsgSeqIdConsumed = msg.getMsgId().getLocalComponent();
+            } else {
+                if (logger.isDebugEnabled())
+                    logger.debug("Consumed a message that we've processed already: " + msg);
+            }
+            // Always acknowledge, even for duplicates.
+            callback.operationFinished(context, null);
+        }
+    }
+
+    // Client configuration that adapts to the current test Mode: points at
+    // the proxy port in PROXY mode and enables SSL in SSL mode.
+    class TestClientConfiguration extends ClientConfiguration {
+        @Override
+        public InetSocketAddress getDefaultServerHost() {
+            if (mode == Mode.PROXY) {
+                return new InetSocketAddress(proxyConf.getProxyPort());
+            } else {
+                return super.getDefaultServerHost();
+            }
+        }
+
+        @Override
+        public boolean isSSLEnabled() {
+            if (mode == Mode.SSL)
+                return true;
+            else
+                return false;
+        }
+    }
+
+    // ClientConfiguration to use for this test.
+    // ClientConfiguration to use for this test; overridable by subclasses.
+    protected ClientConfiguration getClientConfiguration() {
+        return new TestClientConfiguration();
+    }
+
+    // Starts a 3-server hub cluster (via the base class), a proxy when in
+    // PROXY mode, and a fresh client with its publisher/subscriber handles.
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        numServers = 3;
+        super.setUp();
+        if (mode == Mode.PROXY) {
+            proxy = new HedwigProxy(proxyConf);
+        }
+        client = new HedwigClient(getClientConfiguration());
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    // Stops the client (and proxy, if any) before the base class tears down
+    // the server cluster.
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        client.stop();
+        if (mode == Mode.PROXY) {
+            proxy.shutdown();
+        }
+        super.tearDown();
+
+    }
+
+    // Helper function to generate Messages
+    // Helper function to generate a Message whose body encodes msgNum.
+    protected Message getMsg(int msgNum) {
+        return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build();
+    }
+
+    // Helper function to generate Topics
+    // Helper function to generate a topic name from topicNum.
+    protected ByteString getTopic(int topicNum) {
+        return ByteString.copyFromUtf8("Topic" + topicNum);
+    }
+
+    // Convenience overload: start delivery on the suite's default subscriber.
+    protected void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler handler) throws Exception {
+        startDelivery(subscriber, topic, subscriberId, handler);
+    }
+
+    // Starts delivery for the given subscriber. In PROXY mode the proxy must
+    // also be told to start delivery, so a START_DELIVERY request is pushed
+    // through its handler and the response checked for SUCCESS.
+    protected void startDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId,
+            MessageHandler handler) throws Exception {
+        subscriber.startDelivery(topic, subscriberId, handler);
+        if (mode == Mode.PROXY) {
+            WriteRecordingChannel channel = new WriteRecordingChannel();
+            PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
+                    .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
+                            StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
+            proxy.getStartDeliveryHandler().handleRequest(request, channel);
+            assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
+        }
+    }
+
+    // Publishes message i to topic i for i in [0, batchSize), asserting each
+    // publish succeeds and, when messagesToBeConsumed is set, that each
+    // message is also consumed by the subscriber.
+    protected void publishFirstBatch(int batchSize, boolean messagesToBeConsumed) throws Exception {
+        if (logger.isDebugEnabled())
+            logger.debug("Publishing first batch of messages.");
+        for (int i = 0; i < batchSize; i++) {
+            publisher.asyncPublish(getTopic(i), getMsg(i), new TestCallback(queue), null);
+            assertTrue(queue.take());
+            if (messagesToBeConsumed)
+                assertTrue(consumeQueue.take());
+        }
+    }
+
+    // Like publishFirstBatch, but offsets message numbers by batchSize so the
+    // per-topic sequence continues (i + batchSize on topic i) — used after a
+    // failover to verify delivery resumes in order.
+    protected void publishSecondBatch(int batchSize, boolean messagesToBeConsumed) throws Exception {
+        if (logger.isDebugEnabled())
+            logger.debug("Publishing second batch of messages.");
+        for (int i = 0; i < batchSize; i++) {
+            publisher.asyncPublish(getTopic(i), getMsg(i + batchSize), new TestCallback(queue), null);
+            assertTrue(queue.take());
+            if (messagesToBeConsumed)
+                assertTrue(consumeQueue.take());
+        }
+    }
+
+    // Subscribes the local subscriber to topics [0, batchSize) and then
+    // starts delivery on each, asserting every subscribe succeeds.
+    protected void subscribeToTopics(int batchSize) throws Exception {
+        if (logger.isDebugEnabled())
+            logger.debug("Subscribing to topics and starting delivery.");
+        for (int i = 0; i < batchSize; i++) {
+            subscriber.asyncSubscribe(getTopic(i), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
+                    new TestCallback(queue), null);
+            assertTrue(queue.take());
+        }
+
+        // Start delivery for the subscriber
+        for (int i = 0; i < batchSize; i++) {
+            startDelivery(getTopic(i), localSubscriberId, new TestMessageHandler(consumeQueue));
+        }
+    }
+
+    // Shuts down the last hub server, then sleeps briefly so the client's
+    // channel-disconnect logic can run before the next publish (see the
+    // race-condition explanation below).
+    protected void shutDownLastServer() {
+        if (logger.isDebugEnabled())
+            logger.debug("Shutting down the last server in the Hedwig hub cluster.");
+        serversList.get(serversList.size() - 1).shutdown();
+        // Due to a possible race condition, after we've shutdown the server,
+        // the client could still be caching the channel connection to that
+        // server. It is possible for a publish request to go to the shutdown
+        // server using the closed/shutdown channel before the channel
+        // disconnect logic kicks in. What could happen is that the publish
+        // is done successfully on the channel but the server on the other end
+        // can't/won't read it. This publish request will time out and the
+        // Junit test will fail. Since that particular scenario is not what is
+        // tested here, use a workaround of sleeping in this thread (so the
+        // channel disconnect logic can complete) before we publish again.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            logger.error("Thread was interrupted while sleeping after shutting down last server!", e);
+        }
+    }
+
+    // This tests out the manual sending of consume messages to the server
+    // instead of relying on the automatic sending by the client lib for it.
+    // This tests out the manual sending of consume messages to the server
+    // instead of relying on the automatic sending by the client lib for it.
+    // Uses a dedicated client with auto-consume disabled, subscribes,
+    // publishes a batch, then issues one explicit consume per message.
+    @Test
+    public void testManualConsumeClient() throws Exception {
+        HedwigClient myClient = new HedwigClient(new TestClientConfiguration() {
+            @Override
+            public boolean isAutoSendConsumeMessageEnabled() {
+                return false;
+            }
+
+        });
+        HedwigSubscriber mySubscriber = myClient.getSubscriber();
+        HedwigPublisher myPublisher = myClient.getPublisher();
+        ByteString myTopic = getTopic(0);
+        // Subscribe to a topic and start delivery on it
+        mySubscriber.asyncSubscribe(myTopic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
+                new TestCallback(queue), null);
+        assertTrue(queue.take());
+        startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        // Publish some messages
+        int batchSize = 10;
+        for (int i = 0; i < batchSize; i++) {
+            myPublisher.asyncPublish(myTopic, getMsg(i), new TestCallback(queue), null);
+            assertTrue(queue.take());
+            assertTrue(consumeQueue.take());
+        }
+        // Now manually send a consume message for each message received.
+        // Sequence ids are 1-based, hence i + 1.
+        for (int i = 0; i < batchSize; i++) {
+            boolean success = true;
+            try {
+                mySubscriber.consume(myTopic, localSubscriberId, MessageSeqId.newBuilder().setLocalComponent(i + 1)
+                        .build());
+            } catch (ClientNotSubscribedException e) {
+                success = false;
+            }
+            assertTrue(success);
+        }
+        // Since the consume call eventually does an async write to the Netty
+        // channel, the writing of the consume requests may not have completed
+        // yet before we stop the client. Sleep a little before we stop the
+        // client just so error messages are not logged.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            logger.error("Thread was interrupted while waiting to stop client for manual consume test!!", e);
+        }
+        myClient.stop();
+    }
+
+    // Subscribe, close the subscription, re-ATTACH to it, then verify
+    // delivery and consumption still work on the re-attached subscription.
+    @Test
+    public void testAttachToSubscriptionSuccess() throws Exception {
+        ByteString topic = getTopic(0);
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                null);
+        assertTrue(queue.take());
+        // Close the subscription asynchronously
+        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // Now try to attach to the subscription
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.ATTACH, new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // Start delivery and publish some messages. Make sure they are consumed
+        // correctly.
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        int batchSize = 5;
+        for (int i = 0; i < batchSize; i++) {
+            publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null);
+            assertTrue(queue.take());
+            assertTrue(consumeQueue.take());
+        }
+    }
+
+    // Publishing to many topics across a multi-server cluster exercises
+    // server redirects (topics are owned by different hubs); publish alone,
+    // no subscribers.
+    @Test
+    public void testServerRedirect() throws Exception {
+        int batchSize = 10;
+        publishFirstBatch(batchSize, false);
+    }
+
+    // Subscribe to a batch of topics, publish to each, and verify every
+    // message is consumed.
+    @Test
+    public void testSubscribeAndConsume() throws Exception {
+        int batchSize = 10;
+        subscribeToTopics(batchSize);
+        publishFirstBatch(batchSize, true);
+    }
+
+    @Test
+    public void testServerFailoverPublishOnly() throws Exception {
+        // Publish a batch, kill the last server, then verify that publishing
+        // still works against the remaining servers.
+        final int numTopics = 10;
+        publishFirstBatch(numTopics, false);
+        shutDownLastServer();
+        publishSecondBatch(numTopics, false);
+    }
+
+    @Test
+    public void testServerFailover() throws Exception {
+        // Same as the publish-only failover test, but with subscriptions, so
+        // delivery/consumption is also verified across the server shutdown.
+        final int numTopics = 10;
+        subscribeToTopics(numTopics);
+        publishFirstBatch(numTopics, true);
+        shutDownLastServer();
+        publishSecondBatch(numTopics, true);
+    }
+
+    /**
+     * Verifies that once a subscriber has unsubscribed, subsequently
+     * published messages are no longer delivered to its message handler.
+     * The trailing helper thread / assertFalse pair is a rendezvous trick:
+     * consumeQueue is a SynchronousQueue, so the take() below can only
+     * return the false value we feed in if no stray delivery put a true
+     * value first.
+     */
+    @Test
+    public void testUnsubscribe() throws Exception {
+        ByteString topic = getTopic(0);
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                null);
+        assertTrue(queue.take());
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
+        assertTrue(queue.take());
+        assertTrue(consumeQueue.take());
+        // Send an Unsubscribe request
+        subscriber.asyncUnsubscribe(topic, localSubscriberId, new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // Now publish a message and make sure it is not consumed by the client
+        publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // Wait a little bit just in case the message handler is still active,
+        // consuming the message, and then putting a true value in the
+        // consumeQueue.
+        Thread.sleep(1000);
+        // Put a False value on the consumeQueue so we can verify that it
+        // is not blocked by a message consume action which already put a True
+        // value into the queue.
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                ConcurrencyUtils.put(consumeQueue, false);
+            }
+        }).start();
+        assertFalse(consumeQueue.take());
+    }
+
+    @Test
+    public void testSyncUnsubscribeWithoutSubscription() throws Exception {
+        // A synchronous unsubscribe with no prior subscription must surface
+        // ClientNotSubscribedException; any other outcome fails the test.
+        boolean caughtExpected = false;
+        try {
+            subscriber.unsubscribe(getTopic(0), localSubscriberId);
+        } catch (ClientNotSubscribedException e) {
+            caughtExpected = true;
+        } catch (Exception ex) {
+            caughtExpected = false;
+        }
+        assertTrue(caughtExpected);
+    }
+
+    @Test
+    public void testAsyncUnsubscribeWithoutSubscription() throws Exception {
+        // The callback must report failure since no subscription exists.
+        subscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
+        assertFalse(queue.take());
+    }
+
+    /**
+     * Verifies that closing a subscription stops message delivery to this
+     * client (while, unlike unsubscribe, the subscription itself survives on
+     * the server side). Uses the same SynchronousQueue rendezvous trick as
+     * testUnsubscribe to prove no message was delivered.
+     */
+    @Test
+    public void testCloseSubscription() throws Exception {
+        ByteString topic = getTopic(0);
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                null);
+        assertTrue(queue.take());
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
+        assertTrue(queue.take());
+        assertTrue(consumeQueue.take());
+        // Close the subscription asynchronously
+        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // Now publish a message and make sure it is not consumed by the client
+        publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // Wait a little bit just in case the message handler is still active,
+        // consuming the message, and then putting a true value in the
+        // consumeQueue.
+        Thread.sleep(1000);
+        // Put a False value on the consumeQueue so we can verify that it
+        // is not blocked by a message consume action which already put a True
+        // value into the queue.
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                ConcurrencyUtils.put(consumeQueue, false);
+            }
+        }).start();
+        assertFalse(consumeQueue.take());
+    }
+
+    /**
+     * Verifies that stopDelivery pauses delivery without losing messages:
+     * while delivery is stopped, published messages are not handed to the
+     * message handler, and once delivery is restarted all queued messages
+     * are delivered.
+     */
+    @Test
+    public void testStopDelivery() throws Exception {
+        ByteString topic = getTopic(0);
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                null);
+        assertTrue(queue.take());
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
+        assertTrue(queue.take());
+        assertTrue(consumeQueue.take());
+        // Stop the delivery for this subscription
+        subscriber.stopDelivery(topic, localSubscriberId);
+        // Publish some more messages so they are queued up to be delivered to
+        // the client
+        int batchSize = 10;
+        for (int i = 0; i < batchSize; i++) {
+            publisher.asyncPublish(topic, getMsg(i + 1), new TestCallback(queue), null);
+            assertTrue(queue.take());
+        }
+        // Wait a little bit just in case the message handler is still active,
+        // consuming the message, and then putting a true value in the
+        // consumeQueue.
+        Thread.sleep(1000);
+        // Put a False value on the consumeQueue so we can verify that it
+        // is not blocked by a message consume action which already put a True
+        // value into the queue.
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                ConcurrencyUtils.put(consumeQueue, false);
+            }
+        }).start();
+        assertFalse(consumeQueue.take());
+        // Now start delivery again and verify that the queued up messages are
+        // consumed
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        for (int i = 0; i < batchSize; i++) {
+            assertTrue(consumeQueue.take());
+        }
+    }
+
+    /**
+     * Publishes a batch of messages asynchronously and verifies each one is
+     * acknowledged and consumed. The consumeQueue only carries booleans, so
+     * the actual in-order check is presumably performed inside
+     * TestMessageHandler before it puts true — confirm against its
+     * implementation.
+     */
+    @Test
+    public void testConsumedMessagesInOrder() throws Exception {
+        ByteString topic = getTopic(0);
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                null);
+        assertTrue(queue.take());
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        // Now publish some messages and verify that they are delivered in order
+        // to the subscriber
+        int batchSize = 100;
+        for (int i = 0; i < batchSize; i++) {
+            publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null);
+        }
+        // We've sent out all of the publish messages asynchronously,
+        // now verify that they are consumed in the correct order.
+        for (int i = 0; i < batchSize; i++) {
+            assertTrue(queue.take());
+            assertTrue(consumeQueue.take());
+        }
+    }
+
+    @Test
+    public void testCreateSubscriptionFailure() throws Exception {
+        ByteString topic = getTopic(0);
+        // Establish the subscription, then close it; the server still retains
+        // the subscription state.
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+                null);
+        assertTrue(queue.take());
+        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
+        assertTrue(queue.take());
+        // A plain CREATE on the already-existing subscription must fail.
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE, new TestCallback(queue), null);
+        assertFalse(queue.take());
+    }
+
+    @Test
+    public void testCreateSubscriptionSuccess() throws Exception {
+        ByteString topic = getTopic(0);
+        // A plain CREATE on a brand-new topic/subscriber pair must succeed.
+        subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE, new TestCallback(queue), null);
+        assertTrue(queue.take());
+        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
+        // Every published message should be acknowledged and then consumed.
+        final int numMessages = 5;
+        for (int msg = 0; msg < numMessages; msg++) {
+            publisher.asyncPublish(topic, getMsg(msg), new TestCallback(queue), null);
+            assertTrue(queue.take());
+            assertTrue(consumeQueue.take());
+        }
+    }
+
+    @Test
+    public void testAttachToSubscriptionFailure() throws Exception {
+        // ATTACH to a subscription that was never created must fail.
+        subscriber.asyncSubscribe(getTopic(0), localSubscriberId, CreateOrAttach.ATTACH, new TestCallback(queue), null);
+        assertFalse(queue.take());
+    }
+
+    // The following 4 tests are to make sure that the subscriberId validation
+    // works when it is a local subscriber and we're expecting the subscriberId
+    // to be in the "local" specific format.
+    @Test
+    public void testSyncSubscribeWithInvalidSubscriberId() throws Exception {
+        // A local client must reject a hub-format subscriberId with
+        // InvalidSubscriberIdException.
+        boolean caughtExpected = false;
+        try {
+            subscriber.subscribe(getTopic(0), hubSubscriberId, CreateOrAttach.CREATE_OR_ATTACH);
+        } catch (InvalidSubscriberIdException e) {
+            caughtExpected = true;
+        } catch (Exception ex) {
+            caughtExpected = false;
+        }
+        assertTrue(caughtExpected);
+    }
+
+    @Test
+    public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception {
+        // The callback must report failure for a hub-format subscriberId used
+        // on a local client.
+        subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
+                new TestCallback(queue), null);
+        assertFalse(queue.take());
+    }
+
+    @Test
+    public void testSyncUnsubscribeWithInvalidSubscriberId() throws Exception {
+        // Unsubscribing with a hub-format subscriberId on a local client must
+        // surface InvalidSubscriberIdException.
+        boolean caughtExpected = false;
+        try {
+            subscriber.unsubscribe(getTopic(0), hubSubscriberId);
+        } catch (InvalidSubscriberIdException e) {
+            caughtExpected = true;
+        } catch (Exception ex) {
+            caughtExpected = false;
+        }
+        assertTrue(caughtExpected);
+    }
+
+    @Test
+    public void testAsyncUnsubscribeWithInvalidSubscriberId() throws Exception {
+        // The callback must report failure for a hub-format subscriberId used
+        // on a local client.
+        subscriber.asyncUnsubscribe(getTopic(0), hubSubscriberId, new TestCallback(queue), null);
+        assertFalse(queue.take());
+    }
+
+    // The following 4 tests are to make sure that the subscriberId validation
+    // also works when it is a hub subscriber and we're expecting the
+    // subscriberId to be in the "hub" specific format.
+    /**
+     * A hub client must reject a local-format subscriberId with
+     * InvalidSubscriberIdException. The hub client is stopped in a finally
+     * block so its resources are released even when the assertion fails
+     * (the original skipped stop() on assertion failure).
+     */
+    @Test
+    public void testSyncHubSubscribeWithInvalidSubscriberId() throws Exception {
+        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
+        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        boolean subscribeSuccess = false;
+        try {
+            hubSubscriber.subscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH);
+        } catch (InvalidSubscriberIdException e) {
+            subscribeSuccess = true;
+        } catch (Exception ex) {
+            subscribeSuccess = false;
+        } finally {
+            hubClient.stop();
+        }
+        assertTrue(subscribeSuccess);
+    }
+
+    /**
+     * A hub client's async subscribe with a local-format subscriberId must
+     * invoke the failure callback. stop() now runs in a finally block so the
+     * client is not leaked when the assertion fails.
+     */
+    @Test
+    public void testAsyncHubSubscribeWithInvalidSubscriberId() throws Exception {
+        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
+        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(
+                queue), null);
+        try {
+            assertFalse(queue.take());
+        } finally {
+            hubClient.stop();
+        }
+    }
+
+    /**
+     * A hub client must reject an unsubscribe with a local-format
+     * subscriberId via InvalidSubscriberIdException. stop() now runs in a
+     * finally block so the client is not leaked when the assertion fails.
+     */
+    @Test
+    public void testSyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
+        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
+        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        boolean unsubscribeSuccess = false;
+        try {
+            hubSubscriber.unsubscribe(getTopic(0), localSubscriberId);
+        } catch (InvalidSubscriberIdException e) {
+            unsubscribeSuccess = true;
+        } catch (Exception ex) {
+            unsubscribeSuccess = false;
+        } finally {
+            hubClient.stop();
+        }
+        assertTrue(unsubscribeSuccess);
+    }
+
+    /**
+     * A hub client's async unsubscribe with a local-format subscriberId must
+     * invoke the failure callback. stop() now runs in a finally block so the
+     * client is not leaked when the assertion fails.
+     */
+    @Test
+    public void testAsyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
+        HedwigClient hubClient = new HedwigHubClient(new ClientConfiguration());
+        HedwigSubscriber hubSubscriber = hubClient.getSubscriber();
+        hubSubscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
+        try {
+            assertFalse(queue.take());
+        } finally {
+            hubClient.stop();
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.integration;
+
+import java.util.concurrent.SynchronousQueue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.HedwigRegionTestBase;
+import org.apache.hedwig.server.integration.TestHedwigHub.TestCallback;
+import org.apache.hedwig.server.integration.TestHedwigHub.TestMessageHandler;
+
+/**
+ * Cross-region integration test: messages published in one region must be
+ * delivered to subscribers in every region.
+ */
+public class TestHedwigRegion extends HedwigRegionTestBase {
+
+    // Subscriber id used by every region's local subscriber.
+    private static final ByteString SUBSCRIBER_ID = ByteString.copyFromUtf8("LocalSubscriber");
+
+    // Hand-off queues for async operation callbacks and message deliveries.
+    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+    private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
+
+    // Builds the topic name used for index i.
+    private static ByteString topic(int i) {
+        return ByteString.copyFromUtf8("Topic" + i);
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // Deploy 3 regions with 4 hub servers each before every test.
+        numRegions = 3;
+        numServersPerRegion = 4;
+        super.setUp();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test
+    public void testMultiRegionSubscribeAndConsume() throws Exception {
+        final int numTopics = 10;
+        // Every region's client subscribes to each of the topics.
+        for (HedwigClient client : regionClientsMap.values()) {
+            for (int i = 0; i < numTopics; i++) {
+                client.getSubscriber().asyncSubscribe(topic(i), SUBSCRIBER_ID,
+                        CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue), null);
+                assertTrue(queue.take());
+            }
+        }
+
+        // Start delivery for every local subscriber in every region.
+        for (HedwigClient client : regionClientsMap.values()) {
+            for (int i = 0; i < numTopics; i++) {
+                client.getSubscriber().startDelivery(topic(i), SUBSCRIBER_ID,
+                        new TestMessageHandler(consumeQueue));
+            }
+        }
+
+        // Publish to each topic from a single region's client...
+        HedwigPublisher publisher = regionClientsMap.values().iterator().next().getPublisher();
+        for (int i = 0; i < numTopics; i++) {
+            publisher.asyncPublish(topic(i),
+                    Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + i)).build(),
+                    new TestCallback(queue), null);
+            assertTrue(queue.take());
+        }
+        // ... and expect every region to consume the full batch.
+        for (int i = 0; i < regionClientsMap.size(); i++) {
+            for (int j = 0; j < numTopics; j++) {
+                assertTrue(consumeQueue.take());
+            }
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.AbstractTopicManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+
+public class TestPubSubServer extends PubSubServerStandAloneTestBase {
+
+    @Test
+    public void testSecondServer() throws Exception {
+        PubSubServer server1 = new PubSubServer(new StandAloneServerConfiguration() {
+            @Override
+            public int getServerPort() {
+                return super.getServerPort() + 1;
+            }
+        });
+        server1.shutdown();
+    }
+
+    class RecordingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+        SynchronousQueue<Throwable> queue;
+
+        public RecordingUncaughtExceptionHandler(SynchronousQueue<Throwable> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            queue.add(e);
+        }
+
+    }
+
+    private interface TopicManagerInstantiator {
+        public TopicManager instantiateTopicManager() throws IOException;
+    }
+
+    PubSubServer startServer(final UncaughtExceptionHandler uncaughtExceptionHandler, final int port,
+            final TopicManagerInstantiator instantiator) throws Exception {
+        PubSubServer server = new PubSubServer(new StandAloneServerConfiguration() {
+            @Override
+            public int getServerPort() {
+                return port;
+            }
+
+        }, uncaughtExceptionHandler) {
+
+            @Override
+            protected TopicManager instantiateTopicManager() throws IOException {
+                return instantiator.instantiateTopicManager();
+            }
+        };
+
+        return server;
+
+    }
+
+    public void runPublishRequest(final int port) throws Exception {
+        HedwigPublisher publisher = new HedwigClient(new ClientConfiguration() {
+            @Override
+            public InetSocketAddress getDefaultServerHost() {
+                return new InetSocketAddress("localhost", port);
+            }
+        }).getPublisher();
+
+        publisher.asyncPublish(ByteString.copyFromUtf8("blah"), Message.newBuilder().setBody(
+                ByteString.copyFromUtf8("blah")).build(), new Callback<Void>() {
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                assertTrue(false);
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                assertTrue(false);
+            }
+
+        }, null);
+    }
+
+    @Test
+    public void testUncaughtExceptionInNettyThread() throws Exception {
+
+        SynchronousQueue<Throwable> queue = new SynchronousQueue<Throwable>();
+        RecordingUncaughtExceptionHandler uncaughtExceptionHandler = new RecordingUncaughtExceptionHandler(queue);
+        final int port = 9876;
+
+        PubSubServer server = startServer(uncaughtExceptionHandler, port, new TopicManagerInstantiator() {
+
+            @Override
+            public TopicManager instantiateTopicManager() throws IOException {
+                return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) {
+                    @Override
+                    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
+                            Callback<HedwigSocketAddress> cb, Object ctx) {
+                        throw new RuntimeException("this exception should be uncaught");
+                    }
+
+                    @Override
+                    protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) {
+                    }
+                };
+            }
+        });
+
+        runPublishRequest(port);
+        assertEquals(RuntimeException.class, queue.take().getClass());
+        server.shutdown();
+    }
+
+    @Test
+    public void testUncaughtExceptionInZKThread() throws Exception {
+
+        SynchronousQueue<Throwable> queue = new SynchronousQueue<Throwable>();
+        RecordingUncaughtExceptionHandler uncaughtExceptionHandler = new RecordingUncaughtExceptionHandler(queue);
+        final int port = 9876;
+        final String hostPort = "127.0.0.1:33221";
+
+        PubSubServer server = startServer(uncaughtExceptionHandler, port, new TopicManagerInstantiator() {
+
+            @Override
+            public TopicManager instantiateTopicManager() throws IOException {
+                return new AbstractTopicManager(new ServerConfiguration(), Executors.newSingleThreadScheduledExecutor()) {
+
+                    @Override
+                    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
+                            Callback<HedwigSocketAddress> cb, Object ctx) {
+                        ZooKeeper zookeeper;
+                        try {
+                            zookeeper = new ZooKeeper(hostPort, 60000, new Watcher() {
+                                @Override
+                                public void process(WatchedEvent event) {
+                                    // TODO Auto-generated method stub
+
+                                }
+                            });
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+
+                        zookeeper.getData("/fake", false, new SafeAsyncZKCallback.DataCallback() {
+                            @Override
+                            public void safeProcessResult(int rc, String path, Object ctx, byte[] data,
+                                    org.apache.zookeeper.data.Stat stat) {
+                                throw new RuntimeException("This should go to the uncaught exception handler");
+                            }
+
+                        }, null);
+                    }
+
+                    @Override
+                    protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) {
+                    }
+                };
+            }
+        });
+
+        runPublishRequest(port);
+        assertEquals(RuntimeException.class, queue.take().getClass());
+        server.shutdown();
+    }
+
+    @Test
+    public void testInvalidServerConfiguration() throws Exception {
+        boolean success = false;
+        ServerConfiguration conf = new ServerConfiguration() {
+            @Override
+            public boolean isInterRegionSSLEnabled() {
+                return conf.getBoolean(INTER_REGION_SSL_ENABLED, true);
+            }
+
+            @Override
+            public List<String> getRegions() {
+                List<String> regionsList = new LinkedList<String>();
+                regionsList.add("regionHost1:4080:9876"); 
+                regionsList.add("regionHost2:4080"); 
+                regionsList.add("regionHost3:4080:9876");
+                return regionsList;
+            }
+        };
+        try {
+            conf.validate();
+        }
+        catch (ConfigurationException e) {
+            logger.error("Invalid configuration: ", e);
+            success = true;
+        }
+        assertTrue(success);
+    }    
+
+    @Test
+    public void testValidServerConfiguration() throws Exception {
+        boolean success = true;
+        ServerConfiguration conf = new ServerConfiguration() {
+            @Override
+            public boolean isInterRegionSSLEnabled() {
+                return conf.getBoolean(INTER_REGION_SSL_ENABLED, true);
+            }
+
+            @Override
+            public List<String> getRegions() {
+                List<String> regionsList = new LinkedList<String>();
+                regionsList.add("regionHost1:4080:9876"); 
+                regionsList.add("regionHost2:4080:2938"); 
+                regionsList.add("regionHost3:4080:9876");
+                return regionsList;
+            }
+        };
+        try {
+            conf.validate();
+        }
+        catch (ConfigurationException e) {
+            logger.error("Invalid configuration: ", e);
+            success = false;
+        }
+        assertTrue(success);
+    }    
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/netty/WriteRecordingChannel.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelConfig;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.DefaultChannelFuture;
+import org.jboss.netty.channel.SucceededChannelFuture;
+
+public class WriteRecordingChannel implements Channel {
+
+    public boolean closed = false;
+    ChannelFuture closingFuture = new DefaultChannelFuture(this, false);
+    List<Object> messagesWritten = new LinkedList<Object>();
+
+    public List<Object> getMessagesWritten() {
+        return messagesWritten;
+    }
+
+    public void clearMessages() {
+        messagesWritten.clear();
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress) {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFuture close() {
+        closed = true;
+        closingFuture.setSuccess();
+        return new SucceededChannelFuture(this);
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+        return close();
+    }
+
+    @Override
+    public ChannelFuture getCloseFuture() {
+        return closingFuture;
+    }
+
+    @Override
+    public ChannelConfig getConfig() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFactory getFactory() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public Integer getId() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public int getInterestOps() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return new InetSocketAddress("localhost", 1234);
+    }
+
+    @Override
+    public Channel getParent() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return new InetSocketAddress("www.yahoo.com", 80);
+    }
+
+    @Override
+    public boolean isBound() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public boolean isConnected() {
+        return closed == false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public boolean isReadable() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public boolean isWritable() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFuture setInterestOps(int interestOps) {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFuture setReadable(boolean readable) {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFuture unbind() {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public ChannelFuture write(Object message) {
+        messagesWritten.add(message);
+        return new SucceededChannelFuture(this);
+    }
+
+    @Override
+    public ChannelFuture write(Object message, SocketAddress remoteAddress) {
+        throw new RuntimeException("Not intended");
+    }
+
+    @Override
+    public int compareTo(Channel o) {
+        throw new RuntimeException("Not intended");
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hedwig.util.FileUtils;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+
+/**
+ * This is a base class for any tests that require a BookKeeper client/server
+ * setup.
+ * 
+ */
+public class BookKeeperTestBase extends ZooKeeperTestBase {
+
+    // Running bookie servers, one per numBookies.
+    private List<BookieServer> bookiesList;
+    // First bookie listens here; bookie i uses initialPort + i.
+    private int initialPort = 5000;
+
+    // String constants used for creating the bookie server files.
+    private static final String PREFIX = "bookie";
+    private static final String SUFFIX = "test";
+
+    // Variable to decide how many bookie servers to set up.
+    private final int numBookies;
+    // BookKeeper client instance
+    protected BookKeeper bk;
+
+    /**
+     * @param numBookies
+     *            number of bookie servers to start for each test
+     */
+    public BookKeeperTestBase(int numBookies) {
+        this.numBookies = numBookies;
+    }
+
+    public BookKeeperTestBase() {
+        // By default, use 3 bookies.
+        this(3);
+    }
+
+    // Getter for the ZooKeeper client instance that the parent class sets up.
+    protected ZooKeeper getZooKeeperClient() {
+        return zk;
+    }
+
+    // Give JUnit a fake test so that it's happy.
+    @Test
+    public void testNothing() throws Exception {
+
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        // Create the znodes BookKeeper expects. Previously every failure was
+        // swallowed with printStackTrace, letting tests fail obscurely later;
+        // now only "node already exists" is tolerated and real errors
+        // (including interruption) propagate and fail the test up front.
+        createPersistentNodeIfAbsent("/ledgers");
+        createPersistentNodeIfAbsent("/ledgers/available");
+
+        // Create Bookie Servers
+        bookiesList = new LinkedList<BookieServer>();
+
+        for (int i = 0; i < numBookies; i++) {
+            File tmpDir = FileUtils.createTempDirectory(PREFIX + i, SUFFIX);
+            BookieServer bs = new BookieServer(initialPort + i, hostPort, tmpDir, new File[] { tmpDir });
+            bs.start();
+            bookiesList.add(bs);
+        }
+
+        // Create the BookKeeper client
+        bk = new BookKeeper(hostPort);
+    }
+
+    // Creates an empty persistent znode, tolerating its prior existence.
+    private void createPersistentNodeIfAbsent(String path) throws Exception {
+        try {
+            zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException e) {
+            // Fine: the node was already created by an earlier run.
+        }
+    }
+
+    public String getZkHostPort() {
+        return hostPort;
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        // Shutdown all of the bookie servers
+        try {
+            for (BookieServer bs : bookiesList) {
+                bs.shutdown();
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt status instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+        // Close the BookKeeper client
+        bk.halt();
+        super.tearDown();
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
+
+/**
+ * In-memory PersistenceManager test double. Messages are kept per topic in a
+ * map of lists; setting {@code failure} makes every persist/scan operation
+ * report {@code exception} instead of doing work.
+ */
+public class StubPersistenceManager implements PersistenceManagerWithRangeScan {
+    // Per-topic message log; list index i holds the message with seq-id i + 1.
+    Map<ByteString, List<Message>> messages = new HashMap<ByteString, List<Message>>();
+    // When true, all operations invoke the failure callback with `exception`.
+    boolean failure = false;
+    ServiceDownException exception = new ServiceDownException("Asked to fail");
+
+    public void deliveredUntil(ByteString topic, Long seqId) {
+        // noop
+    }
+
+    public void consumedUntil(ByteString topic, Long seqId) {
+        // noop
+    }
+
+    // Factory used by MapMethods to lazily create a topic's message list.
+    protected static class ArrayListMessageFactory implements Factory<List<Message>> {
+        static ArrayListMessageFactory instance = new ArrayListMessageFactory();
+
+        public List<Message> newInstance() {
+            return new ArrayList<Message>();
+        }
+    }
+
+    // Current seq-id is simply the number of messages stored for the topic
+    // (creating an empty list for previously unseen topics).
+    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) {
+        long seqId = MapMethods.getAfterInsertingIfAbsent(messages, topic, ArrayListMessageFactory.instance).size();
+        return MessageSeqId.newBuilder().setLocalComponent(seqId).build();
+    }
+
+    // Seq-ids are dense here, so skipping is plain addition.
+    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+        return seqId + skipAmount;
+    }
+
+    // Appends the message to the topic's list and reports the new list size
+    // (i.e. the message's 1-based seq-id) as the operation result.
+    public void persistMessage(PersistRequest request) {
+        if (failure) {
+            request.callback.operationFailed(request.getCtx(), exception);
+            return;
+        }
+
+        MapMethods.addToMultiMap(messages, request.getTopic(), request.getMessage(), ArrayListMessageFactory.instance);
+        request.callback.operationFinished(request.getCtx(), (long) messages.get(request.getTopic()).size());
+    }
+
+    // Delivers the single message at the request's start seq-id.
+    // NOTE(review): assumes the topic exists and the seq-id is in range;
+    // out-of-range requests would throw from the list access — confirm callers.
+    public void scanSingleMessage(ScanRequest request) {
+        if (failure) {
+            request.getCallback().scanFailed(request.getCtx(), exception);
+            return;
+        }
+
+        request.getCallback().messageScanned(request.getCtx(),
+                messages.get(request.getTopic()).get((int) request.getStartSeqId()));
+
+    }
+
+    // Streams messages from startSeqId, stopping at whichever limit is hit
+    // first: end of stored messages, size limit, or message-count limit.
+    public void scanMessages(RangeScanRequest request) {
+        if (failure) {
+            request.getCallback().scanFailed(request.getCtx(), exception);
+            return;
+        }
+
+        long totalSize = 0;
+        long startSeqId = request.getStartSeqId();
+        for (int i = 0; i < request.getMessageLimit(); i++) {
+            List<Message> messageList = MapMethods.getAfterInsertingIfAbsent(messages, request.getTopic(),
+                    ArrayListMessageFactory.instance);
+            // Seq-ids are 1-based, so startSeqId + i is valid only while
+            // it is <= the list size.
+            if (startSeqId + i > messageList.size()) {
+                request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
+                return;
+            }
+            Message msg = messageList.get((int) startSeqId + i - 1);
+            Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, startSeqId + i);
+            request.getCallback().messageScanned(request.getCtx(), toDeliver);
+
+            totalSize += toDeliver.getBody().size();
+
+            // The size check runs after delivery, so the message that crosses
+            // the size limit is still delivered before the scan finishes.
+            if (totalSize > request.getSizeLimit()) {
+                request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.SIZE_LIMIT_EXCEEDED);
+                return;
+            }
+        }
+        request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
+
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+
+/**
+ * ScanCallback test double that funnels every scan event into a blocking
+ * queue, letting tests consume results synchronously with take().
+ */
+public class StubScanCallback implements ScanCallback {
+
+    // Sentinel enqueued when a scan finishes, so consumers can detect the end.
+    public static Message END_MESSAGE = Message.newBuilder().setBody(ByteString.EMPTY).build();
+
+    LinkedBlockingQueue<Either<Message, Exception>> queue = new LinkedBlockingQueue<Either<Message, Exception>>();
+
+    @Override
+    public void messageScanned(Object ctx, Message message) {
+        // Successful scan event: message on the left, no exception.
+        ConcurrencyUtils.put(queue, Either.of(message, (Exception) null));
+    }
+
+    @Override
+    public void scanFailed(Object ctx, Exception exception) {
+        // Failure event: exception on the right, no message.
+        ConcurrencyUtils.put(queue, Either.of((Message) null, exception));
+    }
+
+    @Override
+    public void scanFinished(Object ctx, ReasonForFinish reason) {
+        // Completion event: enqueue the END_MESSAGE sentinel.
+        ConcurrencyUtils.put(queue, Either.of(END_MESSAGE, (Exception) null));
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+
+/**
+ * Runs the generic PersistenceManager black-box suite against the
+ * BookKeeper-backed implementation, using an in-process BookKeeper/ZooKeeper
+ * cluster provided by BookKeeperTestBase.
+ */
+public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
+    BookKeeperTestBase bktb;
+    private final int numBookies = 3;   
+    
+    @Override
+    @Before
+    protected void setUp() throws Exception {
+        // We need to setUp this class first since the super.setUp() method will
+        // need the BookKeeperTestBase to be instantiated.
+        bktb = new BookKeeperTestBase(numBookies);
+        bktb.setUp();
+        super.setUp();
+    }
+
+    @Override
+    @After
+    protected void tearDown() throws Exception {
+        // Tear down in reverse order of setUp.
+        bktb.tearDown();
+        super.tearDown();
+    }
+
+    // BookKeeper sequence ids are 1-based.
+    @Override
+    long getLowestSeqId() {
+        return 1;
+    }
+
+    @Override
+    PersistenceManager instantiatePersistenceManager() throws Exception {
+        ServerConfiguration conf = new ServerConfiguration();
+        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+        return new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), new TrivialOwnAllTopicManager(conf,
+                scheduler), conf, scheduler);
+    }
+
+    // With 1-based dense seq-ids, the nth published message has seq-id n.
+    @Override
+    public long getExpectedSeqId(int numPublished) {
+        return numPublished;
+    }
+
+    // JUnit 3 entry point for running this class in a suite.
+    public static Test suite() {
+        return new TestSuite(TestBookKeeperPersistenceManagerBlackBox.class);
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.StubCallback;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+/**
+ * White-box tests for BookkeeperPersistenceManager: inspects internal state
+ * (topicInfos, ledgerRanges) to verify recovery of "dirty" ledgers left
+ * behind by an abandoned manager instance.
+ */
+public class TestBookkeeperPersistenceManagerWhiteBox extends TestCase {
+
+    BookKeeperTestBase bktb;
+    private final int numBookies = 3;
+    BookkeeperPersistenceManager bkpm;
+    ServerConfiguration conf;
+    ScheduledExecutorService scheduler;
+    TopicManager tm;
+    ByteString topic = ByteString.copyFromUtf8("topic0");
+
+    @Override
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        bktb = new BookKeeperTestBase(numBookies);
+        bktb.setUp();
+
+        conf = new ServerConfiguration();
+        scheduler = Executors.newScheduledThreadPool(1);
+        tm = new TrivialOwnAllTopicManager(conf, scheduler);
+
+        bkpm = new BookkeeperPersistenceManager(bktb.bk, bktb.getZooKeeperClient(), tm, conf, scheduler);
+    }
+
+    @Override
+    @After
+    protected void tearDown() throws Exception {
+        bktb.tearDown();
+        super.tearDown();
+    }
+
+    // An abandoned, never-written ledger should be recovered as empty:
+    // re-acquiring the topic with a fresh manager yields zero ledger ranges.
+    @Test
+    public void testEmptyDirtyLedger() throws Exception {
+
+        StubCallback<Void> stubCallback = new StubCallback<Void>();
+        bkpm.acquiredTopic(topic, stubCallback, null);
+        // right() holds the exception side; null means the acquire succeeded.
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        // now abandon, and try another time, the prev ledger should be dirty
+
+        bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), bktb.getZooKeeperClient(), tm,
+                conf, scheduler);
+        bkpm.acquiredTopic(topic, stubCallback, null);
+        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
+    }
+
+    // NOTE(review): no @Test annotation here, but the class extends
+    // junit.framework.TestCase so JUnit 3 picks it up via the "test" prefix.
+    // Publishes messages while periodically abandoning the manager's memory,
+    // then verifies a full range scan returns everything in order.
+    public void testNonEmptyDirtyLedger() throws Exception {
+
+        Random r = new Random();
+        int NUM_MESSAGES_TO_TEST = 100;
+        int SIZE_OF_MESSAGES_TO_TEST = 100;
+        int index = 0;
+        int numPrevLedgers = 0;
+        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
+                SIZE_OF_MESSAGES_TO_TEST);
+
+        while (index < messages.size()) {
+
+            // Re-acquiring must recover every previously abandoned ledger.
+            StubCallback<Void> stubCallback = new StubCallback<Void>();
+            bkpm.acquiredTopic(topic, stubCallback, null);
+            assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
+            assertEquals(numPrevLedgers, bkpm.topicInfos.get(topic).ledgerRanges.size());
+
+            // Persist one message; seq-ids are 1-based and dense, so the
+            // returned seq-id is index + 1.
+            StubCallback<Long> persistCallback = new StubCallback<Long>();
+            bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
+            assertEquals(new Long(index + 1), ConcurrencyUtils.take(persistCallback.queue).left());
+
+            // once in every 10 times, give up ledger
+            if (r.nextInt(10) == 9) {
+                // Make the bkpm lose its memory
+                bkpm.topicInfos.clear();
+                numPrevLedgers++;
+            }
+            index++;
+        }
+
+        // Lets scan now
+        StubScanCallback scanCallback = new StubScanCallback();
+        bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
+        for (int i = 0; i < messages.size(); i++) {
+            // Bodies must match publish order and carry consecutive seq-ids.
+            Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
+            assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
+            assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
+        }
+        // The stub enqueues END_MESSAGE (same instance) when the scan finishes.
+        assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+
+/**
+ * Runs the generic PersistenceManager black-box suite against the local-DB
+ * (Derby-style singleton) implementation.
+ */
+public class TestLocalDBPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        // The manager is a singleton, so wipe its state between tests.
+        ((LocalDBPersistenceManager) persistenceManager).reset();
+    }
+
+    // Local-DB sequence ids are 1-based.
+    @Override
+    long getLowestSeqId() {
+        return 1;
+    }
+
+    @Override
+    PersistenceManager instantiatePersistenceManager() {
+        return LocalDBPersistenceManager.instance();
+    }
+
+    // With 1-based dense seq-ids, the nth published message has seq-id n.
+    @Override
+    public long getExpectedSeqId(int numPublished) {
+        return numPublished;
+    }
+
+    // JUnit 3 entry point for running this class in a suite.
+    public static Test suite() {
+        return new TestSuite(TestLocalDBPersistenceManagerBlackBox.class);
+    }
+
+}



Mime
View raw message