activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From djen...@apache.org
Subject svn commit: r735912 - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/test/ activemq-ra/src/test/java/org/apache/activemq/ra/
Date Tue, 20 Jan 2009 02:06:37 GMT
Author: djencks
Date: Mon Jan 19 18:06:37 2009
New Revision: 735912

URL: http://svn.apache.org/viewvc?rev=735912&view=rev
Log:
AMQ-2078 extend transaction tests to xa in ra

Added:
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java?rev=735912&r1=735911&r2=735912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
Mon Jan 19 18:06:37 2009
@@ -56,18 +56,21 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // lets consume any outstanding messages from previous test runs
+        beginTx();
         while (consumer.receive(1000) != null) {
         }
-        session.commit();
+        commitTx();
 
+        beginTx();
         producer.send(outbound[0]);
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
 
         LOG.info("Sent 0: " + outbound[0]);
         LOG.info("Sent 1: " + outbound[1]);
 
         ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
         Message message = consumer.receive(1000);
         assertEquals(outbound[0], message);
 
@@ -80,6 +83,7 @@
 
         // Consume again.. the previous message should
         // get redelivered.
+        beginTx();
         message = consumer.receive(5000);
         assertNotNull("Should have re-received the first message again!", message);
         messages.add(message);
@@ -89,7 +93,7 @@
         assertNotNull("Should have re-received the second message again!", message);
         messages.add(message);
         assertEquals(outbound[1], message);
-        session.commit();
+        commitTx();
 
         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
@@ -111,24 +115,28 @@
             // Session that sends messages
             {
                 Session session = resourceProvider.createSession(connection);
+                this.session = session;
                 MessageProducer producer = resourceProvider.createProducer(session, destination);
                 // consumer = resourceProvider.createConsumer(session,
                 // destination);
+                beginTx();
                 producer.send(session.createTextMessage("Test Message: " + i));
-                session.commit();
+                commitTx();
                 session.close();
             }
 
             // Session that consumes messages
             {
                 Session session = resourceProvider.createSession(connection);
+                this.session = session;
                 MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
 
+                beginTx();
                 TextMessage message = (TextMessage)consumer.receive(1000 * 5);
                 assertNotNull("Received only " + i + " messages in batch ", message);
                 assertEquals("Test Message: " + i, message.getText());
 
-                session.commit();
+                commitTx();
                 session.close();
             }
         }
@@ -145,20 +153,24 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
 
         // lets consume any outstanding messages from previous test runs
+        beginTx();
         while (consumer.receive(1000) != null) {
         }
-        session.commit();
+        commitTx();
 
+        beginTx();
         producer.send(outbound[0]);
         producer.send(outbound[1]);
         producer.send(outbound[2]);
-        session.commit();
+        commitTx();
 
         // Get the first.
+        beginTx();
         assertEquals(outbound[0], consumer.receive(1000));
         consumer.close();
-        session.commit();
+        commitTx();
         
+        beginTx();
         QueueBrowser browser = session.createBrowser((Queue)destination);
         Enumeration enumeration = browser.getEnumeration();
 
@@ -187,7 +199,7 @@
         assertEquals(outbound[2], consumer.receive(1000));
         consumer.close();
 
-        session.commit();
+        commitTx();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=735912&r1=735911&r2=735912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
Mon Jan 19 18:06:37 2009
@@ -85,11 +85,31 @@
         resourceProvider = getJmsResourceProvider();
         topic = resourceProvider.isTopic();
         // We will be using transacted sessions.
-        resourceProvider.setTransacted(true);
-        connectionFactory = resourceProvider.createConnectionFactory();
+        setSessionTransacted();
+        connectionFactory = newConnectionFactory();
         reconnect();
     }
 
