qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1755412 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/model/adapter/ broker-core/src/main/java/org/apache/q...
Date Sat, 06 Aug 2016 22:45:56 GMT
Author: kwall
Date: Sat Aug  6 22:45:56 2016
New Revision: 1755412

URL: http://svn.apache.org/viewvc?rev=1755412&view=rev
Log:
QPID-7374: [Java Broker] Reimplement transaction timeout feature as IO ticker

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TransactionTimeoutTicker.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TransactionTimeoutTickerTest.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Session.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Session.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Session.java Sat Aug  6 22:45:56 2016
@@ -31,6 +31,12 @@ public interface Session<X extends Sessi
     // available credit of both producer and consumer sides.
     String PRODUCER_FLOW_BLOCKED = "producerFlowBlocked";
 
+    String TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD = "qpid.session.transactionTimeoutNotificationRepeatPeriod";
+
+    @ManagedContextDefault(name = TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD,
+                           description = "Frequency, in milliseconds, with which transaction timeout warnings will be repeated.")
+    long TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD_DEFAULT = 10000;
+
     @DerivedAttribute
     int getChannelId();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Sat Aug  6 22:45:56 2016
@@ -94,25 +94,37 @@ public interface VirtualHost<X extends V
     @ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutClose")
     public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = 0l;
 
-    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionIdleTimeoutClose}")
+    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionIdleTimeoutClose}",
+                       description = "The maximum length of time, in milliseconds, that an open store transaction may "
+                                     + "remain idle. If a transaction exceeds this threshold, the resource that "
+                                     + "created the transaction will be closed automatically.")
     long getStoreTransactionIdleTimeoutClose();
 
     @ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutWarn")
     public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 180000l;
 
-    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionIdleTimeoutWarn}")
+    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionIdleTimeoutWarn}",
+                       description = "The maximum length of time, in milliseconds, that an open store transaction may "
+                                     + "remain idle. If a transaction exceeds this threshold, warnings will be "
+                                     + "written to the logs.")
     long getStoreTransactionIdleTimeoutWarn();
 
     @ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutClose")
     public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l;
 
-    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionOpenTimeoutClose}")
+    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionOpenTimeoutClose}",
+                       description = "The maximum length of time, in milliseconds, that a store transaction may "
+                                   + "remain open. If a transaction exceeds this threshold, the resource that "
+                                   + "created the transaction will be closed automatically.")
     long getStoreTransactionOpenTimeoutClose();
 
     @ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutWarn")
     public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 300000l;
 
-    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionOpenTimeoutWarn}")
+    @ManagedAttribute( defaultValue = "${virtualhost.storeTransactionOpenTimeoutWarn}",
+                       description = "The maximum length of time, in milliseconds, that a store transaction may "
+                                   + "remain open. If a transaction exceeds this threshold, warnings will be "
+                                   + "written to the logs.")
     long getStoreTransactionOpenTimeoutWarn();
 
     @ManagedContextDefault( name = "virtualhost.housekeepingThreadCount")

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Sat Aug  6 22:45:56 2016
@@ -20,39 +20,51 @@
  */
 package org.apache.qpid.server.model.adapter;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Publisher;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.transport.TransactionTimeoutTicker;
