activemq-dev mailing list archives

From "Peter Blackburn (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (AMQ-2955) Message getting stuck on queue, leading to KahaDB log files not being deleted and disk running out of space
Date Tue, 12 Oct 2010 12:58:43 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=62524#action_62524
] 

Peter Blackburn edited comment on AMQ-2955 at 10/12/10 8:57 AM:
----------------------------------------------------------------

Attached test harness. 

Compile with following:

{noformat}
javac -cp .:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar:jms.jar *.java
{noformat}

To reproduce the error, we restart the ActiveMQ server, wait until we see the MessageDatabase
"Checkpoint done" message in the log, then kick off the enqueuer as follows:

{noformat}
java -cp .:jms.jar:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar jms.TrackedMessageEnqueuer 'tcp://localhost:61616' 1 10 0
{noformat}

and then immediately kick off the dequeuer as follows:

{noformat}
java -cp .:jms.jar:lib/activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar jms.TrackedMessageDequeuer 'tcp://localhost:61616'
{noformat}

We don't always get the error, but when it occurs it is always on the 45th message sent, using
the 10K message size as shown.
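
The "wait for Checkpoint done" step can be automated. Below is a minimal sketch, not part of the attached harness: it polls the broker log and returns once a new line containing "Checkpoint done" appears. The class name WaitForCheckpoint, the default path data/activemq.log, and the timeout values are assumptions for illustration; it also assumes DEBUG logging is enabled for MessageDatabase, since the checkpoint message is logged at DEBUG level.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class WaitForCheckpoint {
    static final String MARKER = "Checkpoint done";

    // Counts the lines in the log that contain the marker (0 if the file is absent).
    public static long countMarker(Path log, String marker) throws IOException {
        if (!Files.exists(log)) {
            return 0;
        }
        // ISO-8859-1 decodes any byte sequence, so stray bytes cannot abort the scan.
        try (Stream<String> lines = Files.lines(log, StandardCharsets.ISO_8859_1)) {
            return lines.filter(line -> line.contains(marker)).count();
        }
    }

    // Polls until the marker count rises above the baseline, or the timeout elapses.
    public static boolean waitForNewMarker(Path log, String marker, long baseline,
                                           long timeoutMillis, long pollMillis)
            throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            long current = countMarker(log, marker);
            if (current < baseline) {
                baseline = 0; // log was truncated or rotated; any marker is new
            }
            if (current > baseline) {
                return true;
            }
            Thread.sleep(pollMillis);
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Assumed log location; pass the real path as the first argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "data/activemq.log");
        long baseline = countMarker(log, MARKER);
        System.out.println("Restart the broker now; waiting for a new \"" + MARKER + "\" line...");
        boolean seen = waitForNewMarker(log, MARKER, baseline, 120_000L, 500L);
        System.out.println(seen ? "Checkpoint seen, start the enqueuer." : "Timed out.");
    }
}
```

Start it before restarting the broker so that checkpoint lines from the previous run are counted in the baseline and ignored.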

      was (Author: pblackburn):
    Attached test harness. 

Compile with following:

{noformat}
javac -cp .activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar:jms.jar
*.java
{noformat}

When producing the error, we restart the activemq server and wait until we see the MessageDatabase
"Checkpoint done" message in the log, then kick off the enquer as follows:

{noformat}
java -cp .:jms.jar:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar
jms.TrackedMessageEnqueuer 'tcp://localhost:61616' 1 10 0
{noformat}

and then immediately kick off the dequeuer as follows:

{noformat}
java -cp .:jms.jar:lib/activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:og4j-1.2.15.jar:commons-logging.jar
jms.TrackedMessageDequeuer 'tcp://localhost:61616'
{noformat}

We don't always get the error, but when it occurs it is always on the 45th message sent, using
the 10K message size as shown.
  
> Message getting stuck on queue, leading to KahaDB log files not being deleted and disk running out of space
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2955
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2955
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.4.1
>         Environment: Red Hat Enterprise Linux 5
>            Reporter: Peter Blackburn
>            Priority: Critical
>         Attachments: TrackedMessageDequeuer.java, TrackedMessageEnqueuer.java
>
>
> Using the following test client, we see a single message getting stuck on the queue. This then prevents the KahaDB files from being cleaned up. Once this message gets stuck, we need to restart the broker before it can be consumed. This is a total showstopper for us, as when this occurs in our system the large number of messages that we produce and consume each second causes the disk to run out of space within an hour. We also see the same behaviour using synchronous sending and without failover.
> This doesn't happen every time with the test client. The most reliable way I have found to reproduce it is to start the broker and wait for the first MessageDatabase checkpoint to finish before starting the test client.
> {code:title=Test Client}
> import java.io.BufferedWriter;
> import java.io.FileWriter;
> import java.util.Random;
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class Test {
>         public static final void main(String[] args) throws Exception {
>                 ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
>                 final Connection producerConn = cf.createConnection();
>                 final Connection consumerConn = cf.createConnection();
>                 final BufferedWriter producerLog = new BufferedWriter(new FileWriter("produced.log"));
>                 final BufferedWriter consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
>                 new Thread(new Runnable() {
>                         public void run() {
>                                 try {
>                                         producerConn.start();
>                                         Session session = producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>                                         Queue queue = session.createQueue("TEST_QUEUE");
>                                         MessageProducer producer = session.createProducer(queue);
>                                         Random random = new Random();
>                                         byte[] messageBytes = new byte[1024];
>                                         for (int i = 0; i < 100000; i++) {
>                                         //while (true) {
>                                                 random.nextBytes(messageBytes);
>                                                 Message message = session.createObjectMessage(messageBytes);
>                                                 producer.send(message);
>                                                 producerLog.write(message.getJMSMessageID());
>                                                 producerLog.newLine();
>                                                 producerLog.flush();
>                                         }
>                                         System.out.println("Produced 100000 messages...");
>                                         producerLog.close();
>                                 }
>                                 catch (Exception e) {
>                                         e.printStackTrace();
>                                 }
>                         }
>                 }).start();
>                 System.out.println("Started producer...");
>                 new Thread(new Runnable() {
>                         public void run() {
>                                 try {
>                                         consumerConn.start();
>                                         Session session = consumerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>                                         Queue queue = session.createQueue("TEST_QUEUE");
>                                         MessageConsumer consumer = session.createConsumer(queue);
>                                         consumer.setMessageListener(new MessageListener() {
>                                                 public void onMessage(Message message) {
>                                                         try {
>                                                                 message.acknowledge();
>                                                                 consumerLog.write(message.getJMSMessageID());
>                                                                 consumerLog.newLine();
>                                                                 consumerLog.flush();
>                                                         }
>                                                         catch (Exception e) {
>                                                                 e.printStackTrace();
>                                                         }
>                                                 }
>                                         });
>                                 }
>                                 catch (Exception e) {
>                                         e.printStackTrace();
>                                 }
>                         }
>                 }).start();
>                 System.out.println("Started consumer...");
>         }
> }
> {code}
> After the 100,000 messages have been produced, we can see the following difference in the log files:
> {noformat}
> [pblackburn@xxxx test]$ diff produced.log consumed.log
> 10394d10393
> < ID:xxxx-35451-1285948546531-0:0:1:1:10394
> [pblackburn@xxxx test]$
> {noformat}
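
The same check can be made without depending on line order, which helps if delivery order ever differs from send order. A minimal sketch, assuming the two files written by the test client above (produced.log and consumed.log, one JMS message ID per line); the class name MissingMessageIds is made up for illustration and is not part of the attached harness.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingMessageIds {
    // Returns the produced IDs that never appear in the consumed list,
    // in the order they were produced. Blank lines are ignored.
    public static List<String> missing(List<String> produced, List<String> consumed) {
        Set<String> seen = new HashSet<>(consumed);
        List<String> result = new ArrayList<>();
        for (String id : produced) {
            if (!id.isEmpty() && !seen.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // File names default to those used by the test client.
        Path produced = Paths.get(args.length > 0 ? args[0] : "produced.log");
        Path consumed = Paths.get(args.length > 1 ? args[1] : "consumed.log");
        List<String> stuck = missing(
                Files.readAllLines(produced, StandardCharsets.ISO_8859_1),
                Files.readAllLines(consumed, StandardCharsets.ISO_8859_1));
        System.out.println(stuck.size() + " message(s) produced but not consumed:");
        for (String id : stuck) {
            System.out.println("  " + id);
        }
    }
}
```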
> Looking in the activemq log file, at around this point we see:
> {noformat}
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 349, enqueueSize: 10390
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 350, enqueueSize: 10391
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 351, enqueueSize: 10392
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 352, enqueueSize: 10393
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 AbstractStoreCursor [DEBUG] TEST_QUEUE disabling cache on size:0, lastCachedIdSeq: 10398 current node seqId: 10399
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 2, Inflight: 353, pagedInMessages.size 353, enqueueSize: 10395
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
> {noformat}
> At the end of the log file, where we have a single message stuck on the queue, we see:
> {noformat}
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> {noformat}
> We can see the checkpoint failing to clean up the log files:
> {noformat}
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint started.
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as contained ack(s) refer to referenced file: [1, 2]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as contained ack(s) refer to referenced file: [2, 3]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 4 as contained ack(s) refer to referenced file: [3, 4]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 5 as contained ack(s) refer to referenced file: [4, 5]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint done.
> {noformat}
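
To see at a glance which data files a checkpoint declined to remove, the DEBUG lines above can be summarised. A minimal sketch, assuming each entry sits on a single line in the real broker log and that the message text matches the 5.4.1 output quoted above; RetainedDataFiles is a hypothetical helper, not part of ActiveMQ.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RetainedDataFiles {
    // Matches the wording of the DEBUG lines quoted above (assumed stable for 5.4.1).
    private static final Pattern RETAINED = Pattern.compile(
            "not removing data file: (\\d+) as contained ack\\(s\\) refer to referenced file: \\[([^\\]]*)\\]");

    // Maps each retained data file number to the file numbers its acks refer to.
    // If a file is reported more than once, the latest report wins.
    public static Map<Integer, List<Integer>> parse(List<String> lines) {
        Map<Integer, List<Integer>> retained = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = RETAINED.matcher(line);
            if (!m.find()) {
                continue; // not a "not removing data file" entry
            }
            List<Integer> refs = new ArrayList<>();
            for (String part : m.group(2).split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    refs.add(Integer.parseInt(trimmed));
                }
            }
            retained.put(Integer.parseInt(m.group(1)), refs);
        }
        return retained;
    }

    public static void main(String[] args) throws IOException {
        // The log path must be supplied; no default location is assumed here.
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.ISO_8859_1);
        for (Map.Entry<Integer, List<Integer>> e : parse(lines).entrySet()) {
            System.out.println("data file " + e.getKey() + " kept; acks refer to " + e.getValue());
        }
    }
}
```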
> At this point our consumer had consumed all of the messages except the single stuck message.
> We are using a clean out of the box set up - we have made no changes to the default activemq.xml file,

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

