qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1765973 [7/7] - in /qpid/java/branches/transfer-queue: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/stor...
Date Fri, 21 Oct 2016 09:32:09 GMT
Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9_1.java (from r1755476, qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9_1.java?p2=qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9_1.java&p1=qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/OutboundProtocolEngineCreator_0_9_1.java Fri Oct 21 09:32:07 2016
@@ -18,20 +18,17 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol.v0_8;
+package org.apache.qpid.server.protocol.v0_8.federation;
 
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
 import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.OutboundProtocolEngineCreator;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.transport.network.AggregateTicker;
 
 @PluggableService
-public class ProtocolEngineCreator_0_9_1 implements ProtocolEngineCreator
+public class OutboundProtocolEngineCreator_0_9_1 implements OutboundProtocolEngineCreator
 {
 
     private static final byte[] AMQP_0_9_1_HEADER =
@@ -45,7 +42,7 @@ public class ProtocolEngineCreator_0_9_1
                          (byte) 1
             };
 
-    public ProtocolEngineCreator_0_9_1()
+    public OutboundProtocolEngineCreator_0_9_1()
     {
     }
 
@@ -54,35 +51,10 @@ public class ProtocolEngineCreator_0_9_1
         return Protocol.AMQP_0_9_1;
     }
 
-
-    public byte[] getHeaderIdentifier()
-    {
-        return AMQP_0_9_1_HEADER;
-    }
-
-    public ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                            ServerNetworkConnection network,
-                                            AmqpPort<?> port,
-                                            Transport transport,
-                                            long id, final AggregateTicker aggregateTicker)
-    {
-        final AMQPConnection_0_8Impl protocolEngine =
-                new AMQPConnection_0_8Impl(broker, network, port, transport, getVersion(), id, aggregateTicker);
-        protocolEngine.create();
-        return protocolEngine;
-    }
-
     @Override
