rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinrongt...@apache.org
Subject [rocketmq] 01/02: optimise benchmark consumer, add consume fail rate option
Date Mon, 20 Jan 2020 12:54:57 GMT
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch apache-master
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9da19d2e374b50ba00d1245f3b5b0a7f6063501c
Author: huangli <areyouok@gmail.com>
AuthorDate: Fri Dec 20 12:24:01 2019 +0800

    optimise benchmark consumer, add consume fail rate option
---
 .../rocketmq/example/benchmark/Consumer.java       | 41 +++++++++++++++++-----
 1 file changed, 32 insertions(+), 9 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 4724a1d..08897fa 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -22,7 +22,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -49,15 +51,16 @@ public class Consumer {
 
         final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim()
: "BenchmarkTest";
         final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim()
: "benchmark_consumer";
-        final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim()
: "true";
+        final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim()
: "true";
         final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim()
: null;
         final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim()
: null;
+        final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim())
: 0.0;
         String group = groupPrefix;
-        if (Boolean.parseBoolean(isPrefixEnable)) {
-            group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
+        if (Boolean.parseBoolean(isSuffixEnable)) {
+            group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
         }
 
-        System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s, expression:
%s%n", topic, group, isPrefixEnable, filterType, expression);
+        System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression:
%s%n", topic, group, isSuffixEnable, filterType, expression);
 
         final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
 
@@ -85,9 +88,15 @@ public class Consumer {
                         (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
                     final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] -
begin[1]);
                     final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] -
begin[1]);
+                    final long failCount = end[4] - begin[4];
+                    final long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get();
+                    final long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get();
+
+                    statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
+                    statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
 
-                    System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C)
RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
-                        consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
+                    System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f
MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
+                            consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
                     );
                 }
             }
@@ -144,7 +153,12 @@ public class Consumer {
 
                 compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
 
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                if (ThreadLocalRandom.current().nextDouble() < failRate) {
+                    statsBenchmarkConsumer.getFailCount().incrementAndGet();
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                } else {
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                }
             }
         });
 
@@ -174,6 +188,10 @@ public class Consumer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("r", "fail rate", true, "consumer fail rate, default 0");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -200,14 +218,15 @@ class StatsBenchmarkConsumer {
 
     private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
 
+    private final AtomicLong failCount = new AtomicLong(0L);
+
     public Long[] createSnapshot() {
         Long[] snap = new Long[] {
             System.currentTimeMillis(),
             this.receiveMessageTotalCount.get(),
             this.born2ConsumerTotalRT.get(),
             this.store2ConsumerTotalRT.get(),
-            this.born2ConsumerMaxRT.get(),
-            this.store2ConsumerMaxRT.get(),
+            this.failCount.get()
         };
 
         return snap;
@@ -232,4 +251,8 @@ class StatsBenchmarkConsumer {
     public AtomicLong getStore2ConsumerMaxRT() {
         return store2ConsumerMaxRT;
     }
+
+    public AtomicLong getFailCount() {
+        return failCount;
+    }
 }


Mime
View raw message