camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Camel ActiveMQ In/OUT endpoint creates additional consumers on response queue which are more than maxConcurrentConsumers
Date Wed, 18 Mar 2015 06:50:45 GMT
Hi

When doing request/reply over JMS using camel-jms then Camel uses the
concurrentConsumers settings you may have configured on the component
/ endpoint.

So if you only want 1 consumer, then set that on the endpoint

Though I have logged a ticket to allow having separate options for
regular concurrent consumers vs for request/reply
https://issues.apache.org/jira/browse/CAMEL-8503

On Tue, Mar 17, 2015 at 5:09 PM, agentalpha <yogesh.life@gmail.com> wrote:
> Hi Everyone,
>
> We are using Camel 2.13.2 and ActiveMQ 5.9.0.
> The configuration of activemq broker is as follows:
>         <bean id="activemq"
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>                 <property name="configuration" ref="jmsConfig" />
>                 <property name="transacted" value="false" />
>                 <property name="acceptMessagesWhileStopping" value="false" />
>                 <property name="cacheLevelName" value="CACHE_CONSUMER" />
>         </bean>
>
>         <bean id="jmsConnectionFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory">
>                 <property name="brokerURL" value="$[ren.brokerUrl]" />
>                 <property name="useAsyncSend" value="true" />
>         </bean>
>
>         <bean id="pooledConnectionFactory"
> class="org.apache.activemq.pool.PooledConnectionFactory"
>                 init-method="start" destroy-method="stop">
>                 <property name="maxConnections" value="10" />
>                 <property name="maximumActiveSessionPerConnection" value="-1" />
>                 <property name="expiryTimeout" value="0" />
>                 <property name="idleTimeout" value="0" />
>                 <property name="connectionFactory" ref="jmsConnectionFactory" />
>         </bean>
>
>         <bean id="jmsConfig"
> class="org.apache.camel.component.jms.JmsConfiguration">
>                 <property name="connectionFactory" ref="pooledConnectionFactory" />
>                 <property name="concurrentConsumers" value="15" />
>                 <property name="maxConcurrentConsumers" value="15" />
>                 <property name="maxMessagesPerTask" value="5" />
>                 <property name="idleTaskExecutionLimit" value="0" />
>                 <property name="idleConsumerLimit" value="0" />
>         </bean>
>
>
> As mentioned, we have concurrent and max concurrent consumers having value
> 15.
>
> We have following route sample:
> <route id="openApiRoute" trace="true" autoStartup="false"
>                         xmlns="http://camel.apache.org/schema/blueprint">
>                         <from uri="openAPI" />
>                         <onException>
>                                 <exception>java.lang.Throwable</exception>
>                                 <handled>
>                                         <constant>true</constant>
>                                 </handled>
>                                 <bean ref="errorHandlerBean" />
>                                 <convertBodyTo type="com.company.openapi.SoapMap"
/>
>                         </onException>
>                         <onException>
>                                 <exception>java.lang.Exception</exception>
>                                 <handled>
>                                         <constant>true</constant>
>                                 </handled>
>                                 <bean ref="errorHandlerBean" />
>                                 <convertBodyTo type="com.company.openapi.SoapMap"
/>
>                         </onException>
>                         <onException>
>                                 <exception>org.apache.camel.ExchangeTimedOutException</exception>
>                                 <redeliveryPolicy logRetryAttempted="true"
>                                         retryAttemptedLogLevel="WARN"
>
> maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}"
>                                         redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}"
/>
>                                 <handled>
>                                         <simple>${header.requestType} != 'broadcast'</simple>
>                                 </handled>
>                                 <choice>
>                                         <when>
>                                                 <simple>${header.requestType} !=
'broadcast'</simple>
>                                                 <bean ref="errorHandlerBean" />
>                                                 <convertBodyTo type="com.company.openapi.SoapMap"
/>
>                                         </when>
>                                 </choice>
>                         </onException>
>
>                         <convertBodyTo type="java.util.Map" />
>                         <choice>
>                                 <when>
>                                         <simple>${header.requestType} == 'broadcast'</simple>
>                                         <bean ref ="broadcasterBean"/>
>                                         <recipientList parallelProcessing="true"
> strategyRef="aggregatorStrategy" streaming="true" stopOnException="false"
> prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}">
>                                                         <header>recipientList</header>
>                                         </recipientList>
>                                         <process ref="broadcastResultProcessor" />
>                                 </when>
>                                 <when>
>                                         <simple>${header.requestType} == 'forward'</simple>
>                                         <bean ref="dynamicRouterBean" method="route"
/>
>                                 </when>
>                                 <otherwise>
>                                         <bean ref="operationTypeNotSupported"
> method="throwOperationTypeNotSupportedException" />
>                                 </otherwise>
>                         </choice>
>                         <convertBodyTo type="com.company.openapi.SoapMap" />
> </route>
>
>
> What we are essentially doing, is fetching a request on CXF and forwarding
> it to a jms endpoint using IN/OUT messaging. We use dynamic router for
> performing this operation dynamically.
> A sample dynamic url for this jms endpoint is :
>
> activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive
>
> Now few problematic behaviors of automatic consumer creation on
> responseQueue are:
> 1. Whenever the route starts and requests are sent, 15 consumers are created
> on the response queue. After that no matter how much the load is, these 15
> consumers are handling the requests properly (as per our tps expectations).
> Now if we dynamically update the above sample jms url in our system (even
> just the value of requestTimeout) and send another request, we see that the
> reponseQ now has additional 15 (total of 30) consumers on it. And right
> after this, most of the requests start failing for unable to map the
> response to the request.
> The message is sent in the request queue, picked up by the client, the
> response is sent, which we can see being enqueued adn dequeued in the
> response queue (from activeMQ web console), the response is fetched by our
> application but QueueReplyManager fails to map it to original request,
> mentioning Response received for unknown correlationId xyz. And just after a
> few seconds we see the exchangetimeout happening for the message with same
> correlationid.
> Is this because the new set of 15 consumers got created in different session
> than the original one and hence they are not able to share the
> correlationId?
> Even though I have maxConcurrentConsumers set to 15 for that specific queue,
> why is it still increasing beyond that value? (for every change in url, it
> keeps on increasing by 15).
>
> 2. We have same behaviour when we use the same queue/endpoint url for single
> request and in receipient list.
> The single forward request registers its own 15 consumers and when we
> perform a receipient list url, it creates additional 15 consumers on the
> queues (again maxconcurrentconsumers is 15 only).
>
> 3. When use the dynamic router (routing to the same jms in/out url) in
> multiple routes, we always have 15 consumers on the response queue.
> But we use receipient list in multiple routes (routing to same set of jms
> in/out urls), it creates set of 15 consumers per route (meaning if we have 3
> routes it creates 45 consumers).
> And again we start getting the same issue of response not being mapped to
> the request for unknown correlation id.
>
> Is there any book/blog from where I can understand how and why is this
> happening? How does camel/activemq creates and manages consumers on queues,
> specially when we are using the building blocks of dynamic router,
> receipient list, activemq endpoint etc.
>
> I have been trying to understand and find a correct solution for this since
> couple of weeks now.
> Please help me in this.
> Thanks.
>
> BR!
> Yogesh
>
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-ActiveMQ-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-more-than-mas-tp5764288.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Mime
View raw message