flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twalthr <...@git.apache.org>
Subject [GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Date Wed, 18 Jul 2018 11:55:33 GMT
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203349277
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environment by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --
    
    This is the code style that we should all comply with.


---

Mime
View raw message