qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1694532 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server: binding/BindingImpl.java exchange/AbstractExchange.java exchange/ExchangeImpl.java queue/AbstractQueue.java virtualhostnode/AbstractVirtualHostNode.java
Date Thu, 06 Aug 2015 16:01:19 GMT
Author: kwall
Date: Thu Aug  6 16:01:19 2015
New Revision: 1694532

URL: http://svn.apache.org/r1694532
Log:
QPID-6670: [Java Broker] Address review comments from Lorenz Quack <quack.lorenz@gmail.com>
and ensure that if a queue is deleted from both management and the wire, both await the same
future.

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1694532&r1=1694531&r2=1694532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Thu Aug  6 16:01:19 2015
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.Futures;
@@ -61,8 +60,6 @@ public class BindingImpl
     private final AtomicLong _matches = new AtomicLong();
     private BindingLogSubject _logSubject;
 
-    final AtomicBoolean _deleted = new AtomicBoolean();
-
     public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
     {
         super(parentsMap(queue,exchange),stripEmptyArguments(enhanceWithDurable(attributes, queue, exchange)));
@@ -198,27 +195,17 @@ public class BindingImpl
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {
-        if (_deleted.compareAndSet(false, true))
+        ListenableFuture<Void> removeBinding = _exchange.removeBindingAsync(this);
+        return doAfter(removeBinding, new Runnable()
         {
-            ListenableFuture<Void> removeBinding = _exchange.removeBindingAsync(this);
-            return doAfter(removeBinding, new Runnable()
+            @Override
+            public void run()
             {
-                @Override
-                public void run()
-                {
-                    getEventLogger().message(_logSubject, BindingMessages.DELETED());
-                    deleted();
-                    setState(State.DELETED);
-                }
-            });
-        }
-        else
-        {
-            getEventLogger().message(_logSubject, BindingMessages.DELETED());
-            deleted();
-            setState(State.DELETED);
-            return Futures.immediateFuture(null);
-        }
+                getEventLogger().message(_logSubject, BindingMessages.DELETED());
+                deleted();
+                setState(State.DELETED);
+            }
+        });
     }
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1694532&r1=1694531&r2=1694532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Aug  6 16:01:19 2015
@@ -186,15 +186,12 @@ public abstract class AbstractExchange<T
 
         if(_closed.compareAndSet(false,true))
         {
-            List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>();
-            ListenableFuture<Void> atLeastOne = Futures.immediateFuture(null);
-            removeBindingFutures.add(atLeastOne);
+            List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>(_bindings.size());
 
             List<BindingImpl> bindings = new ArrayList<>(_bindings);
             for(BindingImpl binding : bindings)
             {
-                ListenableFuture<Void> deleteFuture = binding.deleteAsync();
-                removeBindingFutures.add(deleteFuture);
+                removeBindingFutures.add(binding.deleteAsync());
             }
 
             ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?rev=1694532&r1=1694531&r2=1694532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Thu Aug  6 16:01:19 2015
@@ -48,8 +48,6 @@ public interface ExchangeImpl<T extends
                            AMQQueue queue,
                            Map<String, Object> arguments);
 
-    ListenableFuture<Void> deleteWithChecks();
-
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
      * @param bindingKey

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1694532&r1=1694531&r2=1694532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Aug  6 16:01:19 2015
@@ -38,11 +38,7 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -57,6 +53,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.model.CustomRestHeaders;
 import org.apache.qpid.server.model.RestContentHeader;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -229,6 +226,8 @@ public abstract class AbstractQueue<X ex
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
+    private final SettableFuture<Integer> _deleteFuture = SettableFuture.create();
+
     private final List<Action<? super AMQQueue>> _deleteTaskList =
             new CopyOnWriteArrayList<Action<? super AMQQueue>>();
 
@@ -1876,23 +1875,21 @@ public abstract class AbstractQueue<X ex
         // Check access
         _virtualHost.getSecurityManager().authoriseDelete(this);
 
-        final ListenableFuture<Integer> returnCountFuture = Futures.immediateFuture(getQueueDepthMessages());
-        if (!_deleted.getAndSet(true))
+        if (_deleted.compareAndSet(false, true))
         {
-            final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>();
-            final ListenableFuture<Void> atLeastOne = Futures.immediateFuture(null);
-            removeBindingFutures.add(atLeastOne);
+            final int queueDepthMessages = getQueueDepthMessages();
+            final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>(_bindings.size());
             final ArrayList<BindingImpl> bindingCopy = new ArrayList<>(_bindings);
 
             // TODO - RG - Need to sort out bindings!
             for (BindingImpl b : bindingCopy)
             {
-                final ListenableFuture<Void> removeFuture = b.deleteAsync();
-                removeBindingFutures.add(removeFuture);
+                removeBindingFutures.add(b.deleteAsync());
             }
 
             ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
-            final ListenableFuture<Void> result = doAfter(combinedFuture, new Runnable()
+
+            doAfter(combinedFuture, new Runnable()
             {
                 @Override
                 public void run()
@@ -1919,55 +1916,12 @@ public abstract class AbstractQueue<X ex
 
                     //Log Queue Deletion
                     getEventLogger().message(_logSubject, QueueMessages.DELETED());
-                }
-            });
-
-            return new ListenableFuture<Integer>()
-            {
-                @Override
-                public void addListener(final Runnable listener, final Executor executor)
-                {
-                    result.addListener(listener, executor);
-                }
-
-                @Override
-                public boolean cancel(final boolean mayInterruptIfRunning)
-                {
-                    return result.cancel(mayInterruptIfRunning);
-                }
 
-                @Override
-                public boolean isCancelled()
-                {
-                    return result.isCancelled();
+                    _deleteFuture.set(queueDepthMessages);
                 }
-
-                @Override
-                public boolean isDone()
-                {
-                    return result.isDone();
-                }
-
-                @Override
-                public Integer get() throws InterruptedException, ExecutionException
-                {
-                    result.get();
-                    return returnCountFuture.get();
-                }
-
-                @Override
-                public Integer get(final long timeout, final TimeUnit unit)
-                        throws InterruptedException, ExecutionException, TimeoutException
-                {
-                    result.get(timeout, unit);
-                    return returnCountFuture.get();
-                }
-            };
-        }
-        else
-        {
-           return returnCountFuture;
+            });
         }
+        return _deleteFuture;
     }
 
     protected void routeToAlternate(List<QueueEntry> entries)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1694532&r1=1694531&r2=1694532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Thu Aug  6 16:01:19 2015
@@ -207,7 +207,7 @@ public abstract class AbstractVirtualHos
     @Override
     public VirtualHost<?,?,?> getVirtualHost()
     {
-        Collection<VirtualHost> children = getChildren(VirtualHost.class);
+        Collection<VirtualHost> children = new ArrayList<>(getChildren(VirtualHost.class));
         if (children.size() == 0)
         {
             return null;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message