qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject [1/3] qpid-broker-j git commit: QPID-8079: [Broker-J] Ensure that actions associated with AsyncCommand are rolled back if the underlying future completes unsuccessfully
Date Tue, 16 Jan 2018 13:17:22 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 17d4b5607 -> 5fa398cfc


QPID-8079: [Broker-J] Ensure that actions associated with AsyncCommand are rolled back if
the underlying future completes unsuccessfully


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/5fa398cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/5fa398cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/5fa398cf

Branch: refs/heads/master
Commit: 5fa398cfc2141d6ec090df3464f5bcbf63d7b023
Parents: fb98e76
Author: Keith Wall <kwall@apache.org>
Authored: Tue Jan 16 11:50:09 2018 +0000
Committer: Keith Wall <kwall@apache.org>
Committed: Tue Jan 16 13:13:47 2018 +0000

----------------------------------------------------------------------
 .../server/txn/AsyncAutoCommitTransaction.java  |  4 +-
 .../apache/qpid/server/txn/AsyncCommand.java    | 98 ++++++++++++++++++++
 .../server/protocol/v0_10/ServerSession.java    | 62 +------------
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 57 +-----------
 .../v1_0/StandardReceivingLinkEndpoint.java     | 57 +-----------
 5 files changed, 103 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 1f65aa3..32e3a6c 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
 
                 txn = _messageStore.newTransaction();
                 enqueueRecord = txn.enqueueMessage(queue, message);
-                future = txn.commitTranAsync((Void) null);
+                future = txn.commitTranAsync(null);
                 txn = null;
             }
             else
@@ -232,7 +232,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
                 @Override
                 public void onRollback()
                 {
-                    underlying.postCommit(enqueueRecord);
+                    underlying.onRollback();
                 }
             }, message.isPersistent());
             postTransactionAction = null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
new file mode 100644
index 0000000..29ce605
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.txn;
+
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class AsyncCommand
+{
+    private final ListenableFuture<Void> _future;
+    private ServerTransaction.Action _action;
+
+    public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action)
+    {
+        _future = future;
+        _action = action;
+    }
+
+    public void complete()
+    {
+        boolean interrupted = false;
+        boolean success = false;
+        try
+        {
+            while (true)
+            {
+                try
+                {
+                    _future.get();
+                    break;
+                }
+                catch (InterruptedException e)
+                {
+                    interrupted = true;
+                }
+
+            }
+            success = true;
+        }
+        catch(ExecutionException e)
+        {
+            if(e.getCause() instanceof RuntimeException)
+            {
+                throw (RuntimeException)e.getCause();
+            }
+            else if(e.getCause() instanceof Error)
+            {
+                throw (Error) e.getCause();
+            }
+            else
+            {
+                throw new ServerScopedRuntimeException(e.getCause());
+            }
+        }
+        finally
+        {
+            if(interrupted)
+            {
+                Thread.currentThread().interrupt();
+            }
+            if (success)
+            {
+                _action.postCommit();
+            }
+            else
+            {
+                _action.onRollback();
+            }
+            _action = null;
+        }
+    }
+
+    public boolean isReadyForCompletion()
+    {
+        return _future.isDone();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index f2bb0cc..b224fa6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,7 +56,6 @@ import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -68,6 +67,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -97,7 +97,6 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException;
 import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class ServerSession extends SessionInvoker
         implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
@@ -1636,65 +1635,6 @@ public class ServerSession extends SessionInvoker
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
 
-    private static class AsyncCommand
-    {
-        private final ListenableFuture<Void> _future;
-        private ServerTransaction.Action _action;
-
-        public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action)
-        {
-            _future = future;
-            _action = action;
-        }
-
-        void complete()
-        {
-            boolean interrupted = false;
-            try
-            {
-                while (true)
-                {
-                    try
-                    {
-                        _future.get();
-                        break;
-                    }
-                    catch (InterruptedException e)
-                    {
-                        interrupted = true;
-                    }
-
-                }
-            }
-            catch(ExecutionException e)
-            {
-                if(e.getCause() instanceof RuntimeException)
-                {
-                    throw (RuntimeException)e.getCause();
-                }
-                else if(e.getCause() instanceof Error)
-                {
-                    throw (Error) e.getCause();
-                }
-                else
-                {
-                    throw new ServerScopedRuntimeException(e.getCause());
-                }
-            }
-            if(interrupted)
-            {
-                Thread.currentThread().interrupt();
-            }
-            _action.postCommit();
-            _action = null;
-        }
-
-        boolean isReadyForCompletion()
-        {
-            return _future.isDone();
-        }
-    }
-
     public void setModelObject(final Session_0_10 session)
     {
         _modelObject = session;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 34571cf..c0e3389 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -38,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.Subject;
@@ -96,10 +95,10 @@ import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -1638,60 +1637,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         }
     }
 
-    private static class AsyncCommand
-    {
-        private final ListenableFuture<Void> _future;
-        private ServerTransaction.Action _action;
-
-        public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action)
-        {
-            _future = future;
-            _action = action;
-        }
-
-        void complete()
-        {
-            boolean interrupted = false;
-            try
-            {
-                while (true)
-                {
-                    try
-                    {
-                        _future.get();
-                        break;
-                    }
-                    catch (InterruptedException e)
-                    {
-                        interrupted = true;
-                    }
-
-                }
-            }
-            catch(ExecutionException e)
-            {
-                if(e.getCause() instanceof RuntimeException)
-                {
-                    throw (RuntimeException)e.getCause();
-                }
-                else if(e.getCause() instanceof Error)
-                {
-                    throw (Error) e.getCause();
-                }
-                else
-                {
-                    throw new ServerScopedRuntimeException(e.getCause());
-                }
-            }
-            if(interrupted)
-            {
-                Thread.currentThread().interrupt();
-            }
-            _action.postCommit();
-            _action = null;
-        }
-    }
-
     @Override
     public long getTransactionStartTimeLong()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index fe5c4db..0512968 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -69,10 +68,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target>
         implements AsyncAutoCommitTransaction.FutureRecorder
@@ -605,60 +604,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         }
     }
 
-    private static class AsyncCommand
-    {
-        private final ListenableFuture<Void> _future;
-        private ServerTransaction.Action _action;
-
-        public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action)
-        {
-            _future = future;
-            _action = action;
-        }
-
-        void complete()
-        {
-            boolean interrupted = false;
-            try
-            {
-                while (true)
-                {
-                    try
-                    {
-                        _future.get();
-                        break;
-                    }
-                    catch (InterruptedException e)
-                    {
-                        interrupted = true;
-                    }
-
-                }
-            }
-            catch(ExecutionException e)
-            {
-                if(e.getCause() instanceof RuntimeException)
-                {
-                    throw (RuntimeException)e.getCause();
-                }
-                else if(e.getCause() instanceof java.lang.Error)
-                {
-                    throw (java.lang.Error) e.getCause();
-                }
-                else
-                {
-                    throw new ServerScopedRuntimeException(e.getCause());
-                }
-            }
-            if(interrupted)
-            {
-                Thread.currentThread().interrupt();
-            }
-            _action.postCommit();
-            _action = null;
-        }
-    }
-
     private static class PendingDispositionHolder
     {
         private final Binary _deliveryTag;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message