flink-issues mailing list archives

From "Ivan Mushketyk (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
Date Wed, 14 Dec 2016 09:31:59 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747806#comment-15747806 ]

Ivan Mushketyk commented on FLINK-5280:

Hi Fabian, Jark,

Thank you for all your comments and for your patience.

Let me propose a solution and see if it will work.

I performed a simple test using TableSource, and it seems that we can access nested fields.
Here is my test *TableSource* that returns POJOs:
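A minimal sketch of the POJO hierarchy implied by the query below (class and field names are illustrative stand-ins, not the exact classes from my test; Flink treats a class as a POJO when it has a public no-arg constructor and public fields or getters/setters):

```java
// Sketch of POJO shapes matching "MyTable.childPojo.child.str" in the query below.
// Names are inferred from the query and are illustrative only.
public class PojoSketch {
    public static class MostChildPojo {
        public String str;
        public MostChildPojo() {}
    }

    public static class ChildPojo {
        public MostChildPojo child;
        public ChildPojo() {}
    }

    public static class TestPojo {
        public int amount;
        public long id;
        public String name;
        public ChildPojo childPojo;
        public TestPojo() {}
    }

    public static void main(String[] args) {
        TestPojo p = new TestPojo();
        p.amount = 1;
        p.id = 1;
        p.name = "pojo1";
        p.childPojo = new ChildPojo();
        p.childPojo.child = new MostChildPojo();
        p.childPojo.child.str = "mostChildPojo1";
        // Nested field access mirrors MyTable.childPojo.child.str in the SQL below.
        System.out.println(p.childPojo.child.str);
    }
}
```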


And here is a test code that uses it:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

tableEnv.registerTableSource("MyTable", new TestBatchTableSource());

Table result = tableEnv
	.sql("SELECT MyTable.amount * MyTable.id, MyTable.name, MyTable.childPojo.child.str FROM MyTable WHERE amount < 4");

DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();

And the result of the test looks correct:

[0,pojo16,mostChildPojo16, 0,pojo32,mostChildPojo32, 1,pojo1,mostChildPojo1, 17,pojo17,mostChildPojo17,
33,pojo33,mostChildPojo33, 36,pojo18,mostChildPojo18, 4,pojo2,mostChildPojo2, 57,pojo19,mostChildPojo19,

Since we can access nested fields, it looks like we only need to convert the first level of
fields into a *Row*. The result *Row* will potentially contain nested POJOs, but this does
not seem to be an issue. I don't see why we need to go beyond one level of unpacking when
we create a *Row*, so I will assume this is all we need.

To do this, we need to specify how each field of a result *Row* should be extracted from the *TableSource*'s
type T. We can add a new method, *getFieldMapping*, that returns an array of strings.
The String at position *i* is the name of the field that should be accessed to get the i-th *Row*
field value. So for the example in this comment it can be implemented simply like this:

public String[] getFieldMapping() {
	return new String[]{"amount", "childPojo", "id", "name"};
}

This means that to get the value of the 0th field in the result *Row* we access the field
*amount*, to get the 1st field we access the field *childPojo*, and so on.
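As an illustration only (in practice the access code would be generated by the *CodeGenerator*, not done via reflection), the intended semantics of the mapping can be sketched like this:

```java
import java.lang.reflect.Field;

// Illustrative sketch: row field i is read from the source field named mapping[i].
// Reflection is used here only to demonstrate the semantics of getFieldMapping();
// the class and field names are hypothetical.
public class FieldMappingDemo {
    public static class SourcePojo {
        public int amount = 3;
        public Object childPojo = "nested"; // would stay a nested POJO, unpacked one level only
        public long id = 7;
        public String name = "pojo7";
    }

    public static Object[] toRow(Object source, String[] mapping) throws Exception {
        Object[] row = new Object[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            Field f = source.getClass().getField(mapping[i]); // public field lookup by name
            row[i] = f.get(source);
        }
        return row;
    }

    public static void main(String[] args) throws Exception {
        String[] mapping = {"amount", "childPojo", "id", "name"};
        Object[] row = toRow(new SourcePojo(), mapping);
        System.out.println(row[0] + "," + row[3]);
    }
}
```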

If we need to convert an indexable type like a tuple or an array, we do not need
this mapping and can return *null* or an empty array. *Optional* would be a
better option, but I think that Flink should work for both Java 7 and Java 8.

The only problem with this approach is that the *FlinkTable* class accepts an array of field
indexes that is used to convert values from the original type into a *Row*:

abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldNames: Array[String])
  extends AbstractTable {

So to work around this I propose to change this to:

abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldMappings: Optional[Array[String]], // <--- New argument
    val fieldNames: Array[String])
  extends AbstractTable {

We can then use this fieldMappings in *CodeGenerator* to generate a proper mapper.

This will technically make it possible to convert a *GenericRecord* into a *Row*. But since
*GenericRecord* is an Avro interface, do we need to add a dependency on Avro in flink-table
to access these fields? Or should we use reflection to access these methods? Or should we
ignore the *GenericRecord* case altogether and simply return *Row* from *KafkaTableSource*?
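To illustrate the reflection option (with a stand-in class instead of Avro's real *GenericRecord*, since the whole point would be to avoid the compile-time dependency):

```java
import java.lang.reflect.Method;

// Sketch of the "reflection instead of an Avro dependency" option: call a
// get(String) accessor on a record type we cannot compile against.
// FakeRecord is a stand-in for Avro's GenericRecord, which exposes
// a similar get-by-name accessor.
public class ReflectiveGetDemo {
    public static class FakeRecord {
        public Object get(String key) {
            return "value-for-" + key;
        }
    }

    public static Object getField(Object record, String fieldName) throws Exception {
        // Look up the accessor at runtime instead of linking against the interface.
        Method get = record.getClass().getMethod("get", String.class);
        return get.invoke(record, fieldName);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getField(new FakeRecord(), "amount"));
    }
}
```

Reflection would keep flink-table free of the Avro dependency at the cost of slower, untyped access; generating this call in the *CodeGenerator* would amortize the method lookup.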

I also wonder why we need this method in the *TableSource* interface:

  /** Returns the number of fields of the table. */
  def getNumberOfFields: Int

and I wonder if we can drop it.

What do you think about it? Am I missing something?

> Extend TableSource to support nested data
> -----------------------------------------
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
> The {{TableSource}} interface does currently only support the definition of flat rows.
> However, there are several storage formats for nested data that should be supported such
> as Avro, Json, Parquet, and Orc. The Table API and SQL can also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in Calcite's schema
> need to be extended to support nested data.

This message was sent by Atlassian JIRA
