qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1765973 [6/7] - in /qpid/java/branches/transfer-queue: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/stor...
Date Fri, 21 Oct 2016 09:32:09 GMT
Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Oct 21 09:32:07 2016
@@ -33,13 +33,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
@@ -88,7 +89,7 @@ public class ConsumerTarget_0_10 extends
 
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
-    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+    private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
 
     private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
@@ -148,7 +149,7 @@ public class ConsumerTarget_0_10 extends
     protected void afterCloseInternal()
     {
 
-        for (ConsumerImpl consumer : _consumers)
+        for (MessageInstanceConsumer consumer : _consumers)
         {
             consumer.close();
         }
@@ -219,7 +220,7 @@ public class ConsumerTarget_0_10 extends
 
     private final AddMessageDispositionListenerAction _postIdSettingAction;
 
-    public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
     {
         ServerMessage serverMsg = entry.getMessage();
 
@@ -441,7 +442,7 @@ public class ConsumerTarget_0_10 extends
                            });
    }
 
-    void reject(final ConsumerImpl consumer, final MessageInstance entry)
+    void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
         entry.setRedelivered();
         if (entry.makeAcquisitionUnstealable(consumer))
@@ -450,7 +451,7 @@ public class ConsumerTarget_0_10 extends
         }
     }
 
-    void release(final ConsumerImpl consumer,
+    void release(final MessageInstanceConsumer consumer,
                  final MessageInstance entry,
                  final boolean setRedelivered)
     {
@@ -474,17 +475,17 @@ public class ConsumerTarget_0_10 extends
         }
     }
 
