activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Phillip Henry (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-3732) Different methods synchronizing on different mutexes when changing the same field
Date Mon, 27 Feb 2012 19:38:46 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13217443#comment-13217443
] 

Phillip Henry commented on AMQ-3732:
------------------------------------

OK, it's not quite a unit test - it's a stress test that uses JUnit - but it illustrates the
point.

This code fails on my Mac (2.66GHz Intel Core 2 Duo) after a non-deterministic number of iterations.


Most of the code is just simple stub implementations of ActiveMQ classes and interfaces with
one or two methods implemented or overriden. (I tried using JMock but it didn't seem multi-threaded
friendly). Basically, it starts two threads - one to pull a message, one to acknowledge. The
pulling thread should increase prefetchExtension in pullMessage() thus:
{code}
        	synchronized(this) {
        		prefetchExtension++;
        		dispatchCounterBeforePull = dispatchCounter;
        	}
{code}
The other thread calls acknowledge with the stubbed objects and PrefetchSubscription object
itself in such a state that this line is executed:
{code}
                            prefetchExtension = Math.max(prefetchExtension, index + 1);
{code}
where index is the index of the number of dispatched MessageReference collection. This should
never be more than 0 since only message is ever dispatch()-ed by the test. All objects are
instantiated anew on each iteration.

Given this, prefetchExtension should only ever be 0 (neither test has hit their target code)
or 1 (from prefetchExtension++ or Math.max(prefetchExtension, 1) because index is always 0).

