qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1695883 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/logging/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main...
Date Fri, 14 Aug 2015 12:55:49 GMT
Author: kwall
Date: Fri Aug 14 12:55:49 2015
New Revision: 1695883

URL: http://svn.apache.org/r1695883
Log:
QPID-6694: [Java Broker] Ensure that asynchronous model operations chain exceptions through the futures

This is the work of Alex Rudyy <orudyy@apache.org> and me.

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractLogger.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractNameAndLevelFilter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
Fri Aug 14 12:55:49 2015
@@ -1219,23 +1219,32 @@ public class BDBHAVirtualHostNodeImpl ex
         LOGGER.info("Intruder detected (" + intruderHostAndPort + "), stopping and setting
state to ERRORED");
 
         final State initialState = getState();
-        try
+
+
+        ListenableFuture<Void> future = doAfterAlways(stopAndSetStateTo(State.ERRORED),
new Runnable()
         {
-            stopAndSetStateTo(State.ERRORED).addListener(new Runnable()
+            @Override
+            public void run()
             {
-                @Override
-                public void run()
-                {
-                    _lastRole.set(NodeRole.DETACHED);
-                    attributeSet(ROLE, _role, NodeRole.DETACHED);
-                    notifyStateChanged(initialState, State.ERRORED);
-                }
-            }, getTaskExecutor().getExecutor());
-        }
-        catch (Exception e)
+                _lastRole.set(NodeRole.DETACHED);
+                attributeSet(ROLE, _role, NodeRole.DETACHED);
+                notifyStateChanged(initialState, State.ERRORED);
+            }
+        });
+
+        Futures.addCallback(future, new FutureCallback<Void>()
         {
-            LOGGER.error("Unexpected exception on closing the node when intruder is detected
", e);
-        }
+            @Override
+            public void onSuccess(final Void result)
+            {
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                LOGGER.error("Failed to close children when handling intruder", t);
+            }
+        });
     }
 
     private abstract class VirtualHostNodeGroupTask implements Task<Void>

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractLogger.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractLogger.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractLogger.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractLogger.java
Fri Aug 14 12:55:49 2015
@@ -102,8 +102,7 @@ public abstract class AbstractLogger<X e
     @StateTransition(currentState = {State.ACTIVE, State.UNINITIALIZED, State.ERRORED, State.STOPPED},
desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        closeAsync().addListener(new Runnable()
+        return doAfterAlways(closeAsync(), new Runnable()
         {
             @Override
             public void run()
@@ -111,10 +110,8 @@ public abstract class AbstractLogger<X e
                 deleted();
                 setState(State.DELETED);
                 stopLogging();
-                returnVal.set(null);
             }
-        }, getTaskExecutor().getExecutor());
-        return returnVal;
+        });
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractNameAndLevelFilter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractNameAndLevelFilter.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractNameAndLevelFilter.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractNameAndLevelFilter.java
Fri Aug 14 12:55:49 2015
@@ -64,8 +64,7 @@ public abstract class AbstractNameAndLev
     @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED },
desiredState = State.DELETED )
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        closeAsync().addListener(new Runnable()
+        return doAfterAlways(closeAsync(), new Runnable()
         {
             @Override
             public void run()
@@ -73,11 +72,9 @@ public abstract class AbstractNameAndLev
                 deleted();
                 QpidLoggerTurboFilter.filterRemovedFromRootContext(_filter);
                 setState(State.DELETED);
-                returnVal.set(null);
 
             }
-        }, getTaskExecutor().getExecutor());
-        return returnVal;
+        });
     }
 
     @StateTransition( currentState = { State.ERRORED, State.UNINITIALIZED }, desiredState
= State.ACTIVE )

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
Fri Aug 14 12:55:49 2015
@@ -2134,6 +2134,49 @@ public abstract class AbstractConfigured
 
         return returnVal;
     }
