activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1378085 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/broker/region/cursors/ src/test/java/org/apache/activemq/broker/ src/test/java/org/apache/activemq/bugs/ src/...
Date Tue, 28 Aug 2012 11:42:27 GMT
Author: gtully
Date: Tue Aug 28 11:42:27 2012
New Revision: 1378085

URL: http://svn.apache.org/viewvc?rev=1378085&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3998 https://issues.apache.org/jira/browse/AMQ-3999
- fix issue with redispatch of committed prepared transactions, unnecessary call to determin
store size for durable subs and duplicate recording of message pending dispatch on an unsubscribe

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1378085&r1=1378084&r2=1378085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Aug 28 11:42:27 2012
@@ -609,7 +609,6 @@
             <exclude>org/apache/activemq/transport/failover/FailoverDuplicateTest.*</exclude>
             <exclude>org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.*</exclude>
             <exclude>org/apache/activemq/ExpiryHogTest.*</exclude>
-            <exclude>org/apache/activemq/bugs/AMQ2801Test.*</exclude>
             <exclude>org/apache/activemq/usecases/BrowseOverNetworkTest.*</exclude>
             <exclude>org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.*</exclude>
             <exclude>org/apache/kahadb/index/HashIndexTest.*</exclude>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1378085&r1=1378084&r2=1378085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Aug 28 11:42:27 2012
@@ -1169,6 +1169,7 @@ public class Queue extends BaseDestinati
                 store.resetBatching();
             }
             messages.gc();
+            messages.reset();
             asyncWakeup();
         } finally {
             messagesLock.writeLock().unlock();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1378085&r1=1378084&r2=1378085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Tue Aug 28 11:42:27 2012
@@ -253,8 +253,6 @@ public abstract class AbstractStoreCurso
         batchList.clear();
         clearIterator(false);
         batchResetNeeded = true;
-        // wonder do we need to determine size here, it may change before restart
-        resetSize();
         setCacheEnabled(false);
     }
 
@@ -269,6 +267,8 @@ public abstract class AbstractStoreCurso
             LOG.trace(this + " - fillBatch");
         }
         if (batchResetNeeded) {
+            resetSize();
+            setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
             resetBatch();
             this.batchResetNeeded = false;
         }
@@ -305,7 +305,8 @@ public abstract class AbstractStoreCurso
     @Override
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName()
+ ",batchResetNeeded=" + batchResetNeeded
-                    + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size
+ ",cacheEnabled=" + isCacheEnabled();
+                    + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size
+ ",cacheEnabled=" + isCacheEnabled()
+                    + ",maxBatchSize:" + maxBatchSize;
     }
     
     protected abstract void doFillBatch() throws Exception;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1378085&r1=1378084&r2=1378085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Tue Aug 28 11:42:27 2012
@@ -24,8 +24,6 @@ import javax.management.InstanceNotFound
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import junit.framework.Test;
-
-import org.apache.activemq.broker.jmx.DestinationView;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -43,7 +41,6 @@ import org.apache.activemq.command.Sessi
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.util.JMXSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,9 +230,10 @@ public class XARecoveryBrokerTest extend
         }
 
         // We should get the committed transactions.
-        for (int i = 0; i < expectedMessageCount(4, destination); i++) {
+        final int countToReceive = expectedMessageCount(4, destination);
+        for (int i = 0; i < countToReceive ; i++) {
             Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
-            assertNotNull(m);
+            assertNotNull("Got non null message: " + i, m);
         }
 
         assertNoMessagesLeft(connection);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java?rev=1378085&r1=1378084&r2=1378085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java Tue
Aug 28 11:42:27 2012
@@ -181,9 +181,9 @@ public class AMQ2801Test
             DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
                 broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class,
true);
 
-            LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize());
+            LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize()
+ ", dispatched: " + sub.getDispatchedQueueSize());
             if(sub.getSubscriptionName().equals(SUBSCRIPTION1)) {
-                assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize());
+                assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize()
+ sub.getDispatchedQueueSize());
             } else {
                 assertEquals("Incorrect number of pending messages", 0, sub.getPendingQueueSize());
             }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java?rev=1378085&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java
