qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1631275 [3/3] - in /qpid/branches/QPID-6125-ProtocolRefactoring/java: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ client/src/main/java/org/apache/qpid/client/protocol/ common/src/main/java/org/apach...
Date Mon, 13 Oct 2014 00:58:47 GMT
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Mon Oct 13 00:58:45 2014
@@ -23,7 +23,9 @@ package org.apache.qpid.framing;
 import java.util.ArrayList;
 import java.util.List;
 
-public class FrameCreatingMethodProcessor implements MethodProcessor
+public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
+                                                     ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
+                                                     ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>
 {
     private ProtocolVersion _protocolVersion;
     
@@ -61,42 +63,6 @@ public class FrameCreatingMethodProcesso
     }
 
     @Override
-    public void receiveTxSelect(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE));
-    }
-
-    @Override
-    public void receiveTxSelectOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE));
-    }
-
-    @Override
-    public void receiveTxCommit(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE));
-    }
-
-    @Override
-    public void receiveTxCommitOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE));
-    }
-
-    @Override
-    public void receiveTxRollback(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE));
-    }
-
-    @Override
-    public void receiveTxRollbackOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE));
-    }
-
-    @Override
     public void receiveConnectionSecure(final byte[] challenge)
     {
         _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge)));
@@ -163,382 +129,483 @@ public class FrameCreatingMethodProcesso
         _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody()));
     }
 
-    @Override
-    public void receiveChannelOpenOk(final int channelId)
+    private void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
     {
-        _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
-                ? ChannelOpenOkBody.INSTANCE_0_8
-                : ChannelOpenOkBody.INSTANCE_0_9));
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
     }
 
     @Override
-    public void receiveChannelFlow(final int channelId, final boolean active)
+    public void receiveHeartbeat()
     {
-        _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active)));
+        _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
     }
 
     @Override
-    public void receiveChannelFlowOk(final int channelId, final boolean active)
+    public ProtocolVersion getProtocolVersion()
     {
-        _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active)));
+        return _protocolVersion;
     }
 
     @Override
-    public void receiveChannelAlert(final int channelId,
-                                    final int replyCode,
-                                    final AMQShortString replyText,
-                                    final FieldTable details)
+    public ClientAndServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
     {
-        _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)));
+        return new FrameCreatingChannelMethodProcessor(channelId);
     }
 
-    @Override
-    public void receiveChannelClose(final int channelId,
-                                    final int replyCode,
-                                    final AMQShortString replyText,
-                                    final int classId,
-                                    final int methodId)
+    public void setProtocolVersion(final ProtocolVersion protocolVersion)
     {
-        _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
+        _protocolVersion = protocolVersion;
     }
 
     @Override
-    public void receiveChannelCloseOk(final int channelId)
+    public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
     {
-        _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE));
+        _processedMethods.add(protocolInitiation);
     }
 
     @Override
-    public void receiveAccessRequest(final int channelId,
-                                     final AMQShortString realm,
-                                     final boolean exclusive,
-                                     final boolean passive,
-                                     final boolean active,
-                                     final boolean write,
-                                     final boolean read)
+    public void setCurrentMethod(final int classId, final int methodId)
     {
-        _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)));
+        _classId = classId;
+        _methodId = methodId;
     }
 
     @Override
-    public void receiveAccessRequestOk(final int channelId, final int ticket)
+    public boolean ignoreAllButCloseOk()
     {
-        _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket)));
+        return false;
     }
 
-    @Override
-    public void receiveExchangeDeclare(final int channelId,
-                                       final AMQShortString exchange,
-                                       final AMQShortString type,
-                                       final boolean passive,
-                                       final boolean durable,
-                                       final boolean autoDelete,
-                                       final boolean internal,
-                                       final boolean nowait, final FieldTable arguments)
+    public int getClassId()
     {
-        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)));
+        return _classId;
     }
 
-    @Override
-    public void receiveExchangeDeclareOk(final int channelId)
+    public int getMethodId()
     {
-        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody()));
+        return _methodId;
     }
 
