activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Martin C." <mart...@gmx.at>
Subject Re: sometime,messages can not be consumed ,remaining in queue
Date Tue, 15 Feb 2011 11:00:44 GMT
Hi,

I don't see which acknowledge mode you are using, but beware that you
need to call session.recover() as indicated in the JMS specification
in order to start redelivery of messages, if there is an exception
(see http://download.oracle.com/javaee/1.4/api/javax/jms/Session.html).
This is at least required, if you don't use SESSION_TRANSACTED or
AUTO_ACKNOWLEDGE.

Best regards,
Martin

On Fri, Feb 11, 2011 at 4:23 PM, xjchwork <xjchwork@gmail.com> wrote:
>
> hi,
> Our system uses activemq5.3.0, running about 15 days, monitoring to two
> messages can not be consumed, only behind the message into the queue, before
> the backlog of messages to be consumed, the queue still retains two new
> messages can not be consumed in the queue。Only after the application restart
> all two messages in order to be consumed.
>
> Just reproduce the problem, when consumer is very slow, QueueSize>
> InFlightCount, there will be QueueSize-InFlightCount number of message can
> not be consumed
>
>
> Thanks in advance,
> xjchwork
>
> ActiveMQ: 5.3.0
> Java: 1.6.0_14
> Spring: 2.5.6
> com.springsource:xbean-spring:3.3
> Connector URL: tcp://localhost:61620
> JMS receivetimeout: 2000
>
>
> activemq.xml:
> ==============================================================
> <beans
>        xmlns="http://www.springframework.org/schema/beans"
>        xmlns:amq="http://activemq.apache.org/schema/core"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>  http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>    <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>
>    <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
>
>        <destinationPolicy>
>            <policyMap>
>                <policyEntries>
>                    <policyEntry queue=">" producerFlowControl="true"
> memoryLimit="1mb">
>                    </policyEntry>
>                </policyEntries>
>            </policyMap>
>        </destinationPolicy>
>
>        <managementContext>
>            <managementContext connectorPort="3012"
> jmxDomainName="switch.domain"/>
>        </managementContext>
>
>        <persistenceAdapter>
>            <kahaPersistenceAdapter
> directory="/home/iboss/appdatas/casender/mqdata" maxDataFileLength="1 gb"/>
>        </persistenceAdapter>
>
>
>        <systemUsage>
>            <systemUsage>
>                <memoryUsage>
>                    <memoryUsage limit="256 mb"/>
>                </memoryUsage>
>                <storeUsage>
>                    <storeUsage limit="2 gb"/>
>                </storeUsage>
>                <tempUsage>
>                    <tempUsage limit="512 mb"/>
>                </tempUsage>
>            </systemUsage>
>        </systemUsage>
>
>
>        <transportConnectors>
>            <transportConnector uri="tcp://0.0.0.0:61620"/>
>        </transportConnectors>
>
>    </broker>
>
>
> </beans>
>
>
>
> resource.xml
> .................................................................
>
> <bean id="msgTool" class="com.lr.lightmessage.MessageToolFactory"
>          destroy-method="close">
>        <property name="receiveTimeout" value="2000"/>
>        <property name="cf">
>            <bean class="org.apache.activemq.pool.PooledConnectionFactory"
>                  destroy-method="stop">
>                <property name="connectionFactory">
>                    <bean
> class="org.apache.activemq.ActiveMQConnectionFactory">
>                        <property name="brokerURL"
>
> value="failover:(tcp://localhost:61620?wireFormat.maxInactivityDuration=0)"/>
>                        <property name="useAsyncSend">
>                            <value type="boolean">true</value>
>                        </property>
>                    </bean>
>                </property>
>            </bean>
>        </property>
>  </bean>
> .................................................................
>
>
> public class MsgListenerThread extends Thread {
> .....................
>
> while (true) {
>                if (messageTool == null)
>                    messageTool = (MessageToolFactory)
> SpringUtil.getBean("msgTool");
>                object = (RequestBean)
> messageTool.getTool("queneA").receive();
>                if (object == null) {
>                    continue;
>                }
>                log.info(new JSON(object).toString());
> }
> ..................
> }
>
> public class MessageTool extends JmsGatewaySupport
> {
> ................................
>
>  public synchronized Serializable receive()
>  {
>    if (!this.running) {
>      try {
>        Thread.sleep(100L);
>      } catch (InterruptedException localInterruptedException) {
>      }
>      return null;
>    }
>    try
>    {
>      if (this.consumer == null)
>      {
>        consumeInit();
>      }
>
>      long start = System.currentTimeMillis();
>      Message msg;
>      Message msg;
>      if (getJmsTemplate().getReceiveTimeout() > 0L)
>        msg = this.consumer.receive(getJmsTemplate().getReceiveTimeout());
>      else {
>        msg = this.consumer.receive();
>      }
>
>      log.debug(
>        new StringBuffer("queue
> ").append(getJmsTemplate().getDefaultDestinationName())
>        .append(" receive msg spend: ")
>        .append(System.currentTimeMillis() - start));
>
>      if (msg == null)
>        return null;
>      if (msg instanceof TextMessage) {
>        return ((TextMessage)msg).getText();
>      }
>      return ((ObjectMessage)msg).getObject();
>    }
>    catch (Exception e) {
>      throw new RuntimeException("receive msg error", e);
>    }
>  }
>
> http://activemq.2283324.n4.nabble.com/file/n3301439/001.jpg

Mime
View raw message