activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [4/5] activemq-artemis git commit: ARTEMIS-320 Refactoring TCP flow control and proper implementation of flow control on consumers
Date Thu, 10 Dec 2015 22:32:40 GMT
ARTEMIS-320 Refactoring TCP flow control and proper implementation of flow control on consumers

https://issues.apache.org/jira/browse/ARTEMIS-320


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b1b4bb8a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b1b4bb8a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b1b4bb8a

Branch: refs/heads/master
Commit: b1b4bb8a32a9bb9c6d951f3d0fdd3626944319f9
Parents: 351bcfc
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Dec 8 19:09:05 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Dec 10 16:50:26 2015 -0500

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 20 -------
 .../impl/ClientSessionFactoryInternal.java      |  3 -
 .../core/client/impl/ClientSessionImpl.java     | 14 +++--
 .../core/client/impl/ClientSessionInternal.java |  4 +-
 .../core/client/impl/DelegatingSession.java     |  6 +-
 .../core/impl/ActiveMQSessionContext.java       |  6 ++
 .../remoting/impl/netty/NettyConnection.java    | 58 +++++++++++++-------
 .../remoting/impl/netty/NettyConnector.java     |  4 ++
 .../protocol/AbstractRemotingConnection.java    |  6 ++
 .../spi/core/protocol/RemotingConnection.java   |  3 +
 .../artemis/spi/core/remoting/Connection.java   |  8 +--
 .../spi/core/remoting/ReadyListener.java        |  2 +-
 .../spi/core/remoting/SessionContext.java       |  2 +
 .../plug/ActiveMQProtonConnectionCallback.java  |  2 +-
 .../plug/ProtonSessionIntegrationCallback.java  | 25 +++++----
 .../core/protocol/mqtt/MQTTConnection.java      |  5 ++
 .../artemis/core/protocol/mqtt/MQTTSession.java |  2 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java | 21 +++----
 .../protocol/openwire/OpenWireConnection.java   |  4 ++
 .../core/protocol/openwire/amq/AMQSession.java  | 19 ++-----
 .../core/protocol/stomp/StompConnection.java    |  5 ++
 .../core/protocol/stomp/StompSession.java       | 15 ++---
 .../core/ServerSessionPacketHandler.java        |  7 +++
 .../core/impl/ActiveMQPacketHandler.java        |  3 +-
 .../protocol/core/impl/CoreSessionCallback.java | 21 ++++---
 .../core/remoting/impl/invm/InVMConnection.java | 16 +++---
 .../core/remoting/impl/netty/NettyAcceptor.java |  2 +
 .../core/server/cluster/impl/BridgeImpl.java    | 40 +++-----------
 .../core/server/impl/ServerConsumerImpl.java    | 30 ++--------
 .../spi/core/protocol/SessionCallback.java      |  6 +-
 .../integration/client/HangConsumerTest.java    | 21 ++-----
 31 files changed, 176 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 3b36345..9df7ed4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -148,8 +148,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    private String liveNodeID;
 
-   private Set<ConnectionLifeCycleListener> lifeCycleListeners;
-
    // We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called.
    private boolean connectionReadyForWrites;
 
@@ -222,8 +220,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
 
-      lifeCycleListeners = new HashSet<>();
-
       connectionReadyForWrites = true;
    }
 
@@ -239,14 +235,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
    }
 
    @Override