-    @Override
-    public void receiveExchangeDelete(final int channelId,
-                                      final AMQShortString exchange,
-                                      final boolean ifUnused,
-                                      final boolean nowait)
+    public static interface ClientAndServerChannelMethodProcessor extends ServerChannelMethodProcessor, ClientChannelMethodProcessor
     {
-        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
-    }
 
-    @Override
-    public void receiveExchangeDeleteOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody()));
     }
 
-    @Override
-    public void receiveExchangeBound(final int channelId,
-                                     final AMQShortString exchange,
-                                     final AMQShortString routingKey,
-                                     final AMQShortString queue)
+    private class FrameCreatingChannelMethodProcessor implements ClientAndServerChannelMethodProcessor
     {
-        _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
-    }
+        private final int _channelId;
 
-    @Override
-    public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
-    }
+        private FrameCreatingChannelMethodProcessor(final int channelId)
+        {
+            _channelId = channelId;
+        }
 
-    @Override
-    public void receiveQueueBindOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody()));
-    }
 
-    @Override
-    public void receiveQueueUnbindOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody()));
-    }
+        @Override
+        public void receiveChannelOpenOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
+                    ? ChannelOpenOkBody.INSTANCE_0_8
+                    : ChannelOpenOkBody.INSTANCE_0_9));
+        }
 
-    @Override
-    public void receiveQueueDeclare(final int channelId,
-                                    final AMQShortString queue,
-                                    final boolean passive,
-                                    final boolean durable,
-                                    final boolean exclusive,
-                                    final boolean autoDelete,
-                                    final boolean nowait,
-                                    final FieldTable arguments)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)));
-    }
+        @Override
+        public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ChannelAlertBody(replyCode, replyText, details)));
+        }
 
-    @Override
-    public void receiveQueueDeclareOk(final int channelId,
-                                      final AMQShortString queue,
-                                      final long messageCount,
-                                      final long consumerCount)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
-    }
+        @Override
+        public void receiveAccessRequestOk(final int ticket)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new AccessRequestOkBody(ticket)));
+        }
 
-    @Override
-    public void receiveQueueBind(final int channelId,
-                                 final AMQShortString queue,
-                                 final AMQShortString exchange,
-                                 final AMQShortString bindingKey,
-                                 final boolean nowait,
-                                 final FieldTable arguments)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)));
-    }
+        @Override
+        public void receiveExchangeDeclareOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareOkBody()));
+        }
 
-    @Override
-    public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)));
-    }
+        @Override
+        public void receiveExchangeDeleteOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteOkBody()));
+        }
 
-    @Override
-    public void receiveQueuePurgeOk(final int channelId, final long messageCount)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)));
-    }
+        @Override
+        public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText)
+        {
+            FrameCreatingMethodProcessor.this.receiveExchangeBoundOk(_channelId, replyCode, replyText);
+        }
 
-    @Override
-    public void receiveQueueDelete(final int channelId,
-                                   final AMQShortString queue,
-                                   final boolean ifUnused,
-                                   final boolean ifEmpty,
-                                   final boolean nowait)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
-    }
+        @Override
+        public void receiveQueueBindOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueBindOkBody()));
+        }
 
-    @Override
-    public void receiveQueueDeleteOk(final int channelId, final long messageCount)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)));
-    }
+        @Override
+        public void receiveQueueUnbindOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindOkBody()));
+        }
 
-    @Override
-    public void receiveQueueUnbind(final int channelId,
-                                   final AMQShortString queue,
-                                   final AMQShortString exchange,
-                                   final AMQShortString bindingKey,
-                                   final FieldTable arguments)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)));
-    }
+        @Override
+        public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
+        }
 
-    @Override
-    public void receiveBasicRecoverSyncOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
-    }
+        @Override
+        public void receiveQueuePurgeOk(final long messageCount)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeOkBody(messageCount)));
+        }
 
-    @Override
-    public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
-    {
-        if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
+        @Override
+        public void receiveQueueDeleteOk(final long messageCount)
         {
-            _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue)));
+            _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteOkBody(messageCount)));
         }
-        else
+
+        @Override
+        public void receiveBasicRecoverSyncOk()
         {
-            _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
+            _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
         }
-    }
 
