Date: Wed, 8 Mar 2017 22:06:40 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

    [ https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902058#comment-15902058 ]

ASF GitHub Bot commented on FLINK-3850:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3040#discussion_r104723484

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala ---
    @@ -97,18 +103,41 @@ class DataSetCorrelate(
         val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
         val pojoFieldMapping = sqlFunction.getPojoFieldMapping
         val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
    +    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)

         val mapFunc = correlateMapFunction(
           config,
           inputDS.getType,
           udtfTypeInfo,
    +      returnType,
           getRowType,
           joinType,
           rexCall,
           condition,
           Some(pojoFieldMapping),
           ruleDescription)

    -    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
    +    def getIndices = {
    --- End diff --

    A correlate forwards all fields from the input and the table function: for an input `[in1, in2, in3]` and a table function `[tf1, tf2]`, the output is `[in1, in2, in3, tf1, tf2]`. So we can do a simple position-based mapping of the fields of the input type against the fields of the output type (field names might change). This is basically the same as what you are doing with the single row join. We do not need to look at the table function or the condition.


> Add forward field annotations to DataSet operators generated by the Table API
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-3850
>                 URL: https://issues.apache.org/jira/browse/FLINK-3850
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] that tell the optimizer which input fields an operator copies. This information is valuable for the optimizer because it can infer that certain physical properties such as partitioning or sorting are not destroyed by user functions and thus generate more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet programs and code for user-defined functions. Hence, it knows exactly which fields are modified and which are not. We should use this information to automatically generate forward field annotations and attach them to the operators. This can help to significantly improve the performance of certain jobs.
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
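
The position-based mapping described in the review comment can be sketched roughly as follows. This is only an illustration and not the code from the pull request: the object `ForwardedFieldsSketch` and the method `positional` are hypothetical names, and the sketch assumes that fields are addressed by name and that the result is a semicolon-separated list of `source->target` pairs, as in the forwarded-fields strings of the DataSet API's semantic annotations.

```scala
// Hypothetical helper, not part of Flink or of the pull request.
object ForwardedFieldsSketch {

  /** Pairs the i-th input field with the i-th output field.
    *
    * A correlate emits the input fields first and the table function
    * fields after them, so only the first `inputNames.length` output
    * positions are forwarded. The table function and the condition
    * are not inspected.
    */
  def positional(inputNames: Seq[String], outputNames: Seq[String]): String = {
    require(
      outputNames.length >= inputNames.length,
      "output must contain at least as many fields as the input")

    inputNames
      .zip(outputNames) // zip stops at the shorter sequence, i.e. the input
      .map { case (in, out) => s"$in->$out" }
      .mkString(";")
  }
}
```

For an input `[in1, in2, in3]` whose fields are renamed to `[a, b, c]` in the output `[a, b, c, tf1, tf2]`, `ForwardedFieldsSketch.positional(Seq("in1", "in2", "in3"), Seq("a", "b", "c", "tf1", "tf2"))` would produce a string that could presumably be passed to the operator's `withForwardedFields` call. Whether the Table API addresses fields by name or by position in such strings is an assumption here.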