Date: Fri, 23 Dec 2016 12:41:58 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

    [ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772815#comment-15772815 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761548

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    --- End diff --

    Ok, this makes sense.


> Extend TableSource to support nested data
> -----------------------------------------
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of flat rows.
> However, there are several storage formats for nested data that should be supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in Calcite's schema need to be extended to support nested data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
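
For readers following the diff, the accessibility check performed by validateType can be tried outside Flink. The following is only a minimal sketch, not the code from the pull request: it applies the same three conditions from the diff (non-static member class, non-public class, missing canonical name) directly to a java.lang.Class. The names TypeAccessCheck, ValidationError, isGloballyAccessible, and validate are hypothetical, and ValidationError merely stands in for Flink's TableException.

```scala
import java.lang.reflect.Modifier

object TypeAccessCheck {

  // Hypothetical stand-in for Flink's TableException (an assumption for this sketch).
  final case class ValidationError(msg: String) extends RuntimeException(msg)

  // Mirrors the three conditions in the diff's validateType:
  // the class must not be a non-static member class, must be public,
  // and must have a canonical name.
  def isGloballyAccessible(clazz: Class[_]): Boolean = {
    val mods = clazz.getModifiers
    val nonStaticMember = clazz.isMemberClass && !Modifier.isStatic(mods)
    !nonStaticMember && Modifier.isPublic(mods) && clazz.getCanonicalName != null
  }

  // Throws when the class fails the check, with a message modeled on the diff.
  def validate(clazz: Class[_]): Unit =
    if (!isGloballyAccessible(clazz)) {
      throw ValidationError(s"Class '$clazz' must be static and globally accessible.")
    }
}
```

Under these assumptions, a public top-level class such as java.lang.String passes, while a non-public nested class such as java.util.HashMap$Node is rejected because it fails the Modifier.isPublic test.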