-    @Override
-    public void receiveBasicQos(final int channelId,
-                                final long prefetchSize,
-                                final int prefetchCount,
-                                final boolean global)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
-    }
+        @Override
+        public void receiveBasicQosOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicQosOkBody()));
+        }
 
-    @Override
-    public void receiveBasicQosOk(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody()));
-    }
+        @Override
+        public void receiveBasicConsumeOk(final AMQShortString consumerTag)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeOkBody(consumerTag)));
+        }
 
-    @Override
-    public void receiveBasicConsume(final int channelId,
-                                    final AMQShortString queue,
-                                    final AMQShortString consumerTag,
-                                    final boolean noLocal,
-                                    final boolean noAck,
-                                    final boolean exclusive,
-                                    final boolean nowait,
-                                    final FieldTable arguments)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)));
-    }
+        @Override
+        public void receiveBasicCancelOk(final AMQShortString consumerTag)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicCancelOkBody(consumerTag)));
+        }
 
-    @Override
-    public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
-    }
+        @Override
+        public void receiveBasicReturn(final int replyCode,
+                                       final AMQShortString replyText,
+                                       final AMQShortString exchange,
+                                       final AMQShortString routingKey)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicReturnBody(replyCode,
+                                                                               replyText,
+                                                                               exchange,
+                                                                               routingKey)));
+        }
 
-    @Override
-    public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)));
-    }
+        @Override
+        public void receiveBasicDeliver(final AMQShortString consumerTag,
+                                        final long deliveryTag,
+                                        final boolean redelivered,
+                                        final AMQShortString exchange,
+                                        final AMQShortString routingKey)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicDeliverBody(consumerTag,
+                                                                                deliveryTag,
+                                                                                redelivered,
+                                                                                exchange,
+                                                                                routingKey)));
+        }
 
-    @Override
-    public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)));
-    }
+        @Override
+        public void receiveBasicGetOk(final long deliveryTag,
+                                      final boolean redelivered,
+                                      final AMQShortString exchange,
+                                      final AMQShortString routingKey,
+                                      final long messageCount)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicGetOkBody(deliveryTag,
+                                                                              redelivered,
+                                                                              exchange,
+                                                                              routingKey,
+                                                                              messageCount)));
+        }
 
-    @Override
-    public void receiveBasicPublish(final int channelId,
-                                    final AMQShortString exchange,
-                                    final AMQShortString routingKey,
-                                    final boolean mandatory,
-                                    final boolean immediate)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)));
-    }
+        @Override
+        public void receiveBasicGetEmpty()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicGetEmptyBody((AMQShortString)null)));
+        }
 
-    @Override
-    public void receiveBasicReturn(final int channelId, final int replyCode,
-                                   final AMQShortString replyText,
-                                   final AMQShortString exchange,
-                                   final AMQShortString routingKey)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)));
-    }
+        @Override
+        public void receiveTxSelectOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, TxSelectOkBody.INSTANCE));
+        }
 
-    @Override
-    public void receiveBasicDeliver(final int channelId,
-                                    final AMQShortString consumerTag,
-                                    final long deliveryTag,
-                                    final boolean redelivered,
-                                    final AMQShortString exchange,
-                                    final AMQShortString routingKey)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
-    }
+        @Override
+        public void receiveTxCommitOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, TxCommitOkBody.INSTANCE));
+        }
 
-    @Override
-    public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)));
-    }
+        @Override
+        public void receiveTxRollbackOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, TxRollbackOkBody.INSTANCE));
+        }
 
-    @Override
-    public void receiveBasicGetOk(final int channelId,
-                                  final long deliveryTag,
-                                  final boolean redelivered,
-                                  final AMQShortString exchange,
-                                  final AMQShortString routingKey,
-                                  final long messageCount)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
-    }
+        @Override
+        public void receiveAccessRequest(final AMQShortString realm,
+                                         final boolean exclusive,
+                                         final boolean passive,
+                                         final boolean active,
+                                         final boolean write,
+                                         final boolean read)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new AccessRequestBody(realm,
+                                                                                 exclusive,
+                                                                                 passive,
+                                                                                 active,
+                                                                                 write,
+                                                                                 read)));
+        }
 
