activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andy Redhead <andy.redh...@oneadvanced.com>
Subject RE: Artemis Core consumer hangs when reading message sent by AMQP producer
Date Fri, 02 Sep 2016 16:48:28 GMT
Hmm, something is eating my attachments, both classes inline below:


NativeProcessMsgReader>



package com...c2java;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com...NativeProcessCmd;

/**
 * Reads messages from Artemis using the core client API over an in-VM connector.
 *
 * <p>Uses a single thread (with a persistent ClientSession) to read from the shared queue used
 * by all native processes to send outbound messages.
 *
 * <p>Messages are handed on to a fixed sized executor for processing and sending results to
 * the browser.
 *
 * <p>Lifecycle: {@link #startUp()} builds the executor and starts this thread;
 * {@link #shutDown()} raises the shutdown flag (observed by {@link #run()} within one receive
 * timeout) and stops the executor.
 *
 * @author andyredhead
 *
 */
public class NativeProcessMsgReader extends Thread {

  final static Logger logger = LoggerFactory.getLogger(NativeProcessMsgReader.class);

  /** Core and maximum size of the worker pool that processes received commands. */
  int workerThreadPoolSize = 1;

  /** Capacity of the bounded queue feeding the worker pool. */
  int executorQueueLength = 500;

  /** Name of the Artemis queue this reader consumes from. */
  String incomingMsgFromNativeProcQueueName;

  BlockingQueue<Runnable> nativeProcCmdQueue;
  ThreadPoolExecutor nativeProcCmdExec;

  ClientSessionFactory factory;
  ClientSession session;
  ClientConsumer consumer;

  /**
   * Set by {@link #shutDown()} on the caller's thread and polled by {@link #run()} on the reader
   * thread, hence volatile so the write is guaranteed to become visible to the loop.
   */
  volatile boolean shutdown = false;

  /**
   * Creates a reader; nothing is started until {@link #startUp()} is called.
   *
   * @param workerThreadPoolSize number of worker threads in the processing executor
   * @param executorQueueLength capacity of the executor's work queue
   * @param incomingMsgFromNativeProcQueueName name of the Artemis queue to consume from
   */
  public NativeProcessMsgReader(int workerThreadPoolSize, int executorQueueLength,
      String incomingMsgFromNativeProcQueueName) {
    this.workerThreadPoolSize = workerThreadPoolSize;
    // Previously this argument was dropped and the default of 500 was always used.
    this.executorQueueLength = executorQueueLength;
    this.incomingMsgFromNativeProcQueueName = incomingMsgFromNativeProcQueueName;
  }

  /**
   * Builds the bounded work queue and worker pool, then starts the reader thread.
   *
   * @throws Exception if the executor cannot be created (for example a non-positive pool size
   *         or queue length) or the thread cannot be started; the failure is logged and rethrown
   */
  @PostConstruct
  public void startUp() throws Exception {
    logger.info("startUp - start, thread pool size: {}, queue length: {}, queue name: {}",
        workerThreadPoolSize, executorQueueLength, incomingMsgFromNativeProcQueueName);

    try {

      // processing work taken from queue
      nativeProcCmdQueue = new ArrayBlockingQueue<Runnable>(executorQueueLength);
      nativeProcCmdExec = new ThreadPoolExecutor(workerThreadPoolSize, workerThreadPoolSize, 5,
          TimeUnit.MINUTES, nativeProcCmdQueue);

      this.start();

    } catch (Exception asyncQueueSetupErr) {
      logger.error("startUp - problem setting up queue consumer: {}",
          asyncQueueSetupErr.getMessage(), asyncQueueSetupErr);
      throw asyncQueueSetupErr;
    }

    logger.info("startUp - complete.");
  }

  /**
   * Signals the reader loop to stop and shuts the worker pool down immediately.
   *
   * <p>Safe to call even if {@link #startUp()} never ran or failed part-way.
   */
  @PreDestroy
  public void shutDown() {

    logger.info("shutDown - commencing shut down");

    // stop new work coming in
    shutdown = true;

    // close down the native proc cmd executor
    logger.info("shutDown - requesting executor shuts down...");
    if (nativeProcCmdExec != null) {
      nativeProcCmdExec.shutdownNow();
    }
    logger.info("shutDown - requested executor shuts down.");
  }

