activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1486951 - /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Date Tue, 28 May 2013 15:04:25 GMT
Author: tabish
Date: Tue May 28 15:04:25 2013
New Revision: 1486951

URL: http://svn.apache.org/r1486951
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4560

Ensure dispatched messages are set to read-only mode before they are passed on to the transformer.


Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1486951&r1=1486950&r2=1486951&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Tue May 28 15:04:25 2013
@@ -16,41 +16,91 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.InvalidSelectorException;
+
 import org.apache.activemq.broker.BrokerContext;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.qpid.proton.amqp.*;
-import org.apache.qpid.proton.amqp.messaging.*;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transaction.*;
-import org.apache.qpid.proton.amqp.transport.*;
-import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.engine.impl.LinkImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.apache.qpid.proton.framing.TransportFrame;
-import org.apache.qpid.proton.jms.*;
+import org.apache.qpid.proton.jms.AMQPNativeInboundTransformer;
+import org.apache.qpid.proton.jms.AMQPRawInboundTransformer;
+import org.apache.qpid.proton.jms.AutoOutboundTransformer;
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.jms.InboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
+import org.apache.qpid.proton.jms.OutboundTransformer;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.ByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.InvalidSelectorException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
 class AmqpProtocolConverter {
 
     public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
@@ -312,7 +362,7 @@ class AmqpProtocolConverter {
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
-    private ConnectionInfo connectionInfo = new ConnectionInfo();
+    private final ConnectionInfo connectionInfo = new ConnectionInfo();
     private long nextSessionId = 0;
     private long nextTempDestinationId = 0;
 
@@ -336,6 +386,7 @@ class AmqpProtocolConverter {
         connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            @Override
             public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                 protonConnection.open();
                 pumpProtonToSocket();
@@ -609,6 +660,7 @@ class AmqpProtocolConverter {
                 ProducerInfo producerInfo = new ProducerInfo(producerId);
                 producerInfo.setDestination(dest);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    @Override
                     public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                         if (response.isException()) {
                             receiver.setTarget(null);
@@ -658,7 +710,7 @@ class AmqpProtocolConverter {
     class ConsumerContext extends AmqpDeliveryListener {
         private final ConsumerId consumerId;
         private final Sender sender;
-        private boolean presettle;
+        private final boolean presettle;
         private boolean closed;
 
         public ConsumerContext(ConsumerId consumerId, Sender sender) {
@@ -748,6 +800,7 @@ class AmqpProtocolConverter {
                         sender.drained();
                     } else {
                         jms.setRedeliveryCounter(md.getRedeliveryCounter());
+                        jms.setReadOnlyBody(true);
                         final EncodedMessage amqp = outboundTransformer.transform(jms);
                         if( amqp!=null && amqp.getLength() > 0 ) {
 
@@ -906,6 +959,7 @@ class AmqpProtocolConverter {
 
                 consumerContext.closed=true;
                 sendToActiveMQ(rsi, new ResponseHandler() {
+                    @Override
                     public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                         if (response.isException()) {
                             sender.setSource(null);
@@ -957,6 +1011,7 @@ class AmqpProtocolConverter {
             }
 
             sendToActiveMQ(consumerInfo, new ResponseHandler() {
+                @Override
                 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                     if (response.isException()) {
                         sender.setSource(null);



Mime
View raw message