activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-6 git commit: Avoiding possible deadlock after Proton deliveries
Date Wed, 22 Apr 2015 21:44:28 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 47edcd401 -> fe849a4f1


Avoiding possible deadlock after Proton deliveries


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/e62112fb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/e62112fb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/e62112fb

Branch: refs/heads/master
Commit: e62112fbffa139e0a2672eade2218bfb61f3a70d
Parents: 47edcd4
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Apr 22 16:44:03 2015 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Apr 22 16:45:08 2015 -0400

----------------------------------------------------------------------
 .../ActiveMQProtonRemotingConnection.java       |  5 +++
 .../plug/ActiveMQProtonConnectionCallback.java  | 13 ++++++++
 .../plug/ProtonSessionIntegrationCallback.java  | 34 ++++++++++++++++++--
 3 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e62112fb/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java
index 85834d2..1dfe055 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ActiveMQProtonRemotingConnection.java
@@ -44,6 +44,11 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
       this.amqpConnection = amqpConnection;
    }
 
+   public Executor getExecutor()
+   {
+      return this.executor;
+   }
+
    public ProtonProtocolManager getManager()
    {
       return manager;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e62112fb/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index f57b9fd..4d83dac 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.core.protocol.proton.plug;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
@@ -63,6 +64,18 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
 
    }
 
+   public Executor getExeuctor()
+   {
+      if (protonConnectionDelegate != null)
+      {
+         return protonConnectionDelegate.getExecutor();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
    @Override
    public void setConnection(AMQPConnectionContext connection)
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/e62112fb/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 000cc25..6aedd25 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.core.protocol.proton.plug;
 
 
+import java.util.concurrent.Executor;
+
 import io.netty.buffer.ByteBuf;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -172,9 +174,37 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
-   public void closeSender(Object brokerConsumer) throws Exception
+   public void closeSender(final Object brokerConsumer) throws Exception
    {
-      ((ServerConsumer) brokerConsumer).close(false);
+      Runnable runnable = new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               ((ServerConsumer) brokerConsumer).close(false);
+            }
+            catch (Exception e)
+            {
+            }
+         }
+      };
+
+
+      // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
+      // to avoid deadlocks the close has to be done outside of the main thread on an executor
+      // otherwise you could get a deadlock
+      Executor executor = protonSPI.getExeuctor();
+
+      if (executor != null)
+      {
+         executor.execute(runnable);
+      }
+      else
+      {
+         runnable.run();
+      }
    }
 
    @Override


Mime
View raw message