activemq-users mailing list archives

From Chaomei Lo <chaome...@gmail.com>
Subject Re: The consumer thread closed right after producer thread finishes sending messages.
Date Wed, 30 Sep 2015 00:16:28 GMT
Thanks, Tim. Sorry for the confusion: I am using C++, and the Java broker did
not exit when my C++ application exited.  Yes, I am using one process
with two threads.  I appreciate your suggestions. Now that I understand
how CountDownLatch works, I realize I had modified the
HelloWorld example code incorrectly.  I fixed that, then changed one line of
the example code from doneLatch.await(Millis)
to
doneLatch.await(), and had the user specify the number of messages, which is
passed to the consumer to initialize doneLatch. It now works fine.
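
The doneLatch change described above can be sketched roughly as follows. This
is a minimal illustration, not the example's actual code: SimpleLatch is a
hypothetical stand-in written with the C++ standard library (the CMS example
uses the latch class shipped with ActiveMQ-CPP), and consumeAll() with its
delivery thread is invented here to simulate onMessage() being called once
per message.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a count-down latch, written with the C++
// standard library only; it is NOT the class used by the example code.
class SimpleLatch {
public:
    explicit SimpleLatch(int count) : count_(count) { assert(count >= 0); }

    // Decrement the count; wake all waiters when it reaches zero.
    void countDown() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ > 0 && --count_ == 0) {
            cond_.notify_all();
        }
    }

    // Block, with no timeout, until the count reaches zero.
    void await() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Simulates delivery of `expected` messages on a separate thread and
// returns how many were handled once the latch has opened.
int consumeAll(int expected) {
    SimpleLatch doneLatch(expected);   // count supplied by the user
    std::atomic<int> handled(0);

    std::thread delivery([&] {
        for (int i = 0; i < expected; ++i) {
            ++handled;                 // stands in for the work in onMessage()
            doneLatch.countDown();     // one count per message received
        }
    });

    doneLatch.await();                 // returns only after the last message
    delivery.join();
    return handled.load();
}
```

Because await() has no timeout here, the caller cannot return while messages
are still outstanding; the cost is that the expected count must be known up
front.
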
For a better solution, where the user does not have to enter the number of
messages, I still have work to do; I would go with your second suggestion.
First I would create a CountDownLatch object with a count of 2 in main(),
pass this instance, cLatch, to both the consumer and the producer, and wait on it:

CountDownLatch cLatch(2);
cLatch.await();

then invoke
cLatch.countDown() in producer code after producer finishes
cLatch.countDown() in consumer code after consumer finishes
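
A rough sketch of this two-count arrangement, under stated assumptions:
SharedLatch is a hypothetical standard-library stand-in for CountDownLatch,
and the two lambdas are placeholders for the real producer and consumer
code. The sketch does not address how the consumer decides it has finished;
each placeholder simply counts down once.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for CountDownLatch (standard library only).
class SharedLatch {
public:
    explicit SharedLatch(int count) : count_(count) { assert(count >= 0); }

    // Decrement the count; wake all waiters when it reaches zero.
    void countDown() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ > 0 && --count_ == 0) {
            cond_.notify_all();
        }
    }

    // Block until the count reaches zero.
    void await() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Runs a placeholder producer and consumer that share one latch and
// returns how many of them had finished when await() returned.
int runProducerAndConsumer() {
    SharedLatch cLatch(2);             // one count per thread
    std::atomic<int> finished(0);

    std::thread producer([&] {
        // ... send all messages here (placeholder) ...
        ++finished;
        cLatch.countDown();            // producer is done
    });
    std::thread consumer([&] {
        // ... receive and process messages here (placeholder) ...
        ++finished;
        cLatch.countDown();            // consumer is done
    });

    cLatch.await();                    // main() waits for both counts
    int seen = finished.load();
    producer.join();
    consumer.join();
    return seen;
}
```

The order in which the two threads count down does not matter; await()
returns only after both have done so.
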

But I am wondering how I would know when the consumer has finished.
onMessage() is invoked every time a message comes in. Without
knowing the number of messages beforehand, how do I know when it is
finished?  Did I misunderstand what you suggested?

Thanks.

On Mon, Sep 28, 2015 at 9:46 PM, Tim Bain <tbain@alumni.duke.edu> wrote:

> Now I'm confused.  Are you doing C++ or Java?  You referenced both .cpp
> files and the JVM, and I don't know how those two go together.  Unless you
> mean that the broker's JVM is exiting when being accessed by C++ clients?
> Also, I assumed (since you haven't clearly said one way or the other) that
> your consumer and your producer are different threads in the same process;
> if that's not true, disregard all of what I've written below, and my
> earlier suggestion.
>
> I think you could do one (and only one) of the following options to prevent
> the producer thread from exiting before the consumer thread (and there are
> other ways that will work; don't feel constrained by these if you have
> another one):
>
>    1. Have the producer thread call consumerThread.join() (which means you
>    have to pass a reference from one thread to the other), which forces the
>    consumer thread to exit before the producer thread is allowed to.
>    2. Have a CountDownLatch initialized to 2, where both threads call
>    countDown() after they've done all of their work, which guarantees that
>    the consumer's work is all done before the producer thread exits, even
>    if it's still possible for the producer thread to actually exit first
>    (who cares).
>    3. Have an AtomicBoolean called something like consumerDone initialized
>    to false and only set to true when the consumer thread has done all of
>    its work, and have the producer thread execute the following loop after
>    all of its work is done:
>    while (!consumerDone.get()) { Thread.sleep(someDuration); }
>    4. Execute consumerThread's work in a FutureTask, and call get() on the
>    Future interface it implements to wait until the task is done.
>
> There's no point doing more than one of those approaches; it's duplicative
> and unnecessarily complicates your code.  I'd avoid #1 because creating
> your own threads is generally frowned upon these days; in most cases you
> should be using Executors to run threaded work because it's easier to scale
> them and to handle problems, so I wouldn't recommend choosing an option
> that prevents you from doing that and will force you to rewrite that code
> if you ever wanted to.
>
> Tim
>
> On Sep 28, 2015 8:51 PM, "Chaomei Lo" <chaomeilo@gmail.com> wrote:
>
> > I did have these lines in main.
> >   producerThread.join();
> >   consumerThread.join();
> >
> > and in run() in Consumer.cpp, I have
> >
> >    latch.countDown();
> >
> > and in onMessage() in Consumer.cpp, I have
> >   doneLatch.countDown();
> >
> > Am I doing this right? I am having trouble understanding what they are
> > for.  I had made a mistake in my code; after fixing it, I got about 45000
> > messages, and then the JVM exited.  The activemq.log is below. Can you
> > see anything wrong? Thank you.
> > --------------------------------------------------------
> > 2015-09-28 19:09:01,412 | INFO  | queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,651 | INFO  | queue://testSessionCommitAfterConsumerClosed purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,655 | INFO  | queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> >
> > On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tbain@alumni.duke.edu> wrote:
> >
> > > Is the producer your last non-daemon thread?  If so, the JVM will exit
> > > when that thread exits, but you could use a synchronization construct
> > > such as a countdown latch or a call to Thread.join() to make the
> > > producer thread not exit till the consumer does.
> > > On Sep 28, 2015 7:21 PM, "mfan" <chaomeilo@gmail.com> wrote:
> > >
> > > > I am trying to write an application in which a producer (with
> > > > PERSISTENT delivery mode) sends 80000 messages to an asynchronous
> > > > consumer. Both producer and consumer use the AUTO_ACKNOWLEDGE
> > > > acknowledgement mode.  After the consumer receives a message, it
> > > > does some work.  The producer produces messages faster than the
> > > > consumer can do the work, and right after the producer finished
> > > > sending its last message, both called their destructors
> > > > automatically.  How can I let the consumer keep receiving messages
> > > > and doing the computation?  I searched online and found it may be
> > > > due to the memoryLimit. So I checked the activemq.xml file in the
> > > > '/home/apache-activemq-5.11.1/data/' directory, and I copied the
> > > > part related to memory below, which I do not quite understand. I
> > > > need help to make sure memory was not the issue. If it was not,
> > > > what else could the problem be? Thank you for helping.
> > > >
> > > >
> > > > Store limit is 102400 mb (current store usage is 72 mb). The data directory: /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable space - resetting to maximum available disk space: 12990 mb | org.apache.activemq.broker.BrokerService | main
> > > > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb, whilst the temporary data directory: /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918 mb of usable space - resetting to maximum available 12918 mb. | org.apache.activemq.broker.BrokerService | main
> > > >
> > > >
> > > >
> > > > --
> > > > View this message in context:
> > > > http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> > > >
> > >
> >
>
