activemq-users mailing list archives

From Tim Bain <tb...@alumni.duke.edu>
Subject Re: The consumer thread closed right after producer thread finishes sending messages.
Date Wed, 30 Sep 2015 14:36:58 GMT
You could say that if you haven't received a message and it's been longer
than some time interval (1 second? 5 seconds? 10 minutes? whatever you
want) since you finished processing the last message, then you assume that
there's nothing more to process.  You could do that in a few different
ways, but one would be to start a timer at the end of your onMessage()
method that will call cLatch.countDown() when the timer fires, and cancel
the timer (if one exists) at the beginning of onMessage().
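
A minimal sketch of that timer idea, in Java for illustration only. The class
IdleTimeoutConsumer, the idleMillis parameter, and the demo() helper are
made-up names, not part of ActiveMQ or of your code, and the latch here is
created with a count of 1 because only the consumer counts it down:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a message listener; it does not use the ActiveMQ API.
class IdleTimeoutConsumer {
    private final CountDownLatch cLatch;
    private final long idleMillis; // assumed idle interval before giving up
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> pending; // guarded by "this"
    private int processed;              // guarded by "this"

    IdleTimeoutConsumer(CountDownLatch cLatch, long idleMillis) {
        this.cLatch = cLatch;
        this.idleMillis = idleMillis;
    }

    // Plays the role of onMessage(): cancel the old timer, work, start a new one.
    synchronized void onMessage(String body) {
        if (pending != null) {
            pending.cancel(false);
        }
        processed++; // placeholder for the real per-message work
        pending = timer.schedule(() -> cLatch.countDown(),
                idleMillis, TimeUnit.MILLISECONDS);
    }

    synchronized int processedCount() {
        return processed;
    }

    void shutdown() {
        timer.shutdownNow();
    }

    // Feeds a few messages, then blocks until the consumer has been idle.
    static int demo(int messages, long idleMillis) {
        CountDownLatch cLatch = new CountDownLatch(1);
        IdleTimeoutConsumer consumer = new IdleTimeoutConsumer(cLatch, idleMillis);
        for (int i = 0; i < messages; i++) {
            consumer.onMessage("message " + i);
        }
        try {
            cLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.shutdown();
        }
        return consumer.processedCount();
    }

    public static void main(String[] args) {
        System.out.println("processed " + demo(5, 200) + " messages");
    }
}
```

Note that in this sketch the timer is only armed by a message, so if no
message ever arrives the latch is never counted down; you'd probably want to
start the first timer when the consumer starts.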

An alternative solution would be to have an atomic/threadsafe integer
containing the number of outstanding messages and an atomic/threadsafe
boolean, allMessagesSent, that flags that the producer has sent all of the
messages it's going to send.  Then when allMessagesSent is true and the
consumer has received as many messages as the producer says it sent, you
know the consumer is done and you can count down the latch.
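
Roughly, the counter-plus-flag approach could look like the following Java
sketch. OutstandingTracker and its method names are hypothetical, and the
BlockingQueue in demo() is only a stand-in for the broker so the example runs
on its own:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper shared by producer and consumer; all names are illustrative.
class OutstandingTracker {
    private final AtomicInteger outstanding = new AtomicInteger();
    private final AtomicBoolean allMessagesSent = new AtomicBoolean(false);
    private final CountDownLatch cLatch = new CountDownLatch(1);

    // Producer: call BEFORE each send so the count never lags the consumer.
    void messageSent() {
        outstanding.incrementAndGet();
    }

    // Producer: call once after the final send.
    void producerDone() {
        allMessagesSent.set(true);
        releaseIfDone();
    }

    // Consumer: call at the end of onMessage().
    void messageProcessed() {
        outstanding.decrementAndGet();
        releaseIfDone();
    }

    private void releaseIfDone() {
        // Both sides check, so whichever finishes last releases the latch.
        // An extra countDown() on a latch already at zero does nothing.
        if (allMessagesSent.get() && outstanding.get() == 0) {
            cLatch.countDown();
        }
    }

    void await() throws InterruptedException {
        cLatch.await();
    }

    // In-memory demo: a queue stands in for the broker (an assumption).
    static int demo(int messages) {
        OutstandingTracker tracker = new OutstandingTracker();
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();                // wait for the next message
                    processed.incrementAndGet(); // placeholder for real work
                    tracker.messageProcessed();
                }
            } catch (InterruptedException e) {
                // interrupted by demo() once everything is processed
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        for (int i = 0; i < messages; i++) {
            tracker.messageSent();
            queue.add(i);
        }
        tracker.producerDone();
        try {
            tracker.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.interrupt();
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + demo(1000) + " messages");
    }
}
```

The check runs on both sides because either the producer's flag or the
consumer's last decrement can happen last, and whichever one does has to be
the one that releases the latch.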

Also, you probably could initialize the countdown latch to 1 instead of 2
and have the producer not call countDown() and only call await(); my
initial suggestion works but is more complicated than it needs to be...
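
For example, a stripped-down Java sketch of the count-of-1 version, with the
main thread standing in for the producer (SingleCountLatchDemo and run() are
made-up names, and the consumer's "work" is just a log line):

```java
import java.util.concurrent.CountDownLatch;

class SingleCountLatchDemo {
    static String run() {
        CountDownLatch cLatch = new CountDownLatch(1);
        StringBuffer log = new StringBuffer(); // thread-safe log of events

        // Consumer: does its work, then counts the latch down exactly once.
        Thread consumer = new Thread(() -> {
            log.append("consumer done; ");
            cLatch.countDown();
        });
        consumer.start();

        // Producer (this thread): never calls countDown(), only await().
        try {
            cLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.append("producer exits");
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```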

Tim

On Tue, Sep 29, 2015 at 6:16 PM, Chaomei Lo <chaomeilo@gmail.com> wrote:

> Thanks, Tim. Sorry to confuse you. I am doing C++, and the Java broker did
> not exit when my C++ application exited.  Yes, I am using one process
> with two threads involved.  I appreciate your suggestions; now I understand
> how CountDownLatch works, which made me realize I had modified the
> HelloWorld example code incorrectly.  After fixing that, I changed one line
> of the example code from doneLatch.await(Millis)
> to
> doneLatch.await(), and had the user specify the number of messages, which is
> passed to the consumer for the doneLatch, and it works fine.
> For a better solution, where the user does not have to input the number of
> messages, I still have work to do; I would go with your second suggestion.
> First I would create a CountDownLatch object with a count of 2 in main() and
> then pass this instance, cLatch, to both the consumer and the producer.
>
> CountDownLatch cLatch(2);
> cLatch.await();
>
> then invoke
> cLatch.countDown() in producer code after producer finishes
> cLatch.countDown() in consumer code after consumer finishes
>
> But I am wondering how I would know when the consumer has finished.
> onMessage() is invoked every time a message comes in.  Without
> knowing the number of messages beforehand, how do I know when it is
> finished?  Did I misunderstand what you suggested?
>
> Thanks.
>
> On Mon, Sep 28, 2015 at 9:46 PM, Tim Bain <tbain@alumni.duke.edu> wrote:
>
> > Now I'm confused.  Are you doing C++ or Java?  You referenced both .cpp
> > files and the JVM, and I don't know how those two go together.  Unless you
> > mean that the broker's JVM is exiting when being accessed by C++ clients?
> > Also, I assumed (since you haven't clearly said one way or the other) that
> > your consumer and your producer are different threads in the same process;
> > if that's not true, disregard all of what I've written below, and my
> > earlier suggestion.
> >
> > I think you could do one (and only one) of the following options to prevent
> > the producer thread from exiting before the consumer thread (and there are
> > other ways that will work; don't feel constrained by these if you have
> > another one):
> >
> >    1. Have the producer thread call consumerThread.join() (which means you
> >    have to pass a reference from one thread to the other), which forces the
> >    consumer thread to exit before the producer thread is allowed to.
> >    2. Have a CountDownLatch initialized to 2, where both threads call
> >    countDown() after they've done all of their work, which guarantees that
> >    the consumer's work is all done before the producer thread exits, even if
> >    it's still possible for the producer thread to actually exit first (who
> >    cares).
> >    3. Have an AtomicBoolean called something like consumerDone initialized
> >    to false and only set to true when the consumer thread has done all of its
> >    work, and have the producer thread execute the following loop after all of
> >    its work is done:  while (!consumerDone.get()) {
> >    Thread.sleep(someDuration);}
> >    4. Execute consumerThread's work in a FutureTask, and call get() on the
> >    Future interface it implements to wait until the task is done.
> >
> > There's no point doing more than one of those approaches; it's duplicative
> > and unnecessarily complicates your code.  I'd avoid #1 because creating
> > your own threads is generally frowned upon these days; in most cases you
> > should be using Executors to run threaded work because it's easier to scale
> > them and to handle problems, so I wouldn't recommend choosing an option
> > that prevents you from doing that and will force you to rewrite that code
> > if you ever wanted to.
> >
> > Tim
> >
> > On Sep 28, 2015 8:51 PM, "Chaomei Lo" <chaomeilo@gmail.com> wrote:
> >
> > > I did have these lines in main.
> > >   producerThread.join();
> > >   consumerThread.join();
> > >
> > > and in run() in Consumer.cpp, I have
> > >
> > >    latch.countDown();
> > >
> > > and in onMessage() in Consumer.cpp, I have
> > >   doneLatch.countDown();
> > >
> > > Am I doing this right?  I am having trouble understanding what they are
> > > for.  I had made a mistake in my code; after fixing it, I got about 45000
> > > messages, and then the JVM exited.  The activemq.log is below.  Can you
> > > see anything wrong?  Thank you.
> > > --------------------------------------------------------
> > > 2015-09-28 19:09:01,412 | INFO  | queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,651 | INFO  | queue://testSessionCommitAfterConsumerClosed purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,655 | INFO  | queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP Connection(8)-130.20.132.42
> > >
> > > On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tbain@alumni.duke.edu> wrote:
> > >
> > > > Is the producer your last non-daemon thread?  If so, the JVM will exit when
> > > > that thread exits, but you could use a synchronization construct such as a
> > > > countdown latch or a call to Thread.join() to make the producer thread not
> > > > exit till the consumer does.
> > > > On Sep 28, 2015 7:21 PM, "mfan" <chaomeilo@gmail.com> wrote:
> > > >
> > > > > I am trying to write an application in which a producer (with PERSISTENT
> > > > > delivery mode) sends 80000 messages to an asynchronous consumer.  Both
> > > > > producer and consumer use the AUTO_ACKNOWLEDGE acknowledgement mode.
> > > > > After the consumer receives a message, it does some work.  The producer
> > > > > produces messages faster than the consumer does the work, and right after
> > > > > the producer finished sending its last message, both the producer and the
> > > > > consumer had their destructors called automatically.  How can I let the
> > > > > consumer keep getting messages and doing the computation?  I searched
> > > > > online and found it may be due to the memoryLimit.  So I checked the
> > > > > activemq.xml file in the '/home/apache-activemq-5.11.1/data/' directory
> > > > > and copied the memory-related part below, which I do not quite
> > > > > understand.  I need help to make sure memory was not the issue.  If
> > > > > memory is not the issue, what else could the problem be?  Thank you for
> > > > > helping.
> > > > >
> > > > >
> > > > > Store limit is 102400 mb (current store usage is 72 mb). The data directory: /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable space - resetting to maximum available disk space: 12990 mb | org.apache.activemq.broker.BrokerService | main
> > > > > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb, whilst the temporary data directory: /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918 mb of usable space - resetting to maximum available 12918 mb. | org.apache.activemq.broker.BrokerService | main
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > View this message in context: http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > > > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
