activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "John D. Ament" <johndam...@apache.org>
Subject Re: Artemis Core consumer hangs when reading message sent by AMQP producer
Date Fri, 02 Sep 2016 16:51:33 GMT
Cool, thanks.  I won't be able to recreate a test today.  But I have a
suspicion that the message length is being lost.  Will follow up.


And like Justin, I would expect this to work as well.

John

On Fri, Sep 2, 2016 at 12:48 PM Andy Redhead <andy.redhead@oneadvanced.com>
wrote:

> Hmm, something is eating my attachments, both classes inline below:
>
>
> NativeProcessMsgReader>
>
>
>
> package com...c2java;
>
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> import javax.annotation.PostConstruct;
> import javax.annotation.PreDestroy;
>
> import org.apache.activemq.artemis.api.core.ActiveMQException;
> import org.apache.activemq.artemis.api.core.TransportConfiguration;
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientConsumer;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
> import org.apache.activemq.artemis.api.core.client.ServerLocator;
> import
> org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com...NativeProcessCmd;
>
> /**
>  * Read messages from Artemis using server API.
>  *
>  * Uses a single thread (with a persistent ClientSession) to read from the
> shared queue used by all
>  * native processes to send outbound messages.
>  *
>  * Messages are handed on to a fixed sized executor for processing and
> sending results to browser.
>  *
>  * @author andyredhead
>  *
>  */
> public class NativeProcessMsgReader extends Thread {
>
>   final static Logger logger =
> LoggerFactory.getLogger(NativeProcessMsgReader.class);
>
>   int workerThreadPoolSize = 1;
>   int executorQueueLength = 500;
>
>   String incomingMsgFromNativeProcQueueName;
>
>   BlockingQueue<Runnable> nativeProcCmdQueue;
>   ThreadPoolExecutor nativeProcCmdExec;
>
>   ClientSessionFactory factory;
>   ClientSession session;
>   ClientConsumer consumer;
>
>   boolean shutdown = false;
>
>   public NativeProcessMsgReader(int workerThreadPoolSize, int
> executorQueueLength,
>       String incomingMsgFromNativeProcQueueName) {
>     this.workerThreadPoolSize = workerThreadPoolSize;
>     this.incomingMsgFromNativeProcQueueName =
> incomingMsgFromNativeProcQueueName;
>   }
>
>   @PostConstruct
>   public void startUp() throws Exception {
>     logger.info("startUp - start, thread pool size: {}, queue length: {},
> queue name: {}",
>         workerThreadPoolSize, workerThreadPoolSize,
> incomingMsgFromNativeProcQueueName);
>
>     try {
>
>       // processing work taken from queue
>       nativeProcCmdQueue = new
> ArrayBlockingQueue<Runnable>(executorQueueLength);
>       nativeProcCmdExec = new ThreadPoolExecutor(workerThreadPoolSize,
> workerThreadPoolSize, 5,
>           TimeUnit.MINUTES, nativeProcCmdQueue);
>
>
>
>       this.start();
>
>     } catch (Exception asyncQueueSetupErr) {
>       logger.error("ctor - problem setting up queue consumer: " +
> asyncQueueSetupErr.getMessage(),
>           asyncQueueSetupErr);
>       throw asyncQueueSetupErr;
>     }
>
>     logger.info("startUp - complete.");
>   }
>
>   @PreDestroy
>   public void shutDown() {
>
>     logger.info("shutDown - commencing shut down");
>
>     // stop new work coming in
>     shutdown = true;
>
>     // close down the native prod cmd executor
>     logger.info("shutDown - requesting executor shuts down...");
>     nativeProcCmdExec.shutdownNow();
>     logger.info("shutDown - requested executor shuts down.");
>   }
>
>   @Override
>   public void run() {
>
>     // reading from queue
>     ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(
>         new TransportConfiguration(InVMConnectorFactory.class.getName()));
>
>     try {
>       factory = locator.createSessionFactory();
>       session = factory.createSession();
>       consumer =
> session.createConsumer(incomingMsgFromNativeProcQueueName);
>       session.start();
>     } catch (Exception e) {
>       logger.warn("run - problem setting up native proc to java queue
> consumer: {}", e.getMessage(),
>           e);
>     }
>
>     while (!shutdown) {
>       try {
>
>         logger.debug("run - in while loop, waiting for message");
>
>         /* break every half second to check shutdown status */
>         ClientMessage msgReceived = consumer.receive(500);
>
>         if (msgReceived != null) {
>
>           logger.debug("run - received non-null message");
>           logger.debug("run - non-null message, body length: {}",
> msgReceived.getBodySize());
>
>           String receivedMsg = msgReceived.getBodyBuffer().readString();
>
>           logger.debug("run - message text: {}", receivedMsg);
>
>           if ((receivedMsg != null) && (receivedMsg.length() > 0)) {
>
>             logger.debug("run - in while loop, got message");
>
>             NativeProcessCmd nativeProcCmd = new
> NativeProcessCmd(receivedMsg);
>             nativeProcCmdExec.submit(nativeProcCmd);
>             logger.debug("run - in while loop, cmd pushed to executor");
>
>           } else {
>             logger.debug("run - null or zero length message read from
> native process cmd queue");
>           }
>         } else {
>           logger.debug("run - no message during last read period");
>         }
>
>       } catch (ActiveMQException e) {
>         logger.warn(
>             "run - problem reading message from native process command
> queue: " + e.getMessage(),
>             e);
>       }
>
>     } // end not shutdown while loop
>
>
>     // been told to shut down so close Artemis client objects
>
>     try {
>
>       consumer.close();
>       session.stop();
>       factory.close();
>
>       logger.info("run - Artemis client objects shut down.");
>
>     } catch (Exception e) {
>       logger.warn("run - problem closing Artemis client objects during
> shutdown: " + e.getMessage(),
>           e);
>     }
>
>   }
>
> }
>
>
> NumberGuessMain>
>
> package com...numberguess;
>
> import java.io.IOException;
> import java.util.HashMap;
>
> import org.apache.qpid.proton.amqp.messaging.AmqpValue;
> import org.apache.qpid.proton.message.Message;
> import org.apache.qpid.proton.messenger.Messenger;
>
> import com.google.gson.Gson;
>
> public class NumberGuessMain {
>
>   private int answer = -1;
>
>   public static final int EXIT_CODE_WRONG_CMD_LINE_ARGS = 1000;
>
>   public static final int EXIT_CODE_BROWSER_CONTROL_QUEUE_SETUP_ERR = 1001;
>
>   public static final int EXIT_CODE_BROWSER_RESPONSE_QUEUE_SETUP_ERR =
> 1002;
>
>   private String taskInstanceId;
>
>   private String browserControlQueueUrl;
>
>   private String browserResponseQueueUrl;
>
>   Messenger sendBrowserControlMessenger;
>
>   Messenger receiveBrowserResponseMessenger;
>
>   Gson gson = new Gson();
>
>
>   public static void main(String[] args) {
>     if ((args != null) && (args.length > 2)) {
>       NumberGuessMain me = new NumberGuessMain(args[0], args[1], args[2]);
>       me.run();
>       me.shutdown();
>     } else {
>
>       System.err.println("task instance id, browser control and response
> queues not specified"
>           + " (expects taskInstanceId browserControlQueueAmqpUrl
> browserResponseQueueAmqpUrl)");
>       System.exit(EXIT_CODE_WRONG_CMD_LINE_ARGS);
>     }
>
>
>
>   }
>
>   public NumberGuessMain(String taskInstanceId, String
> browserControlQueueUrl,
>       String browserResponseQueueUrl) {
>
>     this.taskInstanceId = taskInstanceId;
>     this.browserControlQueueUrl = browserControlQueueUrl;
>     this.browserResponseQueueUrl = browserResponseQueueUrl;
>
>     System.out.println("Task Instance ID: " + this.taskInstanceId);
>
>     // browser control queue
>
>     System.out.println("Browser Control Queue: " +
> this.browserControlQueueUrl);
>
>     sendBrowserControlMessenger = new Messenger.Factory().create();
>     try {
>       sendBrowserControlMessenger.start();
>     } catch (IOException e) {
>       System.err.println("Problem starting browser control AMQP messenger:
> " + e.getMessage());
>       e.printStackTrace();
>       System.exit(EXIT_CODE_BROWSER_CONTROL_QUEUE_SETUP_ERR);
>     }
>
>
>     // browser response queue
>
>     System.out.println("Browser Response Queue: " +
> this.browserResponseQueueUrl);
>
>     receiveBrowserResponseMessenger = new Messenger.Factory().create();
>     try {
>
>       receiveBrowserResponseMessenger.start();
>       receiveBrowserResponseMessenger.subscribe(browserResponseQueueUrl);
>
>     } catch (IOException e) {
>       System.err.println("Problem starting receive from browser AMQP
> messenger: " + e.getMessage());
>       e.printStackTrace();
>       System.exit(EXIT_CODE_BROWSER_RESPONSE_QUEUE_SETUP_ERR);
>     }
>
>   }
>
>   protected void run() {
>
>     System.out.println("Hello from numberguess");
>
>     OutboundNativeProcessMessage showFirstPanelMsg =
>         new OutboundNativeProcessMessage(taskInstanceId,
>             "ShowTitlePanel", null);
>
>     send(showFirstPanelMsg);
>   }
>
>   protected void send(OutboundNativeProcessMessage outMsg) {
>     Message msg = new Message.Factory().create();
>     msg.setAddress(browserControlQueueUrl);
>     String msgJson = gson.toJson(outMsg);
>     msg.setBody(new AmqpValue(msgJson));
>     sendBrowserControlMessenger.put(msg);
>     sendBrowserControlMessenger.send();
>     System.out.println("Sent browser control msg: " + msgJson);
>   }
>
>   protected HashMap sendReceive(OutboundNativeProcessMessage outMsg) {
>     HashMap receivedData = null;
>
>     send(outMsg);
>
>     System.out.println("Waiting for response message...");
>     receiveBrowserResponseMessenger.recv();
>     Message msg = receiveBrowserResponseMessenger.get();
>     System.out.println("Got response message.");
>
>     // @ToDo process the message here...
>
>     return receivedData;
>   }
>
>   protected void shutdown() {
>     if (sendBrowserControlMessenger != null) {
>       sendBrowserControlMessenger.stop();
>     }
>     if (receiveBrowserResponseMessenger != null) {
>       try {
>         // receiveBrowserResponseMessenger.
>         receiveBrowserResponseMessenger.stop();
>       } catch (Exception e) {
>         System.err.println("Problem closing browser response queue
> messenger: " + e.getMessage());
>         e.printStackTrace();
>       }
>     }
>   }
>
> }
>
> -----Original Message-----
> From: Andy Redhead [mailto:andy.redhead@oneadvanced.com]
> Sent: 02 September 2016 17:45
> To: users@activemq.apache.org
> Subject: RE: Artemis Core consumer hangs when reading message sent by AMQP
> producer
>
> Hi,
>
> Sorry, I could have sworn I attached the code in my original email -
> trying again...
>
> Cheers, Andy
>
> -----Original Message-----
> From: John D. Ament [mailto:johndament@apache.org]
> Sent: 02 September 2016 17:42
> To: users@activemq.apache.org
> Subject: Re: Artemis Core consumer hangs when reading message sent by AMQP
> producer
>
> Andy,
>
> Are you able to provide your client code?  NativeProcessMsgReader isn't
> Artemis's so it's not clear what else you're doing to connect, subscribe,
> etc.
>
> John
>
> On Fri, Sep 2, 2016 at 12:38 PM Andy Redhead <andy.redhead@oneadvanced.com
> >
> wrote:
>
> > Hi,
> >
> >
> >
> > Adding to the information below, if I change the junit test case that
> > tries sending a message through artemis into “NativeProcessMsgReader”
> > to use AMQP (rather than the core api), I get the stack trace:
> >
> >
> >
> > Exception in thread "Thread-1" *java.lang.IndexOutOfBoundsException*:
> > readerIndex(25) + length(276496418) exceeds writerIndex(294):
> > DuplicatedByteBuf(ridx: 25, widx: 294, cap: 512, unwrapped:
> > UnpooledHeapByteBuf(ridx: 17, widx: 294, cap: 512))
> >
> >       at io.netty.buffer.AbstractByteBuf.checkReadableBytes(
> > *AbstractByteBuf.java:1165*)
> >
> >       at io.netty.buffer.AbstractByteBuf.readBytes(
> > *AbstractByteBuf.java:675*)
> >
> >       at io.netty.buffer.AbstractByteBuf.readBytes(
> > *AbstractByteBuf.java:683*)
> >
> >       at io.netty.buffer.WrappedByteBuf.readBytes(
> > *WrappedByteBuf.java:511*)
> >
> >       at
> > org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.rea
> > dSimpleStringInternal(
> > *ChannelBufferWrapper.java:90*)
> >
> >       at
> > org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.rea
> > dStringInternal(
> > *ChannelBufferWrapper.java:113*)
> >
> >       at
> > org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.rea
> > dString(
> > *ChannelBufferWrapper.java:96*)
> >
> >       at
> > com.advanced365.transoft.renault.poc.decforms.asyncmsg.c2java.NativePr
> > ocessMsgReader.run(
> > *NativeProcessMsgReader.java:132*)
> >
> >
> >
> > It is interesting to note that “NativeProcessMsgReader.java:132” links
> > to this line:
> >
> >
> >
> >               String receivedMsg =
> > msgReceived.getBodyBuffer().readString();
> >
> >
> >
> > Which is the line I thought was causing trouble when the code ran in
> > Tomcat…
> >
> >
> >
> >
> >
> > Next, I modified “NativeProcessMsgReader” to use Qpid AMQP client to
> > read messages from Artemis (with the test case still sending messages
> > using
> > AMQP) – this worked perfectly.
> >
> >
> >
> > So it seems that:
> >
> > ·       if the producer and consumer use the same protocol, messages flow
> > through ok.
> >
> > ·       If the producer uses AMQP API and the consumer uses Artemis Core
> > API then there is a problem
> >
> >
> >
> > For now I’ll stick with using AMQP at both ends.
> >
> >
> >
> > I’m still curious to know if it’s reasonable to expect using an AMQP
> > producer and an Artemis Core consumer to work?
> >
> >
> >
> > Cheers, Andy
> >
> >
> >
> > *From:* Andy Redhead [mailto:andy.redhead@oneadvanced.com]
> > *Sent:* 01 September 2016 23:03
> > *To:* users@activemq.apache.org
> > *Subject:* Artemis Core consumer hangs when reading message sent by
> > AMQP producer
> >
> >
> >
> > Hi,
> >
> >
> >
> > I’m running Artemis 1.3.0 embedded inside a spring app running in Tomcat.
> >
> >
> >
> > I have a remote message producer (NumberGuessMain.java) that uses the
> > Apache Qpid Proton library to push messages onto an Artemis queue with
> > the
> > URL:
> >
> >
> >
> >               amqp://localhost:5672/native-2-java
> >
> >
> >
> > The messages created by the producer are JSON strings.
> >
> >
> >
> > I have a single threaded, singleton consumer that uses the native
> > Artemis API (NativeProcessMsgReader.java) running inside the same web
> > app as Artemis, reading from the queue:
> >
> >
> >
> >               native-2-java
> >
> >
> >
> > While there are no messages to read, the consumer happily loops
> > through the while loop in the “run” method.
> >
> >
> >
> > As soon as the consumer tries to read the body of the first message,
> > it hangs on the line:
> >
> >
> >
> >               String receivedMsg =
> > msgReceived.getBodyBuffer().readString();
> >
> >
> >
> > The last lines in the log file are:
> >
> >
> >
> > 2016-09-01T22:27:26,270 15137 [Thread-6] DEBUG
> > c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - no message during last
> > read period
> >
> > 2016-09-01T22:27:26,270 15137 [Thread-6] DEBUG
> > c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - in while loop, waiting
> > for message
> >
> > 2016-09-01T22:27:26,772 15639 [Thread-6] DEBUG
> > c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - no message during last
> > read period
> >
> > 2016-09-01T22:27:26,772 15639 [Thread-6] DEBUG
> > c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - in while loop, waiting
> > for message
> >
> > 2016-09-01T22:27:26,912 15779 [Thread-6] DEBUG
> > c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - received non-null
> > message
> >
> > 2016-09-01T22:27:26,912 15779 [Thread-6] DEBUG
> > c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - non-null message, body
> > length: 307
> >
> >
> >
> > To me this looks like the message has reached Artemis and is made
> > available to the consumer but something goes wrong when the consumer
> > tries to access the body of the message.
> >
> >
> >
> > I’m new to Artemis and AMQP so it’s quite possible I’m doing something
> > stupid…
> >
> >
> >
> > Is the basic assumption that it’s ok to send a message using AMQP and
> > receive it using Artemis core API valid?
> >
> >
> >
> > Is there something obvious in the code that is causing this problem?
> >
> >
> >
> > Any pointers gratefully received.
> >
> >
> >
> > Cheers, Andy
> >
> >
> >
> > [image: cid:image012.png@01D17AF7.D972DF70]
> > <http://www.oneadvanced.com/>
> >
> > *Andy Redhead*
> > Principal Consultant > Solutions > Advanced
> > *________________________*
> >
> > *Advanced*
> > 230 City Road, London, EC1V 2TT
> > t: 020 7880 8888 > m: 0781 392 5246
> >
> > www.oneadvanced.com
> >
> > [image: cid:image018.png@01D17AF7.D972DF70]
> > <http://www.linkedin.com/company/2426258>[image:
> > cid:image019.png@01D17AF7.D972DF70]
> > <https://twitter.com/Going_Advanced>
> >
> > *>*
> > * A Sunday Times Top Track 250 Company 2015 **>** Ranked in UK's 50
> > fastest growing technology companies 2014*
> >
> >
> >
> > ***** Email confidentiality *****
> >
> > This message is private and confidential. If you have received this
> > message in error, please notify us and remove it from your system. The
> > dissemination, copying or distribution of this message, or related
> > files, by anyone other than the intended recipient is strictly
> prohibited.
> >
> >
> >
> > Any views or opinions expressed are solely those of the author and do
> > not necessarily represent those of Advanced 365 Limited.
> >
> >
> >
> > ***** Email monitoring *****
> >
> > Advanced 365 Limited may monitor email traffic data and also the
> > content of email for the purposes of security and staff training.
> >
> >
> >
> > ***** Email security *****
> >
> > In keeping with good computing practice, the recipient of this email
> > should ensure that it is virus-free. Advanced 365 Limited does not
> > accept responsibility for any virus that may be transferred by way of
> this email.
> >
> >
> >
> > Email may be susceptible to data corruption, interception and/or
> > unauthorised amendment. Advanced 365 Limited does not accept liability
> > for any such corruption, interception or amendment or any consequences
> thereof.
> >
> >
> >
> > This email has been scanned for viruses by the Symantec Email
> > Security.cloud service.
> >
> >
> >
> > Advanced 365 Limited, part of the Advanced Computer Software Group
> >
> > Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire,
> > SL3 9LL, UK
> >
> > Registered in England under number 2124540
> >
> >
> > ------------------------------
> >
> >
> > Please consider the environment: Think before you print!
> >
> > ***** Email confidentiality *****
> >
> > This message is private and confidential. If you have received this
> > message in error, please notify us and remove it from your system. The
> > dissemination, copying or distribution of this message, or related
> > files, by anyone other than the intended recipient is strictly
> prohibited.
> >
> >
> >
> > Any views or opinions expressed are solely those of the author and do
> > not necessarily represent those of Advanced 365 Limited.
> >
> >
> >
> > ***** Email monitoring *****
> >
> > Advanced 365 Limited may monitor email traffic data and also the
> > content of email for the purposes of security and staff training.
> >
> >
> >
> > ***** Email security *****
> >
> > In keeping with good computing practice, the recipient of this email
> > should ensure that it is virus-free. Advanced 365 Limited does not
> > accept responsibility for any virus that may be transferred by way of
> this email.
> >
> >
> >
> > Email may be susceptible to data corruption, interception and/or
> > unauthorised amendment. Advanced 365 Limited does not accept liability
> > for any such corruption, interception or amendment or any consequences
> thereof.
> >
> >
> >
> > This email has been scanned for viruses by the Symantec Email
> > Security.cloud service.
> >
> >
> >
> > Advanced 365 Limited, part of the Advanced Computer Software Group
> >
> > Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire,
> > SL3 9LL, UK
> >
> > Registered in England under number 2124540
> >
> ***** Email confidentiality *****
>
> This message is private and confidential. If you have received this
> message in error, please notify us and remove it from your system. The
> dissemination, copying or distribution of this message, or related files,
> by anyone other than the intended recipient is strictly prohibited.
>
>
>
> Any views or opinions expressed are solely those of the author and do not
> necessarily represent those of Advanced 365 Limited.
>
>
>
> ***** Email monitoring *****
>
> Advanced 365 Limited may monitor email traffic data and also the content
> of email for the purposes of security and staff training.
>
>
>
> ***** Email security *****
>
> In keeping with good computing practice, the recipient of this email
> should ensure that it is virus-free. Advanced 365 Limited does not accept
> responsibility for any virus that may be transferred by way of this email.
>
>
>
> Email may be susceptible to data corruption, interception and/or
> unauthorised amendment. Advanced 365 Limited does not accept liability for
> any such corruption, interception or amendment or any consequences thereof.
>
>
>
> This email has been scanned for viruses by the Symantec Email
> Security.cloud service.
>
>
>
> Advanced 365 Limited, part of the Advanced Computer Software Group
>
> Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire, SL3
> 9LL, UK
>
> Registered in England under number 2124540
> ***** Email confidentiality *****
>
> This message is private and confidential. If you have received this
> message in error, please notify us and remove it from your system. The
> dissemination, copying or distribution of this message, or related files,
> by anyone other than the intended recipient is strictly prohibited.
>
>
>
> Any views or opinions expressed are solely those of the author and do not
> necessarily represent those of Advanced 365 Limited.
>
>
>
> ***** Email monitoring *****
>
> Advanced 365 Limited may monitor email traffic data and also the content
> of email for the purposes of security and staff training.
>
>
>
> ***** Email security *****
>
> In keeping with good computing practice, the recipient of this email
> should ensure that it is virus-free. Advanced 365 Limited does not accept
> responsibility for any virus that may be transferred by way of this email.
>
>
>
> Email may be susceptible to data corruption, interception and/or
> unauthorised amendment. Advanced 365 Limited does not accept liability for
> any such corruption, interception or amendment or any consequences thereof.
>
>
>
> This email has been scanned for viruses by the Symantec Email
> Security.cloud service.
>
>
>
> Advanced 365 Limited, part of the Advanced Computer Software Group
>
> Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire, SL3
> 9LL, UK
>
> Registered in England under number 2124540
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message