activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arjun <arjun....@gmail.com>
Subject Activemq producer locks
Date Thu, 13 Dec 2012 17:22:57 GMT
Hi,

We are having a webapplication running on tomcat in which we have an
embedded broker. We also have a separate broker that we make TCP connections
to. 
We use activemq 5.5.0

We have a camel route that looks like this -


We send object messages to the vm broker queue that routes the message to
main broker after marshalling

<route>
   <from uri="vmBroker:queue:QUEUE_NAME?concurrentConsumers=3" />
   <marshal ref="camelJaxbMarshaller" />
   <to ref="mainBroker:queue:QUEUE_NAME?jmsMessageType=Text" />
</route>

Here are the remaining broker configs

<bean id="vmBroker"
class="org.apache.activemq.camel.component.ActiveMQComponent">
   <property name="connectionFactory" ref="vmBrokerPooledConnFactory" />
</bean>

<bean id="vmBrokerConnFactory"
class="org.apache.activemq.ActiveMQConnectionFactory" 
         depends-on="vmBroker">
   <property name="brokerURL"
value="vm://embeddedBroker?brokerConfig=xbean:vmbroker-      
                                                   
activemq.xml&amp;jms.prefetchPolicy.queuePrefetch=10" />
    <property name="objectMessageSerializationDefered" value="true" />
    <property name="copyMessageOnSend" value="false" />
</bean>
	
    <bean id="vmBrokerPooledConnFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
      <property name="connectionFactory" ref="vmBrokerConnFactory" />
    </bean>    

	<bean id="vmBrokerDest" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="QUEUE_NAME" />
	</bean>

	<bean id="mainBroker"
class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="connectionFactory" ref="primaryJmsConnectionFactory" />
	</bean>

        <amq:connectionFactory id="primaryJmsConnectionFactory"
                
brokerURL="failover://(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?jms.alwaysSyncSend"
          userName="user_name" password="password"/>
-------------------------------------------------------------------------
the embedded broker's configs

	<amq:broker persistent="false" brokerName="embeddedBroker">
		<amq:destinationPolicy>
			<amq:policyMap>
				<amq:policyEntries>
					<amq:policyEntry queue=">" producerFlowControl="false">
						<amq:dispatchPolicy>
							<amq:roundRobinDispatchPolicy />
						</amq:dispatchPolicy>
					</amq:policyEntry>
				</amq:policyEntries>
			</amq:policyMap>
		</amq:destinationPolicy>

		<amq:systemUsage>
			<amq:systemUsage>
				<amq:memoryUsage>
					<amq:memoryUsage limit="128 mb" />
				</amq:memoryUsage>
			</amq:systemUsage>
		</amq:systemUsage>
	</amq:broker>
----------------------------------------------------------------------------

We have an issue where after consuming some 15000 odd messages, one of the
consumers of the emdedded broker stops consuming more messages. The inflight
count in jconsole shows 10 messages(which is equal to the prefetch). On
taking a thread dump of the tomcat(embedded broker), we find the below,

"Camel (camelcontextname) thread #2 - JmsConsumer[Consumer name" daemon
prio=10 tid=0x00002aaab8b8d000 nid=0x3f66 waiting on condition
[0x0000000044154000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000784607780> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
	at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
	at
org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
	at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
	at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
	at
org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1392)
	- locked <0x0000000784607968> (a java.lang.Object)
	at
org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:309)
	at
org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:170)
	at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:355)
	at
org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:309)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:99)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
	at
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:114)
	at
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:284)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:109)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
	at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:99)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
	at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:318)
	at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:209)
	at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:305)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:116)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:79)
	at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:102)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
	at
org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:69)
	at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:104)
	at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)
	at
org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:92)
	at
org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:560)
	at
org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:498)
	at
org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
	at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
	at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
	at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
	at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
	at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

Would really appreciate any kind of pointers to wrong configs/any pointers
towards solving the issue. please  note, the other 2 consumers are consuming
messages and are fine. producer flow control is also disabled



--
View this message in context: http://activemq.2283324.n4.nabble.com/Activemq-producer-locks-tp4660619.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message