However, the test demonstrates that occasionally prefetchExtension is 2. I posit that this
is because the threads can sometimes execute their lines at the same time; prefetchExtension++
is not atomic (it's actually prefetchExtension = prefetchExtension + 1); and changing it is
not guarded by the same mutex. 

For example, the order might look like:

Step 1. PULL THREAD: add 1 to prefetchExtension. To do this, find out what prefetchExtension
is. But before this thread can do so, context switches to the thread executing acknowledge()
in step 2, immediately below.

Step 2. ACK THREAD: prefetchExtension is the maximum of its current value and 1. It looks
like its current value is 0 so set prefetchExtension to 1.

Step 3. PULL THREAD: has found that prefetchExtension is 1 and adds one to it giving a total
of 2.


Test case:
{code}
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;

import junit.framework.TestCase;

public class PrefetchSubscriptionStressTest extends TestCase {

	private PrefetchSubscriptionStub toTest;
	private final MessageId messageId = new MessageId();
	private MessageAck ack;
	private MessageReference node;

	protected void setUp() throws Exception {
		super.setUp();
	}
	
	public void testMultiThreadedAccessToPrefetchExtension() throws InterruptedException, InvalidSelectorException
{
		final int callsPerThread = 1;
		final int numIterations = 1000;
		
		for (int i = 0 ; i < numIterations ; i++) {
			final MessagePull pull = initializeStubs();
			
			Thread messagePuller = new Thread(new MessagePuller(callsPerThread, pull));
			Thread dispatchAndAcknowledger = new Thread(new DispatchAndAcknowledger(ack, node, callsPerThread));
		
			startThreads(messagePuller, dispatchAndAcknowledger);
			
			waitForThreadsToStop(messagePuller, dispatchAndAcknowledger);
			
			int actualPrefetchExtensions = toTest.getPrefetchExtension();
			assertEquals("failed on iteration: " + i, 1, actualPrefetchExtensions);
		}
	}
	
	private void waitForThreadsToStop(Thread messagePuller, Thread dispatchAndAcknowledger) throws
InterruptedException {
		dispatchAndAcknowledger.join();
		messagePuller.join();	
	}

	private void startThreads(Thread messagePuller, Thread dispatchAndAcknowledger) {
		messagePuller.start();
		dispatchAndAcknowledger.start();
	}

	private MessagePull initializeStubs() throws InvalidSelectorException {
		initialise();
		primeMessagesAndDestinations();
		final MessagePull pull = primeForPullMessage();
		return pull;
	}

	private void dispatchAndAcknowledge(final MessageAck ack, final MessageReference node, final
int numToDispatch)
			throws IOException, Exception {
		ConnectionContext context = null;
		for (int i = 0 ; i < numToDispatch ; i++) {
			toTest.dispatch(node);
		}
		toTest.acknowledge(context , ack);
	}
	
	public class MessagePuller implements Runnable {
		
		private final int totalRuns;
		private final MessagePull pull;
		
		public MessagePuller(int totalRuns, MessagePull pull) {
			super();
			this.totalRuns = totalRuns;
			this.pull = pull;
		}
		
		@Override
		public void run() {
			for (int i = 0 ; i < totalRuns ; i++) {
				try {
					pullMessage(pull);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}

	private void pullMessage(final MessagePull pull) throws Exception {
		ConnectionContext context = null;
		toTest.pullMessage(context, pull);
	}

	private MessagePull primeForPullMessage() {
		final MessagePull pull = new MessagePull() {
			@Override
			public long getTimeout() {
				return -1;
			}
		};
		return pull;
	}

	
	public class DispatchAndAcknowledger implements Runnable {
		
		private final MessageAck ack;
		private final MessageReference node;
		private final int totalRuns;
		
		public DispatchAndAcknowledger(MessageAck ack, MessageReference node, int totalRuns) {
			super();
			this.ack = ack;
			this.node = node;
			this.totalRuns = totalRuns;
		}

		@Override
		public void run() {
			for (int i = 0 ; i < totalRuns ; i++) {
				try {
					dispatchAndAcknowledge(ack, node, 1);
				} catch (IOException e) {
					e.printStackTrace();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}

	private void initialise() throws InvalidSelectorException {
		SystemUsage usageManager = null;
		
		final Connection connection = new Connection() {
			
			@Override
			public void stop() throws Exception {			}
			
			@Override
			public void start() throws Exception {			}
			
			@Override
			public void updateClient(ConnectionControl control) {			}
			
			@Override
			public void serviceExceptionAsync(IOException e) {			}
			
			@Override
			public void serviceException(Throwable error) {			}
			
			@Override
			public Response service(Command command) { return null; }
			
			@Override
			public boolean isSlow() { return false; }
			
			@Override
			public boolean isNetworkConnection() { return false; }
			
			@Override
			public boolean isManageable() { return false; }
			
			@Override
			public boolean isFaultTolerantConnection() { return false; }
			
			@Override
			public boolean isConnected() { return false; }
			
			@Override
			public boolean isBlocked() { return false; }
			
			@Override
			public boolean isActive() { return false; }
			
			@Override
			public ConnectionStatistics getStatistics() { return null; }
			
			@Override
			public String getRemoteAddress() { return null; }
			
			@Override
			public int getDispatchQueueSize() { return 0; }
			
			@Override
			public Connector getConnector() { return null; }
			
			@Override
			public String getConnectionId() { return null; }
			
			@Override
			public void dispatchSync(Command message) { }
			
			@Override
			public void dispatchAsync(Command command) { }
		};
		
		final ConnectionContext context = new ConnectionContext() {
			@Override
		    public Connection getConnection() {
		        return connection;
		    }
		}; 
		
		final Broker broker = new Broker() {
			
			@Override
			public void stop() throws Exception { }
			
			@Override
			public void start() throws Exception { }
			
			@Override
			public void send(ProducerBrokerExchange producerExchange, Message message)
					throws Exception { }
			
			@Override
			public void removeSubscription(ConnectionContext context,
					RemoveSubscriptionInfo info) throws Exception { }
			
			@Override
			public void removeDestination(ConnectionContext context,
					ActiveMQDestination destination, long timeout) throws Exception { }
			
			@Override
			public void removeConsumer(ConnectionContext context, ConsumerInfo info)
					throws Exception { }
			
			@Override
			public void processDispatchNotification(
					MessageDispatchNotification messageDispatchNotification)
					throws Exception { }
			
			@Override
			public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
					ConsumerControl control) { }
			
			@Override
			public Response messagePull(ConnectionContext context, MessagePull pull)
					throws Exception { return null; }
			
			@Override
			public Set<Destination> getDestinations(ActiveMQDestination destination) { return
null; }
			
			@Override
			public Map<ActiveMQDestination, Destination> getDestinationMap() { return null; }
			
			@Override
			public void gc() { }
			
			@Override
			public Destination addDestination(ConnectionContext context,
					ActiveMQDestination destination, boolean createIfTemporary)
					throws Exception { return null; }
			
			@Override
			public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
					throws Exception { return null; }
			
			@Override
			public void acknowledge(ConsumerBrokerExchange consumerExchange,
					MessageAck ack) throws Exception { }
			
			@Override
			public void slowConsumer(ConnectionContext context,
					Destination destination, Subscription subs) { }
			
			@Override
			public void setAdminConnectionContext(
					ConnectionContext adminConnectionContext) { }
			
			@Override
			public void sendToDeadLetterQueue(ConnectionContext context,
					MessageReference messageReference, Subscription subscription) { }
			
			@Override
			public void rollbackTransaction(ConnectionContext context, TransactionId xid)
					throws Exception { }
			
			@Override
			public void removeSession(ConnectionContext context, SessionInfo info)
					throws Exception { }
			
			@Override
			public void removeProducer(ConnectionContext context, ProducerInfo info)
					throws Exception { }
			
			@Override
			public void removeDestinationInfo(ConnectionContext context,
					DestinationInfo info) throws Exception { }
			
			@Override
			public void removeConnection(ConnectionContext context,
					ConnectionInfo info, Throwable error) throws Exception { }
			
			@Override
			public void removeBroker(Connection connection, BrokerInfo info) { }
			
			@Override
			public int prepareTransaction(ConnectionContext context, TransactionId xid)
					throws Exception { return 0; }
			
			@Override
			public void preProcessDispatch(MessageDispatch messageDispatch) { }
			
			@Override
			public void postProcessDispatch(MessageDispatch messageDispatch) { }
			
			@Override
			public void nowMasterBroker() { }
			
			@Override
			public void networkBridgeStopped(BrokerInfo brokerInfo) { }
			
			@Override
			public void networkBridgeStarted(BrokerInfo brokerInfo,
					boolean createdByDuplex, String remoteIp) { }
			
			@Override
			public void messageExpired(ConnectionContext context,
					MessageReference messageReference, Subscription subscription) { }
			
			@Override
			public void messageDiscarded(ConnectionContext context, Subscription sub,
					MessageReference messageReference) { }
			
			@Override
			public void messageDelivered(ConnectionContext context,
					MessageReference messageReference) { }
			
			@Override
			public void messageConsumed(ConnectionContext context,
					MessageReference messageReference) { }
			
			@Override
			public boolean isStopped() { return false; }
			
			@Override
			public void isFull(ConnectionContext context, Destination destination,
					Usage usage) { }
			
			@Override
			public boolean isFaultTolerantConfiguration() { return false; }
			
			@Override
			public boolean isExpired(MessageReference messageReference) { return false; }
			
			@Override
			public URI getVmConnectorURI() { return null; }
			
			@Override
			public PListStore getTempDataStore() { return null; }
			
			@Override
			public Scheduler getScheduler() { return null; }
			
			@Override
			public Broker getRoot() { return null; }
			
			@Override
			public TransactionId[] getPreparedTransactions(ConnectionContext context)
					throws Exception { return null; }
			
			@Override
			public BrokerInfo[] getPeerBrokerInfos() { return null; }
			
			@Override
			public ThreadPoolExecutor getExecutor() { return null; }
			
			@Override
			public Set<ActiveMQDestination> getDurableDestinations() { return null; }
			
			@Override
			public ActiveMQDestination[] getDestinations() throws Exception { return null; }
			
			@Override
			public Connection[] getClients() throws Exception { return null; }
			
			@Override
			public BrokerService getBrokerService() { return null; }
			
			@Override
			public long getBrokerSequenceId() { return 0; }
			
			@Override
			public String getBrokerName() { return null; }
			
			@Override
			public BrokerId getBrokerId() { return null; }
			
			@Override
			public ConnectionContext getAdminConnectionContext() { return null; }
			
			@Override
			public Broker getAdaptor(Class type) { return null; }
			
			@Override
			public void forgetTransaction(ConnectionContext context,
					TransactionId transactionId) throws Exception { }
			
			@Override
			public void fastProducer(ConnectionContext context,
					ProducerInfo producerInfo) { }
			
			@Override
			public void commitTransaction(ConnectionContext context, TransactionId xid,
					boolean onePhase) throws Exception { }
			
			@Override
			public void brokerServiceStarted() { }
			
			@Override
			public void beginTransaction(ConnectionContext context, TransactionId xid)
					throws Exception { }
			
			@Override
			public void addSession(ConnectionContext context, SessionInfo info)
					throws Exception { }
			
			@Override
			public void addProducer(ConnectionContext context, ProducerInfo info)
					throws Exception { }
			
			@Override
			public void addDestinationInfo(ConnectionContext context,
					DestinationInfo info) throws Exception { }
			
			@Override
			public void addConnection(ConnectionContext context, ConnectionInfo info)
					throws Exception { }
			
			@Override
			public void addBroker(Connection connection, BrokerInfo info) { }
		};
		
		final ActiveMQDestination activeMQDestination = new ActiveMQDestination() {
			
			@Override
			public byte getDataStructureType() {
				return 0;
			}
			
			@Override
			protected String getQualifiedPrefix() {
				return null;
			}
			
			@Override
			public byte getDestinationType() {
				return 0;
			}
			
			@Override
			public String getPhysicalName() { 
				return "test";
			}
			
			@Override
			public boolean isComposite() {
				return false;
			}
		};
		
		final ConsumerInfo info = new ConsumerInfo() {
			@Override
			public ActiveMQDestination getDestination() {
				return activeMQDestination;
			}
		};
		

		toTest = new PrefetchSubscriptionStub(broker, usageManager, context, info);
	}
	
	private class PrefetchSubscriptionStub extends PrefetchSubscription {

		public PrefetchSubscriptionStub(Broker broker,
				SystemUsage usageManager, ConnectionContext context,
				ConsumerInfo info) throws InvalidSelectorException {
			super(broker, usageManager, context, info);
		}

		@Override
		public void destroy() {
			// TODO Auto-generated method stub

		}

		@Override
		protected boolean isDropped(MessageReference node) {
			// TODO Auto-generated method stub
			return false;
		}

		@Override
		protected boolean canDispatch(MessageReference node)
				throws IOException {
			// TODO Auto-generated method stub
			return false;
		}

		@Override
		protected void acknowledge(ConnectionContext context,
				MessageAck ack, MessageReference node) throws IOException {
			// TODO Auto-generated method stub

		}

		@Override
		public boolean isSlave() {
			return false;
		}

		@Override
		public int getPrefetchSize() {
			return 0;
		}

		@Override
		protected void onDispatch(final MessageReference node, final Message message) {
			
		}
		
		@Override
		protected void dispatchPending() throws IOException {
			
		}
		
		public int getPrefetchExtension() {
			return prefetchExtension;
		}
	}

	private void primeMessagesAndDestinations() {
		final Message message = new Message() {
			
			@Override
			public byte getDataStructureType() { return 0; }
			
			@Override
			public Response visit(CommandVisitor visitor) throws Exception { return null; }
			
			@Override
			public Message copy() { return null; }
			
			@Override
			public void clearBody() throws JMSException { }
		};
		final Destination destination = new Destination() {
			
			@Override
			public boolean iterate() { return false; }
			
			@Override
			public void stop() throws Exception { }
			
			@Override
			public void start() throws Exception { }
			
			@Override
			public void wakeup() { }
			
			@Override
			public void slowConsumer(ConnectionContext context, Subscription subs) { }
			
			@Override
			public void setUseCache(boolean useCache) { }
			
			@Override
			public void setProducerFlowControl(boolean value) { }
			
			@Override
			public void setMinimumMessageSize(int minimumMessageSize) { }
			
			@Override
			public void setMaxProducersToAudit(int maxProducersToAudit) { }
			
			@Override
			public void setMaxPageSize(int maxPageSize) { }
			
			@Override
			public void setMaxBrowsePageSize(int maxPageSize) { }
			
			@Override
			public void setMaxAuditDepth(int maxAuditDepth) { }
			
			@Override
			public void setLazyDispatch(boolean value) { }
			
			@Override
			public void setEnableAudit(boolean enableAudit) { }
			
			@Override
			public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { }
			
			@Override
			public void setBlockedProducerWarningInterval(
					long blockedProducerWarningInterval) { }
			
			@Override
			public void setAlwaysRetroactive(boolean value) { }
			
			@Override
			public void send(ProducerBrokerExchange producerExchange,
					Message messageSend) throws Exception { }
			
			@Override
			public void removeSubscription(ConnectionContext context, Subscription sub,
					long lastDeliveredSequenceId) throws Exception { }
			
			@Override
			public void removeProducer(ConnectionContext context, ProducerInfo info)
					throws Exception { }
			
			@Override
			public void processDispatchNotification(
					MessageDispatchNotification messageDispatchNotification)
					throws Exception { }
			
			@Override
			public void messageExpired(ConnectionContext context, Subscription subs,
					MessageReference node) { }
			
			@Override
			public void messageDiscarded(ConnectionContext context, Subscription sub,
					MessageReference messageReference) { }
			
			@Override
			public void messageDelivered(ConnectionContext context,
					MessageReference messageReference) { }
			
			@Override
			public void messageConsumed(ConnectionContext context,
					MessageReference messageReference) { 	}
			
			@Override
			public void markForGC(long timeStamp) { }
			
			@Override
			public boolean isUseCache() { return false; }
			
			@Override
			public boolean isProducerFlowControl() { return false; }
			
			@Override
			public boolean isPrioritizedMessages() { return false; }
			
			@Override
			public boolean isLazyDispatch() { return false; }
			
			@Override
			public void isFull(ConnectionContext context, Usage<?> usage) { }
			
			@Override
			public boolean isEnableAudit() { return false; }
			
			@Override
			public boolean isDisposed() { return false; }
			
			@Override
			public boolean isAlwaysRetroactive() { return false; }
			
			@Override
			public boolean isActive() { return false; }
			
			@Override
			public SlowConsumerStrategy getSlowConsumerStrategy() { return null; }
			
			@Override
			public String getName() { return null; }
			
			@Override
			public int getMinimumMessageSize() { return 0; }
			
			@Override
			public MessageStore getMessageStore() { return null; }
			
			@Override
			public MemoryUsage getMemoryUsage() { return null; }
			
			@Override
			public int getMaxProducersToAudit() { return 0; }
			
			@Override
			public int getMaxPageSize() { return 0; }
			
			@Override
			public int getMaxBrowsePageSize() { return 0; }
			
			@Override
			public int getMaxAuditDepth() { 	return 0; }
			
			@Override
			public long getInactiveTimoutBeforeGC() { return 0; }
			
			@Override
			public DestinationStatistics getDestinationStatistics() { return null; }
			
			@Override
			public DeadLetterStrategy getDeadLetterStrategy() { return null; }
			
			@Override
			public int getCursorMemoryHighWaterMark() { return 0; }
			
			@Override
			public List<Subscription> getConsumers() { return null; }
			
			@Override
			public long getBlockedProducerWarningInterval() { return 0; }
			
			@Override
			public ActiveMQDestination getActiveMQDestination() { return null; 	}
			
			@Override
			public void gc() { 	}
			
			@Override
			public void fastProducer(ConnectionContext context,
					ProducerInfo producerInfo) { }
			
			@Override
			public void dispose(ConnectionContext context) throws IOException { }
			
			@Override
			public boolean canGC() { return false; }
			
			@Override
			public Message[] browse() { return null; }
			
			@Override
			public void addSubscription(ConnectionContext context, Subscription sub)
					throws Exception { }
			
			@Override
			public void addProducer(ConnectionContext context, ProducerInfo info)
					throws Exception { }
			
			@Override
			public void acknowledge(ConnectionContext context, Subscription sub,
					MessageAck ack, MessageReference node) throws IOException { }
		};
		
		ack = new MessageAck() {

			@Override
			public boolean isDeliveredAck() {
				return true;
			}
			@Override
			public MessageId getLastMessageId() {
				return messageId;
			}
			
		};
		node = new MessageReference() {
			
			@Override
			public boolean isPersistent() {return false; }
			
			@Override
			public boolean isExpired() { 
				return false; 
			}
			
			@Override
			public boolean isDropped() { return false; }
			
			@Override
			public boolean isAdvisory() { return false; }
			
			@Override
			public int incrementReferenceCount() { return 0; }
			
			@Override
			public void incrementRedeliveryCounter() { }
			
			@Override
			public ConsumerId getTargetConsumerId() { return null; }
			
			@Override
			public int getSize() { return 0; }
			
			@Override
			public Destination getRegionDestination() { return destination; }
			
			@Override
			public int getReferenceCount() { return 0; }
			
			@Override
			public int getRedeliveryCounter() { return 0; 	}
			
			@Override
			public MessageId getMessageId() { return messageId; }
			
			@Override
			public Message getMessageHardRef() { return null; }
			
			@Override
			public Message getMessage() { return message; }
			
			@Override
			public int getGroupSequence() { return 0; 	}
			
			@Override
			public String getGroupID() { return null; }
			
			@Override
			public long getExpiration() { return 0; }
			
			@Override
			public int decrementReferenceCount() { return 0; }
		};
	}
}
{code}
                
> Different methods synchronizing on different mutexes when changing the same field
> ---------------------------------------------------------------------------------
>
>                 Key: AMQ-3732
>                 URL: https://issues.apache.org/jira/browse/AMQ-3732
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul
15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
>            Reporter: Phillip Henry
>              Labels: concurrency
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is changed while
guarded by a mutex on this (PrefetchSubscription) in PrefetchSubscription.pullMessage(...)
and PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...). 
> This can lead to the corruption of the prefetchExtension variable (eg, prefetchExtension++
in pullMessage() is not an atomic operation so prefetchExtension may change in acknowledge()
mid-way through this operation).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message