+import org.apache.qpid.transport.network.Ticker;
 
 public final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
 {
+    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
+    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
+
     // Attributes
     private final AMQSessionModel _session;
     private final Action _deleteModelTask;
 
-    public SessionAdapter(final AbstractAMQPConnection<?> connectionAdapter,
+    public SessionAdapter(final AbstractAMQPConnection<?> amqpConnection,
                           final AMQSessionModel session)
     {
-        super(parentsMap(connectionAdapter), createAttributes(session));
+        super(parentsMap(amqpConnection), createAttributes(session));
         _session = session;
         _session.addConsumerListener(new ConsumerListener()
         {
@@ -70,6 +82,9 @@ public final class SessionAdapter extend
             }
         });
         session.setModelObject(this);
+
+        registerTransactionTimeoutTickers(amqpConnection, session);
+
         _deleteModelTask = new Action()
         {
             @Override
@@ -85,7 +100,7 @@ public final class SessionAdapter extend
 
     private static Map<String, Object> createAttributes(final AMQSessionModel session)
     {
-        Map<String,Object> attributes = new HashMap<String, Object>();
+        Map<String, Object> attributes = new HashMap<>();
         attributes.put(ID, UUID.randomUUID());
         attributes.put(NAME, String.valueOf(session.getChannelId()));
         attributes.put(DURABLE, false);
@@ -118,11 +133,11 @@ public final class SessionAdapter extend
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
     {
-        if(clazz == org.apache.qpid.server.model.Consumer.class)
+        if (clazz == org.apache.qpid.server.model.Consumer.class)
         {
             return (Collection<C>) getConsumers();
         }
-        else if(clazz == Publisher.class)
+        else if (clazz == Publisher.class)
         {
             return (Collection<C>) getPublishers();
         }
@@ -184,4 +199,115 @@ public final class SessionAdapter extend
         return Futures.immediateFuture(null);
     }
 
+    private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?> amqpConnection,
+                                                   final AMQSessionModel session)
+    {
+        NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
+        if (addressSpace instanceof VirtualHost)
+        {
+            final EventLogger eventLogger = amqpConnection.getEventLogger();
+            final VirtualHost virtualhost = (VirtualHost) addressSpace;
+            final List<Ticker> tickers = new ArrayList<>(4);
+
+            final Supplier<Date> transactionStartTimeSupplier = new Supplier<Date>()
+            {
+                @Override
+                public Date get()
+                {
+                    return SessionAdapter.this.getTransactionStartTime();
+                }
+            };
+            final Supplier<Date> transactionUpdateTimeSupplier = new Supplier<Date>()
+            {
+                @Override
+                public Date get()
+                {
+                    return SessionAdapter.this.getTransactionUpdateTime();
+                }
+            };
+
+            long notificationRepeatPeriod =
+                    getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+
+            if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionOpenTimeoutWarn(),
+                        notificationRepeatPeriod, transactionStartTimeSupplier,
+                        new Action<Long>()
+                        {
+                            @Override
+                            public void performAction(Long age)
+                            {
+                                eventLogger.message(_session.getLogSubject(), ChannelMessages.OPEN_TXN(age));
+                            }
+                        }
+                ));
+            }
+            if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionOpenTimeoutClose(),
+                        notificationRepeatPeriod, transactionStartTimeSupplier,
+                        new Action<Long>()
+                        {
+                            @Override
+                            public void performAction(Long age)
+                            {
+                                _session.doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
+                            }
+                        }
+                ));
+            }
+            if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionIdleTimeoutWarn(),
+                        notificationRepeatPeriod, transactionUpdateTimeSupplier,
+                        new Action<Long>()
+                        {
+                            @Override
+                            public void performAction(Long age)
+                            {
+                                eventLogger.message(_session.getLogSubject(), ChannelMessages.IDLE_TXN(age));
+                            }
+                        }
+                ));
+            }
+            if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionIdleTimeoutClose(),
+                        notificationRepeatPeriod, transactionUpdateTimeSupplier,
+                        new Action<Long>()
+                        {
+                            @Override
+                            public void performAction(Long age)
+                            {
+                                _session.doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
+                            }
+                        }
+                ));
+            }
+
+            for (Ticker ticker : tickers)
+            {
+                session.addTicker(ticker);
+            }
+
+            Action deleteTickerTask = new Action()
+            {
+                @Override
+                public void performAction(Object o)
+                {
+                    session.removeDeleteTask(this);
+                    for (Ticker ticker : tickers)
+                    {
+                        session.removeTicker(ticker);
+                    }
+                }
+            };
+            session.addDeleteTask(deleteTickerTask);
+        }
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sat Aug  6 22:45:56 2016
@@ -50,22 +50,7 @@ public interface AMQSessionModel<T exten
 
     LogSubject getLogSubject();
 
