Return-Path: Delivered-To: apmail-geronimo-activemq-users-archive@www.apache.org Received: (qmail 1917 invoked from network); 2 Jun 2006 16:14:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 2 Jun 2006 16:14:31 -0000 Received: (qmail 18164 invoked by uid 500); 2 Jun 2006 16:14:29 -0000 Delivered-To: apmail-geronimo-activemq-users-archive@geronimo.apache.org Received: (qmail 18071 invoked by uid 500); 2 Jun 2006 16:14:29 -0000 Mailing-List: contact activemq-users-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-users@geronimo.apache.org Delivered-To: mailing list activemq-users@geronimo.apache.org Received: (qmail 17992 invoked by uid 99); 2 Jun 2006 16:14:29 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Jun 2006 09:14:29 -0700 X-ASF-Spam-Status: No, hits=-0.0 required=10.0 tests=SPF_HELO_PASS,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: domain of lists@nabble.com designates 72.21.53.35 as permitted sender) Received: from [72.21.53.35] (HELO talk.nabble.com) (72.21.53.35) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Jun 2006 09:14:26 -0700 Received: from localhost ([127.0.0.1] helo=talk.nabble.com) by talk.nabble.com with esmtp (Exim 4.50) id 1FmCHm-0001Wx-5O for activemq-users@geronimo.apache.org; Fri, 02 Jun 2006 09:14:06 -0700 Message-ID: <4682596.post@talk.nabble.com> Date: Fri, 2 Jun 2006 09:14:06 -0700 (PDT) From: mbmerr To: activemq-users@geronimo.apache.org Subject: Multiple sessions on single connection not supported? MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-Nabble-Sender: mbm@nmi.net X-Nabble-From: mbmerr X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N I'm trying to create multiple sessions on a single connection, and perform distinct .commit() commands on them. I'm not getting the results I expect. Basically, I'd like to have the following messages come in to the queue: Message 1 Message 2 Message 3 I want to commit messages 1 and 3, but not 2. I do this by opening distinct sessions for each message. When I close the connection, Message 2 should be rolled back, since it was never committed. But I don't get those results. I've tested this with ActiveMQ 4.0 and SNAPSHOT as of 2006-06-02. And I get different results depending on whether I read the messages with .receiveNoWait() vs. .receive(), and which version of ActiveMQ I'm using. Any thoughts? The sample code gives the expected results under JORAM 4.3.18. Thanks for your time. EXPECTED RESULTS [java] Opening connection for writes [java] Sending message 1 [java] Sending message 2 [java] Sending message 3 [java] Closing connection for writes [java] Opening connection for reads [java] Opening session for read [java] Polling for message... [java] Received message with text [Message 1] [java] Committing session for message [Message 1] [java] Opening session for read [java] Polling for message... [java] Received message with text [Message 2] [java] Performing no action against session for message [Message 2] [java] Opening session for read [java] Polling for message... [java] Received message with text [Message 3] [java] Committing session for message [Message 3] [java] Closing connection for reads - should force rollback of any uncommitted session [java] Sends and receives complete - now emptying queue of remaining messages [java] Opening connection for read [java] Looking for lingering message... [java] Found message on queue [Message 2] [java] Looking for lingering message... [java] No more lingering messages [java] Closing connection for read [java] All work completed ACTUAL RESULTS (4.0, using .receiveNoWait()) [java] Opening connection for writes [java] Sending message 1 [java] Sending message 2 [java] Sending message 3 [java] Closing connection for writes [java] Opening connection for reads [java] Opening session for read [java] Polling for message... [java] Received message with text [Message 1] [java] Committing session for message [Message 1] [java] Opening session for read [java] Polling for message... [java] Opening session for read [java] Polling for message... [java] Closing connection for reads - should force rollback of any uncommitted session [java] Sends and receives complete - now emptying queue of remaining messages [java] Opening connection for read [java] Looking for lingering message... [java] Found message on queue [Message 2] [java] Looking for lingering message... [java] Found message on queue [Message 3] [java] Looking for lingering message... [java] No more lingering messages [java] Closing connection for read [java] All work completed ACTUAL RESULTS (SNAPSHOT, using .receiveNoWait()) [java] Opening connection for writes [java] Sending message 1 [java] Sending message 2 [java] Sending message 3 [java] Closing connection for writes [java] Opening connection for reads [java] Opening session for read [java] Polling for message... [java] Received message with text [Message 1] [java] Committing session for message [Message 1] [java] Opening session for read [java] Polling for message... [java] Opening session for read [java] Polling for message... [java] Closing connection for reads - should force rollback of any uncommitted session [java] Sends and receives complete - now emptying queue of remaining messages [java] Opening connection for read [java] Looking for lingering message... [java] No more lingering messages [java] Closing connection for read [java] All work completed ACTUAL RESULTS (4.0 and SNAPSHOT, using .receive()) [java] Opening connection for writes [java] Sending message 1 [java] Sending message 2 [java] Sending message 3 [java] Closing connection for writes [java] Opening connection for reads [java] Opening session for read [java] Polling for message... [java] Received message with text [Message 1] [java] Committing session for message [Message 1] [java] Opening session for read [java] Polling for message... [code hangs here] TEST CODE import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Hashtable; public class MultipleSessionTest { private static final String DESTNAME = "MULTISESSIONQUEUETEST"; private static final int TOTAL_MESSAGES = 3; private static final int[] MSGS_TO_COMMIT = new int[]{ 1, 3 }; private Context context; private ConnectionFactory factory; public static void main(String[] args) throws JMSException, NamingException { MultipleSessionTest mst = new MultipleSessionTest(); // stock the queue with 3 messages mst.sendMessages(TOTAL_MESSAGES); // process the messages, doing a session.commit() on just messages 1 and 3 mst.receiveMessages(TOTAL_MESSAGES, MSGS_TO_COMMIT); // destination should now contain just those messages that were not explicitly committed - list them mst.emptyDestination(); System.out.println("All work completed"); } public MultipleSessionTest() throws NamingException { Hashtable properties = new Hashtable(); // ActiveMQ properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); properties.put(Context.PROVIDER_URL, "tcp://localhost:61616"); // OpenJMS // properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); // properties.put(Context.PROVIDER_URL, "tcp://localhost:3035/"); // JORAM // properties.put(Context.INITIAL_CONTEXT_FACTORY, "fr.dyade.aaa.jndi2.client.NamingContextFactory"); // properties.put(fr.dyade.aaa.jndi2.client.NamingContextFactory.JAVA_HOST_PROPERTY, "localhost"); // properties.put(fr.dyade.aaa.jndi2.client.NamingContextFactory.JAVA_PORT_PROPERTY, "16400"); context = new InitialContext(properties); // ActiveMQ factory = (ConnectionFactory) context.lookup("ConnectionFactory"); // OpenJMS // factory = (ConnectionFactory) context.lookup("ConnectionFactory"); // JORAM // factory = (ConnectionFactory) context.lookup("qcf"); } private void sendMessages(int totalMessages) throws JMSException, NamingException { // open connection to JMS provider System.out.println("Opening connection for writes"); Connection connection = factory.createConnection(); // open session on connection (true for transacted session) Session session = connection.createSession(true, 0); // create a destination Destination destination = getDestination(session); // create a producer on the session for the destination MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // send messages via the producer to the destination for (int i = 1; i <= totalMessages; i++) { sendMessage(session, producer, i); } // close everything (closing connection handles other closes) System.out.println("Closing connection for writes"); connection.close(); } private void sendMessage(Session session, MessageProducer producer, int num) throws JMSException { System.out.println("Sending message " + num); TextMessage message = session.createTextMessage("Message " + num); producer.send(message); session.commit(); } private void receiveMessages(int totalMessages, int[] commitList) throws JMSException, NamingException { // open connection to JMS server Connection connection = factory.createConnection(); System.out.println("Opening connection for reads"); // receive messages, committing them if requested for (int i = 1; i <= totalMessages; i++) { receiveMessage(connection, commitList); } // close the connection - should force rollback for any messages not on commitList System.out.println("Closing connection for reads - should force rollback of any uncommitted session"); connection.close(); } private void receiveMessage(Connection connection, int[] commitList) throws JMSException, NamingException { // open session on connection (true for transacted session) System.out.println("Opening session for read"); Session session = connection.createSession(true, 0); // create a destination Destination destination = getDestination(session); // create a consumer on the session for the destination MessageConsumer consumer = session.createConsumer(destination); // start the connection (allows message delivery to occur) connection.start(); System.out.println("Polling for message..."); // NOTE: different behavior when .receive() vs. .receiveNoWait()? Message message = consumer.receiveNoWait(); if (message != null) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String s = textMessage.getText(); System.out.println("Received message with text [" + s + "]"); boolean committed = false; for (int i = 0; i < commitList.length; i++) { if (s.indexOf(commitList[i] + "") != -1) { System.out.println("Committing session for message [" + s + "]"); session.commit(); committed = true; break; } } if (!committed) { System.out.println("Performing no action against session for message [" + s + "]"); } } else { System.err.println("Received message of unexpected type [" + message + "]"); } } // note we never close sessions } private void emptyDestination() throws JMSException, NamingException { System.out.println("Sends and receives complete - now emptying queue of remaining messages"); // open connection to JMS server System.out.println("Opening connection for read"); Connection connection = factory.createConnection(); // open session on connection (true for transacted session) Session session = connection.createSession(true, 0); // create a destination Destination destination = getDestination(session); // create a consumer on the session for the destination MessageConsumer consumer = session.createConsumer(destination); // start the connection (allows message delivery to occur) connection.start(); while (true) { System.out.println("Looking for lingering message..."); Message message = consumer.receiveNoWait(); if (message != null) { if (message instanceof TextMessage) { System.out.println("Found message on queue [" + ((TextMessage) message).getText() + "]"); } else { System.out.println("Found message on server of unexpected type [" + message + "]"); } } else { System.out.println("No more lingering messages"); break; } } System.out.println("Closing connection for read"); session.commit(); connection.close(); } private Destination getDestination(Session session) throws NamingException, JMSException { // ActiveMQ return session.createQueue(DESTNAME); // OpenJMS // return session.createQueue(DESTNAME); // JORAM // return (Queue) context.lookup("queue"); } } -- View this message in context: http://www.nabble.com/Multiple-sessions-on-single-connection-not-supported--t1723640.html#a4682596 Sent from the ActiveMQ - User forum at Nabble.com.