activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1343171 - in /activemq/trunk: activemq-camel/pom.xml activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXALoadTest.java activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
Date Mon, 28 May 2012 09:28:19 GMT
Author: dejanb
Date: Mon May 28 09:28:18 2012
New Revision: 1343171

URL: http://svn.apache.org/viewvc?rev=1343171&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3863 - xa session pooling problem

Added:
    activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXALoadTest.java
Modified:
    activemq/trunk/activemq-camel/pom.xml
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java

Modified: activemq/trunk/activemq-camel/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/pom.xml?rev=1343171&r1=1343170&r2=1343171&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/pom.xml (original)
+++ activemq/trunk/activemq-camel/pom.xml Mon May 28 09:28:18 2012
@@ -206,6 +206,9 @@
               <value>target/</value>
             </property>
           </systemProperties>
+          <excludes>
+              <exclude>**/JmsJdbcXALoadTest.java</exclude><!-- used only for manual xa performance testing -->
+          </excludes>
         </configuration>
       </plugin>
     </plugins>

Added: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXALoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXALoadTest.java?rev=1343171&view=auto
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXALoadTest.java (added)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXALoadTest.java Mon May 28 09:28:18 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class JmsJdbcXALoadTest extends CamelSpringTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsJdbcXATest.class);
+    BrokerService broker = null;
+    int messageCount;
+
+    public java.sql.Connection initDb() throws Exception {
+        String createStatement =
+                "CREATE TABLE SCP_INPUT_MESSAGES (" +
+                        "id int NOT NULL GENERATED ALWAYS AS IDENTITY, " +
+                        "messageId varchar(96) NOT NULL, " +
+                        "messageCorrelationId varchar(96) NOT NULL, " +
+                        "messageContent varchar(2048) NOT NULL, " +
+                        "PRIMARY KEY (id) )";
+
+        java.sql.Connection conn = getJDBCConnection();
+        try {
+            conn.createStatement().execute(createStatement);
+        } catch (SQLException alreadyExists) {
+            log.info("ex on create tables", alreadyExists);
+        }
+
+        try {
+            conn.createStatement().execute("DELETE FROM SCP_INPUT_MESSAGES");
+        } catch (SQLException ex) {
+            log.info("ex on create delete all", ex);
+        }
+
+        return conn;
+    }
+
+    private java.sql.Connection getJDBCConnection() throws Exception {
+        BasicDataSource dataSource = getMandatoryBean(BasicDataSource.class, "managedDataSourceWithRecovery");
+        return dataSource.getConnection();
+    }
+
+    private int dumpDb(java.sql.Connection jdbcConn) throws Exception {
+        int count = 0;
+        ResultSet resultSet = jdbcConn.createStatement().executeQuery("SELECT * FROM SCP_INPUT_MESSAGES");
+        while (resultSet.next()) {
+            count++;
+        }
+        log.info(count + " messages");
+        return count;
+    }
+
+    @Test
+    public void testRecoveryCommit() throws Exception {
+        java.sql.Connection jdbcConn = initDb();
+        final int count = 1000;
+
+        sendJMSMessageToKickOffRoute(count);
+
+
+        final java.sql.Connection freshConnection = getJDBCConnection();
+        assertTrue("did not get replay", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == dumpDb(freshConnection);
+            }
+        }, 20*60*1000));
+        assertEquals("still one message in db", count, dumpDb(freshConnection));
+    }
+
+    private void sendJMSMessageToKickOffRoute(int count) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted"));
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++);
+            message.setJMSCorrelationID("pleaseCorrelate");
+            producer.send(message);
+        }
+        connection.close();
+    }
+
+    private BrokerService createBroker(boolean deleteAllMessages) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        brokerService.setBrokerName("testXA");
+        brokerService.setAdvisorySupport(false);
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectory("target/data");
+        brokerService.addConnector("tcp://0.0.0.0:61616");
+        return brokerService;
+    }
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+
+        deleteDirectory("target/data/howl");
+
+        // make broker available to recovery processing on app context start
+        try {
+            broker = createBroker(true);
+            broker.start();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to start broker", e);
+        }
+
+        return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbc.xml");
+    }
+
+
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+}

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java?rev=1343171&r1=1343170&r2=1343171&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java Mon May 28 09:28:18 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.pool;
 
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.commons.pool.ObjectPoolFactory;
+
 import javax.jms.JMSException;
 import javax.jms.Session;
 import javax.transaction.RollbackException;
@@ -24,9 +27,6 @@ import javax.transaction.SystemException
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.commons.pool.ObjectPoolFactory;
-
 /**
  * An XA-aware connection pool.  When a session is created and an xa transaction is active,
  * the session will automatically be enlisted in the current transaction.
@@ -56,6 +56,8 @@ public class XaConnectionPool extends Co
                 transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
                 incrementReferenceCount();
                 transactionManager.getTransaction().enlistResource(createXaResource(session));
+            } else {
+                session.setIgnoreClose(false);
             }
             return session;
         } catch (RollbackException e) {
@@ -89,6 +91,7 @@ public class XaConnectionPool extends Co
                 // This will return session to the pool.
                 session.setIgnoreClose(false);
                 session.close();
+                session.setIgnoreClose(true);
                 session.setIsXa(false);
                 decrementReferenceCount();
             } catch (JMSException e) {



Mime
View raw message