-    /**
-     * This method is called from the housekeeping thread to check the status of
-     * transactions on this session and react appropriately.
-     *
-     * If a transaction is open for too long or idle for too long then a warning
-     * is logged or the connection is closed, depending on the configuration. An open
-     * transaction is one that has recent activity. The transaction age is counted
-     * from the time the transaction was started. An idle transaction is one that
-     * has had no activity, such as publishing or acknowledging messages.
-     *
-     * @param openWarn time in milliseconds before alerting on open transaction
-     * @param openClose time in milliseconds before closing connection with open transaction
-     * @param idleWarn time in milliseconds before alerting on idle transaction
-     * @param idleClose time in milliseconds before closing connection with idle transaction
-     */
-    void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose);
+    void doTimeoutAction(String reason);
 
     void block(Queue<?> queue);
 

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TransactionTimeoutTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TransactionTimeoutTicker.java?rev=1755412&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TransactionTimeoutTicker.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TransactionTimeoutTicker.java Sat Aug  6 22:45:56 2016
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.util.Date;
+
+import com.google.common.base.Supplier;
+
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.network.Ticker;
+
+public class TransactionTimeoutTicker implements Ticker
+{
+    private final long _timeoutValue;
+    private final Action<Long> _notfication;
+    private final Supplier<Date> _timeSupplier;
+    private final long _notificationRepeatPeriod;
+
+    /** The time the ticker will next produce the notification */
+    private volatile long _nextNotificationTime = 0;
+    /** Last transaction time stamp seen by this ticker.  */
+    private volatile long _lastTransactionTimeStamp = 0;
+
+    public TransactionTimeoutTicker(long timeoutValue,
+                                    long notificationRepeatPeriod,
+                                    Supplier<Date> timeStampSupplier,
+                                    Action<Long> notification)
+    {
+        _timeoutValue = timeoutValue;
+        _notfication = notification;
+        _timeSupplier = timeStampSupplier;
+        _notificationRepeatPeriod = notificationRepeatPeriod;
+    }
+
+    @Override
+    public int getTimeToNextTick(final long currentTime)
+    {
+        final long transactionTimeStamp = _timeSupplier.get().getTime();
+        int tick = calculateTimeToNextTick(currentTime, transactionTimeStamp);
+        if (tick <= 0 && _nextNotificationTime > currentTime)
+        {
+            tick = (int) (_nextNotificationTime - currentTime);
+        }
+        return tick;
+    }
+
+    @Override
+    public int tick(final long currentTime)
+    {
+        final long transactionTimeStamp = _timeSupplier.get().getTime();
+        int tick = calculateTimeToNextTick(currentTime, transactionTimeStamp);
+        if (tick <= 0)
+        {
+            if (currentTime >= _nextNotificationTime)
+            {
+                final long idleTime = currentTime - transactionTimeStamp;
+                _nextNotificationTime = currentTime + _notificationRepeatPeriod;
+                _notfication.performAction(idleTime);
+            }
+            else
+            {
+                tick = (int) (_nextNotificationTime - currentTime);
+            }
+        }
+        return tick;
+    }
+
+    private int calculateTimeToNextTick(final long currentTime, final long transactionTimeStamp)
+    {
+        if (transactionTimeStamp != _lastTransactionTimeStamp)
+        {
+            // Transaction's time stamp has changed, reset the next notification time
+            _lastTransactionTimeStamp = transactionTimeStamp;
+            _nextNotificationTime = 0;
+        }
+        if (transactionTimeStamp > 0)
+        {
+            return (int) ((transactionTimeStamp + _timeoutValue) - currentTime);
+        }
+        else
+        {
+            return Integer.MAX_VALUE;
+        }
+    }
+}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Sat Aug  6 22:45:56 2016
@@ -95,7 +95,6 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemNodeCreator;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.protocol.LinkRegistryImpl;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -1752,18 +1751,6 @@ public abstract class AbstractVirtualHos
                     q.checkMessageStatus();
                 }
             }