-    @Override
-    public void receiveBasicGetEmpty(final int channelId)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)));
-    }
+        @Override
+        public void receiveExchangeDeclare(final AMQShortString exchange,
+                                           final AMQShortString type,
+                                           final boolean passive,
+                                           final boolean durable,
+                                           final boolean autoDelete,
+                                           final boolean internal,
+                                           final boolean nowait,
+                                           final FieldTable arguments)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareBody(0,
+                                                                                   exchange,
+                                                                                   type,
+                                                                                   passive,
+                                                                                   durable,
+                                                                                   autoDelete,
+                                                                                   internal,
+                                                                                   nowait,
+                                                                                   arguments)));
+        }
 
-    @Override
-    public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)));
-    }
+        @Override
+        public void receiveExchangeDelete(final AMQShortString exchange, final boolean ifUnused, final boolean nowait)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
+        }
 
-    @Override
-    public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)));
-    }
+        @Override
+        public void receiveExchangeBound(final AMQShortString exchange,
+                                         final AMQShortString routingKey,
+                                         final AMQShortString queue)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
+        }
 
-    @Override
-    public void receiveHeartbeat()
-    {
-        _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
-    }
+        @Override
+        public void receiveQueueDeclare(final AMQShortString queue,
+                                        final boolean passive,
+                                        final boolean durable,
+                                        final boolean exclusive,
+                                        final boolean autoDelete,
+                                        final boolean nowait,
+                                        final FieldTable arguments)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareBody(0,
+                                                                                queue,
+                                                                                passive,
+                                                                                durable,
+                                                                                exclusive,
+                                                                                autoDelete,
+                                                                                nowait,
+                                                                                arguments)));
+        }
 
-    @Override
-    public ProtocolVersion getProtocolVersion()
-    {
-        return _protocolVersion;
-    }
+        @Override
+        public void receiveQueueBind(final AMQShortString queue,
+                                     final AMQShortString exchange,
+                                     final AMQShortString bindingKey,
+                                     final boolean nowait,
+                                     final FieldTable arguments)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueBindBody(0,
+                                                                             queue,
+                                                                             exchange,
+                                                                             bindingKey,
+                                                                             nowait,
+                                                                             arguments)));
+        }
 
-    public void setProtocolVersion(final ProtocolVersion protocolVersion)
-    {
-        _protocolVersion = protocolVersion;
-    }
+        @Override
+        public void receiveQueuePurge(final AMQShortString queue, final boolean nowait)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeBody(0, queue, nowait)));
+        }
 
-    @Override
-    public void receiveMessageContent(final int channelId, final byte[] data)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new ContentBody(data)));
-    }
+        @Override
+        public void receiveQueueDelete(final AMQShortString queue,
+                                       final boolean ifUnused,
+                                       final boolean ifEmpty,
+                                       final boolean nowait)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
+        }
 
-    @Override
-    public void receiveMessageHeader(final int channelId,
-                                     final BasicContentHeaderProperties properties,
-                                     final long bodySize)
-    {
-        _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)));
-    }
+        @Override
+        public void receiveQueueUnbind(final AMQShortString queue,
+                                       final AMQShortString exchange,
+                                       final AMQShortString bindingKey,
+                                       final FieldTable arguments)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindBody(0,
+                                                                               queue,
+                                                                               exchange,
+                                                                               bindingKey,
+                                                                               arguments)));
+        }
 
-    @Override
-    public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
-    {
-        _processedMethods.add(protocolInitiation);
-    }
+        @Override
+        public void receiveBasicRecover(final boolean requeue, final boolean sync)
+        {
+            if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
+            {
+                _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverBody(requeue)));
+            }
+            else
+            {
+                _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
+            }
+        }
 
-    @Override
-    public void setCurrentMethod(final int classId, final int methodId)
-    {
-        _classId = classId;
-        _methodId = methodId;
-    }
+        @Override
+        public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
+        }
 
-    public int getClassId()
-    {
-        return _classId;
-    }
+        @Override
+        public void receiveBasicConsume(final AMQShortString queue,
+                                        final AMQShortString consumerTag,
+                                        final boolean noLocal,
+                                        final boolean noAck,
+                                        final boolean exclusive,
+                                        final boolean nowait,
+                                        final FieldTable arguments)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeBody(0,
+                                                                                queue,
+                                                                                consumerTag,
+                                                                                noLocal,
+                                                                                noAck,
+                                                                                exclusive,
+                                                                                nowait,
+                                                                                arguments)));
+        }
 