+    protected <V> ChainedListenableFuture<Void> doAfterAlways(ListenableFuture<V>
future,
+                                                              Runnable after)
+    {
+        return doAfterAlways(getTaskExecutor().getExecutor(), future, after);
+    }
+
+    protected static <V> ChainedListenableFuture<Void> doAfterAlways(Executor
executor,
+                                                                     ListenableFuture<V>
future,
+                                                                     final Runnable after)
+    {
+        final ChainedSettableFuture<Void> returnVal = new ChainedSettableFuture<Void>(executor);
+        Futures.addCallback(future, new FutureCallback<V>()
+        {
+            @Override
+            public void onSuccess(final V result)
+            {
+                try
+                {
+                    after.run();
+                    returnVal.set(null);
+                }
+                catch (Throwable e)
+                {
+                    returnVal.setException(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                try
+                {
+                    after.run();
+                }
+                finally
+                {
+                    returnVal.setException(t);
+                }
+            }
+        }, executor);
+
+        return returnVal;
+    }
 
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
Fri Aug 14 12:55:49 2015
@@ -36,7 +36,6 @@ import javax.security.auth.Subject;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import org.apache.qpid.server.logging.QpidLoggerTurboFilter;
 import org.apache.qpid.server.logging.StartupAppender;
 import org.slf4j.Logger;
@@ -264,35 +263,24 @@ public class BrokerAdapter extends Abstr
     {
         if(_parent.isManagementMode())
         {
-            final SettableFuture<Void> returnVal = SettableFuture.create();
-
-            _managementModeAuthenticationProvider.openAsync().addListener(
+            return doAfter(_managementModeAuthenticationProvider.openAsync(),
                     new Runnable()
                     {
                         @Override
                         public void run()
                         {
-                            try
-                            {
-                                activateWithoutManagementMode();
-                            }
-                            finally
-                            {
-                                returnVal.set(null);
-                            }
+                            performActivation();
                         }
-                    }, getTaskExecutor().getExecutor()
-                                                                         );
-            return returnVal;
+                    });
         }
         else
         {
-            activateWithoutManagementMode();
+            performActivation();
             return Futures.immediateFuture(null);
         }
     }
 
-    private void activateWithoutManagementMode()
+    private void performActivation()
     {
         boolean hasBrokerAnyErroredChildren = false;
 
@@ -305,8 +293,8 @@ public class BrokerAdapter extends Abstr
                     if (child.getState() == State.ERRORED )
                     {
                         hasBrokerAnyErroredChildren = true;
-                        LOGGER.warn(String.format("Broker child object '%s' of type '%s'
is %s",
-                                                    child.getName(), childClass.getSimpleName(),
State.ERRORED ));
+                        LOGGER.warn("Broker child object '{}' of type '{}' is {}",
+                                new Object[]{ child.getName(), childClass.getSimpleName(),
State.ERRORED });
                     }
                 }
             }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
Fri Aug 14 12:55:49 2015
@@ -289,35 +289,25 @@ public class FileBasedGroupProviderImpl
     @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState
= State.DELETED )
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        closeAsync().addListener(
+        return doAfterAlways(closeAsync(),
                 new Runnable()
                 {
                     @Override
                     public void run()
                     {
-                        try
+                        File file = new File(getPath());
+                        if (file.exists())
                         {
-                            File file = new File(getPath());
-                            if (file.exists())
+                            if (!file.delete())
                             {
-                                if (!file.delete())
-                                {
-                                    throw new IllegalConfigurationException("Cannot delete
group file");
-                                }
+                                throw new IllegalConfigurationException("Cannot delete group
file");
                             }
-
-                            deleted();
-                            setState(State.DELETED);
-                        }
-                        finally
-                        {
-                            returnVal.set(null);
                         }
+
+                        deleted();
+                        setState(State.DELETED);
                     }
-                }, getTaskExecutor().getExecutor()
-                           );
-        return returnVal;
+                });
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
Fri Aug 14 12:55:49 2015
@@ -191,34 +191,22 @@ public class FileSystemPreferencesProvid
     @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState
= State.DELETED )
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        closeAsync().addListener(
+        return doAfterAlways(closeAsync(),
                 new Runnable()
                 {
                     @Override
                     public void run()
                     {
-                        try
+                        if(_store != null)
                         {
-                            if(_store != null)
-                            {
-                                _store.close();
-                                _store.delete();
-                                deleted();
-                                _authenticationProvider.setPreferencesProvider(null);
-
-                            }
-                            setState(State.DELETED);
-                        }
-                        finally
-                        {
-                            returnVal.set(null);
+                            _store.close();
+                            _store.delete();
+                            deleted();
+                            _authenticationProvider.setPreferencesProvider(null);
                         }
+                        setState(State.DELETED);
                     }
-                }, getTaskExecutor().getExecutor()
-                           );
-
-        return returnVal;
+                });
 
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
Fri Aug 14 12:55:49 2015
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.PortMessages;
 import org.apache.qpid.server.model.IntegrityViolationException;
