activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5195
Date Tue, 20 May 2014 22:26:33 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk b9d51bf1d -> 9b6f419d4


https://issues.apache.org/jira/browse/AMQ-5195

Sets the session incoming capacity and improves the flow handling to
reduce chatter on each message send, which significantly improves
overall producer performance.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9b6f419d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9b6f419d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9b6f419d

Branch: refs/heads/trunk
Commit: 9b6f419d441458a624ffc7a1f4132021dcdeb88a
Parents: b9d51bf
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue May 20 18:26:17 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue May 20 18:26:17 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 33 +++++++++++++++-----
 1 file changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6f419d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 05e20dc..36a41e6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -170,6 +170,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             while (!done) {
                 ByteBuffer toWrite = protonTransport.getOutputBuffer();
                 if (toWrite != null && toWrite.hasRemaining()) {
+                    LOG.trace("Sending {} bytes out", toWrite.limit());
                     amqpTransport.sendToAmqp(toWrite);
                     protonTransport.outputConsumed();
                 } else {
@@ -463,6 +464,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
         session.setContext(sessionContext);
         sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
+        session.setIncomingCapacity(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE * prefetch);
         session.open();
     }
 
@@ -608,10 +610,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 }
 
                 message.onSend();
-                sendToActiveMQ(message, new ResponseHandler() {
-                    @Override
-                    public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
-                        if (!delivery.remotelySettled()) {
+                if (!delivery.remotelySettled()) {
+                    sendToActiveMQ(message, new ResponseHandler() {
+                        @Override
+                        public void onResponse(IAmqpProtocolConverter converter, Response
response) throws IOException {
                             if (response.isException()) {
                                 ExceptionResponse er = (ExceptionResponse) response;
                                 Rejected rejected = new Rejected();
@@ -620,14 +622,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                                 condition.setDescription(er.getException().getMessage());
                                 rejected.setError(condition);
                                 delivery.disposition(rejected);
+                            } else {
+                                if (receiver.getCredit() <= (prefetch * .2)) {
+                                    LOG.trace("Sending more credit ({}) to producer: {}",
+                                              prefetch - receiver.getCredit(), producerId);
+                                    receiver.flow(prefetch - receiver.getCredit());
+                                }
+
+                                delivery.disposition(Accepted.getInstance());
+                                delivery.settle();
                             }
+
+                            pumpProtonToSocket();
                         }
-                        receiver.flow(1);
-                        delivery.disposition(Accepted.getInstance());
-                        delivery.settle();
+                    });
+                } else {
+                    if (receiver.getCredit() <= (prefetch * .2)) {
+                        LOG.trace("Sending more credit ({}) to producer: {}",
+                                  prefetch - receiver.getCredit(), producerId);
+                        receiver.flow(prefetch - receiver.getCredit());
                         pumpProtonToSocket();
                     }
-                });
+                    sendToActiveMQ(message, null);
+                }
             }
         }
 


Mime
View raw message