qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: amqp.drainTimeout vs. jms.closeTimeout vs. jms.requestTimeout
Date Wed, 24 May 2017 13:51:34 GMT
The client attempts to drain the consumer credit in 3 main situations:
- When a receive[NoWait] call is attempted and any timeout elapses
without a message being available in the local buffer.
- While performing a transaction rollback (inc transacted session closure).
- A special case of the first, where prefetch has also been configured
to 0, and the client must drain credit to either get a message or
satisfy the config.

The first case is the only one those options govern. Last I saw, the
Service Bus documentation said transactions were not supported.
Setting a prefetch of 0 won't work properly against Service Bus
because the client must drain the link. You'll need to investigate
what applies here and is leading to the drain attempt and then try to
update so it doesnt happen.

On 24 May 2017 at 13:01, Milano Nicolum <draczech@gmail.com> wrote:
> Unfortunately it seems that simply setting either of
> jms.receiveLocalOnly=true or jms.receiveNoWaitLocalOnly=true parameters
> does not help. I Tried adding those parameters to connector URL, I tried
> setting the matching options on my JmsConnectionFactory, but the
> JmsListenerEndpointRegistry still can't be closed in time. Application is
> still stuck for 30 seconds before the bean shutdown fails and real closing
> process begins.
>
> Then during DefaultMessageListenerContainer shutdown following exception is
> caught:
>
> 52:59.647 [DefaultMessageListenerContainer-1] WARN
> o.s.j.l.DefaultMessageListenerContainer - Setup of JMS message listener
> invoker failed for destination 'messaging-lib-test-queue' - trying to
> recover. Cause: Remote did not respond to a drain request in time
> org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not respond
> to a drain request in time
>     at
> org.apache.qpid.jms.provider.amqp.AmqpConsumer$1.run(AmqpConsumer.java:140)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Shouldn't there be no such problem if I'm using the
> jms.receiveLocalOnly=true option?
>
> If you want me to provide any logs, code or dumps, just let me know how I
> can help.
>
>
> 2017-05-24 12:14 GMT+02:00 Robbie Gemmell <robbie.gemmell@gmail.com>:
>
>> The consumers credit 'drain' attempt is failing to complete because
>> Azure Service Bus because doesnt implement the 'drain' functionality.
>> Instead of reducing the drainTimeout, you will instead need to toggle
>> the client to only receive out of its local buffers when working with
>> Service Bus, using jms.receiveLocalOnly=true in your URI (and
>> jms.receiveNoWaitLocalOnly=true if needed).
>>
>> The timeouts aren't layered and so wouldnt really apply as you are
>> suggesting, however in this case I dont think it makes a difference.
>> I'm guessing (hard to tell for sure from the limited info) that Spring
>> doesnt actiually get as far as trying to close things due to waiting
>> on the receive, hence the drain attempt wasnt aborted by closure
>> because that never actually began until after. In either case, as
>> mentioned above you actually need to prevent the drain attempt instead
>> because it will fail.
>>
>> On 23 May 2017 at 15:55, Milano Nicolum <draczech@gmail.com> wrote:
>> > Hi I have a question about function of different timeouts in Qpid JMS
>> > client and priority they are applied in.
>> >
>> > Currently I'm running Spring test using qpid-jms-client 0.23.0 (in
>> > cooperation with spring-jms 4.3.8.RELEASE) to connect to MS Azure Service
>> > Bus Queue, send a few messages there and confirm the same messages are
>> > received by @JmsListener implementation from the Queue.
>> >
>> > Message send + receive works fine, but when Spring tries to shut down
>> > components at the end, the test gets stuck. The
>> > org.springframework.jms.config.internalJmsListenerEndpointRegistry
>> cannot
>> > be shutdown in time because it is stuck waiting for a message.
>> >
>> > I went through stacktrace and code and I got the impression the shutdown
>> > getting stuck is caused by AmqpConsumer.pull(final long timeout, final
>> > AsyncResult request) taking too long to finish.
>> >
>> > My jms.closeTimeout is set to 1500ms, jms.requestTimeout to 500ms and
>> > neither is applied on context shutdown. But when I tried setting
>> > amqp.drainTimeout to 5000ms, the internal consumer times out on last
>> drain
>> > request, connection gets closed in time and all beans are destroyed OK.
>> >
>> > As this is more of a workaround than a solution I wonder if there is a
>> way
>> > to shutdown the test correctly and more gracefully if possible. Is there
>> > any way of interrupting pending drain requests manually? Shouldn't be the
>> > jms.closeTimeout applied with higher priority than amqp.drainTimeout when
>> > the connection is being closed?
>> >
>> > Thanks for any information. At the moment the only information about
>> these
>> > settings I have is from
>> > https://qpid.apache.org/releases/qpid-jms-0.23.0/docs/index.html and it
>> is
>> > simply not sufficient.
>> >
>> > Stacktrace:
>> > "DefaultMessageListenerContainer-1" #23 prio=5 os_prio=0
>> > tid=0x0000000023db4000 nid=0x3a70 waiting on condition
>> [0x0000000027a0e000]
>> >    java.lang.Thread.State: WAITING (parking)
>> >     at sun.misc.Unsafe.park(Native Method)
>> >     - parking to wait for  <0x0000000773c2d328> (a
>> > java.util.concurrent.CountDownLatch$Sync)
>> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> >     at
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
>> parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>> >     at
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
>> doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>> >     at
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
>> acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>> >     at java.util.concurrent.CountDownLatch.await(
>> CountDownLatch.java:231)
>> >     at
>> > org.apache.qpid.jms.provider.ProviderFuture.sync(ProviderFuture.java:97)
>> >     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:829)
>> >     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:818)
>> >     at
>> > org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(
>> JmsMessageConsumer.java:689)
>> >     at
>> > org.apache.qpid.jms.JmsMessageConsumer.dequeue(
>> JmsMessageConsumer.java:283)
>> >     at
>> > org.apache.qpid.jms.JmsMessageConsumer.receive(
>> JmsMessageConsumer.java:192)
>> >     at
>> > org.springframework.jms.connection.CachedMessageConsumer.receive(
>> CachedMessageConsumer.java:82)
>> >     at
>> > org.springframework.jms.support.destination.JmsDestinationAccessor.
>> receiveFromConsumer(JmsDestinationAccessor.java:130)
>> >     at
>> > org.springframework.jms.listener.AbstractPollingMessageListener
>> Container.receiveMessage(AbstractPollingMessageListenerContainer.java:416)
>> >     at
>> > org.springframework.jms.listener.AbstractPollingMessageListener
>> Container.doReceiveAndExecute(AbstractPollingMessageListener
>> Container.java:302)
>> >     at
>> > org.springframework.jms.listener.AbstractPollingMessageListener
>> Container.receiveAndExecute(AbstractPollingMessageListener
>> Container.java:255)
>> >     at
>> > org.springframework.jms.listener.DefaultMessageListenerContaine
>> r$AsyncMessageListenerInvoker.invokeListener(
>> DefaultMessageListenerContainer.java:1166)
>> >     at
>> > org.springframework.jms.listener.DefaultMessageListenerContaine
>> r$AsyncMessageListenerInvoker.executeOngoingLoop(
>> DefaultMessageListenerContainer.java:1158)
>> >     at
>> > org.springframework.jms.listener.DefaultMessageListenerContaine
>> r$AsyncMessageListenerInvoker.run(DefaultMessageListenerContaine
>> r.java:1055)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> > "AmqpProvider:(1):[amqps://mySB.servicebus.windows.net:-1]" #18 daemon
>> > prio=5 os_prio=0 tid=0x000000002388f800 nid=0x2664 waiting on condition
>> > [0x0000000023a3e000]
>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>> >     at sun.misc.Unsafe.park(Native Method)
>> >     - parking to wait for  <0x000000076fae3810> (a
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> >     at
>> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>> >     at
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
>> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>> >     at
>> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(
>> ScheduledThreadPoolExecutor.java:1093)
>> >     at
>> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(
>> ScheduledThreadPoolExecutor.java:809)
>> >     at
>> > java.util.concurrent.ThreadPoolExecutor.getTask(
>> ThreadPoolExecutor.java:1067)
>> >     at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1127)
>> >     at
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:617)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> > "QpidJMS Connection Executor: ID:4ac30ae5-f428-4f04-9512-f226aea41377:1"
>> > #17 prio=5 os_prio=0 tid=0x0000000023ce0800 nid=0x3c24 waiting on
>> condition
>> > [0x0000000021d3e000]
>> >    java.lang.Thread.State: WAITING (parking)
>> >     at sun.misc.Unsafe.park(Native Method)
>> >     - parking to wait for  <0x000000076fbb0d50> (a
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> >     at
>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
>> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>> >     at
>> > java.util.concurrent.LinkedBlockingQueue.take(
>> LinkedBlockingQueue.java:442)
>> >     at
>> > java.util.concurrent.ThreadPoolExecutor.getTask(
>> ThreadPoolExecutor.java:1067)
>> >     at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1127)
>> >     at
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:617)
>> >     at java.lang.Thread.run(Thread.java:745)
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: users-help@qpid.apache.org
>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Mime
View raw message