activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1056 Improving Proton usage
Date Tue, 28 Mar 2017 19:00:25 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 177480d86 -> bf7a6ef10


ARTEMIS-1056 Improving Proton usage


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7f91d295
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7f91d295
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7f91d295

Branch: refs/heads/master
Commit: 7f91d295642ae02c10175dfc3c6e01bdb0998635
Parents: 177480d
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Mar 27 22:54:06 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Mar 28 14:48:25 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     |  27 +-
 .../amqp/broker/ProtonProtocolManager.java      |   2 +-
 .../client/AMQPClientConnectionFactory.java     |   2 +-
 .../amqp/proton/AMQPConnectionContext.java      | 357 +++++++++----------
 .../amqp/proton/handler/EventHandler.java       |   3 +
 .../protocol/amqp/proton/handler/Events.java    |   5 -
 .../amqp/proton/handler/ProtonHandler.java      | 162 ++++-----
 7 files changed, 244 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 4265f28..29a4df3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -22,12 +22,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -35,7 +32,6 @@ import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -51,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -72,8 +67,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener
{
 
    protected AMQPConnectionContext amqpConnection;
 
-   private final ReusableLatch latch = new ReusableLatch(0);
-
    private final Executor closeExecutor;
 
    private String remoteContainerId;
@@ -160,25 +153,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener
{
    }
 
    public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
-      final int size = byteBuf.writerIndex();
-
-      latch.countUp();
-      connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener()
{
-         @Override
-         public void operationComplete(ChannelFuture future) throws Exception {
-            latch.countDown();
-         }
-      });
-
-      if (amqpConnection.isSyncOnFlush()) {
-         try {
-            latch.await(5, TimeUnit.SECONDS);
-         } catch (InterruptedException e) {
-            ActiveMQServerLogger.LOGGER.warn("Error during await invocation", e);
-         }
-      }
-
-      amqpConnection.outputDone(size);
+      connection.write(new ChannelBufferWrapper(byteBuf, true));
    }
 
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 03314b2..a6463fa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -109,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>,
Noti
       }
 
       String id = server.getConfiguration().getName();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback,
id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(),
server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback,
id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
 
       Executor executor = server.getExecutorFactory().getExecutor();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 510fdad..441f3a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback,
containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
executor, server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback,
containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
server.getScheduledPool());
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager,
amqpConnection, connection, executor);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 7994be4..a884f0d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,22 +16,16 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
-
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -55,9 +49,13 @@ import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
 
-public class AMQPConnectionContext extends ProtonInitializable {
+public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
 
    private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
 
@@ -73,8 +71,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
    private final Map<Session, AMQPSessionContext> sessions = new ConcurrentHashMap<>();
 
-   protected LocalListener listener = new LocalListener();
-
    private final ProtonProtocolManager protocolManager;
 
    public AMQPConnectionContext(ProtonProtocolManager protocolManager,
@@ -83,7 +79,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
                                 int idleTimeout,
                                 int maxFrameSize,
                                 int channelMax,
-                                Executor dispatchExecutor,
                                 ScheduledExecutorService scheduledPool) {
 
       this.protocolManager = protocolManager;
@@ -95,7 +90,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler = new ProtonHandler(dispatchExecutor);
+      this.handler = new ProtonHandler();
+      handler.addEventHandler(this);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
       if (idleTimeout > 0) {
@@ -103,7 +99,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
       }
       transport.setChannelMax(channelMax);
       transport.setMaxFrameSize(maxFrameSize);
-      handler.addEventHandler(listener);
    }
 
    protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException
{
@@ -141,10 +136,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
       return handler.capacity();
    }
 
-   public void outputDone(int bytes) {
-      handler.outputDone(bytes);
-   }
-
    public void flush() {
       handler.flush();
    }
@@ -176,14 +167,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
       return handler.getCreationTime();
    }
 
-   protected void flushBytes() {
-      ByteBuf bytes;
-      // handler.outputBuffer has the lock
-      while ((bytes = handler.outputBuffer()) != null) {
-         connectionCallback.onTransport(bytes, this);
-      }
-   }
-
    public String getRemoteContainer() {
       return handler.getConnection().getRemoteContainer();
    }