@@ -238,18 +237,16 @@ abstract public class AbstractPort<X ext
     @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState
= State.DELETED )
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        closeAsync().addListener(new Runnable()
+        return doAfterAlways(closeAsync(), new Runnable()
         {
             @Override
             public void run()
             {
+                deleted();
                 setState(State.DELETED);
-                returnVal.set(null);
                 _eventLogger.message(PortMessages.DELETE(getType(), getName()));
             }
-        }, getTaskExecutor().getExecutor());
-        return returnVal;
+        });
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED},
desiredState = State.ACTIVE )

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
Fri Aug 14 12:55:49 2015
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -205,8 +206,8 @@ public abstract class AbstractAuthentica
         String providerName = getName();
 
         // verify that provider is not in use
-        Collection<Port> ports = new ArrayList<Port>(_broker.getPorts());
-        for (Port port : ports)
+        Collection<Port<?>> ports = new ArrayList<>(_broker.getPorts());
+        for (Port<?> port : ports)
         {
             if(port instanceof AbstractPortWithAuthProvider
                && ((AbstractPortWithAuthProvider<?>)port).getAuthenticationProvider()
== this)
@@ -215,52 +216,80 @@ public abstract class AbstractAuthentica
             }
         }
 
-        final SettableFuture<Void> returnVal = SettableFuture.create();
+        return performDelete();
+    }
+
+    private ListenableFuture<Void> performDelete()
+    {
+        final SettableFuture<Void> futureResult = SettableFuture.create();
+        final ListenableFuture<Void> preferenceDeleteFuture;
+        if (_preferencesProvider != null)
+        {
+            preferenceDeleteFuture = _preferencesProvider.deleteAsync();
+        }
+        else
+        {
+            preferenceDeleteFuture = Futures.immediateFuture(null);
+        }
 
-        final ListenableFuture<Void> future = closeAsync();
-        future.addListener(new Runnable()
+        Futures.addCallback(preferenceDeleteFuture, new FutureCallback<Void>()
         {
+
             @Override
-            public void run()
+            public void onSuccess(final Void result)
+            {
+                closeAndDelete();
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                LOGGER.warn("Failed to delete preference provider : {}", _preferencesProvider.getName(),
t);
+                closeAndDelete();
+            }
+
+            private void closeAndDelete()
             {
-                if (_preferencesProvider != null)
+                Futures.addCallback(closeAsync(), new FutureCallback<Void>()
                 {
-                    _preferencesProvider.deleteAsync().addListener(new Runnable()
+                    @Override
+                    public void onSuccess(final Void result)
                     {
-                        @Override
-                        public void run()
+                        try
                         {
-                            try
-                            {
-                                deleted();
-                                setState(State.DELETED);
-                            }
-                            finally
-                            {
-                                returnVal.set(null);
-                                _eventLogger.message(AuthenticationProviderMessages.DELETE(getName()));
-                            }
+                            tidyUp();
+                            futureResult.set(null);
                         }
-                    }, getTaskExecutor().getExecutor());
-                }
-                else
-                {
-                    try
-                    {
-                        deleted();
+                        catch (Exception e)
+                        {
+                            futureResult.setException(e);
+                        }
+                    }
 
-                        setState(State.DELETED);
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        try
+                        {
+                            tidyUp();
+                        }
+                        finally
+                        {
+                            futureResult.setException(t);
+                        }
                     }
-                    finally
+
+                    private void tidyUp()
                     {
-                        returnVal.set(null);
+                        deleted();
+                        setState(State.DELETED);
                         _eventLogger.message(AuthenticationProviderMessages.DELETE(getName()));
                     }
-                }
+                });
             }
-        }, getTaskExecutor().getExecutor());
+        });
 
-        return  returnVal;
+        return futureResult;
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Fri Aug 14 12:55:49 2015
@@ -40,12 +40,12 @@ import java.util.concurrent.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
 
 import com.google.common.base.Function;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -144,7 +144,6 @@ public abstract class AbstractVirtualHos
     private final EventLogger _eventLogger;
 
     private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>();
