Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8452117C88 for ; Tue, 7 Apr 2015 14:35:37 +0000 (UTC) Received: (qmail 26918 invoked by uid 500); 7 Apr 2015 14:35:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 26874 invoked by uid 500); 7 Apr 2015 14:35:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 26865 invoked by uid 99); 7 Apr 2015 14:35:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 14:35:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CC5A0E10CB; Tue, 7 Apr 2015 14:35:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5707 Date: Tue, 7 Apr 2015 14:35:27 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 61da1faa4 -> 25c99a6c3 https://issues.apache.org/jira/browse/AMQ-5707 Add support for aborting a slow AMQP consumer and some testing. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25c99a6c Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25c99a6c Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25c99a6c Branch: refs/heads/master Commit: 25c99a6c367fe3c80cfee24a8118f6b782b4f69e Parents: 61da1fa Author: Timothy Bish Authored: Tue Apr 7 10:25:19 2015 -0400 Committer: Timothy Bish Committed: Tue Apr 7 10:32:49 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpConnection.java | 7 + .../transport/amqp/protocol/AmqpSender.java | 17 ++ .../transport/amqp/AmqpTestSupport.java | 6 + .../amqp/interop/AmqpReceiverTest.java | 1 - .../amqp/interop/AmqpSlowReceiverTest.java | 166 +++++++++++++++++++ .../apache/activemq/command/BaseCommand.java | 50 ++++-- .../org/apache/activemq/command/Command.java | 6 +- .../activemq/command/ConsumerControl.java | 29 +++- .../apache/activemq/command/PartialCommand.java | 38 ++++- .../apache/activemq/command/WireFormatInfo.java | 43 ++++- .../activemq/transport/stomp/StompFrame.java | 29 +++- 11 files changed, 349 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 0edc62f..c977c8f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -46,6 +46,7 @@ import org.apache.activemq.command.Command; 
import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; @@ -538,6 +539,12 @@ public class AmqpConnection implements AmqpProtocolConverter { // Pass down any unexpected async errors. Should this close the connection? Throwable exception = ((ConnectionError) command).getException(); handleException(exception); + } else if (command.isConsumerControl()) { + ConsumerControl control = (ConsumerControl) command; + AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId()); + if (sender != null) { + sender.onConsumerControl(control); + } } else if (command.isBrokerInfo()) { // ignore } else { http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 0096334..f273cee 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -46,7 +46,9 @@ import org.apache.qpid.proton.amqp.messaging.Outcome; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import 
org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; @@ -309,6 +311,21 @@ public class AmqpSender extends AmqpAbstractLink { } } + /** + * Called when the Broker sends a ConsumerControl command to the Consumer that + * this sender creates to obtain messages to dispatch via the sender for this + * end of the open link. + * + * @param control + * The ConsumerControl command to process. + */ + public void onConsumerControl(ConsumerControl control) { + if (control.isClose()) { + close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed")); + session.pumpProtonToSocket(); + } + } + @Override public String toString() { return "AmqpSender {" + getConsumerId() + "}"; http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 4f1c861..4d042b1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -107,6 +107,8 @@ public class AmqpTestSupport { brokerService.setUseJmx(true); brokerService.getManagementContext().setCreateConnector(false); + performAdditionalConfiguration(brokerService); + SSLContext ctx = SSLContext.getInstance("TLS"); ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); SSLContext.setDefault(ctx); @@ -132,6 +134,10 @@ public class AmqpTestSupport { addTranportConnectors(); } + protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception { + + } + protected void addTranportConnectors() throws Exception 
{ TransportConnector connector = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index b1ca527..30571f7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -339,7 +339,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } - //@Ignore("Test fails currently due to improper implementation of drain.") @Test(timeout = 60000) public void testReceiverCanDrainMessages() throws Exception { int MSG_COUNT = 20; http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java new file mode 100644 index 0000000..87f8741 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.concurrent.TimeUnit; + +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +/** + * Test the handling of consumer abort when the AbortSlowConsumerStrategy is used. 
+ */ +public class AmqpSlowReceiverTest extends AmqpClientTestSupport { + + private final long DEFAULT_CHECK_PERIOD = 1000; + private final long DEFAULT_MAX_SLOW_DURATION = 3000; + + private AbortSlowConsumerStrategy strategy; + + @Test(timeout = 60 * 1000) + public void testSlowConsumerIsAborted() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(100); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + sendMessages(getTestName(), 100, false); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertTrue("Receiver should be closed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return receiver.isClosed(); + } + })); + + assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length); + } + + @Test + public void testSlowConsumerIsAbortedViaJmx() throws Exception { + strategy.setMaxSlowDuration(60*1000); // so jmx does the abort + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(100); + + sendMessages(getTestName(), 100, false); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + QueueViewMBean queue = getProxyToQueue(getTestName()); + ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy(); + assertNotNull(slowConsumerPolicyMBeanName); + + AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean) + brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, 
AbortSlowConsumerStrategyViewMBean.class, true); + + TimeUnit.SECONDS.sleep(3); + + TabularData slowOnes = abortPolicy.getSlowConsumers(); + assertEquals("one slow consumers", 1, slowOnes.size()); + + LOG.info("slow ones:" + slowOnes); + + CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next(); + LOG.info("Slow one: " + slowOne); + + assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName); + abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription")); + + assertTrue("Receiver should be closed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return receiver.isClosed(); + } + })); + + slowOnes = abortPolicy.getSlowConsumers(); + assertEquals("no slow consumers left", 0, slowOnes.size()); + + // verify mbean gone with destination + brokerService.getAdminView().removeQueue(getTestName()); + + try { + abortPolicy.getSlowConsumers(); + fail("expect not found post destination removal"); + } catch(UndeclaredThrowableException expected) { + assertTrue("correct exception: " + expected.getCause(), + expected.getCause() instanceof InstanceNotFoundException); + } + } + + @Override + protected boolean isUseOpenWireConnector() { + return true; + } + + @Override + protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception { + strategy = new AbortSlowConsumerStrategy(); + strategy.setAbortConnection(false); + strategy.setCheckPeriod(DEFAULT_CHECK_PERIOD); + strategy.setMaxSlowDuration(DEFAULT_MAX_SLOW_DURATION); + + PolicyEntry policy = new PolicyEntry(); + policy.setSlowConsumerStrategy(strategy); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + brokerService.setDestinationPolicy(pMap); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java 
---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java b/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java index 66243fa..68e9ba3 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/BaseCommand.java @@ -17,34 +17,36 @@ package org.apache.activemq.command; import java.util.Map; -import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.IntrospectionSupport; /** - * + * * @openwire:marshaller - * + * */ public abstract class BaseCommand implements Command { protected int commandId; protected boolean responseRequired; - + private transient Endpoint from; private transient Endpoint to; - + public void copy(BaseCommand copy) { copy.commandId = commandId; copy.responseRequired = responseRequired; - } + } /** * @openwire:property version=1 */ + @Override public int getCommandId() { return commandId; } + @Override public void setCommandId(int commandId) { this.commandId = commandId; } @@ -52,10 +54,12 @@ public abstract class BaseCommand implements Command { /** * @openwire:property version=1 */ + @Override public boolean isResponseRequired() { return responseRequired; } + @Override public void setResponseRequired(boolean responseRequired) { this.responseRequired = responseRequired; } @@ -64,72 +68,90 @@ public abstract class BaseCommand implements Command { public String toString() { return toString(null); } - - public String toString(Map<String, Object>overrideFields) { - return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields); + + public String toString(Map<String, Object> overrideFields) { + return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields); } - + + @Override public boolean isWireFormatInfo() { return false; } + @Override public boolean isBrokerInfo() { return false; } + @Override public boolean 
isResponse() { return false; } + @Override public boolean isMessageDispatch() { return false; } + @Override public boolean isMessage() { return false; } + @Override public boolean isMarshallAware() { return false; } + @Override public boolean isMessageAck() { return false; } + @Override public boolean isMessageDispatchNotification() { return false; } + @Override public boolean isShutdownInfo() { return false; } - + + @Override public boolean isConnectionControl() { return false; } + @Override + public boolean isConsumerControl() { + return false; + } + /** * The endpoint within the transport where this message came from. */ + @Override public Endpoint getFrom() { return from; } + @Override public void setFrom(Endpoint from) { this.from = from; } /** - * The endpoint within the transport where this message is going to - null means all endpoints. + * The endpoint within the transport where this message is going to - null + * means all endpoints. */ + @Override public Endpoint getTo() { return to; } + @Override public void setTo(Endpoint to) { this.to = to; } - - } http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/Command.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Command.java b/activemq-client/src/main/java/org/apache/activemq/command/Command.java index 125f4ac..585e0f7 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/Command.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Command.java @@ -21,8 +21,6 @@ import org.apache.activemq.state.CommandVisitor; /** * The Command Pattern so that we can send and receive commands on the different * transports - * - * */ public interface Command extends DataStructure { @@ -52,9 +50,11 @@ public interface Command extends DataStructure { boolean isMessageDispatchNotification(); boolean isShutdownInfo(); - + boolean 
isConnectionControl(); + boolean isConsumerControl(); + Response visit(CommandVisitor visitor) throws Exception; /** http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java index 6e7bc17..9fcbc4c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerControl.java @@ -20,9 +20,9 @@ import org.apache.activemq.state.CommandVisitor; /** * Used to start and stop transports as well as terminating clients. - * + * * @openwire:marshaller code="17" - * + * */ public class ConsumerControl extends BaseCommand { @@ -48,14 +48,21 @@ public class ConsumerControl extends BaseCommand { this.destination = destination; } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public Response visit(CommandVisitor visitor) throws Exception { return visitor.processConsumerControl(this); } + @Override + public boolean isConsumerControl() { + return true; + } + /** * @openwire:property version=1 * @return Returns the close. @@ -65,7 +72,8 @@ public class ConsumerControl extends BaseCommand { } /** - * @param close The close to set. + * @param close + * The new value to assign the close state flag. */ public void setClose(boolean close) { this.close = close; @@ -80,7 +88,8 @@ public class ConsumerControl extends BaseCommand { } /** - * @param consumerId The consumerId to set. + * @param consumerId + * The consumerId to set. */ public void setConsumerId(ConsumerId consumerId) { this.consumerId = consumerId; @@ -95,7 +104,8 @@ public class ConsumerControl extends BaseCommand { } /** - * @param prefetch The prefetch to set. 
+ * @param prefetch + * The prefetch to set. */ public void setPrefetch(int prefetch) { this.prefetch = prefetch; @@ -110,7 +120,8 @@ public class ConsumerControl extends BaseCommand { } /** - * @param flush the flush to set + * @param flush + * The flush value to set on this command. */ public void setFlush(boolean flush) { this.flush = flush; @@ -125,7 +136,8 @@ public class ConsumerControl extends BaseCommand { } /** - * @param start the start to set + * @param start + * The start value to set on this command. */ public void setStart(boolean start) { this.start = start; @@ -140,7 +152,8 @@ public class ConsumerControl extends BaseCommand { } /** - * @param stop the stop to set + * @param stop + * the stop value to set on this Command. */ public void setStop(boolean stop) { this.stop = stop; http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java b/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java index 4069197..9a75a4d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/PartialCommand.java @@ -21,9 +21,9 @@ import org.apache.activemq.state.CommandVisitor; /** * Represents a partial command; a large command that has been split up into * pieces. 
- * + * * @openwire:marshaller code="60" - * + * */ public class PartialCommand implements Command { @@ -38,6 +38,7 @@ public class PartialCommand implements Command { public PartialCommand() { } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } @@ -45,17 +46,19 @@ public class PartialCommand implements Command { /** * @openwire:property version=1 */ + @Override public int getCommandId() { return commandId; } + @Override public void setCommandId(int commandId) { this.commandId = commandId; } /** * The data for this part of the command - * + * * @openwire:property version=1 mandatory=true */ public byte[] getData() { @@ -66,79 +69,102 @@ public class PartialCommand implements Command { this.data = data; } + @Override public Endpoint getFrom() { return from; } + @Override public void setFrom(Endpoint from) { this.from = from; } + @Override public Endpoint getTo() { return to; } + @Override public void setTo(Endpoint to) { this.to = to; } + @Override public Response visit(CommandVisitor visitor) throws Exception { throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this); } + @Override public boolean isResponseRequired() { return false; } + @Override public boolean isResponse() { return false; } + @Override public boolean isBrokerInfo() { return false; } + @Override public boolean isMessageDispatch() { return false; } + @Override public boolean isMessage() { return false; } + @Override public boolean isMessageAck() { return false; } + @Override public boolean isMessageDispatchNotification() { return false; } + @Override public boolean isShutdownInfo() { return false; } - + + @Override public boolean isConnectionControl() { return false; } + @Override + public boolean isConsumerControl() { + return false; + } + + @Override public void setResponseRequired(boolean responseRequired) { } + @Override public boolean isWireFormatInfo() { return false; } + @Override public boolean 
isMarshallAware() { return false; } + @Override public String toString() { int size = 0; if (data != null) { size = data.length; } + return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]"; - } - + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java index 8deaa76..9848e66 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; + import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; @@ -33,7 +34,7 @@ import org.fusesource.hawtbuf.UTF8Buffer; /** * @openwire:marshaller code="1" - * + * */ public class WireFormatInfo implements Command, MarshallAware { @@ -49,14 +50,17 @@ public class WireFormatInfo implements Command, MarshallAware { private transient Endpoint from; private transient Endpoint to; + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public boolean isWireFormatInfo() { return true; } + @Override public boolean isMarshallAware() { return true; } @@ -97,10 +101,12 @@ public class WireFormatInfo implements Command, MarshallAware { /** * The endpoint within the transport where this message came from. 
*/ + @Override public Endpoint getFrom() { return from; } + @Override public void setFrom(Endpoint from) { this.from = from; } @@ -109,16 +115,18 @@ public class WireFormatInfo implements Command, MarshallAware { * The endpoint within the transport where this message is going to - null * means all endpoints. */ + @Override public Endpoint getTo() { return to; } + @Override public void setTo(Endpoint to) { this.to = to; } // //////////////////// - // + // // Implementation Methods. // // //////////////////// @@ -169,6 +177,7 @@ public class WireFormatInfo implements Command, MarshallAware { return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE); } + @Override public void beforeMarshall(WireFormat wireFormat) throws IOException { // Need to marshal the properties. if (marshalledProperties == null && properties != null) { @@ -180,12 +189,15 @@ public class WireFormatInfo implements Command, MarshallAware { } } + @Override public void afterMarshall(WireFormat wireFormat) throws IOException { } + @Override public void beforeUnmarshall(WireFormat wireFormat) throws IOException { } + @Override public void afterUnmarshall(WireFormat wireFormat) throws IOException { } @@ -193,6 +205,7 @@ public class WireFormatInfo implements Command, MarshallAware { return magic != null && Arrays.equals(magic, MAGIC); } + @Override public void setResponseRequired(boolean responseRequired) { } @@ -256,7 +269,7 @@ public class WireFormatInfo implements Command, MarshallAware { if( buff == null ) { return null; } - return (String) buff.toString(); + return buff.toString(); } public void setHost(String hostname) throws IOException { @@ -274,7 +287,7 @@ public class WireFormatInfo implements Command, MarshallAware { public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException { setProperty("MaxInactivityDuration", new Long(maxInactivityDuration)); } - + public long 
getMaxInactivityDurationInitalDelay() throws IOException { Long l = (Long)getProperty("MaxInactivityDurationInitalDelay"); return l == null ? 0 : l.longValue(); @@ -292,8 +305,6 @@ public class WireFormatInfo implements Command, MarshallAware { public void setMaxFrameSize(long maxFrameSize) throws IOException { setProperty("MaxFrameSize", new Long(maxFrameSize)); } - - /** * @throws IOException @@ -307,6 +318,7 @@ public class WireFormatInfo implements Command, MarshallAware { setProperty("CacheSize", new Integer(cacheSize)); } + @Override public Response visit(CommandVisitor visitor) throws Exception { return visitor.processWireFormat(this); } @@ -340,54 +352,69 @@ public class WireFormatInfo implements Command, MarshallAware { // // ///////////////////////////////////////////////////////////// + @Override public void setCommandId(int value) { } + @Override public int getCommandId() { return 0; } + @Override public boolean isResponseRequired() { return false; } + @Override public boolean isResponse() { return false; } + @Override public boolean isBrokerInfo() { return false; } + @Override public boolean isMessageDispatch() { return false; } + @Override public boolean isMessage() { return false; } + @Override public boolean isMessageAck() { return false; } + @Override public boolean isMessageDispatchNotification() { return false; } + @Override public boolean isShutdownInfo() { return false; } - + + @Override public boolean isConnectionControl() { return false; } + @Override + public boolean isConsumerControl() { + return false; + } + public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) { } public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) { return null; } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/25c99a6c/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java ---------------------------------------------------------------------- diff --git 
a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java index 0b2c000..d386550 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java @@ -94,85 +94,108 @@ public class StompFrame implements Command { this.headers = headers; } - // - // Methods in the Command interface - // + @Override public int getCommandId() { return 0; } + @Override public Endpoint getFrom() { return null; } + @Override public Endpoint getTo() { return null; } + @Override public boolean isBrokerInfo() { return false; } + @Override public boolean isMessage() { return false; } + @Override public boolean isMessageAck() { return false; } + @Override public boolean isMessageDispatch() { return false; } + @Override public boolean isMessageDispatchNotification() { return false; } + @Override public boolean isResponse() { return false; } + @Override public boolean isResponseRequired() { return false; } + @Override public boolean isShutdownInfo() { return false; } + @Override public boolean isConnectionControl() { return false; } + @Override + public boolean isConsumerControl() { + return false; + } + + @Override public boolean isWireFormatInfo() { return false; } + @Override public void setCommandId(int value) { } + @Override public void setFrom(Endpoint from) { } + @Override public void setResponseRequired(boolean responseRequired) { } + @Override public void setTo(Endpoint to) { } + @Override public Response visit(CommandVisitor visitor) throws Exception { return null; } + @Override public byte getDataStructureType() { return 0; } + @Override public boolean isMarshallAware() { return false; } + @Override public String toString() { return format(true); }