qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Milano Nicolum <dracz...@gmail.com>
Subject Re: amqp.drainTimeout vs. jms.closeTimeout vs. jms.requestTimeout
Date Wed, 24 May 2017 12:01:55 GMT
Unfortunately it seems that simply setting either of
jms.receiveLocalOnly=true or jms.receiveNoWaitLocalOnly=true parameters
does not help. I Tried adding those parameters to connector URL, I tried
setting the matching options on my JmsConnectionFactory, but the
JmsListenerEndpointRegistry still can't be closed in time. Application is
still stuck for 30 seconds before the bean shutdown fails and real closing
process begins.

Then during DefaultMessageListenerContainer shutdown following exception is
caught:

52:59.647 [DefaultMessageListenerContainer-1] WARN
o.s.j.l.DefaultMessageListenerContainer - Setup of JMS message listener
invoker failed for destination 'messaging-lib-test-queue' - trying to
recover. Cause: Remote did not respond to a drain request in time
org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not respond
to a drain request in time
    at
org.apache.qpid.jms.provider.amqp.AmqpConsumer$1.run(AmqpConsumer.java:140)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Shouldn't there be no such problem if I'm using the
jms.receiveLocalOnly=true option?

If you want me to provide any logs, code or dumps, just let me know how I
can help.


2017-05-24 12:14 GMT+02:00 Robbie Gemmell <robbie.gemmell@gmail.com>:

> The consumers credit 'drain' attempt is failing to complete because
> Azure Service Bus because doesnt implement the 'drain' functionality.
> Instead of reducing the drainTimeout, you will instead need to toggle
> the client to only receive out of its local buffers when working with
> Service Bus, using jms.receiveLocalOnly=true in your URI (and
> jms.receiveNoWaitLocalOnly=true if needed).
>
> The timeouts aren't layered and so wouldnt really apply as you are
> suggesting, however in this case I dont think it makes a difference.
> I'm guessing (hard to tell for sure from the limited info) that Spring
> doesnt actiually get as far as trying to close things due to waiting
> on the receive, hence the drain attempt wasnt aborted by closure
> because that never actually began until after. In either case, as
> mentioned above you actually need to prevent the drain attempt instead
> because it will fail.
>
> On 23 May 2017 at 15:55, Milano Nicolum <draczech@gmail.com> wrote:
> > Hi I have a question about function of different timeouts in Qpid JMS
> > client and priority they are applied in.
> >
> > Currently I'm running Spring test using qpid-jms-client 0.23.0 (in
> > cooperation with spring-jms 4.3.8.RELEASE) to connect to MS Azure Service
> > Bus Queue, send a few messages there and confirm the same messages are
> > received by @JmsListener implementation from the Queue.
> >
> > Message send + receive works fine, but when Spring tries to shut down
> > components at the end, the test gets stuck. The
> > org.springframework.jms.config.internalJmsListenerEndpointRegistry
> cannot
> > be shutdown in time because it is stuck waiting for a message.
> >
> > I went through stacktrace and code and I got the impression the shutdown
> > getting stuck is caused by AmqpConsumer.pull(final long timeout, final
> > AsyncResult request) taking too long to finish.
> >
> > My jms.closeTimeout is set to 1500ms, jms.requestTimeout to 500ms and
> > neither is applied on context shutdown. But when I tried setting
> > amqp.drainTimeout to 5000ms, the internal consumer times out on last
> drain
> > request, connection gets closed in time and all beans are destroyed OK.
> >
> > As this is more of a workaround than a solution I wonder if there is a
> way
> > to shutdown the test correctly and more gracefully if possible. Is there
> > any way of interrupting pending drain requests manually? Shouldn't be the
> > jms.closeTimeout applied with higher priority than amqp.drainTimeout when
> > the connection is being closed?
> >
> > Thanks for any information. At the moment the only information about
> these
> > settings I have is from
> > https://qpid.apache.org/releases/qpid-jms-0.23.0/docs/index.html and it
> is
> > simply not sufficient.
> >
> > Stacktrace:
> > "DefaultMessageListenerContainer-1" #23 prio=5 os_prio=0
> > tid=0x0000000023db4000 nid=0x3a70 waiting on condition
> [0x0000000027a0e000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x0000000773c2d328> (a
> > java.util.concurrent.CountDownLatch$Sync)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
> parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> >     at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
> doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> >     at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
> acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> >     at java.util.concurrent.CountDownLatch.await(
> CountDownLatch.java:231)
> >     at
> > org.apache.qpid.jms.provider.ProviderFuture.sync(ProviderFuture.java:97)
> >     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:829)
> >     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:818)
> >     at
> > org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(
> JmsMessageConsumer.java:689)
> >     at
> > org.apache.qpid.jms.JmsMessageConsumer.dequeue(
> JmsMessageConsumer.java:283)
> >     at
> > org.apache.qpid.jms.JmsMessageConsumer.receive(
> JmsMessageConsumer.java:192)
> >     at
> > org.springframework.jms.connection.CachedMessageConsumer.receive(
> CachedMessageConsumer.java:82)
> >     at
> > org.springframework.jms.support.destination.JmsDestinationAccessor.
> receiveFromConsumer(JmsDestinationAccessor.java:130)
> >     at
> > org.springframework.jms.listener.AbstractPollingMessageListener
> Container.receiveMessage(AbstractPollingMessageListenerContainer.java:416)
> >     at
> > org.springframework.jms.listener.AbstractPollingMessageListener
> Container.doReceiveAndExecute(AbstractPollingMessageListener
> Container.java:302)
> >     at
> > org.springframework.jms.listener.AbstractPollingMessageListener
> Container.receiveAndExecute(AbstractPollingMessageListener
> Container.java:255)
> >     at
> > org.springframework.jms.listener.DefaultMessageListenerContaine
> r$AsyncMessageListenerInvoker.invokeListener(
> DefaultMessageListenerContainer.java:1166)
> >     at
> > org.springframework.jms.listener.DefaultMessageListenerContaine
> r$AsyncMessageListenerInvoker.executeOngoingLoop(
> DefaultMessageListenerContainer.java:1158)
> >     at
> > org.springframework.jms.listener.DefaultMessageListenerContaine
> r$AsyncMessageListenerInvoker.run(DefaultMessageListenerContaine
> r.java:1055)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > "AmqpProvider:(1):[amqps://mySB.servicebus.windows.net:-1]" #18 daemon
> > prio=5 os_prio=0 tid=0x000000002388f800 nid=0x2664 waiting on condition
> > [0x0000000023a3e000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x000000076fae3810> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at
> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >     at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >     at
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(
> ScheduledThreadPoolExecutor.java:1093)
> >     at
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(
> ScheduledThreadPoolExecutor.java:809)
> >     at
> > java.util.concurrent.ThreadPoolExecutor.getTask(
> ThreadPoolExecutor.java:1067)
> >     at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1127)
> >     at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > "QpidJMS Connection Executor: ID:4ac30ae5-f428-4f04-9512-f226aea41377:1"
> > #17 prio=5 os_prio=0 tid=0x0000000023ce0800 nid=0x3c24 waiting on
> condition
> > [0x0000000021d3e000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x000000076fbb0d50> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> >     at
> > java.util.concurrent.LinkedBlockingQueue.take(
> LinkedBlockingQueue.java:442)
> >     at
> > java.util.concurrent.ThreadPoolExecutor.getTask(
> ThreadPoolExecutor.java:1067)
> >     at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1127)
> >     at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:745)
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message