activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1478823 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/ activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/ ...
Date Fri, 03 May 2013 14:50:57 GMT
Author: dejanb
Date: Fri May  3 14:50:56 2013
New Revision: 1478823

URL: http://svn.apache.org/r1478823
Log:
https://issues.apache.org/jira/browse/AMQ-4495 - always get a max batch of messages from the store

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Fri May  3 14:50:56 2013
@@ -38,7 +38,7 @@ public abstract class AbstractStoreCurso
     private boolean storeHasMessages = false;
     protected int size;
     private MessageId lastCachedId;
-    private boolean hadSpace = false;
+    protected boolean hadSpace = false;
 
     protected AbstractStoreCursor(Destination destination) {
         super((destination != null ? destination.isPrioritizedMessages():false));
@@ -253,12 +253,6 @@ public abstract class AbstractStoreCurso
         setCacheEnabled(false);
     }
 
-    @Override
-    public boolean hasSpace() {
-        hadSpace = super.hasSpace();
-        return hadSpace;
-    }
-
     protected final synchronized void fillBatch() {
         if (LOG.isTraceEnabled()) {
             LOG.trace(this + " - fillBatch");

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri May  3 14:50:56 2013
@@ -17,10 +17,15 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.memory.MemoryMessageStore;
+import org.apache.activemq.store.memory.MemoryTransactionStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,14 +38,16 @@ import org.slf4j.LoggerFactory;
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
+    private final Broker broker;
    
     /**
      * Construct it
      * @param queue
      */
-    public QueueStorePrefetch(Queue queue) {
+    public QueueStorePrefetch(Queue queue, Broker broker) {
         super(queue);
         this.store = queue.getMessageStore();
+        this.broker = broker;
 
     }
 
@@ -94,7 +101,10 @@ class QueueStorePrefetch extends Abstrac
     
     @Override
     protected void doFillBatch() throws Exception {
-        this.store.recoverNextMessages(this.maxBatchSize, this);
+        hadSpace = this.hasSpace();
+        if (!broker.getBrokerService().isPersistent() || hadSpace) {
+            this.store.recoverNextMessages(this.maxBatchSize, this);
+        }
     }
 
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri May  3 14:50:56 2013
@@ -47,7 +47,7 @@ public class StoreQueueCursor extends Ab
         super((queue != null ? queue.isPrioritizedMessages():false));
         this.broker=broker;
         this.queue = queue;
-        this.persistent = new QueueStorePrefetch(queue);
+        this.persistent = new QueueStorePrefetch(queue, broker);
         currentCursor = persistent;
     }
 

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri May  3 14:50:56 2013
@@ -279,15 +279,12 @@ public class JDBCMessageStore extends Ab
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-                    if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         listener.recoverMessage(msg);
                         lastRecoveredSequenceId.set(sequenceId);
                         lastRecoveredPriority.set(msg.getPriority());
                         return true;
-                    }
-                    return false;
                 }
 
                 public boolean recoverMessageReference(String reference) throws Exception {

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri May  3 14:50:56 2013
@@ -548,8 +548,7 @@ public class KahaDBStore extends Message
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
-                             listener.hasSpace() && iterator.hasNext(); ) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
                             entry = iterator.next();
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Fri May  3 14:50:56 2013
@@ -665,7 +665,7 @@ class LevelDBStore extends LockableServi
 
   case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
     private var recovered: Int = 0
-    def hasSpace = recovered < max && listener.hasSpace
+    def hasSpace = recovered < max
     def recoverMessage(message: Message) = {
       recovered += 1;
       listener.recoverMessage(message)

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java Fri May  3 14:50:56 2013
@@ -76,7 +76,7 @@ public class StoreQueueCursorNoDuplicate
 
         queueMessageStore.start();
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java?rev=1478823&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java Fri May  3 14:50:56 2013
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.ProducerThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class MemoryLimitTest extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitTest.class);
+    final String payload = new String(new byte[10 * 1024]); //10KB
+    protected BrokerService broker;
+
+    private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
+
+    @Parameterized.Parameters
+    public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
+        TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
+        TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
+        TestSupport.PersistenceAdapterChoice[] jdbc = {TestSupport.PersistenceAdapterChoice.JDBC};
+        List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
+        choices.add(kahaDb);
+        choices.add(levelDb);
+        choices.add(jdbc);
+        return choices;
+    }
+
+    public MemoryLimitTest(TestSupport.PersistenceAdapterChoice choice) {
+        this.persistenceAdapterChoice = choice;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB
+        broker.deleteAllMessages();
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setProducerFlowControl(false);
+        policyMap.put(new ActiveMQQueue(">"), policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+        broker.getPersistenceAdapter().deleteAllMessages();
+
+        return broker;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout = 120000)
+    public void testCursorBatch() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+        factory.setOptimizeAcknowledge(true);
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = sess.createQueue("STORE");
+        final ProducerThread producer = new ProducerThread(sess, queue) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return sess.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer.setMessageCount(2000);
+        producer.start();
+        producer.join();
+
+        Thread.sleep(1000);
+
+        // assert we didn't break high watermark (70%) usage
+        Destination dest = broker.getDestination((ActiveMQQueue) queue);
+        LOG.info("Destination usage: " + dest.getMemoryUsage());
+        assertTrue(dest.getMemoryUsage().getPercentUsage() <= 71);
+        LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
+
+        // consume one message
+        MessageConsumer consumer = sess.createConsumer(queue);
+        Message msg = consumer.receive();
+        msg.acknowledge();
+
+        Thread.sleep(1000);
+        // this should free some space and allow us to get new batch of messages in the memory
+        // exceeding the limit
+        LOG.info("Destination usage: " + dest.getMemoryUsage());
+        assertTrue(dest.getMemoryUsage().getPercentUsage() >= 478);
+        LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 478);
+
+        // let's make sure we can consume all messages
+        for (int i = 1; i < 2000; i++) {
+            msg = consumer.receive(1000);
+            assertNotNull("Didn't receive message " + i, msg);
+            msg.acknowledge();
+        }
+
+    }
+
+    /**
+     *
+     * Handy test for manually checking what's going on
+     *
+     */
+    @Ignore
+    @Test(timeout = 120000)
+    public void testLimit() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+        factory.setOptimizeAcknowledge(true);
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return sess.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer.setMessageCount(1000);
+
+        final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return sess.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer2.setMessageCount(1000);
+
+
+        ConsumerThread consumer = new ConsumerThread(sess, sess.createQueue("STORE.1"));
+        consumer.setBreakOnNull(false);
+        consumer.setMessageCount(1000);
+
+        producer.start();
+        producer.join();
+
+        producer2.start();
+
+        Thread.sleep(300);
+
+        consumer.start();
+
+        consumer.join();
+        producer2.join();
+
+        assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
+
+    }
+}

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java?rev=1478823&r1=1478822&r2=1478823&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java Fri May  3 14:50:56 2013
@@ -45,7 +45,7 @@ public class ConsumerThread extends Thre
             while (received < messageCount) {
                 Message msg = consumer.receive(3000);
                 if (msg != null) {
-                    LOG.info("Received " + ((TextMessage)msg).getText());
+                    LOG.info("Received " + received + ": " + ((TextMessage)msg).getText());
                     received++;
                 } else {
                     if (breakOnNull) {



Mime
View raw message