activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: Subscriber throws errors and dies when using multiple openwire JMS client
Date Wed, 11 May 2011 17:39:06 GMT
Can you see if you can recreate with the current 5.5 version?

On 9 May 2011 18:59, jai.mathaiyan <jaiganesh.m@gmail.com> wrote:
> Hi,
>
>  I have been playing around with ActiveMQ for some time, and I am now facing a
> strange issue. I am using version 5.3.1.
> I have a broker running and a producer within the same JVM. If I have one
> JMS client (openwire) , everything works fine.
>
> Whenever I launch multiple instances of the JMS client (as separate applications
> from Eclipse) subscribing to the same topic, I see the following exceptions
> on all the subscribers at various times.
>
> I see these errors in the client's onException  method. The broker and
> producer continue to run normally. There are no errors seen on the broker
> side.
> Please let me know if anyone has faced a similar problem, or how to go about
> debugging it.
>
> The only customization on the broker is the addition of the Authorization Plugin. I
> researched the error a bit and found the following post, which says that the
> problem was fixed in version 5.1.
> https://issues.apache.org/jira/browse/AMQ-1169?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel#issue-tabs
>
> javax.jms.JMSException: Unexpected error occured
>         at
> org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>         at
> org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1803)
>         at
> org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1820)
>         at
> org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99)
>         at
> org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
>         at
> org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99)
>         at
> org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99)
>         at
> org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160)
>         at
> org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:254)
>         at
> org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:97)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:195)
>         at java.lang.Thread.run(Thread.java:736)
> Caused by: java.io.IOException: Unexpected error occured
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:193)
>         ... 1 more
> Caused by: java.lang.ClassCastException:
> org.apache.activemq.command.BrokerId
> incompatible with org.apache.activemq.command.ConsumerId
>         at
> org.apache.activemq.openwire.v5.MessageMarshaller.tightUnmarshal(MessageMarshaller.java:75)
>         at
> org.apache.activemq.openwire.v5.ActiveMQMessageMarshaller.tightUnmarshal(ActiveMQMessageMarshaller.java:66)
>         at
> org.apache.activemq.openwire.v5.ActiveMQTextMessageMarshaller.tightUnmarshal(ActiveMQTextMessageMarshaller.java:66)
>         at
> org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(OpenWireFormat.java:453)
>         at
> org.apache.activemq.openwire.v5.BaseDataStreamMarshaller.tightUnmarsalNestedObject(BaseDataStreamMarshaller.java:126)
>         at
> org.apache.activemq.openwire.v5.MessageDispatchMarshaller.tightUnmarshal(MessageDispatchMarshaller.java:71)
>         at
> org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:362)
>         at
> org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:276)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186)
>
>
> JMS Client code--
>
> package com.cisco.psbu.vs.ism.test.client.events;
>
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectOutputStream;
>
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.ExceptionListener;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.command.ActiveMQTextMessage;
>
> public class JMSClient implements ExceptionListener{
>
>        private final String DEFAULT_USER = "jai";
>        private final String DEFAULT_PASSWORD = "jai";
>        private final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
>        private ActiveMQConnectionFactory connectionFactory;
>        private Connection connection;
>        private Session session;
>        private Destination destination;
>        private boolean transacted = false;
>        private boolean isQueue = false;
>        private String destinationName;
>        private MessageConsumer consumer ;
>        private MessageProducer producer;
>        private String hostname = "localhost";
>        private static int count = 0;
>
>        public JMSClient(boolean isQ,String destination){
>                this(isQ, destination, "localhost");
>        }
>
>        public JMSClient(boolean isQ,String destination, String hostname){
>                try{
>                        this.isQueue = isQ;
>                        this.destinationName = destination;
>                        this.hostname = hostname;
>                        print("begin");
>                        setUp();
>                        print("setup complete");
>
>                        //print("message Size in bytes: " + getMessageSize());
>                }catch (Exception e) {
>                        e.printStackTrace();
>                }
>        }
>
>        private void setUp() throws JMSException, InterruptedException {
>                connectionFactory = new ActiveMQConnectionFactory(
>                                DEFAULT_USER,
>                                DEFAULT_PASSWORD,
>                                "tcp://"+ hostname + ":61616");
>                connection =
> connectionFactory.createConnection("abc","a5405e08-701f-4631-9b69-2476cc49a87b");
>                connection.setExceptionListener(this);
>                connection.start();
>
>                session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
>                //destination = session.createTopic("c.c.p.v.ism.>");
>                if(isQueue){
>                        destination = session.createQueue(destinationName);
>                }else{
>                        destination = session.createTopic(destinationName);
>                        createConsumerAndReceiveAMessage();
>                        print("create topic complete. Waiting for messages...");
>                }
>                createProducer();
>        }
>
>        private void createConsumerAndReceiveAMessage() throws JMSException,
> InterruptedException {
>                consumer = session.createConsumer(destination);
>                MyConsumer myConsumer = new MyConsumer();
>                connection.setExceptionListener(myConsumer);
>                consumer.setMessageListener(myConsumer);
>        }
>
>        private void createProducer() throws JMSException{
>                producer = session.createProducer(destination);
>        }
>
>        public void sendMessage(TextMessage message, int messageCount, long
> sleepTime){
>                try{
>                        sendLoop(message, messageCount, sleepTime);
>                }catch (Exception e) {
>                        e.printStackTrace();
>                }
>        }
>
>
>        protected void sendLoop(TextMessage message, int messageCount, long
> sleepTime)
>        throws Exception {
>
>                for (int i = 0; i < messageCount || messageCount == 0; i++) {
>
>                        String msg = message.getText();
>                        if (msg.length() > 50) {
>                                msg = msg.substring(0, 50) + "...";
>                        }
>                        System.out.println("Sending message: " + msg);
>                        producer.send(message);
>                        Thread.sleep(sleepTime);
>                }
>
>        }
>
>        public void tearDown(){
>                try {
>                                connection.close();
>                } catch (Throwable ignore) {
>                        ignore.printStackTrace();
>                }
>        }
>
>        private class MyConsumer implements MessageListener, ExceptionListener {
>
>                synchronized public void onException(JMSException ex) {
>                        print("JMS Exception occured.  Shutting down client.");
>                        ex.printStackTrace();
>                        System.exit(1);
>
>                }
>
>                public void onMessage(Message message) {
>                        if (message instanceof TextMessage) {
>                                count ++;
>                                TextMessage textMessage = (TextMessage) message;
>
>                                System.out.println(" total message size in bytes :" + totalSize);
>                                try{
>                                        print("Received message: " + textMessage);
>                                }catch(Exception ex){
>                                        print("Received message: " + textMessage);
>                                }
>                                //textMessage.getText();
>                                //textMessage.getStringProperty("msgOpCode");
>
>                        } else  {
>                                print("Received: " + message);
>                        }
>                }
>        }
>
>        public TextMessage getTextMessage(String payload) throws JMSException{
>                return session.createTextMessage(payload);
>
>        }
>
>
>        public void print(Object text){
>                System.out.println(text.toString());
>        }
>
>
>        public static void main(String[] args) throws Exception {
>                JMSClient client = new JMSClient(false,"c.c.p.v.ism.device");
>                //TextMessage message = client.getTextMessage("This is a test");
>                //client.sendMessage(message,1,0);
>                //Thread.sleep(100000);
>                //client.tearDown();
>        }
>
>        @Override
>        public void onException(JMSException exception) {
>                System.out.println("Exception detected..");
>                exception.printStackTrace();
>
>        }
>
>
> }
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Subscriber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509914p3509914.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://blog.garytully.com
http://fusesource.com

Mime
View raw message