activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1408953 - in /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp: AmqpProtocolConverter.java AmqpTransport.java
Date Tue, 13 Nov 2012 20:52:19 GMT
Author: chirino
Date: Tue Nov 13 20:52:18 2012
New Revision: 1408953

URL: http://svn.apache.org/viewvc?rev=1408953&view=rev
Log:
Improve format of the amqp trace messages, implement better producer flow control.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1408953&r1=1408952&r2=1408953&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Tue Nov 13 20:52:18 2012
@@ -94,12 +94,12 @@ class AmqpProtocolConverter {
         this.protonTransport.setProtocolTracer(new ProtocolTracer() {
             @Override
             public void receivedFrame(TransportFrame transportFrame) {
-                System.out.println(String.format("RECV: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
+                System.out.println(String.format("%s | RECV: %s", amqpTransport.getRemoteAddress(), transportFrame.getBody()));
             }
 
             @Override
             public void sentFrame(TransportFrame transportFrame) {
-                System.out.println(String.format("SENT: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
+                System.out.println(String.format("%s | SENT: %s", amqpTransport.getRemoteAddress(), transportFrame.getBody()));
             }
         });
 
@@ -474,7 +474,7 @@ class AmqpProtocolConverter {
         }
 
         @Override
-        protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
+        protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
             final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
             current = null;
@@ -494,13 +494,11 @@ class AmqpProtocolConverter {
                 message.setTransactionId(new LocalTransactionId(connectionId, txid));
             }
 
-            ResponseHandler handler = null;
-            if( delivery.remotelySettled() ) {
-                delivery.settle();
-            } else {
-                handler = new ResponseHandler() {
-                    @Override
-                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+            message.onSend();
+            sendToActiveMQ(message, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if( !delivery.remotelySettled()  ) {
                         if( response.isException() ) {
                             ExceptionResponse er = (ExceptionResponse)response;
                             Rejected rejected = new Rejected();
@@ -509,14 +507,12 @@ class AmqpProtocolConverter {
                             rejected.setError(errors);
                             delivery.disposition(rejected);
                         }
-                        delivery.settle();
-                        pumpProtonToSocket();
                     }
-                };
-            }
-
-            message.onSend();
-            sendToActiveMQ(message, handler);
+                    receiver.flow(1);
+                    delivery.settle();
+                    pumpProtonToSocket();
+                }
+            });
         }
 
     }

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1408953&r1=1408952&r2=1408953&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Tue Nov 13 20:52:18 2012
@@ -42,4 +42,7 @@ public interface AmqpTransport {
     public void stop() throws Exception;
 
     public String getTransformer();
+
+    public String getRemoteAddress();
+
 }



Mime
View raw message