qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [09/10] qpid-broker-j git commit: QPID-7531: [Java Broker, AMQP 1.0] Ensure consumer target is closed on detaching of sending link endpoint
Date Thu, 05 Oct 2017 19:20:11 GMT
QPID-7531: [Java Broker, AMQP 1.0] Ensure consumer target is closed on detaching of sending link endpoint


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2736b3b7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2736b3b7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2736b3b7

Branch: refs/heads/master
Commit: 2736b3b7d5faed6ea51b8e08e650ca40162e3072
Parents: 016279f
Author: Alex Rudyy <orudyy@apache.org>
Authored: Thu Oct 5 14:59:16 2017 +0100
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Thu Oct 5 20:17:35 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/SendingLinkEndpoint.java      | 86 +++++++++++++-------
 1 file changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2736b3b7/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 222074e..0e4a90c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -104,7 +104,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         setDeliveryCount(new SequenceNumber(0));
         setAvailable(UnsignedInteger.valueOf(0));
         setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
-        session.addDeleteTask(_cleanUpUnsettledDeliveryTask);
     }
 
     @Override
@@ -494,45 +493,19 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     @Override
     protected void remoteDetachedPerformDetach(final Detach detach)
     {
-        getConsumerTarget().close();
-
-        TerminusExpiryPolicy expiryPolicy = (getSource()).getExpiryPolicy();
+        TerminusExpiryPolicy expiryPolicy = getSource().getExpiryPolicy();
         if (Boolean.TRUE.equals(detach.getClosed())
             || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
             || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
             || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
             cleanUpUnsettledDeliveries();
-            getSession().removeDeleteTask(_cleanUpUnsettledDeliveryTask);
-            Error closingError = null;
-            if (getDestination() instanceof ExchangeSendingDestination
-                && getSession().getConnection().getAddressSpace() instanceof QueueManagingVirtualHost)
-            {
-                try
-                {
-                    ((QueueManagingVirtualHost) getSession().getConnection().getAddressSpace()).removeSubscriptionQueue(
-                            ((ExchangeSendingDestination) getDestination()).getQueue().getName());
-                }
-                catch (AccessControlException e)
-                {
-                    LOGGER.error("Error unregistering subscription", e);
-                    closingError = new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription");
-                }
-                catch (IllegalStateException e)
-                {
-                    closingError = new Error(AmqpError.RESOURCE_LOCKED, e.getMessage());
-                }
-                catch (NotFoundException e)
-                {
-                    closingError = new Error(AmqpError.NOT_FOUND, e.getMessage());
-                }
-            }
-            close(closingError);
+
+            close();
         }
         else if (detach.getError() != null)
         {
             cleanUpUnsettledDeliveries();
-            getSession().removeDeleteTask(_cleanUpUnsettledDeliveryTask);
             detach();
             destroy();
             getConsumerTarget().updateNotifyWorkDesired();
@@ -549,6 +522,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
     private void cleanUpUnsettledDeliveries()
     {
+        getSession().removeDeleteTask(_cleanUpUnsettledDeliveryTask);
         Modified state = new Modified();
         state.setDeliveryFailed(true);
 
@@ -666,6 +640,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
         // TODO: QPID-7845 : Resuming links is unsupported at the moment. Thus, cleaning up unsettled deliveries unconditionally.
         cleanUpUnsettledDeliveries();
+        getSession().addDeleteTask(_cleanUpUnsettledDeliveryTask);
 
         Map<Binary, OutgoingDelivery> unsettledCopy = new HashMap<>(_unsettled);
         Map<Binary, DeliveryState> remoteUnsettled =
@@ -779,6 +754,57 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         _destination = destination;
     }
 
+    @Override
+    protected void detach(Error error, final boolean close)
+    {
+        if (_consumerTarget != null)
+        {
+            _consumerTarget.close();
+        }
+
+        TerminusExpiryPolicy expiryPolicy = (getSource()).getExpiryPolicy();
+        if (close
+            || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
+            || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
+            || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
+        {
+
+            Error closingError = null;
+            if (getDestination() instanceof ExchangeSendingDestination
+                && getSession().getConnection().getAddressSpace() instanceof QueueManagingVirtualHost)
+            {
+                cleanUpUnsettledDeliveries();
+                try
+                {
+                    ((QueueManagingVirtualHost) getSession().getConnection().getAddressSpace()).removeSubscriptionQueue(
+                            ((ExchangeSendingDestination) getDestination()).getQueue().getName());
+                }
+                catch (AccessControlException e)
+                {
+                    LOGGER.error("Error unregistering subscription", e);
+                    closingError = new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription");
+                }
+                catch (IllegalStateException e)
+                {
+                    closingError = new Error(AmqpError.RESOURCE_LOCKED, e.getMessage());
+                }
+                catch (NotFoundException e)
+                {
+                    closingError = new Error(AmqpError.NOT_FOUND, e.getMessage());
+                }
+            }
+            if (error == null)
+            {
+                error = closingError;
+            }
+            else
+            {
+                LOGGER.warn("Unexpected error on detaching endpoint {}: {}", getLinkName(), error);
+            }
+        }
+        super.detach(error, close);
+    }
+
     private static class OutgoingDelivery
     {
         private final MessageInstance _messageInstance;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message