flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
Date Wed, 24 Oct 2018 18:45:38 GMT
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support
partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227915200
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ##########
 @@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor:
ConnectorDesc
     this
   }
 
+  /**
+    * Specifies the partition columns for this external table.
+    */
+  def withPartitionColumnNames(
+    partitionColumnNames: java.util.LinkedHashSet[String]): ExternalCatalogTableBuilder =
{
+    require(partitionColumnNames != null && !partitionColumnNames.isEmpty)
+    this.partitionColumnNames = Some(partitionColumnNames)
+    this
+  }
+
   /**
     * Declares this external table as a table source and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSource(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = true,
-      isSink = false,
-      DescriptorProperties.toJavaMap(this))
-  }
+  def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match {
+      case Some(pc) =>
+        new ExternalCatalogPartitionedTable(
+          isBatch,
+          isStreaming,
+          isSource = true,
+          isSink = false,
+          pc,
+          DescriptorProperties.toJavaMap(this)
+        )
+      case None =>
+        new ExternalCatalogTable(
+          isBatch,
+          isStreaming,
+          isSource = true,
+          isSink = false,
+          DescriptorProperties.toJavaMap(this))
+   }
 
   /**
     * Declares this external table as a table sink and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSink(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = false,
-      isSink = true,
-      DescriptorProperties.toJavaMap(this))
+  def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match {
 
 Review comment:
   I see a repeated pattern in the three asXXX methods. While it's not introduced in this
PR, it might be good to introduce a helper method that those asXXX methods call, to minimize
the repetition.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message