-    public byte[] getSuggestedAlternativeHeader()
-    {
-        return null;
-    }
-
-    private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_9_1();
-
-    public static ProtocolEngineCreator getInstance()
+    public OutboundProtocolEngine newProtocolEngine(final RemoteHostAddress<?> address, final VirtualHost<?> virtualHost)
     {
-        return INSTANCE;
+        return new OutboundConnection_0_8(address, virtualHost, Protocol.AMQP_0_9_1, AMQP_0_9_1_HEADER);
     }
 
     @Override

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferSession_0_8.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferSession_0_8.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferSession_0_8.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,721 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import java.security.AccessControlContext;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicGetOkBody;
+import org.apache.qpid.framing.ConfirmSelectBody;
+import org.apache.qpid.framing.ConfirmSelectOkBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.MessageContentSourceBody;
+import org.apache.qpid.server.transfer.TransferQueueConsumer;
+import org.apache.qpid.server.transfer.TransferQueueEntry;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+class TransferSession_0_8 implements OutboundChannel
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(TransferSession_0_8.class);
+    // Do-nothing callback; passed to commitAsync() in receivedComplete().
+    private static final Runnable NO_OP = new Runnable()
+    {
+        @Override
+        public void run()
+        {
+
+        }
+    };
+
+    // Connection that owns this channel; every outgoing frame is written through it.
+    private final OutboundConnection_0_8 _connection;
+    // Channel number stamped on every frame this session generates.
+    private final int _channelId;
+    // Factory for protocol method bodies, obtained from the connection.
+    private final MethodRegistry _methodRegistry;
+    // Transaction in which acked/nacked entries are dequeued; committed in receivedComplete().
+    private final LocalTransaction _txn;
+    // Global domains reported by the remote host; stays null until setRemoteHostGlobalDomains() runs.
+    private Collection<String> _remoteHostGlobalDomains;
+    // Target and consumer registered on the transfer queue by setRemoteHostGlobalDomains().
+    private TransferTarget_0_8 _target;
+    private TransferQueueConsumer _consumer;
+    // Entries sent but not yet acked/nacked, keyed by the sequence number assigned in transfer().
+    private final SortedMap<Long, TransferQueueEntry> _unconfirmed = new TreeMap<>();
+    // True while the peer's last channel.flow carried active == false.
+    private volatile boolean _flowControlled;
+    // Sequence number of the most recently sent message; note that -0L is simply 0L.
+    private volatile long _lastSent = -0L;
+
+    // Outstanding-confirm threshold consulted by isSuspended().
+    private int _maxUnconfirmed = 200;
+
+    /**
+     * Request types whose replies are matched against pending futures held in
+     * _synchronousOperationListeners; one queue of futures is kept per constant.
+     */
+    enum Operation {
+        EXCHANGE_DECLARE,
+        EXCHANGE_DELETE,
+        EXCHANGE_BOUND,
+        QUEUE_BIND,
+        QUEUE_UNBIND,
+        QUEUE_DECLARE,
+        QUEUE_DELETE,
+        QUEUE_PURGE,
+        BASIC_RECOVER_SYNC,
+        // NOTE(review): "BAISC_QOS" looks like a typo for BASIC_QOS; it is referenced by
+        // receiveBasicQosOk(), so a rename has to change both places together.
+        BAISC_QOS,
+        BASIC_CONSUME,
+        BASIC_CANCEL,
+        TX_SELECT,
+        TX_COMMIT,
+        TX_ROLLBACK,
+        CHANNEL_FLOW,
+        CONFIRM_SELECT,
+        ACK,
+        BASIC_GET
+    }
+
+    /**
+     * A future that the enclosing session completes explicitly and that remembers the
+     * absolute time after which the awaited response counts as overdue.
+     *
+     * @param <V> type of the value the future is completed with
+     */
+    private static final class TimedSettableFuture<V> extends AbstractFuture<V>
+    {
+        private final long _timeoutTime;
+
+        /**
+         * @param timeoutTime absolute deadline, compared directly against the value
+         *                    handed to {@link #checkTimeout(long)}
+         */
+        private TimedSettableFuture(final long timeoutTime)
+        {
+            _timeoutTime = timeoutTime;
+        }
+
+        /** Widens the inherited setter to public so the session can complete the future. */
+        @Override
+        public boolean set(V value)
+        {
+            return super.set(value);
+        }
+
+        /** Widens the inherited setter to public so the session can fail the future. */
+        @Override
+        public boolean setException(Throwable throwable)
+        {
+            return super.setException(throwable);
+        }
+
+        /** @return the deadline supplied at construction */
+        public long getTimeoutTime()
+        {
+            return _timeoutTime;
+        }
+
+        /**
+         * Fails the future with a {@link TimeoutException} once {@code time} is strictly
+         * later than the deadline.
+         *
+         * @param time the current time, on the same scale as the deadline
+         * @return true if the deadline has passed, false otherwise
+         */
+        public boolean checkTimeout(long time)
+        {
+            final boolean expired = time > _timeoutTime;
+            if(expired)
+            {
+                setException(new TimeoutException("Timed out waiting for response"));
+            }
+            return expired;
+        }
+    }
+
+    // Pending response futures, one queue per operation type. The instance initializer
+    // below creates every queue up front, so a lookup by Operation never yields null.
+    private final Map<Operation, Queue<TimedSettableFuture<AMQMethodBody>>> _synchronousOperationListeners = new EnumMap<>(Operation.class);
+
+    {
+        for(final Operation operation : Operation.values())
+        {
+            final Queue<TimedSettableFuture<AMQMethodBody>> pending = new LinkedList<>();
+            _synchronousOperationListeners.put(operation, pending);
+        }
+    }
+
+    // Channel lifecycle states; transitions are made through changeState().
+    enum State { AWAITING_OPEN, OPEN, AWAITING_CLOSE_OK }
+
+    // Current lifecycle state; a new session starts out waiting for channel.open-ok.
+    private State _state = State.AWAITING_OPEN;
+
+    /**
+     * Strategy for incoming content frames; receiveMessageHeader() and
+     * receiveMessageContent() on the session delegate to the current instance.
+     */
+    private interface MessageHandler
+    {
+        // Called with the content header's properties and the declared body size.
+        void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
+
+        // Called with one chunk of message body data.
+        void receiveMessageContent(QpidByteBuffer data);
+    }
+
+
+    // Default handler: any content header or body frame is rejected with a
+    // ConnectionScopedRuntimeException.
+    private final MessageHandler _unexpectedMessageHandler = new MessageHandler()
+    {
+        @Override
+        public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+        {
+            throw new ConnectionScopedRuntimeException("Unexpected frame");
+        }
+
+        @Override
+        public void receiveMessageContent(final QpidByteBuffer data)
+        {
+            throw new ConnectionScopedRuntimeException("Unexpected frame");
+        }
+    };
+
+    // Handler currently in force; swapped to a VirtualHostPropertiesHandler after basic.get-ok.
+    private volatile MessageHandler _messageHandler = _unexpectedMessageHandler;
+
+
+    /**
+     * Creates the session and requests the channel be opened on the remote peer.
+     *
+     * @param channelId          channel number used for every frame of this session
+     * @param outboundConnection connection that owns the channel
+     */
+    TransferSession_0_8(final int channelId, final OutboundConnection_0_8 outboundConnection)
+    {
+        _connection = outboundConnection;
+        _channelId = channelId;
+        _methodRegistry = outboundConnection.getMethodRegistry();
+        _txn = new LocalTransaction(outboundConnection.getVirtualHost().getMessageStore());
+        // Send channel.open only once every final field is assigned, so nothing reaches
+        // the wire if construction fails and the session is never exposed half-built.
+        writeMethod(_methodRegistry.createChannelOpenBody(AMQShortString.EMPTY_STRING));
+    }
+
+    private synchronized void changeState(final State currentState, final State newState)
+    {
+        if(_state != currentState)
+        {
+            throw new ConnectionScopedRuntimeException("Incorrect state");
+        }
+        _state = newState;
+    }
+
+    /**
+     * Verifies that the channel is currently in {@code currentState}.
+     *
+     * @throws ConnectionScopedRuntimeException if the channel is in any other state
+     */
+    private synchronized void assertState(final State currentState)
+    {
+        if (_state == currentState)
+        {
+            return;
+        }
+        throw new ConnectionScopedRuntimeException("Incorrect state");
+    }
+
+
+    /**
+     * Marks the channel open, enables confirms and asks the peer for its
+     * "$virtualhostProperties" message via basic.get. When basic.get-ok arrives the
+     * message handler is switched to a VirtualHostPropertiesHandler.
+     *
+     * NOTE(review): the callback signals errors by throwing; whether such exceptions
+     * propagate out of the future's listener machinery should be confirmed.
+     */
+    @Override
+    public void receiveChannelOpenOk()
+    {
+        changeState(State.AWAITING_OPEN, State.OPEN);
+
+        final AMQMethodBody confirmSelect = new ConfirmSelectBody(true);
+        writeMethod(confirmSelect);
+
+        final AMQShortString propertiesQueue = AMQShortString.valueOf("$virtualhostProperties");
+        writeMethod(_methodRegistry.createBasicGetBody(0, propertiesQueue, true));
+
+        addOperationResponse(Operation.BASIC_GET, new FutureCallback<AMQMethodBody>()
+        {
+            @Override
+            public void onSuccess(final AMQMethodBody result)
+            {
+                if (!(result instanceof BasicGetOkBody))
+                {
+                    throw new ConnectionScopedRuntimeException("Unable to determine virtual host properties from remote host");
+                }
+                _messageHandler = new VirtualHostPropertiesHandler();
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                if(!(t instanceof RuntimeException))
+                {
+                    throw new ConnectionScopedRuntimeException(t);
+                }
+                throw ((RuntimeException)t);
+            }
+        });
+    }
+
+    /**
+     * Registers {@code callback} to run when the next response for {@code operation}
+     * arrives. The backing future is given a deadline 30 seconds from now.
+     */
+    private void addOperationResponse(Operation operation, FutureCallback<AMQMethodBody> callback)
+    {
+        final long deadline = System.currentTimeMillis() + 30000L;
+        final TimedSettableFuture<AMQMethodBody> pendingResponse = new TimedSettableFuture<>(deadline);
+        Futures.addCallback(pendingResponse, callback);
+
+        final Queue<TimedSettableFuture<AMQMethodBody>> waiters = _synchronousOperationListeners.get(operation);
+        waiters.add(pendingResponse);
+    }
+
+    /**
+     * Completes the oldest pending future for {@code operation} with {@code body}.
+     *
+     * @throws ConnectionScopedRuntimeException if nobody is waiting for such a response
+     */
+    private void performOperationResponse(Operation operation, AMQMethodBody body)
+    {
+        final Queue<TimedSettableFuture<AMQMethodBody>> waiters = _synchronousOperationListeners.get(operation);
+        final TimedSettableFuture<AMQMethodBody> oldest = waiters.poll();
+
+        if(oldest != null)
+        {
+            oldest.set(body);
+            return;
+        }
+        throw new ConnectionScopedRuntimeException("Unexpected frame ");
+    }
+
+
+    private void setRemoteHostGlobalDomains(final Collection<String> remoteHostGlobalDomains)
+    {
+        LOGGER.debug("Setting remote host global domains: {}", remoteHostGlobalDomains);
+        _remoteHostGlobalDomains = remoteHostGlobalDomains;
+
+        _target = new TransferTarget_0_8(this, remoteHostGlobalDomains);
+        _consumer = _connection.getVirtualHost().getTransferQueue().addConsumer(_target, "consumer");
+    }
+
+
+
+    // channel.alert from the peer is currently ignored.
+    @Override
+    public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details)
+    {
+
+    }
+
+    // access.request-ok from the peer is currently ignored.
+    @Override
+    public void receiveAccessRequestOk(final int ticket)
+    {
+
+    }
+
+    /** Hands an exchange.declare-ok body to the oldest pending EXCHANGE_DECLARE waiter. */
+    @Override
+    public void receiveExchangeDeclareOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createExchangeDeclareOkBody();
+        performOperationResponse(Operation.EXCHANGE_DECLARE, response);
+    }
+
+    /** Hands an exchange.delete-ok body to the oldest pending EXCHANGE_DELETE waiter. */
+    @Override
+    public void receiveExchangeDeleteOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createExchangeDeleteOkBody();
+        performOperationResponse(Operation.EXCHANGE_DELETE, response);
+    }
+
+    /** Hands an exchange.bound-ok body (reply code and text) to the oldest pending EXCHANGE_BOUND waiter. */
+    @Override
+    public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText)
+    {
+        final AMQMethodBody response = _methodRegistry.createExchangeBoundOkBody(replyCode, replyText);
+        performOperationResponse(Operation.EXCHANGE_BOUND, response);
+    }
+
+    /** Hands a queue.bind-ok body to the oldest pending QUEUE_BIND waiter. */
+    @Override
+    public void receiveQueueBindOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createQueueBindOkBody();
+        performOperationResponse(Operation.QUEUE_BIND, response);
+    }
+
+    /** Hands a queue.unbind-ok body to the oldest pending QUEUE_UNBIND waiter. */
+    @Override
+    public void receiveQueueUnbindOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createQueueUnbindOkBody();
+        performOperationResponse(Operation.QUEUE_UNBIND, response);
+    }
+
+    /** Hands a queue.declare-ok body (name and counts) to the oldest pending QUEUE_DECLARE waiter. */
+    @Override
+    public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount)
+    {
+        final AMQMethodBody response = _methodRegistry.createQueueDeclareOkBody(queue, messageCount, consumerCount);
+        performOperationResponse(Operation.QUEUE_DECLARE, response);
+    }
+
+    /** Hands a queue.purge-ok body (message count) to the oldest pending QUEUE_PURGE waiter. */
+    @Override
+    public void receiveQueuePurgeOk(final long messageCount)
+    {
+        final AMQMethodBody response = _methodRegistry.createQueuePurgeOkBody(messageCount);
+        performOperationResponse(Operation.QUEUE_PURGE, response);
+    }
+
+    /** Hands a queue.delete-ok body (message count) to the oldest pending QUEUE_DELETE waiter. */
+    @Override
+    public void receiveQueueDeleteOk(final long messageCount)
+    {
+        final AMQMethodBody response = _methodRegistry.createQueueDeleteOkBody(messageCount);
+        performOperationResponse(Operation.QUEUE_DELETE, response);
+    }
+
+    /** Hands a basic.recover-sync-ok body to the oldest pending BASIC_RECOVER_SYNC waiter. */
+    @Override
+    public void receiveBasicRecoverSyncOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createBasicRecoverSyncOkBody();
+        performOperationResponse(Operation.BASIC_RECOVER_SYNC, response);
+    }
+
+    /** Hands a basic.qos-ok body to the oldest pending waiter registered under BAISC_QOS. */
+    @Override
+    public void receiveBasicQosOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createBasicQosOkBody();
+        performOperationResponse(Operation.BAISC_QOS, response);
+    }
+
+    /** Hands a basic.consume-ok body (consumer tag) to the oldest pending BASIC_CONSUME waiter. */
+    @Override
+    public void receiveBasicConsumeOk(final AMQShortString consumerTag)
+    {
+        final AMQMethodBody response = _methodRegistry.createBasicConsumeOkBody(consumerTag);
+        performOperationResponse(Operation.BASIC_CONSUME, response);
+    }
+
+    /** Hands a basic.cancel-ok body (consumer tag) to the oldest pending BASIC_CANCEL waiter. */
+    @Override
+    public void receiveBasicCancelOk(final AMQShortString consumerTag)
+    {
+        final AMQMethodBody response = _methodRegistry.createBasicCancelOkBody(consumerTag);
+        performOperationResponse(Operation.BASIC_CANCEL, response);
+    }
+
+    // basic.return from the peer is currently ignored.
+    @Override
+    public void receiveBasicReturn(final int replyCode,
+                                   final AMQShortString replyText,
+                                   final AMQShortString exchange,
+                                   final AMQShortString routingKey)
+    {
+
+    }
+
+    // basic.deliver from the peer is currently ignored.
+    @Override
+    public void receiveBasicDeliver(final AMQShortString consumerTag,
+                                    final long deliveryTag,
+                                    final boolean redelivered,
+                                    final AMQShortString exchange,
+                                    final AMQShortString routingKey)
+    {
+
+    }
+
+    /** Hands a basic.get-ok body built from the received fields to the oldest pending BASIC_GET waiter. */
+    @Override
+    public void receiveBasicGetOk(final long deliveryTag,
+                                  final boolean redelivered,
+                                  final AMQShortString exchange,
+                                  final AMQShortString routingKey,
+                                  final long messageCount)
+    {
+        final AMQMethodBody response =
+                _methodRegistry.createBasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount);
+        performOperationResponse(Operation.BASIC_GET, response);
+    }
+
+    /** Hands a basic.get-empty body to the oldest pending BASIC_GET waiter. */
+    @Override
+    public void receiveBasicGetEmpty()
+    {
+        final AMQMethodBody response = _methodRegistry.createBasicGetEmptyBody(AMQShortString.EMPTY_STRING);
+        performOperationResponse(Operation.BASIC_GET, response);
+    }
+
+    /** Hands a tx.select-ok body to the oldest pending TX_SELECT waiter. */
+    @Override
+    public void receiveTxSelectOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createTxSelectOkBody();
+        performOperationResponse(Operation.TX_SELECT, response);
+    }
+
+    /** Hands a tx.commit-ok body to the oldest pending TX_COMMIT waiter. */
+    @Override
+    public void receiveTxCommitOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createTxCommitOkBody();
+        performOperationResponse(Operation.TX_COMMIT, response);
+    }
+
+    /** Hands a tx.rollback-ok body to the oldest pending TX_ROLLBACK waiter. */
+    @Override
+    public void receiveTxRollbackOk()
+    {
+        final AMQMethodBody response = _methodRegistry.createTxRollbackOkBody();
+        performOperationResponse(Operation.TX_ROLLBACK, response);
+    }
+
+    /** Hands the shared confirm.select-ok body to the oldest pending CONFIRM_SELECT waiter. */
+    @Override
+    public void receiveConfirmSelectOk()
+    {
+        final AMQMethodBody response = ConfirmSelectOkBody.INSTANCE;
+        performOperationResponse(Operation.CONFIRM_SELECT, response);
+    }
+
+    /**
+     * Applies a channel.flow request from the peer: sending counts as flow-controlled
+     * while {@code active} is false. The request is acknowledged with channel.flow-ok.
+     */
+    @Override
+    public void receiveChannelFlow(final boolean active)
+    {
+        _flowControlled = !active;
+        final AMQMethodBody flowOk = _methodRegistry.createChannelFlowOkBody(active);
+        writeMethod(flowOk);
+    }
+
+    /** Hands a channel.flow-ok body to the oldest pending CHANNEL_FLOW waiter. */
+    @Override
+    public void receiveChannelFlowOk(final boolean active)
+    {
+        final AMQMethodBody response = _methodRegistry.createChannelFlowOkBody(active);
+        performOperationResponse(Operation.CHANNEL_FLOW, response);
+    }
+
+    // channel.close from the peer is currently ignored: no reply is written and _state is
+    // left untouched.
+    // NOTE(review): presumably a close-ok reply and a state change are still to be added — confirm.
+    @Override
+    public void receiveChannelClose(final int replyCode,
+                                    final AMQShortString replyText,
+                                    final int classId,
+                                    final int methodId)
+    {
+
+    }
+
+    // channel.close-ok from the peer is currently ignored.
+    @Override
+    public void receiveChannelCloseOk()
+    {
+    }
+
+    /** Forwards a chunk of message body to whichever message handler is currently installed. */
+    @Override
+    public void receiveMessageContent(final QpidByteBuffer data)
+    {
+        final MessageHandler handler = _messageHandler;
+        handler.receiveMessageContent(data);
+    }
+
+    /** Forwards a content header to whichever message handler is currently installed. */
+    @Override
+    public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+    {
+        final MessageHandler handler = _messageHandler;
+        handler.receiveMessageHeader(properties, bodySize);
+    }
+
+    // Always answers false.
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        return false;
+    }
+
+    /**
+     * Handles a negative confirm from the peer. Each affected entry is removed from the
+     * unconfirmed map and then either released ({@code requeue}) or dequeued within the
+     * session transaction.
+     *
+     * @param deliveryTag sequence number being nacked
+     * @param multiple    if true, every unconfirmed entry up to and including
+     *                    {@code deliveryTag} is settled; otherwise only that one entry
+     * @param requeue     if true the entries are released rather than dequeued
+     * @throws ConnectionScopedRuntimeException if a single nack names a tag that is not outstanding
+     */
+    @Override
+    public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue)
+    {
+        if(multiple)
+        {
+            final Iterator<Map.Entry<Long, TransferQueueEntry>> iter = _unconfirmed.entrySet().iterator();
+            while(iter.hasNext())
+            {
+                final Map.Entry<Long, TransferQueueEntry> pair = iter.next();
+                if(pair.getKey() > deliveryTag)
+                {
+                    break;
+                }
+                // Read the value before removing: a TreeMap may reuse the entry object afterwards.
+                final TransferQueueEntry entry = pair.getValue();
+                // A settled entry must stop counting as unconfirmed, otherwise it would be
+                // settled again by a later confirm and would keep the session suspended.
+                iter.remove();
+                settleNacked(entry, requeue);
+            }
+        }
+        else
+        {
+            final TransferQueueEntry entry = _unconfirmed.remove(deliveryTag);
+            if(entry == null)
+            {
+                throw new ConnectionScopedRuntimeException("Received nack for unknown delivery tag " + deliveryTag);
+            }
+            settleNacked(entry, requeue);
+        }
+    }
+
+    /** Releases a nacked entry, or dequeues it in the session transaction when it must not be requeued. */
+    private void settleNacked(final TransferQueueEntry entry, final boolean requeue)
+    {
+        if(requeue)
+        {
+            entry.release();
+            return;
+        }
+        _txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action()
+        {
+            @Override
+            public void postCommit()
+            {
+                entry.delete();
+            }
+
+            @Override
+            public void onRollback()
+            {
+                entry.release();
+            }
+        });
+    }
+
+    /**
+     * Handles a positive confirm from the peer. Each affected entry is removed from the
+     * unconfirmed map and dequeued within the session transaction; the entry is deleted
+     * after commit and released on rollback.
+     *
+     * @param deliveryTag sequence number being acked
+     * @param multiple    if true, every unconfirmed entry up to and including
+     *                    {@code deliveryTag} is settled; otherwise only that one entry
+     * @throws ConnectionScopedRuntimeException if a single ack names a tag that is not outstanding
+     */
+    @Override
+    public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+    {
+        if(multiple)
+        {
+            final Iterator<Map.Entry<Long, TransferQueueEntry>> iter = _unconfirmed.entrySet().iterator();
+            while(iter.hasNext())
+            {
+                final Map.Entry<Long, TransferQueueEntry> pair = iter.next();
+                if(pair.getKey() > deliveryTag)
+                {
+                    break;
+                }
+                // Read the value before removing: a TreeMap may reuse the entry object afterwards.
+                final TransferQueueEntry entry = pair.getValue();
+                // A confirmed entry must stop counting as unconfirmed, otherwise it would be
+                // dequeued again by a later confirm and would keep the session suspended.
+                iter.remove();
+                dequeueAcked(entry);
+            }
+        }
+        else
+        {
+            final TransferQueueEntry entry = _unconfirmed.remove(deliveryTag);
+            if(entry == null)
+            {
+                throw new ConnectionScopedRuntimeException("Received ack for unknown delivery tag " + deliveryTag);
+            }
+            dequeueAcked(entry);
+        }
+    }
+
+    /** Dequeues a confirmed entry in the session transaction. */
+    private void dequeueAcked(final TransferQueueEntry entry)
+    {
+        _txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action()
+        {
+            @Override
+            public void postCommit()
+            {
+                entry.delete();
+            }
+
+            @Override
+            public void onRollback()
+            {
+                entry.release();
+            }
+        });
+    }
+
+    /**
+     * @return true while the peer has flow-controlled the channel, or while more than
+     *         _maxUnconfirmed messages are awaiting confirmation
+     */
+    boolean isSuspended()
+    {
+        if(_flowControlled)
+        {
+            return true;
+        }
+        return _unconfirmed.size() > _maxUnconfirmed;
+    }
+
+    // This session supplies no access control context; callers always get null.
+    @Override
+    public AccessControlContext getAccessControllerContext()
+    {
+        return null;
+    }
+
+    /**
+     * Lets the transfer-queue consumer process pending work, if a consumer exists yet.
+     *
+     * @return the consumer's result, or false when no consumer has been registered
+     */
+    @Override
+    public boolean processPending()
+    {
+        if(_consumer == null)
+        {
+            return false;
+        }
+        return _consumer.processPending();
+    }
+
+    // Passes the work notification on to the owning connection.
+    void notifyWork()
+    {
+        _connection.notifyWork();
+    }
+
+    // Asynchronously commits the session transaction (the dequeues queued by
+    // receiveBasicAck/receiveBasicNack); no completion action is needed.
+    @Override
+    public void receivedComplete()
+    {
+        _txn.commitAsync(NO_OP);
+    }
+
+    void writeMethod(AMQMethodBody body)
+    {
+        writeFrame(body.generateFrame(_channelId));
+    }
+
+    // Writes an already-built frame through the owning connection.
+    void writeFrame(AMQFrame frame)
+    {
+        _connection.writeFrame(frame);
+    }
+
+
+    /**
+     * Message handler installed while the reply to the "$virtualhostProperties" basic.get
+     * is being received. The header's properties are processed immediately; the body is
+     * only counted so that the default handler can be restored once it has all arrived.
+     */
+    private class VirtualHostPropertiesHandler implements MessageHandler
+    {
+        // Body bytes still expected for the message in progress.
+        private long _remaining;
+        private BasicContentHeaderProperties _properties;
+
+        @Override
+        public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+        {
+            _remaining = bodySize;
+            _properties = properties;
+            receiveVirtualHostProperties(_properties);
+            // A message with an empty body has no content frames, so the default handler
+            // has to be restored here or this handler would stay installed.
+            restoreDefaultHandlerIfComplete();
+        }
+
+        @Override
+        public void receiveMessageContent(final QpidByteBuffer data)
+        {
+            _remaining -= data.remaining();
+            restoreDefaultHandlerIfComplete();
+        }
+
+        /** Re-installs the default handler once no more body bytes are expected. */
+        private void restoreDefaultHandlerIfComplete()
+        {
+            // "<=" rather than "==": surplus body data must not leave this handler installed.
+            if(_remaining <= 0L)
+            {
+                _messageHandler = _unexpectedMessageHandler;
+            }
+        }
+    }
+
+    private void receiveVirtualHostProperties(final BasicContentHeaderProperties properties)
+    {
+        Collection<String> remoteHostGlobalDomains =
+                properties.getHeaders().getFieldArray("virtualhost.globalDomains");
+        setRemoteHostGlobalDomains(remoteHostGlobalDomains);
+    }
+
+
+    /**
+     * Publishes the entry's message to the remote peer and records the entry as
+     * unconfirmed under the next sequence number. The message is converted to the
+     * 0-8 format first if necessary; its body is split into content frames that fit
+     * the connection's maximum frame size.
+     *
+     * @param entry transfer-queue entry whose message is to be sent
+     * @throws ConnectionScopedRuntimeException if the maximum frame size leaves no room for body data
+     */
+    void transfer(final TransferQueueEntry entry)
+    {
+        final ServerMessage original = entry.getMessage();
+
+        // Everything that can fail is done before the first frame is written, so a
+        // failure cannot leave a basic.publish on the wire without header and body,
+        // nor an entry in _unconfirmed that was never actually sent.
+        ServerMessage message = original;
+        if(!(message instanceof AMQMessage))
+        {
+            final MessageConverter converter =
+                    MessageConverterRegistry.getConverter(message.getClass(), AMQMessage.class);
+            message = converter.convert(message, _connection.getVirtualHost());
+        }
+
+        final AMQMessage amqMessage = (AMQMessage) message;
+        final ContentHeaderBody contentHeaderBody = amqMessage.getContentHeaderBody();
+        final int bodySize = (int) contentHeaderBody.getBodySize();
+        final int maxBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
+        if (bodySize > 0 && maxBodySize <= 0)
+        {
+            // With no room for payload the chunking loop below would never make progress.
+            throw new ConnectionScopedRuntimeException("Maximum frame size is too small to carry message content");
+        }
+
+        _unconfirmed.put(++_lastSent, entry);
+
+        writeMethod(_methodRegistry.createBasicPublishBody(0, "", original.getInitialRoutingAddress(), false, false));
+        writeHeader(contentHeaderBody);
+
+        int writtenSize = 0;
+        while (writtenSize < bodySize)
+        {
+            final int capacity = Math.min(maxBodySize, bodySize - writtenSize);
+            final AMQBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+            writtenSize += capacity;
+
+            writeFrame(new AMQFrame(_channelId, body));
+        }
+    }
+
+
+
+
+
+    private void writeHeader(final ContentHeaderBody headerBody)
+    {
+        _connection.writeFrame(new AMQFrame(_channelId, headerBody));
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferSession_0_8.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferTarget_0_8.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferTarget_0_8.java (added)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferTarget_0_8.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8.federation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.transfer.TransferQueueEntry;
+import org.apache.qpid.server.transfer.TransferTarget;
+
+/**
+ * Transfer target backed by a {@link TransferSession_0_8}: entries handed to it are sent
+ * over that session, and suspension state is the session's.
+ */
+public class TransferTarget_0_8 implements TransferTarget
+{
+    private final TransferSession_0_8 _session;
+    private final Collection<String> _globalDomains;
+
+    /**
+     * @param transferSession_0_8     session used to send entries
+     * @param remoteHostGlobalDomains global address domains of the remote host; null is
+     *                                treated as "no domains"
+     */
+    public TransferTarget_0_8(final TransferSession_0_8 transferSession_0_8,
+                              final Collection<String> remoteHostGlobalDomains)
+    {
+        _session = transferSession_0_8;
+        // Guard against a missing domain list so getGlobalAddressDomains() cannot throw
+        // a NullPointerException when wrapping the collection.
+        _globalDomains = remoteHostGlobalDomains == null
+                ? Collections.<String>emptyList()
+                : remoteHostGlobalDomains;
+    }
+
+    /** Passes the work notification on to the session. */
+    @Override
+    public void notifyWork()
+    {
+        _session.notifyWork();
+    }
+
+    /** @return a read-only view of the remote host's global domains; never null */
+    @Override
+    public Collection<String> getGlobalAddressDomains()
+    {
+        return Collections.unmodifiableCollection(_globalDomains);
+    }
+
+    /** Sends the entry over the session. */
+    @Override
+    public void send(final TransferQueueEntry entry)
+    {
+        _session.transfer(entry);
+    }
+
+    /** No credit is tracked per message, so there is nothing to restore. */
+    @Override
+    public void restoreCredit(final ServerMessage message)
+    {
+
+    }
+
+    /** @return the session's suspension state; the entry itself is not inspected */
+    @Override
+    public boolean wouldSuspend(final TransferQueueEntry entry)
+    {
+        return _session.isSuspended();
+    }
+
+    /** @return the session's suspension state */
+    @Override
+    public boolean isSuspended()
+    {
+        return _session.isSuspended();
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/federation/TransferTarget_0_8.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Fri Oct 21 09:32:07 2016
@@ -20,23 +20,25 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.apache.qpid.QpidException;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -62,7 +64,7 @@ public class ExtractResendAndRequeueTest
     private static final int INITIAL_MSG_COUNT = 10;
     private Queue _queue;
     private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
-    private ConsumerImpl _consumer;
+    private MessageInstanceConsumer _consumer;
     private boolean _queueDeleted;
 
     @Override
@@ -73,8 +75,8 @@ public class ExtractResendAndRequeueTest
         _queue = mock(Queue.class);
         when(_queue.getName()).thenReturn(getName());
         when(_queue.isDeleted()).thenReturn(_queueDeleted);
-        _consumer = mock(ConsumerImpl.class);
-        when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
+        _consumer = mock(MessageInstanceConsumer.class);
+        when(_consumer.getIdentifier()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
 
 
         long id = 0;

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Fri Oct 21 09:32:07 2016
@@ -25,13 +25,13 @@ import static org.mockito.Mockito.when;
 
 import java.util.Collection;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class UnacknowledgedMessageMapTest extends QpidTestCase
 {
-    private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
+    private final MessageInstanceConsumer _consumer = mock(MessageInstanceConsumer.class);
 
     public void testDeletedMessagesCantBeAcknowledged()
     {

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Oct 21 09:32:07 2016
@@ -26,6 +26,7 @@ import java.util.Collection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
@@ -46,7 +47,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
@@ -67,7 +67,7 @@ class ConsumerTarget_1_0 extends Abstrac
     private Binary _transactionId;
     private final AMQPDescribedTypeRegistry _typeRegistry;
     private final SectionEncoder _sectionEncoder;
-    private ConsumerImpl _consumer;
+    private MessageInstanceConsumer _consumer;
     private boolean _queueEmpty;
 
     public ConsumerTarget_1_0(final SendingLink_1_0 link,
@@ -80,7 +80,7 @@ class ConsumerTarget_1_0 extends Abstrac
         _acquires = acquires;
     }
 
-    public ConsumerImpl getConsumer()
+    public MessageInstanceConsumer getConsumer()
     {
         return _consumer;
     }
@@ -109,7 +109,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
     }
 
-    public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
     {
         // TODO
         ServerMessage serverMessage = entry.getMessage();
@@ -516,13 +516,13 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
+    public void consumerAdded(final MessageInstanceConsumer consumer)
     {
-        _consumer = sub;
+        _consumer = consumer;
     }
 
     @Override
-    public void consumerRemoved(final ConsumerImpl sub)
+    public void consumerRemoved(final MessageInstanceConsumer consumer)
     {
         close();
     }

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Oct 21 09:32:07 2016
@@ -35,6 +35,9 @@ import java.util.concurrent.ConcurrentMa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.ConsumerOption;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -61,7 +64,6 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +86,7 @@ public class SendingLink_1_0 implements
     private NamedAddressSpace _addressSpace;
     private SendingDestination _destination;
 
-    private ConsumerImpl _consumer;
+    private MessageInstanceConsumer _consumer;
     private ConsumerTarget_1_0 _target;
 
     private boolean _draining;
@@ -114,7 +116,7 @@ public class SendingLink_1_0 implements
         linkAttachment.setDeliveryStateHandler(this);
         QueueDestination qd = null;
 
-        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
 
 
         boolean noLocal = false;
@@ -170,8 +172,8 @@ public class SendingLink_1_0 implements
             _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
             if(source.getDistributionMode() != StdDistMode.COPY)
             {
-                options.add(ConsumerImpl.Option.ACQUIRES);
-                options.add(ConsumerImpl.Option.SEES_REQUEUES);
+                options.add(ConsumerOption.ACQUIRES);
+                options.add(ConsumerOption.SEES_REQUEUES);
             }
 
         }
@@ -330,8 +332,8 @@ public class SendingLink_1_0 implements
 
 
             _target = new ConsumerTarget_1_0(this, true);
-            options.add(ConsumerImpl.Option.ACQUIRES);
-            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
 
         }
         else
@@ -343,13 +345,13 @@ public class SendingLink_1_0 implements
         {
             if(noLocal)
             {
-                options.add(ConsumerImpl.Option.NO_LOCAL);
+                options.add(ConsumerOption.NO_LOCAL);
             }
 
             if(_durability == TerminusDurability.CONFIGURATION ||
                _durability == TerminusDurability.UNSETTLED_STATE )
             {
-                options.add(ConsumerImpl.Option.DURABLE);
+                options.add(ConsumerOption.DURABLE);
             }
 
             try
@@ -588,7 +590,7 @@ public class SendingLink_1_0 implements
     public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
     {
 
-        if(_consumer.isActive())
+        if(_target.getState() == ConsumerTarget.State.ACTIVE)
         {
             _target.suspend();
         }
@@ -694,7 +696,7 @@ public class SendingLink_1_0 implements
         return _addressSpace;
     }
 
-    public ConsumerImpl getConsumer()
+    public MessageInstanceConsumer getConsumer()
     {
         return _consumer;
     }

Modified: qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Oct 21 09:32:07 2016
@@ -45,7 +45,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -75,22 +92,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1042,7 +1043,7 @@ public class Session_1_0 implements AMQS
 
     private void registerConsumer(final SendingLink_1_0 link)
     {
-        ConsumerImpl consumer = link.getConsumer();
+        MessageInstanceConsumer consumer = link.getConsumer();
         if(consumer instanceof Consumer<?>)
         {
             Consumer<?> modelConsumer = (Consumer<?>) consumer;
@@ -1623,7 +1624,7 @@ public class Session_1_0 implements AMQS
         }
     }
 
-    private class ConsumerClosedListener implements ConfigurationChangeListener
+    private class ConsumerClosedListener extends AbstractConfigurationChangeListener
     {
         @Override
         public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
@@ -1633,39 +1634,6 @@ public class Session_1_0 implements AMQS
                 consumerRemoved((Consumer<?>)object);
             }
         }
-
-        @Override
-        public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void attributeSet(final ConfiguredObject object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-
-        }
     }
 
     @Override

Modified: qpid/java/branches/transfer-queue/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/AbstractLogger.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/AbstractLogger.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/AbstractLogger.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/AbstractLogger.java Fri Oct 21 09:32:07 2016
@@ -36,9 +36,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.LogInclusionRule;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.BrokerLogInclusionRule;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
 import org.apache.qpid.server.model.ManagedObject;
@@ -149,7 +149,7 @@ public abstract class AbstractLogger<X e
         }
     }
 
-    private class LogInclusionRuleListener implements ConfigurationChangeListener
+    private class LogInclusionRuleListener extends AbstractConfigurationChangeListener
     {
 
         @Override
@@ -170,28 +170,6 @@ public abstract class AbstractLogger<X e
             }
         }
 
