activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r906054 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/region/cursors/ src/main/java/org/apache/activemq/store/jdbc/ src/main/java/org/apache/activemq/store/jdbc/adapter/ src/test/java/org/apache/activemq/br...
Date Wed, 03 Feb 2010 14:07:09 GMT
Author: dejanb
Date: Wed Feb  3 14:07:08 2010
New Revision: 906054

URL: http://svn.apache.org/viewvc?rev=906054&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2563 - jdbc memory leak

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=906054&r1=906053&r2=906054&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Feb  3 14:07:08 2010
@@ -498,6 +498,9 @@
              
              <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
              
+             <!-- used just to test potential memory leaks manually -->
+             <exclude>**/JDBCTestMemory.*</exclude>
+             
              <exclude>**/amq1490/*</exclude>
              <exclude>**/AMQ1925*</exclude>
              <exclude>**/archive/*</exclude>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=906054&r1=906053&r2=906054&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Wed Feb  3 14:07:08 2010
@@ -86,7 +86,9 @@
             clearIterator(true);
             recovered = true;
         } else {
-            LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor
got duplicate: " + message);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" cursor got duplicate: " + message);
+            }
             storeHasMessages = true;
         }
         return recovered;
@@ -160,6 +162,10 @@
             recoverMessage(node.getMessage(),true);
             lastCachedId = node.getMessageId();
         } else {
+            if (lastCachedId != null && node.getMessageId().getBrokerSequenceId()
< lastCachedId.getBrokerSequenceId()) {
+                lastCachedId = node.getMessageId();
+                setBatch(lastCachedId);
+            }
             if (cacheEnabled) {
                 cacheEnabled=false;
                 if (LOG.isDebugEnabled()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=906054&r1=906053&r2=906054&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Wed Feb  3 14:07:08 2010
@@ -231,8 +231,9 @@
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
-                        listener.recoverMessage(msg);
-                        lastMessageId.set(sequenceId);
+                        if (listener.recoverMessage(msg)) {
+                            lastMessageId.set(sequenceId);
+                        }
                         return true;
                     }
                     return false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=906054&r1=906053&r2=906054&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Wed Feb  3 14:07:08 2010
@@ -345,7 +345,7 @@
     public String getFindNextMessagesStatement() {
         if (findNextMessagesStatement == null) {
             findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
-                                        + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+                                        + " WHERE CONTAINER=? AND ID >= ? ORDER BY ID";
         }
         return findNextMessagesStatement;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=906054&r1=906053&r2=906054&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Wed Feb  3 14:07:08 2010
@@ -742,9 +742,6 @@
             int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
-        long id = 0;
-        List<Long> cleanupIds = new ArrayList<Long>();
-        int index = 0;
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
             s.setMaxRows(maxReturned * 2);
@@ -754,15 +751,8 @@
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
                 while (rs.next() && count < maxReturned) {
-                    id = rs.getLong(1);
-                    if (this.lastRecoveredMessagesIds.contains(id)) {
-                        // this message was already recovered
-                        cleanupIds.add(id);
-                        continue;
-                    }
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
-                        this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
                         break;
@@ -770,27 +760,14 @@
                 }
             } else {
                 while (rs.next() && count < maxReturned) {
-                    id = rs.getLong(1);
-                    if (this.lastRecoveredMessagesIds.contains(id)) {
-                        // this message was already recovered
-                        cleanupIds.add(id);
-                        continue;
-                    }
                     if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                         count++;
-                        this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
                         break;
                     }
                 }
             }
-            // not cleanup the list of recovered messages
-            index = 0;
-            Iterator<Long> it = cleanupIds.iterator();
-            while (it.hasNext() && index < count) {
-                this.lastRecoveredMessagesIds.remove(it.next());
-            }
         } catch (Exception e) {
             e.printStackTrace();
         } finally {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=906054&r1=906053&r2=906054&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Wed Feb  3 14:07:08 2010
@@ -18,6 +18,7 @@
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -155,7 +156,7 @@
         MessageProducer producer = session.createProducer(queue);
         List<TextMessage> senderList = new ArrayList<TextMessage>();
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            TextMessage msg = session.createTextMessage(formatter.format(new Date()));
+            TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
             senderList.add(msg);
             producer.send(msg);
             if(TRANSACTED) session.commit();
@@ -268,6 +269,7 @@
         super.tearDown();
         if (broker != null) {
             broker.stop();
+            broker.waitUntilStopped();
         }
     }
 
@@ -285,6 +287,7 @@
         BrokerService answer = new BrokerService();
         configureBroker(answer);
         answer.start();
+        answer.waitUntilStarted();
         return answer;
     }
     

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java?rev=906054&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
Wed Feb  3 14:07:08 2010
@@ -0,0 +1,75 @@
+package org.apache.activemq.store.jdbc;
+
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.cursors.NegativeQueueTest;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class JDBCNegativeQueueTest extends NegativeQueueTest {
+
+    EmbeddedDataSource dataSource;
+    
+    protected void configureBroker(BrokerService answer) throws Exception {
+        super.configureBroker(answer);
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        answer.setPersistenceAdapter(jdbc);
+    }
+
+    protected void tearDown() throws Exception {
+        /*Connection conn = dataSource.getConnection();
+        printQuery(conn, "Select * from ACTIVEMQ_MSGS", System.out); */
+        super.tearDown();
+    }
+    
+    
+    private void printQuery(Connection c, String query, PrintStream out)
+            throws SQLException {
+        printQuery(c.prepareStatement(query), out);
+    }
+
+    private void printQuery(PreparedStatement s, PrintStream out)
+            throws SQLException {
+
+        ResultSet set = null;
+        try {
+            set = s.executeQuery();
+            ResultSetMetaData metaData = set.getMetaData();
+            for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                if (i == 1)
+                    out.print("||");
+                out.print(metaData.getColumnName(i) + "||");
+            }
+            out.println();
+            while (set.next()) {
+                for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                    if (i == 1)
+                        out.print("|");
+                    out.print(set.getString(i) + "|");
+                }
+                out.println();
+            }
+        } finally {
+            try {
+                set.close();
+            } catch (Throwable ignore) {
+            }
+            try {
+                s.close();
+            } catch (Throwable ignore) {
+            }
+        }
+    }
+
+    
+    
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java?rev=906054&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCTestMemory.java
Wed Feb  3 14:07:08 2010
@@ -0,0 +1,134 @@
+package org.apache.activemq.store.jdbc;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+
+public class JDBCTestMemory extends TestCase {
+
+    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+    Connection conn;
+    Session sess;
+    Destination dest;
+    
+    BrokerService broker;
+    
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    protected void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        
+        jdbc.deleteAllMessages();
+        broker.setPersistenceAdapter(jdbc);
+        broker.addConnector("tcp://0.0.0.0:61616");
+        return broker;
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        broker.setPersistenceAdapter(jdbc);
+        broker.addConnector("tcp://0.0.0.0:61616");
+        return broker;
+    }
+    
+    public void init() throws Exception {
+        conn = factory.createConnection();
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        dest = sess.createQueue("test");        
+    }
+    
+    public void testRecovery() throws Exception {
+        init();
+        MessageProducer producer = sess.createProducer(dest);
+        for (int i = 0; i < 1000; i++) {
+            producer.send(sess.createTextMessage("test"));
+        }
+        producer.close();
+        sess.close();
+        conn.close();
+        
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+        broker.waitUntilStarted();
+        
+        init();
+        
+        for (int i = 0; i < 10; i++) {
+            new Thread("Producer " + i) {
+
+                public void run() {
+                    try {
+                        MessageProducer producer = sess.createProducer(dest);
+                        for (int i = 0; i < 15000; i++) {
+                            producer.send(sess.createTextMessage("test"));
+                            if (i % 100 == 0) {
+                                System.out.println(getName() + " sent message " + i);
+                            }
+                        }
+                        producer.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                
+            }.start();
+            
+            new Thread("Consumer " + i) {
+
+                public void run() {
+                    try {
+                        MessageConsumer consumer = sess.createConsumer(dest);
+                        for (int i = 0; i < 15000; i++) {
+                            consumer.receive(2000);
+                            if (i % 100 == 0) {
+                                System.out.println(getName() + " received message " + i);
+                            }
+                        }
+                        consumer.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                
+            }.start();
+        }
+        
+        // Check out JConsole
+        System.in.read();
+        sess.close();
+        conn.close();
+    }
+    
+}



Mime
View raw message