-            for (AMQPConnection<?> connection : _connections)
-            {
-                _logger.debug("Checking for long running open transactions on connection {}", connection);
-                for (AMQSessionModel<?> session : connection.getSessionModels())
-                {
-                    _logger.debug("Checking for long running open transactions on session {}", session);
-                    session.checkTransactionStatus(getStoreTransactionOpenTimeoutWarn(),
-                                                   getStoreTransactionOpenTimeoutClose(),
-                                                   getStoreTransactionIdleTimeoutWarn(),
-                                                   getStoreTransactionIdleTimeoutClose());
-                }
-            }
         }
     }
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sat Aug  6 22:45:56 2016
@@ -314,8 +314,7 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public void checkTransactionStatus(long openWarn, long openClose,
-                long idleWarn, long idleClose)
+        public void doTimeoutAction(final String reason)
         {
         }
 

Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TransactionTimeoutTickerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TransactionTimeoutTickerTest.java?rev=1755412&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TransactionTimeoutTickerTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TransactionTimeoutTickerTest.java Sat Aug  6 22:45:56 2016
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Date;
+
+import com.google.common.base.Supplier;
+import org.mockito.InOrder;
+
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class TransactionTimeoutTickerTest extends QpidTestCase
+{
+
+    private TransactionTimeoutTicker _ticker;
+    private Supplier<Date> _dateSupplier = mock(Supplier.class);
+    private Action<Long> _notificationAction = mock(Action.class);
+    private long _timeoutValue = 100;
+    private long _notificationRepeatPeriod = 5000;
+
+    public void testTickWhenNoTransaction() throws Exception
+    {
+        final long timeNow = System.currentTimeMillis();
+
+        _ticker = new TransactionTimeoutTicker(_timeoutValue, _notificationRepeatPeriod,
+                                               _dateSupplier,
+                                               _notificationAction);
+
+        when(_dateSupplier.get()).thenReturn(new Date(0));
+
+        assertTickTime("Unexpected ticker value when no transaction is in-progress",
+                       Integer.MAX_VALUE,
+                       timeNow, _ticker);
+
+        verify(_notificationAction, never()).performAction(anyLong());
+    }
+
+    public void testTickDuringSingleTransaction() throws Exception
+    {
+        final long timeNow = System.currentTimeMillis();
+        final long transactionTime = timeNow - 90;
+
+        _ticker = new TransactionTimeoutTicker(_timeoutValue, _notificationRepeatPeriod,
+                                               _dateSupplier,
+                                               _notificationAction);
+
+        when(_dateSupplier.get()).thenReturn(new Date(transactionTime));
+
+        final int expected = 10;
+        assertTickTime("Unexpected ticker value when transaction is in-progress",
+                       expected,
+                       timeNow, _ticker);
+
+        verify(_notificationAction, never()).performAction(anyLong());
+    }
+
+    public void testTicksDuringManyTransactions() throws Exception
+    {
+        long timeNow = System.currentTimeMillis();
+        final long firstTransactionTime = timeNow - 10;
+
+        _ticker = new TransactionTimeoutTicker(_timeoutValue, _notificationRepeatPeriod,
+                                               _dateSupplier,
+                                               _notificationAction);
+
+        // First transaction
+        when(_dateSupplier.get()).thenReturn(new Date(firstTransactionTime));
+
+        final int expectedTickForFirstTransaction = 90;
+        assertTickTime("Unexpected ticker value for first transaction",
+                       expectedTickForFirstTransaction,
+                       timeNow, _ticker);
+
+        // Second transaction
+        timeNow += 100;
+        final long secondTransactionTime = timeNow - 5;
+
+        when(_dateSupplier.get()).thenReturn(new Date(secondTransactionTime));
+
+        final int expectedTickForSecondTransaction = 95;
+        assertTickTime("Unexpected ticker value for second transaction",
+                       expectedTickForSecondTransaction,
+                       timeNow, _ticker);
+
+        verify(_notificationAction, never()).performAction(anyLong());
+    }
+
+    public void testSingleTimeoutsDuringSingleTransaction() throws Exception
+    {
+        long timeNow = System.currentTimeMillis();
+        final long transactionTime = timeNow - 110;
+
+        _ticker = new TransactionTimeoutTicker(_timeoutValue, _notificationRepeatPeriod,
+                                               _dateSupplier,
+                                               _notificationAction);
+
+        when(_dateSupplier.get()).thenReturn(new Date(transactionTime));
+
+
+        final long expectedInitialIdle = 110L;
+        assertTickerTimeout("Unexpected ticker value when transaction is in-progress",
+                            timeNow, _ticker);
+
+        verify(_notificationAction, times(1)).performAction(expectedInitialIdle);
+    }
+
+    public void testMultipleTimeoutsDuringSingleTransaction_NotificationsRespectPeriod() throws Exception
+    {
+        InOrder inorder = inOrder(_notificationAction);
+
+        long timeNow = System.currentTimeMillis();
+        final long transactionTime = timeNow - 110;
+
+        _ticker = new TransactionTimeoutTicker(_timeoutValue, _notificationRepeatPeriod,
+                                               _dateSupplier,
+                                               _notificationAction);
+
+        when(_dateSupplier.get()).thenReturn(new Date(transactionTime));
+
+
+        final long expectedInitialIdle = 110L;
+        assertTickerTimeout("Unexpected ticker value when transaction is in-progress",
+                            timeNow, _ticker);
+
+        inorder.verify(_notificationAction, times(1)).performAction(expectedInitialIdle);
+
+        // Advance the clock by a period of time less than the notification repeat period
+        final long timeAdjustment = _notificationRepeatPeriod / 2;
+        timeNow += timeAdjustment;
+
+        final long expectedNotificationTick = timeAdjustment;
+        assertTickTime("Unexpected ticker value when transaction is in-progress after notification",
+                       expectedNotificationTick,
+                       timeNow,
+                       _ticker);
+
+        inorder.verify(_notificationAction, never()).performAction(anyLong());
+
+        // Advance the clock again past the notification repeat period and verify that we are notified
+        // a second time
+
+        timeNow += timeAdjustment;
+        final long expectedSecondIdle = timeNow - transactionTime;
+        assertTickerTimeout("Unexpected ticker value when transaction is in-progress and renotification is due",
+                            timeNow, _ticker);
+
+        inorder.verify(_notificationAction, times(1)).performAction(expectedSecondIdle);
+
+    }
+
+    private void assertTickerTimeout(final String message,
+                                     final long currentTime,
+                                     final TransactionTimeoutTicker ticker)
+    {
+        assertTrue(message, ticker.getTimeToNextTick(currentTime) <= 0);
+        assertTrue(message, ticker.tick(currentTime) <= 0);
+    }
+
+    private void assertTickTime(final String message,
+                                final long expectedValue,
+                                final long currentTime,
+                                final TransactionTimeoutTicker ticker)
+    {
+        assertEquals(message, expectedValue, ticker.getTimeToNextTick(currentTime));
+        assertEquals(message, expectedValue, ticker.tick(currentTime));
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sat Aug  6 22:45:56 2016
@@ -55,8 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.TransactionTimeoutHelper;
-import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -178,8 +176,6 @@ public class ServerSession extends Sessi
 
     private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
 
-    private final TransactionTimeoutHelper _transactionTimeoutHelper;
-
     private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
 
     private volatile long _uncommittedMessageSize;
@@ -209,14 +205,6 @@ public class ServerSession extends Sessi
         {
             _token = amqpConnection.getBroker().newToken(_subject);
         }
-        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
-        {
-            @Override
-            public void doTimeoutAction(String reason)
-            {
-                getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
-            }
-        }, getConnection().getAmqpConnection());
 
         _blockingTimeout = serverConnection.getBroker().getContextValue(Long.class,
                                                                                        Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
@@ -837,11 +825,6 @@ public class ServerSession extends Sessi
         return this;
     }
 
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
-    {
-        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
-    }
-
     public void block(Queue<?> queue)
     {
         block(queue, queue.getName());
@@ -1285,6 +1268,11 @@ public class ServerSession extends Sessi
         }
     }
 
