flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [hotfix] Change order of reduce and fold in window documentation
Date Mon, 16 Oct 2017 11:32:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0214e8003 -> 404e37d21


[hotfix] Change order of reduce and fold in window documentation

In the section about incremental aggregation with a window function the
order of fold and reduce was different from the order of fold and reduce
in the rest of the documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa2f92c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa2f92c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa2f92c7

Branch: refs/heads/master
Commit: aa2f92c73d2b2bfbd57f341597407ebcb44ca174
Parents: 0214e80
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Oct 16 13:25:15 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Oct 16 13:25:15 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 112 +++++++++++++++---------------
 1 file changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa2f92c7/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 0ecae0c..79a2dd6 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -506,13 +506,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends
Window> impl
    	     * Returns the window that is being evaluated.
    	     */
    	    public abstract W window();
-   
+
    	    /** Returns the current processing time. */
    	    public abstract long currentProcessingTime();
-   
+
    	    /** Returns the current event-time watermark. */
    	    public abstract long currentWatermark();
-   
+
    	    /**
    	     * State accessor for per-key and per-window state.
    	     *
@@ -520,7 +520,7 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends
Window> impl
    	     * by implementing {@link ProcessWindowFunction#clear(Context)}.
    	     */
    	    public abstract KeyedStateStore windowState();
-   
+
    	    /**
    	     * State accessor for per-key global state.
    	     */
@@ -558,22 +558,22 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends
Function
       * Returns the window that is being evaluated.
       */
     def window: W
-  
+
     /**
       * Returns the current processing time.
       */
     def currentProcessingTime: Long
-  
+
     /**
       * Returns the current event-time watermark.
       */
     def currentWatermark: Long
-  
+
     /**
       * State accessor for per-key and per-window state.
       */
     def windowState: KeyedStateStore
-  
+
     /**
       * State accessor for per-key global state.
       */
@@ -658,11 +658,11 @@ additional window meta information of the `ProcessWindowFunction`.
 <span class="label label-info">Note</span> You can also the legacy `WindowFunction`
instead of
 `ProcessWindowFunction` for incremental window aggregation.
 
-#### Incremental Window Aggregation with FoldFunction
+#### Incremental Window Aggregation with ReduceFunction
 
-The following example shows how an incremental `FoldFunction` can be combined with
-a `ProcessWindowFunction` to extract the number of events in the window and return also
-the key and end time of the window.
+The following example shows how an incremental `ReduceFunction` can be combined with
+a `WindowFunction` to return the smallest event in a window along
+with the start time of the window.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -672,29 +672,26 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
+  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
 
 // Function definitions
 
-private static class MyFoldFunction
-    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
+private static class MyReduceFunction implements ReduceFunction<SensorReading> {
 
-  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc,
SensorReading s) {
-      Integer cur = acc.getField(2);
-      acc.setField(2, cur + 1);
-      return acc;
+  public SensorReading reduce(SensorReading r1, SensorReading r2) {
+      return r1.value() > r2.value() ? r2 : r1;
   }
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String,
Long, Integer>, String, TimeWindow> {
+    implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>,
String, TimeWindow> {
 
-  public void process(String key,
+  public void apply(String key,
                     Context context,
-                    Iterable<Tuple3<String, Long, Integer>> counts,
-                    Collector<Tuple3<String, Long, Integer>> out) {
-    Integer count = counts.iterator().next().getField(2);
-    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
+                    Iterable<SensorReading> minReadings,
+                    Collector<Tuple2<Long, SensorReading>> out) {
+      SensorReading min = minReadings.iterator().next();
+      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
   }
 }
 
@@ -706,18 +703,17 @@ private static class MyProcessWindowFunction
 val input: DataStream[SensorReading] = ...
 
 input
- .keyBy(<key selector>)
- .timeWindow(<window assigner>)
- .fold (
-    ("", 0L, 0),
-    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
+  .keyBy(<key selector>)
+  .timeWindow(<window assigner>)
+  .reduce(
+    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1
},
     ( key: String,
       window: TimeWindow,
-      counts: Iterable[(String, Long, Int)],
-      out: Collector[(String, Long, Int)] ) =>
+      minReadings: Iterable[SensorReading],
+      out: Collector[(Long, SensorReading)] ) =>
       {
-        val count = counts.iterator.next()
-        out.collect((key, window.getEnd, count._3))
+        val min = minReadings.iterator.next()
+        out.collect((window.getStart, min))
       }
   )
 
@@ -725,11 +721,11 @@ input
 </div>
 </div>
 
-#### Incremental Window Aggregation with ReduceFunction
+#### Incremental Window Aggregation with FoldFunction
 
-The following example shows how an incremental `ReduceFunction` can be combined with
-a `WindowFunction` to return the smallest event in a window along
-with the start time of the window.
+The following example shows how an incremental `FoldFunction` can be combined with
+a `ProcessWindowFunction` to extract the number of events in the window and return also
+the key and end time of the window.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -739,26 +735,29 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
+  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
 
 // Function definitions
 
-private static class MyReduceFunction implements ReduceFunction<SensorReading> {
+private static class MyFoldFunction
+    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
 
-  public SensorReading reduce(SensorReading r1, SensorReading r2) {
-      return r1.value() > r2.value() ? r2 : r1;
+  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc,
SensorReading s) {
+      Integer cur = acc.getField(2);
+      acc.setField(2, cur + 1);
+      return acc;
   }
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>,
String, TimeWindow> {
+    implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String,
Long, Integer>, String, TimeWindow> {
 
-  public void apply(String key,
+  public void process(String key,
                     Context context,
-                    Iterable<SensorReading> minReadings,
-                    Collector<Tuple2<Long, SensorReading>> out) {
-      SensorReading min = minReadings.iterator().next();
-      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
+                    Iterable<Tuple3<String, Long, Integer>> counts,
+                    Collector<Tuple3<String, Long, Integer>> out) {
+    Integer count = counts.iterator().next().getField(2);
+    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
   }
 }
 
@@ -770,17 +769,18 @@ private static class MyProcessWindowFunction
 val input: DataStream[SensorReading] = ...
 
 input
-  .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
-  .reduce(
-    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1
},
+ .keyBy(<key selector>)
+ .timeWindow(<window assigner>)
+ .fold (
+    ("", 0L, 0),
+    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
     ( key: String,
       window: TimeWindow,
-      minReadings: Iterable[SensorReading],
-      out: Collector[(Long, SensorReading)] ) =>
+      counts: Iterable[(String, Long, Int)],
+      out: Collector[(String, Long, Int)] ) =>
       {
-        val min = minReadings.iterator.next()
-        out.collect((window.getStart, min))
+        val count = counts.iterator.next()
+        out.collect((key, window.getEnd, count._3))
       }
   )
 


Mime
View raw message