Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B6B8F200B95 for ; Tue, 27 Sep 2016 15:54:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B5150160AD2; Tue, 27 Sep 2016 13:54:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AF182160AD3 for ; Tue, 27 Sep 2016 15:54:35 +0200 (CEST) Received: (qmail 81029 invoked by uid 500); 27 Sep 2016 13:54:29 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 80552 invoked by uid 99); 27 Sep 2016 13:54:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Sep 2016 13:54:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2A2CDEAB5D; Tue, 27 Sep 2016 13:54:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Tue, 27 Sep 2016 13:54:41 -0000 Message-Id: <17aad3e18a9b429ca5dda682603e0b26@git.apache.org> In-Reply-To: <78353452965d44cab418fa170a48e6aa@git.apache.org> References: <78353452965d44cab418fa170a48e6aa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation archived-at: Tue, 27 Sep 2016 13:54:37 -0000 ARTEMIS-751 Simplification of the AMQP implementation Since we don't need client implementations any longer, given the maturity level of qpid jms, these classes can go, as a result a lot of the interfaces can be removed. As part of this I am removing proton-plug, and reorganizing the packages in a way I think it makes more sense and easier to other developers to understand and maintain it. https://issues.apache.org/jira/browse/ARTEMIS-751 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a838bf04 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a838bf04 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a838bf04 Branch: refs/heads/master Commit: a838bf047951e264d40ae1e59fbfe88aae854bf5 Parents: 4e34969 Author: Clebert Suconic Authored: Fri Sep 23 17:25:36 2016 -0400 Committer: Clebert Suconic Committed: Tue Sep 27 09:29:40 2016 -0400 ---------------------------------------------------------------------- .../apache/activemq/artemis/utils/ByteUtil.java | 57 + artemis-distribution/src/main/assembly/dep.xml | 1 - .../src/main/resources/features.xml | 1 - artemis-protocols/artemis-amqp-protocol/pom.xml | 5 - .../ActiveMQProtonRemotingConnection.java | 142 -- .../protocol/proton/ProtonProtocolManager.java | 176 -- .../proton/ProtonProtocolManagerFactory.java | 64 - .../proton/converter/ActiveMQJMSVendor.java | 151 -- .../converter/ProtonMessageConverter.java | 109 -- .../proton/converter/jms/ServerDestination.java | 37 - .../converter/jms/ServerJMSBytesMessage.java | 208 --- .../converter/jms/ServerJMSMapMessage.java | 291 --- .../proton/converter/jms/ServerJMSMessage.java | 381 ---- .../converter/jms/ServerJMSObjectMessage.java | 79 - .../converter/jms/ServerJMSStreamMessage.java | 364 ---- .../converter/jms/ServerJMSTextMessage.java | 99 - .../converter/message/AMQPMessageIdHelper.java | 257 --- .../converter/message/AMQPMessageTypes.java | 25 - .../message/AMQPNativeInboundTransformer.java | 46 - .../message/AMQPNativeOutboundTransformer.java | 60 - .../message/AMQPRawInboundTransformer.java | 60 - .../converter/message/EncodedMessage.java | 67 - .../converter/message/InboundTransformer.java | 318 ---- .../message/JMSMappingInboundTransformer.java | 128 -- .../message/JMSMappingOutboundTransformer.java | 365 ---- .../proton/converter/message/JMSVendor.java | 53 - .../converter/message/OutboundTransformer.java | 69 - .../plug/ActiveMQProtonConnectionCallback.java | 285 --- .../plug/ProtonSessionIntegrationCallback.java | 542 ------ .../protocol/proton/sasl/ActiveMQPlainSASL.java | 45 - .../amqp/broker/AMQPConnectionCallback.java | 260 +++ .../amqp/broker/AMQPSessionCallback.java | 515 ++++++ .../ActiveMQProtonRemotingConnection.java | 142 ++ .../amqp/broker/ProtonProtocolManager.java | 172 ++ .../broker/ProtonProtocolManagerFactory.java | 64 + .../protocol/amqp/broker/package-info.java | 23 + .../amqp/converter/ActiveMQJMSVendor.java | 151 ++ .../amqp/converter/ProtonMessageConverter.java | 109 ++ .../amqp/converter/jms/ServerDestination.java | 37 + .../converter/jms/ServerJMSBytesMessage.java | 208 +++ .../amqp/converter/jms/ServerJMSMapMessage.java | 291 +++ .../amqp/converter/jms/ServerJMSMessage.java | 381 ++++ .../converter/jms/ServerJMSObjectMessage.java | 79 + .../converter/jms/ServerJMSStreamMessage.java | 364 ++++ .../converter/jms/ServerJMSTextMessage.java | 99 + .../converter/message/AMQPMessageIdHelper.java | 257 +++ .../converter/message/AMQPMessageTypes.java | 25 + .../message/AMQPNativeInboundTransformer.java | 46 + .../message/AMQPNativeOutboundTransformer.java | 60 + .../message/AMQPRawInboundTransformer.java | 60 + .../amqp/converter/message/EncodedMessage.java | 67 + .../converter/message/InboundTransformer.java | 318 ++++ .../message/JMSMappingInboundTransformer.java | 128 ++ .../message/JMSMappingOutboundTransformer.java | 365 ++++ .../amqp/converter/message/JMSVendor.java | 53 + .../converter/message/OutboundTransformer.java | 69 + .../protocol/amqp/converter/package-info.java | 21 + .../amqp/exceptions/ActiveMQAMQPException.java | 42 + .../ActiveMQAMQPIllegalStateException.java | 27 + .../ActiveMQAMQPInternalErrorException.java | 31 + .../ActiveMQAMQPInvalidFieldException.java | 27 + .../ActiveMQAMQPNotFoundException.java | 27 + .../ActiveMQAMQPNotImplementedException.java | 27 + ...iveMQAMQPResourceLimitExceededException.java | 27 + .../ActiveMQAMQPTimeoutException.java | 28 + .../ActiveMQAMQPProtocolMessageBundle.java | 80 + .../amqp/proton/AMQPConnectionContext.java | 424 +++++ .../protocol/amqp/proton/AMQPConstants.java | 36 + .../amqp/proton/AMQPSessionContext.java | 221 +++ .../protocol/amqp/proton/AmqpSupport.java | 131 ++ .../amqp/proton/ProtonDeliveryHandler.java | 39 + .../amqp/proton/ProtonInitializable.java | 32 + .../proton/ProtonServerReceiverContext.java | 211 +++ .../amqp/proton/ProtonServerSenderContext.java | 513 ++++++ .../amqp/proton/ProtonTransactionHandler.java | 141 ++ .../amqp/proton/handler/EventHandler.java | 78 + .../protocol/amqp/proton/handler/Events.java | 102 ++ .../amqp/proton/handler/ExtCapability.java | 44 + .../amqp/proton/handler/ProtonHandler.java | 357 ++++ .../protocol/amqp/proton/package-info.java | 22 + .../protocol/amqp/sasl/AnonymousServerSASL.java | 34 + .../artemis/protocol/amqp/sasl/PlainSASL.java | 44 + .../protocol/amqp/sasl/PlainSASLResult.java | 44 + .../artemis/protocol/amqp/sasl/SASLResult.java | 24 + .../artemis/protocol/amqp/sasl/ServerSASL.java | 24 + .../protocol/amqp/sasl/ServerSASLPlain.java | 63 + .../artemis/protocol/amqp/util/CodecCache.java | 50 + .../protocol/amqp/util/CreditsSemaphore.java | 110 ++ .../protocol/amqp/util/DeliveryUtil.java | 44 + .../protocol/amqp/util/NettyWritable.java | 100 ++ .../protocol/amqp/util/ProtonServerMessage.java | 470 +++++ ...mis.spi.core.protocol.ProtocolManagerFactory | 2 +- .../core/protocol/proton/TestConversions.java | 793 -------- .../amqp/converter/TestConversions.java | 792 ++++++++ .../protocol/amqp/sasl/ClientSASLPlain.java | 54 + .../protocol/amqp/sasl/PlainSASLTest.java | 34 + .../amqp/util/CreditsSemaphoreTest.java | 134 ++ artemis-protocols/artemis-proton-plug/pom.xml | 137 -- .../plug/AMQPClientConnectionContext.java | 36 - .../proton/plug/AMQPClientReceiverContext.java | 34 - .../proton/plug/AMQPClientSenderContext.java | 27 - .../proton/plug/AMQPClientSessionContext.java | 30 - .../org/proton/plug/AMQPConnectionCallback.java | 58 - .../org/proton/plug/AMQPConnectionContext.java | 71 - .../plug/AMQPConnectionContextFactory.java | 39 - .../plug/AMQPServerConnectionContext.java | 21 - .../org/proton/plug/AMQPSessionCallback.java | 112 -- .../org/proton/plug/AMQPSessionContext.java | 34 - .../main/java/org/proton/plug/AmqpSupport.java | 131 -- .../main/java/org/proton/plug/ClientSASL.java | 24 - .../main/java/org/proton/plug/SASLResult.java | 24 - .../main/java/org/proton/plug/ServerSASL.java | 24 - .../org/proton/plug/context/AMQPConstants.java | 36 - .../plug/context/AbstractConnectionContext.java | 360 ---- .../context/AbstractProtonContextSender.java | 153 -- .../context/AbstractProtonReceiverContext.java | 88 - .../context/AbstractProtonSessionContext.java | 161 -- .../plug/context/ProtonDeliveryHandler.java | 39 - .../plug/context/ProtonInitializable.java | 67 - .../proton/plug/context/ProtonPlugSender.java | 26 - .../plug/context/ProtonTransactionHandler.java | 143 -- .../client/ProtonClientConnectionContext.java | 107 -- .../ProtonClientConnectionContextFactory.java | 50 - .../context/client/ProtonClientContext.java | 76 - .../client/ProtonClientReceiverContext.java | 92 - .../client/ProtonClientSessionContext.java | 145 -- .../server/ProtonServerConnectionContext.java | 94 - .../ProtonServerConnectionContextFactory.java | 53 - .../server/ProtonServerReceiverContext.java | 161 -- .../server/ProtonServerSenderContext.java | 452 ----- .../server/ProtonServerSessionContext.java | 117 -- .../plug/exceptions/ActiveMQAMQPException.java | 42 - .../ActiveMQAMQPIllegalStateException.java | 27 - .../ActiveMQAMQPInternalErrorException.java | 31 - .../ActiveMQAMQPInvalidFieldException.java | 27 - .../ActiveMQAMQPNotFoundException.java | 27 - .../ActiveMQAMQPNotImplementedException.java | 27 - ...iveMQAMQPResourceLimitExceededException.java | 27 - .../ActiveMQAMQPTimeoutException.java | 28 - .../org/proton/plug/handler/EventHandler.java | 78 - .../java/org/proton/plug/handler/Events.java | 105 -- .../org/proton/plug/handler/ExtCapability.java | 46 - .../org/proton/plug/handler/ProtonHandler.java | 133 -- .../plug/handler/impl/DefaultEventHandler.java | 144 -- .../plug/handler/impl/ProtonHandlerImpl.java | 389 ---- .../ActiveMQAMQPProtocolMessageBundle.java | 80 - .../proton/plug/sasl/AnonymousServerSASL.java | 37 - .../org/proton/plug/sasl/ClientSASLPlain.java | 59 - .../org/proton/plug/sasl/PlainSASLResult.java | 46 - .../org/proton/plug/sasl/ServerSASLPlain.java | 66 - .../java/org/proton/plug/util/ByteUtil.java | 130 -- .../java/org/proton/plug/util/CodecCache.java | 50 - .../org/proton/plug/util/CreditsSemaphore.java | 110 -- .../java/org/proton/plug/util/DeliveryUtil.java | 44 - .../org/proton/plug/util/FutureRunnable.java | 61 - .../org/proton/plug/util/NettyWritable.java | 100 -- .../proton/plug/util/ProtonServerMessage.java | 470 ----- .../org/proton/plug/util/ReusableLatch.java | 130 -- .../context/AbstractConnectionContextTest.java | 137 -- .../org/proton/plug/test/AbstractJMSTest.java | 93 - .../java/org/proton/plug/test/Constants.java | 22 - .../java/org/proton/plug/test/ProtonTest.java | 335 ---- .../plug/test/invm/InVMTestConnector.java | 40 - .../proton/plug/test/invm/ProtonINVMSPI.java | 240 --- .../plug/test/minimalclient/AMQPClientSPI.java | 140 -- .../plug/test/minimalclient/Connector.java | 26 - .../test/minimalclient/SimpleAMQPConnector.java | 76 - .../plug/test/minimalserver/DumbServer.java | 52 - .../minimalserver/MinimalConnectionSPI.java | 171 -- .../plug/test/minimalserver/MinimalServer.java | 167 -- .../test/minimalserver/MinimalSessionSPI.java | 229 --- .../SimpleServerThreadFactory.java | 82 - .../proton/plug/test/sasl/PlainSASLTest.java | 37 - .../plug/test/util/CreditsSemaphoreTest.java | 135 -- .../plug/test/util/ReusableLatchTest.java | 300 ---- .../test/util/SimpleServerAbstractTest.java | 67 - artemis-protocols/pom.xml | 1 - tests/integration-tests/pom.xml | 12 - .../amqp/ProtonMaxFrameSizeTest.java | 98 + .../integration/amqp/ProtonPubSubTest.java | 257 +++ .../tests/integration/amqp/ProtonTest.java | 1548 ++++++++++++++++ .../tests/integration/amqp/ProtonTestBase.java | 77 + .../integration/amqp/ProtonTestForHeader.java | 219 +++ .../amqp/SendingAndReceivingTest.java | 100 ++ .../amqp/jms/SendingAndReceivingTest.java | 100 -- .../proton/ProtonMaxFrameSizeTest.java | 98 - .../integration/proton/ProtonPubSubTest.java | 337 ---- .../tests/integration/proton/ProtonTest.java | 1700 ------------------ .../integration/proton/ProtonTestBase.java | 77 - .../integration/proton/ProtonTestForHeader.java | 219 --- 190 files changed, 11409 insertions(+), 15851 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index 7921072..c678941 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; +import org.jboss.logging.Logger; public class ByteUtil { @@ -29,6 +30,52 @@ public class ByteUtil { private static final char[] hexArray = "0123456789ABCDEF".toCharArray(); + public static void debugFrame(Logger logger, String message, ByteBuf byteIn) { + if (logger.isTraceEnabled()) { + int location = byteIn.readerIndex(); + // debugging + byte[] frame = new byte[byteIn.writerIndex()]; + byteIn.readBytes(frame); + + try { + logger.trace(message + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 8, 16)); + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + } + + byteIn.readerIndex(location); + } + } + + + public static String formatGroup(String str, int groupSize, int lineBreak) { + StringBuffer buffer = new StringBuffer(); + + int line = 1; + buffer.append("/* 1 */ \""); + for (int i = 0; i < str.length(); i += groupSize) { + buffer.append(str.substring(i, i + Math.min(str.length() - i, groupSize))); + + if ((i + groupSize) % lineBreak == 0) { + buffer.append("\" +\n/* "); + line++; + if (line < 10) { + buffer.append(" "); + } + buffer.append(Integer.toString(line) + " */ \""); + } + else if ((i + groupSize) % groupSize == 0 && str.length() - i > groupSize) { + buffer.append("\" + \""); + } + } + + buffer.append("\";"); + + return buffer.toString(); + + } + public static String maxString(String value, int size) { if (value.length() < size) { return value; @@ -38,6 +85,16 @@ public class ByteUtil { } } + public static String bytesToHex(byte[] bytes) { + char[] hexChars = new char[bytes.length * 2]; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = hexArray[v >>> 4]; + hexChars[j * 2 + 1] = hexArray[v & 0x0F]; + } + return new String(hexChars); + } + public static String bytesToHex(byte[] bytes, int groupSize) { char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; int outPos = 0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-distribution/src/main/assembly/dep.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index a62ef25..a50fe79 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -58,7 +58,6 @@ org.apache.activemq:artemis-native org.apache.activemq:artemis-amqp-protocol org.apache.activemq:artemis-openwire-protocol - org.apache.activemq:artemis-proton-plug org.apache.activemq:artemis-hornetq-protocol org.apache.activemq:artemis-hqclient-protocol org.apache.activemq:artemis-stomp-protocol http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index 2677626..7e477aa 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -63,7 +63,6 @@ artemis-core wrap:mvn:org.apache.qpid/proton-j/${proton.version} wrap:mvn:org.apache.qpid/qpid-jms-client/${qpid.jms.version} - mvn:org.apache.activemq/artemis-proton-plug/${pom.version} mvn:org.apache.activemq/artemis-amqp-protocol/${pom.version} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml index e623ef8..aac507e 100644 --- a/artemis-protocols/artemis-amqp-protocol/pom.xml +++ b/artemis-protocols/artemis-amqp-protocol/pom.xml @@ -75,11 +75,6 @@ ${project.version} - org.apache.activemq - artemis-proton-plug - ${project.version} - - org.apache.qpid proton-j http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java deleted file mode 100644 index 670ca5b..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton; - -import java.util.concurrent.Executor; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.proton.plug.AMQPConnectionContext; - -/** - * This is a Server's Connection representation used by ActiveMQ Artemis. - */ -public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection { - - private final AMQPConnectionContext amqpConnection; - - private boolean destroyed = false; - - private final ProtonProtocolManager manager; - - public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager, - AMQPConnectionContext amqpConnection, - Connection transportConnection, - Executor executor) { - super(transportConnection, executor); - this.manager = manager; - this.amqpConnection = amqpConnection; - } - - public Executor getExecutor() { - return this.executor; - } - - public ProtonProtocolManager getManager() { - return manager; - } - - /* - * This can be called concurrently by more than one thread so needs to be locked - */ - @Override - public void fail(final ActiveMQException me, String scaleDownTargetNodeID) { - if (destroyed) { - return; - } - - destroyed = true; - - ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); - - // Then call the listeners - callFailureListeners(me, scaleDownTargetNodeID); - - callClosingListeners(); - - internalClose(); - } - - @Override - public void destroy() { - synchronized (this) { - if (destroyed) { - return; - } - - destroyed = true; - } - - callClosingListeners(); - - internalClose(); - - } - - @Override - public boolean isClient() { - return false; - } - - @Override - public boolean isDestroyed() { - return destroyed; - } - - @Override - public void disconnect(boolean criticalError) { - getTransportConnection().close(); - } - - /** - * Disconnect the connection, closing all channels - */ - @Override - public void disconnect(String scaleDownNodeID, boolean criticalError) { - getTransportConnection().close(); - } - - @Override - public boolean checkDataReceived() { - return amqpConnection.checkDataReceived(); - } - - @Override - public void flush() { - amqpConnection.flush(); - } - - @Override - public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { - amqpConnection.inputBuffer(buffer.byteBuf()); - super.bufferReceived(connectionID, buffer); - } - - private void internalClose() { - // We close the underlying transport connection - getTransportConnection().close(); - } - - @Override - public void killMessage(SimpleString nodeID) { - //unsupported - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java deleted file mode 100644 index a2563a1..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Executor; - -import io.netty.channel.ChannelPipeline; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.BaseInterceptor; -import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.core.protocol.proton.converter.ProtonMessageConverter; -import org.apache.activemq.artemis.core.protocol.proton.plug.ActiveMQProtonConnectionCallback; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.management.Notification; -import org.apache.activemq.artemis.core.server.management.NotificationListener; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Acceptor; -import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.proton.plug.AMQPServerConnectionContext; -import org.proton.plug.context.server.ProtonServerConnectionContextFactory; - -import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX; -import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; - -/** - * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources - */ -public class ProtonProtocolManager implements ProtocolManager, NotificationListener { - - private static final List websocketRegistryNames = Arrays.asList("amqp"); - - private final ActiveMQServer server; - - private MessageConverter protonConverter; - - private final ProtonProtocolManagerFactory factory; - - /* - * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for - * the address. This can be changed on the acceptor. - * */ - private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX; - - private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; - - public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) { - this.factory = factory; - this.server = server; - this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); - } - - public ActiveMQServer getServer() { - return server; - } - - @Override - public MessageConverter getConverter() { - return protonConverter; - } - - @Override - public void onNotification(Notification notification) { - - } - - @Override - public ProtocolManagerFactory getFactory() { - return factory; - } - - @Override - public void updateInterceptors(List incomingInterceptors, - List outgoingInterceptors) { - // no op - } - - @Override - public boolean acceptsNoHandshake() { - return false; - } - - @Override - public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { - ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server); - long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL; - - if (server.getConfiguration().getConnectionTTLOverride() != -1) { - ttl = server.getConfiguration().getConnectionTTLOverride(); - } - - String id = server.getConfiguration().getName(); - AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory(). - createConnection(connectionCallback, id, (int) ttl, getMaxFrameSize(), DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); - - Executor executor = server.getExecutorFactory().getExecutor(); - - ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor); - - connectionCallback.setProtonConnectionDelegate(delegate); - - ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl); - - return entry; - } - - @Override - public void removeHandler(String name) { - - } - - @Override - public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) { - ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection; - - protonConnection.bufferReceived(protonConnection.getID(), buffer); - } - - @Override - public void addChannelHandlers(ChannelPipeline pipeline) { - - } - - @Override - public boolean isProtocol(byte[] array) { - return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P'; - } - - @Override - public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { - } - - @Override - public List websocketSubprotocolIdentifiers() { - return websocketRegistryNames; - } - - public String getPubSubPrefix() { - return pubSubPrefix; - } - - public void setPubSubPrefix(String pubSubPrefix) { - this.pubSubPrefix = pubSubPrefix; - } - - - public int getMaxFrameSize() { - return maxFrameSize; - } - - public void setMaxFrameSize(int maxFrameSize) { - this.maxFrameSize = maxFrameSize; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java deleted file mode 100644 index e677563..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton; - -import org.apache.activemq.artemis.api.core.BaseInterceptor; -import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; -import org.apache.activemq.artemis.utils.uri.BeanSupport; -import org.osgi.service.component.annotations.Component; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -@Component(service = ProtocolManagerFactory.class) -public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory { - - private static final String AMQP_PROTOCOL_NAME = "AMQP"; - - private static final String MODULE_NAME = "artemis-amqp-protocol"; - - private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME}; - - @Override - public ProtocolManager createProtocolManager(ActiveMQServer server, - final Map parameters, - List incomingInterceptors, - List outgoingInterceptors) throws Exception { - return BeanSupport.setData(new ProtonProtocolManager(this, server), parameters); - } - - @Override - public List filterInterceptors(List interceptors) { - // no interceptors on Proton - return Collections.emptyList(); - } - - @Override - public String[] getProtocols() { - return SUPPORTED_PROTOCOLS; - } - - @Override - public String getModuleName() { - return MODULE_NAME; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java deleted file mode 100644 index 7ed95d4..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSStreamMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSVendor; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.utils.IDGenerator; - -public class ActiveMQJMSVendor implements JMSVendor { - - private final IDGenerator serverGenerator; - - ActiveMQJMSVendor(IDGenerator idGenerator) { - this.serverGenerator = idGenerator; - } - - @Override - public BytesMessage createBytesMessage() { - return new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0); - } - - @Override - public StreamMessage createStreamMessage() { - return new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0); - } - - @Override - public Message createMessage() { - return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0); - } - - @Override - public TextMessage createTextMessage() { - return new ServerJMSTextMessage(newMessage(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE), 0); - } - - @Override - public ObjectMessage createObjectMessage() { - return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0); - } - - @Override - public MapMessage createMapMessage() { - return new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0); - } - - @Override - public void setJMSXUserID(Message message, String s) { - } - - @Override - public Destination createDestination(String name) { - return new ServerDestination(name); - } - - @Override - public void setJMSXGroupID(Message message, String s) { - try { - message.setStringProperty("_AMQ_GROUP_ID", s); - } - catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public void setJMSXGroupSequence(Message message, int i) { - try { - message.setIntProperty("JMSXGroupSeq", i); - } - catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public void setJMSXDeliveryCount(Message message, long l) { - try { - message.setLongProperty("JMSXDeliveryCount", l); - } - catch (JMSException e) { - throw new RuntimeException(e); - } - } - - public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) { - switch (messageType) { - case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: - return new ServerJMSStreamMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE: - return new ServerJMSBytesMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.MAP_TYPE: - return new ServerJMSMapMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: - return new ServerJMSTextMessage(wrapped, deliveryCount); - case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: - return new ServerJMSObjectMessage(wrapped, deliveryCount); - default: - return new ServerJMSMessage(wrapped, deliveryCount); - } - } - - @Override - public String toAddress(Destination destination) { - if (destination instanceof ActiveMQDestination) { - return ((ActiveMQDestination) destination).getAddress(); - } - return null; - } - - private ServerMessageImpl newMessage(byte messageType) { - ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512); - message.setType(messageType); - ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); - return message; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java deleted file mode 100644 index 6b4e99b..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter; - -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.core.protocol.proton.converter.message.AMQPNativeOutboundTransformer; -import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.message.InboundTransformer; -import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingInboundTransformer; -import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingOutboundTransformer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.utils.IDGenerator; - -import javax.jms.BytesMessage; -import java.io.IOException; - -public class ProtonMessageConverter implements MessageConverter { - - ActiveMQJMSVendor activeMQJMSVendor; - - private final String prefixVendor; - - public ProtonMessageConverter(IDGenerator idGenerator) { - activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator); - inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor); - outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor); - prefixVendor = outboundTransformer.getPrefixVendor(); - } - - private final InboundTransformer inboundTransformer; - private final JMSMappingOutboundTransformer outboundTransformer; - - @Override - public ServerMessage inbound(Object messageSource) throws Exception { - ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource); - - return (ServerMessage) jmsMessage.getInnerMessage(); - } - - /** - * Just create the JMS Part of the inbound (for testing) - * - * @param messageSource - * @return - * @throws Exception https://issues.jboss.org/browse/ENTMQ-1560 - */ - public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception { - EncodedMessage encodedMessageSource = messageSource; - ServerJMSMessage transformedMessage = null; - - InboundTransformer transformer = inboundTransformer; - - while (transformer != null) { - try { - transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource); - break; - } - catch (Exception e) { - ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); - ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); - - transformer = transformer.getFallbackTransformer(); - } - } - - if (transformedMessage == null) { - throw new IOException("Failed to transform incoming delivery, skipping."); - } - - transformedMessage.encode(); - - return transformedMessage; - } - - @Override - public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { - ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); - - jmsMessage.decode(); - - if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) { - if (jmsMessage instanceof BytesMessage) { - return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage); - } - else { - return null; - } - } - else { - return outboundTransformer.convert(jmsMessage); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java deleted file mode 100644 index 0a8bb29..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter.jms; - -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; - -import javax.jms.JMSException; -import javax.jms.Queue; - -/** - * This is just here to avoid all the client checks we need with valid JMS destinations, protocol convertors don't need to - * adhere to the jms. semantics. - */ -public class ServerDestination extends ActiveMQDestination implements Queue { - public ServerDestination(String name) { - super(name, name, false, false, null); - } - - @Override - public String getQueueName() throws JMSException { - return getName(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java deleted file mode 100644 index 990c7d7..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter.jms; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -import org.apache.activemq.artemis.core.message.impl.MessageImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; - -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadByte; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBytes; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadChar; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadDouble; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadFloat; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadInt; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadLong; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadShort; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUTF; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUnsignedByte; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUnsignedShort; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteBoolean; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteByte; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteBytes; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteChar; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteDouble; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteFloat; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteInt; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteLong; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteObject; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteShort; -import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF; - -public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage { - - public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); - } - - @Override - public long getBodyLength() throws JMSException { - return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET; - } - - @Override - public boolean readBoolean() throws JMSException { - return bytesReadBoolean(getReadBodyBuffer()); - } - - @Override - public byte readByte() throws JMSException { - return bytesReadByte(getReadBodyBuffer()); - } - - @Override - public int readUnsignedByte() throws JMSException { - return bytesReadUnsignedByte(getReadBodyBuffer()); - } - - @Override - public short readShort() throws JMSException { - return bytesReadShort(getReadBodyBuffer()); - } - - @Override - public int readUnsignedShort() throws JMSException { - return bytesReadUnsignedShort(getReadBodyBuffer()); - } - - @Override - public char readChar() throws JMSException { - return bytesReadChar(getReadBodyBuffer()); - } - - @Override - public int readInt() throws JMSException { - return bytesReadInt(getReadBodyBuffer()); - } - - @Override - public long readLong() throws JMSException { - return bytesReadLong(getReadBodyBuffer()); - } - - @Override - public float readFloat() throws JMSException { - return bytesReadFloat(getReadBodyBuffer()); - } - - @Override - public double readDouble() throws JMSException { - return bytesReadDouble(getReadBodyBuffer()); - } - - @Override - public String readUTF() throws JMSException { - return bytesReadUTF(getReadBodyBuffer()); - } - - @Override - public int readBytes(byte[] value) throws JMSException { - return bytesReadBytes(getReadBodyBuffer(), value); - } - - @Override - public int readBytes(byte[] value, int length) throws JMSException { - return bytesReadBytes(getReadBodyBuffer(), value, length); - } - - @Override - public void writeBoolean(boolean value) throws JMSException { - bytesWriteBoolean(getWriteBodyBuffer(), value); - - } - - @Override - public void writeByte(byte value) throws JMSException { - bytesWriteByte(getWriteBodyBuffer(), value); - } - - @Override - public void writeShort(short value) throws JMSException { - bytesWriteShort(getWriteBodyBuffer(), value); - } - - @Override - public void writeChar(char value) throws JMSException { - bytesWriteChar(getWriteBodyBuffer(), value); - } - - @Override - public void writeInt(int value) throws JMSException { - bytesWriteInt(getWriteBodyBuffer(), value); - } - - @Override - public void writeLong(long value) throws JMSException { - bytesWriteLong(getWriteBodyBuffer(), value); - } - - @Override - public void writeFloat(float value) throws JMSException { - bytesWriteFloat(getWriteBodyBuffer(), value); - } - - @Override - public void writeDouble(double value) throws JMSException { - bytesWriteDouble(getWriteBodyBuffer(), value); - } - - @Override - public void writeUTF(String value) throws JMSException { - bytesWriteUTF(getWriteBodyBuffer(), value); - } - - @Override - public void writeBytes(byte[] value) throws JMSException { - bytesWriteBytes(getWriteBodyBuffer(), value); - } - - @Override - public void writeBytes(byte[] value, int offset, int length) throws JMSException { - bytesWriteBytes(getWriteBodyBuffer(), value, offset, length); - } - - @Override - public void writeObject(Object value) throws JMSException { - if (!bytesWriteObject(getWriteBodyBuffer(), value)) { - throw new JMSException("Can't make conversion of " + value + " to any known type"); - } - } - - @Override - public void encode() throws Exception { - super.encode(); - // this is to make sure we encode the body-length before it's persisted - getBodyLength(); - } - - @Override - public void decode() throws Exception { - super.decode(); - - } - - @Override - public void reset() throws JMSException { - bytesMessageReset(getReadBodyBuffer()); - bytesMessageReset(getWriteBodyBuffer()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java deleted file mode 100644 index 2083b84..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMapMessage.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter.jms; - -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageFormatException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.Set; - -import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.utils.TypedProperties; - -import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap; -import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap; - -/** - * ActiveMQ Artemis implementation of a JMS MapMessage. - */ -public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage { - // Constants ----------------------------------------------------- - - public static final byte TYPE = Message.MAP_TYPE; - - // Attributes ---------------------------------------------------- - - private final TypedProperties map = new TypedProperties(); - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - /* - * This constructor is used to construct messages prior to sending - */ - public ServerJMSMapMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); - - } - - // MapMessage implementation ------------------------------------- - - @Override - public void setBoolean(final String name, final boolean value) throws JMSException { - map.putBooleanProperty(new SimpleString(name), value); - } - - @Override - public void setByte(final String name, final byte value) throws JMSException { - map.putByteProperty(new SimpleString(name), value); - } - - @Override - public void setShort(final String name, final short value) throws JMSException { - map.putShortProperty(new SimpleString(name), value); - } - - @Override - public void setChar(final String name, final char value) throws JMSException { - map.putCharProperty(new SimpleString(name), value); - } - - @Override - public void setInt(final String name, final int value) throws JMSException { - map.putIntProperty(new SimpleString(name), value); - } - - @Override - public void setLong(final String name, final long value) throws JMSException { - map.putLongProperty(new SimpleString(name), value); - } - - @Override - public void setFloat(final String name, final float value) throws JMSException { - map.putFloatProperty(new SimpleString(name), value); - } - - @Override - public void setDouble(final String name, final double value) throws JMSException { - map.putDoubleProperty(new SimpleString(name), value); - } - - @Override - public void setString(final String name, final String value) throws JMSException { - map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value)); - } - - @Override - public void setBytes(final String name, final byte[] value) throws JMSException { - map.putBytesProperty(new SimpleString(name), value); - } - - @Override - public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException { - if (offset + length > value.length) { - throw new JMSException("Invalid offset/length"); - } - byte[] newBytes = new byte[length]; - System.arraycopy(value, offset, newBytes, 0, length); - map.putBytesProperty(new SimpleString(name), newBytes); - } - - @Override - public void setObject(final String name, final Object value) throws JMSException { - try { - TypedProperties.setObjectProperty(new SimpleString(name), value, map); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public boolean getBoolean(final String name) throws JMSException { - try { - return map.getBooleanProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public byte getByte(final String name) throws JMSException { - try { - return map.getByteProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public short getShort(final String name) throws JMSException { - try { - return map.getShortProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public char getChar(final String name) throws JMSException { - try { - return map.getCharProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public int getInt(final String name) throws JMSException { - try { - return map.getIntProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public long getLong(final String name) throws JMSException { - try { - return map.getLongProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public float getFloat(final String name) throws JMSException { - try { - return map.getFloatProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public double getDouble(final String name) throws JMSException { - try { - return map.getDoubleProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public String getString(final String name) throws JMSException { - try { - SimpleString str = map.getSimpleStringProperty(new SimpleString(name)); - if (str == null) { - return null; - } - else { - return str.toString(); - } - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public byte[] getBytes(final String name) throws JMSException { - try { - return map.getBytesProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) { - throw new MessageFormatException(e.getMessage()); - } - } - - @Override - public Object getObject(final String name) throws JMSException { - Object val = map.getProperty(new SimpleString(name)); - - if (val instanceof SimpleString) { - val = ((SimpleString) val).toString(); - } - - return val; - } - - @Override - public Enumeration getMapNames() throws JMSException { - Set simplePropNames = map.getPropertyNames(); - Set propNames = new HashSet<>(simplePropNames.size()); - - for (SimpleString str : simplePropNames) { - propNames.add(str.toString()); - } - - return Collections.enumeration(propNames); - } - - @Override - public boolean itemExists(final String name) throws JMSException { - return map.containsProperty(new SimpleString(name)); - } - - @Override - public void clearBody() throws JMSException { - super.clearBody(); - - map.clear(); - } - - @Override - public void encode() throws Exception { - super.encode(); - writeBodyMap(getWriteBodyBuffer(), map); - } - - @Override - public void decode() throws Exception { - super.decode(); - readBodyMap(getReadBodyBuffer(), map); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java deleted file mode 100644 index 465539b..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter.jms; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import java.util.Collections; -import java.util.Enumeration; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.reader.MessageUtil; - -public class ServerJMSMessage implements Message { - - public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID"; - - protected final MessageInternal message; - - protected int deliveryCount; - - public MessageInternal getInnerMessage() { - return message; - } - - public ServerJMSMessage(MessageInternal message, int deliveryCount) { - this.message = message; - this.deliveryCount = deliveryCount; - } - - private ActiveMQBuffer readBodyBuffer; - - /** When reading we use a protected copy so multi-threads can work fine */ - protected ActiveMQBuffer getReadBodyBuffer() { - if (readBodyBuffer == null) { - // to avoid clashes between multiple threads - readBodyBuffer = message.getBodyBufferDuplicate(); - } - return readBodyBuffer; - } - - /** When writing on the conversion we use the buffer directly */ - protected ActiveMQBuffer getWriteBodyBuffer() { - readBodyBuffer = null; // it invalidates this buffer if anything is written - return message.getBodyBuffer(); - } - - - @Override - public final String getJMSMessageID() throws JMSException { - if (message.containsProperty(NATIVE_MESSAGE_ID)) { - return getStringProperty(NATIVE_MESSAGE_ID); - } - return null; - } - - @Override - public final void setJMSMessageID(String id) throws JMSException { - if (id != null) { - message.putStringProperty(NATIVE_MESSAGE_ID, id); - } - } - - @Override - public final long getJMSTimestamp() throws JMSException { - return message.getTimestamp(); - } - - @Override - public final void setJMSTimestamp(long timestamp) throws JMSException { - message.setTimestamp(timestamp); - } - - @Override - public final byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return MessageUtil.getJMSCorrelationIDAsBytes(message); - } - - @Override - public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException { - try { - MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID); - } - catch (ActiveMQException e) { - throw new JMSException(e.getMessage()); - } - } - - @Override - public final void setJMSCorrelationID(String correlationID) throws JMSException { - MessageUtil.setJMSCorrelationID(message, correlationID); - } - - @Override - public final String getJMSCorrelationID() throws JMSException { - return MessageUtil.getJMSCorrelationID(message); - } - - @Override - public final Destination getJMSReplyTo() throws JMSException { - SimpleString reply = MessageUtil.getJMSReplyTo(message); - if (reply != null) { - return new ServerDestination(reply.toString()); - } - else { - return null; - } - } - - @Override - public final void setJMSReplyTo(Destination replyTo) throws JMSException { - MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((ActiveMQDestination) replyTo).getSimpleAddress()); - - } - - @Override - public final Destination getJMSDestination() throws JMSException { - SimpleString sdest = message.getAddress(); - - if (sdest == null) { - return null; - } - else { - return new ServerDestination(sdest.toString()); - } - } - - @Override - public final void setJMSDestination(Destination destination) throws JMSException { - if (destination == null) { - message.setAddress(null); - } - else { - message.setAddress(((ActiveMQDestination) destination).getSimpleAddress()); - } - - } - - @Override - public final int getJMSDeliveryMode() throws JMSException { - return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; - } - - @Override - public final void setJMSDeliveryMode(int deliveryMode) throws JMSException { - if (deliveryMode == DeliveryMode.PERSISTENT) { - message.setDurable(true); - } - else if (deliveryMode == DeliveryMode.NON_PERSISTENT) { - message.setDurable(false); - } - else { - throw new JMSException("Invalid mode " + deliveryMode); - } - } - - @Override - public final boolean getJMSRedelivered() throws JMSException { - return false; - } - - @Override - public final void setJMSRedelivered(boolean redelivered) throws JMSException { - // no op - } - - @Override - public final String getJMSType() throws JMSException { - return MessageUtil.getJMSType(message); - } - - @Override - public final void setJMSType(String type) throws JMSException { - MessageUtil.setJMSType(message, type); - } - - @Override - public final long getJMSExpiration() throws JMSException { - return message.getExpiration(); - } - - @Override - public final void setJMSExpiration(long expiration) throws JMSException { - message.setExpiration(expiration); - } - - @Override - public final long getJMSDeliveryTime() throws JMSException { - // no op - return 0; - } - - @Override - public final void setJMSDeliveryTime(long deliveryTime) throws JMSException { - // no op - } - - @Override - public final int getJMSPriority() throws JMSException { - return message.getPriority(); - } - - @Override - public final void setJMSPriority(int priority) throws JMSException { - message.setPriority((byte) priority); - } - - @Override - public final void clearProperties() throws JMSException { - MessageUtil.clearProperties(message); - - } - - @Override - public final boolean propertyExists(String name) throws JMSException { - return MessageUtil.propertyExists(message, name); - } - - @Override - public final boolean getBooleanProperty(String name) throws JMSException { - return message.getBooleanProperty(name); - } - - @Override - public final byte getByteProperty(String name) throws JMSException { - return message.getByteProperty(name); - } - - @Override - public final short getShortProperty(String name) throws JMSException { - return message.getShortProperty(name); - } - - @Override - public final int getIntProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return deliveryCount; - } - - return message.getIntProperty(name); - } - - @Override - public final long getLongProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return deliveryCount; - } - - return message.getLongProperty(name); - } - - @Override - public final float getFloatProperty(String name) throws JMSException { - return message.getFloatProperty(name); - } - - @Override - public final double getDoubleProperty(String name) throws JMSException { - return message.getDoubleProperty(name); - } - - @Override - public final String getStringProperty(String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return String.valueOf(deliveryCount); - } - - return message.getStringProperty(name); - } - - @Override - public final Object getObjectProperty(String name) throws JMSException { - Object val = message.getObjectProperty(name); - if (val instanceof SimpleString) { - val = ((SimpleString) val).toString(); - } - return val; - } - - @Override - public final Enumeration getPropertyNames() throws JMSException { - return Collections.enumeration(MessageUtil.getPropertyNames(message)); - } - - @Override - public final void setBooleanProperty(String name, boolean value) throws JMSException { - message.putBooleanProperty(name, value); - } - - @Override - public final void setByteProperty(String name, byte value) throws JMSException { - message.putByteProperty(name, value); - } - - @Override - public final void setShortProperty(String name, short value) throws JMSException { - message.putShortProperty(name, value); - } - - @Override - public final void setIntProperty(String name, int value) throws JMSException { - message.putIntProperty(name, value); - } - - @Override - public final void setLongProperty(String name, long value) throws JMSException { - message.putLongProperty(name, value); - } - - @Override - public final void setFloatProperty(String name, float value) throws JMSException { - message.putFloatProperty(name, value); - } - - @Override - public final void setDoubleProperty(String name, double value) throws JMSException { - message.putDoubleProperty(name, value); - } - - @Override - public final void setStringProperty(String name, String value) throws JMSException { - message.putStringProperty(name, value); - } - - @Override - public final void setObjectProperty(String name, Object value) throws JMSException { - message.putObjectProperty(name, value); - } - - @Override - public final void acknowledge() throws JMSException { - // no op - } - - @Override - public void clearBody() throws JMSException { - message.getBodyBuffer().clear(); - } - - @Override - public final T getBody(Class c) throws JMSException { - // no op.. jms2 not used on the conversion - return null; - } - - /** - * Encode the body into the internal message - */ - public void encode() throws Exception { - message.getBodyBuffer().resetReaderIndex(); - } - - public void decode() throws Exception { - message.getBodyBuffer().resetReaderIndex(); - } - - @Override - public final boolean isBodyAssignableTo(Class c) throws JMSException { - // no op.. jms2 not used on the conversion - return false; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java deleted file mode 100644 index fb42993..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSObjectMessage.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.proton.converter.jms; - -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader; - -import javax.jms.JMSException; -import javax.jms.ObjectMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - - -public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage { - private static final String DEFAULT_WHITELIST; - private static final String DEFAULT_BLACKLIST; - - static { - DEFAULT_WHITELIST = System.getProperty(ObjectInputStreamWithClassLoader.WHITELIST_PROPERTY, - "java.lang,java.math,javax.security,java.util,org.apache.activemq,org.apache.qpid.proton.amqp"); - - DEFAULT_BLACKLIST = System.getProperty(ObjectInputStreamWithClassLoader.BLACKLIST_PROPERTY, null); - } - public static final byte TYPE = Message.STREAM_TYPE; - - private Serializable object; - - public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) { - super(message, deliveryCount); - } - - @Override - public void setObject(Serializable object) throws JMSException { - this.object = object; - } - - @Override - public Serializable getObject() throws JMSException { - return object; - } - - @Override - public void encode() throws Exception { - super.encode(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream ous = new ObjectOutputStream(out); - ous.writeObject(object); - getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray()); - } - - @Override - public void decode() throws Exception { - super.decode(); - int size = getInnerMessage().getBodyBuffer().readableBytes(); - byte[] bytes = new byte[size]; - getInnerMessage().getBodyBuffer().readBytes(bytes); - ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes)); - ois.setWhiteList(DEFAULT_WHITELIST); - ois.setBlackList(DEFAULT_BLACKLIST); - object = (Serializable) ois.readObject(); - } -}