activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5707
Date Tue, 07 Apr 2015 14:35:27 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 61da1faa4 -> 25c99a6c3


https://issues.apache.org/jira/browse/AMQ-5707

Add support for aborting a slow AMQP consumer and some testing.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25c99a6c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25c99a6c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25c99a6c

Branch: refs/heads/master
Commit: 25c99a6c367fe3c80cfee24a8118f6b782b4f69e
Parents: 61da1fa
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Apr 7 10:25:19 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Apr 7 10:32:49 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java |   7 +
 .../transport/amqp/protocol/AmqpSender.java     |  17 ++
 .../transport/amqp/AmqpTestSupport.java         |   6 +
 .../amqp/interop/AmqpReceiverTest.java          |   1 -
 .../amqp/interop/AmqpSlowReceiverTest.java      | 166 +++++++++++++++++++
 .../apache/activemq/command/BaseCommand.java    |  50 ++++--
 .../org/apache/activemq/command/Command.java    |   6 +-
 .../activemq/command/ConsumerControl.java       |  29 +++-
 .../apache/activemq/command/PartialCommand.java |  38 ++++-
 .../apache/activemq/command/WireFormatInfo.java |  43 ++++-
 .../activemq/transport/stomp/StompFrame.java    |  29 +++-
 11 files changed, 349 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 0edc62f..c977c8f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -46,6 +46,7 @@ import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
@@ -538,6 +539,12 @@ public class AmqpConnection implements AmqpProtocolConverter {
             // Pass down any unexpected async errors. Should this close the connection?
             Throwable exception = ((ConnectionError) command).getException();
             handleException(exception);
+        } else if (command.isConsumerControl()) {
+            ConsumerControl control = (ConsumerControl) command;
+            AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId());
+            if (sender != null) {
+                sender.onConsumerControl(control);
+            }
         } else if (command.isBrokerInfo()) {
             // ignore
         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 0096334..f273cee 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -46,7 +46,9 @@ import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
@@ -309,6 +311,21 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         }
     }
 
