activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r829225 - in /activemq/sandbox/activemq-apollo: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-openwire/src/test/java/org/apache/activemq/openwire/
Date Fri, 23 Oct 2009 20:47:35 GMT
Author: cmacnaug
Date: Fri Oct 23 20:47:34 2009
New Revision: 829225

URL: http://svn.apache.org/viewvc?rev=829225&view=rev
Log:
Minor cleanup changes relating to Transactions

Modified:
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
    activemq/sandbox/activemq-apollo/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Fri Oct 23 20:47:34 2009
@@ -36,7 +36,7 @@
 
     // A non null pending save indicates that the message is the
     // saver queue and that the message
-    OperationContext pendingSave;
+    OperationContext<?> pendingSave;
 
     // List of persistent targets for which the message should be saved
     // when dispatch is complete:

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Fri Oct 23 20:47:34 2009
@@ -20,7 +20,6 @@
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.Subscription;

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Fri Oct 23 20:47:34 2009
@@ -21,7 +21,6 @@
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Fri Oct 23 20:47:34 2009
@@ -21,7 +21,6 @@
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Fri Oct 23 20:47:34 2009
@@ -270,6 +270,7 @@
         }
 
         public final void onRollback(ISourceController<?> controller) {
+            //Nothing to do here, message just gets dropped:
             return;
         }
 
@@ -341,7 +342,9 @@
          * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
          */
         public final void onRollback(ISourceController<?> controller) {
-            // TODO unaquire the element.
+            //No-Op for now, it is possible that we'd want to unaquire these
+            //in the queue if the client weren't to keep these
+            //around
         }
 
         /*

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java Fri Oct 23 20:47:34 2009
@@ -20,7 +20,6 @@
 
 import javax.transaction.xa.XAException;
 
-import org.apache.activemq.apollo.broker.Transaction.TransactionListener;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.util.buffer.Buffer;
 

Modified: activemq/sandbox/activemq-apollo/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java?rev=829225&r1=829224&r2=829225&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java Fri Oct 23 20:47:34 2009
@@ -35,6 +35,7 @@
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
@@ -1432,6 +1433,149 @@
         Assert.assertNotNull(m3);
         connection.request(scenario.createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
     }
+	
+	@Test(dataProvider = "deliveryMode-all-destinations-combinations")
+	public void testTransactedSend(BrokerTestScenario scenario) throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        scenario.destination = scenario.createDestinationInfo(connection1, connectionInfo1, scenario.destinationType);
+
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, scenario.destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Begin the transaction.
+        LocalTransactionId txid = scenario.createLocalTransaction(sessionInfo1);
+        connection1.send(scenario.createBeginTransaction(connectionInfo1, txid));
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = scenario.createMessage(producerInfo1, scenario.destination, scenario.deliveryMode);
+            message.setTransactionId(txid);
+            connection1.request(message);
+        }
+
+        // The point of this test is that message should not be delivered until
+        // send is committed.
+        Assert.assertNull(scenario.receiveMessage(connection1,scenario.MAX_NULL_WAIT));
+
+        // Commit the transaction.
+        connection1.send(scenario.createCommitTransaction1Phase(connectionInfo1, txid));
+
+        // Now get the messages.
+        for (int i = 0; i < 4; i++) {
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+        }
+
+        scenario.assertNoMessagesLeft(connection1);
+    }
+
+	@Test(dataProvider = "deliveryMode-all-destinations-combinations")
+	public void testTransactedAckWithPrefetchOfOne(BrokerTestScenario scenario) throws Exception {
 
+        // Setup a first connection
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        scenario.destination = scenario.createDestinationInfo(connection1, connectionInfo1, scenario.destinationType);
+
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, scenario.destination);
+        consumerInfo1.setPrefetchSize(1);
+        connection1.send(consumerInfo1);
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = scenario.createMessage(producerInfo1, scenario.destination, scenario.deliveryMode);
+            connection1.send(message);
+        }
+
+       
+
+        // Now get the messages.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            LocalTransactionId txid = scenario.createLocalTransaction(sessionInfo1);
+            connection1.send(scenario.createBeginTransaction(connectionInfo1, txid));
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            MessageAck ack = scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.send(ack);
+         // Commit the transaction.
+            connection1.send(scenario.createCommitTransaction1Phase(connectionInfo1, txid));
+        }
+        scenario.assertNoMessagesLeft(connection1);
+    }
+	
+	@Test(dataProvider = "deliveryMode-all-destinations-combinations")
+    public void testTransactedAckRollbackWithPrefetchOfOne(BrokerTestScenario scenario) throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = scenario.createConnection();
+        ConnectionInfo connectionInfo1 = scenario.createConnectionInfo();
+        SessionInfo sessionInfo1 = scenario.createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = scenario.createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        scenario.destination = scenario.createDestinationInfo(connection1, connectionInfo1, scenario.destinationType);
+
+        ConsumerInfo consumerInfo1 = scenario.createConsumerInfo(sessionInfo1, scenario.destination);
+        consumerInfo1.setPrefetchSize(1);
+        connection1.send(consumerInfo1);
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = scenario.createMessage(producerInfo1, scenario.destination, scenario.deliveryMode);
+            connection1.send(message);
+        }
+
+        // Now get the messages.
+        LocalTransactionId txid = scenario.createLocalTransaction(sessionInfo1);
+        // Begin the transaction.
+        connection1.send(scenario.createBeginTransaction(connectionInfo1, txid));
+        for (int i = 0; i < 4; i++) {
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            MessageAck ack = scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.send(ack);
+        }
+        
+        // Rollback the transaction:
+        connection1.send(scenario.createRollbackTransaction(connectionInfo1, txid));
+        
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            txid = scenario.createLocalTransaction(sessionInfo1);
+            connection1.send(scenario.createBeginTransaction(connectionInfo1, txid));
+            Message m1 = scenario.receiveMessage(connection1);
+            Assert.assertNotNull(m1);
+            MessageAck ack = scenario.createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.send(ack);
+        }
+        // Commit the transaction.
+        connection1.send(scenario.createCommitTransaction1Phase(connectionInfo1, txid));
+        
+        
+        scenario.assertNoMessagesLeft(connection1);
+    }
+	
 
 }



Mime
View raw message