flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] twalthr commented on a change in pull request #15837: [FLINK-22537][docs] Add documentation how to interact with DataStream API
Date Thu, 06 May 2021 09:13:36 GMT

twalthr commented on a change in pull request #15837:
URL: https://github.com/apache/flink/pull/15837#discussion_r627235155



##########
File path: docs/content/docs/dev/table/data_stream_api.md
##########
@@ -0,0 +1,2106 @@
+---
+title: "DataStream API Integration"
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataStream API Integration
+
+{{< hint info >}}
+This page only discusses the integration with DataStream API in JVM languages such as Java or Scala.
+For Python, see the [Python API]({{< ref "docs/dev/python/overview" >}}) area.
+{{< /hint >}}
+
+Both Table & SQL API and DataStream API are equally important when it comes to defining a data
+processing pipeline.
+
+The DataStream API offers the primitives of stream processing (namely time, state, and dataflow
+management) in a rather low-level imperative programming API. The Table & SQL API abstracts many
+internals and provides a structured and declarative API.
+
+Both APIs can work with bounded *and* unbounded streams.
+
+Bounded streams need to be managed when processing historical data. Unbounded streams occur
+in real-time processing scenarios that might be initialized with historical data first.
+
+For efficient execution, both APIs offer to process bounded streams in an optimized batch execution
+mode. However, since batch is just a special case of streaming, it is also possible to run pipelines
+of bounded streams in regular streaming execution mode.
+
+{{< hint warning >}}
+Currently, both DataStream API and Table API provide their own way of enabling the batch execution
+mode. In the near future, this will be further unified.
+{{< /hint >}}
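+
+As a minimal sketch (assuming a pipeline that only consumes bounded sources), the two mechanisms
+currently look as follows:
+
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+// DataStream API: switch the runtime mode on the execution environment
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// Table API: create a table environment with batch mode enabled in its settings
+TableEnvironment tableEnv = TableEnvironment.create(
+    EnvironmentSettings.newInstance().inBatchMode().build());
+```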
+
+Pipelines in one API can be defined end-to-end without dependencies on the other API. However, it
+might be useful to mix both APIs for various reasons:
+
+- Use the table ecosystem to access catalogs or connect to external systems easily, before
+implementing the main pipeline in DataStream API.
+- Access some of the SQL functions for stateless data normalization and cleansing, before
+implementing the main pipeline in DataStream API.
+- Switch to DataStream API every now and then if a lower-level operation (e.g. custom timer
+handling) is not available in Table API; see the sketch after this list.
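+
+For the last point, here is a minimal sketch (assuming a hypothetical insert-only table `Orders`
+whose first column is a `STRING` field `user`, and a `StreamTableEnvironment` named `tableEnv` as
+created in the examples below) of dropping into DataStream API for custom timer handling and
+switching back afterwards:
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+Table orders = tableEnv.from("Orders");
+
+// switch to DataStream API for a primitive that Table API does not expose
+DataStream<Row> stream = tableEnv.toDataStream(orders);
+
+DataStream<Row> processed = stream
+    .keyBy(row -> (String) row.getField(0))
+    .process(new KeyedProcessFunction<String, Row, Row>() {
+        @Override
+        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
+            // register a custom processing-time timer, e.g. to emit or clean up state later
+            ctx.timerService().registerProcessingTimeTimer(
+                ctx.timerService().currentProcessingTime() + 60_000L);
+            out.collect(row);
+        }
+    });
+
+// switch back and continue in Table API
+Table result = tableEnv.fromDataStream(processed);
+```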
+
+Flink provides special bridging functionalities to make the integration with DataStream API as
+smooth as possible.
+
+{{< hint info >}}
+Switching between DataStream and Table API adds some conversion overhead. For example, internal data
+structures of the table runtime (i.e. `RowData`) that partially work on binary data need to be
+converted to more user-friendly data structures (i.e. `Row`). Usually, this overhead can be neglected
+but is mentioned here for completeness.
+{{< /hint >}}
+
+{{< top >}}
+
+Converting between DataStream and Table
+---------------------------------------
+
+Flink provides a specialized `StreamTableEnvironment` in Java and Scala for integrating with the
+DataStream API. Those environments extend the regular `TableEnvironment` with additional methods
+and take the `StreamExecutionEnvironment` used in the DataStream API as a parameter.
+
+{{< hint warning >}}
+Currently, the `StreamTableEnvironment` does not support batch execution mode. Use the regular
+`TableEnvironment` for this. Nevertheless, both bounded and unbounded streams can also be processed
+using streaming execution mode.
+{{< /hint >}}
+
+The following code shows an example of how to go back and forth between the two APIs. Column names
+and types of the `Table` are automatically derived from the `TypeInformation` of the `DataStream`.
+Since the DataStream API does not support changelog processing natively, the code assumes
+append-only/insert-only semantics during the stream-to-table and table-to-stream conversion.
+
+{{< tabs "6ec84aa4-d91d-4c47-9fa2-b1aae1e3cdb5" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+// create environments of both APIs
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// create a DataStream
+DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
+
+// interpret the insert-only DataStream as a Table
+Table inputTable = tableEnv.fromDataStream(dataStream);
+
+// register the Table object as a view and query it
+tableEnv.createTemporaryView("InputTable", inputTable);
+Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
+
+// interpret the insert-only Table as a DataStream again
+DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
+
+// add a printing sink and execute in DataStream API
+resultStream.print();
+env.execute();
+
+// prints:
+// +I[Alice]
+// +I[Bob]
+// +I[John]
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+
+// create environments of both APIs
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// create a DataStream
+val dataStream = env.fromElements("Alice", "Bob", "John")
+
+// interpret the insert-only DataStream as a Table
+val inputTable = tableEnv.fromDataStream(dataStream)
+
+// register the Table object as a view and query it
+tableEnv.createTemporaryView("InputTable", inputTable)
+val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")
+
+// interpret the insert-only Table as a DataStream again
+val resultStream = tableEnv.toDataStream(resultTable)
+
+// add a printing sink and execute in DataStream API
+resultStream.print()
+env.execute()
+
+// prints:
+// +I[Alice]
+// +I[Bob]
+// +I[John]
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The complete semantics of `fromDataStream` and `toDataStream` can be found in the
+[dedicated section below](#handling-of-insert-only-streams).
+In particular, the section discusses how to influence the schema derivation with more complex
+and nested types. It also covers working with event-time and watermarks.
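+
+As a brief preview of that section, the derived schema can be overridden by passing an explicit
+`Schema`, for example to declare a rowtime attribute; the `eventTime` field below is a hypothetical
+example, not one of the streams used on this page:
+
+```java
+import org.apache.flink.table.api.Schema;
+
+Table table = tableEnv.fromDataStream(
+    dataStream,
+    Schema.newBuilder()
+        .columnByExpression("rowtime", "CAST(eventTime AS TIMESTAMP_LTZ(3))")
+        .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
+        .build());
+```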
+
+Depending on the kind of query, in many cases the resulting dynamic table is a pipeline that not
+only produces insert-only changes when converting the `Table` to a `DataStream` but also produces
+retractions and other kinds of updates.
+
+The following example shows how updating tables can be converted. Every result row represents
+an entry in a changelog with a change flag that can be queried by calling `row.getKind()` on it. In
+the example, the second score for `Alice` creates an _update before_ (`-U`) and an _update after_
+(`+U`) change.
+
+{{< tabs "f45d1374-61a0-40c0-9280-702ed87d2ed0" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+// create environments of both APIs
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// create a DataStream
+DataStream<Row> dataStream = env.fromElements(
+    Row.of("Alice", 12),
+    Row.of("Bob", 10),
+    Row.of("Alice", 100));
+
+// interpret the insert-only DataStream as a Table
+Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
+
+// register the Table object as a view and query it
+// the query contains an aggregation that produces updates
+tableEnv.createTemporaryView("InputTable", inputTable);
+Table resultTable = tableEnv.sqlQuery(
+    "SELECT name, SUM(score) FROM InputTable GROUP BY name");
+
+// interpret the updating Table as a changelog DataStream
+DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
+
+// add a printing sink and execute in DataStream API
+resultStream.print();
+env.execute();
+
+// prints:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.scala.typeutils.Types
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+// create environments of both APIs
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = StreamTableEnvironment.create(env)
+
+// create a DataStream
+val dataStream = env.fromElements(
+  Row.of("Alice", Int.box(12)),
+  Row.of("Bob", Int.box(10)),
+  Row.of("Alice", Int.box(100))
+)(Types.ROW(Types.STRING, Types.INT))
+
+// interpret the insert-only DataStream as a Table
+val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")
+
+// register the Table object as a view and query it
+// the query contains an aggregation that produces updates
+tableEnv.createTemporaryView("InputTable", inputTable)
+val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable GROUP BY name")
+
+// interpret the updating Table as a changelog DataStream
+val resultStream = tableEnv.toChangelogStream(resultTable)
+
+// add a printing sink and execute in DataStream API
+resultStream.print()
+env.execute()
+
+// prints:
+// +I[Alice, 12]
+// +I[Bob, 10]
+// -U[Alice, 12]
+// +U[Alice, 112]
+```
+{{< /tab >}}
+{{< /tabs >}}
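+
+Downstream operators can react to these change flags. As a minimal sketch building on the
+`resultStream` from the example above, dropping the _update before_ rows turns the changelog into
+an upsert-style stream:
+
+```java
+import org.apache.flink.types.RowKind;
+
+DataStream<Row> upserts = resultStream
+    .filter(row -> row.getKind() != RowKind.UPDATE_BEFORE);
+```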
+
+The complete semantics of `fromChangelogStream` and `toChangelogStream` can be found in the
+[dedicated section below](#handling-of-changelog-streams). In particular, the section discusses how
+to influence the schema derivation with more complex and nested types. It also covers working with
+event-time and watermarks, and discusses how to declare a primary key and changelog mode for the
+input and output streams.
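+
+For a first impression, here is a minimal sketch that interprets a hand-crafted changelog as a
+`Table`; the change flags are attached via `Row.ofKind`:
+
+```java
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+// create a changelog DataStream with explicit change flags
+DataStream<Row> changelog = env.fromElements(
+    Row.ofKind(RowKind.INSERT, "Alice", 12),
+    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
+    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
+
+// interpret the changelog DataStream as a Table
+Table table = tableEnv.fromChangelogStream(changelog);
+```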
+
+### Dependencies and Imports
+
+Projects that combine Table API with DataStream API need to add one of the following bridging modules.
+They include transitive dependencies on `flink-table-api-java` or `flink-table-api-scala` and the
+corresponding language-specific DataStream API module.
+
+{{< tabs "0d2da52a-ee43-4d06-afde-b165517c0617" >}}
+{{< tab "Java" >}}
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+  <scope>provided</scope>
+</dependency>
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-scala-bridge{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+  <scope>provided</scope>
+</dependency>
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The following imports are required to declare common pipelines using either the Java or Scala version
+of both DataStream API and Table API.
+
+{{< tabs "19a47e2d-168b-4f73-a966-abfcc8a6baca" >}}
+{{< tab "Java" >}}
+```java
+// imports for Java DataStream API
+import org.apache.flink.streaming.api.*;
+import org.apache.flink.streaming.api.environment.*;
+
+// imports for Table API with bridging to Java DataStream API
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.bridge.java.*;
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+// imports for Scala DataStream API
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala._
+
+// imports for Table API with bridging to Scala DataStream API
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+```
+{{< /tab >}}
+{{< /tabs >}}

Review comment:
       We definitely need this. Many people don't know how to use an IDE properly or mix up
classes of the same name.






