activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/6] activemq-artemis git commit: ARTEMIS-1119 flow controlling connection
Date Tue, 18 Apr 2017 17:06:31 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7d5511cfb -> 1f82c783a


ARTEMIS-1119 flow controlling connection

https://issues.apache.org/jira/browse/ARTEMIS-1119


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/807e4e5d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/807e4e5d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/807e4e5d

Branch: refs/heads/master
Commit: 807e4e5d9cef75985af09286f13664282ee0a74c
Parents: 0a0955d
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Apr 17 15:02:33 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Apr 18 11:34:09 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     |  6 ++++++
 .../amqp/proton/AMQPConnectionContext.java      |  8 +++++++-
 .../amqp/proton/handler/EventHandler.java       |  3 +++
 .../amqp/proton/handler/ProtonHandler.java      | 20 +++++++++++++++++++-
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 29a4df3..31bec9a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -156,6 +157,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
       connection.write(new ChannelBufferWrapper(byteBuf, true));
    }
 
+   public boolean isWritable(ReadyListener readyListener) {
+      return connection.isWritable(readyListener);
+   }
+
+
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
       return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 0173631..4a46a8a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -90,7 +91,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler = new ProtonHandler();
+      this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
       handler.addEventHandler(this);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
@@ -333,6 +334,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    }
 
    @Override
+   public boolean flowControl(ReadyListener readyListener) {
+      return connectionCallback.isWritable(readyListener);
+   }
+
+   @Override
    public void onRemoteOpen(Connection connection) throws Exception {
       lock();
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 0ed1723..c8ba136 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
@@ -78,4 +79,6 @@ public interface EventHandler {
 
    void pushBytes(ByteBuf bytes);
 
+   boolean flowControl(ReadyListener readyListener);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index f1be934..e3cb730 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -29,6 +30,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -71,14 +73,23 @@ public class ProtonHandler extends ProtonInitializable {
 
    protected boolean receivedFirstPacket = false;
 
+   private final Executor flushExecutor;
+
+   protected final ReadyListener readyListener;
+
    boolean inDispatch = false;
 
-   public ProtonHandler() {
+   public ProtonHandler(Executor flushExecutor) {
+      this.flushExecutor = flushExecutor;
+      this.readyListener = () -> flushExecutor.execute(() -> {
+         flush();
+      });
       this.creationTime = System.currentTimeMillis();
       transport.bind(connection);
       connection.collect(collector);
    }
 
+
    public long tick(boolean firstTick) {
       lock.lock();
       try {
@@ -161,6 +172,13 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public void flushBytes() {
+
+      for (EventHandler handler : handlers) {
+         if (!handler.flowControl(readyListener)) {
+            return;
+         }
+      }
+
       lock.lock();
       try {
          while (true) {


Mime
View raw message