cxf-issues mailing list archives

From "Freeman Fang (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (CXF-2002) Server async jms transport needs dynamic mechanism to throttle message consumption
Date Wed, 22 Apr 2009 09:38:47 GMT

    [ https://issues.apache.org/jira/browse/CXF-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12701443#action_12701443 ]

Freeman Fang edited comment on CXF-2002 at 4/22/09 2:37 AM:
------------------------------------------------------------

Hi Christian and Sergey,

I'm currently trying to verify that this fix works in ServiceMix.
I just found that the JMSDestination.activate() method hangs (the hang actually occurs in AbstractJmsListeningContainer.initialize).
The stack trace is:
"Timer-4" daemon prio=1 tid=0x81a96d10 nid=0x1d6c in Object.wait() [0x7f7be000..0x7f7beeb0]
        at java.lang.Object.wait(Native Method)
        - waiting on <0xafb871a0> (a java.lang.Object)
        at java.lang.Object.wait(Object.java:474)
        at org.springframework.util.ConcurrencyThrottleSupport.beforeAccess(ConcurrencyThrottleSupport.java:118)
        - locked <0xafb871a0> (a java.lang.Object)
        at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottleAdapter.beforeAccess(SimpleAsyncTaskExecutor.java:169)
        at org.springframework.core.task.SimpleAsyncTaskExecutor.execute(SimpleAsyncTaskExecutor.java:141)
        at org.springframework.core.task.SimpleAsyncTaskExecutor.execute(SimpleAsyncTaskExecutor.java:126)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.doRescheduleTask(DefaultMessageListenerContainer.java:495)
        at org.springframework.jms.listener.AbstractJmsListeningContainer.rescheduleTaskIfNecessary(AbstractJmsListeningContainer.java:474)
        - locked <0xafb52a88> (a java.lang.Object)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.scheduleNewInvoker(DefaultMessageListenerContainer.java:532)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.doInitialize(DefaultMessageListenerContainer.java:485)
        - locked <0xafb67c08> (a java.lang.Object)
        at org.springframework.jms.listener.AbstractJmsListeningContainer.initialize(AbstractJmsListeningContainer.java:160)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.initialize(AbstractPollingMessageListenerContainer.java:199)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.initialize(DefaultMessageListenerContainer.java:451)
        at org.apache.cxf.transport.jms.JMSFactory.createJmsListener(JMSFactory.java:171)
        at org.apache.cxf.transport.jms.JMSDestination.activate(JMSDestination.java:105)
        at org.apache.cxf.transport.AbstractObservable.setMessageObserver(AbstractObservable.java:48)
        - locked <0xb0e3f2d8> (a org.apache.cxf.transport.jms.JMSDestination)
        at org.apache.cxf.endpoint.ServerImpl.start(ServerImpl.java:120)
        at org.apache.servicemix.cxfbc.CxfBcConsumer.start(CxfBcConsumer.java:325)
        at org.apache.servicemix.common.endpoints.SimpleEndpoint.activate(SimpleEndpoint.java:58)
        - locked <0xb0e52018> (a org.apache.servicemix.cxfbc.CxfBcConsumer)
        at org.apache.servicemix.common.ServiceUnit.start(ServiceUnit.java:53)
        at org.apache.servicemix.common.BaseServiceUnitManager.start(BaseServiceUnitManager.java:151)
        - locked <0x92087f10> (a org.apache.servicemix.common.BaseServiceUnitManager)
        at org.apache.servicemix.jbi.framework.ServiceUnitLifeCycle.start(ServiceUnitLifeCycle.java:103)
        at org.apache.servicemix.jbi.framework.ServiceAssemblyLifeCycle.start(ServiceAssemblyLifeCycle.java:132)
        - locked <0xb0e88458> (a org.apache.servicemix.jbi.framework.ServiceAssemblyLifeCycle)
        at org.apache.servicemix.jbi.framework.DeploymentService.start(DeploymentService.java:378)

After digging into the code, I found that this hang might have been introduced by the commit http://svn.apache.org/viewvc?rev=737356&view=rev
If I remove the following change that this commit made to JMSFactory.java
+        } else {
+            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
+            taskExecutor.setConcurrencyLimit(jmsConfig.getMaxConcurrentTasks());
+            jmsListener.setTaskExecutor(taskExecutor);
         }
then the hang disappears.
I'm not sure whether this piece of code is really necessary for addressing CXF-2002. Could
you please shed some light on this?
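
For illustration only: the stack trace shows the calling thread parked in ConcurrencyThrottleSupport.beforeAccess, which waits while the number of running tasks has reached the configured concurrency limit. The plain-Java sketch below models that behaviour under the assumption that listener invokers are long-running tasks that never return on their own. ThrottleSketch, tryExecute and startConsumers are hypothetical names and are not part of CXF or Spring, and a timeout replaces the indefinite wait so that the example terminates. Whether the limit was actually lower than the number of invokers in the setup reported here is not established.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an executor with a concurrency limit. */
public class ThrottleSketch {
    private final Semaphore permits;

    public ThrottleSketch(int concurrencyLimit) {
        this.permits = new Semaphore(concurrencyLimit);
    }

    /**
     * Starts the task on a new thread if a slot frees up within the timeout.
     * Returns false where an unbounded wait would simply keep blocking.
     */
    public boolean tryExecute(Runnable task, long timeoutMillis) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                permits.release(); // the slot is only freed when the task ends
            }
        });
        worker.setDaemon(true);
        worker.start();
        return true;
    }

    /** Submits long-running invokers and returns how many were accepted. */
    public static int startConsumers(int limit, int consumers) {
        ThrottleSketch executor = new ThrottleSketch(limit);
        CountDownLatch stop = new CountDownLatch(1);
        int started = 0;
        for (int i = 0; i < consumers; i++) {
            Runnable invoker = () -> {
                try {
                    stop.await(); // models a listener loop that does not return
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            if (!executor.tryExecute(invoker, 200)) {
                break; // the submitting thread would be stuck at this point
            }
            started++;
        }
        stop.countDown(); // let the accepted invokers finish
        return started;
    }

    public static void main(String[] args) {
        System.out.println(startConsumers(2, 3)); // prints 2
        System.out.println(startConsumers(3, 3)); // prints 3
    }
}
```

With a limit of 2 and 3 invokers, the third submission cannot proceed until an earlier invoker exits. With an unbounded wait in place of the timeout, the submitting thread would block at that point.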
My configuration for the JMSConfigFeature looks like this:
<bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
    <property name="jmsConfig">
        <bean class="org.apache.cxf.transport.jms.JMSConfiguration">
            <property name="connectionFactory">
                <ref bean="myConnectionFactory" />
            </property>
            <property name="targetDestination">
                <value>dynamicQueues/person.queue</value>
            </property>
            <property name="useJms11">
                <value>true</value>
            </property>
            <property name="timeToLive">
                <value>500000</value>
            </property>
            <property name="concurrentConsumers">
                <value>1</value>
            </property>
            <property name="maxConcurrentConsumers">
                <value>1</value>
            </property>
            <property name="maxSuspendedContinuations">
                <value>1</value>
            </property>
            <property name="cacheLevel">
                <value>2</value>
            </property>
        </bean>
    </property>
</bean>

<bean id="myConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
</bean>
Am I missing something?
Thanks
Freeman



      was (Author: ffang):
    Hi Christian and Sergey,

I'm currently trying to verify that this fix works in ServiceMix.
I just found that the JMSDestination.activate() method hangs (the hang actually occurs in AbstractJmsListeningContainer.initialize).
The stack trace is:
"Timer-4" daemon prio=1 tid=0x81a96d10 nid=0x1d6c in Object.wait() [0x7f7be000..0x7f7beeb0]
        at java.lang.Object.wait(Native Method)
        - waiting on <0xafb871a0> (a java.lang.Object)
        at java.lang.Object.wait(Object.java:474)
        at org.springframework.util.ConcurrencyThrottleSupport.beforeAccess(ConcurrencyThrottleSupport.java:118)
        - locked <0xafb871a0> (a java.lang.Object)
        at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottleAdapter.beforeAccess(SimpleAsyncTaskExecutor.java:169)
        at org.springframework.core.task.SimpleAsyncTaskExecutor.execute(SimpleAsyncTaskExecutor.java:141)
        at org.springframework.core.task.SimpleAsyncTaskExecutor.execute(SimpleAsyncTaskExecutor.java:126)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.doRescheduleTask(DefaultMessageListenerContainer.java:495)
        at org.springframework.jms.listener.AbstractJmsListeningContainer.rescheduleTaskIfNecessary(AbstractJmsListeningContainer.java:474)
        - locked <0xafb52a88> (a java.lang.Object)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.scheduleNewInvoker(DefaultMessageListenerContainer.java:532)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.doInitialize(DefaultMessageListenerContainer.java:485)
        - locked <0xafb67c08> (a java.lang.Object)
        at org.springframework.jms.listener.AbstractJmsListeningContainer.initialize(AbstractJmsListeningContainer.java:160)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.initialize(AbstractPollingMessageListenerContainer.java:199)
        at org.springframework.jms.listener.DefaultMessageListenerContainer.initialize(DefaultMessageListenerContainer.java:451)
        at org.apache.cxf.transport.jms.JMSFactory.createJmsListener(JMSFactory.java:171)
        at org.apache.cxf.transport.jms.JMSDestination.activate(JMSDestination.java:105)
        at org.apache.cxf.transport.AbstractObservable.setMessageObserver(AbstractObservable.java:48)
        - locked <0xb0e3f2d8> (a org.apache.cxf.transport.jms.JMSDestination)
        at org.apache.cxf.endpoint.ServerImpl.start(ServerImpl.java:120)
        at org.apache.servicemix.cxfbc.CxfBcConsumer.start(CxfBcConsumer.java:325)
        at org.apache.servicemix.common.endpoints.SimpleEndpoint.activate(SimpleEndpoint.java:58)
        - locked <0xb0e52018> (a org.apache.servicemix.cxfbc.CxfBcConsumer)
        at org.apache.servicemix.common.ServiceUnit.start(ServiceUnit.java:53)
        at org.apache.servicemix.common.BaseServiceUnitManager.start(BaseServiceUnitManager.java:151)
        - locked <0x92087f10> (a org.apache.servicemix.common.BaseServiceUnitManager)
        at org.apache.servicemix.jbi.framework.ServiceUnitLifeCycle.start(ServiceUnitLifeCycle.java:103)
        at org.apache.servicemix.jbi.framework.ServiceAssemblyLifeCycle.start(ServiceAssemblyLifeCycle.java:132)
        - locked <0xb0e88458> (a org.apache.servicemix.jbi.framework.ServiceAssemblyLifeCycle)
        at org.apache.servicemix.jbi.framework.DeploymentService.start(DeploymentService.java:378)

After digging into the code, I found that this hang might have been introduced by the commit http://svn.apache.org/viewvc?rev=737356&view=rev
If I remove the following change that this commit made to JMSFactory.java
+        } else {
+            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
+            taskExecutor.setConcurrencyLimit(jmsConfig.getMaxConcurrentTasks());
+            jmsListener.setTaskExecutor(taskExecutor);
         }
then the hang disappears.
I'm not sure whether this piece of code is really necessary for addressing CXF-2002. Could
you please shed some light on this?
My configuration for the JMSConfigFeature looks like this:
<bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
    <property name="jmsConfig">
        <bean class="org.apache.cxf.transport.jms.JMSConfiguration">
            <property name="concurrentConsumers">
                <value>25</value>
            </property>
            <property name="connectionFactory">
                <ref bean="myConnectionFactory" />
            </property>
            <property name="targetDestination">
                <value>dynamicQueues/person.queue</value>
            </property>
            <property name="useJms11">
                <value>true</value>
            </property>
        </bean>
    </property>
</bean>

<bean id="myConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
</bean>
Am I missing something?
Thanks
Freeman


  
> Server async jms transport needs dynamic mechanism to throttle message consumption
> ----------------------------------------------------------------------------------
>
>                 Key: CXF-2002
>                 URL: https://issues.apache.org/jira/browse/CXF-2002
>             Project: CXF
>          Issue Type: Improvement
>          Components: Transports
>    Affects Versions: 2.0.9, 2.1.3, 2.0.10
>            Reporter: Ron Gavlin
>            Assignee: Sergey Beryozkin
>
> Currently, the server-side async jms transport has no mechanism to throttle consumption
> of incoming messages. This becomes problematic in scenarios where a large backlog of messages
> exists on the input queue. In this case, it is likely that the cxf server will overload its
> internal work item queues resulting in problems. A dynamic throttling mechanism on the async
> jms server is required to avoid this problem.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