-        @Override
-        public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
-        {
-        }
-
-        @Override
-        public void attributeSet(final ConfiguredObject<?> object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-        }
-
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-        }
     }
 
     public static Map<String, Collection<String>> getSupportedVirtualHostLoggerChildTypes()

Modified: qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Fri Oct 21 09:32:07 2016
@@ -28,9 +28,9 @@ import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
@@ -222,7 +222,7 @@ public class ManagementAddressSpace impl
                                                                                      final String routingAddress,
                                                                                      final InstanceProperties instanceProperties,
                                                                                      final ServerTransaction txn,
-                                                                                     final Action<? super MessageInstance> postEnqueueAction)
+                                                                                     final Action<? super BaseMessageInstance> postEnqueueAction)
         {
             MessageDestination destination = getAttainedMessageDestination(routingAddress);
             if(destination == null || destination == this)

Modified: qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Fri Oct 21 09:32:07 2016
@@ -41,19 +41,22 @@ import java.util.concurrent.CopyOnWriteA
 import javax.security.auth.Subject;
 
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -70,7 +73,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
-class ManagementNode implements MessageSource, MessageDestination
+class ManagementNode implements MessageSource<ManagementNodeConsumer>, MessageDestination
 {
 
     public static final String NAME_ATTRIBUTE = "name";
@@ -262,7 +265,7 @@ class ManagementNode implements MessageS
                                                                                   final String routingAddress,
                                                                                   final InstanceProperties instanceProperties,
                                                                                   final ServerTransaction txn,
-                                                                                  final Action<? super MessageInstance> postEnqueueAction)
+                                                                                  final Action<? super BaseMessageInstance> postEnqueueAction)
     {
 
         @SuppressWarnings("unchecked")
@@ -981,7 +984,7 @@ class ManagementNode implements MessageS
                                                             final FilterManager filters,
                                                             final Class<? extends ServerMessage> messageClass,
                                                             final String consumerName,
-                                                            final EnumSet<ConsumerImpl.Option> options,
+                                                            final EnumSet<ConsumerOption> options,
                                                             final Integer priority)
     {
 
@@ -1084,7 +1087,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public ConsumerImpl getAcquiringConsumer()
+        public MessageInstanceConsumer getAcquiringConsumer()
         {
             return null;
         }
@@ -1096,13 +1099,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean isAcquiredBy(final ConsumerImpl consumer)
+        public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
         {
             return false;
         }
 
         @Override
-        public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+        public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -1120,7 +1123,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public ConsumerImpl getDeliveredConsumer()
+        public AcquiringMessageInstanceConsumer<?,?> getDeliveredConsumer()
         {
             return null;
         }
@@ -1132,7 +1135,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean isRejectedBy(final ConsumerImpl consumer)
+        public boolean isRejectedBy(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -1150,13 +1153,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean acquire(final ConsumerImpl sub)
+        public boolean acquire(final MessageInstanceConsumer sub)
         {
             return false;
         }
 
         @Override
-        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -1174,7 +1177,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public int routeToAlternate(final Action<? super MessageInstance> action,
+        public int routeToAlternate(final Action<? super BaseMessageInstance> action,
                                     final ServerTransaction txn)
         {
             return 0;
@@ -1212,7 +1215,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public void release(final ConsumerImpl release)
+        public void release(final MessageInstanceConsumer release)
         {
 
         }
@@ -1260,7 +1263,7 @@ class ManagementNode implements MessageS
         }
     }
 
-    private class ModelObjectListener implements ConfigurationChangeListener
+    private class ModelObjectListener extends AbstractConfigurationChangeListener
     {
         @Override
         public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
@@ -1313,27 +1316,6 @@ class ManagementNode implements MessageS
                 _entities.get(entityType).remove(child.getName());
             }
         }
-
-        @Override
-        public void attributeSet(final ConfiguredObject object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-
-        }
     }
 
     private static class MutableMessageHeader implements AMQMessageHeader

