qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject qpid-broker-j git commit: QPID-8091: [Broker-J] [AMQP 1.0] Add store transaction timeout feature
Date Tue, 06 Feb 2018 17:46:28 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master dfebfa23b -> ffd5ad0d4


QPID-8091: [Broker-J] [AMQP 1.0] Add store transaction timeout feature


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ffd5ad0d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ffd5ad0d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ffd5ad0d

Branch: refs/heads/master
Commit: ffd5ad0d456532fb6c9b0ba4e28297c3452bf32c
Parents: dfebfa2
Author: Alex Rudyy <orudyy@apache.org>
Authored: Tue Feb 6 15:48:36 2018 +0000
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Tue Feb 6 17:46:18 2018 +0000

----------------------------------------------------------------------
 .../apache/qpid/server/session/AMQPSession.java |   2 -
 .../server/session/AbstractAMQPSession.java     | 125 -------------------
 .../qpid/server/transport/AMQPConnection.java   |   6 +
 .../transport/AbstractAMQPConnection.java       |  76 +++++++++++
 .../server/protocol/v0_10/ServerSession.java    |  25 ++--
 .../server/protocol/v0_10/Session_0_10.java     |   6 -
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  28 +++--
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |   7 ++
 .../qpid/server/protocol/v1_0/Session_1_0.java  |   6 -
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  45 +++++--
 .../TransactionTimeoutTest.java                 |  21 +++-
 11 files changed, 170 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 1b224f1..d68592b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -45,8 +45,6 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
 
     LogSubject getLogSubject();
 
-    void doTimeoutAction(String reason);
-
     void block(Queue<?> queue);
 
     void unblock(Queue<?> queue);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 6ca5e5e..db69dd9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.session;
 
 import java.security.AccessControlContext;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.Subject;
 
-import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -50,24 +48,19 @@ import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.TransactionTimeoutTicker;
 import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
                                           X extends ConsumerTarget<X>>
         extends AbstractConfiguredObject<S>
         implements AMQPSession<S, X>, EventLoggerProvider
 {
-    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
-    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
     private final Action _deleteModelTask;
     private final AMQPConnection<?> _connection;
     private final int _sessionId;
@@ -142,13 +135,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
     }
 
     @Override
-    protected void postResolveChildren()
-    {
-        super.postResolveChildren();
-        registerTransactionTimeoutTickers(_connection);
-    }
-
-    @Override
     public int getChannelId()
     {
         return _sessionId;
@@ -197,117 +183,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
         return _connection.getEventLogger();
     }
 
-    private void registerTransactionTimeoutTickers(Connection<?> amqpConnection)
-    {
-        NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
-        if (addressSpace instanceof QueueManagingVirtualHost)
-        {
-            final EventLogger eventLogger = getEventLogger();
-            final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace;
-            final List<Ticker> tickers = new ArrayList<>(4);
-
-            final Supplier<Long> transactionStartTimeSupplier = new Supplier<Long>()
-            {
-                @Override
-                public Long get()
-                {
-                    return getTransactionStartTimeLong();
-                }
-            };
-            final Supplier<Long> transactionUpdateTimeSupplier = new Supplier<Long>()
-            {
-                @Override
-                public Long get()
-                {
-                    return getTransactionUpdateTimeLong();
-                }
-            };
-
-            long notificationRepeatPeriod =
-                    getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
-
-            if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionOpenTimeoutWarn(),
-                        notificationRepeatPeriod, transactionStartTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                eventLogger.message(getLogSubject(), ChannelMessages.OPEN_TXN(age));
-                            }
-                        }
-                ));
-            }
-            if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionOpenTimeoutClose(),
-                        notificationRepeatPeriod, transactionStartTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
-                            }
-                        }
-                ));
-            }
-            if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionIdleTimeoutWarn(),
-                        notificationRepeatPeriod, transactionUpdateTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                eventLogger.message(getLogSubject(), ChannelMessages.IDLE_TXN(age));
-                            }
-                        }
-                ));
-            }
-            if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionIdleTimeoutClose(),
-                        notificationRepeatPeriod, transactionUpdateTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
-                            }
-                        }
-                ));
-            }
-
-            for (Ticker ticker : tickers)
-            {
-                addTicker(ticker);
-            }
-
-            Action deleteTickerTask = new Action()
-            {
-                @Override
-                public void performAction(Object o)
-                {
-                    removeDeleteTask(this);
-                    for (Ticker ticker : tickers)
-                    {
-                        removeTicker(ticker);
-                    }
-                }
-            };
-            addDeleteTask(deleteTickerTask);
-        }
-    }
-
     @Override
     public void addTicker(final Ticker ticker)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 6da15b1..3b609f9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.Deletable;
 
 public interface AMQPConnection<C extends AMQPConnection<C>>
