qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Milano Nicolum <dracz...@gmail.com>
Subject Re: amqp.drainTimeout vs. jms.closeTimeout vs. jms.requestTimeout
Date Wed, 24 May 2017 14:10:48 GMT
The problem was really prefetch set to 0 because of some previous issues
where high prefetch caused problem with message consumption on multiple
parallel consumers on single Service Bus queue. (With high prefetch
messages are usually not processed in predefined time. Which then triggers
another message redelivery depending on queue properties. And eventually
caused large number of messages being moved to Dead letter queue instead of
being processed properly.)

I set it to 0 intentionally to disable prefetch entirely as described in:
https://docs.microsoft.com/cs-cz/azure/service-bus-messaging/service-bus-performance-improvements
.

With prefetch set to 1 my tests work like charm, but I have to check if the
real code gets affected by this change or not.

Thanks for help!

Now it closes all beans in an instant:
15:56:23.071 [main] INFO  o.s.c.s.DefaultLifecycleProcessor - Stopping
beans in phase 2147483647
15:56:23.071 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Asking bean
'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of
type [class org.springframework.jms.config.JmsListenerEndpointRegistry] to
stop
15:56:23.112 [DefaultMessageListenerContainer-1] TRACE
o.s.j.l.DefaultMessageListenerContainer - Consumer
[org.apache.qpid.jms.JmsDurableTopicSubscriber@1b6cc7c1] of session
[org.apache.qpid.jms.JmsSession@1bc295e9] did not receive a message
15:56:23.112 [DefaultMessageListenerContainer-1] DEBUG
o.s.c.s.DefaultLifecycleProcessor - Bean
'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
completed its stop procedure
15:56:23.112 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Destroying
singletons in
org.springframework.beans.factory.support.DefaultListableBeanFactory@13c9d689:
defining beans
[org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.event.internalEventListenerProcessor,org.springframework.context.event.internalEventListenerFactory,jsonContextConfiguration,json2ObjectConverterFactory,wrappedJson2ObjectConverterFactory,messagingContextConfiguration,testQueueListener,testTopicListener,serviceBusTestContextConfiguration,objectMapper,org.springframework.jms.annotation.JmsBootstrapConfiguration,org.springframework.jms.config.internalJmsListenerAnnotationProcessor,org.springframework.jms.config.internalJmsListenerEndpointRegistry,connectionFactory,destinationResolver,jmsListenerContainerFactory,messageHandlerMethodFactory,messagingConversionService,jacksonJmsMessageConverter,messaging-com.openmatics.cloud.lib.messaging.MessagingConfigurationProperties,org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor,org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor.store,propertyPlaceholderConfigurer,getTestErrorHandler];
root of factory hierarchy
15:56:23.112 [main] DEBUG o.s.b.f.s.DisposableBeanAdapter - Invoking
destroy() on bean with name
'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
15:56:23.112 [main] DEBUG o.s.j.l.DefaultMessageListenerContainer -
Shutting down JMS listener container

2017-05-24 15:51 GMT+02:00 Robbie Gemmell <robbie.gemmell@gmail.com>:

