Author: ivank
Date: Sat Jan 12 17:55:26 2013
New Revision: 1432485
URL: http://svn.apache.org/viewvc?rev=1432485&view=rev
Log:
BOOKKEEPER-503: The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes
(jiannan & sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jan 12 17:55:26 2013
@@ -238,6 +238,8 @@ Trunk (unreleased changes)
BOOKKEEPER-539: ClientNotSubscribedException & doesn't receive enough messages
in TestThrottlingDelivery#testServerSideThrottle (sijie)
+ BOOKKEEPER-503: The test case of TestThrottlingDelivery#testServerSideThrottle failed
sometimes (jiannan & sijie via ivank)
+
IMPROVEMENTS:
BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
Sat Jan 12 17:55:26 2013
@@ -483,22 +483,18 @@ public class FIFODeliveryManager impleme
return topic;
}
- public long getLastLocalSeqIdDelivered() {
- return lastLocalSeqIdDelivered;
- }
-
- public long getLastScanErrorTime() {
+ public synchronized long getLastScanErrorTime() {
return lastScanErrorTime;
}
- public void setLastScanErrorTime(long lastScanErrorTime) {
+ public synchronized void setLastScanErrorTime(long lastScanErrorTime) {
this.lastScanErrorTime = lastScanErrorTime;
}
/**
* Clear the last scan error time so it could be retry immediately.
*/
- protected void clearLastScanErrorTime() {
+ protected synchronized void clearLastScanErrorTime() {
this.lastScanErrorTime = -1;
}
@@ -511,7 +507,7 @@ public class FIFODeliveryManager impleme
}
}
- protected void messageConsumed(long newSeqIdConsumed) {
+ protected synchronized void messageConsumed(long newSeqIdConsumed) {
if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
return;
}
@@ -563,20 +559,22 @@ public class FIFODeliveryManager impleme
return;
}
- // check whether we have delivered enough messages without receiving their consumes
- if (msgLimitExceeded()) {
- logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed
{}.",
- va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
- isThrottled = true;
- // do nothing, since the delivery process would be throttled.
- // After message consumed, it would be added back to retry queue.
- return;
- }
+ synchronized (this) {
+ // check whether we have delivered enough messages without receiving their
consumes
+ if (msgLimitExceeded()) {
+ logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed
{}.",
+ va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
+ isThrottled = true;
+ // do nothing, since the delivery process would be throttled.
+ // After message consumed, it would be added back to retry queue.
+ return;
+ }
- localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered,
1);
+ localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered,
1);
- outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
- /* callback= */this, /* ctx= */null);
+ outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
+ /* callback= */this, /* ctx= */null);
+ }
persistenceMgr.scanSingleMessage(outstandingScanRequest);
}
@@ -665,24 +663,26 @@ public class FIFODeliveryManager impleme
return;
}
- lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
+ synchronized (this) {
+ lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
- if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK)
{
- // Note: The order of the next 2 statements is important. We should
- // submit a request to change our delivery pointer only *after* we
- // have actually changed it. Otherwise, there is a race condition
- // with removal of this channel, w.r.t, maintaining the deliveryPtrs
- // tree map.
- long prevId = lastSeqIdCommunicatedExternally;
- lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
- moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
+ if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK)
{
+ // Note: The order of the next 2 statements is important. We should
+ // submit a request to change our delivery pointer only *after* we
+ // have actually changed it. Otherwise, there is a race condition
+ // with removal of this channel, w.r.t, maintaining the deliveryPtrs
+ // tree map.
+ long prevId = lastSeqIdCommunicatedExternally;
+ lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+ moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
+ }
}
// increment deliveried message
ServerStats.getInstance().incrementMessagesDelivered();
deliverNextMessage();
}
- public long getLastSeqIdCommunicatedExternally() {
+ public synchronized long getLastSeqIdCommunicatedExternally() {
return lastSeqIdCommunicatedExternally;
}
@@ -730,8 +730,10 @@ public class FIFODeliveryManager impleme
doStopServingSubscriber(prevSubscriber, se);
}
- lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
- addDeliveryPtr(this, lastLocalSeqIdDelivered);
+ synchronized (this) {
+ lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+ addDeliveryPtr(this, lastLocalSeqIdDelivered);
+ }
deliverNextMessage();
};
@@ -741,7 +743,9 @@ public class FIFODeliveryManager impleme
StringBuilder sb = new StringBuilder();
sb.append("Topic: ");
sb.append(topic.toStringUtf8());
- sb.append("DeliveryPtr: ");
+ sb.append("Subscriber: ");
+ sb.append(subscriberId.toStringUtf8());
+ sb.append(", DeliveryPtr: ");
sb.append(lastLocalSeqIdDelivered);
return sb.toString();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
Sat Jan 12 17:55:26 2013
@@ -17,41 +17,36 @@
*/
package org.apache.hedwig.server.delivery;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.filter.PipelineFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.exceptions.PubSubException;
-
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.server.persistence.StubPersistenceManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.PersistRequest;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.StubPersistenceManager;
+import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
+import org.apache.hedwig.util.Callback;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
public class TestFIFODeliveryManager {
static Logger logger = LoggerFactory.getLogger(TestFIFODeliveryManager.class);
@@ -133,7 +128,7 @@ public class TestFIFODeliveryManager {
filter.addLast(new AllToAllTopologyFilter());
filter.initialize(conf.getConf());
filter.setSubscriptionPreferences(topic, subscriber, prefs);
- MessageSeqId startId = MessageSeqId.newBuilder().build();
+ MessageSeqId startId = MessageSeqId.newBuilder().setLocalComponent(1).build();
CountDownLatch l = new CountDownLatch(1);
Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(1))).build();
@@ -194,4 +189,109 @@ public class TestFIFODeliveryManager {
r = dep.getNextResponse();
assertNull("There should only be 2 responses", r);
}
+
+ static class ExecutorDeliveryEndPoint implements DeliveryEndPoint {
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ AtomicInteger numDelivered = new AtomicInteger();
+ final DeliveryManager dm;
+
+ ExecutorDeliveryEndPoint(DeliveryManager dm) {
+ this.dm = dm;
+ }
+
+ public void send(final PubSubResponse response, final DeliveryCallback callback)
{
+ executor.submit(new Runnable() {
+ public void run() {
+ if (response.hasMessage()) {
+ MessageSeqId msgid = response.getMessage().getMsgId();
+ if ((msgid.getLocalComponent() % 2) == 1) {
+ dm.messageConsumed(response.getTopic(),
+ response.getSubscriberId(),
+ response.getMessage().getMsgId());
+ } else {
+ executor.schedule(new Runnable() {
+ public void run() {
+ dm.messageConsumed(response.getTopic(),
+ response.getSubscriberId(),
+ response.getMessage().getMsgId());
+ }
+ }, 1, TimeUnit.SECONDS);
+ }
+ }
+ numDelivered.incrementAndGet();
+ callback.sendingFinished();
+ }
+ });
+ }
+
+ public void close() {
+ executor.shutdown();
+ }
+
+ int getNumDelivered() {
+ return numDelivered.get();
+ }
+ }
+
+ /**
+ * Test the throttle race issue caused by messageConsumed and doDeliverNextMessage.
+ * @see <a href="https://issues.apache.org/jira/browse/BOOKKEEPER-503">BOOKKEEPER-503</a>
+ */
+ @Test
+ public void testFIFODeliveryThrottlingRace() throws Exception {
+ final int numMessages = 20;
+ final int throttleSize = 10;
+ ServerConfiguration conf = new ServerConfiguration() {
+ @Override
+ public int getDefaultMessageWindowSize() {
+ return throttleSize;
+ }
+ };
+ ByteString topic = ByteString.copyFromUtf8("throttlingRaceTopic");
+ ByteString subscriber = ByteString.copyFromUtf8("throttlingRaceSubscriber");
+
+ PersistenceManager pm = new StubPersistenceManager();
+ FIFODeliveryManager fdm = new FIFODeliveryManager(pm, conf);
+ ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
+ SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
+
+ PipelineFilter filter = new PipelineFilter();
+ filter.addLast(new AllToAllTopologyFilter());
+ filter.initialize(conf.getConf());
+ filter.setSubscriptionPreferences(topic, subscriber, prefs);
+
+ CountDownLatch l = new CountDownLatch(numMessages);
+
+ TestCallback firstCallback = null;
+ for (int i = 0; i < numMessages; i++) {
+ Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(i))).build();
+ TestCallback cb = new TestCallback(l);
+ if (firstCallback == null) {
+ firstCallback = cb;
+ }
+ pm.persistMessage(new PersistRequest(topic, m, cb, null));
+ }
+ fdm.start();
+ assertTrue("Persistence never finished", l.await(10, TimeUnit.SECONDS));
+ fdm.startServingSubscription(topic, subscriber, prefs, firstCallback.getId(), dep,
filter,
+ new Callback<Void>() {
+ @Override
+ public void operationFinished(Object ctx, Void result) {
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ // should never happen
+ }
+ }, null);
+
+ int count = 30; // wait for 30 seconds maximum
+ while (dep.getNumDelivered() < numMessages) {
+ Thread.sleep(1000);
+ if (count-- == 0) {
+ break;
+ }
+ }
+ assertEquals("Should have delivered " + numMessages, numMessages, dep.getNumDelivered());
+ }
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java?rev=1432485&r1=1432484&r2=1432485&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
Sat Jan 12 17:55:26 2013
@@ -88,9 +88,16 @@ public class StubPersistenceManager impl
return;
}
- request.getCallback().messageScanned(request.getCtx(),
- messages.get(request.getTopic()).get((int) request.getStartSeqId()));
+ long index = request.getStartSeqId() - 1;
+ List<Message> messageList = messages.get(request.getTopic());
+ if (index >= messageList.size()) {
+ request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
+ return;
+ }
+ Message msg = messageList.get((int) index);
+ Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, request.getStartSeqId());
+ request.getCallback().messageScanned(request.getCtx(), toDeliver);
}
public void scanMessages(RangeScanRequest request) {
|