+    protected void setSessionTransacted() {
+        resourceProvider.setTransacted(true);
+    }
+
+    protected ConnectionFactory newConnectionFactory() throws Exception {
+        return resourceProvider.createConnectionFactory();
+    }
+
+    protected void beginTx() throws Exception {
+        //no-op for local tx
+    }
+
+    protected void commitTx() throws Exception {
+        session.commit();
+    }
+
+    protected void rollbackTx() throws Exception {
+        session.rollback();
+    }
+
     /**
      */
     protected BrokerService createBroker() throws Exception, URISyntaxException {
@@ -124,24 +144,25 @@
     public void testSendReceiveTransactedBatches() throws Exception {
 
         TextMessage message = session.createTextMessage("Batch Message");
-
         for (int j = 0; j < batchCount; j++) {
             LOG.info("Producing bacth " + j + " of " + batchSize + " messages");
 
+            beginTx();
             for (int i = 0; i < batchSize; i++) {
                 producer.send(message);
             }
             messageSent();
-            session.commit();
+            commitTx();
             LOG.info("Consuming bacth " + j + " of " + batchSize + " messages");
 
+            beginTx();
             for (int i = 0; i < batchSize; i++) {
                 message = (TextMessage)consumer.receive(1000 * 5);
                 assertNotNull("Received only " + i + " messages in batch " + j, message);
                 assertEquals("Batch Message", message.getText());
             }
 
-            session.commit();
+            commitTx();
         }
     }
 
@@ -158,18 +179,22 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // sends a message
+        beginTx();
         producer.send(outbound[0]);
-        session.commit();
+        commitTx();
 
         // sends a message that gets rollbacked
+        beginTx();
         producer.send(session.createTextMessage("I'm going to get rolled back."));
-        session.rollback();
+        rollbackTx();
 
         // sends a message
+        beginTx();
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
 
         // receives the first message
+        beginTx();
         ArrayList<Message> messages = new ArrayList<Message>();
         LOG.info("About to consume message 1");
         Message message = consumer.receive(1000);
@@ -183,26 +208,58 @@
         LOG.info("Received: " + message);
 
         // validates that the rollbacked was not consumed
-        session.commit();
+        commitTx();
         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
     }
 
     /**
+     * JMS spec section 3.6: acknowledging a message when automatic acknowledgment is in use has no effect.
+     * @throws Exception
+     */
+    public void testAckMessageInTx() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        outbound[0].acknowledge();
+        commitTx();
+        outbound[0].acknowledge();
+
+        // receives the first message
+        beginTx();
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the sent message was delivered
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+    }
+
+    /**
      * Sends a batch of messages and validates that the message sent before
      * session close is not consumed.
-     * 
+     *
+     * This test only works with local transactions, not xa.
      * @throws Exception
      */
     public void testSendSessionClose() throws Exception {
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // sends a message
+        beginTx();
         producer.send(outbound[0]);
-        session.commit();
+        commitTx();
 
         // sends a message that gets rollbacked
+        beginTx();
         producer.send(session.createTextMessage("I'm going to get rolled back."));
         consumer.close();
 
@@ -210,11 +267,12 @@
 
         // sends a message
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
 
         // receives the first message
         ArrayList<Message> messages = new ArrayList<Message>();
         LOG.info("About to consume message 1");
+        beginTx();
         Message message = consumer.receive(1000);
         messages.add(message);
         LOG.info("Received: " + message);
@@ -226,7 +284,7 @@
         LOG.info("Received: " + message);
 
         // validates that the rollbacked was not consumed
-        session.commit();
+        commitTx();
         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@@ -242,10 +300,12 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // sends a message
+        beginTx();
         producer.send(outbound[0]);
-        session.commit();
+        commitTx();
 
         // sends a message that gets rollbacked
+        beginTx();
         producer.send(session.createTextMessage("I'm going to get rolled back."));
         consumer.close();
         session.close();
@@ -253,12 +313,14 @@
         reconnect();
 
         // sends a message
+        beginTx();
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
 
         // receives the first message
         ArrayList<Message> messages = new ArrayList<Message>();
         LOG.info("About to consume message 1");
+        beginTx();
         Message message = consumer.receive(1000);
         messages.add(message);
         LOG.info("Received: " + message);
@@ -270,7 +332,7 @@
         LOG.info("Received: " + message);
 
         // validates that the rollbacked was not consumed
-        session.commit();
+        commitTx();
         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@@ -286,36 +348,41 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // lets consume any outstanding messages from prev test runs
+        beginTx();
         while (consumer.receive(1000) != null) {
         }
-        session.commit();
+        commitTx();
 
         // sent both messages
+        beginTx();
         producer.send(outbound[0]);
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
 
         LOG.info("Sent 0: " + outbound[0]);
         LOG.info("Sent 1: " + outbound[1]);
 
         ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
         Message message = consumer.receive(1000);
         messages.add(message);
         assertEquals(outbound[0], message);
-        session.commit();
+        commitTx();
 
         // rollback so we can get that last message again.
+        beginTx();
         message = consumer.receive(1000);
         assertNotNull(message);
         assertEquals(outbound[1], message);
-        session.rollback();
+        rollbackTx();
 
         // Consume again.. the prev message should
         // get redelivered.
+        beginTx();
         message = consumer.receive(5000);
         assertNotNull("Should have re-received the message again!", message);
         messages.add(message);
-        session.commit();
+        commitTx();
 
         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
@@ -332,29 +399,33 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // lets consume any outstanding messages from prev test runs
+        beginTx();
         while (consumer.receive(1000) != null) {
         }
-        session.commit();
+        commitTx();
 
         //
+        beginTx();
         producer.send(outbound[0]);
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
 
         LOG.info("Sent 0: " + outbound[0]);
         LOG.info("Sent 1: " + outbound[1]);
 
         ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
         Message message = consumer.receive(1000);
         assertEquals(outbound[0], message);
 
         message = consumer.receive(1000);
         assertNotNull(message);
         assertEquals(outbound[1], message);
-        session.rollback();
+        rollbackTx();
 
         // Consume again.. the prev message should
         // get redelivered.
+        beginTx();
         message = consumer.receive(5000);
         assertNotNull("Should have re-received the first message again!", message);
         messages.add(message);
@@ -365,7 +436,7 @@
         assertEquals(outbound[1], message);
 
         assertNull(consumer.receiveNoWait());
-        session.commit();
+        commitTx();
 
         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
@@ -383,13 +454,15 @@
         Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
                                             session.createTextMessage("Fourth Message")};
 