-    public int getMethodId()
-    {
-        return _methodId;
+        @Override
+        public void receiveBasicCancel(final AMQShortString consumerTag, final boolean noWait)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicCancelBody(consumerTag, noWait)));
+        }
+
+        @Override
+        public void receiveBasicPublish(final AMQShortString exchange,
+                                        final AMQShortString routingKey,
+                                        final boolean mandatory,
+                                        final boolean immediate)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicPublishBody(0,
+                                                                                exchange,
+                                                                                routingKey,
+                                                                                mandatory,
+                                                                                immediate)));
+        }
+
+        @Override
+        public void receiveBasicGet(final AMQShortString queue, final boolean noAck)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicGetBody(0, queue, noAck)));
+        }
+
+        @Override
+        public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicAckBody(deliveryTag, multiple)));
+        }
+
+        @Override
+        public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new BasicRejectBody(deliveryTag, requeue)));
+        }
+
+        @Override
+        public void receiveTxSelect()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, TxSelectBody.INSTANCE));
+        }
+
+        @Override
+        public void receiveTxCommit()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, TxCommitBody.INSTANCE));
+        }
+
+        @Override
+        public void receiveTxRollback()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, TxRollbackBody.INSTANCE));
+        }
+
+        @Override
+        public void receiveChannelFlow(final boolean active)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active)));
+        }
+
+        @Override
+        public void receiveChannelFlowOk(final boolean active)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowOkBody(active)));
+        }
+
+        @Override
+        public void receiveChannelClose(final int replyCode,
+                                        final AMQShortString replyText,
+                                        final int classId,
+                                        final int methodId)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
+        }
+
+        @Override
+        public void receiveChannelCloseOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, ChannelCloseOkBody.INSTANCE));
+        }
+
+        @Override
+        public void receiveMessageContent(final byte[] data)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data)));
+        }
+
+        @Override
+        public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new ContentHeaderBody(properties, bodySize)));
+        }
+
+        @Override
+        public boolean ignoreAllButCloseOk()
+        {
+            return false;
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java Mon Oct 13 00:58:45 2014
@@ -20,184 +20,21 @@
  */
 package org.apache.qpid.framing;
 
-public interface MethodProcessor
+public interface MethodProcessor<T extends ChannelMethodProcessor>
 {
     ProtocolVersion getProtocolVersion();
 
-    void receiveConnectionStart(short versionMajor,
-                                short versionMinor,
-                                FieldTable serverProperties,
-                                byte[] mechanisms,
-                                byte[] locales);
-
-    void receiveConnectionStartOk(FieldTable clientProperties,
-                                  AMQShortString mechanism,
-                                  byte[] response,
-                                  AMQShortString locale);
-
-    void receiveTxSelect(int channelId);
-
-    void receiveTxSelectOk(int channelId);
-
-    void receiveTxCommit(int channelId);
-
-    void receiveTxCommitOk(int channelId);
-
-    void receiveTxRollback(int channelId);
-
-    void receiveTxRollbackOk(int channelId);
-
-    void receiveConnectionSecure(byte[] challenge);
-
-    void receiveConnectionSecureOk(byte[] response);
-
-    void receiveConnectionTune(int channelMax, long frameMax, int heartbeat);
-
-    void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
-
-    void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
-
-    void receiveConnectionOpenOk(AMQShortString knownHosts);
-
-    void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts);
+    T getChannelMethodProcessor(int channelId);
 
     void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
 
     void receiveConnectionCloseOk();
 
-    void receiveChannelOpen(int channelId);
-
-    void receiveChannelOpenOk(int channelId);
-
-    void receiveChannelFlow(int channelId, boolean active);
-
-    void receiveChannelFlowOk(int channelId, boolean active);
-
-    void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
-
-    void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
-
-    void receiveChannelCloseOk(int channelId);
-
-    void receiveAccessRequest(int channelId,
-                              AMQShortString realm,
-                              boolean exclusive,
-                              boolean passive,
-                              boolean active,
-                              boolean write, boolean read);
-
-    void receiveAccessRequestOk(int channelId, int ticket);
-
-    void receiveExchangeDeclare(int channelId,
-                                AMQShortString exchange,
-                                AMQShortString type,
-                                boolean passive,
-                                boolean durable,
-                                boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
-
-    void receiveExchangeDeclareOk(int channelId);
-
-    void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
-
-    void receiveExchangeDeleteOk(int channelId);
-
-    void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
-
-    void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
-
-    void receiveQueueBindOk(int channelId);
-
-    void receiveQueueUnbindOk(final int channelId);
-
-    void receiveQueueDeclare(int channelId,
-                             AMQShortString queue,
-                             boolean passive,
-                             boolean durable,
-                             boolean exclusive,
-                             boolean autoDelete, boolean nowait, FieldTable arguments);
-
-    void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
-
-    void receiveQueueBind(int channelId,
-                          AMQShortString queue,
-                          AMQShortString exchange,
-                          AMQShortString bindingKey,
-                          boolean nowait, FieldTable arguments);
-
-    void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait);
-
-    void receiveQueuePurgeOk(int channelId, long messageCount);
-
-    void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
-
-    void receiveQueueDeleteOk(int channelId, long messageCount);
-
-    void receiveQueueUnbind(int channelId,
-                            AMQShortString queue,
-                            AMQShortString exchange,
-                            AMQShortString bindingKey,
-                            FieldTable arguments);
-
-    void receiveBasicRecoverSyncOk(int channelId);
-
-    void receiveBasicRecover(int channelId, final boolean requeue, boolean sync);
-
-    void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
-
-    void receiveBasicQosOk(int channelId);
-
-    void receiveBasicConsume(int channelId,
-                             AMQShortString queue,
-                             AMQShortString consumerTag,
-                             boolean noLocal,
-                             boolean noAck,
-                             boolean exclusive, boolean nowait, FieldTable arguments);
-
-    void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag);
-
-    void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
-
-    void receiveBasicCancelOk(int channelId, AMQShortString consumerTag);
-
-    void receiveBasicPublish(int channelId,
-                             AMQShortString exchange,
-                             AMQShortString routingKey,
-                             boolean mandatory,
-                             boolean immediate);
-
-    void receiveBasicReturn(final int channelId,
-                            int replyCode,
-                            AMQShortString replyText,
-                            AMQShortString exchange,
-                            AMQShortString routingKey);
-
-    void receiveBasicDeliver(int channelId,
-                             AMQShortString consumerTag,
-                             long deliveryTag,
-                             boolean redelivered,
-                             AMQShortString exchange, AMQShortString routingKey);
-
-    void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck);
-
-    void receiveBasicGetOk(int channelId,
-                           long deliveryTag,
-                           boolean redelivered,
-                           AMQShortString exchange,
-                           AMQShortString routingKey, long messageCount);
-
-    void receiveBasicGetEmpty(int channelId);
-
-    void receiveBasicAck(int channelId, long deliveryTag, boolean multiple);
-
-    void receiveBasicReject(int channelId, long deliveryTag, boolean requeue);
-
     void receiveHeartbeat();
 