+    @Override
+    public void doTimeoutAction(final String reason)
+    {
+        getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+    }
 
     public final long getMaxUncommittedInMemorySize()
     {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Aug  6 22:45:56 2016
@@ -56,8 +56,6 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.TransactionTimeoutHelper;
-import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -196,7 +194,6 @@ public class AMQChannel
 
     private final ClientDeliveryMethod _clientDeliveryMethod;
 
-    private final TransactionTimeoutHelper _transactionTimeoutHelper;
     private final UUID _id = UUID.randomUUID();
 
     private final List<Action<? super AMQChannel>> _taskList =
@@ -258,15 +255,6 @@ public class AMQChannel
 
         _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
 
-        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
-        {
-            @Override
-            public void doTimeoutAction(String reason)
-            {
-                _connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason);
-            }
-        }, getConnection());
-
         AccessController.doPrivileged((new PrivilegedAction<Object>()
         {
             @Override
@@ -280,6 +268,12 @@ public class AMQChannel
 
     }
 
+    @Override
+    public void doTimeoutAction(String reason)
+    {
+        _connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason);
+    }
+
     private void message(final LogMessage message)
     {
         getEventLogger().message(message);
@@ -1764,11 +1758,6 @@ public class AMQChannel
         return getConnection().getAddressSpace();
     }
 
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
-    {
-        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
-    }
-
     private void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sat Aug  6 22:45:56 2016
@@ -1261,12 +1261,6 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
-    {
-        // TODO - required for AMQSessionModel / long running transaction detection
-    }
-
-    @Override
     public void block(Queue<?> queue)
     {
         // TODO - required for AMQSessionModel / producer side flow control
@@ -1607,6 +1601,12 @@ public class Session_1_0 implements AMQS
         }
     }
 
