flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-9059] [table] Replace "sources" with "tables" in environment file
Date Fri, 06 Apr 2018 12:13:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7baf7649e -> bc9982c36


[FLINK-9059] [table] Replace "sources" with "tables" in environment file

This closes #5758.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc9982c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc9982c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc9982c3

Branch: refs/heads/master
Commit: bc9982c364f54c40223d14eeb4823a882c021e7a
Parents: 7baf764
Author: Shuyi Chen <shuyi@uber.com>
Authored: Thu Mar 22 23:00:00 2018 -0700
Committer: Timo Walther <twalthr@apache.org>
Committed: Fri Apr 6 14:08:08 2018 +0200

----------------------------------------------------------------------
 .../conf/sql-client-defaults.yaml               |  5 ++-
 .../flink/table/client/config/Environment.java  | 47 ++++++++++++--------
 .../client/gateway/local/ExecutionContext.java  | 12 +++--
 .../resources/test-sql-client-defaults.yaml     |  4 +-
 .../test/resources/test-sql-client-factory.yaml |  3 +-
 .../table/descriptors/TableDescriptor.scala     | 40 +++++++++++++++++
 .../descriptors/TableDescriptorValidator.scala  | 43 ++++++++++++++++++
 .../descriptors/TableSourceDescriptor.scala     | 32 ++++++-------
 .../sources/TableSourceFactoryServiceTest.scala |  4 +-
 .../table/sources/TestTableSourceFactory.scala  |  4 +-
 10 files changed, 145 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 4ec64d6..5fd01d9 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -25,11 +25,12 @@
 # Table Sources
 #==============================================================================
 
-# Define table sources here. See the Table API & SQL documentation for details.
+# Define table sources and sinks here. See the Table API & SQL documentation for details.
 
-sources: [] # empty list
+tables: [] # empty list
 # A typical table source definition looks like:
 # - name: ...
+#   type: source
 #   connector: ...
 #   format: ...
 #   schema: ...

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index a910c49..7169fe1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptorValidator;
 
 import java.io.IOException;
 import java.net.URL;
