kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6454) Allow timestamp manipulation in Processor API
Date Fri, 16 Mar 2018 23:03:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403086#comment-16403086 ] 

ASF GitHub Bot commented on KAFKA-6454:
---------------------------------------

mjsax closed pull request #4519: KAFKA-6454: Allow timestamp manipulation in Processor API
URL: https://github.com/apache/kafka/pull/4519

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index 2f22be784a0..889fe06c45f 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -127,6 +127,10 @@ <h3><a id="streams_time" href="#streams_time">Time</a></h3>
         <li> For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.</li>
     </ul>
 
+    <p>
+	Note that the described default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling <code>#forward()</code>.
+    </p>
+
     <h3><a id="streams_state" href="#streams_state">States</a></h3>
 
     <p>
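
For readers of the core-concepts change above, a minimal sketch of overriding the default timestamp assignment in a custom processor (the class name and the +100ms shift are illustrative, not part of this PR):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class ShiftTimestampProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        // default: the output record would inherit the input record's timestamp;
        // KIP-251 override: assign an explicit timestamp via To#withTimestamp()
        context().forward(key, value, To.all().withTimestamp(context().timestamp() + 100L));
    }
}
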
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index fdf6c86b4cd..b51bc22cfe2 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -77,6 +77,10 @@ <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#
                 its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
                 function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
                 and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+            <p>When records are forwarded to downstream processors, they also get a timestamp assigned. There are two different default behaviors:
+	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code>, the output record inherits the input record's timestamp.
+	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code>, the output record inherits the current punctuation timestamp (either the current 'stream time' or the system wall-clock time).
+	       Note that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows changing the default behavior by passing a custom timestamp for the output record.</p>
             <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
                 API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
                 for the punctuation scheduling: either <a class="reference internal" href="../concepts.html#streams-concepts-time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
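
A sketch of the two default behaviors described in the new paragraph, plus the explicit override (a hypothetical processor; the one-second wall-clock punctuation and all names are illustrative):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.To;

public class ForwardingProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        context.schedule(1000L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // (2) forwarded within punctuate(): inherits the punctuation timestamp
                context.forward("tick", "value");
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // (1) forwarded within process(): inherits the input record's timestamp
        context().forward(key, value);
        // either default can be overridden by passing an explicit timestamp
        context().forward(key, value, To.all().withTimestamp(42L));
    }
}
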
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 46be969815a..baf9633a0c3 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -85,6 +85,14 @@ <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API
         In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
         to let users specify inner serdes if the default serde classes are windowed serdes.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
+    </p>
+    
+    <p>
+      Kafka 1.2.0 allows you to manipulate the timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
+      To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
+      The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
+      The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
+      Forwarding based on child index is no longer supported in the new API.
     </p>
 
     <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
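
For users upgrading, a migration sketch for the deprecated overloads (the child name "my-child" and the timestamp are illustrative):

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

final class ForwardMigrationExample {
    @SuppressWarnings("deprecation")
    static <K, V> void migrate(final ProcessorContext context, final K key, final V value) {
        // deprecated in 1.2.0:
        context.forward(key, value, "my-child");  // by child name
        context.forward(key, value, 0);           // by child index; no index-based To equivalent exists

        // replacement per KIP-251:
        context.forward(key, value, To.child("my-child"));
        context.forward(key, value, To.all());    // broadcast to all downstream processors
        context.forward(key, value, To.child("my-child").withTimestamp(42L));
    }
}
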
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 308fcadf6d4..a83b4a3425b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
 
 /**
  * The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output
@@ -69,9 +70,8 @@
      * attached} to this operator can be accessed and modified
      * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * If more than one output record should be forwarded downstream, {@link ProcessorContext#forward(Object, Object)}
+     * and {@link ProcessorContext#forward(Object, Object, To)} can be used.
     * If no record should be forwarded downstream, {@code transform} can return {@code null}.
      *
      * @param key the key for the record
@@ -86,9 +86,8 @@
      * {@link ProcessorContext#schedule(long) schedules itself} with the context during
      * {@link #init(ProcessorContext) initialization}.
      * <p>
-     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * To generate new {@link KeyValue} pairs, {@link ProcessorContext#forward(Object, Object)} and
+     * {@link ProcessorContext#forward(Object, Object, To)} can be used.
      * <p>
     * Note that {@code punctuate} is called based on <i>stream time</i> (i.e., time progresses with regard to
     * timestamps returned by the used {@link TimestampExtractor})
@@ -105,9 +104,8 @@
     /**
      * Close this processor and clean up any resources.
      * <p>
-     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * To generate new {@link KeyValue} pairs, {@link ProcessorContext#forward(Object, Object)} and
+     * {@link ProcessorContext#forward(Object, Object, To)} can be used.
      */
     void close();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1802a6151d2..1da779ea963 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
 
 /**
  * The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
@@ -58,9 +59,8 @@
      * Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
      * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
      * <p>
-     * Note that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, or
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of
      * {@code ValueTransformer} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
@@ -75,9 +75,8 @@
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
      *
      * @param value the value to be transformed
@@ -90,9 +89,8 @@
      * the context during {@link #init(ProcessorContext) initialization}.
      * <p>
      * It is not possible to return any new output records within {@code punctuate}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an
-     * {@link StreamsException exception}.
+     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+     * will result in an {@link StreamsException exception}.
      * Furthermore, {@code punctuate} must return {@code null}.
      * <p>
     * Note that {@code punctuate} is called based on <i>stream time</i> (i.e., time progresses with regard to
@@ -111,8 +109,8 @@
      * Close this processor and clean up any resources.
      * <p>
      * It is not possible to return any new output records within {@code close()}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
+     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+     * will result in an {@link StreamsException exception}.
      */
     void close();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 128c61f55dd..7f399b595b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
 
 /**
  * The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type).
@@ -62,9 +63,8 @@
      * Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
      * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
      * <p>
-     * Note that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, or
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of
      * {@code ValueTransformerWithKey} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
@@ -79,9 +79,8 @@
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
      *
      * @param readOnlyKey the read-only key
@@ -94,8 +93,8 @@
      * Close this processor and clean up any resources.
      * <p>
      * It is not possible to return any new output records within {@code close()}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
+     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+     * will result in an {@link StreamsException exception}.
      */
     void close();
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 317c5bf9e7a..baa9b63f6ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -16,18 +16,21 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.To;
 
 class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
 
     private final Predicate<K, V>[] predicates;
+    private final String[] childNodes;
 
-    @SuppressWarnings("unchecked")
-    public KStreamBranch(Predicate<K, V> ... predicates) {
+    KStreamBranch(final Predicate<K, V>[] predicates,
+                  final String[] childNodes) {
         this.predicates = predicates;
+        this.childNodes = childNodes;
     }
 
     @Override
@@ -37,12 +40,12 @@ public KStreamBranch(Predicate<K, V> ... predicates) {
 
     private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K key, V value) {
+        public void process(final K key, final V value) {
             for (int i = 0; i < predicates.length; i++) {
                 if (predicates[i].test(key, value)) {
-                    // use forward with childIndex here and then break the loop
+                    // use forward with child here and then break the loop
                     // so that no record is going to be piped to multiple streams
-                    context().forward(key, value, i);
+                    context().forward(key, value, To.child(childNodes[i]));
                     break;
                 }
             }
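
For context on the KStreamBranch rewrite above: the branch processor now captures its children's processor names at construction time and routes via To.child() instead of the deprecated index-based forward(). From the caller's perspective nothing changes; a usage sketch (the topic name and predicates are illustrative):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

final class BranchExample {
    @SuppressWarnings("unchecked")
    static KStream<String, String>[] buildBranches(final StreamsBuilder builder) {
        final KStream<String, String> stream = builder.stream("input-topic");
        // each record is forwarded to at most one branch: the first predicate that matches
        return stream.branch(
                (key, value) -> value.startsWith("A"),  // routed via To.child(childNames[0])
                (key, value) -> true);                  // everything else via To.child(childNames[1])
    }
}
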
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 07bc67d952c..349be867ed1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -358,17 +358,20 @@ public void writeAsText(final String filePath,
         for (final Predicate<? super K, ? super V> predicate : predicates) {
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
-        String branchName = builder.newProcessorName(BRANCH_NAME);
 
-        builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
+        String branchName = builder.newProcessorName(BRANCH_NAME);
 
-        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+        String[] childNames = new String[predicates.length];
         for (int i = 0; i < predicates.length; i++) {
-            String childName = builder.newProcessorName(BRANCHCHILD_NAME);
+            childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
+        }
 
-            builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
+        builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone(), childNames), this.name);
 
-            branchChildren[i] = new KStreamImpl<>(builder, childName, sourceNodes, this.repartitionRequired);
+        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+        for (int i = 0; i < predicates.length; i++) {
+            builder.internalTopologyBuilder.addProcessor(childNames[i], new KStreamPassThrough<K, V>(), branchName);
+            branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired);
         }
 
         return branchChildren;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index ace4f698bfe..e6445978438 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.util.Map;
@@ -116,11 +117,18 @@ public void schedule(final long interval) {
                         throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
                     }
 
+                    @Override
+                    public <K, V> void forward(final K key, final V value, final To to) {
+                        throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
+                    }
+
+                    @SuppressWarnings("deprecation")
                     @Override
                     public <K, V> void forward(final K key, final V value, final int childIndex) {
                         throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
                     }
 
+                    @SuppressWarnings("deprecation")
                     @Override
                     public <K, V> void forward(final K key, final V value, final String childName) {
                         throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 1cfe78a1e40..14e6c2aeb9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -31,7 +31,7 @@ protected AbstractProcessor() {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(final ProcessorContext context) {
         this.context = context;
     }
 
@@ -46,7 +46,7 @@ public void init(ProcessorContext context) {
      */
     @SuppressWarnings("deprecation")
     @Override
-    public void punctuate(long timestamp) {
+    public void punctuate(final long timestamp) {
         // do nothing
     }
 
@@ -67,6 +67,6 @@ public void close() {
      * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
      */
     protected final ProcessorContext context() {
-        return this.context;
+        return context;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 42902a866b1..404b2258258 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -83,7 +83,9 @@
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
+    void register(final StateStore store,
+                  final boolean loggingEnabledIsDeprecatedAndIgnored,
+                  final StateRestoreCallback stateRestoreCallback);
 
     /**
      * Get the state store given the store name.
@@ -91,7 +93,7 @@
      * @param name The store name
      * @return The state store instance
      */
-    StateStore getStateStore(String name);
+    StateStore getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
@@ -125,7 +127,9 @@
      * @param callback a function consuming timestamps representing the current stream or system time
      * @return a handle allowing cancellation of the punctuation schedule established by this method
      */
-    Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
+    Cancellable schedule(final long intervalMs,
+                         final PunctuationType type,
+                         final Punctuator callback);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
@@ -137,30 +141,47 @@
      * @param interval the time interval between punctuations
      */
     @Deprecated
-    void schedule(long interval);
+    void schedule(final long interval);
 
     /**
-     * Forwards a key/value pair to the downstream processors
+     * Forwards a key/value pair to all downstream processors.
+     * Uses the input record's timestamp as the timestamp for the output record.
+     *
      * @param key key
      * @param value value
      */
-    <K, V> void forward(K key, V value);
+    <K, V> void forward(final K key, final V value);
+
+    /**
+     * Forwards a key/value pair to the specified downstream processors.
+     * Can be used to set the timestamp of the output record.
+     *
+     * @param key key
+     * @param value value
+     * @param to the options to use when forwarding
+     */
+    <K, V> void forward(final K key, final V value, final To to);
 
     /**
      * Forwards a key/value pair to one of the downstream processors designated by childIndex
      * @param key key
      * @param value value
      * @param childIndex index in list of children of this node
+     * @deprecated please use {@link #forward(Object, Object, To)} instead
      */
-    <K, V> void forward(K key, V value, int childIndex);
+    // TODO when we remove this method, we can also remove `ProcessorNode#children`
+    @Deprecated
+    <K, V> void forward(final K key, final V value, final int childIndex);
 
     /**
      * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
      * @param key key
      * @param value value
      * @param childName name of downstream processor
+     * @deprecated please use {@link #forward(Object, Object, To)} instead
      */
-    <K, V> void forward(K key, V value, String childName);
+    @Deprecated
+    <K, V> void forward(final K key, final V value, final String childName);
 
     /**
      * Requests a commit
@@ -231,6 +252,6 @@
      * @return the key/values matching the given prefix from the StreamsConfig properties.
      *
      */
-    Map<String, Object> appConfigsWithPrefix(String prefix);
+    Map<String, Object> appConfigsWithPrefix(final String prefix);
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
new file mode 100644
index 00000000000..52007dfb4dc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * This class is used to provide the optional parameters when sending output records to downstream processors
+ * using {@link ProcessorContext#forward(Object, Object, To)}.
+ */
+public class To {
+    protected String childName;
+    protected long timestamp;
+
+    private To(final String childName,
+               final long timestamp) {
+        this.childName = childName;
+        this.timestamp = timestamp;
+    }
+
+    protected To(final To to) {
+        this(to.childName, to.timestamp);
+    }
+
+    protected void update(final To to) {
+        childName = to.childName;
+        timestamp = to.timestamp;
+    }
+
+    /**
+     * Forward the key/value pair to one of the downstream processors designated by the downstream processor name.
+     * @param childName name of downstream processor
+     * @return a new {@link To} instance configured with {@code childName}
+     */
+    public static To child(final String childName) {
+        return new To(childName, -1);
+    }
+
+    /**
+     * Forward the key/value pair to all downstream processors
+     * @return a new {@link To} instance configured for all downstream processors
+     */
+    public static To all() {
+        return new To((String) null, -1);
+    }
+
+    /**
+     * Set the timestamp of the output record.
+     * @param timestamp the output record timestamp
+     * @return itself (i.e., {@code this})
+     */
+    public To withTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+}
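
A usage sketch for the new To class. Two things worth noting from the code above: -1 serves as the "timestamp not set" sentinel (cf. ToInternal#hasTimestamp() further down), and withTimestamp() mutates and returns the same instance rather than a copy, so a cached To object should not be reused across calls that need different timestamps:

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

final class ToUsageExample {
    static <K, V> void send(final ProcessorContext context, final K key, final V value) {
        // route to a single named child, keeping the default timestamp ("sink-processor" is illustrative):
        context.forward(key, value, To.child("sink-processor"));
        // broadcast to all children, with an explicit timestamp:
        context.forward(key, value, To.all().withTimestamp(context.timestamp()));
    }
}
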
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index e9b5a4c1ec4..87408c61411 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -26,7 +26,6 @@
 
 import java.io.File;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -164,20 +163,6 @@ public long timestamp() {
         return combined;
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode previousNode = currentNode();
-        try {
-            for (final ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
-                setCurrentNode(child);
-                child.process(key, value);
-            }
-        } finally {
-            setCurrentNode(previousNode);
-        }
-    }
-
     @Override
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return config.originalsWithPrefix(prefix);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 37e7cb55c22..88d9f56f0e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -23,8 +23,11 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.util.List;
+
 public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
 
@@ -40,20 +43,43 @@ public StateStore getStateStore(final String name) {
         return stateManager.getGlobalStore(name);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        final ProcessorNode previousNode = currentNode();
+        try {
+            for (final ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
+                setCurrentNode(child);
+                child.process(key, value);
+            }
+        } finally {
+            setCurrentNode(previousNode);
+        }
+    }
+
     /**
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public <K, V> void forward(K key, V value, int childIndex) {
+    public <K, V> void forward(final K key, final V value, final To to) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
+    }
 
     /**
      * @throws UnsupportedOperationException on every invocation
      */
+    @SuppressWarnings("deprecation")
     @Override
-    public <K, V> void forward(K key, V value, String childName) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 42d3d70e396..3761bfb0ee6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -18,12 +18,13 @@
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.List;
@@ -32,6 +33,8 @@
 
     private final StreamTask task;
     private final RecordCollector collector;
+    private final ToInternal toInternal = new ToInternal();
+    private final static To SEND_TO_ALL = To.all();
 
     ProcessorContextImpl(final TaskId id,
                          final StreamTask task,
@@ -77,32 +80,60 @@ public StateStore getStateStore(final String name) {
 
     @SuppressWarnings("unchecked")
     @Override
+    public <K, V> void forward(final K key, final V value) {
+        forward(key, value, SEND_TO_ALL);
+    }
+
+    @SuppressWarnings({"unchecked", "deprecation"})
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
+        forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
+    }
+
+    @SuppressWarnings({"unchecked", "deprecation"})
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        forward(key, value, To.child(childName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        toInternal.update(to);
+        if (toInternal.hasTimestamp()) {
+            recordContext.setTimestamp(toInternal.timestamp());
+        }
         final ProcessorNode previousNode = currentNode();
-        final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex);
-        setCurrentNode(child);
         try {
-            child.process(key, value);
+            final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
+            final String sendTo = toInternal.child();
+            if (sendTo != null) {
+                final ProcessorNode child = currentNode().getChild(sendTo);
+                if (child == null) {
+                    throw new StreamsException("Unknown processor name: " + sendTo);
+                }
+                forward(child, key, value);
+            } else {
+                if (children.size() == 1) {
+                    final ProcessorNode child = children.get(0);
+                    forward(child, key, value);
+                } else {
+                    for (final ProcessorNode child : children) {
+                        forward(child, key, value);
+                    }
+                }
+            }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
     @SuppressWarnings("unchecked")
-    @Override
-    public <K, V> void forward(final K key, final V value, final String childName) {
-        for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
-            if (child.name().equals(childName)) {
-                ProcessorNode previousNode = currentNode();
-                setCurrentNode(child);
-                try {
-                    child.process(key, value);
-                    return;
-                } finally {
-                    setCurrentNode(previousNode);
-                }
-            }
-        }
+    private <K, V> void forward(final ProcessorNode child,
+                                final K key,
+                                final V value) {
+        setCurrentNode(child);
+        child.process(key, value);
     }
 
     @Override
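
Two observations on the ProcessorContextImpl changes above: the reusable toInternal field and the static SEND_TO_ALL constant avoid allocating a fresh To on every two-argument forward() call; and because the override writes through recordContext.setTimestamp() without restoring the previous value, it appears that a later timestamp() lookup within the same process() invocation observes the overridden value. A sketch of that side effect, assuming this reading of the diff is correct:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

final class TimestampSideEffectExample extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        context().forward(key, value, To.all().withTimestamp(42L));
        // the record context was mutated in place, so this now appears to read 42L
        // rather than the input record's original timestamp:
        final long observed = context().timestamp();
    }
}
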
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 29f442f4658..94e86407697 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -26,12 +26,16 @@
 import org.apache.kafka.streams.processor.Punctuator;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class ProcessorNode<K, V> {
 
+    // TODO: 'children' can be removed when #forward() via index is removed
     private final List<ProcessorNode<?, ?>> children;
+    private final Map<String, ProcessorNode<?, ?>> childByName;
 
     private final String name;
     private final Processor<K, V> processor;
@@ -75,6 +79,7 @@ public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateSt
         this.name = name;
         this.processor = processor;
         this.children = new ArrayList<>();
+        this.childByName = new HashMap<>();
         this.stateStores = stateStores;
         this.time = new SystemTime();
     }
@@ -92,11 +97,15 @@ public final String name() {
         return children;
     }
 
+    public final ProcessorNode getChild(final String childName) {
+        return childByName.get(childName);
+    }
+
     public void addChild(ProcessorNode<?, ?> child) {
         children.add(child);
+        childByName.put(child.name, child);
     }
 
-
     public void init(ProcessorContext context) {
         this.context = context;
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index aa2010353eb..92acfc9a50c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -20,7 +20,7 @@
 
 public class ProcessorRecordContext implements RecordContext {
 
-    private final long timestamp;
+    private long timestamp;
     private final long offset;
     private final String topic;
     private final int partition;
@@ -44,6 +44,10 @@ public long timestamp() {
         return timestamp;
     }
 
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
     @Override
     public String topic() {
         return topic;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
index dc752cb2d9e..dd58f4c02a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -33,6 +33,11 @@
      */
     long timestamp();
 
+    /**
+     * Sets a new timestamp for the output record.
+     */
+    void setTimestamp(final long timestamp);
+
     /**
      * @return The topic the record was received on
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e38b821a2ed..360c4ab243b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.Collections;
@@ -142,6 +143,15 @@ public long timestamp() {
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
+    }
+
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
+    @SuppressWarnings("deprecation")
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
@@ -149,6 +159,7 @@ public long timestamp() {
     /**
      * @throws UnsupportedOperationException on every invocation
      */
+    @SuppressWarnings("deprecation")
     @Override
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
new file mode 100644
index 00000000000..6c5798e7605
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.To;
+
+public class ToInternal extends To {
+    public ToInternal() {
+        super(To.all());
+    }
+
+    public void update(final To to) {
+        super.update(to);
+    }
+
+    public boolean hasTimestamp() {
+        return timestamp != -1;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public String child() {
+        return childName;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index dedb906f097..af7059b30e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -27,7 +27,7 @@
     private final long offset;
     private final String topic;
     private final int partition;
-    private final long timestamp;
+    private long timestamp;
 
     private long sizeBytes;
     private boolean isDirty;
@@ -63,6 +63,11 @@ public long timestamp() {
         return timestamp;
     }
 
+    @Override
+    public void setTimestamp(final long timestamp) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public String topic() {
         return topic;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 1b34fabb12a..dc0b886a93c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -23,18 +23,19 @@
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
 
 public class KStreamTransformValuesTest {
 
@@ -191,6 +192,13 @@ public void close() {
             // expected
         }
 
+        try {
+            transformValueProcessor.process(null, 3);
+            fail("should not allow call to context.forward() within ValueTransformer");
+        } catch (final StreamsException e) {
+            // expected
+        }
+
         try {
             transformValueProcessor.punctuate(0);
            fail("should not allow ValueTransformer#punctuate() to return non-null value");
@@ -213,11 +221,14 @@ public Integer transform(final Integer key, final Integer value) {
                 context.forward(null, null);
             }
             if (value == 1) {
-                context.forward(null, null, null);
+                context.forward(null, null, (String) null);
             }
             if (value == 2) {
                 context.forward(null, null, 0);
             }
+            if (value == 3) {
+                context.forward(null, null, To.all());
+            }
             throw new RuntimeException("Should never happen in this test");
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 46c23c63387..aac275d47ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockStateStore;
@@ -176,28 +177,21 @@ public Cancellable schedule(long interval, PunctuationType type, Punctuator call
         }
 
         @Override
-        public void schedule(final long interval) {
-
-        }
+        public void schedule(final long interval) {}
 
         @Override
-        public <K, V> void forward(final K key, final V value) {
-
-        }
+        public <K, V> void forward(final K key, final V value) {}
 
         @Override
-        public <K, V> void forward(final K key, final V value, final int childIndex) {
-
-        }
+        public <K, V> void forward(final K key, final V value, final To to) {}
 
         @Override
-        public <K, V> void forward(final K key, final V value, final String childName) {
-
-        }
+        public <K, V> void forward(final K key, final V value, final int childIndex) {}
 
         @Override
-        public void commit() {
+        public <K, V> void forward(final K key, final V value, final String childName) {}
 
-        }
+        @Override
+        public void commit() {}
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index da2e5dc12c5..d07274a32a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -72,8 +73,8 @@
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
-        File localState = TestUtils.tempDirectory();
-        Properties props = new Properties();
+        final File localState = TestUtils.tempDirectory();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
@@ -120,8 +121,8 @@ public void testTopologyMetadata() {
     }
 
     @Test
-    public void testDrivingSimpleTopology() throws Exception {
-        int partition = 10;
+    public void testDrivingSimpleTopology() {
+        final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -142,7 +143,7 @@ public void testDrivingSimpleTopology() throws Exception {
 
 
     @Test
-    public void testDrivingMultiplexingTopology() throws Exception {
+    public void testDrivingMultiplexingTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
@@ -164,7 +165,7 @@ public void testDrivingMultiplexingTopology() throws Exception {
     }
 
     @Test
-    public void testDrivingMultiplexByNameTopology() throws Exception {
+    public void testDrivingMultiplexByNameTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
@@ -186,8 +187,8 @@ public void testDrivingMultiplexByNameTopology() throws Exception {
     }
 
     @Test
-    public void testDrivingStatefulTopology() throws Exception {
-        String storeName = "entries";
+    public void testDrivingStatefulTopology() {
+        final String storeName = "entries";
         driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -195,7 +196,7 @@ public void testDrivingStatefulTopology() throws Exception {
         driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_1);
 
-        KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
         assertEquals("value4", store.get("key1"));
         assertEquals("value2", store.get("key2"));
         assertEquals("value3", store.get("key3"));
@@ -205,15 +206,16 @@ public void testDrivingStatefulTopology() throws Exception {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldDriveGlobalStore() {
-        final StateStoreSupplier storeSupplier = Stores.create("my-store")
+        final String storeName = "my-store";
+        final StateStoreSupplier storeSupplier = Stores.create(storeName)
                 .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
         final String topic = "topic";
         final TopologyBuilder topologyBuilder = this.builder
-                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
+                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
 
         driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertEquals("value1", globalStore.get("key1"));
@@ -221,8 +223,8 @@ public void shouldDriveGlobalStore() {
     }
 
     @Test
-    public void testDrivingSimpleMultiSourceTopology() throws Exception {
-        int partition = 10;
+    public void testDrivingSimpleMultiSourceTopology() {
+        final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
 
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -235,7 +237,7 @@ public void testDrivingSimpleMultiSourceTopology() throws Exception {
     }
 
     @Test
-    public void testDrivingForwardToSourceTopology() throws Exception {
+    public void testDrivingForwardToSourceTopology() {
         driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -246,7 +248,7 @@ public void testDrivingForwardToSourceTopology() throws Exception {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningTopology() throws Exception {
+    public void testDrivingInternalRepartitioningTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -257,7 +259,7 @@ public void testDrivingInternalRepartitioningTopology() throws Exception {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningForwardingTimestampTopology() throws Exception {
+    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -315,7 +317,7 @@ public void shouldRecursivelyPrintChildren() {
     }
 
     @Test
-    public void shouldConsiderTimeStamps() throws Exception {
+    public void shouldConsiderTimeStamps() {
         final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
@@ -326,6 +328,17 @@ public void shouldConsiderTimeStamps() throws Exception {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
     }
 
+    @Test
+    public void shouldConsiderModifiedTimeStamps() {
+        final int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, createTimestampTopology(partition).internalTopologyBuilder);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
+    }
 
     private void assertNextOutputRecord(final String topic,
                                         final String key,
@@ -345,7 +358,7 @@ private void assertNextOutputRecord(final String topic,
                                         final String value,
                                         final Integer partition,
                                         final Long timestamp) {
-        ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
+        final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
         assertEquals(value, record.value());
@@ -353,51 +366,63 @@ private void assertNextOutputRecord(final String topic,
         assertEquals(timestamp, record.timestamp());
     }
 
-    private void assertNoOutputRecord(String topic) {
+    private void assertNoOutputRecord(final String topic) {
         assertNull(driver.readOutput(topic));
     }
 
     private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
         return new StreamPartitioner<Object, Object>() {
             @Override
-            public Integer partition(Object key, Object value, int numPartitions) {
+            public Integer partition(final Object key, final Object value, final int numPartitions) {
                 return partition;
             }
         };
     }
 
-    private TopologyBuilder createSimpleTopology(int partition) {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                                    .addProcessor("processor", define(new ForwardingProcessor()), "source")
-                                    .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+    private TopologyBuilder createSimpleTopology(final int partition) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new ForwardingProcessor()), "source")
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+    }
+
+    private TopologyBuilder createTimestampTopology(final int partition) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new TimestampProcessor()), "source")
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
     private TopologyBuilder createMultiplexingTopology() {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                                    .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
-                                    .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-                                    .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
+            .addSink("sink1", OUTPUT_TOPIC_1, "processor")
+            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
     private TopologyBuilder createMultiplexByNameTopology() {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
-    private TopologyBuilder createStatefulTopology(String storeName) {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                                    .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
-                                    .addStateStore(
-                                            Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
-                                            "processor"
-                                    )
-                                    .addSink("counts", OUTPUT_TOPIC_1, "processor");
+    private TopologyBuilder createStatefulTopology(final String storeName) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+            .addStateStore(
+                Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
+                "processor"
+            )
+            .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
     private TopologyBuilder createInternalRepartitioningTopology() {
-        return builder.addSource("source", INPUT_TOPIC_1)
+        return builder
+            .addSource("source", INPUT_TOPIC_1)
             .addInternalTopic(THROUGH_TOPIC_1)
             .addSink("sink0", THROUGH_TOPIC_1, "source")
             .addSource("source1", THROUGH_TOPIC_1)
@@ -405,12 +430,13 @@ private TopologyBuilder createInternalRepartitioningTopology() {
     }
 
     private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
-        return builder.addSource("source", INPUT_TOPIC_1)
-                .addInternalTopic(THROUGH_TOPIC_1)
-                .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
-                .addSink("sink0", THROUGH_TOPIC_1, "processor")
-                .addSource("source1", THROUGH_TOPIC_1)
-                .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+        return builder
+            .addSource("source", INPUT_TOPIC_1)
+            .addInternalTopic(THROUGH_TOPIC_1)
+            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
+            .addSink("sink0", THROUGH_TOPIC_1, "processor")
+            .addSource("source1", THROUGH_TOPIC_1)
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
     }
 
     private TopologyBuilder createForwardToSourceTopology() {
@@ -434,26 +460,34 @@ private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
      * A processor that simply forwards all messages to all children.
      */
     protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
-
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             context().forward(key, value);
         }
 
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             context().forward(Long.toString(streamTime), "punctuate");
         }
     }
 
+    /**
+     * A processor that simply forwards all messages to all children with advanced timestamps.
+     */
+    protected static class TimestampProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            context().forward(key, value, To.all().withTimestamp(context().timestamp() + 10));
+        }
+    }
+
     /**
      * A processor that removes custom timestamp information from messages and forwards modified messages to each child.
      * A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
      */
     protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> {
-
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             context().forward(key, value.split("@")[0]);
         }
     }
@@ -462,22 +496,23 @@ public void process(String key, String value) {
      * A processor that forwards slightly-modified messages to each child.
      */
     protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
-
         private final int numChildren;
 
-        public MultiplexingProcessor(int numChildren) {
+        MultiplexingProcessor(final int numChildren) {
             this.numChildren = numChildren;
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(key, value + "(" + (i + 1) + ")", i);
             }
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
             }
@@ -489,22 +524,23 @@ public void punctuate(long streamTime) {
      * Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc.
      */
     protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
-
         private final int numChildren;
 
-        public MultiplexByNameProcessor(int numChildren) {
+        MultiplexByNameProcessor(final int numChildren) {
             this.numChildren = numChildren;
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
-                context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+                context().forward(key, value + "(" + (i + 1) + ")",  "sink" + i);
             }
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
             }
@@ -516,28 +552,27 @@ public void punctuate(long streamTime) {
      * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
      */
     protected static class StatefulProcessor extends AbstractProcessor<String, String> {
-
         private KeyValueStore<String, String> store;
         private final String storeName;
 
-        public StatefulProcessor(String storeName) {
+        StatefulProcessor(final String storeName) {
             this.storeName = storeName;
         }
 
         @Override
         @SuppressWarnings("unchecked")
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<String, String>) context.getStateStore(storeName);
         }
 
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             store.put(key, value);
         }
 
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             int count = 0;
             try (KeyValueIterator<String, String> iter = store.all()) {
                 while (iter.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 7932d1ffa44..0af5e17980c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -19,14 +19,18 @@
 public class RecordContextStub implements RecordContext {
 
     private final long offset;
-    private final long timestamp;
+    private long timestamp;
     private final int partition;
     private final String topic;
 
     public RecordContextStub() {
         this(-1, -1, -1, "");
     }
-    public RecordContextStub(final long offset, final long timestamp, final int partition, final String topic) {
+
+    public RecordContextStub(final long offset,
+                             final long timestamp,
+                             final int partition,
+                             final String topic) {
         this.offset = offset;
         this.timestamp = timestamp;
         this.partition = partition;
@@ -43,6 +47,11 @@ public long timestamp() {
         return timestamp;
     }
 
+    @Override
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
     @Override
     public String topic() {
         return topic;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 06137fbca7a..6b0cb66bcb7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -28,12 +28,14 @@
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -53,6 +55,7 @@
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
+    private final ToInternal toInternal = new ToInternal();
 
     private Serde<?> keySerde;
     private Serde<?> valSerde;
@@ -179,44 +182,39 @@ public void commit() { }
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode thisNode = currentNode;
-        for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currentNode = childNode;
-            try {
-                childNode.process(key, value);
-            } finally {
-                currentNode = thisNode;
-            }
-        }
+        forward(key, value, To.all());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        final ProcessorNode thisNode = currentNode;
-        final ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
-        currentNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currentNode = thisNode;
-        }
+        forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value, final String childName) {
+        forward(key, value, To.child(childName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        toInternal.update(to);
+        if (toInternal.hasTimestamp()) {
+            setTime(toInternal.timestamp());
+        }
         final ProcessorNode thisNode = currentNode;
-        for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            if (childNode.name().equals(childName)) {
-                currentNode = childNode;
-                try {
+        try {
+            for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+                if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
+                    currentNode = childNode;
                     childNode.process(key, value);
-                } finally {
-                    currentNode = thisNode;
+                    toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple Processors and toInternal might have been modified
                 }
-                break;
             }
+        } finally {
+            currentNode = thisNode;
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index afa06394369..6b5d47a6b7b 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 
@@ -64,6 +65,11 @@ public void schedule(final long interval) {
         forwardedValues.put(key, value);
     }
 
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        forwardedValues.put(key, value);
+    }
+
     @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         forward(key, value);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Allow timestamp manipulation in Processor API
> ---------------------------------------------
>
>                 Key: KAFKA-6454
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6454
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 1.2.0
>
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation at the Processor API level: all processors within a sub-topology see the timestamp of the input topic record, and this timestamp is used for all result records when writing them to a topic, too.
> For the DSL and also for custom operators, it would be desirable to allow timestamp manipulation at the Processor level for individual records that are forwarded.
> KIP-251: https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API
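
For illustration, a minimal sketch of the new API surface, mirroring the TimestampProcessor added in the diff above: the processor overrides the default timestamp inheritance by passing an explicit timestamp via To.all().withTimestamp() when forwarding. The class name and the 10 ms shift are illustrative only, not part of the KIP.

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.To;

    // Sketch: forwards each record with its timestamp advanced by 10 ms,
    // instead of the default behavior of inheriting the input record timestamp.
    public class ShiftTimestampProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // To.all() targets all downstream children; withTimestamp() sets
            // the output record timestamp explicitly.
            context().forward(key, value, To.all().withTimestamp(context().timestamp() + 10L));
        }
    }

The same To parameter also accepts To.child("name") to target a single downstream processor, as exercised by the reworked MockProcessorContext#forward() overloads in the diff.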



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