-    void receiveMessageContent(int channelId, byte[] data);
-
-    void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
-
     void receiveProtocolHeader(ProtocolInitiation protocolInitiation);
 
     void setCurrentMethod(int classId, int methodId);
+
+    boolean ignoreAllButCloseOk();
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java Mon Oct 13 00:58:45 2014
@@ -165,9 +165,8 @@ public class QueueBindBody extends AMQMe
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -176,6 +175,9 @@ public class QueueBindBody extends AMQMe
         AMQShortString bindingKey = buffer.readAMQShortString();
         boolean nowait = (buffer.readByte() & 0x01) == 0x01;
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, arguments);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java Mon Oct 13 00:58:45 2014
@@ -191,9 +191,8 @@ public class QueueDeclareBody extends AM
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -206,6 +205,9 @@ public class QueueDeclareBody extends AM
         boolean autoDelete = (bitfield & 0x08 ) == 0x08;
         boolean nowait = (bitfield & 0x010 ) == 0x010;
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java Mon Oct 13 00:58:45 2014
@@ -120,13 +120,15 @@ public class QueueDeclareOkBody extends 
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ClientChannelMethodProcessor dispatcher) throws IOException
     {
         AMQShortString queue = buffer.readAMQShortString();
         long messageCount = EncodingUtils.readUnsignedInteger(buffer);
         long consumerCount = EncodingUtils.readUnsignedInteger(buffer);
-        dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueueDeclareOk(queue, messageCount, consumerCount);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java Mon Oct 13 00:58:45 2014
@@ -151,9 +151,8 @@ public class QueueDeleteBody extends AMQ
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) throws IOException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -163,6 +162,9 @@ public class QueueDeleteBody extends AMQ
         boolean ifUnused = (bitfield & 0x01) == 0x01;
         boolean ifEmpty = (bitfield & 0x02) == 0x02;
         boolean nowait = (bitfield & 0x04) == 0x04;
