rocketmq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (ROCKETMQ-265) when os crash for some reasons, the broker consume queue’s data maybe repeat, consumer can’t pull the latest message, cause message lag
Date Wed, 13 Dec 2017 08:29:00 GMT

    [ https://issues.apache.org/jira/browse/ROCKETMQ-265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288872#comment-16288872
] 

ASF GitHub Bot commented on ROCKETMQ-265:
-----------------------------------------

vongosling closed pull request #146: [ROCKETMQ-265] fix consume queue’s data maybe repeat
bug
URL: https://github.com/apache/rocketmq/pull/146
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0bf0aa9a..4922e3d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -446,6 +446,13 @@ private boolean putMessagePositionInfo(final long offset, final int size,
final
 
             if (cqOffset != 0) {
                 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+
+                if (expectLogicOffset < currentLogicOffset) {
+                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset:
{} Topic: {} QID: {} Diff: {}",
+                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId,
expectLogicOffset - currentLogicOffset);
+                    return true;
+                }
+
                 if (expectLogicOffset != currentLogicOffset) {
                     LOG_ERROR.warn(
                         "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset:
{} Topic: {} QID: {} Diff: {}",
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b03f2fce..b7d38f8c 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -17,22 +17,21 @@
 
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.Test;
-
 import java.io.File;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Map;
-
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
 
 public class ConsumeQueueTest {
 
@@ -131,6 +130,65 @@ protected void putMsg(DefaultMessageStore master) throws Exception {
         }
     }
 
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    @Test
+    public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
+        DefaultMessageStore messageStore = null;
+        try {
+
+            messageStore = gen();
+
+            int totalMessages = 10;
+
+            for (int i = 0; i < totalMessages; i++) {
+                putMsg(messageStore);
+            }
+            Thread.sleep(5);
+
+            ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
+            Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class,
int.class, long.class, long.class);
+
+            assertThat(method).isNotNull();
+
+            method.setAccessible(true);
+
+            SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
+            assertThat(result != null).isTrue();
+
+            DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(),
false, false);
+
+            assertThat(cq).isNotNull();
+
+            Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
+                dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());
+
+            assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();
+
+        } finally {
+            if (messageStore != null) {
+                messageStore.shutdown();
+                messageStore.destroy();
+            }
+            deleteDirectory(storePath);
+        }
+
+    }
+
     @Test
     public void testConsumeQueueWithExtendData() {
         DefaultMessageStore master = null;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> when os crash for some reasons, the broker consume queue’s data maybe repeat, consumer
can’t pull the latest message, cause message lag
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ROCKETMQ-265
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-265
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-store
>    Affects Versions: 4.0.0-incubating, 4.1.0-incubating
>            Reporter: yubaofu
>            Assignee: yukon
>            Priority: Critical
>              Labels: bug
>             Fix For: 4.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> when os crash for some reasons, the broker consume queue’s data maybe repeat, consumer
can’t pull the latest message, cause message lag



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message