-   public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
-      synchronized (connectionReadyLock) {
-         lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites);
-         lifeCycleListeners.add(lifeCycleListener);
-      }
-   }
-
-   @Override
    public void connect(final int initialConnectAttempts,
                        final boolean failoverOnInitialConnection) throws ActiveMQException {
       // Get the connection
@@ -395,14 +383,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    @Override
    public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
-      synchronized (connectionReadyLock) {
-         if (connectionReadyForWrites != ready) {
-            connectionReadyForWrites = ready;
-            for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) {
-               lifeCycleListener.connectionReadyForWrites(connectionID, ready);
-            }
-         }
-      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
index 3671618..ba2dab7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
@@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
-import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 
 public interface ClientSessionFactoryInternal extends ClientSessionFactory {
@@ -58,6 +57,4 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
    ConfirmationWindowWarning getConfirmationWindowWarning();
 
    Lock lockFailover();
-
-   void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index ca7e767..dc89680 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -44,8 +44,8 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
@@ -408,6 +408,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly);
    }
 
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return sessionContext.isWritable(callback);
+   }
+
+
    /**
     * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on
     * the remoting thread).
@@ -695,11 +702,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       return sessionFactory.getLiveNodeId();
    }
 
-   @Override
-   public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
-      sessionFactory.addLifeCycleListener(lifeCycleListener);
-   }
-
    // ClientSessionInternal implementation
    // ------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index cd697c0..30c8404 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -23,8 +23,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public interface ClientSessionInternal extends ClientSession {
 
@@ -126,5 +126,5 @@ public interface ClientSessionInternal extends ClientSession {
 
    String getNodeId();
 
-   void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
+   boolean isWritable(ReadyListener callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
index dde59d2..087a150 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
@@ -33,8 +33,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 
 /**
@@ -101,8 +101,8 @@ public class DelegatingSession implements ClientSessionInternal {
    }
 
    @Override
-   public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
-      session.addLifeCycleListener(lifeCycleListener);
+   public boolean isWritable(ReadyListener callback) {
+      return session.isWritable(callback);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 05ae474..f723802 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -97,6 +97,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
 import org.apache.activemq.artemis.utils.VersionLoader;
@@ -239,6 +240,11 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
+   public boolean isWritable(ReadyListener callback) {
+      return remotingConnection.isWritable(callback);
+   }
+
+   @Override
    public ClientConsumerInternal createConsumer(SimpleString queueName,
                                                 SimpleString filterString,
                                                 int windowSize,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 6f2f792..7be8354 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
 
 import java.net.SocketAddress;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Semaphore;
 
 import io.netty.buffer.ByteBuf;
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.IPV6Util;
 
 public class NettyConnection implements Connection {
@@ -65,10 +64,14 @@ public class NettyConnection implements Connection {
 
    private final Semaphore writeLock = new Semaphore(1);
 
-   private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<>();
-
    private RemotingConnection protocolConnection;
 
+   private boolean ready = true;
+
+   /** if {@link #isWritable(ReadyListener)} returns false, we add a callback
+    *  here for when the connection (or Netty Channel) becomes available again. */
+   private final ConcurrentLinkedDeque<ReadyListener> readyListeners = new ConcurrentLinkedDeque<>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -96,6 +99,37 @@ public class NettyConnection implements Connection {
    }
    // Connection implementation ----------------------------
 
