flink-issues mailing list archives

From "jingzhang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema
Date Thu, 23 Feb 2017 10:28:44 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880229#comment-15880229
] 

jingzhang edited comment on FLINK-5568 at 2/23/17 10:28 AM:
------------------------------------------------------------

[~fhueske], thanks for your advice.
Here are my thoughts on your questions; I look forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition (description) held by the external catalog.
    {{ExternalCatalogTable}} does not extend {{FlinkTable}}. ({{FlinkTable}} is a table of the
Calcite catalog because it extends the Calcite Table.) {{ExternalCatalogTable}}, in contrast, is a
table of the external catalog.
    When {{CalciteCatalogReader}} looks up a table in the Calcite catalog, the Calcite schema
first delegates to its underlying externalCatalog to look up the {{ExternalCatalogTable}} instance.
The Calcite schema then returns a TableSourceTable holding the TableSource that the converter
generates from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry, I was unclear. We don't want to implement a new Schema class. In fact,
we prefer to use Flink's representation. The DataSchema model is as follows:
    {code}
       case class DataSchema(
           columnTypes: Array[TypeInformation[_]],
           columnNames: Array[String])
    {code}
4. It is important to know where to scan for the {{TableSource}} classes that are annotated with {{@ExternalCatalogCompatible}}.
 We plan to rely on a configuration file.
     * Let each connector specify its scan packages in a designated configuration file.
     * Look up all resources with the given name through the classloader, and parse their
scan-packages fields.
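
To make item 4 more concrete, here is a minimal sketch of the lookup. It is only an illustration under assumptions: the resource name {{externalCatalog.properties}}, the comma-separated value format, and the {{ScanPackagesLoader}} object are hypothetical and not part of the proposal. Only the {{scan-packages}} field name comes from the description above.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object ScanPackagesLoader {
  // Hypothetical resource name; the proposal does not fix one.
  val ResourceName = "externalCatalog.properties"
  // Field name taken from the comment; the comma-separated format is an assumption.
  val ScanPackagesKey = "scan-packages"

  /** Parses the scan-packages entry of a single properties file. */
  def parsePackages(props: Properties): Seq[String] =
    Option(props.getProperty(ScanPackagesKey)).toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.nonEmpty)

  /** Collects scan packages from every resource with the given name that the classloader can see. */
  def load(classLoader: ClassLoader, resourceName: String = ResourceName): Set[String] =
    classLoader.getResources(resourceName).asScala.flatMap { url =>
      val in = url.openStream()
      try {
        val props = new Properties()
        props.load(in)
        parsePackages(props)
      } finally in.close()
    }.toSet
}
```

Each connector jar would ship one such file, and {{ScanPackagesLoader.load(getClass.getClassLoader)}} would return the union of all declared packages.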

Looking forward to your advice, thanks.


was (Author: jinyu.zj):
[~fhueske], thanks for your advice.
Here are my thoughts on your questions; I look forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition (description) held by the external catalog.
    {{ExternalCatalogTable}} does not extend {{FlinkTable}}. ({{FlinkTable}} is a table of the
Calcite catalog because it extends the Calcite Table.) {{ExternalCatalogTable}}, in contrast, is a
table of the external catalog.
    When {{CalciteCatalogReader}} looks up a table in the Calcite catalog, the Calcite schema
first looks up the {{ExternalCatalogTable}} instance in the underlying externalCatalog, then
returns a TableSourceTable holding the TableSource that the converter generates
from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry, I was unclear. We don't want to implement a new Schema class. In fact,
we prefer to use Flink's representation. The DataSchema model is as follows:
    {code}
       case class DataSchema(
           columnTypes: Array[TypeInformation[_]],
           columnNames: Array[String])
    {code}
4. It is important to know where to scan for the {{TableSource}} classes that are annotated with {{@ExternalCatalogCompatible}}.
 We plan to rely on a configuration file.
     * Let each connector specify its scan packages in a designated configuration file.
     * Look up all resources with the given name through the classloader, and parse their
scan-packages fields.

Looking forward to your advice, thanks.

> Introduce interface for catalog, and provide an in-memory implementation, and integrate
with calcite schema
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5568
>                 URL: https://issues.apache.org/jira/browse/FLINK-5568
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kurt Young
>            Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary tables. It registers
the temp table in the Calcite catalog, so SQL and Table API queries can access those temp tables.
Currently DatasetTable, DataStreamTable and TableSourceTable can be registered with {{TableEnvironment}}
as temporary tables.
> This issue aims to provide a mechanism to connect external catalogs such as HCatalog
to the {{TableEnvironment}}, so SQL and Table API queries can access tables in the external
catalogs without registering those tables with {{TableEnvironment}} beforehand.
> First, we should point out that there are actually two kinds of catalog in Flink.
> The first is the external catalog mentioned above; it provides CRUD operations
on databases/tables.
> The second is the Calcite catalog; it defines the namespace that can be accessed in Calcite
queries. It depends on the Calcite Schema/Table abstraction. SqlValidator and SqlConverter depend
on the Calcite catalog to fetch the tables referenced in SQL or Table API queries.
> So we need to do the following things:
> 1. Introduce an interface for the external catalog, and perhaps provide an in-memory implementation
first for test and development environments.
> 2. Introduce a mechanism to connect the external catalog with the Calcite catalog so that the tables/databases
in the external catalog can be accessed through the Calcite catalog. This includes converting databases of the externalCatalog
to Calcite sub-schemas and converting tables in a database of the externalCatalog to Calcite tables
(only {{TableSourceTable}} is supported).
> 3. Register the external catalog with the {{TableEnvironment}}.
> Here is the design model of ExternalCatalogTable.
> | identifier | TableIdentifier | dbName and tableName of table |
> | tableType | String | type of external catalog table, e.g. csv, hbase, kafka |
> | schema | DataSchema | schema of table data, including column names and column types |
> | partitionColumnNames | List<String> | names of partition columns |
> | properties | Map<String, String> | properties of external catalog table |
> | stats | TableStats | statistics of external catalog table |
> | comment | String | |
> | create time | long | |
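
As a rough illustration, the table above could map to a Scala case class along the following lines. This is a sketch under assumptions: {{TableIdentifier}}, {{DataSchema}} and {{TableStats}} are simplified stand-ins defined here so the example compiles on its own (column types are plain strings rather than Flink's {{TypeInformation}}), and the default values, the {{Option}} wrappers and the {{Example}} object are illustrative choices, not part of the design.

```scala
// Simplified stand-ins so the sketch compiles without Flink on the classpath.
case class TableIdentifier(dbName: String, tableName: String)
// Column types are type names here, standing in for TypeInformation[_].
case class DataSchema(columnTypes: Array[String], columnNames: Array[String])
case class TableStats(rowCount: Long)

// One field per row of the design table; defaults are assumptions.
case class ExternalCatalogTable(
    identifier: TableIdentifier,
    tableType: String,
    schema: DataSchema,
    partitionColumnNames: Seq[String] = Seq.empty,
    properties: Map[String, String] = Map.empty,
    stats: Option[TableStats] = None,
    comment: Option[String] = None,
    createTime: Long = System.currentTimeMillis())

object Example {
  // A csv-backed table description with a hypothetical "path" property.
  val orders: ExternalCatalogTable = ExternalCatalogTable(
    identifier = TableIdentifier("db1", "orders"),
    tableType = "csv",
    schema = DataSchema(Array("String", "Int"), Array("name", "qty")),
    properties = Map("path" -> "/tmp/orders.csv"))
}
```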
> One detail still needs to be considered: how to
convert {{ExternalCatalogTable}} to {{TableSourceTable}}. The question is equivalent to converting
{{ExternalCatalogTable}} to {{TableSource}}, because we can easily get a {{TableSourceTable}}
from a {{TableSource}}.
> Different {{TableSource}} implementations often need different fields to create an instance.
E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, fieldDelim, rowDelim and so on,
while {{KafkaTableSource}} needs configuration and tableName.
So it's not a good idea to make the Flink framework responsible for translating
{{ExternalCatalogTable}} to the different kinds of {{TableSourceTable}}.
> Here is one solution. Let {{TableSource}} specify a converter.
> 1. Provide an annotation named {{ExternalCatalogCompatible}}. A {{TableSource}} with
this annotation is compatible with the external catalog, that is, it can be converted
to or from an ExternalCatalogTable. The annotation specifies the tableType and converter of
the tableSource. For example, for {{CsvTableSource}}, it specifies that the tableType is csv and
the converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation, and save the tableType
and converter in a Map.
> 3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, get the converter
based on its tableType and let the converter do the conversion.
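
Steps 2 and 3 can be sketched as a lookup table keyed by tableType. This is a minimal sketch under assumptions, not the proposed implementation: all types below are simplified stand-ins, the {{TableSourceConverter}} trait, its method name and the {{ConverterRegistry}} object are hypothetical, and the map is filled by hand where the proposal would fill it by annotation scanning.

```scala
// Hypothetical, simplified stand-ins for the types named in the issue.
case class ExternalCatalogTable(tableType: String, properties: Map[String, String])
trait TableSource
case class CsvTableSource(path: String, fieldDelim: String) extends TableSource

// Assumed converter contract: one converter per tableType.
trait TableSourceConverter {
  def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource
}

class CsvTableSourceConverter extends TableSourceConverter {
  // Reads the connector-specific settings from the generic properties map.
  override def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource =
    CsvTableSource(
      path = table.properties("path"),
      fieldDelim = table.properties.getOrElse("fieldDelim", ","))
}

object ConverterRegistry {
  // Step 2: tableType -> converter. Filled by hand here instead of by annotation scanning.
  private val converters: Map[String, TableSourceConverter] =
    Map("csv" -> new CsvTableSourceConverter)

  // Step 3: pick the converter by tableType and let it do the conversion.
  def toTableSource(table: ExternalCatalogTable): TableSource =
    converters.get(table.tableType) match {
      case Some(converter) => converter.fromExternalCatalogTable(table)
      case None =>
        throw new IllegalArgumentException(
          s"No converter registered for table type '${table.tableType}'")
    }
}
```

With this shape the framework only needs the tableType string; everything connector-specific stays inside the converter that ships with the {{TableSource}}.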



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