  /**
   * Reader thread body: connects to the in-VM Artemis broker, polls the queue until
   * {@link #shutDown()} is requested, then releases all Artemis client objects.
   *
   * <p>If the client objects cannot be set up the problem is logged and the thread ends,
   * rather than entering the loop with no consumer.
   */
  @Override
  public void run() {

    // reading from queue
    ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(
        new TransportConfiguration(InVMConnectorFactory.class.getName()));

    try {
      try {
        factory = locator.createSessionFactory();
        session = factory.createSession();
        consumer = session.createConsumer(incomingMsgFromNativeProcQueueName);
        session.start();
      } catch (Exception e) {
        logger.warn("run - problem setting up native proc to java queue consumer: {}",
            e.getMessage(), e);
        // Without a consumer there is nothing to poll; the finally block still cleans up.
        return;
      }

      while (!shutdown) {
        pollOnce();
      } // end not shutdown while loop

    } finally {
      // been told to shut down (or failed to start) so close Artemis client objects
      closeClientObjects(locator);
    }
  }

  /**
   * Waits up to half a second for one message and, if it carries non-empty text, hands it to the
   * worker pool. Any failure is logged and swallowed so the reader thread keeps running.
   */
  private void pollOnce() {
    try {

      logger.debug("run - in while loop, waiting for message");

      /* break every half second to check shutdown status */
      ClientMessage msgReceived = consumer.receive(500);

      if (msgReceived == null) {
        logger.debug("run - no message during last read period");
        return;
      }

      // Acknowledge on receipt (at-most-once) so consumed messages are removed from the queue
      // instead of being redelivered once this consumer goes away.
      msgReceived.acknowledge();

      logger.debug("run - received non-null message");
      logger.debug("run - non-null message, body length: {}", msgReceived.getBodySize());

      // NOTE(review): readString() expects the body in Artemis core string encoding. The stack
      // trace reported with this code shows it throwing IndexOutOfBoundsException for a message
      // produced by an AMQP client, which is why RuntimeException is caught below.
      String receivedMsg = msgReceived.getBodyBuffer().readString();

      logger.debug("run - message text: {}", receivedMsg);

      if ((receivedMsg == null) || (receivedMsg.length() == 0)) {
        logger.debug("run - null or zero length message read from native process cmd queue");
        return;
      }

      logger.debug("run - in while loop, got message");

      NativeProcessCmd nativeProcCmd = new NativeProcessCmd(receivedMsg);
      nativeProcCmdExec.submit(nativeProcCmd);
      logger.debug("run - in while loop, cmd pushed to executor");

    } catch (ActiveMQException e) {
      logger.warn(
          "run - problem reading message from native process command queue: " + e.getMessage(),
          e);
    } catch (RuntimeException e) {
      // Covers undecodable bodies and executor rejection (queue full / already shut down);
      // letting these escape would silently kill the only reader thread.
      logger.warn(
          "run - problem processing message from native process command queue: {}",
          e.getMessage(), e);
    }
  }

  /**
   * Closes whichever Artemis client objects were created, and always closes the locator.
   *
   * @param locator the server locator created at the start of {@link #run()}
   */
  private void closeClientObjects(ServerLocator locator) {
    try {

      if (consumer != null) {
        consumer.close();
      }
      if (session != null) {
        session.close();
      }
      if (factory != null) {
        factory.close();
      }

      logger.info("run - Artemis client objects shut down.");

    } catch (Exception e) {
      logger.warn("run - problem closing Artemis client objects during shutdown: " + e.getMessage(),
          e);
    } finally {
      locator.close();
    }
  }

}


NumberGuessMain>

package com...numberguess;

import java.io.IOException;
import java.util.HashMap;

import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;

import com.google.gson.Gson;

/**
 * Command-line process that sends browser control messages as JSON over AMQP (Qpid Proton
 * {@link Messenger}) and subscribes to a response queue for replies.
 *
 * <p>Expects three arguments: task instance id, browser control queue AMQP URL and browser
 * response queue AMQP URL.
 */
public class NumberGuessMain {

  // Not read anywhere in the code shown here.
  private int answer = -1;

  public static final int EXIT_CODE_WRONG_CMD_LINE_ARGS = 1000;