-        dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueueDelete(queue, ifUnused, ifEmpty, nowait);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java Mon Oct 13 00:58:45 2014
@@ -95,11 +95,13 @@ public class QueueDeleteOkBody extends A
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ClientChannelMethodProcessor dispatcher) throws IOException
     {
         long messageCount = EncodingUtils.readUnsignedInteger(buffer);
-        dispatcher.receiveQueueDeleteOk(channelId, messageCount);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueueDeleteOk(messageCount);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java Mon Oct 13 00:58:45 2014
@@ -125,14 +125,16 @@ public class QueuePurgeBody extends AMQM
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) throws IOException
     {
 
         int ticket = buffer.readUnsignedShort();
         AMQShortString queue = buffer.readAMQShortString();
         boolean nowait = (buffer.readByte() & 0x01) == 0x01;
-        dispatcher.receiveQueuePurge(channelId, queue, nowait);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueuePurge(queue, nowait);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java Mon Oct 13 00:58:45 2014
@@ -95,11 +95,13 @@ public class QueuePurgeOkBody extends AM
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException
+    public static void process(final MarkableDataInput buffer,
+                               final ClientChannelMethodProcessor dispatcher) throws IOException
     {
         long messageCount = EncodingUtils.readUnsignedInteger(buffer);
-        dispatcher.receiveQueuePurgeOk(channelId, messageCount);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueuePurgeOk(messageCount);
+        }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java Mon Oct 13 00:58:45 2014
@@ -147,9 +147,8 @@ public class QueueUnbindBody extends AMQ
         return buf.toString();
     }
 
-    public static void process(final int channelId,
-                                final MarkableDataInput buffer,
-                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
+    public static void process(final MarkableDataInput buffer,
+                               final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -157,6 +156,9 @@ public class QueueUnbindBody extends AMQ
         AMQShortString exchange = buffer.readAMQShortString();
         AMQShortString routingKey = buffer.readAMQShortString();
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments);
+        if(!dispatcher.ignoreAllButCloseOk())
+        {
+            dispatcher.receiveQueueUnbind(queue, exchange, routingKey, arguments);
+        }
     }
 }