Modified: qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Fri Oct 21 09:32:07 2016
@@ -24,12 +24,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -38,15 +37,14 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
-class ManagementNodeConsumer implements ConsumerImpl, MessageDestination
+class ManagementNodeConsumer implements MessageInstanceConsumer, MessageDestination
 {
-    private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
     private final ManagementNode _managementNode;
     private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
     private final ConsumerTarget _target;
     private final String _name;
     private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
-
+    private final Object _identifier = new Object();
 
     public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
     {
@@ -63,52 +61,16 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public long getBytesOut()
-    {
-        return 0;
-    }
-
-    @Override
-    public long getMessagesOut()
-    {
-        return 0;
-    }
-
-    @Override
-    public long getUnacknowledgedBytes()
-    {
-        return 0;
-    }
-
-    @Override
-    public long getUnacknowledgedMessages()
+    public Object getIdentifier()
     {
-        return 0;
+        return _identifier;
     }
 
-    @Override
     public AMQSessionModel getSessionModel()
     {
         return _target.getSessionModel();
     }
 
-    @Override
-    public MessageSource getMessageSource()
-    {
-        return _managementNode;
-    }
-
-    @Override
-    public long getConsumerNumber()
-    {
-        return _id;
-    }
-
-    @Override
-    public boolean isSuspended()
-    {
-        return false;
-    }
 
     @Override
     public boolean isClosed()
@@ -123,42 +85,11 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public boolean seesRequeues()
-    {
-        return false;
-    }
-
-    @Override
     public void close()
     {
     }
 
     @Override
-    public boolean trySendLock()
-    {
-        return _target.trySendLock();
-    }
-
-    @Override
-    public void getSendLock()
-    {
-        _target.getSendLock();
-    }
-
-    @Override
-    public void releaseSendLock()
-    {
-        _target.releaseSendLock();
-    }
-
-
-    @Override
-    public boolean isActive()
-    {
-        return false;
-    }
-
-    @Override
     public String getName()
     {
         return _name;
@@ -169,7 +100,7 @@ class ManagementNodeConsumer implements
                                                                                  final String routingAddress,
                                                                                  final InstanceProperties instanceProperties,
                                                                                  final ServerTransaction txn,
-                                                                                 final Action<? super MessageInstance> postEnqueueAction)
+                                                                                 final Action<? super BaseMessageInstance> postEnqueueAction)
     {
         send((InternalMessage)message);
         return 1;
@@ -181,7 +112,6 @@ class ManagementNodeConsumer implements
 
     }
 
-    @Override
     public ConsumerTarget getTarget()
     {
         return _target;

Modified: qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/transfer-queue/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Fri Oct 21 09:32:07 2016
@@ -20,10 +20,11 @@
  */
 package org.apache.qpid.server.management.amqp;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -85,7 +86,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public ConsumerImpl getAcquiringConsumer()
+    public MessageInstanceConsumer getAcquiringConsumer()
     {
         return _consumer;
     }
@@ -97,13 +98,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean isAcquiredBy(final ConsumerImpl consumer)
+    public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
     {
         return consumer == _consumer && !isDeleted();
     }
 
     @Override
-    public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+    public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
     {
         return consumer == _consumer;
     }
@@ -133,7 +134,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean isRejectedBy(final ConsumerImpl consumer)
+    public boolean isRejectedBy(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -151,13 +152,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean acquire(final ConsumerImpl sub)
+    public boolean acquire(final MessageInstanceConsumer sub)
     {
         return false;
     }
 
     @Override
-    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -175,7 +176,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public int routeToAlternate(final Action<? super MessageInstance> action,
+    public int routeToAlternate(final Action<? super BaseMessageInstance> action,
                                 final ServerTransaction txn)
     {
         return 0;
@@ -213,7 +214,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public void release(final ConsumerImpl release)
+    public void release(final MessageInstanceConsumer release)
     {
         release();
     }

Modified: qpid/java/branches/transfer-queue/client/src/main/java/org/apache/qpid/client/url/URLParser.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/client/src/main/java/org/apache/qpid/client/url/URLParser.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/client/src/main/java/org/apache/qpid/client/url/URLParser.java (original)
+++ qpid/java/branches/transfer-queue/client/src/main/java/org/apache/qpid/client/url/URLParser.java Fri Oct 21 09:32:07 2016
@@ -21,16 +21,19 @@
 package org.apache.qpid.client.url;
 
 
-import org.apache.qpid.client.BrokerDetails;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.StringTokenizer;
+
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.BrokerDetails;
 import org.apache.qpid.url.URLHelper;
 import org.apache.qpid.url.URLSyntaxException;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.StringTokenizer;
-
 public class URLParser
 {
     private AMQConnectionURL _url;
@@ -82,12 +85,12 @@ public class URLParser
                 _url.setClientName(connection.getHost());
             }
             
-            String userInfo = connection.getUserInfo();
+            String userInfo = connection.getRawUserInfo();
 
             if (userInfo == null)
             {
-                // Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
-                userInfo = connection.getAuthority();
+                // Fix for Java Environments which don't parse UserInfo for non http URIs
+                userInfo = connection.getRawAuthority();
 
                 if (userInfo != null)
                 {
@@ -178,8 +181,16 @@ public class URLParser
         }
         else
         {
-            _url.setUsername(userinfo.substring(0, colonIndex));
-            _url.setPassword(userinfo.substring(colonIndex + 1));
+            try
+            {
+                _url.setUsername(URLDecoder.decode(userinfo.substring(0, colonIndex), StandardCharsets.UTF_8.name()));
+                _url.setPassword(URLDecoder.decode(userinfo.substring(colonIndex + 1), StandardCharsets.UTF_8.name()));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw URLHelper.parseError(AMQConnectionURL.AMQ_PROTOCOL.length() + 3, userinfo.length(),
+                                           e.getLocalizedMessage(), _url.getURL());
+            }
         }
 
     }

Modified: qpid/java/branches/transfer-queue/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/java/branches/transfer-queue/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Fri Oct 21 09:32:07 2016
@@ -34,6 +34,26 @@ import org.apache.qpid.url.URLSyntaxExce
 
 public class ConnectionURLTest extends QpidTestCase
 {
+
+    public void testPasswordWithColon() throws URLSyntaxException
+    {
+        String url = "amqp://PQ-RST-UV-W:VPN%3amwrst@/test?brokerlist='tcp://localhost:5672'";
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+        assertEquals("PQ-RST-UV-W", connectionurl.getUsername());
+        assertEquals("VPN:mwrst", connectionurl.getPassword());
+        assertEquals("/test",connectionurl.getVirtualHost());
+    }
+
+    public void testUsernameWithColon() throws URLSyntaxException
+    {
+        String url = "amqp://PQ%3aRST-UV-W:VPN%3amwrst@/test?brokerlist='tcp://localhost:5672'";
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+        assertEquals("PQ:RST-UV-W", connectionurl.getUsername());
+        assertEquals("VPN:mwrst", connectionurl.getPassword());
+        assertEquals("/test",connectionurl.getVirtualHost());
+    }
+
+
     public void testFailoverURL() throws URLSyntaxException
     {
         String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";

Modified: qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Oct 21 09:32:07 2016
@@ -414,6 +414,28 @@ public class FieldTable
         }
     }
 
+    public FieldArray getFieldArray(String string)
+    {
+        return getFieldArray(AMQShortString.valueOf(string));
+    }
+
+    public FieldArray getFieldArray(AMQShortString string)
+    {
+        AMQTypedValue value = getProperty(string);
+
+        if ((value != null) && (value.getType() == AMQType.FIELD_ARRAY))
+        {
+            return (FieldArray) value.getValue();
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+
+
+
     public Object getObject(String string)
     {
         return getObject(AMQShortString.valueOf(string));

Modified: qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/java/branches/transfer-queue/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Fri Oct 21 09:32:07 2016
@@ -95,4 +95,10 @@ public class HeartbeatBody implements AM
         }
         processor.receiveHeartbeat();
     }
+
+    @Override
+    public String toString()
+    {
+        return "[HeartbeatBody]";
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message