activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Nicusor Tanase (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-1918) AbstractStoreCursor.size gets out of synch with Store size and blocks consumers
Date Fri, 26 Sep 2008 11:06:52 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-1918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=46016#action_46016
] 

Nicusor Tanase commented on AMQ-1918:
-------------------------------------

I found a way to work around this issue, by changing the way messages are loaded from the
database.
I ran tests with several queues, producers and consumers and did not get any undelivered messages
anymore.

DefaultJDBCAdapter.doRecoverNextMessages() recovers the messages with ID higher then the last
recovered messages.
The SQL statement is:
{code:title=org.apache.activemq.store.jdbc.Statements.java|borderStyle=solid}
findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                        + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
{code}
However, it can happen that messages with lower id are inserted into the DB after messages
with higher IDs. 
Such messages do not get recovered from DB.

I have changed on my local copy the DefaultJDBCAdapter to act retroactive, looking back {{maxReturned}}
rows for any missed messages.
Anyway, I am not familiar with ActiveMQ code, so you might want to have a look at the modified
DefaultJDBCAdapter.doRecoverNextMessages() bellow:

{code:title=org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.java|borderStyle=solid}
   public class DefaultJDBCAdapter implements JDBCAdapter {

   private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
   -------------------------------------------------------

    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination,
long nextSeq,
                                      int maxReturned, JDBCMessageRecoveryListener listener)
throws Exception {
        PreparedStatement s = null;
        ResultSet rs = null;
        long id = 0;
        List<Long> cleanupIds = new ArrayList<Long>();
        int index = 0;
        try {
            s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
            s.setMaxRows(maxReturned*2);
            s.setString(1, destination.getQualifiedName());
            s.setLong(2, nextSeq - maxReturned);
            rs = s.executeQuery();
            int count = 0;
            if (statements.isUseExternalMessageReferences()) {
                while (rs.next() && count < maxReturned) {
                	id = rs.getLong(1);
                	if ( lastRecoveredMessagesIds.contains(id) ) {
                		// this message was already recovered
                		cleanupIds.add(id);
                		continue;
                	}                	
                    if (listener.recoverMessageReference(rs.getString(1))) {
                        count++;
                        lastRecoveredMessagesIds.add(id);
                    } else {
                        LOG.debug("Stopped recover next messages");
                    }
                }
            } else {
                while (rs.next() && count < maxReturned) {
                	id = rs.getLong(1);
                	if ( lastRecoveredMessagesIds.contains(id) ) {
                		// this message was already recovered
                		cleanupIds.add(id);
                		continue;
                	}
                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                        count++;
                        lastRecoveredMessagesIds.add(id);
                    } else {
                        LOG.debug("Stopped recover next messages");
                    }
                }
            }
            
            //not cleanup the list of recovered messages
            index = 0;
            Iterator<Long> it = cleanupIds.iterator();
            while (it.hasNext() && index < count) {
            	lastRecoveredMessagesIds.remove(it.next());
            }
            
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs);
            close(s);
        }
    }

}
{code}




> AbstractStoreCursor.size gets out of synch with Store size and blocks consumers
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-1918
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1918
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.1.0
>            Reporter: Richard Yarger
>            Assignee: Rob Davies
>            Priority: Critical
>             Fix For: 5.3.0
>
>         Attachments: activemq.xml, testAMQMessageStore.zip, testdata.zip
>
>
> In version 5.1.0, we are seeing our queue consumers stop consuming for no reason.
> We have a staged queue environment and we occasionally see one queue display negative
pending message counts that hang around -x, rise to -x+n gradually and then fall back to -x
abruptly. The messages are building up and being processed in bunches but its not easy to
see because the counts are negative. We see this behavior in the messages coming out of the
system. Outbound messages come out in bunches and are synchronized with the queue pending
count dropping to -x.
> This issue does not happen ALL of the time. It happens about once a week and the only
way to fix it is to bounce the broker. It doesn't happen to the same queue everytime, so it
is not our consuming code.
> Although we don't have a reproducible scenario, we have been able to debug the issue
in our test environment.
> We traced the problem to the cached store size in the AbstractStoreCursor.
> This value becomes 0 or negative and prevents the AbstractStoreCursor from retrieving
more messages from the store. (see AbstractStoreCursor.fillBatch() )
> We have seen size value go lower than -1000.
> We have also forced it to fix itself by sending in n+1 messages. Once the size goes above
zero, the cached value is refreshed and things work ok again.
> Unfortunately, during low volume times, it could be hours before n+1 messages are received,
so our message latency can rise during low volume times.... :(
> I have attached our broker config.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message