@@ -104,6 +105,11 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
 
     Iterator<ServerTransaction> getOpenTransactions();
 
+    void registerTransactionTickers(ServerTransaction serverTransaction,
+                                    final Action<String> closeAction, final long notificationRepeatPeriod);
+
+    void unregisterTransactionTickers(ServerTransaction serverTransaction);
+
     enum CloseReason
     {
         MANAGEMENT,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 1049b8b..05b5f16 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -31,8 +31,11 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -50,6 +53,7 @@ import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -76,6 +80,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.TransactionObserver;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.FixedKeyMapCreator;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
         extends AbstractConfiguredObject<C>
@@ -83,6 +88,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
 
 {
     public static final FixedKeyMapCreator PUBLISH_ACTION_MAP_CREATOR = new FixedKeyMapCreator("routingKey", "immediate");
+    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
+    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAMQPConnection.class);
 
     private final Broker<?> _broker;
@@ -132,6 +139,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     private volatile TransactionObserver _transactionObserver;
     private long _maxUncommittedInMemorySize;
 
+    private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = new ConcurrentHashMap<>();
+
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -873,6 +882,73 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
                                     _transactionObserver);
     }
 
+    @Override
+    public void registerTransactionTickers(final ServerTransaction serverTransaction,
+                                           final Action<String> closeAction, final long notificationRepeatPeriod)
+    {
+        NamedAddressSpace addressSpace = getAddressSpace();
+        if (addressSpace instanceof QueueManagingVirtualHost)
+        {
+            final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace;
+
+            EventLogger eventLogger = virtualhost.getEventLogger();
+
+            final Set<Ticker> tickers = new LinkedHashSet<>(4);
+
+            if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionOpenTimeoutWarn(),
+                        notificationRepeatPeriod, serverTransaction::getTransactionStartTime,
+                        age -> eventLogger.message(getLogSubject(), ChannelMessages.OPEN_TXN(age))
+                ));
+            }
+            if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionOpenTimeoutClose(),
+                        notificationRepeatPeriod, serverTransaction::getTransactionStartTime,
+                        age -> closeAction.performAction(OPEN_TRANSACTION_TIMEOUT_ERROR)));
+            }
+            if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionIdleTimeoutWarn(),
+                        notificationRepeatPeriod, serverTransaction::getTransactionUpdateTime,
+                        age -> eventLogger.message(getLogSubject(), ChannelMessages.IDLE_TXN(age))
+                ));
+            }
+            if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionIdleTimeoutClose(),
+                        notificationRepeatPeriod, serverTransaction::getTransactionUpdateTime,
+                        age -> closeAction.performAction(IDLE_TRANSACTION_TIMEOUT_ERROR)
+                ));
+            }
+
+            if (!tickers.isEmpty())
+            {
+                for (Ticker ticker : tickers)
+                {
+                    getAggregateTicker().addTicker(ticker);
+                }
+                notifyWork();
+            }
+            _transactionTickers.put(serverTransaction, tickers);
+        }
+    }
+
+    @Override
+    public void unregisterTransactionTickers(final ServerTransaction serverTransaction)
+    {
+        NamedAddressSpace addressSpace = getAddressSpace();
+        if (addressSpace instanceof QueueManagingVirtualHost)
+        {
+            _transactionTickers.remove(serverTransaction).forEach(t -> getAggregateTicker().removeTicker(t));
+        }
+    }
+
     private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
     {
         private final long _allowedTime;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index e2211dd..015394e 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -68,6 +68,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -1147,6 +1148,8 @@ public class ServerSession extends SessionInvoker
             }
             amqpConnection.decrementTransactionOpenCounter();
             _transaction.rollback();
