qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1772527 [1/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ broker-core/src/mai...
Date Sun, 04 Dec 2016 12:24:58 GMT
Author: rgodfrey
Date: Sun Dec  4 12:24:57 2016
New Revision: 1772527

URL: http://svn.apache.org/viewvc?rev=1772527&view=rev
Log:
QPID-7572 : Parameterize consumer with the consumer target type

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Sun Dec  4 12:24:57 2016
@@ -29,8 +29,6 @@ import java.util.concurrent.atomic.Atomi
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
@@ -42,9 +40,8 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
 
-public abstract class AbstractConsumerTarget implements ConsumerTarget
+public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T>
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
     private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
     {
         @Override
@@ -98,7 +95,9 @@ public abstract class AbstractConsumerTa
     @Override
     public void notifyWork()
     {
-        getSessionModel().notifyWork(this);
+        @SuppressWarnings("unchecked")
+        final T target = (T) this;
+        getSessionModel().notifyWork(target);
     }
 
     protected final void setNotifyWorkDesired(final boolean desired)
@@ -173,7 +172,7 @@ public abstract class AbstractConsumerTa
 
     private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
     {
-        AMQSessionModel<?> sessionModel = getSessionModel();
+        AMQSessionModel<?,T> sessionModel = getSessionModel();
         return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Sun Dec  4 12:24:57 2016
@@ -27,7 +27,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 
-public interface ConsumerTarget
+public interface ConsumerTarget<T extends ConsumerTarget<T>>
 {
     void acquisitionRemoved(MessageInstance node);
 
@@ -50,17 +50,17 @@ public interface ConsumerTarget
 
     State getState();
 
-    void consumerAdded(MessageInstanceConsumer sub);
+    void consumerAdded(MessageInstanceConsumer<T> sub);
 
-    ListenableFuture<Void> consumerRemoved(MessageInstanceConsumer sub);
+    ListenableFuture<Void> consumerRemoved(MessageInstanceConsumer<T> sub);
 
     long getUnacknowledgedBytes();
 
     long getUnacknowledgedMessages();
 
-    AMQSessionModel<?> getSessionModel();
+    AMQSessionModel<?,T> getSessionModel();
 
-    long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
+    long send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
 
     boolean sendNextMessage();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Sun Dec  4 12:24:57 2016
@@ -134,8 +134,8 @@ public class FilterSupport
         public boolean matches(Filterable message)
         {
 
-            final Collection<QueueConsumer<?>> consumers = _queue.getConsumers();
-            for(QueueConsumer<?> c : consumers)
+            final Collection<QueueConsumer<?,?>> consumers = _queue.getConsumers();
+            for(QueueConsumer<?,?> c : consumers)
             {
                 if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
                 {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sun Dec  4 12:24:57 2016
@@ -27,7 +27,7 @@ import org.apache.qpid.server.transport.
 
 public class ChannelLogSubject extends AbstractLogSubject
 {
-    private final AMQSessionModel<?> _sessionModel;
+    private final AMQSessionModel<?,?> _sessionModel;
     public ChannelLogSubject(AMQSessionModel session)
     {
         _sessionModel = session;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Sun Dec  4 12:24:57 2016
@@ -77,7 +77,7 @@ public interface MessageInstance
 
     Filterable asFilterable();
 
-    MessageInstanceConsumer getAcquiringConsumer();
+    MessageInstanceConsumer<?> getAcquiringConsumer();
 
     MessageEnqueueRecord getEnqueueRecord();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java Sun Dec  4 12:24:57 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.message;
 
 import org.apache.qpid.server.consumer.ConsumerTarget;
 
-public interface MessageInstanceConsumer
+public interface MessageInstanceConsumer<T extends ConsumerTarget>
 {
     boolean isClosed();
 
@@ -38,7 +38,7 @@ public interface MessageInstanceConsumer
 
     MessageContainer pullMessage();
 
-    ConsumerTarget getTarget();
+    T getTarget();
 
     void setNotifyWorkDesired(boolean desired);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Sun Dec  4 12:24:57 2016
@@ -31,17 +31,17 @@ import org.apache.qpid.server.store.Tran
 
 public interface MessageSource extends TransactionLogResource, MessageNode
 {
-    MessageInstanceConsumer addConsumer(ConsumerTarget target, FilterManager filters,
-                                        Class<? extends ServerMessage> messageClass,
-                                        String consumerName,
-                                        EnumSet<ConsumerOption> options,
-                                        Integer priority)
+    <T extends ConsumerTarget<T>> MessageInstanceConsumer<T> addConsumer(T target, FilterManager filters,
+                                                                      Class<? extends ServerMessage> messageClass,
+                                                                      String consumerName,
+                                                                      EnumSet<ConsumerOption> options,
+                                                                      Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted;
 
     Collection<? extends MessageInstanceConsumer> getConsumers();
 
-    boolean verifySessionAccess(AMQSessionModel<?> session);
+    boolean verifySessionAccess(AMQSessionModel<?,?> session);
 
     /**
      * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Sun Dec  4 12:24:57 2016
@@ -22,11 +22,12 @@ package org.apache.qpid.server.model;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 
 @ManagedObject(creatable = false, amqpName = "org.apache.qpid.Consumer")
-public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, MessageInstanceConsumer
+public interface Consumer<X extends Consumer<X,T>, T extends ConsumerTarget> extends ConfiguredObject<X>, MessageInstanceConsumer<T>
 {
     String DISTRIBUTION_MODE = "distributionMode";
     String EXCLUSIVE = "exclusive";

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Sun Dec  4 12:24:57 2016
@@ -264,7 +264,7 @@ public interface Queue<X extends Queue<X
 
 
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
-    Collection<QueueConsumer<?>> getConsumers();
+    Collection<QueueConsumer<?,?>> getConsumers();
 
     //operations
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Sun Dec  4 12:24:57 2016
@@ -62,7 +62,7 @@ public final class SessionAdapter extend
     private final AbstractAMQPConnection<?,?> _amqpConnection;
 
     public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
-                          final AMQSessionModel session)
+                          final AMQSessionModel<?,?> session)
     {
         super(parentsMap(amqpConnection), createAttributes(session));
         _amqpConnection = amqpConnection;
@@ -70,13 +70,13 @@ public final class SessionAdapter extend
         _session.addConsumerListener(new ConsumerListener()
         {
             @Override
-            public void consumerAdded(final Consumer<?> consumer)
+            public void consumerAdded(final Consumer<?,?> consumer)
             {
                 childAdded(consumer);
             }
 
             @Override
-            public void consumerRemoved(final Consumer<?> consumer)
+            public void consumerRemoved(final Consumer<?,?> consumer)
             {
                 childRemoved(consumer);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sun Dec  4 12:24:57 2016
@@ -38,7 +38,7 @@ import org.apache.qpid.transport.network
  * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
  * when monitoring the blocking and unblocking of queues/sessions in {@link Queue}.
  */
-public interface AMQSessionModel<T extends AMQSessionModel<T>> extends Comparable<AMQSessionModel>, Deletable<T>
+public interface AMQSessionModel<T extends AMQSessionModel<T,X>, X extends ConsumerTarget<X>> extends Comparable<AMQSessionModel>, Deletable<T>
 {
     UUID getId();
 
@@ -72,7 +72,7 @@ public interface AMQSessionModel<T exten
 
     int getConsumerCount();
 
-    Collection<Consumer<?>> getConsumers();
+    Collection<Consumer<?,X>> getConsumers();
 
     void addConsumerListener(ConsumerListener listener);
 
@@ -103,5 +103,5 @@ public interface AMQSessionModel<T exten
     void addTicker(Ticker ticker);
     void removeTicker(Ticker ticker);
 
-    void notifyWork(ConsumerTarget target);
+    void notifyWork(X target);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Sun Dec  4 12:24:57 2016
@@ -22,5 +22,5 @@ package org.apache.qpid.server.protocol;
 
 public interface CapacityChecker
 {
-    void checkCapacity(AMQSessionModel<?> channel);
+    void checkCapacity(AMQSessionModel<?,?> channel);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java Sun Dec  4 12:24:57 2016
@@ -24,7 +24,7 @@ import org.apache.qpid.server.model.Cons
 
 public interface ConsumerListener
 {
-    void consumerAdded(Consumer<?> consumer);
+    void consumerAdded(Consumer<?,?> consumer);
 
-    void consumerRemoved(Consumer<?> consumer);
+    void consumerRemoved(Consumer<?,?> consumer);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Dec  4 12:24:57 2016
@@ -153,7 +153,7 @@ public abstract class AbstractQueue<X ex
     @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange")
     private Exchange _alternateExchange;
 
-    private volatile QueueConsumer<?> _exclusiveSubscriber;
+    private volatile QueueConsumer<?,?> _exclusiveSubscriber;
 
     private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
 
@@ -206,7 +206,7 @@ public abstract class AbstractQueue<X ex
 
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
+    private final Set<AMQSessionModel<?,?>> _blockedChannels = new ConcurrentSkipListSet<>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final SettableFuture<Integer> _deleteFuture = SettableFuture.create();
@@ -347,7 +347,7 @@ public abstract class AbstractQueue<X ex
         _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
         Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
-        AMQSessionModel<?> sessionModel;
+        AMQSessionModel<?,?> sessionModel;
         if(sessionPrincipals.isEmpty())
         {
             sessionModel = null;
@@ -692,7 +692,7 @@ public abstract class AbstractQueue<X ex
 
 
     @Override
-    public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+    public <T extends ConsumerTarget<T>> QueueConsumerImpl<T> addConsumer(final T target,
                                          final FilterManager filters,
                                          final Class<? extends ServerMessage> messageClass,
                                          final String consumerName,
@@ -704,10 +704,10 @@ public abstract class AbstractQueue<X ex
 
         try
         {
-            final QueueConsumerImpl queueConsumer = getTaskExecutor().run(new Task<QueueConsumerImpl, Exception>()
+            final QueueConsumerImpl<T> queueConsumer = getTaskExecutor().run(new Task<QueueConsumerImpl<T>, Exception>()
             {
                 @Override
-                public QueueConsumerImpl execute() throws Exception
+                public QueueConsumerImpl<T> execute() throws Exception
                 {
                     return addConsumerInternal(target, filters, messageClass, consumerName, optionSet, priority);
                 }
@@ -755,7 +755,7 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+    private <T extends ConsumerTarget<T>> QueueConsumerImpl<T> addConsumerInternal(final T target,
                                                   FilterManager filters,
                                                   final Class<? extends ServerMessage> messageClass,
                                                   final String consumerName,
@@ -895,7 +895,7 @@ public abstract class AbstractQueue<X ex
             optionSet.removeAll(EnumSet.of(ConsumerOption.SEES_REQUEUES, ConsumerOption.ACQUIRES));
         }
 
-        QueueConsumerImpl consumer = new QueueConsumerImpl(this,
+        QueueConsumerImpl<T> consumer = new QueueConsumerImpl<>(this,
                                                            target,
                                                            consumerName,
                                                            filters,
@@ -1001,18 +1001,18 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public Collection<QueueConsumer<?>> getConsumers()
+    public Collection<QueueConsumer<?,?>> getConsumers()
     {
         return getConsumersImpl();
     }
 
-    private Collection<QueueConsumer<?>> getConsumersImpl()
+    private Collection<QueueConsumer<?,?>> getConsumersImpl()
     {
         return Lists.newArrayList(_queueConsumerManager.getAllIterator());
     }
 
 
-    public void resetSubPointersForGroups(QueueConsumer<?> consumer)
+    public void resetSubPointersForGroups(QueueConsumer<?,?> consumer)
     {
         QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
         _messageGroupManager.clearAssignments(consumer);
@@ -1194,7 +1194,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
+    private boolean assign(final QueueConsumer<?,?> sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null)
         {
@@ -1238,7 +1238,7 @@ public abstract class AbstractQueue<X ex
         return _queueStatistics.getEnqueueCount();
     }
 
-    private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
+    private void setLastSeenEntry(final QueueConsumer<?,?> sub, final QueueEntry entry)
     {
         QueueContext subContext = sub.getQueueContext();
         if (subContext != null)
@@ -1253,7 +1253,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private void updateSubRequeueEntry(final QueueConsumer<?> sub, final QueueEntry entry)
+    private void updateSubRequeueEntry(final QueueConsumer<?,?> sub, final QueueEntry entry)
     {
         QueueContext subContext = sub.getQueueContext();
         if(subContext != null)
@@ -1286,11 +1286,11 @@ public abstract class AbstractQueue<X ex
 
     private void resetSubPointers(final QueueEntry entry, final boolean ignoreAvailable)
     {
-        Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+        Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
         while (consumerIterator.hasNext() && (ignoreAvailable || entry.isAvailable()))
         {
-            QueueConsumer<?> sub = consumerIterator.next();
+            QueueConsumer<?,?> sub = consumerIterator.next();
 
             // we don't make browsers send the same stuff twice
             if (sub.seesRequeues())
@@ -1454,7 +1454,7 @@ public abstract class AbstractQueue<X ex
         return _exclusiveSubscriber != null;
     }
 
-    private void setExclusiveSubscriber(QueueConsumer<?> exclusiveSubscriber)
+    private void setExclusiveSubscriber(QueueConsumer<?,?> exclusiveSubscriber)
     {
         _exclusiveSubscriber = exclusiveSubscriber;
     }
@@ -1664,11 +1664,11 @@ public abstract class AbstractQueue<X ex
                     try
                     {
 
-                        Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+                        Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
 
                         while (consumerIterator.hasNext())
                         {
-                            QueueConsumer<?> consumer = consumerIterator.next();
+                            QueueConsumer<?,?> consumer = consumerIterator.next();
 
                             if (consumer != null)
                             {
@@ -1750,7 +1750,7 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public void checkCapacity(AMQSessionModel<?> channel)
+    public void checkCapacity(AMQSessionModel<?,?> channel)
     {
         if(_queueFlowControlSizeBytes != 0L)
         {
@@ -1799,7 +1799,7 @@ public abstract class AbstractQueue<X ex
                                                                          _queueFlowResumeSizeBytes));
                     }
 
-                    for (final AMQSessionModel<?> blockedChannel : _blockedChannels)
+                    for (final AMQSessionModel<?,?> blockedChannel : _blockedChannels)
                     {
                         blockedChannel.unblock(this);
                         _blockedChannels.remove(blockedChannel);
@@ -1812,20 +1812,20 @@ public abstract class AbstractQueue<X ex
     void notifyConsumers(QueueEntry entry)
     {
 
-        Iterator<QueueConsumer<?>> nonAcquiringIterator = _queueConsumerManager.getNonAcquiringIterator();
+        Iterator<QueueConsumer<?,?>> nonAcquiringIterator = _queueConsumerManager.getNonAcquiringIterator();
         while (nonAcquiringIterator.hasNext())
         {
-            QueueConsumer<?> consumer = nonAcquiringIterator.next();
+            QueueConsumer<?,?> consumer = nonAcquiringIterator.next();
             if(consumer.hasInterest(entry))
             {
                 notifyConsumer(consumer);
             }
         }
 
-        final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
+        final Iterator<QueueConsumer<?,?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
         while (entry.isAvailable() && interestedIterator.hasNext())
         {
-            QueueConsumer<?> consumer = interestedIterator.next();
+            QueueConsumer<?,?> consumer = interestedIterator.next();
             if(consumer.hasInterest(entry))
             {
                 if(notifyConsumer(consumer))
@@ -1842,12 +1842,12 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
+    void notifyOtherConsumers(final QueueConsumer<?,?> excludedConsumer)
     {
-        final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
+        final Iterator<QueueConsumer<?,?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
         while (hasAvailableMessages() && interestedIterator.hasNext())
         {
-            QueueConsumer<?> consumer = interestedIterator.next();
+            QueueConsumer<?,?> consumer = interestedIterator.next();
 
             if (excludedConsumer != consumer)
             {
@@ -1860,7 +1860,7 @@ public abstract class AbstractQueue<X ex
     }
 
 
-    MessageContainer deliverSingleMessage(QueueConsumer<?> consumer)
+    MessageContainer deliverSingleMessage(QueueConsumer<?,?> consumer)
     {
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
@@ -1930,7 +1930,7 @@ public abstract class AbstractQueue<X ex
      * @param sub the consumer
      * @return true if we have completed all possible deliveries for this sub.
      */
-    private MessageContainer attemptDelivery(QueueConsumer<?> sub)
+    private MessageContainer attemptDelivery(QueueConsumer<?,?> sub)
     {
         MessageContainer messageContainer;
         // avoid referring old deleted queue entry in sub._queueContext._lastSeen
@@ -1992,13 +1992,13 @@ public abstract class AbstractQueue<X ex
         return messageContainer;
     }
 
-    private boolean noHigherPriorityWithCredit(final QueueConsumer<?> sub, final QueueEntry queueEntry)
+    private boolean noHigherPriorityWithCredit(final QueueConsumer<?,?> sub, final QueueEntry queueEntry)
     {
-        Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+        Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
 
         while (consumerIterator.hasNext())
         {
-            QueueConsumer<?> consumer = consumerIterator.next();
+            QueueConsumer<?,?> consumer = consumerIterator.next();
             if(consumer.getPriority() > sub.getPriority())
             {
                 if(consumer.isNotifyWorkDesired()
@@ -2018,7 +2018,7 @@ public abstract class AbstractQueue<X ex
     }
 
 
-    QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+    private QueueEntry getNextAvailableEntry(final QueueConsumer<?, ?> sub)
     {
         QueueContext context = sub.getQueueContext();
         if(context != null)
@@ -2060,7 +2060,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub)
+    public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?,?> sub)
     {
         QueueContext context = sub.getQueueContext();
         if(context != null)
@@ -2178,11 +2178,11 @@ public abstract class AbstractQueue<X ex
                 _activeSubscriberCount.decrementAndGet();
 
                 // iterate over interested and notify one as long as its priority is higher than any notified
-                final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+                final Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
                 final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
                 while (consumerIterator.hasNext())
                 {
-                    QueueConsumer<?> queueConsumer = consumerIterator.next();
+                    QueueConsumer<?,?> queueConsumer = consumerIterator.next();
                     if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
                     {
                         break;
@@ -2192,7 +2192,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private boolean notifyConsumer(final QueueConsumer<?> consumer)
+    private boolean notifyConsumer(final QueueConsumer<?,?> consumer)
     {
         if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
         {
@@ -2638,7 +2638,7 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public boolean verifySessionAccess(final AMQSessionModel<?> session)
+    public boolean verifySessionAccess(final AMQSessionModel<?,?> session)
     {
         boolean allowed;
         switch(_exclusive)
@@ -2708,7 +2708,7 @@ public abstract class AbstractQueue<X ex
         switch (getConsumerCount())
         {
             case 1:
-                Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+                Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
 
                 if (consumerIterator.hasNext())
                 {
@@ -2734,10 +2734,10 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case CONNECTION:
                 AMQSessionModel session = null;
-                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                Iterator<QueueConsumer<?,?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
                 while(queueConsumerIterator.hasNext())
                 {
-                    QueueConsumer<?> c = queueConsumerIterator.next();
+                    QueueConsumer<?,?> c = queueConsumerIterator.next();
 
                     if(session == null)
                     {
@@ -2763,10 +2763,10 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case PRINCIPAL:
                 AMQPConnection con = null;
-                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                Iterator<QueueConsumer<?,?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
                 while(queueConsumerIterator.hasNext())
                 {
-                    QueueConsumer<?> c = queueConsumerIterator.next();
+                    QueueConsumer<?,?> c = queueConsumerIterator.next();
                     if(con == null)
                     {
                         con = c.getSessionModel().getAMQPConnection();
@@ -2793,10 +2793,10 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case PRINCIPAL:
                 String containerID = null;
-                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                Iterator<QueueConsumer<?,?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
                 while(queueConsumerIterator.hasNext())
                 {
-                    QueueConsumer<?> c = queueConsumerIterator.next();
+                    QueueConsumer<?,?> c = queueConsumerIterator.next();
                     if(containerID == null)
                     {
                         containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
@@ -2826,10 +2826,10 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case CONTAINER:
                 Principal principal = null;
-                Iterator<QueueConsumer<?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
+                Iterator<QueueConsumer<?,?>> queueConsumerIterator = _queueConsumerManager.getAllIterator();
                 while(queueConsumerIterator.hasNext())
                 {
-                    QueueConsumer<?> c = queueConsumerIterator.next();
+                    QueueConsumer<?,?> c = queueConsumerIterator.next();
                     if(principal == null)
                     {
                         principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
@@ -3437,11 +3437,11 @@ public abstract class AbstractQueue<X ex
             // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
             // which would give us memory "leak".
 
-            Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getAllIterator();
+            Iterator<QueueConsumer<?,?>> consumerIterator = _queueConsumerManager.getAllIterator();
 
             while (consumerIterator.hasNext() && !isDeleted())
             {
-                QueueConsumer<?> sub = consumerIterator.next();
+                QueueConsumer<?,?> sub = consumerIterator.next();
                 if(sub.acquires())
                 {
                     getNextAvailableEntry(sub);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java Sun Dec  4 12:24:57 2016
@@ -34,10 +34,10 @@ public class AssignedConsumerMessageGrou
 
 
     private final String _groupId;
-    private final ConcurrentMap<Integer, QueueConsumer<?>> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer<?>>();
+    private final ConcurrentMap<Integer, QueueConsumer<?,?>> _groupMap = new ConcurrentHashMap<>();
     private final int _groupMask;
 
-    public AssignedConsumerMessageGroupManager(final String groupId, final int maxGroups)
+    AssignedConsumerMessageGroupManager(final String groupId, final int maxGroups)
     {
         _groupId = groupId;
         _groupMask = pow2(maxGroups)-1;
@@ -63,17 +63,17 @@ public class AssignedConsumerMessageGrou
         }
         else
         {
-            QueueConsumer assignedSub = _groupMap.get(groupVal.hashCode() & _groupMask);
+            QueueConsumer<?,?> assignedSub = _groupMap.get(groupVal.hashCode() & _groupMask);
             return assignedSub == null || assignedSub == sub;
         }
     }
 
-    public boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry)
+    public boolean acceptMessage(QueueConsumer<?,?> sub, QueueEntry entry)
     {
         return assignMessage(sub, entry) && entry.acquire(sub);
     }
 
-    private boolean assignMessage(QueueConsumer<?> sub, QueueEntry entry)
+    private boolean assignMessage(QueueConsumer<?,?> sub, QueueEntry entry)
     {
         Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
         if(groupVal == null)
@@ -83,7 +83,7 @@ public class AssignedConsumerMessageGrou
         else
         {
             Integer group = groupVal.hashCode() & _groupMask;
-            QueueConsumer assignedSub = _groupMap.get(group);
+            QueueConsumer<?,?> assignedSub = _groupMap.get(group);
             if(assignedSub == sub)
             {
                 return true;
@@ -104,7 +104,7 @@ public class AssignedConsumerMessageGrou
         }
     }
     
-    public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?> sub)
+    public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?,?> sub)
     {
         EntryFinder visitor = new EntryFinder(sub);
         sub.getQueue().visit(visitor);
@@ -114,9 +114,9 @@ public class AssignedConsumerMessageGrou
     private class EntryFinder implements QueueEntryVisitor
     {
         private QueueEntry _entry;
-        private QueueConsumer<?> _sub;
+        private QueueConsumer<?,?> _sub;
 
-        public EntryFinder(final QueueConsumer<?> sub)
+        EntryFinder(final QueueConsumer<?, ?> sub)
         {
             _sub = sub;
         }
@@ -135,7 +135,7 @@ public class AssignedConsumerMessageGrou
             }
 
             Integer group = groupId.hashCode() & _groupMask;
-            QueueConsumer<?> assignedSub = _groupMap.get(group);
+            QueueConsumer<?,?> assignedSub = _groupMap.get(group);
             if(assignedSub == _sub)
             {
                 _entry = entry;
@@ -153,9 +153,9 @@ public class AssignedConsumerMessageGrou
         }
     }
 
-    public void clearAssignments(QueueConsumer<?> sub)
+    public void clearAssignments(QueueConsumer<?,?> sub)
     {
-        Iterator<QueueConsumer<?>> subIter = _groupMap.values().iterator();
+        Iterator<QueueConsumer<?,?>> subIter = _groupMap.values().iterator();
         while(subIter.hasNext())
         {
             if(subIter.next() == sub)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Sun Dec  4 12:24:57 2016
@@ -24,8 +24,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.util.StateChangeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
@@ -37,21 +35,19 @@ import java.util.TreeSet;
 
 public class DefinedGroupMessageGroupManager implements MessageGroupManager
 {
-    private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
-
     private final String _groupId;
     private final String _defaultGroup;
-    private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
+    private final Map<Object, Group> _groupMap = new HashMap<>();
     private final ConsumerResetHelper _resetHelper;
 
     private final class Group
     {
         private final Object _group;
         private final SortedSet<QueueEntry> _skippedEntries = new TreeSet<>();
-        private QueueConsumer<?> _consumer;
+        private QueueConsumer<?,?> _consumer;
         private int _activeCount;
 
-        private Group(final Object key, final QueueConsumer<?> consumer)
+        private Group(final Object key, final QueueConsumer<?,?> consumer)
         {
             _group = key;
             _consumer = consumer;
@@ -70,7 +66,7 @@ public class DefinedGroupMessageGroupMan
             }
         }
         
-        public void subtract(final QueueEntry entry, final boolean released)
+        void subtract(final QueueEntry entry, final boolean released)
         {
             if(!released)
             {
@@ -116,7 +112,7 @@ public class DefinedGroupMessageGroupMan
             return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed()));
         }
 
-        public QueueConsumer<?> getConsumer()
+        public QueueConsumer<?,?> getConsumer()
         {
             return _consumer;
         }
@@ -131,13 +127,13 @@ public class DefinedGroupMessageGroupMan
                     '}';
         }
 
-        public void addSkippedEntry(final QueueEntry entry)
+        void addSkippedEntry(final QueueEntry entry)
         {
             _skippedEntries.add(entry);
         }
     }
 
-    public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
+    DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
     {
         _groupId = groupId;
         _defaultGroup = defaultGroup;
@@ -157,12 +153,12 @@ public class DefinedGroupMessageGroupMan
         return possibleAssignment;
     }
 
-    public synchronized boolean acceptMessage(final QueueConsumer<?> sub, final QueueEntry entry)
+    public synchronized boolean acceptMessage(final QueueConsumer<?,?> sub, final QueueEntry entry)
     {
         return assignMessage(sub, entry) && entry.acquire(sub);
     }
 
-    private boolean assignMessage(final QueueConsumer<?> sub, final QueueEntry entry)
+    private boolean assignMessage(final QueueConsumer<?,?> sub, final QueueEntry entry)
     {
         Object groupId = getKey(entry);
         Group group = _groupMap.get(groupId);
@@ -182,7 +178,7 @@ public class DefinedGroupMessageGroupMan
             }
         }
 
-        QueueConsumer<?> assignedSub = group.getConsumer();
+        QueueConsumer<?,?> assignedSub = group.getConsumer();
 
         if(assignedSub == sub)
         {
@@ -196,7 +192,7 @@ public class DefinedGroupMessageGroupMan
         }
     }
 
-    public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer<?> sub)
+    public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer<?,?> sub)
     {
         EntryFinder visitor = new EntryFinder(sub);
         sub.getQueue().visit(visitor);
@@ -206,9 +202,9 @@ public class DefinedGroupMessageGroupMan
     private class EntryFinder implements QueueEntryVisitor
     {
         private QueueEntry _entry;
-        private QueueConsumer _sub;
+        private QueueConsumer<?,?> _sub;
 
-        public EntryFinder(final QueueConsumer<?> sub)
+        EntryFinder(final QueueConsumer<?, ?> sub)
         {
             _sub = sub;
         }
@@ -241,7 +237,7 @@ public class DefinedGroupMessageGroupMan
     }
 
     
-    public void clearAssignments(final QueueConsumer<?> sub)
+    public void clearAssignments(final QueueConsumer<?,?> sub)
     {
     }
     
@@ -261,7 +257,7 @@ public class DefinedGroupMessageGroupMan
     {
         private final Group _group;
 
-        public GroupStateChangeListener(final Group group)
+        GroupStateChangeListener(final Group group)
         {
             _group = group;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java Sun Dec  4 12:24:57 2016
@@ -26,14 +26,14 @@ public interface MessageGroupManager
     {
         void resetSubPointersForGroups(QueueEntry entry);
 
-        boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub);
+        boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?,?> sub);
     }
 
     boolean mightAssign(QueueEntry entry, final QueueConsumer sub);
 
-    boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry);
+    boolean acceptMessage(QueueConsumer<?,?> sub, QueueEntry entry);
 
-    QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?> sub);
+    QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?,?> sub);
 
-    void clearAssignments(QueueConsumer<?> sub);
+    void clearAssignments(QueueConsumer<?,?> sub);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Sun Dec  4 12:24:57 2016
@@ -28,7 +28,7 @@ import org.apache.qpid.server.virtualhos
 public abstract class OutOfOrderQueue<X extends OutOfOrderQueue<X>> extends AbstractQueue<X>
 {
 
-    protected OutOfOrderQueue(Map<String, Object> attributes, QueueManagingVirtualHost<?> virtualHost)
+    OutOfOrderQueue(Map<String, Object> attributes, QueueManagingVirtualHost<?> virtualHost)
     {
         super(attributes, virtualHost);
     }
@@ -37,11 +37,11 @@ public abstract class OutOfOrderQueue<X
     protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
     {
         // check that all consumers are not in advance of the entry
-        Iterator<QueueConsumer<?>> consumerIterator = getQueueConsumerManager().getAllIterator();
+        Iterator<QueueConsumer<?,?>> consumerIterator = getQueueConsumerManager().getAllIterator();
 
         while (consumerIterator.hasNext() && !entry.isAcquired())
         {
-            QueueConsumer<?> consumer = consumerIterator.next();
+            QueueConsumer<?,?> consumer = consumerIterator.next();
 
             if(!consumer.isClosed())
             {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Sun Dec  4 12:24:57 2016
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
 
-public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer<X>
+public interface QueueConsumer<X extends QueueConsumer<X,T>, T extends ConsumerTarget> extends Consumer<X, T>
 {
     void flushBatched();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sun Dec  4 12:24:57 2016
@@ -61,15 +61,15 @@ import org.apache.qpid.server.protocol.M
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.util.StateChangeListener;
 
-class QueueConsumerImpl
-    extends AbstractConfiguredObject<QueueConsumerImpl>
-        implements QueueConsumer<QueueConsumerImpl>, LogSubject
+class QueueConsumerImpl<T extends ConsumerTarget>
+    extends AbstractConfiguredObject<QueueConsumerImpl<T>>
+        implements QueueConsumer<QueueConsumerImpl<T>,T>, LogSubject
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _consumerNumber;
     private final long _createTime = System.currentTimeMillis();
-    private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl>
+    private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl<T>>
             _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
     private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
     private final boolean _acquires;
@@ -82,7 +82,7 @@ class QueueConsumerImpl
     private final Object _sessionReference;
     private final AbstractQueue _queue;
 
-    private final ConsumerTarget _target;
+    private final T _target;
     private volatile QueueContext _queueContext;
 
 
@@ -104,7 +104,7 @@ class QueueConsumerImpl
     private QueueConsumerNode _queueConsumerNode;
 
     QueueConsumerImpl(final AbstractQueue<?> queue,
-                      ConsumerTarget target,
+                      T target,
                       final String consumerName,
                       final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
@@ -173,7 +173,7 @@ class QueueConsumerImpl
     }
 
     @Override
-    public ConsumerTarget getTarget()
+    public T getTarget()
     {
         return _target;
     }
@@ -470,7 +470,7 @@ class QueueConsumerImpl
         return _createTime;
     }
 
-    public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl> getOwningState()
+    public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl<T>> getOwningState()
     {
         return _owningState;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Sun Dec  4 12:24:57 2016
@@ -24,14 +24,14 @@ import java.util.Iterator;
 
 public interface QueueConsumerManager
 {
-    void addConsumer(QueueConsumer<?> consumer);
-    boolean removeConsumer(QueueConsumer<?> consumer);
-    boolean setInterest(QueueConsumer<?> consumer, boolean interested);
-    boolean setNotified(QueueConsumer<?> consumer, boolean notified);
+    void addConsumer(QueueConsumer<?,?> consumer);
+    boolean removeConsumer(QueueConsumer<?,?> consumer);
+    boolean setInterest(QueueConsumer<?,?> consumer, boolean interested);
+    boolean setNotified(QueueConsumer<?,?> consumer, boolean notified);
 
-    Iterator<QueueConsumer<?>> getInterestedIterator();
-    Iterator<QueueConsumer<?>> getAllIterator();
-    Iterator<QueueConsumer<?>> getNonAcquiringIterator();
+    Iterator<QueueConsumer<?,?>> getInterestedIterator();
+    Iterator<QueueConsumer<?,?>> getAllIterator();
+    Iterator<QueueConsumer<?,?>> getNonAcquiringIterator();
 
     int getAllSize();
     int getHighestNotifiedPriority();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Sun Dec  4 12:24:57 2016
@@ -71,7 +71,7 @@ public class QueueConsumerManagerImpl im
 
     // Always in the config thread
     @Override
-    public void addConsumer(final QueueConsumer<?> consumer)
+    public void addConsumer(final QueueConsumer<?,?> consumer)
     {
         QueueConsumerNode node = new QueueConsumerNode(this, consumer);
         addToAll(node);
@@ -97,7 +97,7 @@ public class QueueConsumerManagerImpl im
 
     // Always in the config thread
     @Override
-    public boolean removeConsumer(final QueueConsumer<?> consumer)
+    public boolean removeConsumer(final QueueConsumer<?,?> consumer)
     {
         removeFromAll(consumer);
         QueueConsumerNode node = consumer.getQueueConsumerNode();
@@ -112,7 +112,7 @@ public class QueueConsumerManagerImpl im
 
     // Set by the consumer always in the IO thread
     @Override
-    public boolean setInterest(final QueueConsumer consumer, final boolean interested)
+    public boolean setInterest(final QueueConsumer<?,?> consumer, final boolean interested)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
         if (interested)
@@ -141,7 +141,7 @@ public class QueueConsumerManagerImpl im
 
     // Set by the Queue any IO thread
     @Override
-    public boolean setNotified(final QueueConsumer consumer, final boolean notified)
+    public boolean setNotified(final QueueConsumer<?,?> consumer, final boolean notified)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
         if (consumer.acquires())
@@ -162,19 +162,19 @@ public class QueueConsumerManagerImpl im
     }
 
     @Override
-    public Iterator<QueueConsumer<?>> getInterestedIterator()
+    public Iterator<QueueConsumer<?,?>> getInterestedIterator()
     {
         return new QueueConsumerIterator(new PrioritisedQueueConsumerNodeIterator(_interested));
     }
 
     @Override
-    public Iterator<QueueConsumer<?>> getAllIterator()
+    public Iterator<QueueConsumer<?,?>> getAllIterator()
     {
         return new QueueConsumerIterator(new PrioritisedQueueConsumerNodeIterator(_allConsumers));
     }
 
     @Override
-    public Iterator<QueueConsumer<?>> getNonAcquiringIterator()
+    public Iterator<QueueConsumer<?,?>> getNonAcquiringIterator()
     {
         return new QueueConsumerIterator(_nonAcquiring.iterator());
     }
@@ -241,7 +241,7 @@ public class QueueConsumerManagerImpl im
         return newListEntry;
     }
 
-    private static class QueueConsumerIterator implements Iterator<QueueConsumer<?>>
+    private static class QueueConsumerIterator implements Iterator<QueueConsumer<?,?>>
     {
         private final Iterator<QueueConsumerNode> _underlying;
 
@@ -257,7 +257,7 @@ public class QueueConsumerManagerImpl im
         }
 
         @Override
-        public QueueConsumer<?> next()
+        public QueueConsumer<?,?> next()
         {
             return _underlying.next().getQueueConsumer();
         }
@@ -296,7 +296,7 @@ public class QueueConsumerManagerImpl im
         _interested.add(i, new PriorityConsumerListPair(consumerPriority));
     }
 
-    private void removeFromAll(final QueueConsumer<?> consumer)
+    private void removeFromAll(final QueueConsumer<?,?> consumer)
     {
         final QueueConsumerNode node = consumer.getQueueConsumerNode();
         int consumerPriority = consumer.getPriority();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Sun Dec  4 12:24:57 2016
@@ -25,18 +25,18 @@ import java.util.Collection;
 final class QueueConsumerNode
 {
     private final QueueConsumerManagerImpl _queueConsumerManager;
-    private final QueueConsumer<?> _queueConsumer;
+    private final QueueConsumer<?,?> _queueConsumer;
     private QueueConsumerNodeListEntry _listEntry;
     private QueueConsumerManagerImpl.NodeState _state = QueueConsumerManagerImpl.NodeState.REMOVED;
     private QueueConsumerNodeListEntry _allEntry;
 
-    QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, final QueueConsumer<?> queueConsumer)
+    QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, final QueueConsumer<?,?> queueConsumer)
     {
         _queueConsumerManager = queueConsumerManager;
         _queueConsumer = queueConsumer;
     }
 
-    public QueueConsumer<?> getQueueConsumer()
+    public QueueConsumer<?,?> getQueueConsumer()
     {
         return _queueConsumer;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Dec  4 12:24:57 2016
@@ -81,6 +81,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
+    @SuppressWarnings("unused")
     private volatile StateChangeListenerEntry<? super QueueEntry, EntryState> _stateChangeListeners;
 
     private static final
@@ -97,12 +98,13 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, "_entryId");
 
 
+    @SuppressWarnings("unused")
     private volatile long _entryId;
 
-    private static int REDELIVERED_FLAG = 1;
-    private static int PERSISTENT_FLAG = 2;
-    private static int MANDATORY_FLAG = 4;
-    private static int IMMEDIATE_FLAG = 8;
+    private static final int REDELIVERED_FLAG = 1;
+    private static final int PERSISTENT_FLAG = 2;
+    private static final int MANDATORY_FLAG = 4;
+    private static final int IMMEDIATE_FLAG = 8;
     private int _flags;
     private long _expiration;
 
@@ -114,17 +116,17 @@ public abstract class QueueEntryImpl imp
     private final MessageEnqueueRecord _enqueueRecord;
 
 
-    public QueueEntryImpl(QueueEntryList queueEntryList)
+    QueueEntryImpl(QueueEntryList queueEntryList)
     {
         this(queueEntryList, null, Long.MIN_VALUE, null);
         _state = DELETED_STATE;
     }
 
 
-    public QueueEntryImpl(QueueEntryList queueEntryList,
-                          ServerMessage message,
-                          final long entryId,
-                          final MessageEnqueueRecord enqueueRecord)
+    QueueEntryImpl(QueueEntryList queueEntryList,
+                   ServerMessage message,
+                   final long entryId,
+                   final MessageEnqueueRecord enqueueRecord)
     {
         _queueEntryList = queueEntryList;
 
@@ -135,9 +137,9 @@ public abstract class QueueEntryImpl imp
         _enqueueRecord = enqueueRecord;
     }
 
-    public QueueEntryImpl(QueueEntryList queueEntryList,
-                          ServerMessage message,
-                          final MessageEnqueueRecord enqueueRecord)
+    QueueEntryImpl(QueueEntryList queueEntryList,
+                   ServerMessage message,
+                   final MessageEnqueueRecord enqueueRecord)
     {
         _queueEntryList = queueEntryList;
         _message = message == null ? null :  message.newReference(queueEntryList.getQueue());
@@ -167,12 +169,12 @@ public abstract class QueueEntryImpl imp
         return new EntryInstanceProperties();
     }
 
-    protected void setEntryId(long entryId)
+    void setEntryId(long entryId)
     {
         _entryIdUpdater.set(this, entryId);
     }
 
-    protected long getEntryId()
+    long getEntryId()
     {
         return _entryId;
     }
@@ -263,7 +265,7 @@ public abstract class QueueEntryImpl imp
         boolean acquired = acquire();
         if(!acquired)
         {
-            QueueConsumer consumer = getAcquiringConsumer();
+            QueueConsumer<?,?> consumer = getAcquiringConsumer();
             acquired = removeAcquisitionFromConsumer(consumer);
             if(acquired)
             {
@@ -306,7 +308,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquire(MessageInstanceConsumer sub)
     {
-        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
+        final boolean acquired = acquire(((QueueConsumer<?,?>) sub).getOwningState().getUnstealableState());
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -356,13 +358,13 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public QueueConsumer<?> getAcquiringConsumer()
+    public QueueConsumer<?,?> getAcquiringConsumer()
     {
-        QueueConsumer<?> consumer;
+        QueueConsumer<?,?> consumer;
         EntryState state = _state;
         if (state instanceof ConsumerAcquiredState)
         {
-            consumer = ((ConsumerAcquiredState<QueueConsumer<?>>)state).getConsumer();
+            consumer = ((ConsumerAcquiredState<QueueConsumer<?,?>>)state).getConsumer();
         }
         else
         {
@@ -471,7 +473,7 @@ public abstract class QueueEntryImpl imp
 
     public void reject()
     {
-        QueueConsumer consumer = getAcquiringConsumer();
+        QueueConsumer<?,?> consumer = getAcquiringConsumer();
 
         if (consumer != null)
         {
@@ -491,15 +493,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public boolean isRejectedBy(MessageInstanceConsumer consumer)
     {
-
-        if (_rejectedBy != null) // We have consumers that rejected this message
-        {
-            return _rejectedBy.contains(consumer.getIdentifier());
-        }
-        else // This message hasn't been rejected yet.
-        {
-            return false;
-        }
+        return _rejectedBy != null && _rejectedBy.contains(consumer.getIdentifier());
     }
 
     private boolean dequeue()
@@ -633,11 +627,7 @@ public abstract class QueueEntryImpl imp
     public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         StateChangeListenerEntry entry = _listenersUpdater.get(this);
-        if(entry != null)
-        {
-            return entry.remove(listener);
-        }
-        return false;
+        return entry != null && entry.remove(listener);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Sun Dec  4 12:24:57 2016
@@ -91,7 +91,7 @@ public class TrustStoreMessageSource ext
     }
 
     @Override
-    public Consumer addConsumer(final ConsumerTarget target,
+    public <T extends ConsumerTarget<T>> Consumer<T> addConsumer(final T target,
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
@@ -99,7 +99,7 @@ public class TrustStoreMessageSource ext
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted
     {
-        final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
+        final Consumer<T> consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());
         target.queueEmpty();
         return consumer;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Sun Dec  4 12:24:57 2016
@@ -57,7 +57,7 @@ public interface AMQPConnection<C extend
 
     void registerMessageDelivered(long size);
 
-    void closeSessionAsync(AMQSessionModel<?> session, CloseReason reason, String message);
+    void closeSessionAsync(AMQSessionModel<?,?> session, CloseReason reason, String message);
 
     SocketAddress getRemoteSocketAddress();
 
@@ -91,11 +91,11 @@ public interface AMQPConnection<C extend
      *
      * @return list of sessions
      */
-    Collection<? extends AMQSessionModel<?>> getSessionModels();
+    Collection<? extends AMQSessionModel<?,?>> getSessionModels();
 
     void resetStatistics();
 
-    void notifyWork(AMQSessionModel<?> sessionModel);
+    void notifyWork(AMQSessionModel<?,?> sessionModel);
 
     boolean isTransportBlockedForWriting();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Sun Dec  4 12:24:57 2016
@@ -550,7 +550,7 @@ public abstract class AbstractAMQPConnec
         return _subject;
     }
 
-    public void sessionAdded(final AMQSessionModel<?> session)
+    public void sessionAdded(final AMQSessionModel<?,?> session)
     {
         SessionAdapter adapter = new SessionAdapter(this, session);
         adapter.create();
@@ -558,7 +558,7 @@ public abstract class AbstractAMQPConnec
 
     }
 
-    public void sessionRemoved(final AMQSessionModel<?> session)
+    public void sessionRemoved(final AMQSessionModel<?,?> session)
     {
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Sun Dec  4 12:24:57 2016
@@ -54,7 +54,7 @@ public abstract class AbstractSystemMess
     protected final UUID _id;
     protected final String _name;
     protected final NamedAddressSpace _addressSpace;
-    private List<Consumer> _consumers = new CopyOnWriteArrayList<>();
+    private List<Consumer<?>> _consumers = new CopyOnWriteArrayList<>();
 
     public AbstractSystemMessageSource(String name, final NamedAddressSpace addressSpace)
     {
@@ -83,7 +83,7 @@ public abstract class AbstractSystemMess
     }
 
     @Override
-    public Consumer addConsumer(final ConsumerTarget target,
+    public <T extends ConsumerTarget<T>> Consumer<T> addConsumer(final T target,
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
@@ -98,28 +98,28 @@ public abstract class AbstractSystemMess
     }
 
     @Override
-    public Collection<Consumer> getConsumers()
+    public Collection<Consumer<?>> getConsumers()
     {
         return new ArrayList<>(_consumers);
     }
 
     @Override
-    public boolean verifySessionAccess(final AMQSessionModel<?> session)
+    public boolean verifySessionAccess(final AMQSessionModel<?,?> session)
     {
         return true;
     }
 
-    protected class Consumer implements MessageInstanceConsumer
+    protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>
     {
 
         private final List<PropertiesMessageInstance> _queue =
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
-        private final ConsumerTarget _target;
+        private final T _target;
         private final String _name;
         private final Object _identifier = new Object();
 
 
-        public Consumer(final String consumerName, ConsumerTarget target)
+        public Consumer(final String consumerName, T target)
         {
             _name = consumerName;
             _target = target;
@@ -141,7 +141,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public ConsumerTarget getTarget()
+        public T getTarget()
         {
             return _target;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Sun Dec  4 12:24:57 2016
@@ -47,7 +47,7 @@ public class VirtualHostPropertiesNode e
     }
 
     @Override
-    public Consumer addConsumer(final ConsumerTarget target,
+    public <T extends ConsumerTarget<T>> Consumer<T> addConsumer(final T target,
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
@@ -55,7 +55,7 @@ public class VirtualHostPropertiesNode e
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted
     {
-        final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
+        final Consumer<T> consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());
         target.queueEmpty();
         return consumer;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Sun Dec  4 12:24:57 2016
@@ -50,7 +50,7 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.network.Ticker;
 
-public class TestConsumerTarget implements ConsumerTarget
+public class TestConsumerTarget implements ConsumerTarget<TestConsumerTarget>
 {
 
     private boolean _closed = false;
@@ -240,7 +240,7 @@ public class TestConsumerTarget implemen
         }
     }
 
-    private static class MockSessionModel implements AMQSessionModel<MockSessionModel>
+    private static class MockSessionModel implements AMQSessionModel<MockSessionModel, TestConsumerTarget>
     {
         private final UUID _id = UUID.randomUUID();
         private Session _modelObject;
@@ -355,7 +355,7 @@ public class TestConsumerTarget implemen
         }
 
         @Override
-        public Collection<Consumer<?>> getConsumers()
+        public Collection<Consumer<?,TestConsumerTarget>> getConsumers()
         {
             return null;
         }
@@ -434,7 +434,7 @@ public class TestConsumerTarget implemen
         }
 
         @Override
-        public void notifyWork(final ConsumerTarget target)
+        public void notifyWork(final TestConsumerTarget target)
         {
             _connection.notifyWork(this);
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java?rev=1772527&r1=1772526&r2=1772527&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/BrokerTestHelper.java Sun Dec  4 12:24:57 2016
@@ -226,7 +226,7 @@ public class BrokerTestHelper
         return createVirtualHost(attributes, broker, defaultVHN, accessControl);
     }
 
-    public static AMQSessionModel<?> createSession(int channelId, AMQPConnection<?> connection)
+    public static AMQSessionModel<?,?> createSession(int channelId, AMQPConnection<?> connection)
     {
         @SuppressWarnings("rawtypes")
         AMQSessionModel session = mock(AMQSessionModel.class);
@@ -235,13 +235,13 @@ public class BrokerTestHelper
         return session;
     }
 
-    public static AMQSessionModel<?> createSession(int channelId) throws Exception
+    public static AMQSessionModel<?,?> createSession(int channelId) throws Exception
     {
         AMQPConnection<?> session = createConnection();
         return createSession(channelId, session);
     }
 
-    public static AMQSessionModel<?> createSession() throws Exception
+    public static AMQSessionModel<?,?> createSession() throws Exception
     {
         return createSession(1);
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message