+        beginTx();
         for (int i = 0; i < outbound.length; i++) {
             // sends a message
             producer.send(outbound[i]);
         }
-        session.commit();
+        commitTx();
 
         // receives the first message
+        beginTx();
         for (int i = 0; i < outbound.length; i++) {
             LOG.info("About to consume message 1");
             Message message = consumer.receive(1000);
@@ -398,7 +471,7 @@
         }
 
         // validates that the rollbacked was not consumed
-        session.commit();
+        commitTx();
     }
 
     /**
@@ -446,33 +519,37 @@
         TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
 
         // lets consume any outstanding messages from prev test runs
+        beginTx();
         while (consumer.receiveNoWait() != null) {
         }
 
-        session.commit();
+        commitTx();
 
         // sends the messages
+        beginTx();
         producer.send(outbound[0]);
         producer.send(outbound[1]);
-        session.commit();
+        commitTx();
         LOG.info("Sent 0: " + outbound[0]);
         LOG.info("Sent 1: " + outbound[1]);
 
+        beginTx();
         TextMessage message = (TextMessage)consumer.receive(1000);
         assertEquals(outbound[0].getText(), message.getText());
         // Close the consumer before the commit. This should not cause the
         // received message
         // to rollback.
         consumer.close();
-        session.commit();
+        commitTx();
 
         // Create a new consumer
         consumer = resourceProvider.createConsumer(session, destination);
         LOG.info("Created consumer: " + consumer);
 
+        beginTx();
         message = (TextMessage)consumer.receive(1000);
         assertEquals(outbound[1].getText(), message.getText());
-        session.commit();
+        commitTx();
     }
 
     public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
@@ -481,10 +558,12 @@
         Message outbound = session.createObjectMessage(list);
         outbound.setStringProperty("foo", "abc");
 
+        beginTx();
         producer.send(outbound);
-        session.commit();
+        commitTx();
 
         LOG.info("About to consume message 1");
+        beginTx();
         Message message = consumer.receive(5000);
 
         List<String> body = assertReceivedObjectMessageWithListBody(message);
@@ -498,12 +577,13 @@
         }
         body.clear();
         body.add("This should never be seen!");
-        session.rollback();
+        rollbackTx();
 
+        beginTx();
         message = consumer.receive(5000);
         List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
         assertNotSame("Second call should return a different body", secondBody, body);
-        session.commit();
+        commitTx();
     }
 
     @SuppressWarnings("unchecked")
@@ -526,7 +606,7 @@
      * 
      * @throws JMSException
      */
