flink-commits mailing list archives

From aljos...@apache.org
Subject [2/7] flink git commit: [FLINK-3403] Create Section "Working with Time" in Streaming Guide
Date Wed, 17 Feb 2016 09:59:39 GMT
[FLINK-3403] Create Section "Working with Time" in Streaming Guide


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/134b5c2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/134b5c2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/134b5c2b

Branch: refs/heads/master
Commit: 134b5c2b354387f599ef0a20fecc6b92e12390c0
Parents: ce8f966
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Feb 15 19:30:55 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Feb 17 10:57:44 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/connectors/index.md    |   2 +-
 docs/apis/streaming/fault_tolerance.md     |   2 +-
 docs/apis/streaming/index.md               | 139 --------------------
 docs/apis/streaming/libs/cep.md            |   8 +-
 docs/apis/streaming/libs/index.md          |   2 +-
 docs/apis/streaming/storm_compatibility.md |   4 +-
 docs/apis/streaming/time.md                | 164 ++++++++++++++++++++++++
 7 files changed, 173 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
index 378be6c..8204e3e 100644
--- a/docs/apis/streaming/connectors/index.md
+++ b/docs/apis/streaming/connectors/index.md
@@ -4,7 +4,7 @@ title: "Streaming Connectors"
 # Sub-level navigation
 sub-nav-group: streaming
 sub-nav-id: connectors
-sub-nav-pos: 2
+sub-nav-pos: 3
 sub-nav-title: Connectors
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index f1a6803..fa61bd6 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -4,7 +4,7 @@ is_beta: false
 
 sub-nav-group: streaming
 sub-nav-id: fault_tolerance
-sub-nav-pos: 3
+sub-nav-pos: 4
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 6bc37a3..5cfde85 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -1984,145 +1984,6 @@ val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJava
 Windows
 -------
 
-### Working with Time
-
-Windows are typically groups of events within a certain time period. Reasoning about time and windows assumes
-a definition of time. Flink has support for three kinds of time:
-
-- *Processing time:* Processing time is simply the wall clock time of the machine that happens to be
-    executing the transformation. Processing time is the simplest notion of time and provides the best
-    performance. However, in distributed and asynchronous environments processing time is not
-    deterministic.
-
-- *Event time:* Event time is the time at which each individual event occurred. This time is
-    typically embedded within the records before they enter Flink, or can be extracted from their contents.
-    When using event time, out-of-order events can be handled properly. For example, an event with a lower
-    timestamp may arrive after an event with a higher timestamp, but transformations will handle these events
-    correctly. Event time processing provides predictable results, but incurs more latency, as out-of-order
-    events need to be buffered.
-
-- *Ingestion time:* Ingestion time is the time at which events enter Flink. In particular, the timestamp of
-    an event is assigned by the source operator as the current wall clock time of the machine that executes
-    the source task at the time the records enter the Flink source. Ingestion time is more predictable
-    than processing time, and gives lower latencies than event time, as the latency does not depend on
-    external systems. Ingestion time thus provides a middle ground between processing time and event time.
-    Ingestion time is a special case of event time (and indeed, it is treated by Flink identically to
-    event time).
-
-When dealing with event time, transformations need to avoid indefinite wait times for events to
-arrive. *Watermarks* provide the mechanism to control the event time/processing time skew.
-Watermarks can be emitted by the sources. A watermark with a certain timestamp denotes the knowledge
-that no event with a timestamp lower than that of the watermark will ever arrive.
-
-By default, a Flink job is set up with processing time semantics, so nothing needs to be specified
-in order to write a program with processing time semantics (e.g., the first
-[example](#example-program) in this guide follows processing time semantics). To perform processing-time
-windowing you would use window assigners such as `SlidingProcessingTimeWindows` and
-`TumblingProcessingTimeWindows`.
-
-In order to work with event time semantics, i.e., if you want to use window assigners such as
-`TumblingTimeWindows` or `SlidingTimeWindows`, you need to follow these steps:
-
-- Set `enableTimestamps()`, as well as the interval for watermark emission
-(`setAutoWatermarkInterval(long milliseconds)`), in the `ExecutionConfig`.
-
-- Use `DataStream.assignTimestamps(...)` to tell Flink how timestamps relate to events
-(e.g., which record field holds the timestamp).
-
-For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned
-by the system that generates these data streams), and we know that the lag between the current processing
-time and the timestamp of an event is never more than 1 second:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
-stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
-    @Override
-    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
-        return element.f0;
-    }
-
-    @Override
-    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
-        return element.f0 - 1000;
-    }
-
-    @Override
-    public long getCurrentWatermark() {
-        return Long.MIN_VALUE;
-    }
-});
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(Long, Int, Double, String)] = null
-stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
-  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1
-
-  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000
-
-  override def getCurrentWatermark: Long = Long.MinValue
-})
-{% endhighlight %}
-</div>
-</div>
-
-If you know that timestamps of events are always ascending, i.e., elements arrive in order, you can use
-the `AscendingTimestampExtractor`, and the system generates watermarks automatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
-stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
-    @Override
-    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
-        return element.f0;
-    }
-});
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.assignAscendingTimestamps(record => record._1)
-{% endhighlight %}
-</div>
-</div>
-
-Flink also has a shortcut for working with time, the `stream time characteristic`. It can
-be specified as:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-For `EventTime`, this will enable timestamps and also set a default watermark interval.
-The `timeWindow()` and `timeWindowAll()` transformations will respect this time characteristic and
-instantiate the correct window assigner based on the time characteristic.
-
-In order to write a program with ingestion time semantics, you need to set
-`env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)`. You can think of this setting
-as a shortcut for writing a `TimestampExtractor` which assigns timestamps to events at the sources
-based on the current source wall-clock time. Flink injects this timestamp extractor automatically.
-
 ### Windows on Keyed Data Streams
 
 Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,

http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/libs/cep.md b/docs/apis/streaming/libs/cep.md
index aa23876..db4ddf2 100644
--- a/docs/apis/streaming/libs/cep.md
+++ b/docs/apis/streaming/libs/cep.md
@@ -59,7 +59,7 @@ DataStream<Event> input = ...
 Pattern<Event, ?> pattern = Pattern.begin("start").where(evt -> evt.getId() == 42)
     .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
     .followedBy("end").where(evt -> evt.getName().equals("end"));
-    
+
 PatternStream<Event> patternStream = CEP.from(input, pattern);
 
 DataStream<Alert> result = patternStream.select(pattern -> {
@@ -85,7 +85,7 @@ Pattern<Event, ?> start = Pattern.<Event>begin("start");
 
 Each state must have a unique name to identify the matched events later on.
 Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
-  
+
 {% highlight java %}
 start.where(new FilterFunction<Event>() {
     @Override
@@ -234,7 +234,7 @@ class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT>
     public OUT select(Map<String, IN> pattern) {
         IN startEvent = pattern.get("start");
         IN endEvent = pattern.get("end");
-        
+
         return new OUT(startEvent, endEvent);
     }
 }
@@ -249,7 +249,7 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<
     public void select(Map<String, IN> pattern, Collector<OUT> collector) {
         IN startEvent = pattern.get("start");
         IN endEvent = pattern.get("end");
-        
+
         for (int i = 0; i < startEvent.getValue(); i++ ) {
             collector.collect(new OUT(startEvent, endEvent));
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/libs/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/libs/index.md b/docs/apis/streaming/libs/index.md
index a7362a6..57c7b62 100644
--- a/docs/apis/streaming/libs/index.md
+++ b/docs/apis/streaming/libs/index.md
@@ -2,7 +2,7 @@
 title: "Streaming Libraries"
 sub-nav-group: streaming
 sub-nav-id: libs
-sub-nav-pos: 4
+sub-nav-pos: 5
 sub-nav-title: Libraries
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/storm_compatibility.md b/docs/apis/streaming/storm_compatibility.md
index 4d5715c..9ff4a58 100644
--- a/docs/apis/streaming/storm_compatibility.md
+++ b/docs/apis/streaming/storm_compatibility.md
@@ -2,7 +2,7 @@
 title: "Storm Compatibility"
 is_beta: true
 sub-nav-group: streaming
-sub-nav-pos: 6
+sub-nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -176,7 +176,7 @@ For `Tuple` input types, it is required to specify the input schema using Storm'
 For this case, the constructor of `BoltWrapper` takes an additional argument: `new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
 The input type is `Tuple1<String>` and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
 
-See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
+See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
 
 ## Configuring Spouts and Bolts
 

http://git-wip-us.apache.org/repos/asf/flink/blob/134b5c2b/docs/apis/streaming/time.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/time.md b/docs/apis/streaming/time.md
new file mode 100644
index 0000000..1359f10
--- /dev/null
+++ b/docs/apis/streaming/time.md
@@ -0,0 +1,164 @@
+---
+title: "Working with Time"
+is_beta: false
+sub-nav-group: streaming
+sub-nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* toc
+{:toc}
+
+When working with time windows, it becomes necessary to think about the concept of time
+in a streaming program. Flink has support for three kinds of time:
+
+- *Processing time:* Processing time is simply the wall clock time of the machine that happens to be
+    executing the transformation. Processing time is the simplest notion of time and provides the best
+    performance. However, in distributed and asynchronous environments processing time is not
+    deterministic.
+
+- *Event time:* Event time is the time at which each individual event occurred. This time is
+    typically embedded within the records before they enter Flink, or can be extracted from their contents.
+    When using event time, out-of-order events can be handled properly. For example, an event with a lower
+    timestamp may arrive after an event with a higher timestamp, but transformations will handle these events
+    correctly. Event time processing provides predictable results, but incurs more latency, as out-of-order
+    events need to be buffered.
+
+- *Ingestion time:* Ingestion time is the time at which events enter Flink. In particular, the timestamp of
+    an event is assigned by the source operator as the current wall clock time of the machine that executes
+    the source task at the time the records enter the Flink source. Ingestion time is more predictable
+    than processing time, and gives lower latencies than event time, as the latency does not depend on
+    external systems. Ingestion time thus provides a middle ground between processing time and event time.
+    Ingestion time is a special case of event time (and indeed, it is treated by Flink identically to
+    event time).
+
+When dealing with event time, transformations need to avoid indefinite wait times for events to
+arrive. *Watermarks* provide the mechanism to control the event time/processing time skew.
+Watermarks can be emitted by the sources. A watermark with a certain timestamp denotes the knowledge
+that no event with a timestamp lower than that of the watermark will ever arrive. For example, a
+watermark with timestamp `t` signals that all events with timestamps lower than `t` have already arrived.
+
+By default, a Flink job is set up with processing time semantics, so nothing needs to be specified
+in order to write a program with processing time semantics (e.g., the first
+[example]({{ site.baseurl }}/apis/streaming/index.html#example-program) in this guide follows processing
+time semantics). To perform processing-time windowing you would use window assigners such as
+`SlidingProcessingTimeWindows` and `TumblingProcessingTimeWindows`.
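+
+As an illustrative sketch (the key field and the window sizes below are arbitrary choices), a
+processing-time window over a stream of tuples could look like this:
+
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
+stream
+    // key by the String field of the tuple
+    .keyBy(3)
+    // sliding processing-time windows of 10 seconds, evaluated every 5 seconds
+    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    // sum up the Integer field per key and window
+    .sum(1);
+{% endhighlight %}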
+
+In order to work with event time semantics, i.e., if you want to use window assigners such as
+`TumblingTimeWindows` or `SlidingTimeWindows`, you need to follow these steps:
+
+- Set `enableTimestamps()`, as well as the interval for watermark emission
+(`setAutoWatermarkInterval(long milliseconds)`), in the `ExecutionConfig` (both settings are shown
+in the sketch after this list).
+
+- Use `DataStream.assignTimestamps(...)` to tell Flink how timestamps relate to events
+(e.g., which record field holds the timestamp).
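+
+Both settings are made on the `ExecutionConfig` of the environment, for example (a sketch; the
+watermark interval of 1000 milliseconds is an arbitrary choice):
+
+{% highlight java %}
+ExecutionConfig config = env.getConfig();
+// attach timestamps to records
+config.enableTimestamps();
+// emit a watermark every second
+config.setAutoWatermarkInterval(1000);
+{% endhighlight %}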
+
+For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned
+by the system that generates these data streams), and we know that the lag between the current processing
+time and the timestamp of an event is never more than 1 second:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
+stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
+    @Override
+    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
+        return element.f0;
+    }
+
+    @Override
+    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
+        return element.f0 - 1000;
+    }
+
+    @Override
+    public long getCurrentWatermark() {
+        return Long.MIN_VALUE;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream: DataStream[(Long, Int, Double, String)] = null
+stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
+  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1
+
+  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000
+
+  override def getCurrentWatermark: Long = Long.MinValue
+})
+{% endhighlight %}
+</div>
+</div>
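+
+With timestamps and watermarks in place, an event-time window can then be specified using the
+assigners named above, for instance (a sketch with an arbitrary key field and window size):
+
+{% highlight java %}
+stream
+    .keyBy(3)
+    .window(TumblingTimeWindows.of(Time.seconds(5)))
+    .sum(1);
+{% endhighlight %}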
+
+If you know that timestamps of events are always ascending, i.e., elements arrive in order, you can use
+the `AscendingTimestampExtractor`, and the system generates watermarks automatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
+stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
+    @Override
+    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
+        return element.f0;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.assignAscendingTimestamps(record => record._1)
+{% endhighlight %}
+</div>
+</div>
+
+Flink also has a shortcut for working with time, the `stream time characteristic`. It can
+be specified as:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+For `EventTime`, this will enable timestamps and also set a default watermark interval.
+The `timeWindow()` and `timeWindowAll()` transformations will respect this time characteristic and
+instantiate the correct window assigner based on the time characteristic.
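+
+For instance, once the time characteristic is set, the same windowing code works for any of the
+three notions of time (a sketch with an arbitrary key field and window size):
+
+{% highlight java %}
+stream
+    .keyBy(3)
+    // picks the window assigner that matches the configured time characteristic
+    .timeWindow(Time.minutes(1))
+    .sum(1);
+{% endhighlight %}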
+
+In order to write a program with ingestion time semantics, you need to set
+`env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)`. You can think of this setting
+as a shortcut for writing a `TimestampExtractor` which assigns timestamps to events at the sources
+based on the current source wall-clock time. Flink injects this timestamp extractor automatically.
\ No newline at end of file