-    private final AtomicBoolean _deleted = new AtomicBoolean();
     private final VirtualHostNode<?> _virtualHostNode;
 
     private final AtomicLong _targetSize = new AtomicLong(1024*1024);
@@ -397,30 +396,20 @@ public abstract class AbstractVirtualHos
         return isStoreEmptyHandler.isEmpty();
     }
 
-    protected ListenableFuture<Void> createDefaultExchanges()
+    private ListenableFuture<List<Void>> createDefaultExchanges()
     {
-        return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>()
+        return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<List<Void>>>()
         {
-            private static final int TOTAL_STANDARD_EXCHANGES = 4;
-            private final AtomicInteger _createdExchangeCount = new AtomicInteger();
-            private SettableFuture<Void> _future = SettableFuture.create();
 
             @Override
-            public ListenableFuture<Void> run()
+            public ListenableFuture<List<Void>> run()
             {
-                addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
-                addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
-                addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
-                addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
-                return _future;
-            }
-
-            private void standardExchangeCreated()
-            {
-                if (_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES)
-                {
-                    _future.set(null);
-                }
+                List<ListenableFuture<Void>> standardExchangeFutures = new ArrayList<>();
+                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME,
ExchangeDefaults.TOPIC_EXCHANGE_CLASS));
+                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME,
ExchangeDefaults.HEADERS_EXCHANGE_CLASS));
+                standardExchangeFutures.add(addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME,
ExchangeDefaults.FANOUT_EXCHANGE_CLASS));
+                return Futures.allAsList(standardExchangeFutures);
             }
 
             ListenableFuture<Void> addStandardExchange(String name, String type)
@@ -440,18 +429,18 @@ public abstract class AbstractVirtualHos
                         try
                         {
                             childAdded(result);
+                            returnVal.set(null);
                         }
-                        finally
+                        catch (Throwable t)
                         {
-                            standardExchangeCreated();
+                            returnVal.setException(t);
                         }
-
                     }
 
                     @Override
                     public void onFailure(final Throwable t)
                     {
-                        standardExchangeCreated();
+                        returnVal.setException(t);
                     }
                 }, getTaskExecutor().getExecutor());
 
@@ -1494,22 +1483,22 @@ public abstract class AbstractVirtualHos
                                                     return closeChildren();
                                                 }
                                             }).then(new Runnable()
-                                            {
-                                                @Override
-                                                public void run()
-                                                {
-                                                    shutdownHouseKeeping();
-                                                    if (_networkConnectionScheduler != null)
-                                                    {
-                                                        _networkConnectionScheduler.close();
-                                                        _networkConnectionScheduler = null;
-                                                    }
-                                                    closeMessageStore();
-                                                    setState(State.STOPPED);
+        {
+            @Override
+            public void run()
+            {
+                shutdownHouseKeeping();
+                if (_networkConnectionScheduler != null)
+                {
+                    _networkConnectionScheduler.close();
+                    _networkConnectionScheduler = null;
+                }
+                closeMessageStore();
+                setState(State.STOPPED);
 
-                                                    stopLogging(loggers);
-                                                }
-                                            });
+                stopLogging(loggers);
+            }
+        });
     }
 
     private void stopLogging(Collection<VirtualHostLogger> loggers)
@@ -1523,48 +1512,28 @@ public abstract class AbstractVirtualHos
     @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
     private ListenableFuture<Void> doDelete()
     {
-        if(_deleted.compareAndSet(false,true))
-        {
-            final SettableFuture<Void> returnVal = SettableFuture.create();
-            String hostName = getName();
-
-            closeAsync().addListener(
-                    new Runnable()
+        return doAfterAlways(closeAsync(),
+                new Runnable()
+                {
+                    @Override
+                    public void run()
                     {
-                        @Override
-                        public void run()
+                        MessageStore ms = getMessageStore();
+                        if (ms != null)
                         {
                             try
                             {
-                                MessageStore ms = getMessageStore();
-                                if (ms != null)
-                                {
-                                    try
-                                    {
-                                        ms.onDelete(AbstractVirtualHost.this);
-                                    }
-                                    catch (Exception e)
-                                    {
-                                        _logger.warn("Exception occurred on message store deletion", e);
-                                    }
-                                }
-                                deleted();
-                                setState(State.DELETED);
+                                ms.onDelete(AbstractVirtualHost.this);
                             }
-                            finally
+                            catch (Exception e)
                             {
-                                returnVal.set(null);
+                                _logger.warn("Exception occurred on message store deletion", e);
                             }
                         }
-                    }, getTaskExecutor().getExecutor()
-                               );
-
-            return returnVal;
-        }
-        else
-        {
-            return Futures.immediateFuture(null);
-        }
+                        deleted();
+                        setState(State.DELETED);
+                    }
+                });
     }
 
     public Collection<VirtualHostAlias> getAliases()