+
+            amqpConnection.unregisterTransactionTickers(_transaction);
         }
         else if(_transaction instanceof DistributedTransaction)
         {
@@ -1240,7 +1243,21 @@ public class ServerSession extends SessionInvoker
 
     public void selectTx()
     {
-        _transaction = getConnection().getAmqpConnection().createLocalTransaction();
+        ServerTransaction txn = _transaction;
+        AMQPConnection_0_10 amqpConnection = getAMQPConnection();
+        if (txn instanceof LocalTransaction)
+        {
+            amqpConnection.unregisterTransactionTickers(_transaction);
+        }
+
+        _transaction = amqpConnection.createLocalTransaction();
+        long notificationRepeatPeriod =
+                getModelObject().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+        amqpConnection.registerTransactionTickers(_transaction,
+                                                  message -> amqpConnection.closeSessionAsync(_modelObject,
+                                                                                              AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
+                                                                                              (String) message),
+                                                  notificationRepeatPeriod);
     }
 
     public void selectDtx()
@@ -1671,12 +1688,6 @@ public class ServerSession extends SessionInvoker
         }
     }
 
-    public void doTimeoutAction(final String reason)
-    {
-        getAMQPConnection().closeSessionAsync(_modelObject,
-                                              AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
-    }
-
     private class ResultFuture<T> implements Future<T>
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index 62e98fa..dd6ca80 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -113,12 +113,6 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
     }
 
     @Override
-    public void doTimeoutAction(final String idleTransactionTimeoutError)
-    {
-        _serverSession.doTimeoutAction(idleTransactionTimeoutError);
-    }
-
-    @Override
     public long getTransactionUpdateTimeLong()
     {
         return _serverSession.getTransactionUpdateTimeLong();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d0597c1..c76d78f 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -235,12 +235,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    @Override
-    public void doTimeoutAction(String reason)
-    {
-        _connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
-    }
-
     private void message(final LogMessage message)
     {
         getEventLogger().message(message);
@@ -288,12 +282,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         return getDeliveryMethod.hasDeliveredMessage();
     }
 
-    /** Sets this channel to be part of a local transaction */
-    private void setLocalTransactional()
-    {
-        _transaction = _connection.createLocalTransaction();
-    }
-
     boolean isTransactional()
     {
         return _transaction.isTransactional();
@@ -789,6 +777,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     _connection.incrementTransactionRollbackCounter();
                 }
                 _connection.decrementTransactionOpenCounter();
+
+                _connection.unregisterTransactionTickers(_transaction);
             }
 
             _transaction.rollback();
@@ -3309,7 +3299,19 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             LOGGER.debug("RECV[" + _channelId + "] TxSelect");
         }
 
-        setLocalTransactional();
+        ServerTransaction txn = _transaction;
+        if (txn instanceof LocalTransaction)
+        {
+            getConnection().unregisterTransactionTickers(_transaction);
+        }
+
+        _transaction = _connection.createLocalTransaction();
+        long notificationRepeatPeriod = getContextValue(Long.class,
+                                                 TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+        getConnection().registerTransactionTickers(_transaction,
+                                                   message -> _connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
+                                                                                                   message),
+                                                   notificationRepeatPeriod);
 
         MethodRegistry methodRegistry = _connection.getMethodRegistry();
         TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 9091456..e440187 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.collect.Sets;
