kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: MINOR: Added a couple of unit tests for KStreamPrint node when values are bytes
Date Fri, 04 Aug 2017 11:52:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 332a8129d -> c06fa5ada


MINOR: Added a couple of unit tests for KStreamPrint node when values are bytes

The current tests do not cover the deserialization that the KStreamPrint node processor
performs when the key and/or value are byte[]. This PR adds test coverage for that case.

Author: Paolo Patierno <ppatierno@live.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bbejeck@gmail.com>

Closes #3611 from ppatierno/minor-kstream-print-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c06fa5ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c06fa5ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c06fa5ad

Branch: refs/heads/trunk
Commit: c06fa5adaaf7fe940c314a0be97e122e921de970
Parents: 332a812
Author: Paolo Patierno <ppatierno@live.com>
Authored: Fri Aug 4 12:52:13 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Aug 4 12:52:13 2017 +0100

----------------------------------------------------------------------
 .../kstream/internals/KStreamPrintTest.java     | 103 +++++++++----------
 1 file changed, 50 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c06fa5ad/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 2813a08..c9bbb07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -19,14 +19,13 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
 
+import org.easymock.EasyMock;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -41,62 +40,39 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamPrintTest {
 
-    private final String topicName = "topic";
     private final Serde<Integer> intSerd = Serdes.Integer();
     private final Serde<String> stringSerd = Serdes.String();
     private PrintWriter printWriter;
     private ByteArrayOutputStream byteOutStream;
-    @Rule
-    public KStreamTestDriver driver = new KStreamTestDriver();
+
+    private KeyValueMapper<Integer, String, String> mapper;
+    private KStreamPrint kStreamPrint;
+    private Processor printProcessor;
 
     @Before
     public void setUp() {
         byteOutStream = new ByteArrayOutputStream();
         printWriter = new PrintWriter(new OutputStreamWriter(byteOutStream, StandardCharsets.UTF_8));
-    }
 
-    @Test
-    public void testPrintKeyValueWithName() {
-        KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+        mapper = new KeyValueMapper<Integer, String, String>() {
             @Override
-            public String apply(Integer key, String value) {
+            public String apply(final Integer key, final String value) {
                 return String.format("%d, %s", key, value);
             }
         };
-        final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new
PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd);
 
-        final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
-                new KeyValue<>(0, "zero"),
-                new KeyValue<>(1, "one"),
-                new KeyValue<>(2, "two"),
-                new KeyValue<>(3, "three"));
-        
-        final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one",
"[test-stream]: 2, two", "[test-stream]: 3, three"};
-        
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd,
topicName);
-        stream.process(kStreamPrint);
-        
-        driver.setUp(builder);
-        for (KeyValue<Integer, String> record: inputRecords) {
-            driver.process(topicName, record.key, record.value);
-        }
-        printWriter.flush();
-        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");
-        for (int i = 0; i < flushOutDatas.length; i++) {
-            assertEquals(expectedResult[i], flushOutDatas[i]);
-        }
+        kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter,
mapper, "test-stream"), intSerd, stringSerd);
+
+        printProcessor = kStreamPrint.get();
+        ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
+        EasyMock.replay(processorContext);
+
+        printProcessor.init(processorContext);
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void testPrintStreamWithProvidedKeyValueMapper() {
-        final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
-            @Override
-            public String apply(Integer key, String value) {
-                return String.format("(%d, %s)", key, value);
-            }
-        };
-        final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new
PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd);
 
         final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
                 new KeyValue<>(0, "zero"),
@@ -104,21 +80,42 @@ public class KStreamPrintTest {
                 new KeyValue<>(2, "two"),
                 new KeyValue<>(3, "three"));
 
-        final String[] expectedResult = {"[test-stream]: (0, zero)", "[test-stream]: (1,
one)", "[test-stream]: (2, two)", "[test-stream]: (3, three)"};
+        final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one",
"[test-stream]: 2, two", "[test-stream]: 3, three"};
 
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd,
topicName);
-        stream.process(kStreamPrint);
+        doTest(inputRecords, expectedResult);
+    }
 
-        driver.setUp(builder);
-        for (KeyValue<Integer, String> record: inputRecords) {
-            driver.process(topicName, record.key, record.value);
-        }
-        printWriter.flush();
-        final String[] results = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");
-        for (int i = 0; i < results.length; i++) {
-            assertEquals(expectedResult[i], results[i]);
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPrintKeyValueStringBytesArray() {
+
+        // we don't have a topic name because we don't need it for the test at this level
+        final List<KeyValue<byte[], byte[]>> inputRecords = Arrays.asList(
+                new KeyValue<>(intSerd.serializer().serialize(null, 0), stringSerd.serializer().serialize(null,
"zero")),
+                new KeyValue<>(intSerd.serializer().serialize(null, 1), stringSerd.serializer().serialize(null,
"one")),
+                new KeyValue<>(intSerd.serializer().serialize(null, 2), stringSerd.serializer().serialize(null,
"two")),
+                new KeyValue<>(intSerd.serializer().serialize(null, 3), stringSerd.serializer().serialize(null,
"three")));
+
+        final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one",
"[test-stream]: 2, two", "[test-stream]: 3, three"};
+
+        doTest(inputRecords, expectedResult);
+    }
+
+    private void assertFlushData(final String[] expectedResult, final ByteArrayOutputStream byteOutStream) {
+
+        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");
+        for (int i = 0; i < flushOutDatas.length; i++) {
+            assertEquals(expectedResult[i], flushOutDatas[i]);
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private <K, V> void doTest(final List<KeyValue<K, V>> inputRecords, final String[] expectedResult) {
+
+        for (KeyValue<K, V> record: inputRecords) {
+            printProcessor.process(record.key, record.value);
+        }
+        printWriter.flush();
+        assertFlushData(expectedResult, byteOutStream);
+    }
 }


Mime
View raw message