activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Neo Wang <wangdong...@hotmail.com>
Subject How to send one message to a topic in MDB?
Date Mon, 07 Sep 2009 07:08:15 GMT

Environment: JBoss 5.1 GA, ActiveMQ 5.2 with an embedded broker configuration

I want to send a message to a topic whenever a message is received in an MDB,
but the send does not succeed every time. Can anyone help me? I have been
struggling with this for about 2 weeks.

Here is my sample code:

package com.trading.platform.ejb;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.trading.platform.context.ServerContext;
import com.trading.platform.exception.InvokeException;
import com.trading.platform.exception.SystemException;
import com.trading.platform.handler.MessageHandler;
import com.trading.platform.log.ILogger;
import com.trading.platform.log.Logger;

public class OrderResponseMDB implements MessageDrivenBean, MessageListener{
	private ILogger logger = Logger.getLogger(this.getClass());
	private MessageDrivenContext messageDrivenContext;
	
	private volatile TopicConnectionFactory topicConnnectionFactory;
	private volatile Topic topic;
	
	private void getInitialContext(){
		try {
			Context context = new InitialContext();
			topicConnnectionFactory =
(TopicConnectionFactory)context.lookup("java:comp/env/jms/TopicConnectionFactory");
			//topic = (Topic)context.lookup("java:comp/env/jms/OrderBroadcastTopic");
			if(logger.isDebugEnabled()) logger.debug("Platform:Publisher Topic
Session connnection is created..." );
		} catch (NamingException e) {
			if(logger.isErrorEnabled()) logger.error("NamingExcetion:" + e);
			throw new SystemException("System Error", e);
		}
	}

	 public void onMessage(Message inMessage) {

		 TopicConnection topicConn = null;
	        try {
	            if (inMessage instanceof TextMessage) {
	                TextMessage txtmsg = (TextMessage)inMessage;       
	               
	                System.out.println("MESSAGE BEAN3: Message received:
index="+txtmsg.getText());
	                topicConn =
topicConnnectionFactory.createTopicConnection();
	    			topicConn.start();
	    			TopicSession topicSession = topicConn.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
	    			topic = topicSession.createTopic("topic.orderbroadcast");
	    			
	    			TopicPublisher publisher = topicSession.createPublisher(topic);
	    			TextMessage newMsg = topicSession.createTextMessage("11111");
	    			publisher.send(newMsg);
	    			
	    			//publisher.close();
	    			//topicSession.close();
	    			topicConn.close();
	            } else {
	                System.out.println("WRONG MESSAGE TYPE received: " +
inMessage.getClass().getName());
	            }
	        } catch (Throwable t) {
	        	System.out.println("error...");
	            t.printStackTrace();
	            messageDrivenContext.setRollbackOnly();
	            
	            try {
	                if (topicConn != null)  topicConn.close();
	                System.out.println("MESSAGE BEAN: Rollback:
index="+((TextMessage)inMessage).getText());
	            } catch (Exception e) {
	                System.out.println("MESSAGE BEAN: Unable to get index
property because:"+e.getMessage());
	                e.printStackTrace();
	                
	            }
	            throw new RuntimeException(t.getMessage());
	        }
	 }

	
	public void ejbCreate(){
		System.out.println("ConnectionFactory created...");
		getInitialContext();
	}
	
	public void ejbRemove(){
		
	}
	
	public void setMessageDrivenContext(MessageDrivenContext
messageDrivenContext){
		this.messageDrivenContext = messageDrivenContext;
	}
}

The following is the exception information:

14:59:09,601 WARN  [Service] Async error occurred:
javax.transaction.xa.XAException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
javax.transaction.xa.XAException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
	at
org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
	at
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
	at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
	at
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
	at
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
	at
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
	at
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
	at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
	at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
	at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
	at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
	at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
	at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
	at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
	at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
	at java.lang.Thread.run(Thread.java:595)
14:59:09,614 WARN  [ActiveMQManagedConnection] Connection failed:
javax.jms.JMSException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
14:59:09,615 WARN  [TxConnectionManager] Connection error occured:
org.jboss.resource.connectionmanager.TxConnectionManager$TxConnectionEventListener@18936ae[state=NORMAL
mc=org.apache.activemq.ra.ActiveMQManagedConnection@1365339 handles=0
lastUse=1252306749595 permit=false trackByTx=false
mcp=org.jboss.resource.connectionmanager.JBossManagedConnectionPool$OnePool@191c3ce
context=org.jboss.resource.connectionmanager.InternalManagedConnectionPool@ce1472
xaResource=org.apache.activemq.ra.ActiveMQManagedConnection$1@164e48d
txSync=null]
javax.jms.JMSException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
	at
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
	at
org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)
	at
org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
	at java.lang.Thread.run(Thread.java:595)
Caused by: javax.transaction.xa.XAException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
	at
org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
	at
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
	at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
	at
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
	at
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
	at
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
	at
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
	at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
	at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
	at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
	at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
	at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
	at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
	at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
	at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
	... 1 more



-- 
View this message in context: http://www.nabble.com/How-to-send-one-message-to-a-topic-in-MDB--tp25325925p25325925.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message