activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "jai.mathaiyan" <jaiganes...@gmail.com>
Subject Subscriber throws errors and dies when using multiple openwire JMS client
Date Mon, 09 May 2011 17:59:40 GMT
Hi, 

 I have been playing around with ActiveMQ for some time. Now I am facing a
strange issue. I am using version 5.3.1.
I have a broker and a producer running within the same JVM. If I have one
JMS client (OpenWire), everything works fine.

Whenever I launch multiple instances of the JMS client (as separate applications
from Eclipse) subscribing to the same topic, I see the following exceptions
on all the subscribers at various times.

I see these errors in the client's onException method. The broker and
producer continue to run normally. There are no errors seen on the broker
side.
Please let me know if anyone has faced a similar problem, or how to go about
debugging it.

The only customization on the broker is the addition of the Authorization Plugin. I
researched the error a bit and found the following post, which says that the
problem was fixed in version 5.1:
https://issues.apache.org/jira/browse/AMQ-1169?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel#issue-tabs

javax.jms.JMSException: Unexpected error occured 
        at
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49) 
        at
org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1803) 
        at
org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1820) 
        at
org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99) 
        at
org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)

        at
org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99) 
        at
org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99) 
        at
org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160)

        at
org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:254) 
        at
org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:97) 
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:195) 
        at java.lang.Thread.run(Thread.java:736) 
Caused by: java.io.IOException: Unexpected error occured 
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:193) 
        ... 1 more 
Caused by: java.lang.ClassCastException:
org.apache.activemq.command.BrokerId in 
compatible with org.apache.activemq.command.ConsumerId 
        at
org.apache.activemq.openwire.v5.MessageMarshaller.tightUnmarshal(MessageMarshaller.java:75)

        at
org.apache.activemq.openwire.v5.ActiveMQMessageMarshaller.tightUnmarshal(ActiveMQMessageMarshaller.java:66)

        at
org.apache.activemq.openwire.v5.ActiveMQTextMessageMarshaller.tightUnmarshal(ActiveMQTextMessageMarshaller.java:66)

        at
org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(OpenWireFormat.java:453)

        at
org.apache.activemq.openwire.v5.BaseDataStreamMarshaller.tightUnmarsalNestedObject(BaseDataStreamMarshaller.java:126)

        at
org.apache.activemq.openwire.v5.MessageDispatchMarshaller.tightUnmarshal(MessageDispatchMarshaller.java:71)

        at
org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:362) 
        at
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:276) 
        at
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211) 
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203) 
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186) 


JMS Client code--

package com.cisco.psbu.vs.ism.test.client.events;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

/**
 * Small ActiveMQ test client that connects to a broker over TCP, creates a
 * queue or topic destination, and can both send text messages to it and (for
 * topics) listen for incoming messages.
 *
 * <p>For topics a {@link MyConsumer} is registered as message listener and as
 * the connection's exception listener; that listener terminates the JVM on
 * the first asynchronous JMS error.
 */
public class JMSClient implements ExceptionListener{

	private static final String DEFAULT_USER = "jai";
	private static final String DEFAULT_PASSWORD = "jai";
	private static final String DEFAULT_HOST = "localhost";
	private static final int BROKER_PORT = 61616;
	// Fully qualified so the class needs no additional import line.
	private static final java.nio.charset.Charset UTF_8 =
			java.nio.charset.Charset.forName("UTF-8");

	/** Number of text messages received so far (shared by all instances). */
	private static int count = 0;
	/** Running total of the UTF-8 encoded size of all received text bodies. */
	private static long totalSize = 0;

	private ActiveMQConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private Destination destination;
	private boolean transacted = false;
	private boolean isQueue = false;
	private String destinationName;
	private MessageConsumer consumer;
	private MessageProducer producer;
	private String hostname;

	/**
	 * Creates a client connected to a broker on {@code localhost}.
	 *
	 * @param isQ         {@code true} to use a queue, {@code false} to use a topic
	 * @param destination name of the queue or topic
	 */
	public JMSClient(boolean isQ,String destination){
		this(isQ, destination, DEFAULT_HOST);
	}

	/**
	 * Creates a client connected to the broker on the given host.
	 *
	 * <p>Setup failures are printed rather than thrown, so the instance may be
	 * unusable afterwards; any connection opened before the failure is closed.
	 *
	 * @param isQ         {@code true} to use a queue, {@code false} to use a topic
	 * @param destination name of the queue or topic
	 * @param hostname    broker host; {@code null} falls back to {@code localhost}
	 */
	public JMSClient(boolean isQ,String destination, String hostname){
		this.isQueue = isQ;
		this.destinationName = destination;
		this.hostname = (hostname != null) ? hostname : DEFAULT_HOST;
		try{
			print("begin");
			setUp();
			print("setup complete");
		}catch (Exception e) {
			e.printStackTrace();
			// Do not leak a connection that was opened before setup failed.
			tearDown();
		}
	}

	/**
	 * Opens and starts the connection, creates the session and destination,
	 * registers the consumer (topics only) and creates the producer.
	 *
	 * @throws JMSException if any JMS call fails
	 */
	private void setUp() throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory(
				DEFAULT_USER,
				DEFAULT_PASSWORD,
				"tcp://" + hostname + ":" + BROKER_PORT);
		// The connection is created with its own explicit user/password rather
		// than with the DEFAULT_USER/DEFAULT_PASSWORD given to the factory.
		connection = connectionFactory.createConnection(
				"abc", "a5405e08-701f-4631-9b69-2476cc49a87b");
		connection.setExceptionListener(this);
		connection.start();

