activemq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Czeslaw (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AMQ-6067) OutOfMemoryError when expiring big amount of topic messages
Date Tue, 09 Aug 2016 07:21:20 GMT

    [ https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413093#comment-15413093
] 

Czeslaw edited comment on AMQ-6067 at 8/9/16 7:21 AM:
------------------------------------------------------

Hi,
I see another issue in amq 5.13.2 related with topic message expiration.   I have one producer
which send me messages to MyTopic and one durable subscriber/consumer. Messages are stored
in DB and after 30 seconds amq broker trying to expire these messages ( I have configuration
entry for this: <amq:policyEntry topic="MyTopic.>" expireMessagesPeriod="30000">
). I’m getting NullPointerException from AdvisoryBroker and messages are never expired and
remain in activemq_msgs table.

java.lang.NullPointerException
	at org.apache.activemq.advisory.AdvisoryBroker.messageExpired(AdvisoryBroker.java:430)
	at org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
	at org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
	at org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
	at org.apache.activemq.broker.MutableBrokerFilter.messageExpired(MutableBrokerFilter.java:325)
	at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:776)
	at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:660)


Under debugger I see that in org.apache.activemq.advisory.AdvisoryBroker.messageExpired  baseDestination
is null and call baseDestination.getActiveMQDestination() generates NPE

@Override
public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription
subscription) {
    super.messageExpired(context, messageReference, subscription);
    try {
        if (!messageReference.isAdvisory()) {
            BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
            ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());

 Below is code how messages are produced:

public static class Producer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_ULR);
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("MyTopic");

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.setTimeToLive(30000);
          
            for(int i =0; i < 10; i ++) {
                String text = "Message number " + i +  "  From: " + Thread.currentThread().getName();
                TextMessage message = session.createTextMessage(text);
                message.setJMSDestination(destination);
                producer.send(destination, message);

                Thread.sleep(10000);
            }
            // Clean up
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

Under debugger I see that setter method setRegionDestination (org.apache.activemq.command.Message
) is called in org.apache.activemq.broker.region.Topic class before message is send:
  
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws
Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();

    final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
    producerExchange.incrementSend();
    final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize()
> 0
            && !context.isInRecoveryMode();

    message.setRegionDestination(this);

I don’t see call this method when message is loaded. 

Do you have any idea what can be wrong ? Is it issue in my messages (something is missing)
 or it is another bug in amq code ?
 



was (Author: cszczotka):
Hi,
I see another issue in amq 5.13.2 related with topic message expiration.   I have one producer
which send me messages to MyTopic and one durable subscriber/consumer. Messages are stored
in DB and after 30 seconds amq broker trying to expire these messages ( I have configuration
entry for this: <amq:policyEntry topic="MyTopic.>" expireMessagesPeriod="30000">
). I’m getting NullPointerException from AdvisoryBroker and messages are never expired and
remain in activemq_msgs table.

java.lang.NullPointerException
	at org.apache.activemq.advisory.AdvisoryBroker.messageExpired(AdvisoryBroker.java:430)
	at org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
	at org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
	at org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
	at org.apache.activemq.broker.MutableBrokerFilter.messageExpired(MutableBrokerFilter.java:325)
	at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:776)
	at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:660)


Under debugger I see that in org.apache.activemq.advisory.AdvisoryBroker.messageExpired  baseDestination
is null and call baseDestination.getActiveMQDestination() generates NPE

@Override
public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription
subscription) {
    super.messageExpired(context, messageReference, subscription);
    try {
        if (!messageReference.isAdvisory()) {
            BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
            ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());

 Below is code how messages are produced:

public static class Producer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_ULR);
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("MyTopic");

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.setTimeToLive(30000);
          
            for(int i =0; i < 10; i ++) {
                String text = "Message number " + i +  "  From: " + Thread.currentThread().getName();
                TextMessage message = session.createTextMessage(text);
                message.setJMSDestination(destination);
                producer.send(destination, message);
                Thread.sleep(10000);
            }
            // Clean up
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

Under debugger I see that setter method setRegionDestination (org.apache.activemq.command.Message
) is called in org.apache.activemq.broker.region.Topic class before message is send:
  
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws
Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();

    final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
    producerExchange.incrementSend();
    final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize()
> 0
            && !context.isInRecoveryMode();

    message.setRegionDestination(this);

I don’t see call this method when message is loaded. 

Do you have any idea what can be wrong ? Is it issue in my messages (something is missing)
 or it is another bug in amq code ?
 


> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
>                 Key: AMQ-6067
>                 URL: https://issues.apache.org/jira/browse/AMQ-6067
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JDBC
>    Affects Versions: 5.10.0
>            Reporter: Petr Havránek
>              Labels: durable, durable_subscription, expiration, jdbc, timeToLive,
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When there are big amount of topic messages that are going to be expired, this {{expireMessagesTask}}
loads all of the messages to memory. This causes
> {noformat}
> 2015-11-24 11:05:46.359 WARN  [ActiveMQ Broker[JmsEngineActivemqBroker] Scheduler] [Topic]
Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
> 	at oracle.sql.BLOB.getBytes(BLOB.java:204)
> 	at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
> 	at oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
> 	at org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
> 	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
> 	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
> 	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
> 	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
> 	at org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
> 	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> 	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
> 	at $Proxy14.doRecover(Unknown Source)
> 	at org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
> 	at org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
> 	at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
> 	at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
> 	at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
> 	at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
> 	at java.util.TimerThread.mainLoop(Timer.java:512)
> 	at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem happens when using JDBC persistency with ActiveMQ 5.10.0. After a short look
to source code, the same problem could be also with 5.12.1.
> Test case:
> - run ActiveMQ broker with JDBC persistency
> - create subscription to a topic, but do not receive the messages
> - send enough number of messages with short TimeToLive
> - when expireMessagesTask is scheduled, it tries to load all of the messages and causes
the OutOfMemoryError
> It would be fine if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> will be updated like this:
> {code:java}
> public void recover(final MessageRecoveryListener listener) throws Exception {
>   // Get all the Message ids out of the database.
>   TransactionContext c = persistenceAdapter.getTransactionContext();
>   try {
>     c = persistenceAdapter.getTransactionContext();
>     adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
>       public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
>         if (listener.hasSpace()) {
>           Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
>           msg.getMessageId().setBrokerSequenceId(sequenceId);
>           return listener.recoverMessage(msg);
>         } else {
>           logger.debug("Recovery limit of the messages has exceeded.");
>           return false;
>         }                    
>       }
>       public boolean recoverMessageReference(String reference) throws Exception {
>         if (listener.hasSpace()) {
>           return listener.recoverMessageReference(new MessageId(reference));
>         } else {
>           logger.debug("Recovery limit of the message references has exceeded.");
>           return false;
>         }
>       }
>     });
>   } catch (SQLException e) {
>     JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>     throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
>   } finally {
>     c.close();
>   }
> }
> {code}
> But I am not sure if this limitation is the best way, because there will be some messages
that should be expired, but need to wait. So better solution might be to do this job in more
separated transactions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message