  public static final int EXIT_CODE_BROWSER_CONTROL_QUEUE_SETUP_ERR = 1001;

  public static final int EXIT_CODE_BROWSER_RESPONSE_QUEUE_SETUP_ERR = 1002;

  private String taskInstanceId;

  private String browserControlQueueUrl;

  private String browserResponseQueueUrl;

  Messenger sendBrowserControlMessenger;

  Messenger receiveBrowserResponseMessenger;

  Gson gson = new Gson();


  /**
   * Entry point: builds the instance from the first three arguments, runs it and always stops
   * the messengers afterwards. Exits with {@link #EXIT_CODE_WRONG_CMD_LINE_ARGS} when fewer than
   * three arguments are supplied.
   *
   * @param args taskInstanceId, browserControlQueueAmqpUrl, browserResponseQueueAmqpUrl
   */
  public static void main(String[] args) {
    if ((args == null) || (args.length < 3)) {
      System.err.println("task instance id, browser control and response queues not specified"
          + " (expects taskInstanceId browserControlQueueAmqpUrl browserResponseQueueAmqpUrl)");
      System.exit(EXIT_CODE_WRONG_CMD_LINE_ARGS);
      return;
    }

    NumberGuessMain me = new NumberGuessMain(args[0], args[1], args[2]);
    try {
      me.run();
    } finally {
      // Stop the messengers even when run() fails, so they are not left running.
      me.shutdown();
    }
  }

  /**
   * Starts one messenger for sending control messages and one, subscribed to the response queue,
   * for receiving replies. Terminates the JVM with a dedicated exit code if either messenger
   * cannot be started.
   *
   * @param taskInstanceId id placed in every outbound message
   * @param browserControlQueueUrl AMQP address outbound messages are sent to
   * @param browserResponseQueueUrl AMQP address subscribed to for responses
   */
  public NumberGuessMain(String taskInstanceId, String browserControlQueueUrl,
      String browserResponseQueueUrl) {

    this.taskInstanceId = taskInstanceId;
    this.browserControlQueueUrl = browserControlQueueUrl;
    this.browserResponseQueueUrl = browserResponseQueueUrl;

    System.out.println("Task Instance ID: " + this.taskInstanceId);

    // browser control queue

    System.out.println("Browser Control Queue: " + this.browserControlQueueUrl);

    sendBrowserControlMessenger = new Messenger.Factory().create();
    try {
      sendBrowserControlMessenger.start();
    } catch (IOException e) {
      System.err.println("Problem starting browser control AMQP messenger: " + e.getMessage());
      e.printStackTrace();
      System.exit(EXIT_CODE_BROWSER_CONTROL_QUEUE_SETUP_ERR);
    }


    // browser response queue

    System.out.println("Browser Response Queue: " + this.browserResponseQueueUrl);

    receiveBrowserResponseMessenger = new Messenger.Factory().create();
    try {

      receiveBrowserResponseMessenger.start();
      receiveBrowserResponseMessenger.subscribe(browserResponseQueueUrl);

    } catch (IOException e) {
      System.err.println("Problem starting receive from browser AMQP messenger: " + e.getMessage());
      e.printStackTrace();
      System.exit(EXIT_CODE_BROWSER_RESPONSE_QUEUE_SETUP_ERR);
    }

  }

  /** Sends the initial "ShowTitlePanel" control message for this task instance. */
  protected void run() {

    System.out.println("Hello from numberguess");

    OutboundNativeProcessMessage showFirstPanelMsg =
        new OutboundNativeProcessMessage(taskInstanceId,
            "ShowTitlePanel", null);

    send(showFirstPanelMsg);
  }

  /**
   * Serialises the message to JSON with Gson and sends it to the browser control queue as an
   * AMQP message whose body is an {@link AmqpValue} wrapping that JSON string.
   *
   * @param outMsg message to serialise and send
   */
  protected void send(OutboundNativeProcessMessage outMsg) {
    Message msg = new Message.Factory().create();
    msg.setAddress(browserControlQueueUrl);
    String msgJson = gson.toJson(outMsg);
    msg.setBody(new AmqpValue(msgJson));
    sendBrowserControlMessenger.put(msg);
    sendBrowserControlMessenger.send();
    System.out.println("Sent browser control msg: " + msgJson);
  }

