zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1432485 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ hedwig-server/src/test/java/org/apache/hedwig/server/delivery/ hedwig-server/src/test/java/org/apache/hedwig/server/persistence/
Date Sat, 12 Jan 2013 17:55:27 GMT
Author: ivank
Date: Sat Jan 12 17:55:26 2013
New Revision: 1432485

URL: http://svn.apache.org/viewvc?rev=1432485&view=rev
Log:
BOOKKEEPER-503: The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes
(jiannan & sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jan 12 17:55:26 2013
@@ -238,6 +238,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-539: ClientNotSubscribedException & doesn't receive enough messages in TestThrottlingDelivery#testServerSideThrottle (sijie)
 
+        BOOKKEEPER-503: The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes (jiannan & sijie via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Sat Jan 12 17:55:26 2013
@@ -483,22 +483,18 @@ public class FIFODeliveryManager impleme
             return topic;
         }
 
-        public long getLastLocalSeqIdDelivered() {
-            return lastLocalSeqIdDelivered;
-        }
-
-        public long getLastScanErrorTime() {
+        public synchronized long getLastScanErrorTime() {
             return lastScanErrorTime;
         }
 
-        public void setLastScanErrorTime(long lastScanErrorTime) {
+        public synchronized void setLastScanErrorTime(long lastScanErrorTime) {
             this.lastScanErrorTime = lastScanErrorTime;
         }
 
         /**
          * Clear the last scan error time so it could be retry immediately.
          */
-        protected void clearLastScanErrorTime() {
+        protected synchronized void clearLastScanErrorTime() {
             this.lastScanErrorTime = -1;
         }
 
@@ -511,7 +507,7 @@ public class FIFODeliveryManager impleme
             }
         }
 
-        protected void messageConsumed(long newSeqIdConsumed) {
+        protected synchronized void messageConsumed(long newSeqIdConsumed) {
             if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
                 return;
             }
@@ -563,20 +559,22 @@ public class FIFODeliveryManager impleme
                 return;
             }
 
-            // check whether we have delivered enough messages without receiving their consumes
-            if (msgLimitExceeded()) {
-                logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
-                            va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
-                isThrottled = true;
-                // do nothing, since the delivery process would be throttled.
-                // After message consumed, it would be added back to retry queue.
-                return;
-            }
+            synchronized (this) {
+                // check whether we have delivered enough messages without receiving their consumes
+                if (msgLimitExceeded()) {
+                    logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
+                                va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
+                    isThrottled = true;
+                    // do nothing, since the delivery process would be throttled.
+                    // After message consumed, it would be added back to retry queue.
+                    return;
+                }
 
-            localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
+                localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
 
-            outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
-                    /* callback= */this, /* ctx= */null);
+                outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
+                        /* callback= */this, /* ctx= */null);
+            }
 
             persistenceMgr.scanSingleMessage(outstandingScanRequest);
         }
@@ -665,24 +663,26 @@ public class FIFODeliveryManager impleme
                 return;
             }
 
-            lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
+            synchronized (this) {
+                lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
 
-            if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK) {
-                // Note: The order of the next 2 statements is important. We should
-                // submit a request to change our delivery pointer only *after* we
-                // have actually changed it. Otherwise, there is a race condition
-                // with removal of this channel, w.r.t, maintaining the deliveryPtrs
-                // tree map.
-                long prevId = lastSeqIdCommunicatedExternally;
-                lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
-                moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
+                if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK) {
+                    // Note: The order of the next 2 statements is important. We should
+                    // submit a request to change our delivery pointer only *after* we
+                    // have actually changed it. Otherwise, there is a race condition
+                    // with removal of this channel, w.r.t, maintaining the deliveryPtrs
+                    // tree map.
+                    long prevId = lastSeqIdCommunicatedExternally;
+                    lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+                    moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
+                }
             }
             // increment deliveried message
             ServerStats.getInstance().incrementMessagesDelivered();
             deliverNextMessage();
         }
 
-        public long getLastSeqIdCommunicatedExternally() {
+        public synchronized long getLastSeqIdCommunicatedExternally() {
             return lastSeqIdCommunicatedExternally;
         }
 
@@ -730,8 +730,10 @@ public class FIFODeliveryManager impleme
                 doStopServingSubscriber(prevSubscriber, se);
             }
 
-            lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
-            addDeliveryPtr(this, lastLocalSeqIdDelivered);
+            synchronized (this) {
+                lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+                addDeliveryPtr(this, lastLocalSeqIdDelivered);
+            }
 
             deliverNextMessage();
         };
@@ -741,7 +743,9 @@ public class FIFODeliveryManager impleme
             StringBuilder sb = new StringBuilder();
             sb.append("Topic: ");
             sb.append(topic.toStringUtf8());
-            sb.append("DeliveryPtr: ");
+            sb.append("Subscriber: ");
+            sb.append(subscriberId.toStringUtf8());
+            sb.append(", DeliveryPtr: ");
             sb.append(lastLocalSeqIdDelivered);
             return sb.toString();
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java Sat Jan 12 17:55:26 2013
@@ -17,41 +17,36 @@
  */
 package org.apache.hedwig.server.delivery;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.filter.PipelineFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.exceptions.PubSubException;
