activemq-dev mailing list archives

From "Pavel Bekkerman (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-3591) bridge transport is stuck on socket write at both ends
Date Thu, 17 Nov 2011 09:52:52 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13151955#comment-13151955
] 

Pavel Bekkerman commented on AMQ-3591:
--------------------------------------

I have yet to try removing failover: (leaving static: only), but above is our netstat output,
as you suggested.
I don't see any unusual socket states, only LISTEN or ESTABLISHED.
Any ideas on how to proceed toward a solution?
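
For reference, a minimal sketch of the static:-only variant mentioned above. It assumes the only change to broker A's networkConnector is dropping the failover: wrapper; every other attribute is copied from the configuration quoted below. It has not been tried yet, so whether it affects the hang is unknown.

```xml
<!-- Sketch only: broker A's networkConnector with the failover: wrapper removed. -->
<!-- Host, port, and remaining attributes are taken unchanged from the quoted config. -->
<networkConnectors>
    <networkConnector uri="static:(tcp://10.0.213.38:61616)"
        name="bridge"
        duplex="true"
        conduitSubscriptions="true"
        decreaseNetworkConsumerPriority="false"/>
</networkConnectors>
```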
                
> bridge transport is stuck on socket write at both ends
> ------------------------------------------------------
>
>                 Key: AMQ-3591
>                 URL: https://issues.apache.org/jira/browse/AMQ-3591
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.5.0
>         Environment: 2 redhat linux
>            Reporter: Leon Fleysher
>
> We have broker A embedded in a JBoss 7 web app on one Linux host, connecting through a TCP duplex=true bridge to a standalone broker B. The two brokers exchange messages through 4 common queues. Each broker has its own persistence storage in an MS SQL database.
> After a few hours of flawless work the bridge stops delivering the messages in both directions.
> broker A offending thread stack trace (causing the rest to wait on oneway mutex):
> BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
> java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
> java.net.SocketOutputStream.socketWrite(byte[], int, int)
> java.net.SocketOutputStream.write(byte[], int, int)
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
> java.io.DataOutputStream.write(byte[], int, int)
> org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
> org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
> org.apache.activemq.transport.InactivityMonitor.oneway(Object)
> org.apache.activemq.transport.TransportFilter.oneway(Object)
> org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
> org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
> org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
> org.apache.activemq.transport.vm.VMTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
> org.apache.activemq.broker.TransportConnection.dispatch(Command)
> org.apache.activemq.broker.TransportConnection.processDispatch(Command)
> org.apache.activemq.broker.TransportConnection.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
> broker B offending thread stack trace (causing the rest to wait on oneway mutex):
> ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
> java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
> java.net.SocketOutputStream.socketWrite(byte[], int, int)
> java.net.SocketOutputStream.write(byte[], int, int)
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
> java.io.DataOutputStream.flush()
> org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
> org.apache.activemq.transport.InactivityMonitor.oneway(Object)
> org.apache.activemq.transport.TransportFilter.oneway(Object)
> org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
> org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
> org.apache.activemq.transport.vm.VMTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
> org.apache.activemq.broker.TransportConnection.dispatch(Command)
> org.apache.activemq.broker.TransportConnection.processDispatch(Command)
> org.apache.activemq.broker.TransportConnection.iterate()
> org.apache.activemq.thread.DedicatedTaskRunner.runTask()
> org.apache.activemq.thread.DedicatedTaskRunner$1.run()
> Both brokers run on a local 1 Gbit network. The hang always occurs after a few hours of work and is consistently reproducible.
> Broker A relevant configuration:
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="as2analyticsEmbedded" persistent="true" dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <managementContext>
>                 <managementContext createConnector="false"/>
>         </managementContext>
>         <networkConnectors>
>             <networkConnector uri="static:(failover:(tcp://10.0.213.38:61616))"
>                 name="bridge"
>                 duplex="true"
>                 conduitSubscriptions="true"
>                 decreaseNetworkConsumerPriority="false">
>             </networkConnector>
>         </networkConnectors>
>         <!-- Both FS and DB must be provided. AMQ will use DB for long-term storage.
>         FS contents that have not been consumed yet are stored to DB from time to time. -->
>         <persistenceAdapter>
>             <jdbcPersistenceAdapter dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded" dataSource="#mssql-ds" useDatabaseLock="false">
>                 <adapter>
>                         <imageBasedJDBCAdaptor/>
>                 </adapter>
>             </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="2gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="80gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="10gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <transportConnectors>
>             <transportConnector name="as2analyticsEmbeddedConnector" uri="vm://as2analyticsEmbedded"/>
>         </transportConnectors>
>     </broker>
>     <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
>         <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
>         <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
>         <property name="username" value="obfuscated"/>
>         <property name="password" value="obfuscated"/>
>         <property name="initialSize" value="1"/>
>         <property name="maxActive" value="100"/>
>         <property name="poolPreparedStatements" value="true"/>
>     </bean>
> </beans>
> Broker B relevant configuration:
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.base}/conf/credentials.properties</value>
>         </property>
>     </bean>
>     <!--
>         The <broker> element is used to configure the ActiveMQ broker.
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="analyticsCentral" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
>         <!--
>             For better performance use VM cursor and small memory limit.
>             For more information, see:
>             http://activemq.apache.org/message-cursors.html
>             Also, if your producer is "hanging", it's probably due to producer flow control.
>             For more information, see:
>             http://activemq.apache.org/producer-flow-control.html
>         -->
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>                 <policyEntry queue=">">
>                    <deadLetterStrategy>
>                      <individualDeadLetterStrategy
>                         queuePrefix="DLQ." useQueueForQueueMessages="true" />
>                    </deadLetterStrategy>
>                </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <!--
>             The managementContext is used to configure how ActiveMQ is exposed in
>             JMX. By default, ActiveMQ uses the MBean server that is started by
>             the JVM. For more information, see:
>             http://activemq.apache.org/jmx.html
>         -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>         <!--
>             Configure message persistence for the broker. The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>             For more information, see:
>             http://activemq.apache.org/persistence.html
>         -->
>         <persistenceAdapter>
>                 <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mssql-ds" useDatabaseLock="false">
>                 <adapter>
>                         <imageBasedJDBCAdaptor/>
>                 </adapter>
>                 </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="2gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="10gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="5gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!--
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see:
>             http://activemq.apache.org/configuring-transports.html
>         -->
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>         </transportConnectors>
>     </broker>
>         <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
>               <amq:redeliveryPolicy>
>                 <amq:redeliveryPolicy maximumRedeliveries="0"/>
>               </amq:redeliveryPolicy>
>         </amq:connectionFactory>
>         <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
>                 <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
>                 <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
>                 <property name="username" value="obfuscated"/>
>                 <property name="password" value="obfuscated"/>
>                 <property name="initialSize" value="1"/>
>                 <property name="maxActive" value="100"/>
>                 <property name="poolPreparedStatements" value="true"/>
>         </bean>
>     <!--
>         Enable web consoles, REST and Ajax APIs and demos
>         It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>     -->
>     <import resource="jetty.xml"/>
> </beans>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