  /**
   * Sends a message and then waits for one message on the response messenger.
   *
   * @param outMsg message to send before waiting
   * @return currently always {@code null}; decoding of the response is not implemented yet
   */
  protected HashMap sendReceive(OutboundNativeProcessMessage outMsg) {
    HashMap receivedData = null;

    send(outMsg);

    System.out.println("Waiting for response message...");
    receiveBrowserResponseMessenger.recv();
    // Fetched but not yet decoded; see the ToDo below.
    Message msg = receiveBrowserResponseMessenger.get();
    System.out.println("Got response message.");

    // @ToDo process the message here...

    return receivedData;
  }

  /**
   * Stops both messengers. Each is stopped independently so a failure stopping one does not
   * prevent the other from being stopped.
   */
  protected void shutdown() {
    stopMessenger(sendBrowserControlMessenger,
        "Problem closing browser control queue messenger: ");
    stopMessenger(receiveBrowserResponseMessenger,
        "Problem closing browser response queue messenger: ");
  }

  /** Stops one messenger if present, reporting (not propagating) any failure. */
  private void stopMessenger(Messenger messenger, String failurePrefix) {
    if (messenger == null) {
      return;
    }
    try {
      messenger.stop();
    } catch (Exception e) {
      System.err.println(failurePrefix + e.getMessage());
      e.printStackTrace();
    }
  }

}

-----Original Message-----
From: Andy Redhead [mailto:andy.redhead@oneadvanced.com]
Sent: 02 September 2016 17:45
To: users@activemq.apache.org
Subject: RE: Artemis Core consumer hangs when reading message sent by AMQP producer

Hi,

Sorry, I could have sworn I attached the code in my original email - trying again...

Cheers, Andy

-----Original Message-----
From: John D. Ament [mailto:johndament@apache.org]
Sent: 02 September 2016 17:42
To: users@activemq.apache.org
Subject: Re: Artemis Core consumer hangs when reading message sent by AMQP producer

Andy,

Are you able to provide your client code?  NativeProcessMsgReader isn't Artemis's so it's not
clear what else you're doing to connect, subscribe, etc.

John

On Fri, Sep 2, 2016 at 12:38 PM Andy Redhead <andy.redhead@oneadvanced.com>
wrote:

> Hi,
>
>
>
> Adding to the information below, if I change the junit test case that
> tries sending a message through artemis into “NativeProcessMsgReader”
> to use AMQP (rather than the core api), I get the stack trace:
>
>
>
> Exception in thread "Thread-1" *java.lang.IndexOutOfBoundsException*:
> readerIndex(25) + length(276496418) exceeds writerIndex(294):
> DuplicatedByteBuf(ridx: 25, widx: 294, cap: 512, unwrapped:
> UnpooledHeapByteBuf(ridx: 17, widx: 294, cap: 512))
>
>       at io.netty.buffer.AbstractByteBuf.checkReadableBytes(
> *AbstractByteBuf.java:1165*)
>
>       at io.netty.buffer.AbstractByteBuf.readBytes(
> *AbstractByteBuf.java:675*)
>
>       at io.netty.buffer.AbstractByteBuf.readBytes(
> *AbstractByteBuf.java:683*)
>
>       at io.netty.buffer.WrappedByteBuf.readBytes(
> *WrappedByteBuf.java:511*)
>
>       at
> org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.rea
> dSimpleStringInternal(
> *ChannelBufferWrapper.java:90*)
>
>       at
> org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.rea
> dStringInternal(
> *ChannelBufferWrapper.java:113*)
>
>       at
> org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.rea
> dString(
> *ChannelBufferWrapper.java:96*)
>
>       at
> com.advanced365.transoft.renault.poc.decforms.asyncmsg.c2java.NativePr
> ocessMsgReader.run(
> *NativeProcessMsgReader.java:132*)
>
>
>
> It is interesting to note that “NativeProcessMsgReader.java:132” links
> to this line:
>
>
>
>               String receivedMsg =
> msgReceived.getBodyBuffer().readString();
>
>
>
> Which is the line I thought was causing trouble when the code ran in
> Tomcat…
>
>
>
>
>
> Next, I modified “NativeProcessMsgReader” to use Qpid AMQP client to
> read messages from Artemis (with the test case still sending messages
> using
> AMQP) – this worked perfectly.
>
>
>
> So it seems that:
>
> ·       if the producer and consumer use the same protocol, messages flow
> through ok.
>
> ·       If the producer uses the AMQP API and the consumer uses Artemis Core
> API then there is a problem
>
>
>
> For now I’ll stick with using AMQP at both ends.
>
>
>
> I’m still curious to know if it’s reasonable to expect using an AMQP
> producer and an Artemis Core consumer to work?
>
>
>
> Cheers, Andy
>
>
>
> *From:* Andy Redhead [mailto:andy.redhead@oneadvanced.com]
> *Sent:* 01 September 2016 23:03
> *To:* users@activemq.apache.org
> *Subject:* Artemis Core consumer hangs when reading message sent by
> AMQP producer
>
>
>
> Hi,
>
>
>
> I’m running Artemis 1.3.0 embedded inside a spring app running in Tomcat.
>
>
>
> I have a remote message producer (NumberGuessMain.java) that uses the
> Apache Qpid Proton library to push messages onto an Artemis queue with
> the
> URL:
>
>
>
>               amqp://localhost:5672/native-2-java
>
>
>
> The messages created by the producer are JSON strings.
>
>
>
> I have a single threaded, singleton consumer that uses the native
> Artemis API (NativeProcessMsgReader.java) running inside the same web
> app as Artemis, reading from the queue:
>
>
>
>               native-2-java
>
>
>
> While there are no messages to read, the consumer happily loops
> through the while loop in the “run” method.
>
>
>
> As soon as the consumer tries to read the body of the first message,
> it hangs on the line:
>
>
>
>               String receivedMsg =
> msgReceived.getBodyBuffer().readString();
>
>
>
> The last lines in the log file are:
>
>
>
> 2016-09-01T22:27:26,270 15137 [Thread-6] DEBUG
> c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - no message during last
> read period
>
> 2016-09-01T22:27:26,270 15137 [Thread-6] DEBUG
> c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - in while loop, waiting
> for message
>
> 2016-09-01T22:27:26,772 15639 [Thread-6] DEBUG
> c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - no message during last
> read period
>
> 2016-09-01T22:27:26,772 15639 [Thread-6] DEBUG
> c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - in while loop, waiting
> for message
>
> 2016-09-01T22:27:26,912 15779 [Thread-6] DEBUG
> c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - received non-null
> message
>
> 2016-09-01T22:27:26,912 15779 [Thread-6] DEBUG
> c.a.t.r.p.d.a.c.NativeProcessMsgReader - run - non-null message, body
> length: 307
>
>
>
> To me this looks like the message has reached Artemis and is made
> available to the consumer but something goes wrong when the consumer
> tries to access the body of the message.
>
>
>
> I’m new to Artemis and AMQP so it’s quite possible I’m doing something
> stupid…
>
>
>
> Is the basic assumption that it’s ok to send a message using AMQP and
> receive it using Artemis core API valid?
>
>
>
> Is there something obvious in the code that is causing this problem?
>
>
>
> Any pointers gratefully received.
>
>
>
> Cheers, Andy
>
>
>
> [image: cid:image012.png@01D17AF7.D972DF70]
> <http://www.oneadvanced.com/>
>
> *Andy Redhead*
> Principal Consultant > Solutions > Advanced
> *________________________*
>
> *Advanced*
> 230 City Road, London, EC1V 2TT
> t: 020 7880 8888 > m: 0781 392 5246
>
> www.oneadvanced.com
>
> [image: cid:image018.png@01D17AF7.D972DF70]
> <http://www.linkedin.com/company/2426258>[image:
> cid:image019.png@01D17AF7.D972DF70]
> <https://twitter.com/Going_Advanced>
>
> *>*
> * A Sunday Times Top Track 250 Company 2015 **>** Ranked in UK's 50
> fastest growing technology companies 2014*
>
>
>
> ***** Email confidentiality *****
>
> This message is private and confidential. If you have received this
> message in error, please notify us and remove it from your system. The
> dissemination, copying or distribution of this message, or related
> files, by anyone other than the intended recipient is strictly prohibited.
>
>
>
> Any views or opinions expressed are solely those of the author and do
> not necessarily represent those of Advanced 365 Limited.
>
>
>
> ***** Email monitoring *****
>
> Advanced 365 Limited may monitor email traffic data and also the
> content of email for the purposes of security and staff training.
>
>
>
> ***** Email security *****
>
> In keeping with good computing practice, the recipient of this email
> should ensure that it is virus-free. Advanced 365 Limited does not
> accept responsibility for any virus that may be transferred by way of this email.
>
>
>
> Email may be susceptible to data corruption, interception and/or
> unauthorised amendment. Advanced 365 Limited does not accept liability
> for any such corruption, interception or amendment or any consequences thereof.
>
>
>
> This email has been scanned for viruses by the Symantec Email
> Security.cloud service.
>
>
>
> Advanced 365 Limited, part of the Advanced Computer Software Group
>
> Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire,
> SL3 9LL, UK
>
> Registered in England under number 2124540
>
>
> ------------------------------
>
>
> Please consider the environment: Think before you print!
>
> ***** Email confidentiality *****
>
> This message is private and confidential. If you have received this
> message in error, please notify us and remove it from your system. The
> dissemination, copying or distribution of this message, or related
> files, by anyone other than the intended recipient is strictly prohibited.
>
>
>
> Any views or opinions expressed are solely those of the author and do
> not necessarily represent those of Advanced 365 Limited.
>
>
>
> ***** Email monitoring *****
>
> Advanced 365 Limited may monitor email traffic data and also the
> content of email for the purposes of security and staff training.
>
>
>
> ***** Email security *****
>
> In keeping with good computing practice, the recipient of this email
> should ensure that it is virus-free. Advanced 365 Limited does not
> accept responsibility for any virus that may be transferred by way of this email.
>
>
>
> Email may be susceptible to data corruption, interception and/or
> unauthorised amendment. Advanced 365 Limited does not accept liability
> for any such corruption, interception or amendment or any consequences thereof.
>
>
>
> This email has been scanned for viruses by the Symantec Email
> Security.cloud service.
>
>
>
> Advanced 365 Limited, part of the Advanced Computer Software Group
>
> Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire,
> SL3 9LL, UK
>
> Registered in England under number 2124540
>
***** Email confidentiality *****

