activemq-users mailing list archives

From Tim Bain <tb...@alumni.duke.edu>
Subject Re: The consumer thread closed right after producer thread finishes sending messages.
Date Tue, 29 Sep 2015 04:46:38 GMT
Now I'm confused.  Are you doing C++ or Java?  You referenced both .cpp
files and the JVM, and I don't know how those two go together.  Unless you
mean that the broker's JVM is exiting when being accessed by C++ clients?
Also, I assumed (since you haven't clearly said one way or the other) that
your consumer and your producer are different threads in the same process;
if that's not true, disregard all of what I've written below, and my
earlier suggestion.

I think you could do one (and only one) of the following options to prevent
the producer thread from exiting before the consumer thread (and there are
other ways that will work; don't feel constrained by these if you have
another one):

   1. Have the producer thread call consumerThread.join() (which means you
   have to pass a reference from one thread to the other), which forces the
   consumer thread to exit before the producer thread is allowed to.
   2. Have a CountDownLatch initialized to 2, where both threads call
   countDown() after they've done all of their work, which guarantees that the
   consumer's work is all done before the producer thread exits, even if it's
   still possible for the producer thread to actually exit first (who cares).
   3. Have an AtomicBoolean called something like consumerDone initialized
   to false and only set to true when the consumer thread has done all of its
   work, and have the producer thread execute the following loop after all of
   its work is done:
       while (!consumerDone.get()) { Thread.sleep(someDuration); }
   4. Execute consumerThread's work in a FutureTask, and call get() on the
   Future interface it implements to wait until the task is done.
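
To make option 2 concrete, here is a minimal Java sketch, assuming plain
threads and a trivial stand-in for the real work.  The class name
LatchSketch, the method runBoth(), and the counting loop are all
hypothetical; a real consumer would count down only once its last message
has been handled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchSketch {
    // Hypothetical helper: runs a stand-in producer and consumer and
    // returns how many "messages" the consumer handled.
    static int runBoth(int messageCount) {
        CountDownLatch done = new CountDownLatch(2);   // one count per thread
        AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                // Stand-in for the consumer's real per-message work.
                for (int i = 0; i < messageCount; i++) {
                    consumed.incrementAndGet();
                }
            } finally {
                done.countDown();                      // consumer's work is done
            }
        });

        Thread producer = new Thread(() -> {
            // Stand-in for sending messages would go here.
            done.countDown();                          // producer's work is done
            try {
                done.await();                          // block until the consumer is done too
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();   // the producer cannot finish before the latch reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + runBoth(80000) + " messages");
    }
}
```

The producer counts down and then waits on the same latch, so it cannot
leave run() until the consumer has counted down as well.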

There's no point doing more than one of those approaches; it's duplicative
and unnecessarily complicates your code.  I'd avoid #1 because creating
your own threads is generally frowned upon these days; in most cases you
should be using Executors to run threaded work because it's easier to scale
them and to handle problems, so I wouldn't recommend choosing an option
that prevents you from doing that and will force you to rewrite that code
if you ever wanted to.
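
For the Executor-based route, a rough sketch along the lines of option 4
might look like the following.  It assumes the consumer's work can be
expressed as a task that returns when it is finished; ExecutorSketch,
consumeAndWait(), and the counting loop are made-up names for illustration,
not anything from your code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorSketch {
    // Hypothetical helper: submits the consumer's work to an executor and
    // blocks until it finishes, returning the number of messages handled.
    static int consumeAndWait(int messageCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = pool.submit(() -> {
                int handled = 0;
                // Stand-in for the consumer's real per-message work.
                for (int i = 0; i < messageCount; i++) {
                    handled++;
                }
                return handled;
            });
            return result.get();   // waits until the consumer task is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The task itself threw; surface the underlying cause.
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();       // let the pool's worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println("consumed " + consumeAndWait(80000) + " messages");
    }
}
```

The caller of get() plays the role of the producer thread here: it does not
proceed until the consumer task has completed, and swapping in a larger
pool later doesn't change that logic.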

Tim

On Sep 28, 2015 8:51 PM, "Chaomei Lo" <chaomeilo@gmail.com> wrote:

> I did have these lines in main.
>   producerThread.join();
>   consumerThread.join();
>
> and in run() in Consumer.cpp, I have
>
>    latch.countDown();
>
> and in onMessage() in Consumer.cpp, I have
>   doneLatch.countDown();
>
> Am I doing right ? But I am having a problem to understand what they are
> for.  I made a mistake on my code, so after fixing, I got about 45000
> message, and then JVM exited out.  The activemq.log is in below. Can you
> see anything wrong ? Thank you.
> --------------------------------------------------------
> 2015-09-28 19:09:01,412 | INFO  |
> queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,651 | INFO  |
> queue://testSessionCommitAfterConsumerClosed purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,655 | INFO  |
> queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged of 0
> messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest
> purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged of 0
> messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged of 0
> messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
>
> On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tbain@alumni.duke.edu> wrote:
>
> > Is the producer your last non-daemon thread?  If so, the JVM will exit when
> > that thread exits, but you could use a synchronization construct such as a
> > countdown latch or a call to Thread.join() to make the producer thread not
> > exit till the consumer does.
> > On Sep 28, 2015 7:21 PM, "mfan" <chaomeilo@gmail.com> wrote:
> >
> > > I am trying to write an application that a producer (with PERSISTENT
> > > delivery mode) sends 80000 messages to asynchronous a consumer. Both
> > > producer and consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).
> > > After the consumer received a message, it will do some work.  Somehow
> > > the producer producing message faster than consumer did the work, and
> > > right after the producer finished sending it's last message, both
> > > consumer called destructor automatically.  How to let the consumer
> > > continuously getting the message and do the computation ?  I search
> > > online and found maybe due to the memoryLimit. So I check the
> > > activemq.xml file in '/home/apache-activemq-5.11.1/data/' directory, I
> > > copied partial related to memory paragraph in below which I do not
> > > quite understand, I need help to make sure the memory was not the
> > > issue. If that is true, what else problem could be ? Thank you for
> > > helping.
> > >
> > >
> > > Store limit is 102400 mb (current store usage is 72 mb). The data
> > > directory:
> > > /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable
> > space
> > > -
> > > resetting to maximum available disk space: 12990 mb |
> > > org.apache.activemq.broker.BrokerService | main
> > > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb,
> > whilst
> > > the temporary data directory:
> > > /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918
> mb
> > > of
> > > usable space - resetting to maximum available 12918 mb. |
> > > org.apache.activemq.broker.BrokerService | main
> > >
> > >
> > >
> > > --
> > > View this message in context:
> > >
> >
> http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> > >
> >
>