-    protected void sendToDLQOrDiscard(final ConsumerImpl consumer, MessageInstance entry)
+    protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
     {
         final ServerMessage msg = entry.getMessage();
 
         int requeues = 0;
         if (entry.makeAcquisitionUnstealable(consumer))
         {
-            requeues = entry.routeToAlternate(new Action<MessageInstance>()
+            requeues = entry.routeToAlternate(new Action<BaseMessageInstance>()
             {
                 @Override
-                public void performAction(final MessageInstance requeueEntry)
+                public void performAction(final BaseMessageInstance requeueEntry)
                 {
                     getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
                                                                            requeueEntry.getOwningResource()
@@ -620,7 +621,7 @@ public class ConsumerTarget_0_10 extends
     public void flush()
     {
         flushCreditState(true);
-        for(ConsumerImpl consumer : _consumers)
+        for(MessageInstanceConsumer consumer : _consumers)
         {
             consumer.flush();
         }
@@ -652,15 +653,15 @@ public class ConsumerTarget_0_10 extends
     }
 
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
+    public void consumerAdded(final MessageInstanceConsumer consumer)
     {
-        _consumers.add(sub);
+        _consumers.add(consumer);
     }
 
     @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    public void consumerRemoved(final MessageInstanceConsumer consumer)
     {
-        _consumers.remove(sub);
+        _consumers.remove(consumer);
         if(_consumers.isEmpty())
         {
             close();

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Fri Oct 21 09:32:07 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 
 class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -34,11 +34,11 @@ class ExplicitAcceptDispositionChangeLis
 
     private final MessageInstance _entry;
     private final ConsumerTarget_0_10 _target;
-    private final ConsumerImpl _consumer;
+    private final MessageInstanceConsumer _consumer;
 
     public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
                                                    ConsumerTarget_0_10 target,
-                                                   final ConsumerImpl consumer)
+                                                   final MessageInstanceConsumer consumer)
     {
         _entry = entry;
         _target = target;

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Fri Oct 21 09:32:07 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
 {
@@ -33,11 +33,11 @@ class ImplicitAcceptDispositionChangeLis
 
     private final MessageInstance _entry;
     private final ConsumerTarget_0_10 _target;
-    private final ConsumerImpl _consumer;
+    private final MessageInstanceConsumer _consumer;
 
     public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
                                                    ConsumerTarget_0_10 target,
-                                                   final ConsumerImpl consumer)
+                                                   final MessageInstanceConsumer consumer)
     {
         _entry = entry;
         _target = target;

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Fri Oct 21 09:32:07 2016
@@ -21,8 +21,8 @@
 
 package org.apache.qpid.server.protocol.v0_10;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.transport.Method;
 
 public class MessageAcceptCompletionListener implements Method.CompletionListener
@@ -30,12 +30,12 @@ public class MessageAcceptCompletionList
     private final ConsumerTarget_0_10 _sub;
     private final MessageInstance _entry;
     private final ServerSession _session;
-    private final ConsumerImpl _consumer;
+    private final MessageInstanceConsumer _consumer;
     private long _messageSize;
     private boolean _restoreCredit;
 
     public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
-                                           final ConsumerImpl consumer,
+                                           final MessageInstanceConsumer consumer,
                                            ServerSession session,
                                            MessageInstance entry,
                                            boolean restoreCredit)

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Oct 21 09:32:07 2016
@@ -56,15 +56,17 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -529,7 +531,7 @@ public class ServerSession extends Sessi
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
     }
 
-    public void acknowledge(final ConsumerImpl consumer,
+    public void acknowledge(final MessageInstanceConsumer consumer,
                             final ConsumerTarget_0_10 target,
                             final MessageInstance entry)
     {
@@ -566,14 +568,14 @@ public class ServerSession extends Sessi
     }
 
 
-    public void register(final ConsumerImpl consumerImpl)
+    public void register(final MessageInstanceConsumer consumer)
     {
-        if(consumerImpl instanceof Consumer<?>)
+        if(consumer instanceof Consumer<?>)
         {
-            final Consumer<?> consumer = (Consumer<?>) consumerImpl;
-            _consumers.add(consumer);
-            consumer.addChangeListener(_consumerClosedListener);
-            consumerAdded(consumer);
+            final Consumer<?> modelConsumer = (Consumer<?>) consumer;
+            _consumers.add(modelConsumer);
+            modelConsumer.addChangeListener(_consumerClosedListener);
+            consumerAdded(modelConsumer);
         }
     }
 
@@ -1285,10 +1287,10 @@ public class ServerSession extends Sessi
         return getId().compareTo(o.getId());
     }
 
-    private class CheckCapacityAction implements Action<MessageInstance>
+    private class CheckCapacityAction implements Action<BaseMessageInstance>
     {
         @Override
-        public void performAction(final MessageInstance entry)
+        public void performAction(final BaseMessageInstance entry)
         {
             TransactionLogResource queue = entry.getOwningResource();
             if(queue instanceof CapacityChecker)
@@ -1298,7 +1300,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    private class ConsumerClosedListener implements ConfigurationChangeListener
+    private class ConsumerClosedListener extends AbstractConfigurationChangeListener
     {
         @Override
         public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
@@ -1309,37 +1311,5 @@ public class ServerSession extends Sessi
             }
         }
 
-        @Override
-        public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void attributeSet(final ConfiguredObject object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-
-        }
     }
 }

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Oct 21 09:32:07 2016
@@ -42,12 +42,12 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -342,18 +342,18 @@ public class ServerSessionDelegate exten
                     ((ServerSession)session).register(destination, target);
                     try
                     {
-                        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+                        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
                         if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
                         {
-                            options.add(ConsumerImpl.Option.ACQUIRES);
+                            options.add(ConsumerOption.ACQUIRES);
                         }
                         if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                         {
-                            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+                            options.add(ConsumerOption.SEES_REQUEUES);
                         }
                         if(method.getExclusive())
                         {
-                            options.add(ConsumerImpl.Option.EXCLUSIVE);
+                            options.add(ConsumerOption.EXCLUSIVE);
                         }
                         for(MessageSource source : sources)
                         {

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Oct 21 09:32:07 2016
@@ -57,7 +57,6 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
@@ -73,12 +72,16 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -297,7 +300,7 @@ public class AMQChannel
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final ConsumerImpl sub,
+            public void recordMessageDelivery(final MessageInstanceConsumer sub,
                                               final MessageInstance entry,
                                               final long deliveryTag)
             {
@@ -306,8 +309,8 @@ public class AMQChannel
         };
 
         ConsumerTarget_0_8 target;
-        EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
-                                                          ConsumerImpl.Option.SEES_REQUEUES);
+        EnumSet<ConsumerOption> options = EnumSet.of(ConsumerOption.TRANSIENT, ConsumerOption.ACQUIRES,
+                                                     ConsumerOption.SEES_REQUEUES);
         if (acks)
         {
 
@@ -322,7 +325,7 @@ public class AMQChannel
                                                              singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
-        ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+        MessageInstanceConsumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
         sub.flush();
         sub.close();
         return getDeliveryMethod.hasDeliveredMessage();
@@ -433,6 +436,7 @@ public class AMQChannel
 
                 if(_currentMessage.getDestination() instanceof ConfiguredObject)
                 {
+
                     ((ConfiguredObject)_currentMessage.getDestination()).authorise(_token,
                                                                                    Operation.ACTION("publish"),
                                                                                    AbstractAMQPConnection.PUBLISH_ACTION_MAP_CREATOR.createMap(routingKey, info.isImmediate()));
@@ -626,12 +630,13 @@ public class AMQChannel
         }
         else
         {
+            if(_confirmOnPublish)
+            {
+                _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
+            }
+
             if (mandatory || message.isImmediate())
             {
-                if(_confirmOnPublish)
-                {
-                    _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
-                }
                 _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
                                                                             "No Route for message "
                                                                             + description,
@@ -723,7 +728,7 @@ public class AMQChannel
         }
 
         ConsumerTarget_0_8 target;
-        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
 
         if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
@@ -732,19 +737,19 @@ public class AMQChannel
         else if(acks)
         {
             target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager);
-            options.add(ConsumerImpl.Option.ACQUIRES);
-            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
         }
         else
         {
             target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager);
-            options.add(ConsumerImpl.Option.ACQUIRES);
-            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
         }
 
         if(exclusive)
         {
-            options.add(ConsumerImpl.Option.EXCLUSIVE);
+            options.add(ConsumerOption.EXCLUSIVE);
         }
 
 
@@ -828,7 +833,7 @@ public class AMQChannel
 
             for(MessageSource source : sources)
             {
-                ConsumerImpl sub =
+                MessageInstanceConsumer sub =
                         source.addConsumer(target,
                                            filterManager,
                                            AMQMessage.class,
@@ -868,15 +873,15 @@ public class AMQChannel
         }
 
         ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
-        Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
-        if (subs != null)
+        Collection<MessageInstanceConsumer> consumers = target == null ? null : target.getConsumers();
+        if (consumers != null)
         {
-            for(ConsumerImpl sub : subs)
+            for(MessageInstanceConsumer consumer : consumers)
             {
-                sub.close();
-                if (sub instanceof Consumer<?>)
+                consumer.close();
+                if (consumer instanceof Consumer<?>)
                 {
-                    _consumers.remove(sub);
+                    _consumers.remove(consumer);
                 }
             }
             return true;
@@ -956,13 +961,12 @@ public class AMQChannel
 
     /**
      * Add a message to the channel-based list of unacknowledged messages
-     *
-     * @param entry       the record of the message on the queue that was delivered
+     *  @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
      * @param consumer The consumer that is to acknowledge this message.
      */
-    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer)
+    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, MessageInstanceConsumer consumer)
     {
         if (_logger.isDebugEnabled())
         {
@@ -1208,9 +1212,9 @@ public class AMQChannel
                 // may need to deliver queued messages
                 for (ConsumerTarget_0_8 s : getConsumerTargets())
                 {
-                    for(ConsumerImpl sub : s.getConsumers())
+                    for(MessageInstanceConsumer consumer : s.getConsumers())
                     {
-                        sub.externalStateChange();
+                        consumer.externalStateChange();
                     }
                 }
             }
@@ -1316,7 +1320,7 @@ public class AMQChannel
 
         for(MessageInstance entry : _resendList)
         {
-            ConsumerImpl sub = entry.getAcquiringConsumer();
+            MessageInstanceConsumer sub = entry.getAcquiringConsumer();
             if (sub == null || sub.isClosed())
             {
                 entry.release(sub);
@@ -1333,9 +1337,9 @@ public class AMQChannel
             _suspended.set(false);
             for(ConsumerTarget_0_8 target : getConsumerTargets())
             {
-                for(ConsumerImpl sub : target.getConsumers())
+                for(MessageInstanceConsumer consumer : target.getConsumers())
                 {
-                    sub.externalStateChange();
+                    consumer.externalStateChange();
                 }
             }
 
@@ -1385,7 +1389,7 @@ public class AMQChannel
     private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
+            public void recordMessageDelivery(final MessageInstanceConsumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 addUnacknowledgedMessage(entry, deliveryTag, sub);
             }
@@ -1505,7 +1509,7 @@ public class AMQChannel
         }
 
         @Override
-        public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+        public long deliverToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
                                     final InstanceProperties props, final long deliveryTag)
         {
             _singleMessageCredit.useCreditForMessage(message.getSize());
@@ -1527,14 +1531,14 @@ public class AMQChannel
     }
 
 
-    private class ImmediateAction implements Action<MessageInstance>
+    private class ImmediateAction implements Action<BaseMessageInstance>
     {
 
         public ImmediateAction()
         {
         }
 
-        public void performAction(MessageInstance entry)
+        public void performAction(BaseMessageInstance entry)
         {
             TransactionLogResource queue = entry.getOwningResource();
 
@@ -1592,10 +1596,10 @@ public class AMQChannel
         }
     }
 
-    private final class CapacityCheckAction implements Action<MessageInstance>
+    private final class CapacityCheckAction implements Action<BaseMessageInstance>
     {
         @Override
-        public void performAction(final MessageInstance entry)
+        public void performAction(final BaseMessageInstance entry)
         {
             TransactionLogResource queue = entry.getOwningResource();
             if(queue instanceof CapacityChecker)
@@ -1797,10 +1801,10 @@ public class AMQChannel
             int requeues = 0;
             if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer()))
             {
-                requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+                requeues = rejectedQueueEntry.routeToAlternate(new Action<BaseMessageInstance>()
                 {
                     @Override
-                    public void performAction(final MessageInstance requeueEntry)
+                    public void performAction(final BaseMessageInstance requeueEntry)
                     {
                         messageWithSubject(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
                                                                          requeueEntry.getOwningResource()
@@ -1938,7 +1942,7 @@ public class AMQChannel
         return Collections.unmodifiableCollection(_consumers);
     }
 
-    private class ConsumerClosedListener implements ConfigurationChangeListener
+    private class ConsumerClosedListener extends AbstractConfigurationChangeListener
     {
         @Override
         public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
@@ -1949,38 +1953,6 @@ public class AMQChannel
             }
         }
 
-        @Override
-        public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void attributeSet(final ConfiguredObject object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-
-        }
     }
 
     private void consumerAdded(final Consumer<?> consumer)

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Fri Oct 21 09:32:07 2016
@@ -60,6 +60,7 @@ import org.apache.qpid.configuration.Com
 import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
@@ -67,7 +68,6 @@ import org.apache.qpid.server.security.*
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
@@ -756,7 +756,7 @@ public class AMQPConnection_0_8Impl
         }, getAccessControllerContext());
     }
 
-    public synchronized void writerIdle()
+    public void writerIdle()
     {
         writeFrame(HeartbeatBody.FRAME);
     }
@@ -1268,14 +1268,14 @@ public class AMQPConnection_0_8Impl
         }
 
         @Override
-        public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+        public long deliverToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
                                     final InstanceProperties props, final long deliveryTag)
         {
             long size = _protocolOutputConverter.writeDeliver(message,
                                                   props,
                                                   _channelId,
                                                   deliveryTag,
-                                                  new AMQShortString(sub.getName()));
+                                                  new AMQShortString(consumer.getName()));
             registerMessageDelivered(size);
             return size;
         }

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java Fri Oct 21 09:32:07 2016
@@ -20,12 +20,12 @@
 */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 
 public interface ClientDeliveryMethod
 {
-    long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
+    long deliverToClient(final MessageInstanceConsumer consumer, final ServerMessage message, final InstanceProperties props,
                          final long deliveryTag);
 }

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Oct 21 09:32:07 2016
@@ -30,11 +30,11 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -56,7 +56,7 @@ public abstract class ConsumerTarget_0_8
 
     private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+    private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
     private final AtomicBoolean _needToClose = new AtomicBoolean();
     private final String _targetAddress;
 
@@ -78,7 +78,7 @@ public abstract class ConsumerTarget_0_8
         return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
 
-    public List<ConsumerImpl> getConsumers()
+    public List<MessageInstanceConsumer> getConsumers()
     {
         return _consumers;
     }
@@ -108,7 +108,7 @@ public abstract class ConsumerTarget_0_8
          * @throws QpidException
          */
         @Override
-        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
@@ -157,13 +157,12 @@ public abstract class ConsumerTarget_0_8
         /**
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
-         *
-         * @param consumer
+         *  @param consumer
          * @param entry   The message to send
          * @param batch
          */
         @Override
-        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -256,13 +255,12 @@ public abstract class ConsumerTarget_0_8
         /**
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
-         *
-         * @param consumer
+         *  @param consumer
          * @param entry   The message to send
          * @param batch
          */
         @Override
-        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
         {
 
             // put queue entry on a list and then notify the connection to read list.
@@ -347,9 +345,9 @@ public abstract class ConsumerTarget_0_8
     }
 
     @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    public void consumerRemoved(final MessageInstanceConsumer consumer)
     {
-        _consumers.remove(sub);
+        _consumers.remove(consumer);
         if(_consumers.isEmpty())
         {
             close();
@@ -357,9 +355,9 @@ public abstract class ConsumerTarget_0_8
     }
 
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
+    public void consumerAdded(final MessageInstanceConsumer consumer)
     {
-        _consumers.add( sub );
+        _consumers.add(consumer);
     }
 
     @Override
@@ -460,7 +458,7 @@ public abstract class ConsumerTarget_0_8
         }
     }
 
-    protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+    protected long sendToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
                                 final InstanceProperties props,
                                 final long deliveryTag)
     {
@@ -469,7 +467,7 @@ public abstract class ConsumerTarget_0_8
     }
 
 