		session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
		if(isQueue){
			destination = session.createQueue(destinationName);
		}else{
			destination = session.createTopic(destinationName);
			createConsumerAndReceiveAMessage();
			print("create topic complete. Waiting for messages...");
		}
		createProducer();
	}

	/**
	 * Creates a consumer on the destination and attaches a {@link MyConsumer}
	 * as its message listener.
	 *
	 * @throws JMSException if the consumer or listeners cannot be set up
	 */
	private void createConsumerAndReceiveAMessage() throws JMSException {
		consumer = session.createConsumer(destination);
		MyConsumer myConsumer = new MyConsumer();
		// Replaces the listener installed in setUp(): from here on asynchronous
		// errors go to MyConsumer.onException, which exits the JVM.
		connection.setExceptionListener(myConsumer);
		consumer.setMessageListener(myConsumer);
	}

	/**
	 * Creates the producer used by {@link #sendLoop}.
	 *
	 * @throws JMSException if the producer cannot be created
	 */
	private void createProducer() throws JMSException{
		producer = session.createProducer(destination);
	}

	/**
	 * Sends {@code message} repeatedly; failures are printed, never thrown.
	 *
	 * @param message      message to send
	 * @param messageCount number of sends; {@code 0} means send forever
	 * @param sleepTime    pause in milliseconds after each send
	 */
	public void sendMessage(TextMessage message, int messageCount, long sleepTime){
		try{
			sendLoop(message, messageCount, sleepTime);
		}catch (InterruptedException e) {
			// Preserve the interrupt for whoever owns this thread.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Sends {@code message} {@code messageCount} times (forever when
	 * {@code messageCount} is {@code 0}), logging a preview of at most 50
	 * characters and sleeping {@code sleepTime} milliseconds after each send.
	 *
	 * @param message      message to send
	 * @param messageCount number of sends; {@code 0} means send forever
	 * @param sleepTime    pause in milliseconds after each send
	 * @throws IllegalStateException if no producer exists because setup failed
	 * @throws Exception             if reading the body, sending or sleeping fails
	 */
	protected void sendLoop(TextMessage message, int messageCount, long sleepTime)
	throws Exception {
		if (producer == null) {
			throw new IllegalStateException("JMS producer not initialised; setup failed");
		}

		for (int i = 0; i < messageCount || messageCount == 0; i++) {

			String msg = message.getText();
			// A text message may carry no body; only truncate a real string.
			if (msg != null && msg.length() > 50) {
				msg = msg.substring(0, 50) + "...";
			}
			System.out.println("Sending message: " + msg);
			producer.send(message);
			Thread.sleep(sleepTime);
		}

	}

	/**
	 * Closes the connection if one is open. Safe to call more than once and
	 * after a failed setup; close failures are printed, never thrown.
	 */
	public void tearDown(){
		if (connection == null) {
			return;
		}
		try {
			connection.close();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			connection = null;
		}
	}

	/**
	 * Listener that prints every received message and shuts the JVM down on
	 * the first asynchronous JMS exception.
	 */
	private class MyConsumer implements MessageListener, ExceptionListener {

		/** Prints the exception and terminates the JVM with exit status 1. */
		public synchronized void onException(JMSException ex) {
			print("JMS Exception occured.  Shutting down client.");
			ex.printStackTrace();
			System.exit(1);
		}

		/**
		 * Prints the received message; for text messages it also updates the
		 * message counter and the running total of body bytes.
		 */
		public void onMessage(Message message) {
			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				count++;
				totalSize += bodySizeInBytes(textMessage);

				System.out.println(" total message size in bytes :" + totalSize);
				print("Received message: " + textMessage);
			} else  {
				print("Received: " + message);
			}
		}

		/**
		 * Returns the UTF-8 encoded length of the message body, or {@code 0}
		 * when the body is absent or cannot be read.
		 */
		private long bodySizeInBytes(TextMessage textMessage) {
			try {
				String body = textMessage.getText();
				return (body == null) ? 0 : body.getBytes(UTF_8).length;
			} catch (JMSException e) {
				e.printStackTrace();
				return 0;
			}
		}
	}

	/**
	 * Creates a text message on this client's session.
	 *
	 * @param payload message body
	 * @return the new message
	 * @throws JMSException if the session cannot create the message
	 */
	public TextMessage getTextMessage(String payload) throws JMSException{
		return session.createTextMessage(payload);
	}

	/**
	 * Prints {@code text} to standard output; {@code null} prints as "null".
	 *
	 * @param text value to print
	 */
	public void print(Object text){
		System.out.println(String.valueOf(text));
	}

	/**
	 * Starts a topic subscriber on {@code c.c.p.v.ism.device}.
	 *
	 * <p>To publish instead, create a message with {@link #getTextMessage},
	 * pass it to {@link #sendMessage} and finish with {@link #tearDown}.
	 */
	public static void main(String[] args) throws Exception {
		new JMSClient(false,"c.c.p.v.ism.device");
	}

	/** Prints asynchronous JMS errors reported while this object is the connection's listener. */
	@Override
	public void onException(JMSException exception) {
		System.out.println("Exception detected..");
		exception.printStackTrace();
	}

}
 

--
View this message in context: http://activemq.2283324.n4.nabble.com/Subscriber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509914p3509914.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message