@@ -29,7 +31,7 @@ import java.util.Map;
 
 /**
  * Environment configuration that represents the content of an environment file. Environment
files
- * define sources, execution, and deployment behavior. An environment might be defined by
default or
+ * define tables, execution, and deployment behavior. An environment might be defined by
default or
  * as part of a session. Environments can be merged or enriched with properties (e.g. from
CLI command).
  *
  * <p>In future versions, we might restrict the merging or enrichment of deployment
properties to not
@@ -37,30 +39,39 @@ import java.util.Map;
  */
 public class Environment {
 
-	private Map<String, Source> sources;
+	private Map<String, TableDescriptor> tables;
 
 	private Execution execution;
 
 	private Deployment deployment;
 
 	public Environment() {
-		this.sources = Collections.emptyMap();
+		this.tables = Collections.emptyMap();
 		this.execution = new Execution();
 		this.deployment = new Deployment();
 	}
 
-	public Map<String, Source> getSources() {
-		return sources;
+	public Map<String, TableDescriptor> getTables() {
+		return tables;
 	}
 
-	public void setSources(List<Map<String, Object>> sources) {
-		this.sources = new HashMap<>(sources.size());
-		sources.forEach(config -> {
-			final Source s = Source.create(config);
-			if (this.sources.containsKey(s.getName())) {
-				throw new SqlClientException("Duplicate source name '" + s + "'.");
+	public void setTables(List<Map<String, Object>> tables) {
+		this.tables = new HashMap<>(tables.size());
+		tables.forEach(config -> {
+			if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
+				throw new SqlClientException("The 'type' attribute of a table is missing.");
+			}
+			if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
{
+				config.remove(TableDescriptorValidator.TABLE_TYPE());
+				final Source s = Source.create(config);
+				if (this.tables.containsKey(s.getName())) {
+					throw new SqlClientException("Duplicate source name '" + s + "'.");
+				}
+				this.tables.put(s.getName(), s);
+			} else {
+				throw new SqlClientException(
+						"Invalid table 'type' attribute value, only 'source' is supported");
 			}
-			this.sources.put(s.getName(), s);
 		});
 	}
 
@@ -102,10 +113,10 @@ public class Environment {
 	public static Environment merge(Environment env1, Environment env2) {
 		final Environment mergedEnv = new Environment();
 
-		// merge sources
-		final Map<String, Source> sources = new HashMap<>(env1.getSources());
-		mergedEnv.getSources().putAll(env2.getSources());
-		mergedEnv.sources = sources;
+		// merge tables
+		final Map<String, TableDescriptor> tables = new HashMap<>(env1.getTables());
+		mergedEnv.getTables().putAll(env2.getTables());
+		mergedEnv.tables = tables;
 
 		// merge execution properties
 		mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
@@ -119,8 +130,8 @@ public class Environment {
 	public static Environment enrich(Environment env, Map<String, String> properties)
{
 		final Environment enrichedEnv = new Environment();
 
-		// merge sources
-		enrichedEnv.sources = new HashMap<>(env.getSources());
+		// merge tables
+		enrichedEnv.tables = new HashMap<>(env.getTables());
 
 		// enrich execution properties
 		enrichedEnv.execution = Execution.enrich(env.execution, properties);

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 81931e2..84b7b28 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.descriptors.TableSourceDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactoryService;
 import org.apache.flink.util.FlinkException;
@@ -60,7 +61,7 @@ import java.util.Map;
 /**
  * Context for executing table programs. This class caches everything that can be cached
across
  * multiple queries as long as the session context does not change. This must be thread-safe
as
- * it might be reused across different query submission.
+ * it might be reused across different query submissions.
  *
  * @param <T> cluster id
  */
@@ -92,9 +93,12 @@ public class ExecutionContext<T> {
 
 		// create table sources
 		tableSources = new HashMap<>();
-		mergedEnv.getSources().forEach((name, source) -> {
-			final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(source,
classLoader);
-			tableSources.put(name, tableSource);
+		mergedEnv.getTables().forEach((name, descriptor) -> {
+			if (descriptor instanceof TableSourceDescriptor) {
+				TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
+						(TableSourceDescriptor) descriptor, classLoader);
+				tableSources.put(name, tableSource);
+			}
 		});
 
 		// convert deployment options into command line options that describe a cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 8f11d23..1186615 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -23,8 +23,9 @@
 
 # this file has variables that can be filled with content by replacing $VAR_XXX
 
-sources:
+tables:
   - name: TableNumber1
+    type: source
     schema:
       - name: IntegerField1
         type: INT
@@ -43,6 +44,7 @@ sources:
       line-delimiter: "\n"
       comment-prefix: "#"
   - name: TableNumber2
+    type: source
     schema:
       - name: IntegerField2
         type: INT

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index d0caf84..c7b6097 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -23,8 +23,9 @@
 
 # this file has variables that can be filled with content by replacing $VAR_XXX
 
-sources:
+tables:
   - name: TableNumber1
+    type: source
     schema:
       - name: IntegerField1
         type: INT

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
new file mode 100644
index 0000000..7b864d8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.foreach(_.addProperties(properties))
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
new file mode 100644
index 0000000..b868a8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Validator for [[TableDescriptor]].
+  */
+class TableDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    // nothing to do
+  }
+}
+
+object TableDescriptorValidator {
+
+  /**
+    * Key for describing the type of this table, valid values are ('source').
+    */
+  val TABLE_TYPE = "type"
+
+  /**
+    * Valid TABLE_TYPE value.
+    */
+  val TABLE_TYPE_VALUE_SOURCE = "source"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
index 5118489..3ca39c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -27,37 +27,31 @@ import scala.collection.JavaConverters._
 /**
   * Common class for all descriptors describing a table source.
   */
-abstract class TableSourceDescriptor extends Descriptor {
+abstract class TableSourceDescriptor extends TableDescriptor {
 
-  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
-  protected var formatDescriptor: Option[FormatDescriptor] = None
-  protected var schemaDescriptor: Option[Schema] = None
   protected var statisticsDescriptor: Option[Statistics] = None
-  protected var metaDescriptor: Option[Metadata] = None
 
   /**
     * Internal method for properties conversion.
     */
   override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    connectorDescriptor.foreach(_.addProperties(properties))
-    formatDescriptor.foreach(_.addProperties(properties))
-    schemaDescriptor.foreach(_.addProperties(properties))
-    metaDescriptor.foreach(_.addProperties(properties))
+    super.addProperties(properties)
+    statisticsDescriptor.foreach(_.addProperties(properties))
   }
 
   /**
     * Reads table statistics from the descriptors properties.
     */
   protected def getTableStats: Option[TableStats] = {
-      val normalizedProps = new DescriptorProperties()
-      addProperties(normalizedProps)
-      val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
-      rowCount match {
-        case Some(cnt) =>
-          val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
-          Some(TableStats(cnt, columnStats.asJava))
-        case None =>
-          None
-      }
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
index 279e9a4..48db9da 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION,
CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION,
FORMAT_TYPE}
 import org.junit.Assert.assertTrue
 import org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc9982c3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
index ee3d637..b4aa08d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
@@ -22,8 +22,8 @@ import java.util
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION,
CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION,
FORMAT_TYPE}
 import org.apache.flink.types.Row
 
 class TestTableSourceFactory extends TableSourceFactory[Row] {


Mime
View raw message