@@ -1897,22 +1866,18 @@ public abstract class AbstractVirtualHos
 
         if (isStoreEmpty())
         {
-            final SettableFuture<Void> returnVal = SettableFuture.create();
-            createDefaultExchanges().addListener(new Runnable()
+            return doAfter(createDefaultExchanges(), new Runnable()
             {
                 @Override
                 public void run()
                 {
                     postCreateDefaultExchangeTasks();
-                    returnVal.set(null);
                 }
-            }, getTaskExecutor().getExecutor());
-            return returnVal;
+            });
         }
         else
         {
             postCreateDefaultExchangeTasks();
-
             return Futures.immediateFuture(null);
         }
     }
@@ -1966,30 +1931,6 @@ public abstract class AbstractVirtualHos
         _connectionAssociationListeners.remove(listener);
     }
 
-    private static class ChildCounter
-    {
-        private final AtomicInteger _count = new AtomicInteger();
-        private final Runnable _task;
-
-        private ChildCounter(final Runnable task)
-        {
-            _task = task;
-        }
-
-        public void incrementCount()
-        {
-            _count.incrementAndGet();
-        }
-
-        public void decrementCount()
-        {
-            if(_count.decrementAndGet() == 0)
-            {
-                _task.run();
-            }
-        }
-    }
-
     @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
     private ListenableFuture<Void> onRestart()
     {
@@ -2022,25 +1963,8 @@ public abstract class AbstractVirtualHos
 
         new GenericRecoverer(this).recover(records);
 
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        final ChildCounter counter = new ChildCounter(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                onActivate().addListener(
-                        new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-                                returnVal.set(null);
-                            }
-                        }, getTaskExecutor().getExecutor()
-                                        );
-            }
-        });
-        counter.incrementCount();
+        final List<ListenableFuture<Void>> childOpenFutures = new ArrayList<>();
+
         Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
         {
             @Override
@@ -2049,24 +1973,41 @@ public abstract class AbstractVirtualHos
                 applyToChildren(new Action<ConfiguredObject<?>>()
                 {
                     @Override
-                    public void performAction(final ConfiguredObject<?> object)
+                    public void performAction(final ConfiguredObject<?> child)
                     {
-                        counter.incrementCount();
-                        object.openAsync().addListener(new Runnable()
+                        final ListenableFuture<Void> childOpenFuture = child.openAsync();
+                        childOpenFutures.add(childOpenFuture);
+
+                        Futures.addCallback(childOpenFuture, new FutureCallback<Void>()
                         {
                             @Override
-                            public void run()
+                            public void onSuccess(final Void result)
+                            {
+                            }
+
+                            @Override
+                            public void onFailure(final Throwable t)
                             {
-                                counter.decrementCount();
+                                _logger.error("Exception occurred while opening {} : {}",
+                                              new Object[]{child.getClass().getSimpleName(), child.getName(), t});
                             }
-                        }, getTaskExecutor().getExecutor());
+
+                        });
                     }
                 });
                 return null;
             }
         });