@@ -218,7 +201,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
    public Symbol[] getConnectionCapabilitiesOffered() {
       URI tc = connectionCallback.getFailoverList();
       if (tc != null) {
-         Map<Symbol,Object> hostDetails = new HashMap<>();
+         Map<Symbol, Object> hostDetails = new HashMap<>();
          hostDetails.put(NETWORK_HOST, tc.getHost());
          boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME
+ "=true");
          if (isSSL) {
@@ -229,7 +212,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
          hostDetails.put(HOSTNAME, tc.getHost());
          hostDetails.put(PORT, tc.getPort());
 
-         connectionProperties.put(FAILOVER_SERVER_LIST,  Arrays.asList(hostDetails));
+         connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
       }
       return ExtCapability.getCapabilities();
    }
@@ -268,220 +251,218 @@ public class AMQPConnectionContext extends ProtonInitializable {
       }
    }
 
-   // This listener will perform a bunch of things here
-   class LocalListener implements EventHandler {
-
-      @Override
-      public void onInit(Connection connection) throws Exception {
+   @Override
+   public void onInit(Connection connection) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onLocalOpen(Connection connection) throws Exception {
+   @Override
+   public void onLocalOpen(Connection connection) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onLocalClose(Connection connection) throws Exception {
+   @Override
+   public void onLocalClose(Connection connection) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onFinal(Connection connection) throws Exception {
+   @Override
+   public void onFinal(Connection connection) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onInit(Session session) throws Exception {
+   @Override
+   public void onInit(Session session) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onFinal(Session session) throws Exception {
+   @Override
+   public void onFinal(Session session) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onInit(Link link) throws Exception {
+   @Override
+   public void onInit(Link link) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onLocalOpen(Link link) throws Exception {
+   @Override
+   public void onLocalOpen(Link link) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onLocalClose(Link link) throws Exception {
+   @Override
+   public void onLocalClose(Link link) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onFinal(Link link) throws Exception {
+   @Override
+   public void onFinal(Link link) throws Exception {
 
-      }
+   }
 
-      @Override
-      public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl)
{
-         if (sasl) {
-            handler.createServerSASL(connectionCallback.getSASLMechnisms());
-         } else {
-            if (!connectionCallback.isSupportsAnonymous()) {
-               connectionCallback.sendSASLSupported();
-               connectionCallback.close();
-               handler.close(null);
-            }
+   @Override
+   public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
+      if (sasl) {
+         handler.createServerSASL(connectionCallback.getSASLMechnisms());
+      } else {
+         if (!connectionCallback.isSupportsAnonymous()) {
+            connectionCallback.sendSASLSupported();
+            connectionCallback.close();
+            handler.close(null);
          }
       }
+   }
+
+   @Override
+   public void onTransport(Transport transport) {
+      handler.flushBytes();
+   }
 
-      @Override
-      public void onTransport(Transport transport) {
-         flushBytes();
-      }
 
-      @Override
-      public void onRemoteOpen(Connection connection) throws Exception {
-         synchronized (getLock()) {
-            try {
-               initInternal();
-            } catch (Exception e) {
-               log.error("Error init connection", e);
-            }
-            if (!validateConnection(connection)) {
-               connection.close();
-            } else {
-               connection.setContext(AMQPConnectionContext.this);
-               connection.setContainer(containerId);
-               connection.setProperties(connectionProperties);
-               connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-               connection.open();
-            }
+   @Override
+   public void pushBytes(ByteBuf bytes) {
+      connectionCallback.onTransport(bytes, this);
+   }
+
+   @Override
+   public void onRemoteOpen(Connection connection) throws Exception {
+      synchronized (getLock()) {
+         try {
+            initInternal();
+         } catch (Exception e) {
+            log.error("Error init connection", e);
          }
-         initialise();
+         if (!validateConnection(connection)) {
+            connection.close();
+         } else {
+            connection.setContext(AMQPConnectionContext.this);
+            connection.setContainer(containerId);
+            connection.setProperties(connectionProperties);
+            connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+            connection.open();
+         }
+      }
+      initialise();
 
          /*
          * This can be null which is in effect an empty map, also we really don't need to
check this for in bound connections
          * but its here in case we add support for outbound connections.
          * */
-         if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED))
{
-            long nextKeepAliveTime = handler.tick(true);
-            flushBytes();
-            if (nextKeepAliveTime > 0 && scheduledPool != null) {
-               scheduledPool.schedule(new Runnable() {
-                  @Override
-                  public void run() {
-                     long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                     flushBytes();
-                     if (rescheduleAt > 0) {
-                        scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
-                     }
+      if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED))
{
+         long nextKeepAliveTime = handler.tick(true);
+         if (nextKeepAliveTime > 0 && scheduledPool != null) {
+            scheduledPool.schedule(new Runnable() {
+               @Override
+               public void run() {
+                  long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+                  if (rescheduleAt > 0) {
+                     scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
                   }
-               }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())),
TimeUnit.MILLISECONDS);
-            }
+               }
+            }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
          }
       }
+   }
 
-      @Override
-      public void onRemoteClose(Connection connection) {
-         synchronized (getLock()) {
-            connection.close();
-            connection.free();
-         }
-
-         for (AMQPSessionContext protonSession : sessions.values()) {
-            protonSession.close();
-         }
-         sessions.clear();
-
-         // We must force write the channel before we actually destroy the connection
-         onTransport(handler.getTransport());
-         destroy();
+   @Override
+   public void onRemoteClose(Connection connection) {
+      synchronized (getLock()) {
+         connection.close();
+         connection.free();
       }
 
-      @Override
-      public void onLocalOpen(Session session) throws Exception {
-         getSessionExtension(session);
+      for (AMQPSessionContext protonSession : sessions.values()) {
+         protonSession.close();
       }
+      sessions.clear();
 
-      @Override
-      public void onRemoteOpen(Session session) throws Exception {
-         getSessionExtension(session).initialise();
-         synchronized (getLock()) {
-            session.open();
-         }
-      }
+      // We must force write the channel before we actually destroy the connection
+      handler.flushBytes();
+      destroy();
+   }
 
-      @Override
-      public void onLocalClose(Session session) throws Exception {
+   @Override
+   public void onLocalOpen(Session session) throws Exception {
+      getSessionExtension(session);
+   }
+
+   @Override
+   public void onRemoteOpen(Session session) throws Exception {
+      getSessionExtension(session).initialise();
+      synchronized (getLock()) {
+         session.open();
       }
+   }
 
-      @Override
-      public void onRemoteClose(Session session) throws Exception {
-         synchronized (getLock()) {
-            session.close();
-            session.free();
-         }
+   @Override
+   public void onLocalClose(Session session) throws Exception {
+   }
 
-         AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
-         if (sessionContext != null) {
-            sessionContext.close();
-            sessions.remove(session);
-            session.setContext(null);
-         }
+   @Override
+   public void onRemoteClose(Session session) throws Exception {
+      synchronized (getLock()) {
+         session.close();
+         session.free();
       }
 
-      @Override
-      public void onRemoteOpen(Link link) throws Exception {
-         remoteLinkOpened(link);
+      AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
+      if (sessionContext != null) {
+         sessionContext.close();
+         sessions.remove(session);
+         session.setContext(null);
       }
+   }
 
-      @Override
-      public void onFlow(Link link) throws Exception {
-         if (link.getContext() != null) {
-            ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
-         }
-      }
+   @Override
+   public void onRemoteOpen(Link link) throws Exception {
+      remoteLinkOpened(link);
+   }
 
-      @Override
-      public void onRemoteClose(Link link) throws Exception {
-         synchronized (getLock()) {
-            link.close();
-            link.free();
-         }
-         ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
-         if (linkContext != null) {
-            linkContext.close(true);
-         }
+   @Override
+   public void onFlow(Link link) throws Exception {
+      if (link.getContext() != null) {
+         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
       }
+   }
 
-      @Override
-      public void onRemoteDetach(Link link) throws Exception {
-         synchronized (getLock()) {
-            link.detach();
-            link.free();
-         }
+   @Override
+   public void onRemoteClose(Link link) throws Exception {
+      synchronized (getLock()) {
+         link.close();
+         link.free();
+      }
+      ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
+      if (linkContext != null) {
+         linkContext.close(true);
+      }
+   }
 
-         flush();
+   @Override
+   public void onRemoteDetach(Link link) throws Exception {
+      synchronized (getLock()) {
+         link.detach();
+         link.free();
       }
 
-      @Override
-      public void onLocalDetach(Link link) throws Exception {
-         Object context = link.getContext();
-         if (context instanceof ProtonServerSenderContext) {
-            ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
-            senderContext.close(false);
-         }
+   }
+
+   @Override
+   public void onLocalDetach(Link link) throws Exception {
+      Object context = link.getContext();
+      if (context instanceof ProtonServerSenderContext) {
+         ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
+         senderContext.close(false);
       }
+   }
 
-      @Override
-      public void onDelivery(Delivery delivery) throws Exception {
-         ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
-         if (handler != null) {
-            handler.onMessage(delivery);
-         } else {
-            // TODO: logs
-            System.err.println("Handler is null, can't delivery " + delivery);
-         }
+   @Override
+   public void onDelivery(Delivery delivery) throws Exception {
+      ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
+      if (handler != null) {
+         handler.onMessage(delivery);
+      } else {
+         log.warn("Handler is null, can't delivery " + delivery, new Exception("tracing location"));
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 00bd27a..0ed1723 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
@@ -75,4 +76,6 @@ public interface EventHandler {
 
    void onTransport(Transport transport) throws Exception;
 
+   void pushBytes(ByteBuf bytes);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
index 405491a..a4d1463 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
@@ -17,14 +17,9 @@
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
 import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Transport;
 
 public final class Events {
 
-   public static void dispatchTransport(Transport transport, EventHandler handler) throws
Exception {
-      handler.onTransport(transport);
-   }
-
    public static void dispatch(Event event, EventHandler handler) throws Exception {
       switch (event.getType()) {
          case CONNECTION_INIT:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f91d295/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index b5594fa..91b252b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
@@ -54,16 +54,10 @@ public class ProtonHandler extends ProtonInitializable {
 
    private final Collector collector = Proton.collector();
 
-   private final Executor dispatchExecutor;
-
-   private final Runnable dispatchRunnable = () -> dispatch();
-
-   private ArrayList<EventHandler> handlers = new ArrayList<>();
+   private List<EventHandler> handlers = new ArrayList<>();
 
    private Sasl serverSasl;
 
-   private Sasl clientSasl;
-
    private final Object lock = new Object();
 
    private final long creationTime;
@@ -76,33 +70,37 @@ public class ProtonHandler extends ProtonInitializable {
 
    protected boolean receivedFirstPacket = false;
 
-   private int offset = 0;
+   boolean inDispatch = false;
 
-   public ProtonHandler(Executor dispatchExecutor) {
-      this.dispatchExecutor = dispatchExecutor;
+   public ProtonHandler() {
       this.creationTime = System.currentTimeMillis();
       transport.bind(connection);
       connection.collect(collector);
    }
 
    public long tick(boolean firstTick) {
-      synchronized (lock) {
-         if (!firstTick) {
-            try {
-               if (connection.getLocalState() != EndpointState.CLOSED) {
-                  long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                  if (transport.isClosed()) {
-                     throw new IllegalStateException("Channel was inactive for to long");
+      try {
+         synchronized (lock) {
+            if (!firstTick) {
+               try {
+                  if (connection.getLocalState() != EndpointState.CLOSED) {
+                     long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+                     if (transport.isClosed()) {
+                        throw new IllegalStateException("Channel was inactive for to long");
+                     }
+                     return rescheduleAt;
                   }
-                  return rescheduleAt;
+               } catch (Exception e) {
+                  log.warn(e.getMessage(), e);
+                  transport.close();
+                  connection.setCondition(new ErrorCondition());
                }
-            } catch (Exception e) {
-               transport.close();
-               connection.setCondition(new ErrorCondition());
+               return 0;
             }
-            return 0;
+            return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
          }
-         return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+      } finally {
+         flushBytes();
       }
    }
 
@@ -143,6 +141,30 @@ public class ProtonHandler extends ProtonInitializable {
 
    }
 
+   public void flushBytes() {
+      synchronized (lock) {
+         while (true) {
+            int pending = transport.pending();
+
+            if (pending <= 0) {
+               break;
+            }
+
+            // We allocated a Pooled Direct Buffer, that will be sent down the stream
+            ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(pending);
+            ByteBuffer head = transport.head();
+            buffer.writeBytes(head);
+
+            for (EventHandler handler : handlers) {
+               handler.pushBytes(buffer);
+            }
+
+            transport.pop(pending);
+         }
+      }
+   }
+
+
    public SASLResult getSASLResult() {
       return saslResult;
    }
@@ -201,57 +223,13 @@ public class ProtonHandler extends ProtonInitializable {
       return creationTime;
    }
 
-   public void outputDone(int bytes) {
-      synchronized (lock) {
-         transport.pop(bytes);
-         offset -= bytes;
-
-         if (offset < 0) {
-            throw new IllegalStateException("You called outputDone for more bytes than you
actually received. numberOfBytes=" + bytes +
-                                               ", outcome result=" + offset);
-         }
-      }
-
-      flush();
-   }
-
-   public ByteBuf outputBuffer() {
-
-      synchronized (lock) {
-         int pending = transport.pending();
-
-         if (pending < 0) {
-            return null;//throw new IllegalStateException("xxx need to close the connection");
-         }
-
-         int size = pending - offset;
-
-         if (size < 0) {
-            throw new IllegalStateException("negative size: " + pending);
-         }
-
-         if (size == 0) {
-            return null;
-         }
-
-         // For returning PooledBytes
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
-         ByteBuffer head = transport.head();
-         head.position(offset);
-         head.limit(offset + size);
-         buffer.writeBytes(head);
-         offset += size; // incrementing offset for future calls
-         return buffer;
-      }
-   }
-
    public void flush() {
       synchronized (lock) {
          transport.process();
          checkServerSASL();
       }
 
-      dispatchExecutor.execute(dispatchRunnable);
+      dispatch();
    }
 
    public void close(ErrorCondition errorCondition) {
@@ -304,38 +282,36 @@ public class ProtonHandler extends ProtonInitializable {
 
    private void dispatch() {
       Event ev;
-      // We don't hold a lock on the entire event processing
-      // because we could have a distributed deadlock
-      // while processing events (for instance onTransport)
-      // while a client is also trying to write here
 
       synchronized (lock) {
-         while ((ev = collector.peek()) != null) {
-            for (EventHandler h : handlers) {
-               if (log.isTraceEnabled()) {
-                  log.trace("Handling " + ev + " towards " + h);
-               }
-               try {
-                  Events.dispatch(ev, h);
-               } catch (Exception e) {
-                  log.warn(e.getMessage(), e);
-                  connection.setCondition(new ErrorCondition());
+         if (inDispatch) {
+            // Avoid recursion from events
+            return;
+         }
+         try {
+            inDispatch = true;
+            while ((ev = collector.peek()) != null) {
+               for (EventHandler h : handlers) {
+                  if (log.isTraceEnabled()) {
+                     log.trace("Handling " + ev + " towards " + h);
+                  }
+                  try {
+                     Events.dispatch(ev, h);
+                  } catch (Exception e) {
+                     log.warn(e.getMessage(), e);
+                     connection.setCondition(new ErrorCondition());
+                  }
                }
-            }
 
-            collector.pop();
-         }
-      }
+               collector.pop();
+            }
 
-      for (EventHandler h : handlers) {
-         try {
-            h.onTransport(transport);
-         } catch (Exception e) {
-            log.warn(e.getMessage(), e);
-            connection.setCondition(new ErrorCondition());
+         } finally {
+            inDispatch = false;
          }
       }
 
+      flushBytes();
    }
 
    public void open(String containerId, Map<Symbol, Object> connectionProperties) {


Mime
View raw message