activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1512332 - in /activemq/trunk: activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Date Fri, 09 Aug 2013 15:11:11 GMT
Author: gtully
Date: Fri Aug  9 15:11:11 2013
New Revision: 1512332

URL: http://svn.apache.org/r1512332
Log:
fix up failure - still leveldb variant problem that needs work - testQueueTransactionalOrderWithRestart
- org.apache.activemq.bugs.AMQ2149LevelDBTest

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1512332&r1=1512331&r2=1512332&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Fri Aug  9 15:11:11 2013
@@ -450,7 +450,7 @@ class LevelDBStore extends LockableServi
   def rollback(txid: TransactionId) = {
     transactions.remove(txid) match {
       case null =>
-        println("The transaction does not exist")
+        debug("on rollback, the transaction " + txid + " does not exist")
       case tx =>
         if( tx.prepared ) {
           val done = new CountDownLatch(1)
@@ -470,7 +470,7 @@ class LevelDBStore extends LockableServi
   def prepare(tx: TransactionId) = {
     transactions.get(tx) match {
       case null =>
-        println("The transaction does not exist")
+        warn("on prepare, the transaction " + tx + " does not exist")
       case tx =>
         tx.prepare
     }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=1512332&r1=1512331&r2=1512332&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Fri Aug  9 15:11:11 2013
@@ -18,20 +18,13 @@
 package org.apache.activemq.bugs;
 
 import java.io.File;
+import java.lang.IllegalStateException;
+import java.util.HashSet;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Vector;
 
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TransactionRolledBackException;
+import javax.jms.*;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
@@ -127,6 +120,7 @@ public class AMQ2149Test extends AutoFai
         return stringBuilder.toString();
     }
 
+    HashSet<Connection> connections = new HashSet<Connection>();
     private class Receiver implements MessageListener {
 
         private final javax.jms.Destination dest;
@@ -157,6 +151,7 @@ public class AMQ2149Test extends AutoFai
             }
             messageConsumer.setMessageListener(this);
             connection.start();
+            connections.add(connection);
         }
 
         public void close() throws JMSException {
@@ -208,6 +203,8 @@ public class AMQ2149Test extends AutoFai
                     // in doubt - either commit command or reply missing
                     // don't know if we will get a replay
                     resumeOnNextOrPreviousIsOk = true;
+                    nextExpectedSeqNum++;
+                    LOG.info("in doubt transaction completion: ok to get next or previous
batch. next:" + nextExpectedSeqNum);
                 } else {
                     resumeOnNextOrPreviousIsOk = false;
                     // batch will be replayed
@@ -242,6 +239,7 @@ public class AMQ2149Test extends AutoFai
             messageProducer = session.createProducer(dest);
             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
             connection.start();
+            connections.add(connection);
         }
 
         public void run() {
@@ -258,7 +256,11 @@ public class AMQ2149Test extends AutoFai
                     if ((nextSequenceNumber % 500) == 0) {
                         LOG.info(dest + " sent " + nextSequenceNumber);
                     }
-                        
+
+                } catch (javax.jms.IllegalStateException e) {
+                    LOG.error(dest + " bailing on send error", e);
+                    exceptions.add(e);
+                    break;
                 } catch (Exception e) {
                     LOG.error(dest + " send error", e);
                     exceptions.add(e);
@@ -278,6 +280,52 @@ public class AMQ2149Test extends AutoFai
         }
     }
 
+    // attempt to simply replicate leveldb failure. no joy yet
+    public void x_testRestartReReceive() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();
+            }
+        });
+
+        final javax.jms.Destination destination =
+                ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE);
+        Thread thread = new Thread(new Sender(destination));
+        thread.start();
+        thread.join();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
+        connection.setClientID(destination.toString());
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+        connection.start();
+
+        int batch = 200;
+        long expectedSeq;
+
+        final TimerTask restartTask = schedualRestartTask(null, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+            }
+        });
+
+        expectedSeq = 0;
+        for (int s = 0; s < 4; s++) {
+            for (int i = 0; i < batch; i++) {
+                Message message = messageConsumer.receive(20000);
+                assertNotNull("s:" + s + ", i:" + i, message);
+                final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
+                assertEquals("expected order s:" + s, expectedSeq++, seqNum);
+
+                if (i > 0 && i%600 == 0) {
+                    LOG.info("Commit on %5");
+                //    session.commit();
+                }
+            }
+            restartTask.run();
+        }
+
+    }
+
     // no need to run this unless there are some issues with the others
     public void vanilaVerify_testOrder() throws Exception {
         
@@ -381,7 +429,7 @@ public class AMQ2149Test extends AutoFai
         }
     }
 
-    private void schedualRestartTask(final Timer timer, final Configurer configurer) {
+    private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer)
{
         class RestartTask extends TimerTask {
             public void run() {
                 synchronized (brokerLock) {
@@ -402,18 +450,22 @@ public class AMQ2149Test extends AutoFai
                         exceptions.add(e);
                     }
                 }
-                if (++numBrokerRestarts < MAX_BROKER_RESTARTS) {
+                if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null)
{
                     // do it again
                     try {
                         timer.schedule(new RestartTask(), brokerStopPeriod);
-                    } catch (IllegalStateException ignore_alreadyCancelled) {   
+                    } catch (IllegalStateException ignore_alreadyCancelled) {
                     }
                 } else {
                     LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
                 }
-            } 
+            }
         }
-        timer.schedule(new RestartTask(), brokerStopPeriod);
+        RestartTask task = new RestartTask();
+        if (timer != null) {
+            timer.schedule(task, brokerStopPeriod);
+        }
+        return task;
     }
     
     private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
@@ -446,8 +498,8 @@ public class AMQ2149Test extends AutoFai
                 threads.remove(sendThread);
             }
         }
-        LOG.info("senders done...");
-        
+        LOG.info("senders done..." + threads);
+
         while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
             Receiver receiver = receivers.firstElement();
             if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty())
{
@@ -459,6 +511,20 @@ public class AMQ2149Test extends AutoFai
         if (!exceptions.isEmpty()) {
             exceptions.get(0).printStackTrace();
         }
+
+        for (Connection connection : connections) {
+            try {
+                connection.close();
+            } catch (Exception ignored) {}
+        }
+        connections.clear();
+
+        LOG.info("Dangling threads: " + threads);
+        for (Thread dangling : threads) {
+            dangling.interrupt();
+            dangling.join(10*1000);
+        }
+
         assertTrue("No exceptions", exceptions.isEmpty());
     }
 



Mime
View raw message