-        counter.decrementCount();
-        return returnVal;
+
+        ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(childOpenFutures);
+        return Futures.transform(combinedFuture, new AsyncFunction<List<Void>, Void>()
+        {
+            @Override
+            public ListenableFuture<Void> apply(final List<Void> input) throws Exception
+            {
+                return onActivate();
+            }
+        });
     }
 
     private class FileSystemSpaceChecker extends HouseKeepingTask

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Fri Aug 14 12:55:49 2015
@@ -177,12 +177,15 @@ public abstract class AbstractVirtualHos
                                     @Override
                                     public void onFailure(final Throwable t)
                                     {
-
                                         setState(State.ERRORED);
-                                        returnVal.set(null);
                                         if (_broker.isManagementMode())
                                         {
                                             LOGGER.warn("Failed to make " + this + " active.", t);
+                                            returnVal.set(null);
+                                        }
+                                        else
+                                        {
+                                            returnVal.setException(t);
                                         }
                                     }
                                 }, getTaskExecutor().getExecutor()
@@ -282,33 +285,64 @@ public abstract class AbstractVirtualHos
     @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
     protected ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        setState(State.DELETED);
-        deleteVirtualHostIfExists();
-        final ListenableFuture<Void> closeFuture = closeAsync();
-        closeFuture.addListener(new Runnable()
+        final SettableFuture<Void> futureResult = SettableFuture.create();
+
+        // Delete the node only if deletion of the virtualhost succeeds.
+        Futures.addCallback(deleteVirtualHostIfExists(), new FutureCallback<Void>()
         {
             @Override
-            public void run()
+            public void onSuccess(final Void result)
             {
-                try
+                Futures.addCallback(closeAsync(), new FutureCallback<Void>()
                 {
-                    deleted();
-                    DurableConfigurationStore configurationStore = getConfigurationStore();
-                    if (configurationStore != null)
+                    @Override
+                    public void onSuccess(final Void result)
                     {
-                        configurationStore.onDelete(AbstractVirtualHostNode.this);
+                        try
+                        {
+                            delete();
+                            futureResult.set(null);
+                        }
+                        catch (Throwable t)
+                        {
+                            futureResult.setException(t);
+                        }
                     }
-                }
-                finally
-                {
-                    returnVal.set(null);
-                }
+
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        try
+                        {
+                            delete();
+                        }
+                        finally
+                        {
+                            futureResult.setException(t);
+                        }
+                    }
+
+                    private void delete()
+                    {
+                        deleted();
+                        setState(State.DELETED);
+                        DurableConfigurationStore configurationStore = getConfigurationStore();
+                        if (configurationStore != null)
+                        {
+                            configurationStore.onDelete(AbstractVirtualHostNode.this);
+                        }
+                    }
+                });
             }
-        }, getTaskExecutor().getExecutor());
 
-        return returnVal;
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                futureResult.setException(t);
+            }
+        });
 
+        return futureResult;
     }
 
     protected ListenableFuture<Void> deleteVirtualHostIfExists()
@@ -332,21 +366,16 @@ public abstract class AbstractVirtualHos
 
     protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState)
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-
         ListenableFuture<Void> childCloseFuture = closeChildren();
-        childCloseFuture.addListener(new Runnable()
+        return doAfterAlways(childCloseFuture, new Runnable()
         {
             @Override
             public void run()
             {
                 closeConfigurationStoreSafely();
                 setState(stoppedState);
-                returnVal.set(null);
             }
-        }, getTaskExecutor().getExecutor());
-
-        return returnVal;
+        });
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Fri Aug 14 12:55:49 2015
@@ -264,6 +264,7 @@ public class VirtualHostTest extends Qpi
 
         AMQPConnection modelConnection = mock(AMQPConnection.class);
         when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
+        when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
         virtualHost.registerConnection(modelConnection);
 
         assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getConnectionCount());

Modified: qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java?rev=1695883&r1=1695882&r2=1695883&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java Fri Aug 14 12:55:49 2015
@@ -208,28 +208,17 @@ public class ACLFileAccessControlProvide
     @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        closeAsync().addListener(
+        return doAfterAlways(closeAsync(),
                 new Runnable()
                 {
                     @Override
                     public void run()
                     {
-                        try
-                        {
-
-                            setState(State.DELETED);
-                            deleted();
-                        }
-                        finally
-                        {
-                            returnVal.set(null);
-                            _eventLogger.message(AccessControlMessages.DELETE(getName()));
-                        }
+                        setState(State.DELETED);
+                        deleted();
+                        _eventLogger.message(AccessControlMessages.DELETE(getName()));
                     }
-                }, getTaskExecutor().getExecutor()
-                           );
-        return returnVal;
+                });
     }
 
     public AccessControl getAccessControl()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message