From issues-return-146195-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Jan 5 00:26:20 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id C7EA9180657 for ; Fri, 5 Jan 2018 00:26:20 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B7F4F160C3C; Thu, 4 Jan 2018 23:26:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0B24D160C2B for ; Fri, 5 Jan 2018 00:26:19 +0100 (CET) Received: (qmail 97566 invoked by uid 500); 4 Jan 2018 23:26:19 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 97557 invoked by uid 99); 4 Jan 2018 23:26:19 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jan 2018 23:26:19 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id A5A5A1A01B2 for ; Thu, 4 Jan 2018 23:26:18 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.03 X-Spam-Level: X-Spam-Status: No, score=-4.03 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id MSLAna1aMCXa for ; Thu, 4 Jan 2018 23:26:17 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 657185FB91 for ; Thu, 4 Jan 2018 23:26:17 +0000 (UTC) Received: (qmail 97208 invoked by uid 99); 4 Jan 2018 23:26:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jan 2018 23:26:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 236D7E0885; Thu, 4 Jan 2018 23:26:15 +0000 (UTC) From: fhueske To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi... Content-Type: text/plain Message-Id: <20180104232615.236D7E0885@git1-us-west.apache.org> Date: Thu, 4 Jan 2018 23:26:15 +0000 (UTC) Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5132#discussion_r159764095 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -791,92 +824,109 @@ abstract class TableEnvironment(val config: TableConfig) { * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of * [[Expression]]. It does not handle time attributes but considers them in indices. * + * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]] * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated. * @param exprs The expressions that define the field names. * @tparam A The type of the TypeInformation. * @return A tuple of two arrays holding the field names and corresponding field positions. */ - protected[flink] def getFieldInfo[A]( + protected def getFieldInfo[A]( + isReferenceByPosition: Boolean, inputType: TypeInformation[A], exprs: Array[Expression]) : (Array[String], Array[Int]) = { TableEnvironment.validateType(inputType) + def referenceByName(name: String, ct: CompositeType[_]): Option[(Int, String)] = { + val inputIdx = ct.getFieldIndex(name) + if (inputIdx < 0) { + throw new TableException(s"$name is not a field of type $ct. " + + s"Expected: ${ct.getFieldNames.mkString(", ")}") + } else { + Some((inputIdx, name)) + } + } + val indexedNames: Array[(Int, String)] = inputType match { + case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] => throw new TableException( "An input of GenericTypeInfo cannot be converted to Table. " + "Please specify the type of the input with a RowTypeInfo.") - case a: AtomicType[_] => - exprs.zipWithIndex flatMap { - case (_: TimeAttribute, _) => - None - case (UnresolvedFieldReference(name), idx) if idx > 0 => - // only accept the first field for an atomic type - throw new TableException("Only the first field can reference an atomic type.") - case (UnresolvedFieldReference(name), idx) => - // first field reference is mapped to atomic type - Some((0, name)) - case _ => throw new TableException("Field reference expression requested.") - } + case t: TupleTypeInfo[A] => exprs.zipWithIndex flatMap { - case (UnresolvedFieldReference(name), idx) => - Some((idx, name)) - case (Alias(UnresolvedFieldReference(origName), name, _), _) => + case (UnresolvedFieldReference(name: String), idx) => + if (isReferenceByPosition) { + Some((idx, name)) + } else { + referenceByName(name, t) + } + case (Alias(UnresolvedFieldReference(origName), name: String, _), _) => val idx = t.getFieldIndex(origName) --- End diff -- we can use `referenceByName()` here. ---