From commits-return-36371-apmail-qpid-commits-archive=qpid.apache.org@qpid.apache.org Fri Sep 9 14:49:48 2016 Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C801C198A5 for ; Fri, 9 Sep 2016 14:49:48 +0000 (UTC) Received: (qmail 70148 invoked by uid 500); 9 Sep 2016 14:49:48 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 70114 invoked by uid 500); 9 Sep 2016 14:49:48 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 70105 invoked by uid 99); 9 Sep 2016 14:49:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Sep 2016 14:49:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 02A4DC2E2D for ; Fri, 9 Sep 2016 14:49:48 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.686 X-Spam-Level: X-Spam-Status: No, score=0.686 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-1.124, T_FILL_THIS_FORM_SHORT=0.01] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id FA_A31vL8xTE for ; Fri, 9 Sep 2016 14:49:44 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with ESMTP id 74CD85FB14 for ; Fri, 9 Sep 2016 14:49:43 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 858C1E0099 for ; Fri, 9 Sep 2016 14:49:42 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 3C0053A05B6 for ; Fri, 9 Sep 2016 14:49:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1760032 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apach... Date: Fri, 09 Sep 2016 14:49:41 -0000 To: commits@qpid.apache.org From: kwall@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160909144942.3C0053A05B6@svn01-us-west.apache.org> Author: kwall Date: Fri Sep 9 14:49:41 2016 New Revision: 1760032 URL: http://svn.apache.org/viewvc?rev=1760032&view=rev Log: QPID-7417: [Java Broker] Ensure message instance listeners only fire on state change of the associated object Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Fri Sep 9 14:49:41 2016 @@ -226,7 +226,7 @@ public abstract class AbstractConsumerTa if (consumer.acquires()) { - entry.unlockAcquisition(); + entry.makeAcquisitionStealable(); } } finally Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Fri Sep 9 14:49:41 2016 @@ -44,9 +44,9 @@ public interface MessageInstance void decrementDeliveryCount(); - void addStateChangeListener(StateChangeListener listener); + void addStateChangeListener(StateChangeListener listener); - boolean removeStateChangeListener(StateChangeListener listener); + boolean removeStateChangeListener(StateChangeListener listener); boolean acquiredByConsumer(); @@ -70,9 +70,9 @@ public interface MessageInstance boolean acquire(ConsumerImpl sub); - boolean lockAcquisition(final ConsumerImpl consumer); + boolean makeAcquisitionUnstealable(final ConsumerImpl consumer); - boolean unlockAcquisition(); + boolean makeAcquisitionStealable(); int getMaximumDeliveryCount(); @@ -84,7 +84,7 @@ public interface MessageInstance MessageEnqueueRecord getEnqueueRecord(); - public static enum State + enum State { AVAILABLE, ACQUIRED, @@ -92,7 +92,7 @@ public interface MessageInstance DELETED } - public abstract class EntryState + abstract class EntryState { protected EntryState() { @@ -114,7 +114,7 @@ public interface MessageInstance } - public final class AvailableState extends EntryState + final class AvailableState extends EntryState { public State getState() @@ -129,7 +129,7 @@ public interface MessageInstance } - public final class DequeuedState extends EntryState + final class DequeuedState extends EntryState { public State getState() @@ -144,7 +144,7 @@ public interface MessageInstance } - public final class DeletedState extends EntryState + final class DeletedState extends EntryState { public State getState() @@ -158,7 +158,7 @@ public interface MessageInstance } } - public final class NonConsumerAcquiredState extends EntryState + final class NonConsumerAcquiredState extends EntryState { public State getState() { @@ -171,76 +171,72 @@ public interface MessageInstance } } - public final class ConsumerAcquiredState extends EntryState + abstract class ConsumerAcquiredState extends EntryState { - private final C _consumer; - private final LockedAcquiredState _lockedState; + public abstract C getConsumer(); - public ConsumerAcquiredState(C consumer) + @Override + public final State getState() { - _consumer = consumer; - _lockedState = new LockedAcquiredState<>(this); + return State.ACQUIRED; } - - public State getState() + @Override + public String toString() { - return State.ACQUIRED; + return "{" + getState().name() + " : " + getConsumer() +"}"; } + } - public C getConsumer() + final class StealableConsumerAcquiredState extends ConsumerAcquiredState + { + private final C _consumer; + private final UnstealableConsumerAcquiredState _unstealableState; + + public StealableConsumerAcquiredState(C consumer) { - return _consumer; + _consumer = consumer; + _unstealableState = new UnstealableConsumerAcquiredState<>(this); } - public String toString() + @Override + public C getConsumer() { - return "{" + getState().name() + " : " + _consumer +"}"; + return _consumer; } - public LockedAcquiredState getLockedState() + public UnstealableConsumerAcquiredState getUnstealableState() { - return _lockedState; + return _unstealableState; } - } - public final class LockedAcquiredState extends EntryState + final class UnstealableConsumerAcquiredState extends ConsumerAcquiredState { - private final ConsumerAcquiredState _acquiredState; + private final StealableConsumerAcquiredState _stealableState; - public LockedAcquiredState(final ConsumerAcquiredState acquiredState) + public UnstealableConsumerAcquiredState(final StealableConsumerAcquiredState stealableState) { - _acquiredState = acquiredState; + _stealableState = stealableState; } @Override - public State getState() - { - return State.ACQUIRED; - } - public C getConsumer() { - return _acquiredState.getConsumer(); - } - - public String toString() - { - return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}"; + return _stealableState.getConsumer(); } - public ConsumerAcquiredState getUnlockedState() + public StealableConsumerAcquiredState getStealableState() { - return _acquiredState; + return _stealableState; } } - final static EntryState AVAILABLE_STATE = new AvailableState(); - final static EntryState DELETED_STATE = new DeletedState(); - final static EntryState DEQUEUED_STATE = new DequeuedState(); - final static EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState(); + EntryState AVAILABLE_STATE = new AvailableState(); + EntryState DELETED_STATE = new DeletedState(); + EntryState DEQUEUED_STATE = new DequeuedState(); + EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState(); boolean isAvailable(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Fri Sep 9 14:49:41 2016 @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState; +import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,7 +174,7 @@ public class DefinedGroupMessageGroupMan _groupMap.put(groupId, group); - // there's a small change that the group became empty between the point at which getNextAvailable() was + // there's a small chance that the group became empty between the point at which getNextAvailable() was // called on the consumer, and when accept message is called... in that case we want to avoid delivering // out of order if(_resetHelper.isEntryAheadOfConsumer(entry, sub)) @@ -256,7 +258,7 @@ public class DefinedGroupMessageGroupMan return groupVal; } - private class GroupStateChangeListener implements StateChangeListener + private class GroupStateChangeListener implements StateChangeListener { private final Group _group; @@ -265,24 +267,20 @@ public class DefinedGroupMessageGroupMan _group = group; } - public void stateChanged(final MessageInstance entry, - final MessageInstance.State oldState, - final MessageInstance.State newState) + @Override + public void stateChanged(final MessageInstance entry, final EntryState oldState, final EntryState newState) { synchronized (DefinedGroupMessageGroupManager.this) { if(_group.isValid()) { - if(oldState != newState) + if (isConsumerAcquiredStateForThisGroup(newState) && !isConsumerAcquiredStateForThisGroup(oldState)) + { + _group.add(); + } + else if (isConsumerAcquiredStateForThisGroup(oldState) && !isConsumerAcquiredStateForThisGroup(newState)) { - if(newState == QueueEntry.State.ACQUIRED) - { - _group.add(); - } - else if(oldState == QueueEntry.State.ACQUIRED) - { - _group.subtract((QueueEntry) entry, newState == MessageInstance.State.AVAILABLE); - } + _group.subtract((QueueEntry) entry, newState.getState() == MessageInstance.State.AVAILABLE); } } else @@ -291,5 +289,11 @@ public class DefinedGroupMessageGroupMan } } } + + private boolean isConsumerAcquiredStateForThisGroup(EntryState state) + { + return state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState) state).getConsumer() == _group.getConsumer(); + } } } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Sep 9 14:49:41 2016 @@ -46,7 +46,7 @@ public interface QueueConsumer getOwningState(); + MessageInstance.StealableConsumerAcquiredState getOwningState(); QueueContext getQueueContext(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Sep 9 14:49:41 2016 @@ -71,7 +71,8 @@ class QueueConsumerImpl private final AtomicBoolean _closed = new AtomicBoolean(false); private final long _consumerNumber; private final long _createTime = System.currentTimeMillis(); - private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this); + private final MessageInstance.StealableConsumerAcquiredState + _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this); private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener(); private final boolean _acquires; private final boolean _seesRequeues; @@ -538,7 +539,7 @@ class QueueConsumerImpl return _createTime; } - public final MessageInstance.ConsumerAcquiredState getOwningState() + public final MessageInstance.StealableConsumerAcquiredState getOwningState() { return _owningState; } @@ -644,7 +645,7 @@ class QueueConsumerImpl return _queue.getEventLogger(); } - public class WaitingOnCreditMessageListener implements StateChangeListener + public class WaitingOnCreditMessageListener implements StateChangeListener { private final AtomicReference _entry = new AtomicReference<>(); @@ -675,7 +676,8 @@ class QueueConsumerImpl } - public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState) + @Override + public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState) { entry.removeStateChangeListener(this); _entry.compareAndSet(entry, null); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Sep 9 14:49:41 2016 @@ -81,7 +81,7 @@ public abstract class QueueEntryImpl imp (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile StateChangeListenerEntry _stateChangeListeners; + private volatile StateChangeListenerEntry _stateChangeListeners; private static final AtomicReferenceFieldUpdater @@ -224,7 +224,7 @@ public abstract class QueueEntryImpl imp { return acquire(NON_CONSUMER_ACQUIRED_STATE); } - private class DelayedAcquisitionStateListener implements StateChangeListener + private class DelayedAcquisitionStateListener implements StateChangeListener { private final Runnable _task; private final AtomicBoolean _run = new AtomicBoolean(); @@ -235,13 +235,13 @@ public abstract class QueueEntryImpl imp } @Override - public void stateChanged(final MessageInstance object, final State oldState, final State newState) + public void stateChanged(final MessageInstance object, final EntryState oldState, final EntryState newState) { - if(newState == State.DELETED || newState == State.DEQUEUED) + if (newState.equals(DELETED_STATE) || newState.equals(DEQUEUED_STATE)) { QueueEntryImpl.this.removeStateChangeListener(this); } - else if(acquireOrSteal(null)) + else if (acquireOrSteal(null)) { runTask(); } @@ -288,7 +288,7 @@ public abstract class QueueEntryImpl imp EntryState currentState; - while((currentState = _state).getState() == State.AVAILABLE) + while((currentState = _state).equals(AVAILABLE_STATE)) { if(acquired = _stateUpdater.compareAndSet(this, currentState, state)) { @@ -298,7 +298,7 @@ public abstract class QueueEntryImpl imp if(acquired && _stateChangeListeners != null) { - notifyStateChange(State.AVAILABLE, State.ACQUIRED); + notifyStateChange(AVAILABLE_STATE, state); } return acquired; @@ -306,7 +306,7 @@ public abstract class QueueEntryImpl imp public boolean acquire(ConsumerImpl sub) { - final boolean acquired = acquire(((QueueConsumer) sub).getOwningState().getLockedState()); + final boolean acquired = acquire(((QueueConsumer) sub).getOwningState().getUnstealableState()); if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); @@ -316,33 +316,35 @@ public abstract class QueueEntryImpl imp } @Override - public boolean lockAcquisition(final ConsumerImpl consumer) + public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer) { EntryState state = _state; - if(state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState) state).getConsumer() == consumer) + if(state instanceof StealableConsumerAcquiredState + && ((StealableConsumerAcquiredState) state).getConsumer() == consumer) { - LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState(); - boolean updated = _stateUpdater.compareAndSet(this, state, lockedState); + UnstealableConsumerAcquiredState unstealableState = ((StealableConsumerAcquiredState) state).getUnstealableState(); + boolean updated = _stateUpdater.compareAndSet(this, state, unstealableState); if(updated) { - notifyStateChange(state.getState(), lockedState.getState()); + notifyStateChange(state, unstealableState); } return updated; } - return state instanceof LockedAcquiredState && ((LockedAcquiredState) state).getConsumer() == consumer; + return state instanceof UnstealableConsumerAcquiredState + && ((UnstealableConsumerAcquiredState) state).getConsumer() == consumer; } @Override - public boolean unlockAcquisition() + public boolean makeAcquisitionStealable() { EntryState state = _state; - if(state instanceof LockedAcquiredState) + if(state instanceof UnstealableConsumerAcquiredState) { - ConsumerAcquiredState unlockedState = ((LockedAcquiredState) state).getUnlockedState(); - boolean updated = _stateUpdater.compareAndSet(this, state, unlockedState); + StealableConsumerAcquiredState stealableState = ((UnstealableConsumerAcquiredState) state).getStealableState(); + boolean updated = _stateUpdater.compareAndSet(this, state, stealableState); if(updated) { - notifyStateChange(state.getState(),unlockedState.getState()); + notifyStateChange(state, stealableState); } return updated; } @@ -351,8 +353,7 @@ public abstract class QueueEntryImpl imp public boolean acquiredByConsumer() { - - return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState); + return _state instanceof ConsumerAcquiredState; } @Override @@ -360,14 +361,10 @@ public abstract class QueueEntryImpl imp { ConsumerImpl consumer; EntryState state = _state; - if(state instanceof ConsumerAcquiredState) + if (state instanceof ConsumerAcquiredState) { consumer = ((ConsumerAcquiredState)state).getConsumer(); } - else if(state instanceof LockedAcquiredState) - { - consumer = ((LockedAcquiredState)state).getConsumer(); - } else { consumer = null; @@ -379,20 +376,22 @@ public abstract class QueueEntryImpl imp public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; - return (state instanceof ConsumerAcquiredState - && ((ConsumerAcquiredState)state).getConsumer() == consumer) - || (state instanceof LockedAcquiredState - && ((LockedAcquiredState)state).getConsumer() == consumer); + return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer); } @Override public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer) { EntryState state = _state; - if(state instanceof ConsumerAcquiredState - && ((ConsumerAcquiredState)state).getConsumer() == consumer) + if(state instanceof StealableConsumerAcquiredState + && ((StealableConsumerAcquiredState)state).getConsumer() == consumer) { - return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE); + final boolean stateWasChanged = _stateUpdater.compareAndSet(this, state, NON_CONSUMER_ACQUIRED_STATE); + if (stateWasChanged) + { + notifyStateChange(state, NON_CONSUMER_ACQUIRED_STATE); + } + return stateWasChanged; } else { @@ -423,7 +422,7 @@ public abstract class QueueEntryImpl imp private void postRelease(final EntryState previousState) { - if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState) + if (previousState instanceof ConsumerAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -431,9 +430,9 @@ public abstract class QueueEntryImpl imp if(!getQueue().isDeleted()) { getQueue().requeue(this); - if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED) + if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED) { - notifyStateChange(State.ACQUIRED, State.AVAILABLE); + notifyStateChange(previousState, AVAILABLE_STATE); } } @@ -478,19 +477,7 @@ public abstract class QueueEntryImpl imp @Override public QueueConsumer getDeliveredConsumer() { - EntryState state = _state; - if (state instanceof ConsumerAcquiredState) - { - return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer(); - } - else if (state instanceof LockedAcquiredState) - { - return (QueueConsumer) ((LockedAcquiredState) state).getConsumer(); - } - else - { - return null; - } + return (QueueConsumer) getAcquiringConsumer(); } public void reject() @@ -536,7 +523,7 @@ public abstract class QueueEntryImpl imp if(state.getState() == State.ACQUIRED) { - if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) + if (state instanceof ConsumerAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -544,7 +531,7 @@ public abstract class QueueEntryImpl imp getQueue().dequeue(this); if(_stateChangeListeners != null) { - notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + notifyStateChange(state, DEQUEUED_STATE); } return true; } @@ -555,12 +542,12 @@ public abstract class QueueEntryImpl imp } - private void notifyStateChange(final State oldState, final State newState) + private void notifyStateChange(final EntryState oldState, final EntryState newState) { - StateChangeListenerEntry entry = _listenersUpdater.get(this); + StateChangeListenerEntry entry = _listenersUpdater.get(this); while(entry != null) { - StateChangeListener l = entry.getListener(); + StateChangeListener l = entry.getListener(); if(l != null) { l.stateChanged(this, oldState, newState); @@ -651,16 +638,16 @@ public abstract class QueueEntryImpl imp return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener listener) { - StateChangeListenerEntry entry = new StateChangeListenerEntry<>(listener); + StateChangeListenerEntry entry = new StateChangeListenerEntry<>(listener); if(!_listenersUpdater.compareAndSet(this,null, entry)) { _listenersUpdater.get(this).add(entry); } } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener listener) { StateChangeListenerEntry entry = _listenersUpdater.get(this); if(entry != null) Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java Fri Sep 9 14:49:41 2016 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.util; -public interface StateChangeListener +public interface StateChangeListener { void stateChanged(T object, E oldState, E newState); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java Fri Sep 9 14:49:41 2016 @@ -22,7 +22,7 @@ package org.apache.qpid.server.util; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public class StateChangeListenerEntry +public class StateChangeListenerEntry { private static final AtomicReferenceFieldUpdater NEXT = AtomicReferenceFieldUpdater.newUpdater(StateChangeListenerEntry.class, StateChangeListenerEntry.class, "_next"); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Fri Sep 9 14:49:41 2016 @@ -356,13 +356,13 @@ public abstract class AbstractSystemMess } @Override - public void addStateChangeListener(final StateChangeListener listener) + public void addStateChangeListener(final StateChangeListener listener) { } @Override - public boolean removeStateChangeListener(final StateChangeListener listener) + public boolean removeStateChangeListener(final StateChangeListener listener) { return false; } @@ -447,13 +447,13 @@ public abstract class AbstractSystemMess } @Override - public boolean lockAcquisition(final ConsumerImpl consumer) + public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer) { return false; } @Override - public boolean unlockAcquisition() + public boolean makeAcquisitionStealable() { return false; } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Fri Sep 9 14:49:41 2016 @@ -412,12 +412,12 @@ abstract class AbstractQueueTestBase ext QueueEntry queueEntry = queueEntries.get(0); final CountDownLatch dequeueIndicator = new CountDownLatch(1); - queueEntry.addStateChangeListener(new StateChangeListener() + queueEntry.addStateChangeListener(new StateChangeListener() { @Override - public void stateChanged(MessageInstance object, MessageInstance.State oldState, MessageInstance.State newState) + public void stateChanged(MessageInstance object, MessageInstance.EntryState oldState, MessageInstance.EntryState newState) { - if (newState == MessageInstance.State.DEQUEUED) + if (newState.equals(MessageInstance.DEQUEUED_STATE)) { dequeueIndicator.countDown(); } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java Fri Sep 9 14:49:41 2016 @@ -55,7 +55,8 @@ public class ConsumerListTest extends Qp private QueueConsumer newMockConsumer() { QueueConsumer consumer = mock(QueueConsumer.class); - MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer); + MessageInstance.StealableConsumerAcquiredState + owningState = new MessageInstance.StealableConsumerAcquiredState(consumer); when(consumer.getOwningState()).thenReturn(owningState); return consumer; Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Fri Sep 9 14:49:41 2016 @@ -101,13 +101,13 @@ public class MockMessageInstance impleme } @Override - public boolean lockAcquisition(final ConsumerImpl consumer) + public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer) { return false; } @Override - public boolean unlockAcquisition() + public boolean makeAcquisitionStealable() { return false; } @@ -221,13 +221,13 @@ public class MockMessageInstance impleme } @Override - public void addStateChangeListener(final StateChangeListener listener) + public void addStateChangeListener(final StateChangeListener listener) { } @Override - public boolean removeStateChangeListener(final StateChangeListener listener) + public boolean removeStateChangeListener(final StateChangeListener listener) { return false; } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Sep 9 14:49:41 2016 @@ -18,8 +18,12 @@ */ package org.apache.qpid.server.queue; +import static org.apache.qpid.server.message.MessageInstance.NON_CONSUMER_ACQUIRED_STATE; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.lang.reflect.Field; @@ -33,6 +37,8 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageInstance.EntryState; +import org.apache.qpid.server.message.MessageInstance.StealableConsumerAcquiredState; +import org.apache.qpid.server.message.MessageInstance.UnstealableConsumerAcquiredState; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.BrokerModel; @@ -41,6 +47,7 @@ import org.apache.qpid.server.model.Conf import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -135,12 +142,31 @@ public abstract class QueueEntryImplTest { final QueueConsumer consumer = mock(QueueConsumer.class); - MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer); + StealableConsumerAcquiredState + owningState = new StealableConsumerAcquiredState(consumer); when(consumer.getOwningState()).thenReturn(owningState); when(consumer.getConsumerNumber()).thenReturn(_consumerId++); return consumer; } + public void testStateChanges() + { + QueueConsumer consumer = newConsumer(); + StateChangeListener stateChangeListener = mock(StateChangeListener.class); + _queueEntry.addStateChangeListener(stateChangeListener); + _queueEntry.acquire(consumer); + verify(stateChangeListener).stateChanged(eq(_queueEntry), + eq(MessageInstance.AVAILABLE_STATE), + isA(UnstealableConsumerAcquiredState.class)); + _queueEntry.makeAcquisitionStealable(); + verify(stateChangeListener).stateChanged(eq(_queueEntry), + isA(UnstealableConsumerAcquiredState.class), + isA(StealableConsumerAcquiredState.class)); + _queueEntry.removeAcquisitionFromConsumer(consumer); + verify(stateChangeListener).stateChanged(eq(_queueEntry), + isA(StealableConsumerAcquiredState.class), + eq(NON_CONSUMER_ACQUIRED_STATE)); + } public void testLocking() { @@ -152,7 +178,7 @@ public abstract class QueueEntryImplTest _queueEntry.isAcquired()); assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); - assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition()); + assertTrue("Should be able to unlock locked queue entry", _queueEntry.makeAcquisitionStealable()); assertFalse("Acquisition should not be able to be removed from the wrong consumer", _queueEntry.removeAcquisitionFromConsumer(consumer2)); assertTrue("Acquisition should be able to be removed once unlocked", @@ -169,8 +195,8 @@ public abstract class QueueEntryImplTest _queueEntry.isAcquired()); assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); - assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition()); - assertTrue("Should be able to lock queue entry",_queueEntry.lockAcquisition(consumer)); + assertTrue("Should be able to unlock locked queue entry",_queueEntry.makeAcquisitionStealable()); + assertTrue("Should be able to lock queue entry",_queueEntry.makeAcquisitionUnstealable(consumer)); assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); _queueEntry.delete(); @@ -185,10 +211,10 @@ public abstract class QueueEntryImplTest _queueEntry.acquire(consumer1); assertTrue("Queue entry should be acquired by consumer1", _queueEntry.acquiredByConsumer()); - assertTrue("Consumer1 relocking should be allowed", _queueEntry.lockAcquisition(consumer1)); - assertFalse("Consumer2 should not be allowed", _queueEntry.lockAcquisition(consumer2)); + assertTrue("Consumer1 relocking should be allowed", _queueEntry.makeAcquisitionUnstealable(consumer1)); + assertFalse("Consumer2 should not be allowed", _queueEntry.makeAcquisitionUnstealable(consumer2)); - _queueEntry.unlockAcquisition(); + _queueEntry.makeAcquisitionStealable(); assertTrue("Queue entry should still be acquired by consumer1", _queueEntry.acquiredByConsumer()); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Fri Sep 9 14:49:41 2016 @@ -354,13 +354,13 @@ public class StandardQueueTest extends A } @Override - public boolean lockAcquisition(final ConsumerImpl consumer) + public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer) { return true; } @Override - public boolean unlockAcquisition() + public boolean makeAcquisitionStealable() { return true; } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Sep 9 14:49:41 2016 @@ -38,6 +38,8 @@ import org.apache.qpid.server.flow.FlowC import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState; +import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; @@ -88,17 +90,24 @@ public class ConsumerTarget_0_10 extends private long _deferredSizeCredit; private final List _consumers = new CopyOnWriteArrayList<>(); - private final StateChangeListener _unacknowledgedMessageListener = new StateChangeListener() + private final StateChangeListener _unacknowledgedMessageListener = new StateChangeListener() { - public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState) + @Override + public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState) { - if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED) + if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState)) { removeUnacknowledgedMessage(entry); entry.removeStateChangeListener(this); } } + + private boolean isConsumerAcquiredStateForThis(EntryState state) + { + return state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_10.this; + } }; public ConsumerTarget_0_10(ServerSession session, @@ -389,10 +398,6 @@ public class ConsumerTarget_0_10 extends @Override public void acquisitionRemoved(final MessageInstance entry) { - if (entry.removeStateChangeListener(_unacknowledgedMessageListener)) - { - removeUnacknowledgedMessage(entry); - } } private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) @@ -439,7 +444,7 @@ public class ConsumerTarget_0_10 extends void reject(final ConsumerImpl consumer, final MessageInstance entry) { entry.setRedelivered(); - if (entry.lockAcquisition(consumer)) + if (entry.makeAcquisitionUnstealable(consumer)) { entry.routeToAlternate(null, null); } @@ -474,7 +479,7 @@ public class ConsumerTarget_0_10 extends final ServerMessage msg = entry.getMessage(); int requeues = 0; - if (entry.lockAcquisition(consumer)) + if (entry.makeAcquisitionUnstealable(consumer)) { requeues = entry.routeToAlternate(new Action() { Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Sep 9 14:49:41 2016 @@ -533,7 +533,7 @@ public class ServerSession extends Sessi final ConsumerTarget_0_10 target, final MessageInstance entry) { - if (entry.lockAcquisition(consumer)) + if (entry.makeAcquisitionUnstealable(consumer)) { _transaction.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Sep 9 14:49:41 2016 @@ -1637,7 +1637,7 @@ public class AMQChannel { for(MessageInstance entry : _ackedMessages) { - entry.unlockAcquisition(); + entry.makeAcquisitionStealable(); } _resendList.addAll(_ackedMessages); } @@ -1795,7 +1795,7 @@ public class AMQChannel { final ServerMessage msg = rejectedQueueEntry.getMessage(); int requeues = 0; - if (rejectedQueueEntry.lockAcquisition(rejectedQueueEntry.getAcquiringConsumer())) + if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer())) { requeues = rejectedQueueEntry.routeToAlternate(new Action() { Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Sep 9 14:49:41 2016 @@ -34,6 +34,7 @@ import org.apache.qpid.server.consumer.C import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -545,10 +546,6 @@ public abstract class ConsumerTarget_0_8 @Override public void acquisitionRemoved(final MessageInstance node) { - if (node.removeStateChangeListener(_unacknowledgedMessageListener)) - { - removeUnacknowledgedMessage(node); - } } public long getUnacknowledgedBytes() @@ -561,17 +558,22 @@ public abstract class ConsumerTarget_0_8 return _unacknowledgedCount.longValue(); } - private final StateChangeListener _unacknowledgedMessageListener = new StateChangeListener() + private final StateChangeListener _unacknowledgedMessageListener = new StateChangeListener() { - - public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState) + @Override + public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState) { - if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED) + if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState)) { removeUnacknowledgedMessage(entry); entry.removeStateChangeListener(this); } + } + private boolean isConsumerAcquiredStateForThis(EntryState state) + { + return state instanceof MessageInstance.ConsumerAcquiredState + && ((MessageInstance.ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_8.this; } }; } Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Sep 9 14:49:41 2016 @@ -155,7 +155,7 @@ public class UnacknowledgedMessageMapImp List acknowledged = new ArrayList<>(); for (MessageInstance instance : ackedMessageMap.values()) { - if (instance.lockAcquisition(instance.getAcquiringConsumer())) + if (instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer())) { acknowledged.add(instance); } @@ -169,7 +169,7 @@ public class UnacknowledgedMessageMapImp { instance = remove(deliveryTag); } - if(instance != null && instance.lockAcquisition(instance.getAcquiringConsumer())) + if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer())) { return Collections.singleton(instance); } Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Fri Sep 9 14:49:41 2016 @@ -50,8 +50,8 @@ public class UnacknowledgedMessageMapTes map = new UnacknowledgedMessageMapImpl(100); msgs = populateMap(map,expectedSize); // simulate some messages being ttl expired - when(msgs[2].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE); - when(msgs[4].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE); + when(msgs[2].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE); + when(msgs[4].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE); assertEquals(expectedSize,map.size()); @@ -80,7 +80,7 @@ public class UnacknowledgedMessageMapTes private MessageInstance createMessageInstance(final int id) { MessageInstance instance = mock(MessageInstance.class); - when(instance.lockAcquisition(_consumer)).thenReturn(Boolean.TRUE); + when(instance.makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.TRUE); when(instance.getAcquiringConsumer()).thenReturn(_consumer); return instance; } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Sep 9 14:49:41 2016 @@ -372,7 +372,7 @@ class ConsumerTarget_1_0 extends Abstrac if(outcome instanceof Accepted) { - if (_queueEntry.lockAcquisition(getConsumer())) + if (_queueEntry.makeAcquisitionUnstealable(getConsumer())) { txn.dequeue(_queueEntry.getEnqueueRecord(), new ServerTransaction.Action() Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Fri Sep 9 14:49:41 2016 @@ -1065,13 +1065,13 @@ class ManagementNode implements MessageS } @Override - public void addStateChangeListener(final StateChangeListener listener) + public void addStateChangeListener(final StateChangeListener listener) { } @Override - public boolean removeStateChangeListener(final StateChangeListener listener) + public boolean removeStateChangeListener(final StateChangeListener listener) { return false; } @@ -1156,13 +1156,13 @@ class ManagementNode implements MessageS } @Override - public boolean lockAcquisition(final ConsumerImpl consumer) + public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer) { return false; } @Override - public boolean unlockAcquisition() + public boolean makeAcquisitionStealable() { return false; } Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Fri Sep 9 14:49:41 2016 @@ -66,13 +66,13 @@ class ManagementResponse implements Mess } @Override - public void addStateChangeListener(final StateChangeListener listener) + public void addStateChangeListener(final StateChangeListener listener) { } @Override - public boolean removeStateChangeListener(final StateChangeListener listener) + public boolean removeStateChangeListener(final StateChangeListener listener) { return false; } @@ -157,13 +157,13 @@ class ManagementResponse implements Mess } @Override - public boolean lockAcquisition(final ConsumerImpl consumer) + public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer) { return false; } @Override - public boolean unlockAcquisition() + public boolean makeAcquisitionStealable() { return false; } Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java Fri Sep 9 14:49:41 2016 @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQSession_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.test.utils.BrokerHolder; import org.apache.qpid.test.utils.QpidBrokerTestCase; public class FlowControlTest extends QpidBrokerTestCase @@ -70,18 +69,9 @@ public class FlowControlTest extends Qpi Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(_queue); - BytesMessage m1 = producerSession.createBytesMessage(); - m1.writeBytes(new byte[128]); - m1.setIntProperty("msg", 1); - producer.send(m1); - BytesMessage m2 = producerSession.createBytesMessage(); - m2.writeBytes(new byte[128]); - m2.setIntProperty("msg", 2); - producer.send(m2); - BytesMessage m3 = producerSession.createBytesMessage(); - m3.writeBytes(new byte[256]); - m3.setIntProperty("msg", 3); - producer.send(m3); + sendBytesMessage(producerSession, producer, 1, 128); + sendBytesMessage(producerSession, producer, 2, 128); + sendBytesMessage(producerSession, producer, 3, 256); producer.close(); producerSession.close(); @@ -140,18 +130,9 @@ public class FlowControlTest extends Qpi Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(_queue); - BytesMessage m1 = producerSession.createBytesMessage(); - m1.writeBytes(new byte[128]); - m1.setIntProperty("msg", 1); - producer.send(m1); - BytesMessage m2 = producerSession.createBytesMessage(); - m2.writeBytes(new byte[256]); - m2.setIntProperty("msg", 2); - producer.send(m2); - BytesMessage m3 = producerSession.createBytesMessage(); - m3.writeBytes(new byte[128]); - m3.setIntProperty("msg", 3); - producer.send(m3); + sendBytesMessage(producerSession, producer, 1, 128); + sendBytesMessage(producerSession, producer, 2, 256); + sendBytesMessage(producerSession, producer, 3, 128); producer.close(); producerSession.close(); @@ -196,31 +177,47 @@ public class FlowControlTest extends Qpi } - public static void main(String args[]) throws Throwable + public void testDeliverMessageLargerThanBytesLimit() throws Exception { - FlowControlTest test = new FlowControlTest(); + _queue = (Queue) getInitialContext().lookup("queue"); + Connection connection = getConnection(); + connection.start(); + + Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerSession.createConsumer(_queue).close(); + MessageProducer producer = producerSession.createProducer(_queue); + + sendBytesMessage(producerSession, producer, 1, 128); + sendBytesMessage(producerSession, producer, 2, 256); + + Session consumerSession1 = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 64); + MessageConsumer recv1 = consumerSession1.createConsumer(_queue); + + Message r1 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); - int run = 0; - while (true) - { - System.err.println("Test Run:" + ++run); - Thread.sleep(1000); - BrokerHolder broker = null; - try - { - broker = test.createSpawnedBroker(); - test.testBasicBytesFlowControl(); - - Thread.sleep(1000); - } - finally - { - if (broker != null) - { - broker.shutdown(); - } - } - } + Message r2 = recv1.receive(RECEIVE_TIMEOUT); + assertNull("Second message incorrectly delivered", r2); + + r1.acknowledge(); + + r2 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Wrong messages received", 2, r2.getIntProperty("msg")); + + r2.acknowledge(); + } + + private void sendBytesMessage(final Session producerSession, + final MessageProducer producer, + final int messageId, final int messageSize) throws Exception + { + BytesMessage message = producerSession.createBytesMessage(); + message.writeBytes(new byte[messageSize]); + message.setIntProperty("msg", messageId); + producer.send(message); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org