+    /**
+     * Called when the Broker sends a ConsumerControl command to the Consumer that
+     * this sender creates to obtain messages to dispatch via the sender for this
+     * end of the open link.
+     *
+     * @param control
+     *        The ConsumerControl command to process.
+     */
+    public void onConsumerControl(ConsumerControl control) {
+        if (control.isClose()) {
+            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
+            session.pumpProtonToSocket();
+        }
+    }
+
     @Override
     public String toString() {
         return "AmqpSender {" + getConsumerId() + "}";

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 4f1c861..4d042b1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -107,6 +107,8 @@ public class AmqpTestSupport {
         brokerService.setUseJmx(true);
         brokerService.getManagementContext().setCreateConnector(false);
 
+        performAdditionalConfiguration(brokerService);
+
         SSLContext ctx = SSLContext.getInstance("TLS");
         ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
         SSLContext.setDefault(ctx);
@@ -132,6 +134,10 @@ public class AmqpTestSupport {
         addTranportConnectors();
     }
 
+    protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception
{
+
+    }
+
     protected void addTranportConnectors() throws Exception {
         TransportConnector connector = null;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index b1ca527..30571f7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -339,7 +339,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    //@Ignore("Test fails currently due to improper implementation of drain.")
     @Test(timeout = 60000)
     public void testReceiverCanDrainMessages() throws Exception {
         int MSG_COUNT = 20;

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java
new file mode 100644
index 0000000..87f8741
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Test the handling of consumer abort when the AbortSlowConsumerStrategy is used.
+ */
+public class AmqpSlowReceiverTest extends AmqpClientTestSupport {
+
+    private final long DEFAULT_CHECK_PERIOD = 1000;
+    private final long DEFAULT_MAX_SLOW_DURATION = 3000;
+
+    private AbortSlowConsumerStrategy strategy;
+
+    @Test(timeout = 60 * 1000)
+    public void testSlowConsumerIsAborted() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(100);
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        sendMessages(getTestName(), 100, false);
+
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertTrue("Receiver should be closed", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return receiver.isClosed();
+            }
+        }));
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+    }
+
+    @Test
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        strategy.setMaxSlowDuration(60*1000); // so jmx does the abort
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(100);
+
+        sendMessages(getTestName(), 100, false);
+
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+        assertNotNull(slowConsumerPolicyMBeanName);
+
+        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+                brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName,
AbortSlowConsumerStrategyViewMBean.class, true);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TabularData slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("one slow consumers", 1, slowOnes.size());
+
+        LOG.info("slow ones:"  + slowOnes);
+
+        CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
+        LOG.info("Slow one: " + slowOne);
+
+        assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
+        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
+
+        assertTrue("Receiver should be closed", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return receiver.isClosed();
+            }
+        }));
+
+        slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("no slow consumers left", 0, slowOnes.size());
+
+        // verify mbean gone with destination
+        brokerService.getAdminView().removeQueue(getTestName());
+
+        try {
+            abortPolicy.getSlowConsumers();
+            fail("expect not found post destination removal");
+        } catch(UndeclaredThrowableException expected) {
+            assertTrue("correct exception: " + expected.getCause(),
+                    expected.getCause() instanceof InstanceNotFoundException);
+        }
+    }
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    @Override
+    protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception
{
+        strategy = new AbortSlowConsumerStrategy();
+        strategy.setAbortConnection(false);
+        strategy.setCheckPeriod(DEFAULT_CHECK_PERIOD);
+        strategy.setMaxSlowDuration(DEFAULT_MAX_SLOW_DURATION);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java b/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java
index 66243fa..68e9ba3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java
@@ -17,34 +17,36 @@
 package org.apache.activemq.command;
 
 import java.util.Map;
-import org.apache.activemq.util.IntrospectionSupport;
 
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
- * 
+ *
  * @openwire:marshaller
- * 
+ *
  */
 public abstract class BaseCommand implements Command {
 
     protected int commandId;
     protected boolean responseRequired;
-    
+
     private transient Endpoint from;
     private transient Endpoint to;
-    
+
     public void copy(BaseCommand copy) {
         copy.commandId = commandId;
         copy.responseRequired = responseRequired;
-    }    
+    }
 
     /**
      * @openwire:property version=1
      */
+    @Override
     public int getCommandId() {
         return commandId;
     }
 
+    @Override
     public void setCommandId(int commandId) {
         this.commandId = commandId;
     }
@@ -52,10 +54,12 @@ public abstract class BaseCommand implements Command {
     /**
      * @openwire:property version=1
      */
+    @Override
     public boolean isResponseRequired() {
         return responseRequired;
     }
 
+    @Override
     public void setResponseRequired(boolean responseRequired) {
         this.responseRequired = responseRequired;
     }
@@ -64,72 +68,90 @@ public abstract class BaseCommand implements Command {
     public String toString() {
         return toString(null);
     }
-    
-    public String toString(Map<String, Object>overrideFields) {
-    	return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
+
+    public String toString(Map<String, Object> overrideFields) {
+        return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
     }
-    
+
+    @Override
     public boolean isWireFormatInfo() {
         return false;
     }
 
+    @Override
     public boolean isBrokerInfo() {
         return false;
     }
 
+    @Override
     public boolean isResponse() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatch() {
         return false;
     }
 
+    @Override
     public boolean isMessage() {
         return false;
     }
 
+    @Override
     public boolean isMarshallAware() {
         return false;
     }
 
+    @Override
     public boolean isMessageAck() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatchNotification() {
         return false;
     }
 
+    @Override
     public boolean isShutdownInfo() {
         return false;
     }
-    
+
+    @Override
     public boolean isConnectionControl() {
         return false;
     }
 
+    @Override
+    public boolean isConsumerControl() {
+        return false;
+    }
+
     /**
      * The endpoint within the transport where this message came from.
      */
+    @Override
     public Endpoint getFrom() {
         return from;
     }
 
+    @Override
     public void setFrom(Endpoint from) {
         this.from = from;
     }
 
     /**
-     * The endpoint within the transport where this message is going to - null means all
endpoints.
+     * The endpoint within the transport where this message is going to - null
+     * means all endpoints.
      */
+    @Override
     public Endpoint getTo() {
         return to;
     }
 
+    @Override
     public void setTo(Endpoint to) {
         this.to = to;
     }
-    
-    
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/Command.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Command.java b/activemq-client/src/main/java/org/apache/activemq/command/Command.java
index 125f4ac..585e0f7 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/Command.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Command.java
@@ -21,8 +21,6 @@ import org.apache.activemq.state.CommandVisitor;
 /**
  * The Command Pattern so that we can send and receive commands on the different
  * transports
- * 
- * 
  */
 public interface Command extends DataStructure {
 
@@ -52,9 +50,11 @@ public interface Command extends DataStructure {
     boolean isMessageDispatchNotification();
 
     boolean isShutdownInfo();
-    
+
     boolean isConnectionControl();
 
+    boolean isConsumerControl();
+
     Response visit(CommandVisitor visitor) throws Exception;
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java
b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java
index 6e7bc17..9fcbc4c 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java
@@ -20,9 +20,9 @@ import org.apache.activemq.state.CommandVisitor;
 
 /**
  * Used to start and stop transports as well as terminating clients.
- * 
+ *
  * @openwire:marshaller code="17"
- * 
+ *
  */
 public class ConsumerControl extends BaseCommand {
 
@@ -48,14 +48,21 @@ public class ConsumerControl extends BaseCommand {
         this.destination = destination;
     }
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processConsumerControl(this);
     }
 
+    @Override
+    public boolean isConsumerControl() {
+        return true;
+    }
+
     /**
      * @openwire:property version=1
      * @return Returns the close.
@@ -65,7 +72,8 @@ public class ConsumerControl extends BaseCommand {
     }
 
     /**
-     * @param close The close to set.
+     * @param close
+     *        The new value to assign the close state flag.
      */
     public void setClose(boolean close) {
         this.close = close;
@@ -80,7 +88,8 @@ public class ConsumerControl extends BaseCommand {
     }
 
     /**
-     * @param consumerId The consumerId to set.
+     * @param consumerId
+     *        The consumerId to set.
      */
     public void setConsumerId(ConsumerId consumerId) {
         this.consumerId = consumerId;
@@ -95,7 +104,8 @@ public class ConsumerControl extends BaseCommand {
     }
 
     /**
-     * @param prefetch The prefetch to set.
+     * @param prefetch
+     *        The prefetch to set.
      */
     public void setPrefetch(int prefetch) {
         this.prefetch = prefetch;
@@ -110,7 +120,8 @@ public class ConsumerControl extends BaseCommand {
     }
 
     /**
-     * @param flush the flush to set
+     * @param flush
+     *        The flush value to set on this command.
      */
     public void setFlush(boolean flush) {
         this.flush = flush;
@@ -125,7 +136,8 @@ public class ConsumerControl extends BaseCommand {
     }
 
     /**
-     * @param start the start to set
+     * @param start
+     *        The start value to set on this command.
      */
     public void setStart(boolean start) {
         this.start = start;
@@ -140,7 +152,8 @@ public class ConsumerControl extends BaseCommand {
     }
 
     /**
-     * @param stop the stop to set
+     * @param stop
+     *        the stop value to set on this Command.
      */
     public void setStop(boolean stop) {
         this.stop = stop;

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java
b/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java
index 4069197..9a75a4d 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java
@@ -21,9 +21,9 @@ import org.apache.activemq.state.CommandVisitor;
 /**
  * Represents a partial command; a large command that has been split up into
  * pieces.
- * 
+ *
  * @openwire:marshaller code="60"
- * 
+ *
  */
 public class PartialCommand implements Command {
 
@@ -38,6 +38,7 @@ public class PartialCommand implements Command {
     public PartialCommand() {
     }
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -45,17 +46,19 @@ public class PartialCommand implements Command {
     /**
      * @openwire:property version=1
      */
+    @Override
     public int getCommandId() {
         return commandId;
     }
 
+    @Override
     public void setCommandId(int commandId) {
         this.commandId = commandId;
     }
 
     /**
      * The data for this part of the command
-     * 
+     *
      * @openwire:property version=1 mandatory=true
      */
     public byte[] getData() {
@@ -66,79 +69,102 @@ public class PartialCommand implements Command {
         this.data = data;
     }
 
+    @Override
     public Endpoint getFrom() {
         return from;
     }
 
+    @Override
     public void setFrom(Endpoint from) {
         this.from = from;
     }
 
+    @Override
     public Endpoint getTo() {
         return to;
     }
 
+    @Override
     public void setTo(Endpoint to) {
         this.to = to;
     }
 
+    @Override
     public Response visit(CommandVisitor visitor) throws Exception {
         throw new IllegalStateException("The transport layer should filter out PartialCommand
instances but received: " + this);
     }
 
+    @Override
     public boolean isResponseRequired() {
         return false;
     }
 
+    @Override
     public boolean isResponse() {
         return false;
     }
 
+    @Override
     public boolean isBrokerInfo() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatch() {
         return false;
     }
 
+    @Override
     public boolean isMessage() {
         return false;
     }
 
+    @Override
     public boolean isMessageAck() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatchNotification() {
         return false;
     }
 
+    @Override
     public boolean isShutdownInfo() {
         return false;
     }
-    
+
+    @Override
     public boolean isConnectionControl() {
         return false;
     }
 
+    @Override
+    public boolean isConsumerControl() {
+        return false;
+    }
+
+    @Override
     public void setResponseRequired(boolean responseRequired) {
     }
 
+    @Override
     public boolean isWireFormatInfo() {
         return false;
     }
 
+    @Override
     public boolean isMarshallAware() {
         return false;
     }
 
+    @Override
     public String toString() {
         int size = 0;
         if (data != null) {
             size = data.length;
         }
+
         return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
-    }   
-    
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java
b/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java
index 8deaa76..9848e66 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -33,7 +34,7 @@ import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * @openwire:marshaller code="1"
- * 
+ *
  */
 public class WireFormatInfo implements Command, MarshallAware {
 
@@ -49,14 +50,17 @@ public class WireFormatInfo implements Command, MarshallAware {
     private transient Endpoint from;
     private transient Endpoint to;
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public boolean isWireFormatInfo() {
         return true;
     }
 
+    @Override
     public boolean isMarshallAware() {
         return true;
     }
@@ -97,10 +101,12 @@ public class WireFormatInfo implements Command, MarshallAware {
     /**
      * The endpoint within the transport where this message came from.
      */
+    @Override
     public Endpoint getFrom() {
         return from;
     }
 
+    @Override
     public void setFrom(Endpoint from) {
         this.from = from;
     }
@@ -109,16 +115,18 @@ public class WireFormatInfo implements Command, MarshallAware {
      * The endpoint within the transport where this message is going to - null
      * means all endpoints.
      */
+    @Override
     public Endpoint getTo() {
         return to;
     }
 
+    @Override
     public void setTo(Endpoint to) {
         this.to = to;
     }
 
     // ////////////////////
-    // 
+    //
     // Implementation Methods.
     //
     // ////////////////////
@@ -169,6 +177,7 @@ public class WireFormatInfo implements Command, MarshallAware {
         return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)),
MAX_PROPERTY_SIZE);
     }
 
+    @Override
     public void beforeMarshall(WireFormat wireFormat) throws IOException {
         // Need to marshal the properties.
         if (marshalledProperties == null && properties != null) {
@@ -180,12 +189,15 @@ public class WireFormatInfo implements Command, MarshallAware {
         }
     }
 
+    @Override
     public void afterMarshall(WireFormat wireFormat) throws IOException {
     }
 
+    @Override
     public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
     }
 
+    @Override
     public void afterUnmarshall(WireFormat wireFormat) throws IOException {
     }
 
@@ -193,6 +205,7 @@ public class WireFormatInfo implements Command, MarshallAware {
         return magic != null && Arrays.equals(magic, MAGIC);
     }
 
+    @Override
     public void setResponseRequired(boolean responseRequired) {
     }
 
@@ -256,7 +269,7 @@ public class WireFormatInfo implements Command, MarshallAware {
         if( buff == null ) {
             return null;
         }
-        return (String) buff.toString();
+        return buff.toString();
     }
 
     public void setHost(String hostname) throws IOException {
@@ -274,7 +287,7 @@ public class WireFormatInfo implements Command, MarshallAware {
     public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
         setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
     }
-    
+
     public long getMaxInactivityDurationInitalDelay() throws IOException {
         Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
         return l == null ? 0 : l.longValue();
@@ -292,8 +305,6 @@ public class WireFormatInfo implements Command, MarshallAware {
     public void setMaxFrameSize(long maxFrameSize) throws IOException {
         setProperty("MaxFrameSize", new Long(maxFrameSize));
     }
-    
-   
 
     /**
      * @throws IOException
@@ -307,6 +318,7 @@ public class WireFormatInfo implements Command, MarshallAware {
         setProperty("CacheSize", new Integer(cacheSize));
     }
 
+    @Override
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processWireFormat(this);
     }
@@ -340,54 +352,69 @@ public class WireFormatInfo implements Command, MarshallAware {
     //
     // /////////////////////////////////////////////////////////////
 
+    @Override
     public void setCommandId(int value) {
     }
 
+    @Override
     public int getCommandId() {
         return 0;
     }
 
+    @Override
     public boolean isResponseRequired() {
         return false;
     }
 
+    @Override
     public boolean isResponse() {
         return false;
     }
 
+    @Override
     public boolean isBrokerInfo() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatch() {
         return false;
     }
 
+    @Override
     public boolean isMessage() {
         return false;
     }
 
+    @Override
     public boolean isMessageAck() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatchNotification() {
         return false;
     }
 
+    @Override
     public boolean isShutdownInfo() {
         return false;
     }
-    
+
+    @Override
     public boolean isConnectionControl() {
         return false;
     }
 
+    @Override
+    public boolean isConsumerControl() {
+        return false;
+    }
+
     public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
     }
 
     public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) {
         return null;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
index 0b2c000..d386550 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
@@ -94,85 +94,108 @@ public class StompFrame implements Command {
         this.headers = headers;
     }
 
-    //
-    // Methods in the Command interface
-    //
+    @Override
     public int getCommandId() {
         return 0;
     }
 
+    @Override
     public Endpoint getFrom() {
         return null;
     }
 
+    @Override
     public Endpoint getTo() {
         return null;
     }
 
+    @Override
     public boolean isBrokerInfo() {
         return false;
     }
 
+    @Override
     public boolean isMessage() {
         return false;
     }
 
+    @Override
     public boolean isMessageAck() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatch() {
         return false;
     }
 
+    @Override
     public boolean isMessageDispatchNotification() {
         return false;
     }
 
+    @Override
     public boolean isResponse() {
         return false;
     }
 
+    @Override
     public boolean isResponseRequired() {
         return false;
     }
 
+    @Override
     public boolean isShutdownInfo() {
         return false;
     }
 
+    @Override
     public boolean isConnectionControl() {
         return false;
     }
 
+    @Override
+    public boolean isConsumerControl() {
+        return false;
+    }
+
+    @Override
     public boolean isWireFormatInfo() {
         return false;
     }
 
+    @Override
     public void setCommandId(int value) {
     }
 
+    @Override
     public void setFrom(Endpoint from) {
     }
 
+    @Override
     public void setResponseRequired(boolean responseRequired) {
     }
 
+    @Override
     public void setTo(Endpoint to) {
     }
 
+    @Override
     public Response visit(CommandVisitor visitor) throws Exception {
         return null;
     }
 
+    @Override
     public byte getDataStructureType() {
         return 0;
     }
 
+    @Override
     public boolean isMarshallAware() {
         return false;
     }
 
+    @Override
     public String toString() {
         return format(true);
     }


Mime
View raw message