-
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.server.persistence.StubPersistenceManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.persistence.PersistRequest;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.StubPersistenceManager;
+import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
+import org.apache.hedwig.util.Callback;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
 
 public class TestFIFODeliveryManager {
     static Logger logger = LoggerFactory.getLogger(TestFIFODeliveryManager.class);
@@ -133,7 +128,7 @@ public class TestFIFODeliveryManager {
         filter.addLast(new AllToAllTopologyFilter());
         filter.initialize(conf.getConf());
         filter.setSubscriptionPreferences(topic, subscriber, prefs);
-        MessageSeqId startId = MessageSeqId.newBuilder().build();
+        MessageSeqId startId = MessageSeqId.newBuilder().setLocalComponent(1).build();
 
         CountDownLatch l = new CountDownLatch(1);
         Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(1))).build();
@@ -194,4 +189,109 @@ public class TestFIFODeliveryManager {
         r = dep.getNextResponse();
         assertNull("There should only be 2 responses", r);
     }
+
+    static class ExecutorDeliveryEndPoint implements DeliveryEndPoint {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        AtomicInteger numDelivered = new AtomicInteger();
+        final DeliveryManager dm;
+
+        ExecutorDeliveryEndPoint(DeliveryManager dm) {
+            this.dm = dm;
+        }
+
+        public void send(final PubSubResponse response, final DeliveryCallback callback) {
+            executor.submit(new Runnable() {
+                    public void run() {
+                        if (response.hasMessage()) {
+                            MessageSeqId msgid = response.getMessage().getMsgId();
+                            if ((msgid.getLocalComponent() % 2) == 1) {
+                                dm.messageConsumed(response.getTopic(),
+                                        response.getSubscriberId(),
+                                        response.getMessage().getMsgId());
+                            } else {
+                                executor.schedule(new Runnable() {
+                                        public void run() {
+                                            dm.messageConsumed(response.getTopic(),
+                                                    response.getSubscriberId(),
+                                                    response.getMessage().getMsgId());
+                                        }
+                                    }, 1, TimeUnit.SECONDS);
+                            }
+                        }
+                        numDelivered.incrementAndGet();
+                        callback.sendingFinished();
+                    }
+                });
+        }
+
+        public void close() {
+            executor.shutdown();
+        }
+
+        int getNumDelivered() {
+            return numDelivered.get();
+        }
+    }
+
+    /**
+     * Test throttle race issue cause by messageConsumed and doDeliverNextMessage
+     * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-503}
+     */
+    @Test
+    public void testFIFODeliveryThrottlingRace() throws Exception {
+        final int numMessages = 20;
+        final int throttleSize = 10;
+        ServerConfiguration conf = new ServerConfiguration() {
+                @Override
+                public int getDefaultMessageWindowSize() {
+                    return throttleSize;
+                }
+            };
+        ByteString topic = ByteString.copyFromUtf8("throttlingRaceTopic");
+        ByteString subscriber = ByteString.copyFromUtf8("throttlingRaceSubscriber");
+
+        PersistenceManager pm = new StubPersistenceManager();
+        FIFODeliveryManager fdm = new FIFODeliveryManager(pm, conf);
+        ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
+        SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
+
+        PipelineFilter filter = new PipelineFilter();
+        filter.addLast(new AllToAllTopologyFilter());
+        filter.initialize(conf.getConf());
+        filter.setSubscriptionPreferences(topic, subscriber, prefs);
+
+        CountDownLatch l = new CountDownLatch(numMessages);
+
+        TestCallback firstCallback = null;
+        for (int i = 0; i < numMessages; i++) {
+            Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(i))).build();
+            TestCallback cb = new TestCallback(l);
+            if (firstCallback == null) {
+                firstCallback = cb;
+            }
+            pm.persistMessage(new PersistRequest(topic, m, cb, null));
+        }
+        fdm.start();
+        assertTrue("Persistence never finished", l.await(10, TimeUnit.SECONDS));
+        fdm.startServingSubscription(topic, subscriber, prefs, firstCallback.getId(), dep, filter,
+                new Callback<Void>() {
+                     @Override
+                     public void operationFinished(Object ctx, Void result) {
+                     }
+                     @Override
+                     public void operationFailed(Object ctx, PubSubException exception) {
+                         // would not happened
+                     }
+                }, null);
+
+        int count = 30; // wait for 30 seconds maximum
+        while (dep.getNumDelivered() < numMessages) {
+            Thread.sleep(1000);
+            if (count-- == 0) {
+                break;
+            }
+        }
+        assertEquals("Should have delivered " + numMessages, numMessages, dep.getNumDelivered());
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java Sat Jan 12 17:55:26 2013
@@ -88,9 +88,16 @@ public class StubPersistenceManager impl
             return;
         }
 
-        request.getCallback().messageScanned(request.getCtx(),
-                                             messages.get(request.getTopic()).get((int) request.getStartSeqId()));
+        long index = request.getStartSeqId() - 1;
+        List<Message> messageList = messages.get(request.getTopic());
+        if (index >= messageList.size()) {
+            request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
+            return;
+        }
 
+        Message msg = messageList.get((int) index);
+        Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, request.getStartSeqId());
+        request.getCallback().messageScanned(request.getCtx(), toDeliver);
     }
 
     public void scanMessages(RangeScanRequest request) {



Mime
View raw message