activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Leon Fleysher (Created) (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AMQ-3591) bridge transport is stuck on socket write at both ends
Date Sat, 12 Nov 2011 13:56:52 GMT
bridge transport is stuck on socket write at both ends
------------------------------------------------------

                 Key: AMQ-3591
                 URL: https://issues.apache.org/jira/browse/AMQ-3591
             Project: ActiveMQ
          Issue Type: Bug
          Components: Transport
    Affects Versions: 5.5.0
         Environment: 2 redhat linux
            Reporter: Leon Fleysher


We have embedded (into Jboss 7 web app) broker A on one linux connecting through tcp duplex=true
bridge to another standalone broker B. Both brokers exchange messages through 4 common queues.
Each broker has its own persistence storage in mssql db.

After a few hours of flawless work the bridge stops delivering the messages in both directions.

broker A offending thread stack trace (causing the rest to wait on oneway mutex):
BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
java.net.SocketOutputStream.socketWrite(byte[], int, int)
java.net.SocketOutputStream.write(byte[], int, int)
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
java.io.DataOutputStream.write(byte[], int, int)
org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence,
DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput,
BooleanStream)
org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object,
DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat,
Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
org.apache.activemq.transport.InactivityMonitor.oneway(Object)
org.apache.activemq.transport.TransportFilter.oneway(Object)
org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
org.apache.activemq.transport.vm.VMTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
org.apache.activemq.broker.TransportConnection.dispatch(Command)
org.apache.activemq.broker.TransportConnection.processDispatch(Command)
org.apache.activemq.broker.TransportConnection.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

broker B offending thread stack trace (causing the rest to wait on oneway mutex):

ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
java.net.SocketOutputStream.socketWrite(byte[], int, int)
java.net.SocketOutputStream.write(byte[], int, int)
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
java.io.DataOutputStream.flush()
org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
org.apache.activemq.transport.InactivityMonitor.oneway(Object)
org.apache.activemq.transport.TransportFilter.oneway(Object)
org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
org.apache.activemq.transport.vm.VMTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
org.apache.activemq.broker.TransportConnection.dispatch(Command)
org.apache.activemq.broker.TransportConnection.processDispatch(Command)
org.apache.activemq.broker.TransportConnection.iterate()
org.apache.activemq.thread.DedicatedTaskRunner.runTask()
org.apache.activemq.thread.DedicatedTaskRunner$1.run()

Both brokers run on a local 1 Gbit network. This scenario happens always after few hours of
work and is recreated consistently.

Broker A relevant configuration:
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"
>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="as2analyticsEmbedded"
persistent="true" dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
                  <pendingQueuePolicy>
                    <fileQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
                <managementContext createConnector="false"/>
        </managementContext>

        <networkConnectors>
            <networkConnector uri="static:(failover:(tcp://10.0.213.38:61616))"
                name="bridge"
                duplex="true"
                conduitSubscriptions="true"
                decreaseNetworkConsumerPriority="false">
            </networkConnector>
        </networkConnectors>

        <!-- both FS and DB must be provided. AMQ will use DB for long-term storage.
        FS contents that has not been consumed yet is stored to DB from time to time -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded"
dataSource="#mssql-ds" useDatabaseLock="false">
                <adapter>
                        <imageBasedJDBCAdaptor/>
                </adapter>
            </jdbcPersistenceAdapter>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="2gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="80gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="10gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="as2analyticsEmbeddedConnector" uri="vm://as2analyticsEmbedded"/>
        </transportConnectors>

    </broker>

    <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
        <property name="username" value="obfuscated"/>
        <property name="password" value="obfuscated"/>
        <property name="initialSize" value="1"/>
        <property name="maxActive" value="100"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>

</beans>

Broker B relevant configuration:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="analyticsCentral"
dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">

        <!--
                        For better performances use VM cursor and small memory limit.
                        For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
                  <pendingQueuePolicy>
                    <fileQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
                <policyEntry queue=">">
                   <deadLetterStrategy>
                     <individualDeadLetterStrategy
                        queuePrefix="DLQ." useQueueForQueueMessages="true" />
                   </deadLetterStrategy>
               </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
                <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mssql-ds"
useDatabaseLock="false">
                <adapter>
                        <imageBasedJDBCAdaptor/>
                </adapter>
                </jdbcPersistenceAdapter>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="2gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="10gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="5gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

    </broker>

        <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
              <amq:redeliveryPolicy>
                <amq:redeliveryPolicy maximumRedeliveries="0"/>
              </amq:redeliveryPolicy>
        </amq:connectionFactory>

        <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
                <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
                <property name="username" value="obfuscated"/>
                <property name="password" value="obfuscated"/>
                <property name="initialSize" value="1"/>
                <property name="maxActive" value="100"/>
                <property name="poolPreparedStatements" value="true"/>
        </bean>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml
for more info

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message