+    @Override
+    public void doTimeoutAction(final String reason)
+    {
+        getAMQPConnection().closeSessionAsync(this, AMQConstant.RESOURCE_ERROR, reason);
+    }
+
     private void consumerAdded(Consumer<?> consumer)
     {
         for(ConsumerListener l : _consumerListeners)

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java Sat Aug  6 22:45:56 2016
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.test.unit.transacted;
 
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
 
 /**
@@ -32,7 +33,7 @@ public class TransactionTimeoutDisabledT
     {
         // Setup housekeeping every second
         TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
-        setTestSystemProperty("virtualhost.housekeepingCheckPeriod", "100");
+        setTestSystemProperty(Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD, "100");
 
         // No transaction timeout configuration.
     }

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java Sat Aug  6 22:45:56 2016
@@ -27,7 +27,7 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.server.model.Session;
 
 /**
  * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
@@ -44,10 +44,7 @@ public class TransactionTimeoutTest exte
     {
         // switch off connection close in order to test timeout on publishing of unroutable messages
         getDefaultBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
-
-        // Setup housekeeping every 100ms
-        TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
-        setTestSystemProperty("virtualhost.housekeepingCheckPeriod","100");
+        setTestSystemProperty(Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD, "100");
 
         if (getName().contains("ProducerIdle"))
         {

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java?rev=1755412&r1=1755411&r2=1755412&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java Sat Aug  6 22:45:56 2016
@@ -50,7 +50,6 @@ import java.util.concurrent.atomic.Atomi
 public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener
 {
     private static final int ALERT_MESSAGE_TOLERANCE = 6;
-    public static final String VIRTUALHOST = "test";
     public static final String TEXT = "0123456789abcdefghiforgettherest";
     public static final String CHN_OPEN_TXN = "CHN-1007";
     public static final String CHN_IDLE_TXN = "CHN-1008";




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message