+
+   public boolean isWritable(ReadyListener callback) {
+      synchronized (readyListeners) {
+         readyListeners.push(callback);
+
+         return ready;
+      }
+   }
+
+   public void fireReady(final boolean ready) {
+      synchronized (readyListeners) {
+         this.ready = ready;
+
+         if (ready) {
+            for (;;) {
+               ReadyListener readyListener = readyListeners.poll();
+               if (readyListener == null) {
+                  return;
+               }
+
+               try {
+                  readyListener.readyForWriting();
+               }
+               catch (Throwable logOnly) {
+                  ActiveMQClientLogger.LOGGER.warn(logOnly.getMessage(), logOnly);
+               }
+            }
+         }
+      }
+   }
+
    @Override
    public void forceClose() {
       if (channel != null) {
@@ -323,28 +357,12 @@ public class NettyConnection implements Connection {
       return directDeliver;
    }
 
-   @Override
-   public void addReadyListener(final ReadyListener listener) {
-      readyListeners.add(listener);
-   }
-
-   @Override
-   public void removeReadyListener(final ReadyListener listener) {
-      readyListeners.remove(listener);
-   }
-
    //never allow this
    @Override
    public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
       return null;
    }
 
-   void fireReady(final boolean ready) {
-      for (ReadyListener listener : readyListeners) {
-         listener.readyForWriting(ready);
-      }
-   }
-
    @Override
    public TransportConfiguration getConnectorConfig() {
       if (configuration != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 3f56248..97c0fef 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -936,6 +936,10 @@ public class NettyConnector extends AbstractConnector {
 
       @Override
       public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         NettyConnection connection = (NettyConnection)connections.get(connectionID);
+         if (connection != null) {
+            connection.fireReady(ready);
+         }
          listener.connectionReadyForWrites(connectionID, ready);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index 32df48c..b759ccc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public abstract class AbstractRemotingConnection implements RemotingConnection {
 
@@ -50,6 +51,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
       return new ArrayList<>(failureListeners);
    }
 
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return transportConnection.isWritable(callback);
+   }
+
    protected void callFailureListeners(final ActiveMQException me, String scaleDownTargetNodeID) {
       final List<FailureListener> listenersClone = new ArrayList<>(failureListeners);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index 420314b..078e42e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 /**
  * A RemotingConnection is a connection between a client and a server.
@@ -181,4 +182,6 @@ public interface RemotingConnection extends BufferHandler {
     */
    void flush();
 
+   boolean isWritable(ReadyListener callback);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ac05267..ed10113 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -39,6 +39,10 @@ public interface Connection {
 
    void setProtocolConnection(RemotingConnection connection);
 
+   boolean isWritable(ReadyListener listener);
+
+   void fireReady(boolean ready);
+
    /**
     * returns the unique id of this wire.
     *
@@ -104,10 +108,6 @@ public interface Connection {
     */
    void checkFlushBatchBuffer();
 
-   void addReadyListener(ReadyListener listener);
-
-   void removeReadyListener(ReadyListener listener);
-
    /**
     * Generates a {@link TransportConfiguration} to be used to connect to the same target this is
     * connected to.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java
index b846a1a..9103070 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java
@@ -18,6 +18,6 @@ package org.apache.activemq.artemis.spi.core.remoting;
 
 public interface ReadyListener {
 
-   void readyForWriting(boolean ready);
+   void readyForWriting();
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 1bdaffc..f766e48 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -269,4 +269,6 @@ public abstract class SessionContext {
    public abstract void cleanup();
 
    public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
+
+   public abstract boolean isWritable(ReadyListener callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index 64a6232..d8c0f18 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -114,7 +114,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
 
    @Override
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
-      return new ProtonSessionIntegrationCallback(this, manager, connection);
+      return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection);
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 88506b6..5c3c41a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,8 @@ import java.util.concurrent.Executor;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -36,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -58,16 +59,26 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
 
    private final AMQPConnectionContext connection;
 
+   private final Connection transportConnection;
+
+
    private ServerSession serverSession;
 
    private AMQPSessionContext protonSession;
 
    public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
                                            ProtonProtocolManager manager,
-                                           AMQPConnectionContext connection) {
+                                           AMQPConnectionContext connection,
+                                           Connection transportConnection) {
       this.protonSPI = protonSPI;
       this.manager = manager;
       this.connection = connection;
+      this.transportConnection = transportConnection;
+   }
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return transportConnection.isWritable(callback);
    }
 
    @Override
@@ -306,16 +317,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
-   public void addReadyListener(ReadyListener listener) {
-
-   }
-
-   @Override
-   public void removeReadyListener(ReadyListener listener) {
-
-   }
-
-   @Override
    public void disconnect(ServerConsumer consumer, String queueName) {
       synchronized (connection.getLock()) {
          ((Link) consumer.getProtocolContext()).close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 87849ab..0603951 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public class MQTTConnection implements RemotingConnection {
 
@@ -52,6 +53,10 @@ public class MQTTConnection implements RemotingConnection {
       this.destroyed = false;
    }
 
+   public boolean isWritable(ReadyListener callback) {
+      return transportConnection.isWritable(callback);
+   }
+
    @Override
    public Object getID() {
       return transportConnection.getID();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 64023a4..d9c819c 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -59,7 +59,7 @@ public class MQTTSession {
 
       mqttConnectionManager = new MQTTConnectionManager(this);
       mqttPublishManager = new MQTTPublishManager(this);
-      sessionCallback = new MQTTSessionCallback(this);
+      sessionCallback = new MQTTSessionCallback(this, connection);
       subscriptionManager = new MQTTSubscriptionManager(this);
       retainMessageManager = new MQTTRetainMessageManager(this);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 45a9192..cf323d4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -25,12 +25,19 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public class MQTTSessionCallback implements SessionCallback {
 
-   private MQTTSession session;
+   private final MQTTSession session;
+   private final MQTTConnection connection;
 
    private MQTTLogger log = MQTTLogger.LOGGER;
 
-   public MQTTSessionCallback(MQTTSession session) throws Exception {
+   public MQTTSessionCallback(MQTTSession session, MQTTConnection connection) throws Exception {
       this.session = session;
+      this.connection = connection;
+   }
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return connection.isWritable(callback);
    }
 
    @Override
@@ -55,16 +62,6 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void addReadyListener(ReadyListener listener) {
-      session.getConnection().getTransportConnection().addReadyListener(listener);
-   }
-
-   @Override
-   public void removeReadyListener(ReadyListener listener) {
-      session.getConnection().getTransportConnection().removeReadyListener(listener);
-   }
-
-   @Override
    public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
       return sendMessage(message, consumer, deliveryCount);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0bc87ee..43cc92d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -160,6 +161,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       this.creationTime = System.currentTimeMillis();
    }
 
+   public boolean isWritable(ReadyListener callback) {
+      return transportConnection.isWritable(callback);
+   }
 
    // SecurityAuth implementation
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 0765676..b4d04b7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -58,7 +59,6 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
@@ -148,6 +148,11 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
+   public boolean isWritable(ReadyListener callback) {
+      return connection.isWritable(callback);
+   }
+
+   @Override
    public void sendProducerCreditsMessage(int credits, SimpleString address) {
       // TODO Auto-generated method stub
 
@@ -187,18 +192,6 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
-   public void addReadyListener(ReadyListener listener) {
-      // TODO Auto-generated method stub
-
-   }
-
-   @Override
-   public void removeReadyListener(ReadyListener listener) {
-      // TODO Auto-generated method stub
-
-   }
-
-   @Override
    public boolean hasCredits(ServerConsumer consumerID) {
       AMQConsumer amqConsumer = consumers.get(consumerID.getID());
       return amqConsumer.hasCredits();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 2701b5f..31d7377 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.VersionLoader;
 
@@ -118,6 +119,10 @@ public final class StompConnection implements RemotingConnection {
       return frame;
    }
 
+   public boolean isWritable(ReadyListener callback) {
+      return transportConnection.isWritable(callback);
+   }
+
    public boolean hasBytes() {
       return frameHandler.hasBytes();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 85eebe0..dd338f6 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -73,6 +73,11 @@ public class StompSession implements SessionCallback {
       this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration());
    }
 
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return connection.isWritable(callback);
+   }
+
    void setServerSession(ServerSession session) {
       this.session = session;
    }
@@ -182,16 +187,6 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
-   public void addReadyListener(final ReadyListener listener) {
-      connection.getTransportConnection().addReadyListener(listener);
-   }
-
-   @Override
-   public void removeReadyListener(final ReadyListener listener) {
-      connection.getTransportConnection().removeReadyListener(listener);
-   }
-
-   @Override
    public void disconnect(ServerConsumer consumerId, String queueName) {
       StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
       if (stompSubscription != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index c8d7838..78fd83f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -611,6 +611,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
+
+      Connection oldTransportConnection = remotingConnection.getTransportConnection();
+
       remotingConnection = newConnection;
 
       remotingConnection.setCloseListeners(closeListeners);
@@ -624,6 +627,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       session.setTransferring(false);
 
+      // We do this because the old connection could be out of credits on netty
+      // this will force anything to resume after the reattach through the ReadyListener callbacks
+      oldTransportConnection.fireReady(true);
+
       return serverLastReceivedCommandID;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index b60804e..2de5adb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -148,7 +148,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
             activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
          }
 
-         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel), null, true);
+         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(),
+                                                      new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
          channel.setHandler(handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 2fe808f..4fc7879 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
@@ -38,12 +39,20 @@ public final class CoreSessionCallback implements SessionCallback {
 
    private ProtocolManager protocolManager;
 
+   private final RemotingConnection connection;
+
    private String name;
 
-   public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel) {
+   public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, RemotingConnection connection) {
       this.name = name;
       this.protocolManager = protocolManager;
       this.channel = channel;
+      this.connection = connection;
+   }
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return connection.isWritable(callback);
    }
 
    @Override
@@ -102,16 +111,6 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void addReadyListener(final ReadyListener listener) {
-      channel.getConnection().getTransportConnection().addReadyListener(listener);
-   }
-
-   @Override
-   public void removeReadyListener(final ReadyListener listener) {
-      channel.getConnection().getTransportConnection().removeReadyListener(listener);
-   }
-
-   @Override
    public void disconnect(ServerConsumer consumerId, String queueName) {
       if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
          channel.send(new DisconnectConsumerMessage(consumerId.getID()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 7931f82..0cbb575 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -101,6 +101,14 @@ public class InVMConnection implements Connection {
       // no op
    }
 
+   public boolean isWritable(ReadyListener listener) {
+      return true;
+   }
+
+   @Override
+   public void fireReady(boolean ready) {
+   }
+
    @Override
    public RemotingConnection getProtocolConnection() {
       return this.protocolConnection;
@@ -231,14 +239,6 @@ public class InVMConnection implements Connection {
    }
 
    @Override
-   public void addReadyListener(ReadyListener listener) {
-   }
-
-   @Override
-   public void removeReadyListener(ReadyListener listener) {
-   }
-
-   @Override
    public boolean isUsingProtocolHandling() {
       return false;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index ef56322..345981e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -687,6 +687,8 @@ public class NettyAcceptor implements Acceptor {
          if (conn != null) {
             conn.fireReady(ready);
          }
+
+         listener.connectionReadyForWrites(connectionID, ready);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index ac75a1c..b8b30bc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -58,8 +57,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.TypedProperties;
@@ -69,7 +67,7 @@ import org.apache.activemq.artemis.utils.UUID;
  * A Core BridgeImpl
  */
 
-public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener {
+public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
    // Constants -----------------------------------------------------
 
    private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -135,8 +133,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private volatile ClientProducer producer;
 
-   private volatile boolean connectionWritable = false;
-
    private volatile boolean started;
 
    private volatile boolean stopping = false;
@@ -498,13 +494,18 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    @Override
+   public void readyForWriting() {
+      queue.deliverAsync();
+   }
+
+   @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
       if (filter != null && !filter.match(ref.getMessage())) {
          return HandleStatus.NO_MATCH;
       }
 
       synchronized (this) {
-         if (!active || !connectionWritable) {
+         if (!active || !session.isWritable(this)) {
             if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
                ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref);
             }
@@ -555,29 +556,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
    }
 
-   @Override
-   public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) {
-
-   }
-
-   @Override
-   public void connectionDestroyed(Object connectionID) {
-
-   }
-
-   @Override
-   public void connectionException(Object connectionID, ActiveMQException me) {
-
-   }
-
-   @Override
-   public void connectionReadyForWrites(Object connectionID, boolean ready) {
-      connectionWritable = ready;
-      if (connectionWritable) {
-         queue.deliverAsync();
-      }
-   }
-
    // FailureListener implementation --------------------------------
 
    @Override
@@ -891,8 +869,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
             session.setSendAcknowledgementHandler(BridgeImpl.this);
 
-            session.addLifeCycleListener(BridgeImpl.this);
-
             afterConnect();
 
             active = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 243d139..e04c35c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -21,7 +21,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -129,12 +128,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private boolean transferring = false;
 
-   /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
-    * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
-    * write queue when the TCP buffer is full, e.g. the client is slow or has died.
-    */
-   private final AtomicBoolean writeReady = new AtomicBoolean(true);
-
    private final long creationTime;
 
    private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis());
@@ -198,8 +191,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
 
-      this.callback.addReadyListener(this);
-
       this.creationTime = System.currentTimeMillis();
 
       if (browseOnly) {
@@ -220,6 +211,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
+   @Override
+   public void readyForWriting() {
+      promptDelivery();
+   }
+
    // ServerConsumer implementation
    // ----------------------------------------------------------------------
 
@@ -289,7 +285,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the
          // queue for delivery later.
-         if (!started || transferring) {
+         if (!started || transferring || !callback.isWritable(this)) {
             return HandleStatus.BUSY;
          }
 
@@ -395,8 +391,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
       }
 
-      callback.removeReadyListener(this);
-
       setStarted(false);
 
       LargeMessageDeliverer del = largeMessageDeliverer;
@@ -811,18 +805,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
-   @Override
-   public void readyForWriting(final boolean ready) {
-      if (ready) {
-         writeReady.set(true);
-
-         promptDelivery();
-      }
-      else {
-         writeReady.set(false);
-      }
-   }
-
    /**
     * To be used on tests only
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 3c9670d..83c4e93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -43,9 +43,7 @@ public interface SessionCallback {
 
    void closed();
 
-   void addReadyListener(ReadyListener listener);
-
-   void removeReadyListener(ReadyListener listener);
-
    void disconnect(ServerConsumer consumerId, String queueName);
+
+   boolean isWritable(ReadyListener callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1b4bb8a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 8199b4c..5c0e272 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean isWritable(ReadyListener callback) {
+         return true;
+      }
+
+      @Override
       public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
          targetCallback.sendProducerCreditsFailMessage(credits, address);
       }
@@ -538,22 +543,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
          targetCallback.closed();
       }
 
-      /* (non-Javadoc)
-       * @see SessionCallback#addReadyListener(ReadyListener)
-       */
-      @Override
-      public void addReadyListener(ReadyListener listener) {
-         targetCallback.addReadyListener(listener);
-      }
-
-      /* (non-Javadoc)
-       * @see SessionCallback#removeReadyListener(ReadyListener)
-       */
-      @Override
-      public void removeReadyListener(ReadyListener listener) {
-         targetCallback.removeReadyListener(listener);
-      }
-
       @Override
       public void disconnect(ServerConsumer consumerId, String queueName) {
          //To change body of implemented methods use File | Settings | File Templates.


Mime
View raw message