flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twalthr <...@git.apache.org>
Subject [GitHub] flink pull request #6264: [FLINK-8558] [table] Add unified format interfaces...
Date Tue, 10 Jul 2018 11:28:21 GMT
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6264#discussion_r201307506
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
---
    @@ -110,16 +111,44 @@ abstract class BatchTableEnvironment(
         }
       }
     
    -// TODO expose this once we have enough table source factories that can deal with it
    -//  /**
    -//    * Creates a table from a descriptor that describes the source connector, source
encoding,
    -//    * the resulting table schema, and other properties.
    -//    *
    -//    * @param connectorDescriptor connector descriptor describing the source of the
table
    -//    */
    -//  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor =
{
    -//    new BatchTableSourceDescriptor(this, connectorDescriptor)
    -//  }
    +  /**
    +    * Creates a table from a descriptor that describes the source connector, the source
format,
    +    * the resulting table schema, and other properties.
    +    *
    +    * Descriptors allow for declaring communication to external systems in an
    +    * implementation-agnostic way. The classpath is scanned for connectors and matching
connectors
    +    * are configured accordingly.
    +    *
    +    * The following example shows how to read from a Kafka connector using a JSON format
and
    +    * creating a table:
    +    *
    +    * {{{
    +    *
    +    * tableEnv
    +    *   .from(
    +    *     new Kafka()
    +    *       .version("0.11")
    +    *       .topic("clicks")
    +    *       .property("zookeeper.connect", "localhost")
    +    *       .property("group.id", "click-group")
    +    *       .startFromEarliest())
    +    *   .withFormat(
    +    *     new Json()
    +    *       .jsonSchema("{...}")
    +    *       .failOnMissingField(false))
    +    *   .withSchema(
    +    *     new Schema()
    +    *       .field("user-name", "VARCHAR").from("u_name")
    +    *       .field("count", "DECIMAL")
    +    *       .field("proc-time", "TIMESTAMP").proctime())
    +    *   .toTable()
    +    * }}}
    +    *
    +    * @param connectorDescriptor connector descriptor describing the source of the table
    +    */
    +  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
    --- End diff --
    
    Once we reimplement the environments in Java and introduce proper interfaces for hiding
the implementation. This problem will be gone anyway.


---

Mime
View raw message