Tue Aug 28 11:42:27 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.Vector;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicDurableConnectStatsTest extends org.apache.activemq.TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopicDurableConnectStatsTest.class);
+    private BrokerService broker;
+    private ActiveMQTopic topic;
+    private Vector<Throwable> exceptions = new Vector<Throwable>();
+    private int messageSize = 4000;
+    protected MBeanServerConnection mbeanServer;
+    protected String domain = "org.apache.activemq";
+    private ActiveMQConnectionFactory connectionFactory = null;
+    final int numMessages = 20;
+
+    private static Session session2 = null;
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+
+        connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(10);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+
+        connectionFactory.setWatchTopicAdvisories(false);
+        return connectionFactory;
+    }
+
+    @Override
+    protected Connection createConnection() throws Exception {
+        return createConnection("cliName");
+    }
+
+    protected Connection createConnection(String name) throws Exception {
+        Connection con = super.createConnection();
+        con.setClientID(name);
+        con.start();
+        return con;
+    }
+
+    public static Test suite() {
+        return suite(TopicDurableConnectStatsTest.class);
+    }
+
+    protected void setUp() throws Exception {
+        exceptions.clear();
+        topic = (ActiveMQTopic) createDestination();
+
+        createBroker();
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        destroyBroker();
+    }
+
+    private void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    private void createBroker(boolean deleteAllMessages) throws Exception {
+        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")");
+        broker.setBrokerName(getName(true));
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        broker.setAdvisorySupport(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+
+        setDefaultPersistenceAdapter(broker);
+        broker.start();
+    }
+
+    private void destroyBroker() throws Exception {
+        if (broker != null)
+            broker.stop();
+    }
+
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException,
NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+
+        LOG.info("** Looking for " + name);
+        try {
+            if (mbeanServer.isRegistered(objectName)) {
+                LOG.info("Bean Registered: " + objectName);
+            } else {
+                LOG.info("Couldn't find Mbean! " + objectName);
+
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return objectName;
+    }
+
+    public void testPendingTopicStat() throws Exception {
+
+        Connection consumerCon = createConnection("cliId1");
+        Session consumerSession = consumerCon.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = consumerSession.createDurableSubscriber(topic, "SubsId",
"filter = 'true'", true);
+
+        DurableSubscriptionViewMBean subscriber1 = null;
+
+        ObjectName subscriberObjName1 = assertRegisteredObjectName(domain + ":BrokerName="
+ getName(true) + ",Type=Subscription,persistentMode=Durable,subscriptionID=SubsId,destinationType=Topic,destinationName="
+ topic.getTopicName() + ",clientId=cliId1");
+        subscriber1 = (DurableSubscriptionViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
subscriberObjName1, DurableSubscriptionViewMBean.class, true);
+
+        LOG.info("Beginning Pending Queue Size count: " + subscriber1.getPendingQueueSize());
+        LOG.info("Prefetch Limit: " + subscriber1.getPrefetchSize());
+
+        assertEquals("no pending", 0, subscriber1.getPendingQueueSize());
+        assertEquals("Prefetch Limit ", 10, subscriber1.getPrefetchSize());
+
+
+        Connection producerCon = createConnection("x");
+        Session producerSessions = producerCon.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+
+        MessageProducer producer = producerSessions.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        int i = 0;
+        for (; i < numMessages; i++) {
+
+            if (i == 15) {
+                // kill consumer
+
+                LOG.info("Killing consumer at 15");
+                consumerSession.close();
+                consumerCon.close();
+            }
+
+            TextMessage message = producerSessions.createTextMessage(createMessageText(i));
+            message.setJMSExpiration(0);
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+            producerSessions.commit();
+
+        }
+        LOG.info("Sent " + i + " messages in total");
+        producerCon.close();
+
+        LOG.info("Pending Queue Size count: " + subscriber1.getPendingQueueSize());
+        assertEquals("pending as expected", 20, subscriber1.getPendingQueueSize());
+
+        LOG.info("Re-connect client and consume messages");
+        Connection con2 = createConnection("cliId1");
+        session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter
= 'true'", true);
+
+
+        final Listener listener = new Listener();
+        consumer2.setMessageListener(listener);
+
+        assertTrue("received all sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return numMessages == listener.count;
+            }
+        }));
+
+        LOG.info("Received: " + listener.count);
+
+        int pq = subscriber1.getPendingQueueSize();
+        LOG.info("Pending Queue Size count: " + pq);
+        assertEquals("Pending queue after consumed", 0, pq);
+
+        session2.close();
+        con2.close();
+        LOG.info("FINAL Pending Queue Size count (after consumer close): " + subscriber1.getPendingQueueSize());
+    }
+
+
+    private String createMessageText(int index) {
+        StringBuffer buffer = new StringBuffer(messageSize);
+        buffer.append("Message: " + index + " sent at: " + new Date());
+        if (buffer.length() > messageSize) {
+            return buffer.substring(0, messageSize);
+        }
+        for (int i = buffer.length(); i < messageSize; i++) {
+            buffer.append(' ');
+        }
+        return buffer.toString();
+    }
+
+
+    public static class Listener implements MessageListener {
+        int count = 0;
+        String id = null;
+
+        Listener() {
+        }
+
+        public void onMessage(Message message) {
+            count++;
+            try {
+                session2.commit();
+            } catch (JMSException e1) {
+                e1.printStackTrace();
+            }
+
+            if (id != null) {
+                try {
+                    LOG.info(id + ", " + message.getJMSMessageID());
+                } catch (Exception ignored) {
+                }
+            }
+
+            try {
+                Thread.sleep(2);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
+
+

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message