-    protected void recordMessageDelivery(final ConsumerImpl consumer,
+    protected void recordMessageDelivery(final MessageInstanceConsumer consumer,
                                          final MessageInstance entry,
                                          final long deliveryTag)
     {

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Fri Oct 21 09:32:07 2016
@@ -25,8 +25,8 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
 {
@@ -49,7 +49,7 @@ public class ExtractResendAndRequeue imp
     {
 
         message.setRedelivered();
-        final ConsumerImpl consumer = message.getDeliveredConsumer();
+        final MessageInstanceConsumer consumer = message.getDeliveredConsumer();
         if (consumer != null)
         {
             // Consumer exists

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageContentSourceBody.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageContentSourceBody.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageContentSourceBody.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageContentSourceBody.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.transport.ByteBufferSender;
+/** An {@link AMQBody} whose payload is the region of a {@link MessageContentSource} given by an offset and a length. */
+public class MessageContentSourceBody implements AMQBody
+{
+    public static final byte TYPE = 3; // value reported by getFrameType()
+    private final int _length;
+    private final MessageContentSource _content;
+    private final int _offset;
+    /** Stores the content source together with the offset and length later passed to its {@code getContent}. */
+    public MessageContentSourceBody(MessageContentSource content, int offset, int length)
+    {
+        _content = content;
+        _offset = offset;
+        _length = length;
+    }
+    /** Returns {@link #TYPE}. */
+    public byte getFrameType()
+    {
+        return TYPE;
+    }
+    /** Returns the length supplied at construction; the content itself is not consulted. */
+    public int getSize()
+    {
+        return _length;
+    }
+    /** Sends each buffer of the configured content region to {@code sender}; returns the sum of their remaining() bytes. */
+    @Override
+    public long writePayload(final ByteBufferSender sender)
+    {
+        long size = 0L;
+        for(QpidByteBuffer buf : _content.getContent(_offset, _length))
+        {
+            size += buf.remaining(); // sampled before the buffer is handed to the sender
+
+            sender.send(buf);
+            buf.dispose(); // NOTE(review): not reached if send(...) throws - confirm whether buffers can leak
+        }
+        return size;
+    }
+    /** Not supported for this body type: always throws {@link UnsupportedOperationException}. */
+    public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws QpidException
+    {
+        throw new UnsupportedOperationException();
+    }
+    /** Renders the simple class name with the offset and length; the content is not included. */
+    @Override
+    public String toString()
+    {
+        return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
+    }
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageContentSourceBody.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Fri Oct 21 09:32:07 2016
@@ -246,57 +246,6 @@ public class ProtocolOutputConverterImpl
         return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
     }
 
-    private class MessageContentSourceBody implements AMQBody
-    {
-        public static final byte TYPE = 3;
-        private final int _length;
-        private final MessageContentSource _content;
-        private final int _offset;
-
-        public MessageContentSourceBody(MessageContentSource content, int offset, int length)
-        {
-            _content = content;
-            _offset = offset;
-            _length = length;
-        }
-
-        public byte getFrameType()
-        {
-            return TYPE;
-        }
-
-        public int getSize()
-        {
-            return _length;
-        }
-
-        @Override
-        public long writePayload(final ByteBufferSender sender)
-        {
-            long size = 0L;
-            for(QpidByteBuffer buf : _content.getContent(_offset, _length))
-            {
-                size += buf.remaining();
-
-                sender.send(buf);
-                buf.dispose();
-            }
-            return size;
-        }
-
-        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws QpidException
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
-        }
-
-    }
-
     public long writeGetOk(final ServerMessage msg,
                            final InstanceProperties props,
                            int channelId,

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java Fri Oct 21 09:32:07 2016
@@ -20,10 +20,10 @@
 */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 public interface RecordDeliveryMethod
 {
-    void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag);
+    void recordMessageDelivery(final MessageInstanceConsumer sub, final MessageInstance entry, final long deliveryTag);
 }

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/FederationDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/FederationDecoder.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/FederationDecoder.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/FederationDecoder.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.ClientDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+/** Decoder for an OutboundConnection_0_8 that processes frames of known channels under the channel's AccessControlContext. */
+class FederationDecoder extends ClientDecoder
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FederationDecoder.class);
+
+    // Connection used to look up the OutboundChannel for each incoming frame's channel id.
+    private final OutboundConnection_0_8 _connection;
+    /** Creates a decoder bound to {@code connection}, which is also passed to the superclass constructor. */
+    FederationDecoder(final OutboundConnection_0_8 connection)
+    {
+        super(connection);
+        _connection = connection;
+    }
+    /** Decodes {@code buf} by delegating to {@code decode(buf)}. */
+    public void decodeBuffer(QpidByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+    {
+        decode(buf);
+    }
+    /** Processes one frame, inside doPrivileged when the connection has a channel for {@code channelId}; logs elapsed ms at debug. */
+    @Override
+    protected void processFrame(final int channelId, final byte type, final long bodySize, final QpidByteBuffer in)
+            throws AMQFrameDecodingException
+    {
+        long startTime = 0; // stays 0 unless debug logging is enabled on entry
+        try
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                startTime = System.currentTimeMillis();
+            }
+            OutboundChannel channel = _connection.getChannel(channelId);
+            if(channel == null) // no channel for this id: process directly, without doPrivileged
+            {
+                doProcessFrame(channelId, type, bodySize, in);
+            }
+            else
+            {
+                // Run the frame handling with the channel's AccessControlContext; checked exceptions come back wrapped.
+                try
+                {
+                    AccessController.doPrivileged(new PrivilegedExceptionAction<Object>()
+                    {
+                        @Override
+                        public Void run() throws IOException, AMQFrameDecodingException
+                        {
+                            doProcessFrame(channelId, type, bodySize, in);
+                            return null;
+                        }
+                    }, channel.getAccessControllerContext());
+                }
+                catch (PrivilegedActionException e)
+                {
+                    Throwable cause = e.getCause(); // unwrap: rethrow as-is where the type allows, otherwise wrap
+                    if(cause instanceof AMQFrameDecodingException)
+                    {
+                        throw (AMQFrameDecodingException) cause;
+                    }
+                    else if(cause instanceof RuntimeException)
+                    {
+                        throw (RuntimeException) cause;
+                    }
+                    else
+                    {
+                        throw new ServerScopedRuntimeException(cause);
+                    }
+                }
+            }
+        }
+        finally
+        {
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Frame handled in {} ms.", (System.currentTimeMillis() - startTime));
+            }
+        }
+    }
+
+    /** Calls the superclass {@code processFrame}; presumably kept separate so the anonymous action can invoke it - confirm. */
+    private void doProcessFrame(final int channelId, final byte type, final long bodySize, final QpidByteBuffer in)
+            throws AMQFrameDecodingException
+    {
+        super.processFrame(channelId, type, bodySize, in);
+
+    }
+
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/FederationDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundChannel.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundChannel.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundChannel.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import java.security.AccessControlContext;
+
+import org.apache.qpid.framing.ClientChannelMethodProcessor;
+/** A {@link ClientChannelMethodProcessor} that additionally exposes an AccessControlContext and two lifecycle hooks. */
+interface OutboundChannel extends ClientChannelMethodProcessor
+{
+    AccessControlContext getAccessControllerContext(); // FederationDecoder passes this to AccessController.doPrivileged
+
+    boolean processPending(); // NOTE(review): contract not shown here; presumably reports whether pending work was done - confirm
+
+    void receivedComplete(); // NOTE(review): contract not shown here; presumably signals the end of a received batch - confirm
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundConnection_0_8.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundConnection_0_8.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundConnection_0_8.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,772 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+import javax.security.auth.SubjectDomainCombiner;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ClientChannelMethodProcessor;
+import org.apache.qpid.framing.ClientMethodProcessor;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Credential;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.RemoteHost;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.SchedulableConnection;
+import org.apache.qpid.server.transport.ServerIdleReadTimeoutTicker;
+import org.apache.qpid.server.transport.ServerIdleWriteTimeoutTicker;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+
+class OutboundConnection_0_8 implements OutboundProtocolEngine, ClientMethodProcessor<ClientChannelMethodProcessor>
+{
+
+    private static final Map<Protocol, ProtocolVersion> PROTOCOL_VERSION_MAP;
+
+    static
+    {
+        Map<Protocol, ProtocolVersion> protocolVersionMap = new HashMap<>();
+        protocolVersionMap.put(Protocol.AMQP_0_8, ProtocolVersion.v0_8);
+        protocolVersionMap.put(Protocol.AMQP_0_9, ProtocolVersion.v0_9);
+        protocolVersionMap.put(Protocol.AMQP_0_9_1, ProtocolVersion.v0_91);
+
+        PROTOCOL_VERSION_MAP = Collections.unmodifiableMap(protocolVersionMap);
+    }
+
+    private static final int TRANSFER_SESSION_ID = 0;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OutboundConnection_0_8.class);
+
+    private final Protocol _protocol;
+    private final RemoteHost<?> _remoteHost;
+    private final RemoteHostAddress<?> _address;
+    private final VirtualHost<?> _virtualHost;
+
+    private final FederationDecoder _decoder;
+
+
+    private final AggregateTicker _aggregateTicker;
+    private final ProtocolVersion _protocolVersion;
+
+    private final MethodRegistry _methodRegistry;
+    private volatile long _lastReadTime;
+    private volatile long _lastWriteTime;
+    private SchedulableConnection _connection;
+    private volatile AccessControlContext _accessControllerContext;
+
+
+
+    private final AtomicBoolean _stateChanged = new AtomicBoolean();
+    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+    private volatile Thread _ioThread;
+    private final List<Runnable> _pendingTasks = new CopyOnWriteArrayList<>();
+    private int _classId;
+    private int _methodId;
+    private SaslClient _saslClient;
+    private int _maxFrameSize;
+    private int _maxNoOfChannels;
+
+    // One channel table per connection. The slots are written by createTransferSession() and read by
+    // getChannel(), receivedComplete() and ProcessPendingIterator, all of which are per-instance; a
+    // static array would make every connection overwrite and service the channels of the others.
+    private final OutboundChannel[] _channels = new OutboundChannel[3];
+    private Action<Boolean> _onClosedTask;
+
+    /**
+     * Connection handshake states. The transitions performed within this class are:
+     * INIT to AWAIT_START (by the initial pending task, just before the protocol header is sent),
+     * AWAIT_START to AWAIT_SECURE (connection.start received),
+     * AWAIT_SECURE to AWAIT_TUNE (once the SASL client reports completion),
+     * AWAIT_TUNE to AWAIT_OPEN_OK (after tune-ok and open have been written) and
+     * AWAIT_OPEN_OK to OPEN (connection.open-ok received).
+     * NOTE(review): nothing in this class currently moves the connection to AWAIT_CLOSE_OK or CLOSED.
+     */
+    enum State
+    {
+        INIT,
+        AWAIT_START,
+        AWAIT_SECURE,
+        AWAIT_TUNE,
+        AWAIT_OPEN_OK,
+        OPEN,
+        AWAIT_CLOSE_OK,
+        CLOSED
+    }
+
+    private State _state = State.INIT;
+
+
+    /**
+     * Creates the outbound connection and queues the task that starts the handshake.
+     *
+     * @param address the remote address; its parent {@link RemoteHost} is resolved here
+     * @param virtualHost the local virtual host this connection belongs to
+     * @param protocol the AMQP protocol to speak; it is looked up in {@code PROTOCOL_VERSION_MAP},
+     *                 which only has entries for AMQP 0-8, 0-9 and 0-9-1
+     * @param protocolHeader the protocol header bytes sent as the first data on the connection
+     */
+    public OutboundConnection_0_8(final RemoteHostAddress<?> address,
+                                  final VirtualHost<?> virtualHost,
+                                  final Protocol protocol,
+                                  final byte[] protocolHeader)
+    {
+        _address = address;
+        _protocol = protocol;
+        _virtualHost = virtualHost;
+        _aggregateTicker = new AggregateTicker();
+        _remoteHost = address.getParent(RemoteHost.class);
+        // NOTE(review): 'this' is handed to the decoder before construction has finished.
+        _decoder = new FederationDecoder(this);
+        // Map.get yields null for a protocol without an entry in PROTOCOL_VERSION_MAP.
+        _protocolVersion = PROTOCOL_VERSION_MAP.get(protocol);
+        _methodRegistry = new MethodRegistry(_protocolVersion);
+
+        // The first unit of pending work: enter AWAIT_START and send the protocol header. The task
+        // dereferences _connection, which is only assigned by setConnection(), so it must not run
+        // before setConnection() has been called.
+        _pendingTasks.add(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                changeState(State.INIT, State.AWAIT_START);
+                _connection.send(QpidByteBuffer.wrap(protocolHeader));
+            }
+        });
+        // Makes hasWork() report true so that the queued task gets picked up.
+        _stateChanged.set(true);
+    }
+
+
+    public VirtualHost<?> getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+
+    /**
+     * Moves the connection from {@code currentState} to {@code newState} while holding this object's
+     * monitor.
+     *
+     * @param currentState the state the connection is required to be in
+     * @param newState the state to move to
+     * @throws ConnectionScopedRuntimeException if the connection is not in {@code currentState}
+     */
+    private synchronized void changeState(final State currentState, final State newState)
+    {
+        if(_state != currentState)
+        {
+            // Name the states involved so that a failed handshake can be diagnosed from the log alone.
+            throw new ConnectionScopedRuntimeException("Incorrect state: expected " + currentState
+                                                       + " but was " + _state
+                                                       + " (requested transition to " + newState + ")");
+        }
+        _state = newState;
+    }
+
+    /**
+     * Verifies, while holding this object's monitor, that the connection is in the given state.
+     *
+     * @param currentState the state the connection is required to be in
+     * @throws ConnectionScopedRuntimeException if the connection is in any other state
+     */
+    private synchronized void assertState(final State currentState)
+    {
+        if (_state != currentState)
+        {
+            // Report both states so that an out-of-sequence frame can be diagnosed from the log alone.
+            throw new ConnectionScopedRuntimeException("Incorrect state: expected " + currentState
+                                                       + " but was " + _state);
+        }
+    }
+
+    public Protocol getProtocol()
+    {
+        return _protocol;
+    }
+
+    @Override
+    public final AggregateTicker getAggregateTicker()
+    {
+        return _aggregateTicker;
+    }
+
+    /**
+     * Returns the time of the latest I/O on this connection, i.e. the later of the last read time
+     * and the last write time.
+     */
+    public final Date getLastIoTime()
+    {
+        final long lastRead = getLastReadTime();
+        final long lastWrite = getLastWriteTime();
+        return new Date(lastRead >= lastWrite ? lastRead : lastWrite);
+    }
+
+    @Override
+    public final long getLastReadTime()
+    {
+        return _lastReadTime;
+    }
+
+    public final void updateLastReadTime()
+    {
+        _lastReadTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public final long getLastWriteTime()
+    {
+        return _lastWriteTime;
+    }
+
+    public final void updateLastWriteTime()
+    {
+        _lastWriteTime = System.currentTimeMillis();
+    }
+
+
+    MethodRegistry getMethodRegistry()
+    {
+        return _methodRegistry;
+    }
+
+
+    @Override
+    public void closed()
+    {
+        Action<Boolean> task = _onClosedTask;
+        if(task != null)
+        {
+            task.performAction(_state == State.OPEN);
+        }
+    }
+
+    /**
+     * Writes a heartbeat frame to the connection while holding this object's monitor.
+     */
+    public synchronized void writerIdle()
+    {
+        writeFrame(HeartbeatBody.FRAME);
+    }
+
+    @Override
+    public void readerIdle()
+    {
+
+    }
+
+    @Override
+    public Subject getSubject()
+    {
+        return null;
+    }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return false;
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+
+    }
+
+    @Override
+    public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers)
+    {
+
+    }
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return false;
+    }
+
+
+    /**
+     * Returns an iterator over the pending units of work of this connection. Work is only handed
+     * out to the I/O thread; any other caller receives an empty iterator.
+     */
+    @Override
+    public Iterator<Runnable> processPendingIterator()
+    {
+        if (isIOThread())
+        {
+            return new ProcessPendingIterator();
+        }
+        return Collections.emptyIterator();
+    }
+
+    @Override
+    public boolean hasWork()
+    {
+        return _stateChanged.get();
+    }
+
+    /**
+     * Flags that this connection has work to do and informs the work listener, if one is registered.
+     */
+    @Override
+    public void notifyWork()
+    {
+        _stateChanged.set(true);
+
+        final Action<ProtocolEngine> workListener = _workListener.get();
+        if(workListener == null)
+        {
+            return;
+        }
+        workListener.performAction(this);
+    }
+
+    @Override
+    public void clearWork()
+    {
+        _stateChanged.set(false);
+    }
+
+    @Override
+    public void setWorkListener(final Action<ProtocolEngine> listener)
+    {
+        _workListener.set(listener);
+    }
+
+
+    @Override
+    public void encryptedTransport()
+    {
+
+    }
+
+    /**
+     * Decodes a buffer received from the network inside a privileged action that runs with this
+     * connection's access control context, then tells every channel that the buffer is complete.
+     *
+     * @param msg the received bytes
+     * @throws ConnectionScopedRuntimeException on a frame decoding or I/O failure, or on a store
+     *         failure while the virtual host is not active
+     * @throws ServerScopedRuntimeException on a store failure while the virtual host is active
+     */
+    @Override
+    public void received(final QpidByteBuffer msg)
+    {
+        final PrivilegedAction<Void> decodeAction = new PrivilegedAction<Void>()
+        {
+            @Override
+            public Void run()
+            {
+                updateLastReadTime();
+
+                try
+                {
+                    _decoder.decodeBuffer(msg);
+                    receivedComplete();
+                    return null;
+                }
+                catch (AMQFrameDecodingException | IOException e)
+                {
+                    LOGGER.error("Unexpected exception", e);
+                    throw new ConnectionScopedRuntimeException(e);
+                }
+                catch (StoreException e)
+                {
+                    // A store failure only has server scope while the virtual host is still active.
+                    if (!_virtualHost.isActive())
+                    {
+                        throw new ConnectionScopedRuntimeException(e);
+                    }
+                    throw new ServerScopedRuntimeException(e);
+                }
+            }
+        };
+
+        AccessController.doPrivileged(decodeAction, getAccessControllerContext());
+    }
+
+    private void receivedComplete()
+    {
+        for(OutboundChannel channel : _channels)
+        {
+            if (channel != null)
+            {
+                channel.receivedComplete();
+            }
+        }
+    }
+
+
+    public AccessControlContext getAccessControllerContext()
+    {
+        return _accessControllerContext;
+    }
+
+    public final void updateAccessControllerContext()
+    {
+        _accessControllerContext = getAccessControlContextFromSubject(
+                getSubject());
+    }
+
+    /**
+     * Builds an access control context from the caller's current context, combined with a
+     * {@link SubjectDomainCombiner} for the given subject when one is supplied.
+     *
+     * @param subject the subject to associate with the context; may be {@code null}, in which case
+     *                no domain combiner is used
+     * @return the newly created context
+     */
+    public final AccessControlContext getAccessControlContextFromSubject(final Subject subject)
+    {
+        final AccessControlContext callerContext = AccessController.getContext();
+        final PrivilegedAction<AccessControlContext> createContext =
+                new PrivilegedAction<AccessControlContext>()
+                {
+                    @Override
+                    public AccessControlContext run()
+                    {
+                        final SubjectDomainCombiner combiner =
+                                subject == null ? null : new SubjectDomainCombiner(subject);
+                        return new AccessControlContext(callerContext, combiner);
+                    }
+                };
+        return AccessController.doPrivileged(createContext);
+    }
+
+
+    @Override
+    public void setIOThread(final Thread ioThread)
+    {
+        _ioThread = ioThread;
+    }
+
+    public boolean isIOThread()
+    {
+        return Thread.currentThread() == _ioThread;
+    }
+
+
+    @Override
+    public void setConnection(final SchedulableConnection connection)
+    {
+        _connection = connection;
+    }
+
+    @Override
+    public void setOnClosedTask(final Action<Boolean> onClosedTask)
+    {
+        _onClosedTask = onClosedTask;
+    }
+
+    @Override
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
+
+    @Override
+    public ClientChannelMethodProcessor getChannelMethodProcessor(final int channelId)
+    {
+        return getChannel(channelId);
+    }
+
+
+    /**
+     * Looks up the channel registered under the given id.
+     *
+     * @param channelId the channel number
+     * @return the channel, or {@code null} when the id lies outside the channel table (negative or
+     *         too large) or the slot is empty
+     */
+    public OutboundChannel getChannel(final int channelId)
+    {
+        // Guard the lower bound as well: a negative id must yield null like any other unknown
+        // channel rather than an ArrayIndexOutOfBoundsException.
+        return (channelId >= 0 && channelId < _channels.length) ? _channels[channelId] : null;
+    }
+
+    public void setMaxFrameSize(int frameMax)
+    {
+        _maxFrameSize = frameMax;
+        _decoder.setMaxFrameSize(frameMax);
+    }
+
+    public int getMaxFrameSize()
+    {
+        return _maxFrameSize;
+    }
+
+    @Override
+    public void receiveConnectionClose(final int replyCode,
+                                       final AMQShortString replyText,
+                                       final int classId,
+                                       final int methodId)
+    {
+
+        writeFrame(_methodRegistry.createConnectionCloseOkBody().generateFrame(0));
+        _connection.close();
+    }
+
+    @Override
+    public void receiveConnectionCloseOk()
+    {
+        // TODO
+    }
+
+    @Override
+    public void receiveHeartbeat()
+    {
+    }
+
+    @Override
+    public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
+    {
+        _connection.close();
+    }
+
+    @Override
+    public void setCurrentMethod(final int classId, final int methodId)
+    {
+        _classId = classId;
+        _methodId = methodId;
+    }
+
+    void writeFrame(AMQDataBlock frame)
+    {
+        LOGGER.debug("SEND: {}", frame);
+
+
+        frame.writePayload(_connection);
+
+
+        updateLastWriteTime();
+
+    }
+
+
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        // TODO
+        return false;
+    }
+
+    /**
+     * Handles connection.start: moves from AWAIT_START to AWAIT_SECURE, picks the first credential
+     * of the remote host that can supply a SASL client for one of the offered mechanisms, and
+     * answers with connection.start-ok. If the SASL client is already complete after the initial
+     * response, the connection moves straight on to AWAIT_TUNE.
+     *
+     * @param versionMajor unused
+     * @param versionMinor unused
+     * @param serverProperties unused
+     * @param mechanisms space separated list of SASL mechanism names offered by the peer
+     * @param locales unused
+     * @throws ConnectionScopedRuntimeException if the connection is not in AWAIT_START, if no
+     *         credential supports any offered mechanism, or if the SASL client fails
+     */
+    @Override
+    public void receiveConnectionStart(final short versionMajor,
+                                       final short versionMinor,
+                                       final FieldTable serverProperties,
+                                       final byte[] mechanisms,
+                                       final byte[] locales)
+    {
+        changeState(State.AWAIT_START, State.AWAIT_SECURE);
+        List<String> saslMechanisms = Arrays.asList(new String(mechanisms, StandardCharsets.UTF_8).split(" "));
+
+        Collection<Credential> credentialList = _remoteHost.getChildren(Credential.class);
+
+        SaslClient client = null;
+        for(Credential<?> credentials : credentialList)
+        {
+            client = credentials.getSaslClient(saslMechanisms);
+            if(client != null)
+            {
+                break;
+            }
+        }
+
+        if(client == null)
+        {
+            throw new ConnectionScopedRuntimeException("Unable to find acceptable sasl mechanism");
+        }
+
+        _saslClient = client;
+        try
+        {
+            byte[] initialResponse = client.hasInitialResponse() ? client.evaluateChallenge(new byte[0]) : new byte[0];
+
+            FieldTable clientProperties = FieldTableFactory.newFieldTable();
+            clientProperties.setString(ServerPropertyNames.PRODUCT,
+                                       CommonProperties.getProductName());
+            clientProperties.setString(ServerPropertyNames.VERSION,
+                                       CommonProperties.getReleaseVersion());
+            clientProperties.setString(ServerPropertyNames.QPID_BUILD,
+                                       CommonProperties.getBuildVersion());
+            clientProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME,
+                                       _virtualHost.getName());
+
+            writeFrame(_methodRegistry.createConnectionStartOkBody(clientProperties, AMQShortString.valueOf(_saslClient.getMechanismName()), initialResponse, null).generateFrame(0));
+            if(client.isComplete())
+            {
+                // No further challenge is needed, so skip the connection.secure round trip.
+                changeState(State.AWAIT_SECURE, State.AWAIT_TUNE);
+            }
+        }
+        catch (SaslException e)
+        {
+            throw new ConnectionScopedRuntimeException(e);
+        }
+    }
+
+    /**
+     * Handles connection.secure: evaluates the SASL challenge, sends the response as
+     * connection.secure-ok and moves on to AWAIT_TUNE once the SASL client reports completion.
+     *
+     * @param challenge the SASL challenge sent by the peer
+     * @throws ConnectionScopedRuntimeException if the connection is not in AWAIT_SECURE or the SASL
+     *         client fails to evaluate the challenge
+     */
+    @Override
+    public void receiveConnectionSecure(final byte[] challenge)
+    {
+        assertState(State.AWAIT_SECURE);
+
+        final byte[] challengeResponse;
+        try
+        {
+            challengeResponse = _saslClient.evaluateChallenge(challenge);
+
+            final AMQDataBlock secureOk =
+                    _methodRegistry.createConnectionSecureOkBody(challengeResponse).generateFrame(0);
+            writeFrame(secureOk);
+
+            final boolean exchangeFinished = _saslClient.isComplete();
+            if(exchangeFinished)
+            {
+                changeState(State.AWAIT_SECURE, State.AWAIT_TUNE);
+            }
+        }
+        catch (SaslException e)
+        {
+            throw new ConnectionScopedRuntimeException(e);
+        }
+    }
+
+    /**
+     * Handles connection.redirect: the redirect is logged but not followed, and the connection is
+     * closed.
+     *
+     * @param host the host the peer asked this connection to redirect to (only logged)
+     * @param knownHosts unused
+     */
+    @Override
+    public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
+    {
+        // The message has a {} placeholder, so the redirect target must be passed as its argument.
+        LOGGER.info("Connection redirect to {} received, however redirection is not followed for federation links", host);
+        _connection.close();
+    }
+
+    private int getDefaultMaxFrameSize()
+    {
+        Broker<?> broker = _virtualHost.getBroker();
+
+        return broker.getNetworkBufferSize();
+    }
+
+    protected void initialiseHeartbeating(final long writerDelay, final long readerDelay)
+    {
+        if (writerDelay > 0)
+        {
+            _aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this, (int) writerDelay));
+            _connection.setMaxWriteIdleMillis(writerDelay);
+        }
+
+        if (readerDelay > 0)
+        {
+            _aggregateTicker.addTicker(new ServerIdleReadTimeoutTicker(_connection, this, (int) readerDelay));
+            _connection.setMaxReadIdleMillis(readerDelay);
+        }
+    }
+
+
+
+    public int getMaximumNumberOfChannels()
+    {
+        return _maxNoOfChannels;
+    }
+
+    private void setMaximumNumberOfChannels(int value)
+    {
+        _maxNoOfChannels = value;
+    }
+
+
+    /**
+     * Handles connection.tune: negotiates the heartbeat interval, the maximum frame size and the
+     * maximum number of channels, replies with connection.tune-ok followed by connection.open, and
+     * moves from AWAIT_TUNE to AWAIT_OPEN_OK.
+     *
+     * @param channelMax the channel limit proposed by the peer; 0 or an out-of-range value results in
+     *                   0xFFFF
+     * @param frameMax the frame size limit proposed by the peer; only applied when positive and
+     *                 smaller than the local default
+     * @param heartbeat the heartbeat interval proposed by the peer; 0 means the peer expressed no
+     *                  preference and the locally desired interval is used
+     * @throws ConnectionScopedRuntimeException if the connection is not in AWAIT_TUNE
+     */
+    @Override
+    public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat)
+    {
+        assertState(State.AWAIT_TUNE);
+
+        // Use whichever of the two intervals is set; when both are set, use the smaller one.
+        final int desiredHeartbeatInterval = _address.getDesiredHeartbeatInterval();
+        int heartbeatDelay = heartbeat == 0
+                ? desiredHeartbeatInterval
+                : (desiredHeartbeatInterval == 0) ? heartbeat : Math.min(heartbeat, desiredHeartbeatInterval);
+
+        long writerDelay = 1000L * heartbeatDelay;
+        long readerDelay = 1000L * BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * heartbeatDelay;
+        initialiseHeartbeating(writerDelay, readerDelay);
+
+        int maxFrameSize = getDefaultMaxFrameSize();
+        if (maxFrameSize <= 0)
+        {
+            maxFrameSize = Integer.MAX_VALUE;
+        }
+
+        if (frameMax > 0 && frameMax < maxFrameSize)
+        {
+            maxFrameSize = (int) frameMax;
+        }
+
+        setMaxFrameSize(maxFrameSize);
+
+        // 0 means no implied limit, except that forced by protocol limitations (0xFFFF). Values
+        // outside 1..0xFFFF (including negative ones) are clamped to that protocol limit as well.
+        setMaximumNumberOfChannels((channelMax <= 0 || channelMax > 0xFFFF) ? 0xFFFF : channelMax);
+
+        writeFrame(_methodRegistry.createConnectionTuneOkBody(_maxNoOfChannels, maxFrameSize, heartbeatDelay).generateFrame(0));
+        writeFrame(_methodRegistry.createConnectionOpenBody(AMQShortString.valueOf(_address.getHostName()), AMQShortString.EMPTY_STRING, false).generateFrame(0));
+
+        changeState(State.AWAIT_TUNE, State.AWAIT_OPEN_OK);
+    }
+
+    @Override
+    public void receiveConnectionOpenOk(final AMQShortString knownHosts)
+    {
+        changeState(State.AWAIT_OPEN_OK, State.OPEN);
+
+        createTransferSession();
+
+    }
+
+    private void createTransferSession()
+    {
+        _channels[TRANSFER_SESSION_ID] = new TransferSession_0_8(TRANSFER_SESSION_ID, this);
+    }
+
+
+    /**
+     * Iterator over the units of pending work of the enclosing connection. Channels with pending
+     * work are served first, round-robin; the connection level tasks in {@code _pendingTasks} are
+     * only handed out once no channel remains in the list.
+     */
+    private class ProcessPendingIterator implements Iterator<Runnable>
+    {
+        private final List<OutboundChannel> _sessionsWithPending;
+        private Iterator<OutboundChannel> _sessionIterator;
+        private ProcessPendingIterator()
+        {
+            // Snapshot of the channel slots that are occupied at creation time.
+            _sessionsWithPending = new ArrayList<>();
+            for(OutboundChannel channel : _channels)
+            {
+                if(channel != null)
+                {
+                    _sessionsWithPending.add(channel);
+                }
+            }
+            _sessionIterator = _sessionsWithPending.iterator();
+        }
+
+        /** Returns {@code true} while a channel remains in the list or a pending task is queued. */
+        @Override
+        public boolean hasNext()
+        {
+            return !(_sessionsWithPending.isEmpty() && _pendingTasks.isEmpty());
+        }
+
+        /**
+         * Returns the next unit of work.
+         *
+         * @throws NoSuchElementException if neither a channel nor a pending task is left
+         */
+        @Override
+        public Runnable next()
+        {
+            if(!_sessionsWithPending.isEmpty())
+            {
+                // Wrap around to the first channel once the end of the list has been reached.
+                if(!_sessionIterator.hasNext())
+                {
+                    _sessionIterator = _sessionsWithPending.iterator();
+                }
+                final OutboundChannel session = _sessionIterator.next();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        // A channel that reports no further pending work is dropped from the list.
+                        // NOTE(review): Iterator.remove() removes the element most recently returned
+                        // by _sessionIterator, so this relies on the Runnable being run before
+                        // next() is called again - confirm with the caller.
+                        if(!session.processPending())
+                        {
+                            _sessionIterator.remove();
+                        }
+                    }
+                };
+            }
+            else if(!_pendingTasks.isEmpty())
+            {
+                // Hand out (and dequeue) the oldest connection level task.
+                return _pendingTasks.remove(0);
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+
+        /** Not supported. */
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundConnection_0_8.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_8.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_8.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_8.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.OutboundProtocolEngineCreator;
+import org.apache.qpid.server.plugin.PluggableService;
+
+/**
+ * Pluggable factory producing {@code OutboundConnection_0_8} protocol engines that speak AMQP 0-8.
+ */
+@PluggableService
+public class OutboundProtocolEngineCreator_0_8 implements OutboundProtocolEngineCreator
+{
+
+    /** Protocol header handed to each new engine: the characters "AMQP" followed by 1, 1, 8, 0. */
+    private static final byte[] AMQP_0_8_HEADER = { 'A', 'M', 'Q', 'P', 1, 1, 8, 0 };
+
+    public OutboundProtocolEngineCreator_0_8()
+    {
+    }
+
+    /** Returns the protocol handled by this creator, {@link Protocol#AMQP_0_8}. */
+    public Protocol getVersion()
+    {
+        return Protocol.AMQP_0_8;
+    }
+
+    /**
+     * Creates a new outbound AMQP 0-8 engine for the given remote address and virtual host.
+     */
+    @Override
+    public OutboundProtocolEngine newProtocolEngine(final RemoteHostAddress<?> address, final VirtualHost<?> virtualHost)
+    {
+        final OutboundProtocolEngine engine =
+                new OutboundConnection_0_8(address, virtualHost, Protocol.AMQP_0_8, AMQP_0_8_HEADER);
+        return engine;
+    }
+
+    /** Returns the string form of {@link #getVersion()}. */
+    @Override
+    public String getType()
+    {
+        final Protocol version = getVersion();
+        return version.toString();
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_8.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.OutboundProtocolEngineCreator;
+import org.apache.qpid.server.plugin.PluggableService;
+
+/**
+ * Pluggable factory producing {@code OutboundConnection_0_8} protocol engines that speak AMQP 0-9.
+ */
+@PluggableService
+public class OutboundProtocolEngineCreator_0_9 implements OutboundProtocolEngineCreator
+{
+
+    /** Protocol header handed to each new engine: the characters "AMQP" followed by 1, 1, 0, 9. */
+    private static final byte[] AMQP_0_9_HEADER = { 'A', 'M', 'Q', 'P', 1, 1, 0, 9 };
+
+    public OutboundProtocolEngineCreator_0_9()
+    {
+    }
+
+    /** Returns the protocol handled by this creator, {@link Protocol#AMQP_0_9}. */
+    public Protocol getVersion()
+    {
+        return Protocol.AMQP_0_9;
+    }
+
+    /**
+     * Creates a new outbound AMQP 0-9 engine for the given remote address and virtual host.
+     */
+    @Override
+    public OutboundProtocolEngine newProtocolEngine(final RemoteHostAddress<?> address, final VirtualHost<?> virtualHost)
+    {
+        final OutboundProtocolEngine engine =
+                new OutboundConnection_0_8(address, virtualHost, Protocol.AMQP_0_9, AMQP_0_9_HEADER);
+        return engine;
+    }
+
+    /** Returns the string form of {@link #getVersion()}. */
+    @Override
+    public String getType()
+    {
+        final Protocol version = getVersion();
+        return version.toString();
+    }
+}




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message