Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1B32018CAE for ; Sat, 19 Mar 2016 05:08:13 +0000 (UTC) Received: (qmail 39701 invoked by uid 500); 19 Mar 2016 05:08:10 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 39586 invoked by uid 500); 19 Mar 2016 05:08:10 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 37000 invoked by uid 99); 19 Mar 2016 05:08:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Mar 2016 05:08:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C3C5DDFB7D; Sat, 19 Mar 2016 05:08:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Sat, 19 Mar 2016 05:08:59 -0000 Message-Id: <1188ac9fbb2443ba93d439c55fdcf7bd@git.apache.org> In-Reply-To: <097bf1c283774e2ea2098fcd9461446c@git.apache.org> References: <097bf1c283774e2ea2098fcd9461446c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [53/67] [abbrv] activemq-artemis git commit: using converter interface using converter interface Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/411d7d4e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/411d7d4e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/411d7d4e Branch: refs/heads/refactor-openwire Commit: 411d7d4eccd476c3de026d99cd3f2c27d61a2cdf Parents: 48bac9f Author: Clebert Suconic Authored: Thu Feb 25 18:57:21 2016 -0500 Committer: Clebert Suconic Committed: Sat Mar 19 01:07:37 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 7 ------- .../openwire/OpenWireMessageConverter.java | 22 +++++++++++++------- .../openwire/OpenWireProtocolManager.java | 9 +++++++- .../core/protocol/openwire/amq/AMQSession.java | 13 ++++++++++-- 4 files changed, 33 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6839259..0fd8dc2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -146,8 +146,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private volatile AMQSession advisorySession; - private String defaultSocketURIString; - // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, Executor executor, @@ -156,7 +154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se super(connection, executor); this.protocolManager = openWireProtocolManager; this.wireFormat = wf; - this.defaultSocketURIString = connection.getLocalAddress(); } // SecurityAuth implementation @@ -635,10 +632,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return this.context; } - public String getDefaultSocketURIString() { - return defaultSocketURIString; - } - public void updateClient(ConnectionControl control) { // if (!destroyed && context.isFaultTolerant()) { if (protocolManager.isUpdateClusterClients()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index d040955..6176490 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; - @Override - public ServerMessage inbound(Object message) { - // TODO: implement this - return null; + + private final WireFormat marshaller; + + public OpenWireMessageConverter(WireFormat marshaller) { + this.marshaller = marshaller; } @Override @@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter { return null; } - //convert an ActiveMQ Artemis message to coreMessage - public static void toCoreMessage(ServerMessageImpl coreMessage, - Message messageSend, - WireFormat marshaller) throws IOException { + + @Override + public ServerMessage inbound(Object message) throws Exception { + + Message messageSend = (Message)message; + ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize()); + String type = messageSend.getType(); if (type != null) { coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type)); @@ -398,6 +402,8 @@ public class OpenWireMessageConverter implements MessageConverter { origDestBytes.compact(); coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); } + + return coreMessage; } private static void loadMapIntoProperties(TypedProperties props, Map map) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 514a2b9..51c4bec 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -115,6 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl private boolean updateClusterClients = false; private boolean updateClusterClientsOnRemove = false; + private final OpenWireMessageConverter messageConverter; + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -123,6 +125,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl wireFactory.setCacheEnabled(false); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); scheduledPool = server.getScheduledPool(); + this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat()); final ClusterManager clusterManager = this.server.getClusterManager(); @@ -134,6 +137,10 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } } + public OpenWireFormat getNewWireFormat() { + return (OpenWireFormat)wireFactory.createWireFormat(); + } + @Override public void nodeUP(TopologyMember member, boolean last) { if (topologyMap.put(member.getNodeId(), member) == null) { @@ -217,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl @Override public MessageConverter getConverter() { - return new OpenWireMessageConverter(); + return messageConverter; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index d16d4c8..4db5967 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { @@ -82,6 +83,11 @@ public class AMQSession implements SessionCallback { private OpenWireProtocolManager manager; + // The sessionWireformat used by the session + // this object is meant to be used per thread / session + // so we make a new one per AMQSession + private final OpenWireMessageConverter converter; + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, @@ -95,6 +101,9 @@ public class AMQSession implements SessionCallback { this.connection = connection; this.scheduledPool = scheduledPool; this.manager = manager; + OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller(); + + this.converter = new OpenWireMessageConverter(marshaller.copy()); } public void initialize() { @@ -254,7 +263,8 @@ public class AMQSession implements SessionCallback { } for (ActiveMQDestination dest : actualDestinations) { - ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024); + + ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend); /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to @@ -263,7 +273,6 @@ public class AMQSession implements SessionCallback { if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) { coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); } - OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller()); SimpleString address = OpenWireUtil.toCoreAddress(dest); coreMsg.setAddress(address);