@@ -58,12 +59,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
@@ -124,12 +128,15 @@ import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.TransactionTimeoutTicker;
+import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.transport.util.Functions;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 9f4f064..ec875e1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1198,12 +1198,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     @Override
-    public void doTimeoutAction(final String reason)
-    {
-        getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
-    }
-
-    @Override
     public String toString()
     {
         return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index cf0ddf3..1394d3a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -21,11 +21,12 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -50,7 +51,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
 {
-    private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
+    private final Map<Integer, ServerTransaction> _createdTransactions = new ConcurrentHashMap<>();
 
     public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
     {
@@ -87,12 +88,18 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
 
                         Session_1_0 session = getSession();
 
-                        session.getConnection().receivedComplete();
+                        AMQPConnection_1_0<?> connection = session.getConnection();
+                        connection.receivedComplete();
 
                         if (command instanceof Declare)
                         {
-                            final IdentifiedTransaction txn = session.getConnection().createIdentifiedTransaction();
+                            final IdentifiedTransaction txn = connection.createIdentifiedTransaction();
                             _createdTransactions.put(txn.getId(), txn.getServerTransaction());
+                            long notificationRepeatPeriod =
+                                    getSession().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+                            connection.registerTransactionTickers(txn.getServerTransaction(),
+                                                                  this::doTimeoutAction,
+                                                                  notificationRepeatPeriod);
 
                             Declared state = new Declared();
 
@@ -188,6 +195,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                 error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
             }
             _createdTransactions.remove(transactionId);
+            connection.unregisterTransactionTickers(txn);
             connection.removeTransaction(transactionId);
             connection.decrementTransactionOpenCounter();
         }
@@ -204,14 +212,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
     protected void remoteDetachedPerformDetach(Detach detach)
     {
         // force rollback of open transactions
-        for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
-        {
-            entry.getValue().rollback();
-            AMQPConnection_1_0<?> connection = getSession().getConnection();
-            connection.decrementTransactionOpenCounter();
-            connection.incrementTransactionRollbackCounter();
-            connection.removeTransaction(entry.getKey());
-        }
+        rollbackOpenTransactions();
         close();
     }
 
@@ -266,4 +267,24 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
     {
 
     }
+
+    private void doTimeoutAction(final String message)
+    {
+        rollbackOpenTransactions();
+        Error error = new Error(TransactionError.TRANSACTION_TIMEOUT, message);
+        getSession().getConnection().close(error);
+    }
+
+    private void rollbackOpenTransactions()
+    {
+        for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
+        {
+            entry.getValue().rollback();
+            AMQPConnection_1_0<?> connection = getSession().getConnection();
+            connection.decrementTransactionOpenCounter();
+            connection.incrementTransactionRollbackCounter();
+            connection.removeTransaction(entry.getKey());
+        }
+        _createdTransactions.clear();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ffd5ad0d/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/transactiontimeout/TransactionTimeoutTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/transactiontimeout/TransactionTimeoutTest.java
index 3f1145c..6c6b1a7 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/transactiontimeout/TransactionTimeoutTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/transactiontimeout/TransactionTimeoutTest.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.server.virtualhost.QueueManagingVirtualHost.STORE_
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
@@ -57,11 +56,9 @@ public class TransactionTimeoutTest extends JmsTestBase
     private final ExceptionCatchingListener _listener = new ExceptionCatchingListener();
 
     @Before
-    public void setUp() throws Exception
+    public void setUp()
     {
         assumeThat(getBrokerAdmin().isManagementSupported(), is(true));
-        assumeThat("AMQP 1.0 does not support producer transaction timeout",
-                   getProtocol(), is(not(equalTo(Protocol.AMQP_1_0))));
     }
 
     @Test
@@ -95,7 +92,13 @@ public class TransactionTimeoutTest extends JmsTestBase
         }
         finally
         {
-            connection.close();
+            try
+            {
+                connection.close();
+            }
+            catch (JMSException ignore)
+            {
+            }
         }
     }
 
@@ -150,7 +153,13 @@ public class TransactionTimeoutTest extends JmsTestBase
         }
         finally
         {
-            connection.close();
+            try
+            {
+                connection.close();
+            }
+            catch (JMSException ignore)
+            {
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message