activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: Use non-deprecated output methods for proton to allow for faster bulk sends of outbound amqp frames.
Date Tue, 04 Feb 2014 17:47:45 GMT
Updated Branches:
  refs/heads/trunk 9eb7fb906 -> e102e64e9


Use non-deprecated output methods for proton to allow for faster bulk
sends of outbound amqp frames.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e102e64e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e102e64e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e102e64e

Branch: refs/heads/trunk
Commit: e102e64e9da9477494e73fe7291f15cf50b4ad70
Parents: 9eb7fb9
Author: Timothy Bish <tabish121@gmai.com>
Authored: Tue Feb 4 12:47:42 2014 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Tue Feb 4 12:47:42 2014 -0500

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 16 +++++++--------
 .../activemq/transport/amqp/AmqpWireFormat.java | 21 ++++++++++++++++++--
 2 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e102e64e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 62a05b9..7e24957 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -159,16 +160,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     void pumpProtonToSocket() {
         try {
-            int size = 1024 * 64;
-            byte data[] = new byte[size];
             boolean done = false;
             while (!done) {
-                int count = protonTransport.output(data, 0, size);
-                if (count > 0) {
-                    final Buffer buffer;
-                    buffer = new Buffer(data, 0, count);
-                    // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
-                    amqpTransport.sendToAmqp(buffer);
+                ByteBuffer toWrite = protonTransport.getOutputBuffer();
+                if (toWrite != null && toWrite.hasRemaining()) {
+//                  // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
+                    amqpTransport.sendToAmqp(toWrite);
+                    protonTransport.outputConsumed();
                 } else {
                     done = true;
                 }
@@ -248,10 +246,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                             sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
                             amqpTransport.getWireFormat().magicRead = false;
                             sasl = null;
+                            LOG.debug("SASL [PLAIN] Handshake complete.");
                         } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
                             sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
                             amqpTransport.getWireFormat().magicRead = false;
                             sasl = null;
+                            LOG.debug("SASL [ANONYMOUS] Handshake complete.");
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e102e64e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 4a11374..13a264a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -21,6 +21,10 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -53,8 +57,21 @@ public class AmqpWireFormat implements WireFormat {
 
     @Override
     public void marshal(Object command, DataOutput dataOut) throws IOException {
-        Buffer frame = (Buffer) command;
-        frame.writeTo(dataOut);
+        if (command instanceof ByteBuffer) {
+            ByteBuffer buffer = (ByteBuffer) command;
+
+            if (dataOut instanceof OutputStream) {
+                WritableByteChannel channel = Channels.newChannel((OutputStream) dataOut);
+                channel.write(buffer);
+            } else {
+                while (buffer.hasRemaining()) {
+                    dataOut.writeByte(buffer.get());
+                }
+            }
+        } else {
+            Buffer frame = (Buffer) command;
+            frame.writeTo(dataOut);
+        }
     }
 
     boolean magicRead = false;


Mime
View raw message