Date: Tue, 21 Feb 2017 02:45:44 +0000 (UTC)
From: "godfrey he (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Updated] (FLINK-5858) Support multiple sinks in same execution DAG

     [ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-5858:
------------------------------
    Description:

When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream program. If writeToSink is called more than once (writing to different sinks), the Table is translated again for every call, and the final execution graph is split into separate, disconnected DAGs (one per sink) instead of a single DAG.
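To make the duplication concrete, here is a small self-contained toy model (hypothetical names, not Flink internals): every writeToSink call re-runs the full translation, so the common scan/aggregation work is repeated and each sink ends up in its own plan.

{code}
// Toy model of the behavior described above (hypothetical names, not Flink code):
// each writeToSink call triggers its own translation, nothing is shared.
object WriteToSinkModel {

  // Stands in for a translated DataSet/DataStream program.
  case class Plan(ops: List[String])

  var translations = 0

  // One full translation per call; no caching or sharing of sub-plans.
  def translate(tableOps: List[String]): Plan = {
    translations += 1
    Plan(tableOps)
  }

  // Models Table.writeToSink: translate the table again and attach the sink.
  def writeToSink(tableOps: List[String], sinkPath: String): Plan =
    translate(tableOps :+ s"sink($sinkPath)")

  def main(args: Array[String]): Unit = {
    val tableOps = List("scan(csv_source)", "groupBy(first)", "select(first, sum(score))")
    writeToSink(tableOps, "/tmp/wordcount1")
    writeToSink(tableOps, "/tmp/wordcount2")
    println(s"translations = $translations") // prints 2: the same table was translated twice
  }
}
{code}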
For example:

{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)

tEnv.registerTableSource("csv_source", csvTableSource)

val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 5 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 4 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 3 : GroupReduce
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 2 : Map
    content : to: Row(f0: String, f1: Double)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 1 : Map
    content : Map at emitDataSet(CsvTableSink.scala:67)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 0 : Data Sink
    content : TextOutputFormat (/tmp/wordcount1) - UTF-8
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 12 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 11 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 10 : GroupReduce
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 9 : Map
    content : to: Row(f0: String, f1: Double)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 8 : Map
    content : Map at emitDataSet(CsvTableSink.scala:67)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 7 : Data Sink
    content : TextOutputFormat (/tmp/wordcount2) - UTF-8
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 18 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 17 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 16 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 15 : GroupReduce
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 14 : Data Sink
    content : org.apache.flink.api.java.io.DiscardingOutputFormat
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED
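Until the sinks can share one DAG, a possible workaround is sketched below (continuing the example above; it assumes the batch Scala Table API where BatchTableEnvironment.toDataSet is available, and the text output format only approximates the CsvTableSink output): convert the Table to a DataSet once and reuse that DataSet for both outputs, so the scan and aggregation are translated a single time and both sinks end up in the same plan.

{code}
// Sketch of a possible workaround, reusing env, tEnv and resultTable from the
// example above: translate once to a DataSet and attach both outputs to it.
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val resultSet: DataSet[Row] = tEnv.toDataSet[Row](resultTable)

// Both sinks hang off the same translated DataSet, so one DAG is built.
resultSet.writeAsText("/tmp/wordcount1")
resultSet.writeAsText("/tmp/wordcount2")

env.execute()
{code}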
> Support multiple sinks in same execution DAG
> --------------------------------------------
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)