activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r906069 - in /activemq/branches/activemq-5.3/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/cursors/ src/main/java/org/apache/activemq/store/jdbc/ src/main/java/org/apache/activemq/store/jdbc/adapter/ src/test/java/org/ap...
Date Wed, 03 Feb 2010 14:36:39 GMT
Author: dejanb
Date: Wed Feb  3 14:36:39 2010
New Revision: 906069

URL: http://svn.apache.org/viewvc?rev=906069&view=rev
Log:
merging 906054 - https://issues.apache.org/activemq/browse/AMQ-2563 - jdbc memory leak

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
      - copied unchanged from r906054, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java
      - copied unchanged from r906054, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/pom.xml
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/pom.xml?rev=906069&r1=906068&r2=906069&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/pom.xml (original)
+++ activemq/branches/activemq-5.3/activemq-core/pom.xml Wed Feb  3 14:36:39 2010
@@ -493,6 +493,9 @@
              
              <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
              
+             <!-- used just to test potential memory leaks manually -->
+             <exclude>**/JDBCTestMemory.*</exclude>
+             
              <exclude>**/amq1490/*</exclude>
              <exclude>**/AMQ1925*</exclude>
              <exclude>**/archive/*</exclude>

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=906069&r1=906068&r2=906069&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Wed Feb  3 14:36:39 2010
@@ -86,7 +86,9 @@
             clearIterator(true);
             recovered = true;
         } else {
-            LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor
got duplicate: " + message);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" cursor got duplicate: " + message);
+            }
             storeHasMessages = true;
         }
         return recovered;
@@ -160,6 +162,10 @@
             recoverMessage(node.getMessage(),true);
             lastCachedId = node.getMessageId();
         } else {
+            if (lastCachedId != null && node.getMessageId().getBrokerSequenceId()
< lastCachedId.getBrokerSequenceId()) {
+                lastCachedId = node.getMessageId();
+                setBatch(lastCachedId);
+            }
             if (cacheEnabled) {
                 cacheEnabled=false;
                 if (LOG.isDebugEnabled()) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=906069&r1=906068&r2=906069&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Wed Feb  3 14:36:39 2010
@@ -231,8 +231,9 @@
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
-                        listener.recoverMessage(msg);
-                        lastMessageId.set(sequenceId);
+                        if (listener.recoverMessage(msg)) {
+                            lastMessageId.set(sequenceId);
+                        }
                         return true;
                     }
                     return false;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=906069&r1=906068&r2=906069&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Wed Feb  3 14:36:39 2010
@@ -345,7 +345,7 @@
     public String getFindNextMessagesStatement() {
         if (findNextMessagesStatement == null) {
             findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
-                                        + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+                                        + " WHERE CONTAINER=? AND ID >= ? ORDER BY ID";
         }
         return findNextMessagesStatement;
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=906069&r1=906068&r2=906069&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Wed Feb  3 14:36:39 2010
@@ -740,9 +740,6 @@
             int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
-        long id = 0;
-        List<Long> cleanupIds = new ArrayList<Long>();
-        int index = 0;
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
             s.setMaxRows(maxReturned * 2);
@@ -752,15 +749,8 @@
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
                 while (rs.next() && count < maxReturned) {
-                    id = rs.getLong(1);
-                    if (this.lastRecoveredMessagesIds.contains(id)) {
-                        // this message was already recovered
-                        cleanupIds.add(id);
-                        continue;
-                    }
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
-                        this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
                         break;
@@ -768,27 +758,14 @@
                 }
             } else {
                 while (rs.next() && count < maxReturned) {
-                    id = rs.getLong(1);
-                    if (this.lastRecoveredMessagesIds.contains(id)) {
-                        // this message was already recovered
-                        cleanupIds.add(id);
-                        continue;
-                    }
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
-                        this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
                         break;
                     }
                 }
             }
-            // not cleanup the list of recovered messages
-            index = 0;
-            Iterator<Long> it = cleanupIds.iterator();
-            while (it.hasNext() && index < count) {
-                this.lastRecoveredMessagesIds.remove(it.next());
-            }
         } catch (Exception e) {
             e.printStackTrace();
         } finally {

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=906069&r1=906068&r2=906069&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Wed Feb  3 14:36:39 2010
@@ -18,6 +18,7 @@
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -155,7 +156,7 @@
         MessageProducer producer = session.createProducer(queue);
         List<TextMessage> senderList = new ArrayList<TextMessage>();
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            TextMessage msg = session.createTextMessage(formatter.format(new Date()));
+            TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
             senderList.add(msg);
             producer.send(msg);
             if(TRANSACTED) session.commit();
@@ -259,6 +260,7 @@
         super.tearDown();
         if (broker != null) {
             broker.stop();
+            broker.waitUntilStopped();
         }
     }
 
@@ -276,6 +278,7 @@
         BrokerService answer = new BrokerService();
         configureBroker(answer);
         answer.start();
+        answer.waitUntilStarted();
         return answer;
     }
     



Mime
View raw message