-    protected void reconnect() throws JMSException {
+    protected void reconnect() throws Exception {
 
         if (connection != null) {
             // Close the prev connection.
@@ -558,19 +638,24 @@
      * Sets the prefeftch policy to one.
      */
     protected void setPrefetchToOne() {
-        ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection)connection).getPrefetchPolicy();
+        ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
         prefetchPolicy.setQueuePrefetch(1);
         prefetchPolicy.setTopicPrefetch(1);
         prefetchPolicy.setDurableTopicPrefetch(1);
         prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
     }
 
+    protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        return ((ActiveMQConnection)connection).getPrefetchPolicy();
+    }
+
+    //This test won't work with xa tx so no beginTx() has been added.
     public void testMessageListener() throws Exception {
         // send messages
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             producer.send(session.createTextMessage(MESSAGE_TEXT + i));
         }
-        session.commit();
+        commitTx();
         consumer.setMessageListener(this);
         // wait receive
         waitReceiveUnack();
@@ -589,7 +674,7 @@
             unackMessages.add(message);
             if (unackMessages.size() == MESSAGE_COUNT) {
                 try {
-                    session.rollback();
+                    rollbackTx();
                     resendPhase = true;
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -599,7 +684,7 @@
             ackMessages.add(message);
             if (ackMessages.size() == MESSAGE_COUNT) {
                 try {
-                    session.commit();
+                    commitTx();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java?rev=735912&r1=735911&r2=735912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
Mon Jan 19 18:06:37 2009
@@ -197,6 +197,9 @@
      */
     public void setTransacted(boolean transacted) {
         this.transacted = transacted;
+        if (transacted) {
+            setAckMode(Session.SESSION_TRANSACTED);
+        }
     }
 
     /**

Added: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java?rev=735912&view=auto
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
(added)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
Mon Jan 19 18:06:37 2009
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.activemq.ra;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import javax.resource.spi.ManagedConnection;
+import javax.resource.ResourceException;
+
+import org.apache.activemq.*;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * @version $Rev:$ $Date:$
+ */
+public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
+    private static final String DEFAULT_HOST = "vm://localhost";
+
+    private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
+    private ActiveMQManagedConnectionFactory managedConnectionFactory;
+    private XAResource xaResource;
+    private static long txGenerator;
+    private Xid xid;
+
+
+    @Override
+    protected void setSessionTransacted() {
+        resourceProvider.setTransacted(false);
+        resourceProvider.setAckMode(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Override
+    protected ConnectionFactory newConnectionFactory() throws Exception {
+        managedConnectionFactory = new ActiveMQManagedConnectionFactory();
+        managedConnectionFactory.setServerUrl(DEFAULT_HOST);
+        managedConnectionFactory.setUserName(org.apache.activemq.ActiveMQConnectionFactory.DEFAULT_USER);
+        managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
+
+        return (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);
+    }
+
+
+    /**
+     * Recreates the connection.
+     *
+     * @throws Exception if reconnecting or obtaining the XAResource fails
+     */
+    @Override
+    protected void reconnect() throws Exception {
+        super.reconnect();
+        ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
+        ManagedConnection mc = proxy.getManagedConnection();
+        xaResource = mc.getXAResource();
+    }
+
+    @Override
+    protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
+        ActiveMQManagedConnection mc = proxy.getManagedConnection();
+        ActiveMQConnection conn = (ActiveMQConnection) mc.getPhysicalConnection();
+        return conn.getPrefetchPolicy();
+    }
+
+    @Override
+    protected void beginTx() throws Exception {
+        xid = createXid();
+        xaResource.start(xid, XAResource.TMNOFLAGS);
+    }
+
+    @Override
+    protected void commitTx() throws Exception {
+        xaResource.end(xid, XAResource.TMSUCCESS);
+        int result = xaResource.prepare(xid);
+        if (result == XAResource.XA_OK) {
+            xaResource.commit(xid, false);
+        }
+        xid = null;
+    }
+
+    @Override
+    protected void rollbackTx() throws Exception {
+        xaResource.end(xid, XAResource.TMSUCCESS);
+        xaResource.rollback(xid);
+        xid = null;
+    }
+
+    // This test won't work with XA transactions, so it is overridden to do nothing here.
+    @Override
+    public void testMessageListener() throws Exception {
+    }
+
+    /**
+     * Sends a batch of messages and validates that the message sent before
+     * session close is not consumed.
+     * <p/>
+     * This test only works with local transactions, not XA, so it is overridden to do nothing here.
+     *
+     * @throws Exception
+     */
+    @Override
+    public void testSendSessionClose() throws Exception {
+    }
+
+    public Xid createXid() throws IOException {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+
+    }
+
+}

Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message