activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xjchwork <xjchw...@gmail.com>
Subject sometime,messages can not be consumed ,remaining in queue
Date Fri, 11 Feb 2011 15:23:50 GMT

hi, 
Our system uses ActiveMQ 5.3.0. After it had been running for about 15 days,
monitoring showed two messages in the queue that could not be consumed. Only
when later messages enter the queue is the earlier backlog consumed, and the
queue then still retains the two newest messages, which cannot be consumed.
Only after the application is restarted are the two messages consumed. 

I have just reproduced the problem: when the consumer is very slow and
QueueSize > InFlightCount, there are QueueSize - InFlightCount messages that
cannot be consumed. 


Thanks in advance, 
xjchwork 

ActiveMQ: 5.3.0 
Java: 1.6.0_14 
Spring: 2.5.6 
com.springsource:xbean-spring:3.3 
Connector URL: tcp://localhost:61620 
JMS receivetimeout: 2000 


activemq.xml: 
==============================================================
<beans 
        xmlns="http://www.springframework.org/schema/beans" 
        xmlns:amq="http://activemq.apache.org/schema/core" 
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd"> 

    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> 

    <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core"> 

        <destinationPolicy> 
            <policyMap> 
                <policyEntries> 
                    <policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb"> 
                    </policyEntry> 
                </policyEntries> 
            </policyMap> 
        </destinationPolicy> 

        <managementContext> 
            <managementContext connectorPort="3012"
jmxDomainName="switch.domain"/> 
        </managementContext> 

        <persistenceAdapter> 
            <kahaPersistenceAdapter
directory="/home/iboss/appdatas/casender/mqdata" maxDataFileLength="1 gb"/> 
        </persistenceAdapter> 


        <systemUsage> 
            <systemUsage> 
                <memoryUsage> 
                    <memoryUsage limit="256 mb"/> 
                </memoryUsage> 
                <storeUsage> 
                    <storeUsage limit="2 gb"/> 
                </storeUsage> 
                <tempUsage> 
                    <tempUsage limit="512 mb"/> 
                </tempUsage> 
            </systemUsage> 
        </systemUsage> 


        <transportConnectors> 
            <transportConnector uri="tcp://0.0.0.0:61620"/> 
        </transportConnectors> 

    </broker> 


</beans>  



resource.xml
................................................................. 

<bean id="msgTool" class="com.lr.lightmessage.MessageToolFactory" 
          destroy-method="close"> 
        <property name="receiveTimeout" value="2000"/> 
        <property name="cf"> 
            <bean class="org.apache.activemq.pool.PooledConnectionFactory" 
                  destroy-method="stop"> 
                <property name="connectionFactory"> 
                    <bean
class="org.apache.activemq.ActiveMQConnectionFactory"> 
                        <property name="brokerURL" 
                                 
value="failover:(tcp://localhost:61620?wireFormat.maxInactivityDuration=0)"/> 
                        <property name="useAsyncSend"> 
                            <value type="boolean">true</value> 
                        </property> 
                    </bean> 
                </property> 
            </bean> 
        </property> 
  </bean> 
................................................................. 


public class MsgListenerThread extends Thread { 
..................... 

// Endless polling loop: lazily looks up the "msgTool" bean, calls receive()
// on the tool returned by getTool("queneA"), and logs each non-null result
// as JSON.
while (true) { 
                if (messageTool == null) 
                    messageTool = (MessageToolFactory)
SpringUtil.getBean("msgTool"); 
                object = (RequestBean)
messageTool.getTool("queneA").receive(); 
                // A null result means nothing was received this round; poll again.
                if (object == null) { 
                    continue; 
                } 
                log.info(new JSON(object).toString()); 
} 
.................. 
} 

public class MessageTool extends JmsGatewaySupport 
{ 
................................ 

 public synchronized Serializable receive() 
  { 
    if (!this.running) { 
      try { 
        Thread.sleep(100L); 
      } catch (InterruptedException localInterruptedException) { 
      } 
      return null; 
    } 
    try 
    { 
      if (this.consumer == null) 
      { 
        consumeInit(); 
      } 

      long start = System.currentTimeMillis(); 
      Message msg; 
      Message msg; 
      if (getJmsTemplate().getReceiveTimeout() > 0L) 
        msg = this.consumer.receive(getJmsTemplate().getReceiveTimeout()); 
      else { 
        msg = this.consumer.receive(); 
      } 

      log.debug( 
        new StringBuffer("queue
").append(getJmsTemplate().getDefaultDestinationName()) 
        .append(" receive msg spend: ") 
        .append(System.currentTimeMillis() - start)); 

      if (msg == null) 
        return null; 
      if (msg instanceof TextMessage) { 
        return ((TextMessage)msg).getText(); 
      } 
      return ((ObjectMessage)msg).getObject(); 
    } 
    catch (Exception e) { 
      throw new RuntimeException("receive msg error", e); 
    } 
  } 

http://activemq.2283324.n4.nabble.com/file/n3301439/001.jpg 
-- 
View this message in context: http://activemq.2283324.n4.nabble.com/sometime-messages-can-not-be-consumed-remaining-in-queue-tp3301439p3301439.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message