Added: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java?rev=1631275&view=auto
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java (added)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java Mon Oct 13 00:58:45 2014
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface ServerChannelMethodProcessor extends ChannelMethodProcessor
+{
+    void receiveAccessRequest(AMQShortString realm,
+                              boolean exclusive,
+                              boolean passive,
+                              boolean active,
+                              boolean write, boolean read);
+
+    void receiveExchangeDeclare(AMQShortString exchange,
+                                AMQShortString type,
+                                boolean passive,
+                                boolean durable,
+                                boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
+
+    void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait);
+
+    void receiveExchangeBound(AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
+
+    void receiveQueueDeclare(AMQShortString queue,
+                             boolean passive,
+                             boolean durable,
+                             boolean exclusive,
+                             boolean autoDelete, boolean nowait, FieldTable arguments);
+
+    void receiveQueueBind(AMQShortString queue,
+                          AMQShortString exchange,
+                          AMQShortString bindingKey,
+                          boolean nowait, FieldTable arguments);
+
+    void receiveQueuePurge(AMQShortString queue, boolean nowait);
+
+    void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+
+    void receiveQueueUnbind(AMQShortString queue,
+                            AMQShortString exchange,
+                            AMQShortString bindingKey,
+                            FieldTable arguments);
+
+    void receiveBasicRecover(final boolean requeue, boolean sync);
+
+    void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global);
+
+    void receiveBasicConsume(AMQShortString queue,
+                             AMQShortString consumerTag,
+                             boolean noLocal,
+                             boolean noAck,
+                             boolean exclusive, boolean nowait, FieldTable arguments);
+
+    void receiveBasicCancel(AMQShortString consumerTag, boolean noWait);
+
+    void receiveBasicPublish(AMQShortString exchange,
+                             AMQShortString routingKey,
+                             boolean mandatory,
+                             boolean immediate);
+
+    void receiveBasicGet(AMQShortString queue, boolean noAck);
+
+    void receiveBasicAck(long deliveryTag, boolean multiple);
+
+    void receiveBasicReject(long deliveryTag, boolean requeue);
+
+
+
+    void receiveTxSelect();
+
+    void receiveTxCommit();
+
+    void receiveTxRollback();
+
+}

Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java?rev=1631275&view=auto
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java (added)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java Mon Oct 13 00:58:45 2014
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface ServerMethodProcessor<T extends ServerChannelMethodProcessor> extends MethodProcessor<T>
+{
+    void receiveConnectionStartOk(FieldTable clientProperties,
+                                  AMQShortString mechanism,
+                                  byte[] response,
+                                  AMQShortString locale);
+
+    void receiveConnectionSecureOk(byte[] response);
+
+    void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
+
+    void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+    // The only callback declared here that is passed a channel id; the four above take none.
+    void receiveChannelOpen(int channelId);
+
+    // NOTE(review): the receive* names suggest these are the methods a server receives from a client - confirm.
+}

Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java Mon Oct 13 00:58:45 2014
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.transport.util;
 
-import java.nio.ByteBuffer;
-
 import static java.lang.Math.min;
 
+import java.nio.ByteBuffer;
+
 
 /**
  * Functions
@@ -33,6 +33,9 @@ import static java.lang.Math.min;
 
 public final class Functions
 {
+    private static final char[] HEX_CHARACTERS =
+            {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
     private Functions()
     {
     }
@@ -102,4 +105,21 @@ public final class Functions
         return str(ByteBuffer.wrap(bytes), limit);
     }
 
+    /** Lower-case hex of the first {@code limit} bytes of {@code bytes}, then "..." if bytes remain. */
+    public static String hex(byte[] bytes, int limit)
+    {
+        // Clamp to [0, length] (null counts as empty); a negative limit could otherwise make the capacity negative.
+        limit = Math.max(0, Math.min(limit, bytes == null ? 0 : bytes.length));
+        StringBuilder sb = new StringBuilder(3 + limit*2);
+        for(int i = 0; i < limit; i++)
+        {
+            sb.append(HEX_CHARACTERS[(bytes[i] & 0xf0)>>4]).append(HEX_CHARACTERS[bytes[i] & 0x0f]);
+        }
+        if(bytes != null && bytes.length>limit)
+        {
+            sb.append("...");
+        }
+        return sb.toString();
+    }
+
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Mon Oct 13 00:58:45 2014
@@ -47,7 +47,7 @@ public class AMQDecoderTest extends Test
     public void setUp()
     {
         _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
-        _decoder = new AMQDecoder(false, _methodProcessor);
+        _decoder = new ClientDecoder(_methodProcessor);
     }
    
     

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1631275&r1=1631274&r2=1631275&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Mon Oct 13 00:58:45 2014
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
@@ -40,13 +41,13 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
 import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ByteArrayDataInput;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionStartOkBody;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
@@ -234,14 +235,9 @@ public class MaxFrameSizeTest extends Qp
         }
 
         byte[] serverData = baos.toByteArray();
-        ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
-        AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
         final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
-
-        while (datablockDecoder.decodable(badi))
-        {
-            datablockDecoder.processInput(methodProcessor, badi);
-        }
+        AMQDecoder decoder = new ClientDecoder(methodProcessor);
+        decoder.decodeBuffer(ByteBuffer.wrap(serverData));
 
         evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message