From: "ASF GitHub Bot (JIRA)"
To: jira@kafka.apache.org
Reply-To: jira@kafka.apache.org
Date: Tue, 30 Jan 2018 04:43:00 +0000 (UTC)
Subject: [jira] [Commented] (KAFKA-4772) Exploit #peek to implement #print() and other methods
Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm


    [ https://issues.apache.org/jira/browse/KAFKA-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344494#comment-16344494 ]

ASF GitHub Bot commented on KAFKA-4772:
---------------------------------------

guozhangwang closed pull request #2704: KAFKA-4772: [WIP] Use KStreamPeek to replace KeyValuePrinter
URL: https://github.com/apache/kafka/pull/2704

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79abbb558eb..62fbecdf527 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -192,7 +192,7 @@ public void print(Serde keySerde, Serde valSerde) {
     public void print(Serde keySerde, Serde valSerde, String streamName) {
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name);
+        topology.addProcessor(name, new KeyValuePrinter<>(System.out, keySerde, valSerde, streamName), this.name);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
index e193e52ec34..e4b142d92eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -18,104 +18,63 @@
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.io.PrintStream;
 
+class KeyValuePrinter extends KStreamPeek {
 
-class KeyValuePrinter implements ProcessorSupplier {
-
-    private final PrintStream printStream;
+    private PrintStream printStream;
     private Serde keySerde;
     private Serde valueSerde;
     private String streamName;
 
-
-    KeyValuePrinter(PrintStream printStream, Serde keySerde, Serde valueSerde, String streamName) {
+    public KeyValuePrinter(final PrintStream printStream, final Serde keySerde, final Serde valueSerde, final String streamName) {
+        super(null);
+        this.printStream = printStream;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.streamName = streamName;
-        if (printStream == null) {
-            this.printStream = System.out;
-        } else {
-            this.printStream = printStream;
-        }
-    }
-
-    KeyValuePrinter(PrintStream printStream, String streamName) {
-        this(printStream, null, null, streamName);
-    }
-
-    KeyValuePrinter(Serde keySerde, Serde valueSerde, String streamName) {
-        this(System.out, keySerde, valueSerde, streamName);
     }
 
     @Override
     public Processor get() {
-        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde, this.streamName);
+        return new KStreamPeekProcessor();
     }
 
-
-    private class KeyValuePrinterProcessor extends AbstractProcessor {
-        private final PrintStream printStream;
-        private Serde keySerde;
-        private Serde valueSerde;
-        private ProcessorContext processorContext;
-        private String streamName;
-
-        private KeyValuePrinterProcessor(PrintStream printStream, Serde keySerde, Serde valueSerde, String streamName) {
-            this.printStream = printStream;
-            this.keySerde = keySerde;
-            this.valueSerde = valueSerde;
-            this.streamName = streamName;
-        }
+    private class KStreamPeekProcessor extends AbstractProcessor {
+        ForeachAction action = printAction(context(), printStream, keySerde, valueSerde, streamName);
 
         @Override
-        public void init(ProcessorContext context) {
-            this.processorContext = context;
-
-            if (this.keySerde == null) {
-                keySerde = this.processorContext.keySerde();
-            }
-
-            if (this.valueSerde == null) {
-                valueSerde = this.processorContext.valueSerde();
-            }
-        }
-
-        @Override
-        public void process(K key, V value) {
-            K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
-            V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
-
-            printStream.println("[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint);
-
-            this.processorContext.forward(key, value);
+        public void process(final K key, final V value) {
+            action.apply(key, value);
+            context().forward(key, value);
         }
+    }
 
-
-        private Object maybeDeserialize(Object receivedElement, Deserializer deserializer) {
-            if (receivedElement == null) {
-                return null;
+    private static ForeachAction printAction(final ProcessorContext context, final PrintStream printStream, final Serde keySerde, final Serde valueSerde, final String streamName) {
+        return new ForeachAction() {
+            @Override
+            public void apply(final K key, final V value) {
+                K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+                V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
+                printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);
             }
 
-            if (receivedElement instanceof byte[]) {
-                return deserializer.deserialize(this.processorContext.topic(), (byte[]) receivedElement);
-            }
+            private Object maybeDeserialize(Object receivedElement, Deserializer deserializer) {
+                if (receivedElement == null) {
+                    return null;
+                }
 
-            return receivedElement;
-        }
+                if (receivedElement instanceof byte[]) {
+                    return deserializer.deserialize(context.topic(), (byte[]) receivedElement);
+                }
 
-        @Override
-        public void close() {
-            if (this.printStream == System.out) {
-                this.printStream.flush();
-            } else {
-                this.printStream.close();
+                return receivedElement;
             }
-        }
+        };
     }
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Exploit #peek to implement #print() and other methods
> -----------------------------------------------------
>
>                 Key: KAFKA-4772
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4772
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: james chien
>            Priority: Minor
>              Labels: beginner, newbie
>             Fix For: 0.11.0.0
>
>
> From: https://github.com/apache/kafka/pull/2493#pullrequestreview-22157555
> Things that I can think of:
> - print / writeAsText can be a special impl of peek; KStreamPrint etc can be removed.
> - consider collapsing KStreamPeek with KStreamForeach, with a flag parameter indicating whether the acted-on key-value pair should still be forwarded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
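
For readers skimming the issue description above, the two suggestions (print as a special case of peek, and a single processor with a "forward" flag covering both peek and foreach) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: PeekSketch, PeekProcessor, printAction and the downstream callback are hypothetical names, and plain java.util.function.BiConsumer stands in for the Kafka Streams ForeachAction and ProcessorContext types.

```java
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the idea in KAFKA-4772; none of these names
// are Kafka Streams APIs.
public class PeekSketch {

    // One processor for both peek and foreach: it always runs the action
    // and forwards the record downstream only when forwardDownstream is true.
    static final class PeekProcessor<K, V> {
        private final BiConsumer<K, V> action;
        private final boolean forwardDownstream;
        private final BiConsumer<K, V> downstream; // assumed stand-in for context().forward(...)

        PeekProcessor(BiConsumer<K, V> action, boolean forwardDownstream, BiConsumer<K, V> downstream) {
            this.action = action;
            this.forwardDownstream = forwardDownstream;
            this.downstream = downstream;
        }

        void process(K key, V value) {
            action.accept(key, value);
            if (forwardDownstream) {
                downstream.accept(key, value);
            }
        }
    }

    // print() expressed as a peek action that writes "[label]: key , value",
    // mirroring the output format used in the diff.
    static <K, V> BiConsumer<K, V> printAction(PrintStream out, String label) {
        return (key, value) -> out.println("[" + label + "]: " + key + " , " + value);
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        BiConsumer<String, Integer> collect = (k, v) -> forwarded.add(k + "=" + v);

        // Peek-style: print and forward.
        PeekProcessor<String, Integer> peek =
            new PeekProcessor<>(PeekSketch.<String, Integer>printAction(System.out, "orders"), true, collect);
        peek.process("a", 1);

        // Foreach-style: print only, nothing is forwarded.
        PeekProcessor<String, Integer> foreach =
            new PeekProcessor<>(PeekSketch.<String, Integer>printAction(System.out, "orders"), false, collect);
        foreach.process("b", 2);

        System.out.println("forwarded: " + forwarded);
    }
}
```

The flag keeps a single process() path, so a print- or write-style operation only has to supply an action; whether that trade-off suits the real KStreamPeek and KStreamForeach classes is a question for the issue discussion.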