activemq-dev mailing list archives

From "Pavel Bekkerman (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-3591) bridge transport is stuck on socket write at both ends
Date Thu, 17 Nov 2011 10:50:51 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13151984#comment-13151984 ]

Pavel Bekkerman commented on AMQ-3591:
--------------------------------------

I've removed "failover:" and am now running the test that reproduces the block.

In the meantime, something that might be related to our case: 

We have a producer P1 for a queue Q1 connected to the central broker CB.
We have a consumer C1 for the same queue Q1 connected to the embedded broker EB.
CB and EB are connected by a duplex network connector (as described above).
We send 1000 messages to Q1 using P1. No other activity is performed.
Although C1 may take minutes to process each message from Q1, the CB web console
shows that all 1000 messages were dequeued within seconds of being sent.

Is this behavior normal?
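
A toy model of one possible explanation for the counters described above, assuming the network bridge acts as a consumer on CB and acknowledges each message on CB as soon as EB has accepted it. This is only a sketch, not ActiveMQ code: ToyQueue, forwardAll and the counters are hypothetical names, and whether the real bridge behaves this way in 5.5.0 should be verified against the broker.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BridgeDequeueSketch {

    /** Toy stand-in for a broker queue and its console counters; not an ActiveMQ class. */
    static final class ToyQueue {
        final Queue<String> pending = new ArrayDeque<>();
        int enqueued;
        int dequeued;

        void enqueue(String message) {
            pending.add(message);
            enqueued++;
        }

        /** Removes the head message and counts it as dequeued; returns null if the queue is empty. */
        String ack() {
            String message = pending.poll();
            if (message != null) {
                dequeued++;
            }
            return message;
        }
    }

    /** Assumed bridge behavior: hand each message to the remote queue, then acknowledge it locally. */
    static void forwardAll(ToyQueue local, ToyQueue remote) {
        String message;
        while ((message = local.pending.peek()) != null) {
            remote.enqueue(message);
            local.ack();
        }
    }

    public static void main(String[] args) {
        ToyQueue cb = new ToyQueue(); // Q1 on the central broker
        ToyQueue eb = new ToyQueue(); // Q1 on the embedded broker

        for (int i = 0; i < 1000; i++) {
            cb.enqueue("message-" + i); // P1 sends to CB
        }
        forwardAll(cb, eb); // the bridge drains CB without waiting for C1
        eb.ack();           // C1 has finished only one message so far

        System.out.println("CB dequeued=" + cb.dequeued + " pending=" + cb.pending.size());
        System.out.println("EB dequeued=" + eb.dequeued + " pending=" + eb.pending.size());
    }
}
```

Under that assumption, CB reports every message as dequeued almost immediately while the backlog waits on EB for C1, so the EB queue counters would be the ones to watch.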

                
> bridge transport is stuck on socket write at both ends
> ------------------------------------------------------
>
>                 Key: AMQ-3591
>                 URL: https://issues.apache.org/jira/browse/AMQ-3591
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.5.0
>         Environment: 2 redhat linux
>            Reporter: Leon Fleysher
>
> We have embedded (into Jboss 7 web app) broker A on one linux connecting through tcp duplex=true bridge to another standalone broker B. Both brokers exchange messages through 4 common queues. Each broker has its own persistence storage in mssql db.
> After a few hours of flawless work the bridge stops delivering the messages in both directions.
> broker A offending thread stack trace (causing the rest to wait on oneway mutex):
> BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
> java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
> java.net.SocketOutputStream.socketWrite(byte[], int, int)
> java.net.SocketOutputStream.write(byte[], int, int)
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
> java.io.DataOutputStream.write(byte[], int, int)
> org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
> org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
> org.apache.activemq.transport.InactivityMonitor.oneway(Object)
> org.apache.activemq.transport.TransportFilter.oneway(Object)
> org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
> org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
> org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
> org.apache.activemq.transport.vm.VMTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
> org.apache.activemq.broker.TransportConnection.dispatch(Command)
> org.apache.activemq.broker.TransportConnection.processDispatch(Command)
> org.apache.activemq.broker.TransportConnection.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
> broker B offending thread stack trace (causing the rest to wait on oneway mutex):
> ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
> java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
> java.net.SocketOutputStream.socketWrite(byte[], int, int)
> java.net.SocketOutputStream.write(byte[], int, int)
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
> java.io.DataOutputStream.flush()
> org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
> org.apache.activemq.transport.InactivityMonitor.oneway(Object)
> org.apache.activemq.transport.TransportFilter.oneway(Object)
> org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
> org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
> org.apache.activemq.transport.vm.VMTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
> org.apache.activemq.broker.TransportConnection.dispatch(Command)
> org.apache.activemq.broker.TransportConnection.processDispatch(Command)
> org.apache.activemq.broker.TransportConnection.iterate()
> org.apache.activemq.thread.DedicatedTaskRunner.runTask()
> org.apache.activemq.thread.DedicatedTaskRunner$1.run()
> Both brokers run on a local 1 Gbit network. This scenario happens always after few hours of work and is recreated consistently.
> Broker A relevant configuration:
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="as2analyticsEmbedded" persistent="true" dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <managementContext>
>                 <managementContext createConnector="false"/>
>         </managementContext>
>         <networkConnectors>
>             <networkConnector uri="static:(failover:(tcp://10.0.213.38:61616))"
>                 name="bridge"
>                 duplex="true"
>                 conduitSubscriptions="true"
>                 decreaseNetworkConsumerPriority="false">
>             </networkConnector>
>         </networkConnectors>
>         <!-- both FS and DB must be provided. AMQ will use DB for long-term storage.
>         FS contents that has not been consumed yet is stored to DB from time to time -->
>         <persistenceAdapter>
>             <jdbcPersistenceAdapter dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded" dataSource="#mssql-ds" useDatabaseLock="false">
>                 <adapter>
>                         <imageBasedJDBCAdaptor/>
>                 </adapter>
>             </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="2gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="80gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="10gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <transportConnectors>
>             <transportConnector name="as2analyticsEmbeddedConnector" uri="vm://as2analyticsEmbedded"/>
>         </transportConnectors>
>     </broker>
>     <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
>         <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
>         <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
>         <property name="username" value="obfuscated"/>
>         <property name="password" value="obfuscated"/>
>         <property name="initialSize" value="1"/>
>         <property name="maxActive" value="100"/>
>         <property name="poolPreparedStatements" value="true"/>
>     </bean>
> </beans>
> Broker B relevant configuration:
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.base}/conf/credentials.properties</value>
>         </property>
>     </bean>
>     <!--
>         The <broker> element is used to configure the ActiveMQ broker.
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="analyticsCentral" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
>         <!--
>                         For better performances use VM cursor and small memory limit.
>                         For more information, see:
>             http://activemq.apache.org/message-cursors.html
>             Also, if your producer is "hanging", it's probably due to producer flow control.
>             For more information, see:
>             http://activemq.apache.org/producer-flow-control.html
>         -->
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>                 <policyEntry queue=">">
>                    <deadLetterStrategy>
>                      <individualDeadLetterStrategy
>                         queuePrefix="DLQ." useQueueForQueueMessages="true" />
>                    </deadLetterStrategy>
>                </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <!--
>             The managementContext is used to configure how ActiveMQ is exposed in
>             JMX. By default, ActiveMQ uses the MBean server that is started by
>             the JVM. For more information, see:
>             http://activemq.apache.org/jmx.html
>         -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>         <!--
>             Configure message persistence for the broker. The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>             For more information, see:
>             http://activemq.apache.org/persistence.html
>         -->
>         <persistenceAdapter>
>                 <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mssql-ds" useDatabaseLock="false">
>                 <adapter>
>                         <imageBasedJDBCAdaptor/>
>                 </adapter>
>                 </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="2gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="10gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="5gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!--
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see:
>             http://activemq.apache.org/configuring-transports.html
>         -->
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>         </transportConnectors>
>     </broker>
>         <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
>               <amq:redeliveryPolicy>
>                 <amq:redeliveryPolicy maximumRedeliveries="0"/>
>               </amq:redeliveryPolicy>
>         </amq:connectionFactory>
>         <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
>                 <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
>                 <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
>                 <property name="username" value="obfuscated"/>
>                 <property name="password" value="obfuscated"/>
>                 <property name="initialSize" value="1"/>
>                 <property name="maxActive" value="100"/>
>                 <property name="poolPreparedStatements" value="true"/>
>         </bean>
>     <!--
>         Enable web consoles, REST and Ajax APIs and demos
>         It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>     -->
>     <import resource="jetty.xml"/>
> </beans>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