This message is private and confidential. If you have received this message in error, please
notify us and remove it from your system. The dissemination, copying or distribution of this
message, or related files, by anyone other than the intended recipient is strictly prohibited.



Any views or opinions expressed are solely those of the author and do not necessarily represent
those of Advanced 365 Limited.



***** Email monitoring *****

Advanced 365 Limited may monitor email traffic data and also the content of email for the
purposes of security and staff training.



***** Email security *****

In keeping with good computing practice, the recipient of this email should ensure that it
is virus-free. Advanced 365 Limited does not accept responsibility for any virus that may
be transferred by way of this email.



Email may be susceptible to data corruption, interception and/or unauthorised amendment. Advanced
365 Limited does not accept liability for any such corruption, interception or amendment or
any consequences thereof.



This email has been scanned for viruses by the Symantec Email Security.cloud service.



Advanced 365 Limited, part of the Advanced Computer Software Group

Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire, SL3 9LL, UK

Registered in England under number 2124540
***** Email confidentiality *****

This message is private and confidential. If you have received this message in error, please
notify us and remove it from your system. The dissemination, copying or distribution of this
message, or related files, by anyone other than the intended recipient is strictly prohibited.



Any views or opinions expressed are solely those of the author and do not necessarily represent
those of Advanced 365 Limited.



***** Email monitoring *****

Advanced 365 Limited may monitor email traffic data and also the content of email for the
purposes of security and staff training.



***** Email security *****

In keeping with good computing practice, the recipient of this email should ensure that it
is virus-free. Advanced 365 Limited does not accept responsibility for any virus that may
be transferred by way of this email.



Email may be susceptible to data corruption, interception and/or unauthorised amendment. Advanced
365 Limited does not accept liability for any such corruption, interception or amendment or
any consequences thereof.



This email has been scanned for viruses by the Symantec Email Security.cloud service.



Advanced 365 Limited, part of the Advanced Computer Software Group

Registered office: Ditton Park, Riding Court Road, Datchet, Berkshire, SL3 9LL, UK

Registered in England under number 2124540
Mime
View raw message