If you post a thread dump taken while you are blocked on the stop call
(from something like kill -3 or jstack), we may be able to spot the problem.
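
If neither kill -3 nor jstack is convenient, a similar dump can be produced from inside the JVM. The following is only a minimal sketch using the standard java.lang.management API; the class name ThreadDumpSketch is made up for illustration, and ThreadInfo.toString() prints only the first few frames of each stack, so the jstack output is still preferable when you can get it.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of ActiveMQ or of the original poster's code.
public class ThreadDumpSketch {

    /** Returns a text dump of all live threads, including held locks where the JVM supports it. */
    public static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Only ask for monitor/synchronizer details if this JVM can provide them.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(),
                mx.isSynchronizerUsageSupported());
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : infos) {
            if (info != null) {
                // ThreadInfo.toString() includes name, state, lock owner and a truncated stack.
                sb.append(info);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Calling ThreadDumpSketch.dump() from a watchdog thread while the listener is stuck in stop should show which thread owns the lock the blocked thread is waiting for.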
On 2 June 2011 19:18, ravimbhatt <ravi@qubitdigital.com> wrote:
> Hi All,
>
> I am facing a very strange problem with ActiveMQ.
>
> I have a Java class that attaches itself to a queue. This class holds a
> thread pool executor, and once it gets a message, it uses the executor to
> run a task with the newly arrived message.
>
> It keeps track of how many threads are currently performing tasks. Once it
> reaches a MAX, say 4, it stops the connection so that it does not get more
> messages from the queue. As soon as any one of the tasks completes, it
> starts the connection again.
>
> This setup works for a few messages and then suddenly my class waits forever
> in the connection's stop method. It never returns!
>
> Here is my onMessage method:
>
> public void onMessage(Message objectMessage) {
> if (objectMessage instanceof ObjectMessage) {
> try {
> Serializable serializableObject = ((ObjectMessage) objectMessage)
> .getObject();
>
> // we look for a MyRequest object; we ignore
> // everything else that was sent by mistake!
> // TODO: check if it's possible to reject a message delivery. reject
> // the ones we are not looking for.
> if (serializableObject instanceof MyRequest) {
> Log.getLogger().log(Level.INFO,
> Thread.currentThread().getName() + " received an object");
> MyRequest myRequest = (MyRequest) serializableObject;
> MyWorker myWorker = new MyWorker(
> myRequest);
> // make this object an observer of this categorisor
> myWorker.addObserver(this);
> threadPoolExecutor.execute(myWorker);
> // increment running thread count; it's inside synchronized as some
> // other thread may call update and try to decrement the count at
> // the same time when this thread is incrementing it.
> Log.getLogger().log(
> Level.INFO,
> Thread.currentThread().getName()
> + " Entering synchronised block...");
> synchronized (this) {
> runningThreads++;
>
> Log.getLogger().log(
> Level.INFO,
> Thread.currentThread().getName() + " Running: "
> + runningThreads + " Max:" + MAX_THREADS);
>
> // now check if we are running the max allowed threads for this machine.
> if (runningThreads == MAX_THREADS) {
> Log.getLogger()
> .log(
> Level.INFO,
> Thread.currentThread().getName()
> + " Reached max threads... stopping feedback message consumer!!");
> // stop receiving more messages.
> stopConsumption();
>
> }
> }
>
> Log.getLogger().log(
> Level.INFO,
> Thread.currentThread().getName()
> + " out of synchronised block....");
>
> }
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
> }
>
>
> Please note that this class adds itself as a listener to the MyWorker object.
> // make this object an observer of this categorisor
> myWorker.addObserver(this);
>
> Once complete, MyWorker updates this class as below:
> public void update(Observable observableSource,
> MyStatus status) {
> // TODO: work with status object later.
> if (observableSource instanceof MyWorker) {
> MyWorker myWorker = (MyWorker) observableSource;
> // notify observers about categorization being over.
> notifyObservers((MyResult) myWorker.getResult());
>
> Log.getLogger().log(Level.INFO,
> Thread.currentThread().getName() + ": Notified observers...");
>
> synchronized (this) {
> Log.getLogger()
> .log(
> Level.INFO,
> Thread.currentThread().getName()
> + ": One of the threads completed... starting message consumer if max was reached!!");
> runningThreads--;
> Log.getLogger().log(
> Level.INFO,
> Thread.currentThread().getName() + " Running: " + runningThreads
> + " Max:" + MAX_THREADS);
> // start consuming only when we were at max. need not call
> // startConsumption on each thread completion.
> if (runningThreads == MAX_THREADS - 1) {
> startConsumption();
> }
> }
> }
> }
>
> Since there are multiple threads running and they can all call update
> at the same time, I have used a synchronized block to decrement
> runningThreads and call the startConsumption() method.
>
> It works for a random number of messages and then it blocks in the call to
> stopConsumption();
>
> Here are my start and stop consumption methods.
>
> /**
> * Stops listening to the queue.
> */
> private void stopConsumption() {
> try {
> conn.stop();
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
>
> /**
> * Starts listening to the queue again.
> */
> private void startConsumption() {
> try {
> conn.start();
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
>
> Looking at the threads in jconsole, I can see that it waits in the call to
> conn.start() and keeps on waiting forever.
>
> Here is how I connect to the queue in my run() method:
>
> @Override
> public void run() {
> // create an ActiveMQ queue consumer and register this
> // object as a JMS message listener.
> // try connecting to the ActiveMQ broker
> try {
> javax.naming.Context ctx = ActiveMQInitialContext.getInstance();
> // lookup the connection factory
> javax.jms.QueueConnectionFactory factory =
> (javax.jms.QueueConnectionFactory) ctx
> .lookup(PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME"));
>
> conn = factory.createQueueConnection();
> conn.start();
> // lookup an existing queue
> javax.jms.Queue myqueue = (Queue) ctx.lookup(PropertyManager
> .getProperty("JMS_REQUEST_QUEUE_NAME"));
> // create a new queueSession for the client
> QueueSession session = conn.createQueueSession(false,
> QueueSession.AUTO_ACKNOWLEDGE);
>
> // create a new subscriber to receive messages
> MessageConsumer subscriber = session.createConsumer(myqueue);
> subscriber.setMessageListener(this);
>
> Log.getLogger().log(
> Level.INFO,
> Thread.currentThread().getName() + ": Attached to: "
> + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));
>
> } catch (NamingException e) {
> e.printStackTrace();
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
>
> As you can see, I am using auto-acknowledge. I have tested this on ActiveMQ
> 5.4.2 and 5.5.0 and it behaves the same on both versions. I am using Java 6.
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3569019.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
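
To make the dump easier to read, it can help to look at the counting logic on its own. The following is a minimal sketch of the bookkeeping described in the quoted onMessage/update pair, with the JMS calls left out; InFlightGate and its method names are hypothetical and do not appear in the original code. It only mirrors the runningThreads == MAX_THREADS and runningThreads == MAX_THREADS - 1 checks and is not offered as a fix for the hang.

```java
// Hypothetical stand-in for the runningThreads counter in the quoted class.
public class InFlightGate {
    private final int max;   // corresponds to MAX_THREADS
    private int running;     // corresponds to runningThreads

    public InFlightGate(int max) {
        this.max = max;
    }

    /** Records a started task; returns true when the limit has just been reached. */
    public synchronized boolean taskStarted() {
        running++;
        return running == max;
    }

    /** Records a finished task; returns true when the count has just dropped below the limit. */
    public synchronized boolean taskFinished() {
        running--;
        return running == max - 1;
    }

    public synchronized int running() {
        return running;
    }
}
```

Because each method returns a flag instead of acting on the connection itself, the caller decides where conn.stop() or conn.start() is invoked, and in particular whether that happens while the monitor is still held.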
--
http://fusesource.com
http://blog.garytully.com