qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1671810 - in /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter: ConnectionAdapter.java SessionAdapter.java
Date Tue, 07 Apr 2015 11:30:50 GMT
Author: kwall
Date: Tue Apr  7 11:30:50 2015
New Revision: 1671810

URL: http://svn.apache.org/r1671810
Log:
QPID-5818: [Java Broker] Ensure that connection/session use a configuration thread to mutate
the model on receipt of close from wire

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1671810&r1=1671809&r2=1671810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Apr  7 11:30:50 2015
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
@@ -34,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.CloseFuture;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Port;
@@ -54,8 +54,7 @@ public final class ConnectionAdapter ext
 
     private final Action _underlyingConnectionDeleteTask;
     private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
-    private AMQConnectionModel _underlyingConnection;
-    private final AtomicBoolean _closing = new AtomicBoolean();
+    private final AMQConnectionModel _underlyingConnection;
 
     public ConnectionAdapter(final AMQConnectionModel conn)
     {
@@ -70,7 +69,7 @@ public final class ConnectionAdapter ext
             {
                 conn.removeDeleteTask(this);
                 _underlyingClosed.set(true);
-                deleted();
+                deleteAsync();
             }
         };
         conn.addDeleteTask(_underlyingConnectionDeleteTask);
@@ -163,38 +162,52 @@ public final class ConnectionAdapter ext
     @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {
-        final SettableFuture<Void> returnVal = SettableFuture.create();
-        asyncClose().addListener(
-                new Runnable()
-                {
-                    @Override
-                    public void run()
+        if (_underlyingClosed.get())
+        {
+            deleted();
+            return Futures.immediateFuture(null);
+        }
+        else
+        {
+            final SettableFuture<Void> returnVal = SettableFuture.create();
+            asyncCloseUnderlying().addListener(
+                    new Runnable()
                     {
-                        try
-                        {
-                            deleted();
-                            setState(State.DELETED);
-                        }
-                        finally
+                        @Override
+                        public void run()
                         {
-                            returnVal.set(null);
+                            try
+                            {
+                                deleted();
+                                setState(State.DELETED);
+                            }
+                            finally
+                            {
+                                returnVal.set(null);
+                            }
                         }
-                    }
-                }, getTaskExecutor().getExecutor()
-                                );
-        return returnVal;
+                    }, getTaskExecutor().getExecutor()
+                                    );
+            return returnVal;
+        }
     }
 
     @Override
     protected ListenableFuture<Void> beforeClose()
     {
-        _closing.set(true);
+        if (_underlyingClosed.get())
+        {
+            return Futures.immediateFuture(null);
+        }
+        else
+        {
 
-        return asyncClose();
+            return asyncCloseUnderlying();
+        }
 
     }
 
-    private ListenableFuture<Void> asyncClose()
+    private ListenableFuture<Void> asyncCloseUnderlying()
     {
         final SettableFuture<Void> closeFuture = SettableFuture.create();
 
@@ -206,6 +219,7 @@ public final class ConnectionAdapter ext
                 closeFuture.set(null);
             }
         });
+        _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
 
         _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
         return closeFuture;
@@ -279,56 +293,4 @@ public final class ConnectionAdapter ext
     {
         // SessionAdapter installs delete task to cause session model object to delete
     }
-
-
-    private static class ConnectionCloseFuture implements CloseFuture
-    {
-        private boolean _closed;
-
-        public synchronized void connectionClosed()
-        {
-            _closed = true;
-            notifyAll();
-
-        }
-
-        @Override
-        public void runWhenComplete(final Runnable closeRunnable)
-        {
-            if (_closed )
-            {
-                closeRunnable.run();
-            }
-            else
-            {
-                Thread t = new Thread(new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        synchronized (ConnectionCloseFuture.this)
-                        {
-                            while (!_closed)
-                            {
-                                try
-                                {
-                                    ConnectionCloseFuture.this.wait();
-                                }
-                                catch (InterruptedException e)
-                                {
-                                }
-                            }
-
-                            closeRunnable.run();
-                        }
-                    }
-                });
-
-                t.setDaemon(true);
-                t.start();
-
-            }
-        }
-    }
-
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1671810&r1=1671809&r2=1671810&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Tue Apr  7 11:30:50 2015
@@ -73,7 +73,7 @@ final class SessionAdapter extends Abstr
             public void performAction(final Object object)
             {
                 session.removeDeleteTask(this);
-                deleted();
+                deleteAsync();
             }
         });
         setState(State.ACTIVE);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message