From users-return-27482-apmail-activemq-users-archive=activemq.apache.org@activemq.apache.org Wed May 11 17:39:34 2011 Return-Path: X-Original-To: apmail-activemq-users-archive@www.apache.org Delivered-To: apmail-activemq-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A9FCE413B for ; Wed, 11 May 2011 17:39:34 +0000 (UTC) Received: (qmail 25190 invoked by uid 500); 11 May 2011 17:39:34 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 25170 invoked by uid 500); 11 May 2011 17:39:34 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 25162 invoked by uid 99); 11 May 2011 17:39:34 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 May 2011 17:39:34 +0000 X-ASF-Spam-Status: No, hits=0.6 required=5.0 tests=FREEMAIL_FROM,RCVD_IN_DNSWL_LOW,RFC_ABUSE_POST,SPF_PASS,T_TO_NO_BRKTS_FREEMAIL,URI_HEX X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of gary.tully@gmail.com designates 209.85.216.171 as permitted sender) Received: from [209.85.216.171] (HELO mail-qy0-f171.google.com) (209.85.216.171) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 May 2011 17:39:27 +0000 Received: by qyj19 with SMTP id 19so2993584qyj.2 for ; Wed, 11 May 2011 10:39:06 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=domainkey-signature:mime-version:in-reply-to:references:date :message-id:subject:from:to:content-type:content-transfer-encoding; bh=2RMoDYBtPOJxmpsjOv8tZbaGoFHE4GodDyahcGFf0Ag=; b=wzHydJQgCsUanCqKVAbSCNQ4WWr/oAWsYyK0lU7ebXGDkrORVAi8gVm+qpRob2QHBV FvryEC1sP5QXc6CPpGz65LE/AwBx0xn9ZpX2CGwsI9baOjPtMjcAHn16zS+Jmtb7U+Kp wKFlBzqrHnB68vvGoRn1YYvtGE2NbpqZd8c+U= DomainKey-Signature: a=rsa-sha1; c=nofws; d=gmail.com; s=gamma; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type:content-transfer-encoding; b=E3LKgBrtCnySbbWwtCerzhJMnVr+hQxMMTSTDCrOoacbQEo6X8h5mEd0atWA/3j9To lzvgVW8DDl5EmK/UvpAlU5+QTmKjr6ptMuqkktMLurMdynmWRzqGuM8rHv9meWoLY5dd Mrpv9wISrVZ+PNXB8dfDXMgawiWYh1w23wOxA= MIME-Version: 1.0 Received: by 10.229.26.213 with SMTP id f21mr7157762qcc.217.1305135546712; Wed, 11 May 2011 10:39:06 -0700 (PDT) Received: by 10.229.84.207 with HTTP; Wed, 11 May 2011 10:39:06 -0700 (PDT) In-Reply-To: <1304963980337-3509914.post@n4.nabble.com> References: <1304963980337-3509914.post@n4.nabble.com> Date: Wed, 11 May 2011 18:39:06 +0100 Message-ID: Subject: Re: Subscriber throws errors and dies when using multiple openwire JMS client From: Gary Tully To: users@activemq.apache.org Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable X-Virus-Checked: Checked by ClamAV on apache.org Can you see if you can recreate with the current 5.5 version? On 9 May 2011 18:59, jai.mathaiyan wrote: > Hi, > > =A0I have been playing around with activemq for some time. Now I am facin= g a > strange issue. I am using 5.3.1 version. > I have a broker running and a producer within the same JVM. If I have one > JMS client (openwire) , everything works fine. > > Whenever I launch multiple instances of JMS client (as separate applicati= on > from eclipse) subscribing to the same topic, I see the following exceptio= ns > on all the subscribers at various instances. > > I see these errors in the client's onException =A0method. The broker and > producer continue to run normally. There is no errors seen on the broker > side. > Pls let me know if anyone has faced a similar problem or how to go about > debugging it ? > > The only customizations on the broker is adding Authorization Plugin. I > researched a bit on the error and the following post that says that the > problem is fixed in 5.1 version. > https://issues.apache.org/jira/browse/AMQ-1169?page=3Dcom.atlassian.jira.= plugin.system.issuetabpanels%3Aall-tabpanel#issue-tabs > > javax.jms.JMSException: Unexpected error occured > =A0 =A0 =A0 =A0 at > org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.j= ava:49) > =A0 =A0 =A0 =A0 at > org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnectio= n.java:1803) > =A0 =A0 =A0 =A0 at > org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.jav= a:1820) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.TransportFilter.onException(TransportFilter= .java:99) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorr= elator.java:126) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.TransportFilter.onException(TransportFilter= .java:99) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.TransportFilter.onException(TransportFilter= .java:99) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormat= Negotiator.java:160) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.InactivityMonitor.onException(InactivityMon= itor.java:254) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.TransportSupport.onException(TransportSuppo= rt.java:97) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:195) > =A0 =A0 =A0 =A0 at java.lang.Thread.run(Thread.java:736) > Caused by: java.io.IOException: Unexpected error occured > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:193) > =A0 =A0 =A0 =A0 ... 1 more > Caused by: java.lang.ClassCastException: > org.apache.activemq.command.BrokerId in > compatible with org.apache.activemq.command.ConsumerId > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.v5.MessageMarshaller.tightUnmarshal(MessageM= arshaller.java:75) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.v5.ActiveMQMessageMarshaller.tightUnmarshal(= ActiveMQMessageMarshaller.java:66) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.v5.ActiveMQTextMessageMarshaller.tightUnmars= hal(ActiveMQTextMessageMarshaller.java:66) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(Op= enWireFormat.java:453) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.v5.BaseDataStreamMarshaller.tightUnmarsalNes= tedObject(BaseDataStreamMarshaller.java:126) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.v5.MessageDispatchMarshaller.tightUnmarshal(= MessageDispatchMarshaller.java:71) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.ja= va:362) > =A0 =A0 =A0 =A0 at > org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java= :276) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.j= ava:211) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:20= 3) > =A0 =A0 =A0 =A0 at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186) > > > JMS Client code-- > > package com.cisco.psbu.vs.ism.test.client.events; > > import java.io.ByteArrayOutputStream; > import java.io.ObjectOutputStream; > > import javax.jms.Connection; > import javax.jms.Destination; > import javax.jms.ExceptionListener; > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageListener; > import javax.jms.MessageProducer; > import javax.jms.Session; > import javax.jms.TextMessage; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.command.ActiveMQTextMessage; > > public class JMSClient implements ExceptionListener{ > > =A0 =A0 =A0 =A0private final String DEFAULT_USER =3D "jai"; > =A0 =A0 =A0 =A0private final String DEFAULT_PASSWORD =3D "jai"; > =A0 =A0 =A0 =A0private final String DEFAULT_BROKER_URL =3D "tcp://localho= st:61616"; > =A0 =A0 =A0 =A0private ActiveMQConnectionFactory connectionFactory; > =A0 =A0 =A0 =A0private Connection connection; > =A0 =A0 =A0 =A0private Session session; > =A0 =A0 =A0 =A0private Destination destination; > =A0 =A0 =A0 =A0private boolean transacted =3D false; > =A0 =A0 =A0 =A0private boolean isQueue =3D false; > =A0 =A0 =A0 =A0private String destinationName; > =A0 =A0 =A0 =A0private MessageConsumer consumer ; > =A0 =A0 =A0 =A0private MessageProducer producer; > =A0 =A0 =A0 =A0private String hostname =3D "localhost"; > =A0 =A0 =A0 =A0private static int count =3D 0; > > =A0 =A0 =A0 =A0public JMSClient(boolean isQ,String destination){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0this(isQ, destination, "localhost"); > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0public JMSClient(boolean isQ,String destination, String ho= stname){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0this.isQueue =3D isQ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0this.destinationName =3D d= estination; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0this.hostname =3D hostname= ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0print("begin"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0setUp(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0print("setup complete"); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//print("message Size in b= ytes: " + getMessageSize()); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}catch (Exception e) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0e.printStackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private void setUp() throws JMSException, InterruptedExcep= tion { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connectionFactory =3D new ActiveMQConnecti= onFactory( > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0DEFAULT_US= ER, > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0DEFAULT_PA= SSWORD, > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0"tcp://"+ = hostname + ":61616"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection =3D > connectionFactory.createConnection("abc","a5405e08-701f-4631-9b69-2476cc4= 9a87b"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection.setExceptionListener(this); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection.start(); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0session =3D connection.createSession(trans= acted, Session.AUTO_ACKNOWLEDGE); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//destination =3D session.createTopic("c.c= .p.v.ism.>"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if(isQueue){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0destination =3D session.cr= eateQueue(destinationName); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}else{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0destination =3D session.cr= eateTopic(destinationName); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0createConsumerAndReceiveAM= essage(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0print("create topic comple= te. Waiting for messages..."); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0createProducer(); > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private void createConsumerAndReceiveAMessage() throws JMS= Exception, > InterruptedException { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0consumer =3D session.createConsumer(destin= ation); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0MyConsumer myConsumer =3D new MyConsumer()= ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection.setExceptionListener(myConsumer= ); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0consumer.setMessageListener(myConsumer); > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private void createProducer() throws JMSException{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0producer =3D session.createProducer(destin= ation); > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0public void sendMessage(TextMessage message, int messageCo= unt, long > sleepTime){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0sendLoop(message, messageC= ount, sleepTime); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}catch (Exception e) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0e.printStackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > > =A0 =A0 =A0 =A0protected void sendLoop(TextMessage message, int messageCo= unt, long > sleepTime) > =A0 =A0 =A0 =A0throws Exception { > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0for (int i =3D 0; i < messageCount || mess= ageCount =3D=3D 0; i++) { > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0String msg =3D message.get= Text(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (msg.length() > 50) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0msg =3D ms= g.substring(0, 50) + "..."; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println("Sendin= g message: " + msg); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0producer.send(message); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Thread.sleep(sleepTime); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0public void tearDown(){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0connection= .close(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} catch (Throwable ignore) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ignore.printStackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0private class MyConsumer implements MessageListener, Excep= tionListener { > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0synchronized public void onException(JMSEx= ception ex) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0print("JMS Exception occur= ed. =A0Shutting down client."); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ex.printStackTrace(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.exit(1); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0public void onMessage(Message message) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (message instanceof Tex= tMessage) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0count ++; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0TextMessag= e textMessage =3D (TextMessage) message; > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out= .println(" total message size in bytes :" + totalSize); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0print("Received message: " + textMessage); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0}catch(Exc= eption ex){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 = =A0 =A0print("Received message: " + textMessage); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//textMess= age.getText(); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//textMess= age.getStringProperty("msgOpCode"); > > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} else =A0{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0print("Rec= eived: " + message); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0public TextMessage getTextMessage(String payload) throws J= MSException{ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0return session.createTextMessage(payload); > > =A0 =A0 =A0 =A0} > > > =A0 =A0 =A0 =A0public void print(Object text){ > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println(text.toString()); > =A0 =A0 =A0 =A0} > > > =A0 =A0 =A0 =A0public static void main(String[] args) throws Exception { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0JMSClient client =3D new JMSClient(false,"= c.c.p.v.ism.device"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//TextMessage message =3D client.getTextMe= ssage("This is a test"); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//client.sendMessage(message,1,0); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//Thread.sleep(100000); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0//client.tearDown(); > =A0 =A0 =A0 =A0} > > =A0 =A0 =A0 =A0@Override > =A0 =A0 =A0 =A0public void onException(JMSException exception) { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0System.out.println("Exception detected..")= ; > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0exception.printStackTrace(); > > =A0 =A0 =A0 =A0} > > > } > > > -- > View this message in context: http://activemq.2283324.n4.nabble.com/Subsc= riber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509= 914p3509914.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > --=20 http://blog.garytully.com http://fusesource.com