kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4772) Exploit #peek to implement #print() and other methods
Date Tue, 30 Jan 2018 04:43:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344494#comment-16344494 ]

ASF GitHub Bot commented on KAFKA-4772:
---------------------------------------

guozhangwang closed pull request #2704: KAFKA-4772: [WIP] Use KStreamPeek to replace KeyValuePrinter
URL: https://github.com/apache/kafka/pull/2704
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79abbb558eb..62fbecdf527 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -192,7 +192,7 @@ public void print(Serde<K> keySerde, Serde<V> valSerde) {
     public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
{
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName),
this.name);
+        topology.addProcessor(name, new KeyValuePrinter<>(System.out, keySerde, valSerde,
streamName), this.name);
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
index e193e52ec34..e4b142d92eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -18,104 +18,63 @@
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.io.PrintStream;
 
+class KeyValuePrinter<K, V> extends KStreamPeek {
 
-class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
-
-    private final PrintStream printStream;
+    private PrintStream printStream;
     private Serde<?> keySerde;
     private Serde<?> valueSerde;
     private String streamName;
 
-
-    KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde,
String streamName) {
+    public KeyValuePrinter(final PrintStream printStream, final Serde<?> keySerde,
final Serde<?> valueSerde, final String streamName) {
+        super(null);
+        this.printStream = printStream;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.streamName = streamName;
-        if (printStream == null) {
-            this.printStream = System.out;
-        } else {
-            this.printStream = printStream;
-        }
-    }
-
-    KeyValuePrinter(PrintStream printStream, String streamName) {
-        this(printStream, null, null, streamName);
-    }
-
-    KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName)
{
-        this(System.out, keySerde, valueSerde, streamName);
     }
 
     @Override
     public Processor<K, V> get() {
-        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde,
this.streamName);
+        return new KStreamPeekProcessor();
     }
 
-
-    private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
-        private final PrintStream printStream;
-        private Serde<?> keySerde;
-        private Serde<?> valueSerde;
-        private ProcessorContext processorContext;
-        private String streamName;
-
-        private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde,
Serde<?> valueSerde, String streamName) {
-            this.printStream = printStream;
-            this.keySerde = keySerde;
-            this.valueSerde = valueSerde;
-            this.streamName = streamName;
-        }
+    private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+        ForeachAction<K, V> action = printAction(context(), printStream, keySerde,
valueSerde, streamName);
 
         @Override
-        public void init(ProcessorContext context) {
-            this.processorContext = context;
-
-            if (this.keySerde == null) {
-                keySerde = this.processorContext.keySerde();
-            }
-
-            if (this.valueSerde == null) {
-                valueSerde = this.processorContext.valueSerde();
-            }
-        }
-
-        @Override
-        public void process(K key, V value) {
-            K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
-            V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
-
-            printStream.println("[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint);
-
-            this.processorContext.forward(key, value);
+        public void process(final K key, final V value) {
+            action.apply(key, value);
+            context().forward(key, value);
         }
+    }
 
-
-        private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer)
{
-            if (receivedElement == null) {
-                return null;
+    private static <K, V> ForeachAction<K, V> printAction(final ProcessorContext
context, final PrintStream printStream, final Serde<?> keySerde, final Serde<?>
valueSerde, final String streamName) {
+        return new ForeachAction<K, V>() {
+            @Override
+            public void apply(final K key, final V value) {
+                K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+                V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
+                printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);
             }
 
-            if (receivedElement instanceof byte[]) {
-                return deserializer.deserialize(this.processorContext.topic(), (byte[]) receivedElement);
-            }
+            private Object maybeDeserialize(Object receivedElement, Deserializer<?>
deserializer) {
+                if (receivedElement == null) {
+                    return null;
+                }
 
-            return receivedElement;
-        }
+                if (receivedElement instanceof byte[]) {
+                    return deserializer.deserialize(context.topic(), (byte[]) receivedElement);
+                }
 
-        @Override
-        public void close() {
-            if (this.printStream == System.out) {
-                this.printStream.flush();
-            } else {
-                this.printStream.close();
+                return receivedElement;
             }
-        }
+        };
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Exploit #peek to implement #print() and other methods
> -----------------------------------------------------
>
>                 Key: KAFKA-4772
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4772
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: james chien
>            Priority: Minor
>              Labels: beginner, newbie
>             Fix For: 0.11.0.0
>
>
> From: https://github.com/apache/kafka/pull/2493#pullrequestreview-22157555
> Things that I can think of:
> - print / writeAsText can be a special impl of peek; KStreamPrint etc. can be removed.
> - consider collapsing KStreamPeek with KStreamForeach, using a flag parameter indicating
> whether the acted-on key-value pair should still be forwarded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message