activemq-issues mailing list archives

From "Devesh Arora (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AMQ-5934) ActiveMQ durable topic subscribers not able to receive new messages after running for awhile
Date Thu, 05 Nov 2015 15:13:27 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991745#comment-14991745 ]

Devesh Arora edited comment on AMQ-5934 at 11/5/15 3:12 PM:
------------------------------------------------------------

[~gtully] thank you very much for your feedback - you're absolutely right.  I wasn't sure
if it was intentional or not to have multiple threads access that map. Now that I know, I'm
going to dig a little further to figure out which threads are trying to access it. 


was (Author: darora.asf@gmail.com):
[~gtully] thank you very much for your feedback - you're absolutely right. I'm going to dig
a little further to figure out which threads are trying to access this map. 

> ActiveMQ durable topic subscribers not able to receive new messages after running for awhile
> --------------------------------------------------------------------------------------------
>
>                 Key: AMQ-5934
>                 URL: https://issues.apache.org/jira/browse/AMQ-5934
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.10.2, 5.11.1
>         Environment: ActiveMQ 5.10.2 broker and a topic subscriber with ActiveMQ 5.10 client libraries.
>            Reporter: Eric X
>            Priority: Critical
>         Attachments: AMQ-5934.patch, activemq.dump
>
>
> We have set up an environment that uses ActiveMQ 5.10.2 as the broker and a test durable
> topic subscriber that connects to the broker over TCP. The client connection string
> is as follows:
> failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)?randomize=false&initialReconnectDelay=500&timeout=2000&useExponentialBackOff=false&jms.watchTopicAdvisories=false
> At first, we started five test clients (durable topic subscribers) with the aforementioned
> connection string and made sure every subscriber was connected to the broker. After that,
> we used a test client to publish 1000000 messages to the topic to which the five clients
> were subscribed. After running for a few hours, two of these five clients hung (stopped
> receiving messages from the broker). For these two clients, there were 636786 and 93915
> messages in the pending queue and 1000 messages in the dispatched queue. The other three
> received all the messages from the topic. We have been able to reproduce the issue
> consistently. The stale clients appear connected to the broker but do not receive any
> new messages after they hang.
> Can someone look into this issue? It is very important for us. We have tried various
> configurations, including jms.prefetchPolicy.topicPrefetch=1, but none of them worked.
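
For readers untangling the connection string quoted above, here is a minimal Python sketch (not part of the original report) that splits it into the options attached to each inner tcp:// transport and the options attached to the outer failover URI. The helper name split_failover_uri is hypothetical, and the code is plain string handling for illustration; it does not reproduce ActiveMQ's own URI parsing.

```python
from urllib.parse import parse_qsl

# Connection string copied from the report above.
URI = (
    "failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,"
    "tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)"
    "?randomize=false&initialReconnectDelay=500&timeout=2000"
    "&useExponentialBackOff=false&jms.watchTopicAdvisories=false"
)


def split_failover_uri(uri):
    """Hypothetical helper: return (transports, outer_options) for a failover URI.

    Illustration only; this is not ActiveMQ's parser.
    """
    scheme, _, rest = uri.partition(":")
    if scheme != "failover" or not rest.startswith("("):
        raise ValueError("expected a URI of the form failover:(...)?options")
    close = rest.rindex(")")  # end of the inner transport list
    transports = []
    for part in rest[1:close].split(","):
        base, _, query = part.partition("?")
        transports.append((base, dict(parse_qsl(query))))
    # Everything after the closing parenthesis belongs to the outer URI.
    outer = dict(parse_qsl(rest[close + 1:].lstrip("?")))
    return transports, outer


transports, outer = split_failover_uri(URI)
for base, options in transports:
    print(base, options)
print("outer options:", outer)
```

Read this way, the only jms.* option in the reported string is jms.watchTopicAdvisories, and it sits in the outer group. A setting such as jms.prefetchPolicy.topicPrefetch=1 would presumably be appended in the same place, although the report does not say where it was added in the attempts that failed.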
> Here is the activemq conf file (activemq.xml) for this issue.
>  <!--
>     Licensed to the Apache Software Foundation (ASF) under one or more
>     contributor license agreements.  See the NOTICE file distributed with
>     this work for additional information regarding copyright ownership.
>     The ASF licenses this file to You under the Apache License, Version 2.0
>     (the "License"); you may not use this file except in compliance with
>     the License.  You may obtain a copy of the License at
>    
>     http://www.apache.org/licenses/LICENSE-2.0
>    
>     Unless required by applicable law or agreed to in writing, software
>     distributed under the License is distributed on an "AS IS" BASIS,
>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>     See the License for the specific language governing permissions and
>     limitations under the License.
> -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.base}/conf/credentials.properties</value>
>         </property>
>     </bean> 
>     <!-- 
>         The <broker> element is used to configure the ActiveMQ broker. 
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" advisorySupport="false" useJmx="true" brokerName="dev.masterbroker-1" dataDirectory="${activemq.data}" >
>         <!-- 
>             The managementContext is used to configure how ActiveMQ is exposed in 
>             JMX. By default, ActiveMQ uses the MBean server that is started by 
>             the JVM. For more information, see: 
>             
>             http://activemq.apache.org/jmx.html 
>         -->
>         <managementContext>
>             <managementContext createConnector="true" connectorPort="2099"/>
>         </managementContext>
> 	<plugins>
>         <jaasDualAuthenticationPlugin configuration="activemq-ssl-domain"/>
>         <jaasDualAuthenticationPlugin configuration="activemq-domain"/>
>  
>         <!-- <simpleAuthenticationPlugin>
> 	<users>
> 		<authenticationUser username="system" password="manager"
> 			groups="users,admins"/>
> 		<authenticationUser username="user" password="password"
> 			groups="users"/>
> 		<authenticationUser username="guest" password="password" groups="guests"/>
> 	</users>
>       </simpleAuthenticationPlugin>   -->
>  
>       <authorizationPlugin>
>         <map>
>           <authorizationMap>
>             <authorizationEntries>
>               <authorizationEntry queue=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
>               <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
>               <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
>               
>               <authorizationEntry topic=">" read="admins,users,guests" write="admins,users,guests" admin="admins" />
>               <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
>               <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
>               
>               <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
>             </authorizationEntries>
>             
>             <tempDestinationAuthorizationEntry>  
>              <tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
>            </tempDestinationAuthorizationEntry>               
>           </authorizationMap>
>         </map>
>       </authorizationPlugin>
>       <redeliveryPlugin fallbackToDeadLetter="false" sendToDlqIfMaxRetriesExceeded="false">
>               <redeliveryPolicyMap>
>                   <redeliveryPolicyMap>
>                       <redeliveryPolicyEntries>
>                           <!-- a destination specific policy -->
>                           <redeliveryPolicy topic="TP5" maximumRedeliveries="-1" redeliveryDelay="10000" />
>                       </redeliveryPolicyEntries>
>                       <!-- the fallback policy for all other destinations -->
>                       <defaultEntry>
>                           <redeliveryPolicy maximumRedeliveries="4" initialRedeliveryDelay="5000" redeliveryDelay="10000" />
>                       </defaultEntry>
>                   </redeliveryPolicyMap>
>               </redeliveryPolicyMap>
>        </redeliveryPlugin>
> 	</plugins> 
>         <!-- 
>             Configure message persistence for the broker. The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag). 
>             For more information, see: 
>             
>             http://activemq.apache.org/persistence.html 
>         -->
>         <!--
>         <persistenceAdapter>
>        	            <kahaDB directory="${activemq.data}/kahadb"
>        	                    ignoreMissingJournalfiles="true"
>        	                    checkForCorruptJournalFiles="true"
>        	                    checksumJournalFiles="true"
>        	                    journalMaxFileLength="256mb"/>
>         </persistenceAdapter>
>         -->
>         <persistenceAdapter>
>                 <mKahaDB directory="${activemq.data}/kahadb">
>                         <filteredPersistenceAdapters>
>                                 <!-- match all queues -->
>                                 <filteredKahaDB queue=">">
>                                         <persistenceAdapter>
>                                                 <kahaDB journalMaxFileLength="64mb"
>                                                         ignoreMissingJournalfiles="true"
>                                                         checkForCorruptJournalFiles="true"
>                                                         checksumJournalFiles="true" />
>                                         </persistenceAdapter>
>                                 </filteredKahaDB>
>                                 <!-- match all destinations -->
>                                 <filteredKahaDB topic=">">
>                                         <persistenceAdapter>
>                                                 <kahaDB journalMaxFileLength="64mb"
>                                                         ignoreMissingJournalfiles="true"
>                                                         checkForCorruptJournalFiles="true"
>                                                         checksumJournalFiles="true" />
>                                         </persistenceAdapter>
>                                 </filteredKahaDB>
>                         </filteredPersistenceAdapters>
>                 </mKahaDB>
>         </persistenceAdapter>
>         <!--
>             It's advisable to turn on producer flow control in the production system
>             The systemUsage controls the maximum amount of space the broker will 
>             use before slowing down producers. For more information, see:
>             
>             http://activemq.apache.org/producer-flow-control.html -->
>               
>            <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="200mb" expireMessagesPeriod="300000">
>                     <!-- The constantPendingMessageLimitStrategy is used to prevent
>                          slow topic consumers to block producers and affect other consumers
>                          by limiting the number of messages that are retained
>                          For more information, see:
>                          http://activemq.apache.org/slow-consumer-handling.html
>                     -->
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>                 <policyEntry queue=">" optimizedDispatch="true" producerFlowControl="false" memoryLimit="100mb">
>                   <!-- Use VM cursor for better latency
>                        For more information, see:
>                        http://activemq.apache.org/message-cursors.html
>                   <pendingQueuePolicy>
>                     <vmQueueCursor/>
>                   </pendingQueuePolicy>
>                   -->
>                 </policyEntry>
>            <policyEntry queue="Que1" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que2" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que3" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>          <policyEntry queue="Que4" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que5" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que6" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que7" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que8" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que9" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que10" producerFlowControl="false" maxPageSize="5000" expireMessagesPeriod="300000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           
>           <policyEntry topic="TP1" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP2" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP3" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP5" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP6" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP7" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP8" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP9" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP10" producerFlowControl="false" maxPageSize="5000">
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>     <destinations>
>       <queue physicalName="Que1" />
>       <queue physicalName="Que2" />
>       <queue physicalName="Que3" />
>       <queue physicalName="Que4" />
>       <queue physicalName="Que5" />
>       <queue physicalName="Que6" />
>       <queue physicalName="Que7" />
>       <queue physicalName="Que8" />
>       <queue physicalName="Que9" />
>       <queue physicalName="Que10" />
>       <topic physicalName="TP1" />
>       <topic physicalName="TP2" />
>       <topic physicalName="TP3" />
>       <topic physicalName="TP4" />
>       <topic physicalName="TP5" />
>       <topic physicalName="TP6" />
>       <topic physicalName="TP7" />
>       <topic physicalName="TP8" />
>       <topic physicalName="TP9" />
>       <topic physicalName="TP10" />
>     </destinations>
>  
>         <!--
>             The sslContext can be used to configure broker-specific SSL properties.
>         -->
>         <sslContext>
>             <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
>               keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts"
>               trustStorePassword="password"/>
>         </sslContext>
>       
>         <plugins>
>             <loggingBrokerPlugin logAll="false" logConnectionEvents="false" logSessionEvents="false"/>
>             <timeStampingBrokerPlugin zeroExpirationOverride="0" ttlCeiling="0" futureOnly="true"/>
>             <traceBrokerPathPlugin/>
>         </plugins>
>   
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="6 gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="20 gb" name="foo"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="2 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!-- The store and forward broker networks ActiveMQ will listen to -->
>         <networkConnectors>
>             <!-- <networkConnector name="default-nc" uri="multicast://default"/> -->
>         </networkConnectors>
>         <!-- 
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see: 
>             
>             http://activemq.apache.org/configuring-transports.html 
>         -->
>         <transportConnectors>
>             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
>             <transportConnector name="openwire" uri="tcp://localhost:61636?wireFormat.maxInactivityDurationInitalDelay=30000"/>
>             
> 	         <transportConnector name="nio+ssl" uri="nio+ssl://localhost:61639?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>
>             <transportConnector name="ssl" uri="ssl://localhost:61637?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>
>         </transportConnectors>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>     </broker>
> <!--
>     <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://asb-jms-01-vm.eng.rr.com" username="system" password="password"/>
> -->
>     <!-- 
>         Uncomment to enable Camel
>         Take a look at activemq-camel.xml for more details
>     <import resource="activemq-camel.xml"/> -->
>     <!-- 
>         Enable web consoles, REST and Ajax APIs and demos
>         Take a look at activemq-jetty.xml for more details -->
>     <import resource="jetty.xml"/> 
>     
> </beans>
>     



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
