kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text
Date Fri, 16 Feb 2018 20:37:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367849#comment-16367849
] 

ASF GitHub Bot commented on KAFKA-6424:
---------------------------------------

mjsax closed pull request #4549: KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance
should accept raw text
URL: https://github.com/apache/kafka/pull/4549
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index fc2eaccebc9..8b4e8957ed5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -65,15 +65,20 @@
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -90,6 +95,8 @@
 
 @Category({IntegrationTest.class})
 public class QueryableStateIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
+
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
@@ -133,6 +140,40 @@ private void createTopics() throws InterruptedException {
         CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed,
outputTopicThree);
     }
 
+    /**
+     * Try to read inputValues from {@code resources/QueryableStateIntegrationTest/inputValues.txt},
which might be useful
+     * for larger scale testing. In case of exception, for instance if no such file can be
read, return a small list
+     * which satisfies all the prerequisites of the tests.
+     */
+    private List<String> getInputValues() {
+        List<String> input = new ArrayList<>();
+        final ClassLoader classLoader = getClass().getClassLoader();
+        final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
+        try (final BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(fileName).getFile())))
{
+            for (String line = reader.readLine(); line != null; line = reader.readLine())
{
+                input.add(line);
+            }
+        } catch (final Exception e) {
+            log.warn("Unable to read '{}{}{}'. Using default inputValues list", "resources",
File.separator, fileName);
+            input = Arrays.asList(
+                        "hello world",
+                        "all streams lead to kafka",
+                        "streams",
+                        "kafka streams",
+                        "the cat in the hat",
+                        "green eggs and ham",
+                        "that Sam i am",
+                        "up the creek without a paddle",
+                        "run forest run",
+                        "a tank full of gas",
+                        "eat sleep rave repeat",
+                        "one jolly sailor",
+                        "king of the world");
+
+        }
+        return input;
+    }
+
     @Before
     public void before() throws Exception {
         testNo++;
@@ -167,20 +208,7 @@ public int compare(final KeyValue<String, Long> o1,
                 return o1.key.compareTo(o2.key);
             }
         };
-        inputValues = Arrays.asList(
-            "hello world",
-            "all streams lead to kafka",
-            "streams",
-            "kafka streams",
-            "the cat in the hat",
-            "green eggs and ham",
-            "that sam i am",
-            "up the creek without a paddle",
-            "run forest run",
-            "a tank full of gas",
-            "eat sleep rave repeat",
-            "one jolly sailor",
-            "king of the world");
+        inputValues = getInputValues();
         inputValuesKeys = new HashSet<>();
         for (final String sentence : inputValues) {
             final String[] words = sentence.split("\\W+");
@@ -215,7 +243,7 @@ private KafkaStreams createCountStream(final String inputTopic,
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                 @Override
                 public Iterable<String> apply(final String value) {
-                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    return Arrays.asList(value.split("\\W+"));
                 }
             })
             .groupBy(MockMapper.<String, String>selectValueMapper());
@@ -376,7 +404,7 @@ public void queryOnRebalance() throws InterruptedException {
                     storeName + "-" + streamThree);
                 verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(),
inputValuesKeys,
                     windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
-                assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
+                assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
             }
 
             // kill N-1 threads
@@ -391,7 +419,7 @@ public void queryOnRebalance() throws InterruptedException {
                 storeName + "-" + streamThree);
             verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(),
inputValuesKeys,
                 windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
-            assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
+            assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> QueryableStateIntegrationTest#queryOnRebalance should accept raw text
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-6424
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6424
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ted Yu
>            Assignee: Filipe Agapito
>            Priority: Minor
>              Labels: newbie, unit-test
>
> I was using QueryableStateIntegrationTest#queryOnRebalance for some performance test
by adding more sentences to inputValues.
> I found that when the sentence contains upper case letter, the test would timeout.
> I get around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} before
the split.
> Ideally we can specify the path to text file which contains the text. The test can read
the text file and generate the input array.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message