From commits-return-43446-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Tue Jan 16 14:17:27 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 4821118065B for ; Tue, 16 Jan 2018 14:17:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 37F10160C34; Tue, 16 Jan 2018 13:17:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2FC19160C26 for ; Tue, 16 Jan 2018 14:17:26 +0100 (CET) Received: (qmail 18558 invoked by uid 500); 16 Jan 2018 13:17:25 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 18549 invoked by uid 99); 16 Jan 2018 13:17:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jan 2018 13:17:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D150E1859; Tue, 16 Jan 2018 13:17:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kwall@apache.org To: commits@qpid.apache.org Date: Tue, 16 Jan 2018 13:17:22 -0000 Message-Id: <903e51258e5248cda0ef483d874887aa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] qpid-broker-j git commit: QPID-8079: [Broker-J] Ensure that actions associated with AsyncCommand are rolled back if the underlying future completes unsuccessfully Repository: qpid-broker-j Updated Branches: refs/heads/master 17d4b5607 -> 5fa398cfc QPID-8079: [Broker-J] Ensure that actions associated with AsyncCommand are rolled back if the underlying 
future completes unsuccessfully Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/5fa398cf Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/5fa398cf Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/5fa398cf Branch: refs/heads/master Commit: 5fa398cfc2141d6ec090df3464f5bcbf63d7b023 Parents: fb98e76 Author: Keith Wall Authored: Tue Jan 16 11:50:09 2018 +0000 Committer: Keith Wall Committed: Tue Jan 16 13:13:47 2018 +0000 ---------------------------------------------------------------------- .../server/txn/AsyncAutoCommitTransaction.java | 4 +- .../apache/qpid/server/txn/AsyncCommand.java | 98 ++++++++++++++++++++ .../server/protocol/v0_10/ServerSession.java | 62 +------------ .../qpid/server/protocol/v0_8/AMQChannel.java | 57 +----------- .../v1_0/StandardReceivingLinkEndpoint.java | 57 +----------- 5 files changed, 103 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 1f65aa3..32e3a6c 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction txn = _messageStore.newTransaction(); enqueueRecord = txn.enqueueMessage(queue, message); - future = txn.commitTranAsync((Void) null); + future = txn.commitTranAsync(null); txn = null; } else @@ -232,7 +232,7 @@ public class 
AsyncAutoCommitTransaction implements ServerTransaction @Override public void onRollback() { - underlying.postCommit(enqueueRecord); + underlying.onRollback(); } }, message.isPersistent()); postTransactionAction = null; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java new file mode 100644 index 0000000..29ce605 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.server.txn; + +import java.util.concurrent.ExecutionException; + +import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.server.util.ServerScopedRuntimeException; + +public class AsyncCommand +{ + private final ListenableFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final ListenableFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + public void complete() + { + boolean interrupted = false; + boolean success = false; + try + { + while (true) + { + try + { + _future.get(); + break; + } + catch (InterruptedException e) + { + interrupted = true; + } + + } + success = true; + } + catch(ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error) e.getCause(); + } + else + { + throw new ServerScopedRuntimeException(e.getCause()); + } + } + finally + { + if(interrupted) + { + Thread.currentThread().interrupt(); + } + if (success) + { + _action.postCommit(); + } + else + { + _action.onRollback(); + } + _action = null; + } + } + + public boolean isReadyForCompletion() + { + return _future.isDone(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index f2bb0cc..b224fa6 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -56,7 +56,6 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -68,6 +67,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.txn.AsyncCommand; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -97,7 +97,6 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ServerScopedRuntimeException; public class ServerSession extends SessionInvoker implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder @@ -1636,65 +1635,6 @@ public class ServerSession extends SessionInvoker _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } - private static class AsyncCommand - { - private final ListenableFuture _future; - private ServerTransaction.Action _action; - - public AsyncCommand(final ListenableFuture future, final ServerTransaction.Action action) - { - _future = future; - _action = action; - } - - void complete() - { - boolean interrupted = false; - try - { - while (true) - { - try - { - _future.get(); - break; - } - catch (InterruptedException e) - { - interrupted = true; - } - - } - } - catch(ExecutionException e) - { - if(e.getCause() instanceof RuntimeException) - { - throw (RuntimeException)e.getCause(); - } - else if(e.getCause() instanceof 
Error) - { - throw (Error) e.getCause(); - } - else - { - throw new ServerScopedRuntimeException(e.getCause()); - } - } - if(interrupted) - { - Thread.currentThread().interrupt(); - } - _action.postCommit(); - _action = null; - } - - boolean isReadyForCompletion() - { - return _future.isDone(); - } - } - public void setModelObject(final Session_0_10 session) { _modelObject = session; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 34571cf..c0e3389 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; @@ -96,10 +95,10 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; +import org.apache.qpid.server.txn.AsyncCommand; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; import 
org.apache.qpid.server.virtualhost.ReservedExchangeNameException; @@ -1638,60 +1637,6 @@ public class AMQChannel extends AbstractAMQPSession _future; - private ServerTransaction.Action _action; - - public AsyncCommand(final ListenableFuture future, final ServerTransaction.Action action) - { - _future = future; - _action = action; - } - - void complete() - { - boolean interrupted = false; - try - { - while (true) - { - try - { - _future.get(); - break; - } - catch (InterruptedException e) - { - interrupted = true; - } - - } - } - catch(ExecutionException e) - { - if(e.getCause() instanceof RuntimeException) - { - throw (RuntimeException)e.getCause(); - } - else if(e.getCause() instanceof Error) - { - throw (Error) e.getCause(); - } - else - { - throw new ServerScopedRuntimeException(e.getCause()); - } - } - if(interrupted) - { - Thread.currentThread().interrupt(); - } - _action.postCommit(); - _action = null; - } - } - @Override public long getTransactionStartTimeLong() { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5fa398cf/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index fe5c4db..0512968 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ExecutionException; import com.google.common.util.concurrent.ListenableFuture; 
import org.slf4j.Logger; @@ -69,10 +68,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; +import org.apache.qpid.server.txn.AsyncCommand; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.ServerScopedRuntimeException; public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint implements AsyncAutoCommitTransaction.FutureRecorder @@ -605,60 +604,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint } } - private static class AsyncCommand - { - private final ListenableFuture _future; - private ServerTransaction.Action _action; - - public AsyncCommand(final ListenableFuture future, final ServerTransaction.Action action) - { - _future = future; - _action = action; - } - - void complete() - { - boolean interrupted = false; - try - { - while (true) - { - try - { - _future.get(); - break; - } - catch (InterruptedException e) - { - interrupted = true; - } - - } - } - catch(ExecutionException e) - { - if(e.getCause() instanceof RuntimeException) - { - throw (RuntimeException)e.getCause(); - } - else if(e.getCause() instanceof java.lang.Error) - { - throw (java.lang.Error) e.getCause(); - } - else - { - throw new ServerScopedRuntimeException(e.getCause()); - } - } - if(interrupted) - { - Thread.currentThread().interrupt(); - } - _action.postCommit(); - _action = null; - } - } - private static class PendingDispositionHolder { private final Binary _deliveryTag; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org