> The client attempts to drain the consumer credit in 3 main situations:
> - When a receive[NoWait] call is attempted and any timeout elapses
> without a message being available in the local buffer.
> - While performing a transaction rollback (inc transacted session closure).
> - A special case of the first, where prefetch has also been configured
> to 0, and the client must drain credit to either get a message or
> satisfy the config.
>
> The first case is the only one those options govern. Last I saw, the
> Service Bus documentation said transactions were not supported.
> Setting a prefetch of 0 won't work properly against Service Bus
> because the client must drain the link. You'll need to investigate
> what applies here and is leading to the drain attempt and then try to
> update so it doesnt happen.
>
> On 24 May 2017 at 13:01, Milano Nicolum <draczech@gmail.com> wrote:
> > Unfortunately it seems that simply setting either of
> > jms.receiveLocalOnly=true or jms.receiveNoWaitLocalOnly=true parameters
> > does not help. I Tried adding those parameters to connector URL, I tried
> > setting the matching options on my JmsConnectionFactory, but the
> > JmsListenerEndpointRegistry still can't be closed in time. Application is
> > still stuck for 30 seconds before the bean shutdown fails and real
> closing
> > process begins.
> >
> > Then during DefaultMessageListenerContainer shutdown following
> exception is
> > caught:
> >
> > 52:59.647 [DefaultMessageListenerContainer-1] WARN
> > o.s.j.l.DefaultMessageListenerContainer - Setup of JMS message listener
> > invoker failed for destination 'messaging-lib-test-queue' - trying to
> > recover. Cause: Remote did not respond to a drain request in time
> > org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not
> respond
> > to a drain request in time
> >     at
> > org.apache.qpid.jms.provider.amqp.AmqpConsumer$1.run(
> AmqpConsumer.java:140)
> >     at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at
> > java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >     at
> > java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >     at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >     at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > Shouldn't there be no such problem if I'm using the
> > jms.receiveLocalOnly=true option?
> >
> > If you want me to provide any logs, code or dumps, just let me know how I
> > can help.
> >
> >
> > 2017-05-24 12:14 GMT+02:00 Robbie Gemmell <robbie.gemmell@gmail.com>:
> >
> >> The consumers credit 'drain' attempt is failing to complete because
> >> Azure Service Bus because doesnt implement the 'drain' functionality.
> >> Instead of reducing the drainTimeout, you will instead need to toggle
> >> the client to only receive out of its local buffers when working with
> >> Service Bus, using jms.receiveLocalOnly=true in your URI (and
> >> jms.receiveNoWaitLocalOnly=true if needed).
> >>
> >> The timeouts aren't layered and so wouldnt really apply as you are
> >> suggesting, however in this case I dont think it makes a difference.
> >> I'm guessing (hard to tell for sure from the limited info) that Spring
> >> doesnt actiually get as far as trying to close things due to waiting
> >> on the receive, hence the drain attempt wasnt aborted by closure
> >> because that never actually began until after. In either case, as
> >> mentioned above you actually need to prevent the drain attempt instead
> >> because it will fail.
> >>
> >> On 23 May 2017 at 15:55, Milano Nicolum <draczech@gmail.com> wrote:
> >> > Hi I have a question about function of different timeouts in Qpid JMS
> >> > client and priority they are applied in.
> >> >
> >> > Currently I'm running Spring test using qpid-jms-client 0.23.0 (in
> >> > cooperation with spring-jms 4.3.8.RELEASE) to connect to MS Azure
> Service
> >> > Bus Queue, send a few messages there and confirm the same messages are
> >> > received by @JmsListener implementation from the Queue.
> >> >
> >> > Message send + receive works fine, but when Spring tries to shut down
> >> > components at the end, the test gets stuck. The
> >> > org.springframework.jms.config.internalJmsListenerEndpointRegistry
> >> cannot
> >> > be shutdown in time because it is stuck waiting for a message.
> >> >
> >> > I went through stacktrace and code and I got the impression the
> shutdown
> >> > getting stuck is caused by AmqpConsumer.pull(final long timeout, final
> >> > AsyncResult request) taking too long to finish.
> >> >
> >> > My jms.closeTimeout is set to 1500ms, jms.requestTimeout to 500ms and
> >> > neither is applied on context shutdown. But when I tried setting
> >> > amqp.drainTimeout to 5000ms, the internal consumer times out on last
> >> drain
> >> > request, connection gets closed in time and all beans are destroyed
> OK.
> >> >
> >> > As this is more of a workaround than a solution I wonder if there is a
> >> way
> >> > to shutdown the test correctly and more gracefully if possible. Is
> there
> >> > any way of interrupting pending drain requests manually? Shouldn't be
> the
> >> > jms.closeTimeout applied with higher priority than amqp.drainTimeout
> when
> >> > the connection is being closed?
> >> >
> >> > Thanks for any information. At the moment the only information about
> >> these
> >> > settings I have is from
> >> > https://qpid.apache.org/releases/qpid-jms-0.23.0/docs/index.html and
> it
> >> is
> >> > simply not sufficient.
> >> >
> >> > Stacktrace:
> >> > "DefaultMessageListenerContainer-1" #23 prio=5 os_prio=0
> >> > tid=0x0000000023db4000 nid=0x3a70 waiting on condition
> >> [0x0000000027a0e000]
> >> >    java.lang.Thread.State: WAITING (parking)
> >> >     at sun.misc.Unsafe.park(Native Method)
> >> >     - parking to wait for  <0x0000000773c2d328> (a
> >> > java.util.concurrent.CountDownLatch$Sync)
> >> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
> >> >     at
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
> >> parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> >> >     at
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
> >> doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> >> >     at
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer.
> >> acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> >> >     at java.util.concurrent.CountDownLatch.await(
> >> CountDownLatch.java:231)
> >> >     at
> >> > org.apache.qpid.jms.provider.ProviderFuture.sync(
> ProviderFuture.java:97)
> >> >     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:829)
> >> >     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:818)
> >> >     at
> >> > org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(
> >> JmsMessageConsumer.java:689)
> >> >     at
> >> > org.apache.qpid.jms.JmsMessageConsumer.dequeue(
> >> JmsMessageConsumer.java:283)
> >> >     at
> >> > org.apache.qpid.jms.JmsMessageConsumer.receive(
> >> JmsMessageConsumer.java:192)
> >> >     at
> >> > org.springframework.jms.connection.CachedMessageConsumer.receive(
> >> CachedMessageConsumer.java:82)
> >> >     at
> >> > org.springframework.jms.support.destination.JmsDestinationAccessor.
> >> receiveFromConsumer(JmsDestinationAccessor.java:130)
> >> >     at
> >> > org.springframework.jms.listener.AbstractPollingMessageListener
> >> Container.receiveMessage(AbstractPollingMessageListener
> Container.java:416)
> >> >     at
> >> > org.springframework.jms.listener.AbstractPollingMessageListener
> >> Container.doReceiveAndExecute(AbstractPollingMessageListener
> >> Container.java:302)
> >> >     at
> >> > org.springframework.jms.listener.AbstractPollingMessageListener
> >> Container.receiveAndExecute(AbstractPollingMessageListener
> >> Container.java:255)
> >> >     at
> >> > org.springframework.jms.listener.DefaultMessageListenerContaine
> >> r$AsyncMessageListenerInvoker.invokeListener(
> >> DefaultMessageListenerContainer.java:1166)
> >> >     at
> >> > org.springframework.jms.listener.DefaultMessageListenerContaine
> >> r$AsyncMessageListenerInvoker.executeOngoingLoop(
> >> DefaultMessageListenerContainer.java:1158)
> >> >     at
> >> > org.springframework.jms.listener.DefaultMessageListenerContaine
> >> r$AsyncMessageListenerInvoker.run(DefaultMessageListenerContaine
> >> r.java:1055)
> >> >     at java.lang.Thread.run(Thread.java:745)
> >> >
> >> > "AmqpProvider:(1):[amqps://mySB.servicebus.windows.net:-1]" #18
> daemon
> >> > prio=5 os_prio=0 tid=0x000000002388f800 nid=0x2664 waiting on
> condition
> >> > [0x0000000023a3e000]
> >> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >> >     at sun.misc.Unsafe.park(Native Method)
> >> >     - parking to wait for  <0x000000076fae3810> (a
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject)
> >> >     at
> >> > java.util.concurrent.locks.LockSupport.parkNanos(
> LockSupport.java:215)
> >> >     at
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
> >> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >> >     at
> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> DelayedWorkQueue.take(
> >> ScheduledThreadPoolExecutor.java:1093)
> >> >     at
> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> DelayedWorkQueue.take(
> >> ScheduledThreadPoolExecutor.java:809)
> >> >     at
> >> > java.util.concurrent.ThreadPoolExecutor.getTask(
> >> ThreadPoolExecutor.java:1067)
> >> >     at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> >> ThreadPoolExecutor.java:1127)
> >> >     at
> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >> ThreadPoolExecutor.java:617)
> >> >     at java.lang.Thread.run(Thread.java:745)
> >> >
> >> > "QpidJMS Connection Executor: ID:4ac30ae5-f428-4f04-9512-
> f226aea41377:1"
> >> > #17 prio=5 os_prio=0 tid=0x0000000023ce0800 nid=0x3c24 waiting on
> >> condition
> >> > [0x0000000021d3e000]
> >> >    java.lang.Thread.State: WAITING (parking)
> >> >     at sun.misc.Unsafe.park(Native Method)
> >> >     - parking to wait for  <0x000000076fbb0d50> (a
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject)
> >> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
> >> >     at
> >> > java.util.concurrent.locks.AbstractQueuedSynchronizer$
> >> ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> >> >     at
> >> > java.util.concurrent.LinkedBlockingQueue.take(
> >> LinkedBlockingQueue.java:442)
> >> >     at
> >> > java.util.concurrent.ThreadPoolExecutor.getTask(
> >> ThreadPoolExecutor.java:1067)
> >> >     at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> >> ThreadPoolExecutor.java:1127)
> >> >     at
> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >> ThreadPoolExecutor.java:617)
> >> >     at java.lang.Thread.run(Thread.java:745)
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> >> For additional commands, e-mail: users-help@qpid.apache.org
> >>
> >>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message