flink-commits mailing list archives

From twal...@apache.org
Subject [2/5] flink git commit: [FLINK-8538] [table] Improve unified table sources
Date Tue, 27 Feb 2018 19:25:17 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index d112732..555d92d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -21,24 +21,32 @@ package org.apache.flink.table.descriptors
 import java.io.Serializable
 import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
 import java.util
+import java.util.function.{Consumer, Supplier}
 import java.util.regex.Pattern
+import java.util.{Optional, List => JList, Map => JMap}
 
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.lang.StringEscapeUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableSchema, ValidationException}
-import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema, toJava}
 import org.apache.flink.table.typeutils.TypeStringUtils
 import org.apache.flink.util.InstantiationUtil
 import org.apache.flink.util.Preconditions.checkNotNull
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
   * Utility class for having a unified string-based representation of Table API related classes
   * such as [[TableSchema]], [[TypeInformation]], etc.
   *
+  * '''Note to implementers''': Please try to reuse key names as much as possible. Key names
+  * should be hierarchical and lower case. Use "-" instead of dots or camel case,
+  * e.g., connector.schema.start-from = from-earliest. Avoid repeating the higher level in a
+  * key name, e.g., use connector.kafka.version instead of connector.kafka.kafka-version.
+  *
   * @param normalizeKeys flag that indicates if keys should be normalized (this flag is
   *                      necessary for backwards compatibility)
   */
@@ -46,39 +54,18 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 
   private val properties: mutable.Map[String, String] = new mutable.HashMap[String, String]()
 
-  private def put(key: String, value: String): Unit = {
-    if (properties.contains(key)) {
-      throw new IllegalStateException("Property already present.")
-    }
-    if (normalizeKeys) {
-      properties.put(key.toLowerCase, value)
-    } else {
-      properties.put(key, value)
-    }
-  }
-
-  // for testing
-  private[flink] def unsafePut(key: String, value: String): Unit = {
-    properties.put(key, value)
-  }
-
-  // for testing
-  private[flink] def unsafeRemove(key: String): Unit = {
-    properties.remove(key)
-  }
-
-  def putProperties(properties: Map[String, String]): Unit = {
-    properties.foreach { case (k, v) =>
-      put(k, v)
-    }
-  }
-
-  def putProperties(properties: util.Map[String, String]): Unit = {
+  /**
+    * Adds a set of properties.
+    */
+  def putProperties(properties: JMap[String, String]): Unit = {
     properties.asScala.foreach { case (k, v) =>
       put(k, v)
     }
   }
 
+  /**
+    * Adds a class under the given key.
+    */
   def putClass(key: String, clazz: Class[_]): Unit = {
     checkNotNull(key)
     checkNotNull(clazz)
@@ -89,43 +76,62 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     put(key, clazz.getName)
   }
 
+  /**
+    * Adds a string under the given key.
+    */
   def putString(key: String, str: String): Unit = {
     checkNotNull(key)
     checkNotNull(str)
     put(key, str)
   }
 
+  /**
+    * Adds a boolean under the given key.
+    */
   def putBoolean(key: String, b: Boolean): Unit = {
     checkNotNull(key)
     put(key, b.toString)
   }
 
+  /**
+    * Adds a long under the given key.
+    */
   def putLong(key: String, l: Long): Unit = {
     checkNotNull(key)
     put(key, l.toString)
   }
 
+  /**
+    * Adds an integer under the given key.
+    */
   def putInt(key: String, i: Int): Unit = {
     checkNotNull(key)
     put(key, i.toString)
   }
 
+  /**
+    * Adds a character under the given key.
+    */
   def putCharacter(key: String, c: Character): Unit = {
     checkNotNull(key)
     checkNotNull(c)
     put(key, c.toString)
   }
 
+  /**
+    * Adds a table schema under the given key.
+    */
   def putTableSchema(key: String, schema: TableSchema): Unit = {
+    checkNotNull(key)
+    checkNotNull(schema)
     putTableSchema(key, normalizeTableSchema(schema))
   }
 
-  def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
-    putIndexedFixedProperties(
-      key,
-      Seq(NAME, TYPE),
-      nameAndType.map(t => Seq(t._1, t._2))
-    )
+  /**
+    * Adds a table schema under the given key.
+    */
+  def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = {
+    putTableSchema(key, nameAndType.asScala.map(t => (t.f0, t.f1)))
   }
 
   /**
@@ -140,19 +146,12 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     */
   def putIndexedFixedProperties(
       key: String,
-      propertyKeys: Seq[String],
-      propertyValues: Seq[Seq[String]])
+      propertyKeys: JList[String],
+      propertyValues: JList[JList[String]])
     : Unit = {
     checkNotNull(key)
     checkNotNull(propertyValues)
-    propertyValues.zipWithIndex.foreach { case (values, idx) =>
-      if (values.lengthCompare(propertyKeys.size) != 0) {
-        throw new ValidationException("Values must have same arity as keys.")
-      }
-      values.zipWithIndex.foreach { case (value, keyIdx) =>
-          put(s"$key.$idx.${propertyKeys(keyIdx)}", value)
-      }
-    }
+    putIndexedFixedProperties(key, propertyKeys.asScala, propertyValues.asScala.map(_.asScala))
   }
 
   /**
@@ -167,65 +166,163 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     */
   def putIndexedVariableProperties(
       key: String,
-      propertySets: Seq[Map[String, String]])
+      propertySets: JList[JMap[String, String]])
     : Unit = {
     checkNotNull(key)
     checkNotNull(propertySets)
-    propertySets.zipWithIndex.foreach { case (propertySet, idx) =>
-      propertySet.foreach { case (k, v) =>
-        put(s"$key.$idx.$k", v)
-      }
-    }
+    putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
   }
 
   // ----------------------------------------------------------------------------------------------
 
-  def getString(key: String): Option[String] = {
-    properties.get(key)
+  /**
+    * Returns a string value under the given key if it exists.
+    */
+  def getOptionalString(key: String): Optional[String] = toJava(properties.get(key))
+
+  /**
+    * Returns a string value under the given existing key.
+    */
+  def getString(key: String): String = {
+    get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-    case Some(c) =>
+  /**
+    * Returns a character value under the given key if it exists.
+    */
+  def getOptionalCharacter(key: String): Optional[Character] = {
+    val value = properties.get(key).map { c =>
       if (c.length != 1) {
         throw new ValidationException(s"The value of $key must only contain one character.")
       }
-      Some(c.charAt(0))
+      Char.box(c.charAt(0))
+    }
+    toJava(value)
+  }
 
-    case None => None
+  /**
+    * Returns a character value under the given existing key.
+    */
+  def getCharacter(key: String): Char = {
+    getOptionalCharacter(key).orElseThrow(exceptionSupplier(key))
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-    case Some(b) => Some(JBoolean.parseBoolean(b))
+  /**
+    * Returns a class value under the given key if it exists.
+    */
+  def getOptionalClass[T](key: String, superClass: Class[T]): Optional[Class[T]] = {
+    val value = properties.get(key).map { name =>
+      val clazz = try {
+        Class.forName(
+          name,
+          true,
+          Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
+      } catch {
+        case e: Exception =>
+          throw new ValidationException(s"Could not get class '$name' for key '$key'.", e)
+      }
+      if (!superClass.isAssignableFrom(clazz)) {
+        throw new ValidationException(s"Class '$name' does not extend from the required " +
+          s"class '${superClass.getName}' for key '$key'.")
+      }
+      clazz
+    }
+    toJava(value)
+  }
 
-    case None => None
+  /**
+    * Returns a class value under the given existing key.
+    */
+  def getClass[T](key: String, superClass: Class[T]): Class[T] = {
+    getOptionalClass(key, superClass).orElseThrow(exceptionSupplier(key))
   }
 
-  def getInt(key: String): Option[Int] = getString(key) match {
-    case Some(l) => Some(JInt.parseInt(l))
+  /**
+    * Returns a boolean value under the given key if it exists.
+    */
+  def getOptionalBoolean(key: String): Optional[JBoolean] = {
+    val value = properties.get(key).map(JBoolean.parseBoolean(_)).map(Boolean.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a boolean value under the given existing key.
+    */
+  def getBoolean(key: String): Boolean = {
+    getOptionalBoolean(key).orElseThrow(exceptionSupplier(key))
+  }
 
-    case None => None
+  /**
+    * Returns an integer value under the given key if it exists.
+    */
+  def getOptionalInt(key: String): Optional[JInt] = {
+    val value = properties.get(key).map(JInt.parseInt(_)).map(Int.box)
+    toJava(value)
   }
 
-  def getLong(key: String): Option[Long] = getString(key) match {
-    case Some(l) => Some(JLong.parseLong(l))
+  /**
+    * Returns an integer value under the given existing key.
+    */
+  def getInt(key: String): Int = {
+    getOptionalInt(key).orElseThrow(exceptionSupplier(key))
+  }
 
-    case None => None
+  /**
+    * Returns a long value under the given key if it exists.
+    */
+  def getOptionalLong(key: String): Optional[JLong] = {
+    val value = properties.get(key).map(JLong.parseLong(_)).map(Long.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a long value under the given existing key.
+    */
+  def getLong(key: String): Long = {
+    getOptionalLong(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
+    * Returns a double value under the given key if it exists.
+    */
+  def getOptionalDouble(key: String): Optional[JDouble] = {
+    val value = properties.get(key).map(JDouble.parseDouble(_)).map(Double.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a double value under the given existing key.
+    */
+  def getDouble(key: String): Double = {
+    getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
   }
 
-  def getDouble(key: String): Option[Double] = getString(key) match {
-    case Some(d) => Some(JDouble.parseDouble(d))
+  /**
+    * Returns the type information under the given key if it exists.
+    */
+  def getOptionalType(key: String): Optional[TypeInformation[_]] = {
+    val value = properties.get(key).map(TypeStringUtils.readTypeInfo)
+    toJava(value)
+  }
 
-    case None => None
+  /**
+    * Returns the type information under the given existing key.
+    */
+  def getType(key: String): TypeInformation[_] = {
+    getOptionalType(key).orElseThrow(exceptionSupplier(key))
   }
 
-  def getTableSchema(key: String): Option[TableSchema] = {
+  /**
+    * Returns a table schema under the given key if it exists.
+    */
+  def getOptionalTableSchema(key: String): Optional[TableSchema] = {
     // filter for number of columns
     val fieldCount = properties
       .filterKeys(k => k.startsWith(key) && k.endsWith(s".$NAME"))
       .size
 
     if (fieldCount == 0) {
-      return None
+      return toJava(None)
     }
 
     // validate fields and build schema
@@ -243,16 +340,186 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         )
       )
     }
-    Some(schemaBuilder.build())
+    toJava(Some(schemaBuilder.build()))
+  }
+
+  /**
+    * Returns a table schema under the given existing key.
+    */
+  def getTableSchema(key: String): TableSchema = {
+    getOptionalTableSchema(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
+    * Returns the property keys of fixed indexed properties.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.type = LONG, schema.fields.1.name = test2
+    *
+    * getFixedIndexedProperties("schema.fields", List("type", "name")) leads to:
+    *
+    * 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name")
+    * 1: Map("type" -> "schema.fields.1.type", "name" -> "schema.fields.1.name")
+    */
+  def getFixedIndexedProperties(
+      key: String,
+      propertyKeys: JList[String])
+    : JList[JMap[String, String]] = {
+
+    val keys = propertyKeys.asScala
+
+    // filter for index
+    val escapedKey = Pattern.quote(key)
+    val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)")
+
+    // extract index and property keys
+    val indexes = properties.keys.flatMap { k =>
+      val matcher = pattern.matcher(k)
+      if (matcher.find()) {
+        Some(JInt.parseInt(matcher.group(1)))
+      } else {
+        None
+      }
+    }
+
+    // determine max index
+    val maxIndex = indexes.reduceOption(_ max _).getOrElse(-1)
+
+    // validate and create result
+    val list = new util.ArrayList[JMap[String, String]]()
+    for (i <- 0 to maxIndex) {
+      val map = new util.HashMap[String, String]()
+
+      keys.foreach { subKey =>
+        val fullKey = s"$key.$i.$subKey"
+        // check for existence of full key
+        if (!containsKey(fullKey)) {
+          throw exceptionSupplier(fullKey).get()
+        }
+        map.put(subKey, fullKey)
+      }
+
+      list.add(map)
+    }
+    list
+  }
+
+  /**
+    * Returns the property keys of variable indexed properties.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.type = LONG
+    *
+    * getVariableIndexedProperties("schema.fields", List("type")) leads to:
+    *
+    * 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name")
+    * 1: Map("type" -> "schema.fields.1.type")
+    */
+  def getVariableIndexedProperties(
+      key: String,
+      requiredKeys: JList[String])
+    : JList[JMap[String, String]] = {
+
+    val keys = requiredKeys.asScala
+
+    // filter for index
+    val escapedKey = Pattern.quote(key)
+    val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)")
+
+    // extract index and property keys
+    val indexes = properties.keys.flatMap { k =>
+      val matcher = pattern.matcher(k)
+      if (matcher.find()) {
+        Some((JInt.parseInt(matcher.group(1)), matcher.group(2)))
+      } else {
+        None
+      }
+    }
+
+    // determine max index
+    val maxIndex = indexes.map(_._1).reduceOption(_ max _).getOrElse(-1)
+
+    // validate and create result
+    val list = new util.ArrayList[JMap[String, String]]()
+    for (i <- 0 to maxIndex) {
+      val map = new util.HashMap[String, String]()
+
+      // check and add required keys
+      keys.foreach { subKey =>
+        val fullKey = s"$key.$i.$subKey"
+        // check for existence of full key
+        if (!containsKey(fullKey)) {
+          throw exceptionSupplier(fullKey).get()
+        }
+        map.put(subKey, fullKey)
+      }
+
+      // add optional keys
+      indexes.filter(_._1 == i).foreach { case (_, subKey) =>
+        val fullKey = s"$key.$i.$subKey"
+        map.put(subKey, fullKey)
+      }
+
+      list.add(map)
+    }
+    list
+  }
+
+  /**
+    * Returns all properties under a given key that contain an index in between.
+    *
+    * E.g. rowtime.0.name -> returns all rowtime.#.name properties
+    */
+  def getIndexedProperty(key: String, property: String): JMap[String, String] = {
+    val escapedKey = Pattern.quote(key)
+    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).asJava
+  }
+
+  /**
+    * Returns the subset of properties whose keys start with the given prefix, with the prefix removed from the keys.
+    */
+  def getPrefix(prefixKey: String): JMap[String, String] = {
+    val prefix = prefixKey + '.'
+    properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) =>
+      k.substring(prefix.length) -> v // remove prefix
+    }.toMap.asJava
   }
 
   // ----------------------------------------------------------------------------------------------
 
+  /**
+    * Validates a string property.
+    */
+  def validateString(
+      key: String,
+      isOptional: Boolean)
+    : Unit = {
+    validateString(key, isOptional, 0, Integer.MAX_VALUE)
+  }
+
+  /**
+    * Validates a string property. The boundaries are inclusive.
+    */
   def validateString(
       key: String,
       isOptional: Boolean,
-      minLen: Int = 0, // inclusive
-      maxLen: Int = Integer.MAX_VALUE) // inclusive
+      minLen: Int) // inclusive
+    : Unit = {
+    validateString(key, isOptional, minLen, Integer.MAX_VALUE)
+  }
+
+  /**
+    * Validates a string property. The boundaries are inclusive.
+    */
+  def validateString(
+      key: String,
+      isOptional: Boolean,
+      minLen: Int, // inclusive
+      maxLen: Int) // inclusive
     : Unit = {
 
     if (!properties.contains(key)) {
@@ -269,11 +536,35 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates an integer property.
+    */
+  def validateInt(
+      key: String,
+      isOptional: Boolean)
+    : Unit = {
+    validateInt(key, isOptional, Int.MinValue, Int.MaxValue)
+  }
+
+  /**
+    * Validates an integer property. The boundaries are inclusive.
+    */
   def validateInt(
       key: String,
       isOptional: Boolean,
-      min: Int = Int.MinValue, // inclusive
-      max: Int = Int.MaxValue) // inclusive
+      min: Int) // inclusive
+    : Unit = {
+    validateInt(key, isOptional, min, Int.MaxValue)
+  }
+
+  /**
+    * Validates an integer property. The boundaries are inclusive.
+    */
+  def validateInt(
+      key: String,
+      isOptional: Boolean,
+      min: Int, // inclusive
+      max: Int) // inclusive
     : Unit = {
 
     if (!properties.contains(key)) {
@@ -295,11 +586,35 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates a long property.
+    */
+  def validateLong(
+      key: String,
+      isOptional: Boolean)
+    : Unit = {
+    validateLong(key, isOptional, Long.MinValue, Long.MaxValue)
+  }
+
+  /**
+    * Validates a long property. The boundaries are inclusive.
+    */
+  def validateLong(
+      key: String,
+      isOptional: Boolean,
+      min: Long) // inclusive
+    : Unit = {
+    validateLong(key, isOptional, min, Long.MaxValue)
+  }
+
+  /**
+    * Validates a long property. The boundaries are inclusive.
+    */
   def validateLong(
       key: String,
       isOptional: Boolean,
-      min: Long = Long.MinValue, // inclusive
-      max: Long = Long.MaxValue) // inclusive
+      min: Long, // inclusive
+      max: Long) // inclusive
     : Unit = {
 
     if (!properties.contains(key)) {
@@ -321,6 +636,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates that a certain value is present under the given key.
+    */
   def validateValue(key: String, value: String, isOptional: Boolean): Unit = {
     if (!properties.contains(key)) {
       if (!isOptional) {
@@ -334,6 +652,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates that a boolean value is present under the given key.
+    */
   def validateBoolean(key: String, isOptional: Boolean): Unit = {
     if (!properties.contains(key)) {
       if (!isOptional) {
@@ -348,11 +669,35 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates a double property.
+    */
+  def validateDouble(
+      key: String,
+      isOptional: Boolean)
+    : Unit = {
+    validateDouble(key, isOptional, Double.MinValue, Double.MaxValue)
+  }
+
+  /**
+    * Validates a double property. The boundaries are inclusive.
+    */
+  def validateDouble(
+      key: String,
+      isOptional: Boolean,
+      min: Double) // inclusive
+    : Unit = {
+    validateDouble(key, isOptional, min, Double.MaxValue)
+  }
+
+  /**
+    * Validates a double property. The boundaries are inclusive.
+    */
   def validateDouble(
       key: String,
       isOptional: Boolean,
-      min: Double = Double.MinValue, // inclusive
-      max: Double = Double.MaxValue) // inclusive
+      min: Double, // inclusive
+      max: Double) // inclusive
     : Unit = {
 
     if (!properties.contains(key)) {
@@ -374,25 +719,117 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validation for variable indexed properties.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.type = LONG
+    *
+    * The propertyKeys map assigns a validation logic to each sub-key, e.g. "type".
+    *
+    * The validation consumer receives the current prefix of the full key, e.g. "schema.fields.1.".
+    */
+  def validateVariableIndexedProperties(
+      key: String,
+      allowEmpty: Boolean,
+      propertyKeys: JMap[String, Consumer[String]],
+      requiredKeys: JList[String])
+    : Unit = {
+
+    val keys = propertyKeys.asScala
+
+    // filter for index
+    val escapedKey = Pattern.quote(key)
+    val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)")
+
+    // extract index and property keys
+    val indexes = properties.keys.flatMap { k =>
+      val matcher = pattern.matcher(k)
+      if (matcher.find()) {
+        Some(JInt.parseInt(matcher.group(1)))
+      } else {
+        None
+      }
+    }
+
+    // determine max index
+    val maxIndex = indexes.reduceOption(_ max _).getOrElse(-1)
+
+    if (maxIndex < 0 && !allowEmpty) {
+      throw new ValidationException(s"Property key '$key' must not be empty.")
+    }
+
+    // validate
+    for (i <- 0 to maxIndex) {
+      keys.foreach { case (subKey, validation) =>
+        val fullKey = s"$key.$i.$subKey"
+        // only validate if it exists
+        if (properties.contains(fullKey)) {
+          validation.accept(s"$key.$i.")
+        } else {
+          // check if it is required
+          if (requiredKeys.contains(subKey)) {
+            throw new ValidationException(s"Required property key '$fullKey' is missing.")
+          }
+        }
+      }
+    }
+  }
+
+  /**
+    * Validation for fixed indexed properties.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.type = LONG, schema.fields.1.name = test2
+    *
+    * The propertyKeys map must define e.g. "type" and "name" and a validation logic for the
+    * given full key.
+    */
+  def validateFixedIndexedProperties(
+      key: String,
+      allowEmpty: Boolean,
+      propertyKeys: JMap[String, Consumer[String]])
+    : Unit = {
+
+    validateVariableIndexedProperties(
+      key,
+      allowEmpty,
+      propertyKeys,
+      new util.ArrayList(propertyKeys.keySet()))
+  }
+
+  /**
+    * Validates a table schema property.
+    */
   def validateTableSchema(key: String, isOptional: Boolean): Unit = {
-    // filter for name columns
-    val names = getIndexedProperty(key, NAME)
-    // filter for type columns
-    val types = getIndexedProperty(key, TYPE)
-    if (names.isEmpty && types.isEmpty && !isOptional) {
-      throw new ValidationException(
-        s"Could not find the required schema for property '$key'.")
+    val nameValidation = (prefix: String) => {
+      validateString(prefix + NAME, isOptional = false, minLen = 1)
     }
-    for (i <- 0 until Math.max(names.size, types.size)) {
-      validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
-      validateType(s"$key.$i.$TYPE", isOptional = false)
+    val typeValidation = (prefix: String) => {
+      validateType(prefix + TYPE, isOptional = false)
     }
+
+    validateFixedIndexedProperties(
+      key,
+      isOptional,
+      Map(
+        NAME -> toJava(nameValidation),
+        TYPE -> toJava(typeValidation)
+      ).asJava
+    )
   }
 
+  /**
+    * Validates an enum property with a set of validation logic for each enum value.
+    */
   def validateEnum(
       key: String,
       isOptional: Boolean,
-      enumToValidation: Map[String, () => Unit])
+      enumToValidation: JMap[String, Consumer[String]])
     : Unit = {
 
     if (!properties.contains(key)) {
@@ -401,15 +838,26 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       }
     } else {
       val value = properties(key)
-      if (!enumToValidation.contains(value)) {
+      if (!enumToValidation.containsKey(value)) {
         throw new ValidationException(s"Unknown value for property '$key'. " +
-          s"Supported values [${enumToValidation.keys.mkString(", ")}] but was: $value")
+          s"Supported values [${enumToValidation.keySet().asScala.mkString(", ")}] but was: $value")
       } else {
-        enumToValidation(value).apply() // run validation logic
+        // run validation logic
+        enumToValidation.get(value).accept(key)
       }
     }
   }
 
+  /**
+    * Validates an enum property with a set of enum values.
+    */
+  def validateEnumValues(key: String, isOptional: Boolean, values: JList[String]): Unit = {
+    validateEnum(key, isOptional, values.asScala.map((_, noValidation())).toMap.asJava)
+  }
+
+  /**
+    * Validates a type property.
+    */
   def validateType(key: String, isOptional: Boolean): Unit = {
     if (!properties.contains(key)) {
       if (!isOptional) {
@@ -420,6 +868,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates that the given prefix is not included in these properties.
+    */
   def validatePrefixExclusion(prefix: String): Unit = {
     val invalidField = properties.find(_._1.startsWith(prefix))
     if (invalidField.isDefined) {
@@ -428,6 +879,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Validates that the given key is not included in these properties.
+    */
   def validateExclusion(key: String): Unit = {
     if (properties.contains(key)) {
       throw new ValidationException(s"Property '$key' is not allowed in this context.")
@@ -436,28 +890,159 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 
   // ----------------------------------------------------------------------------------------------
 
-  def getIndexedProperty(key: String, property: String): Map[String, String] = {
-    val escapedKey = Pattern.quote(key)
-    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).toMap
+  /**
+    * Returns whether any property key contains the given string.
+    */
+  def containsString(str: String): Boolean = {
+    properties.exists(e => e._1.contains(str))
   }
 
-  def contains(str: String): Boolean = {
-    properties.exists(e => e._1.contains(str))
+  /**
+    * Returns whether the given key is contained.
+    */
+  def containsKey(key: String): Boolean = {
+    properties.contains(key)
   }
 
+  /**
+    * Returns whether a given prefix exists in the properties.
+    */
   def hasPrefix(prefix: String): Boolean = {
     properties.exists(e => e._1.startsWith(prefix))
   }
 
-  def asMap: Map[String, String] = {
-    properties.toMap
+  /**
+    * Returns the properties as a Java Map.
+    */
+  def asMap: JMap[String, String] = {
+    properties.toMap.asJava
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns an empty validation logic.
+    */
+  def noValidation(): Consumer[String] = DescriptorProperties.emptyConsumer
+
+  def exceptionSupplier(key: String): Supplier[TableException] = new Supplier[TableException] {
+    override def get(): TableException = {
+      new TableException(s"Property with key '$key' could not be found. " +
+        s"This is a bug because the validation logic should have checked that before.")
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Adds a property.
+    */
+  private def put(key: String, value: String): Unit = {
+    if (properties.contains(key)) {
+      throw new IllegalStateException("Property already present.")
+    }
+    if (normalizeKeys) {
+      properties.put(key.toLowerCase, value)
+    } else {
+      properties.put(key, value)
+    }
+  }
+
+  /**
+    * Gets an existing property.
+    */
+  private def get(key: String): String = {
+    properties.getOrElse(
+      key,
+      throw exceptionSupplier(key).get())
+  }
+
+  /**
+    * Raw access to the underlying properties map for testing purposes.
+    */
+  private[flink] def unsafePut(key: String, value: String): Unit = {
+    properties.put(key, value)
+  }
+
+  /**
+    * Raw access to the underlying properties map for testing purposes.
+    */
+  private[flink] def unsafeRemove(key: String): Unit = {
+    properties.remove(key)
+  }
+
+  /**
+    *  Adds a table schema under the given key.
+    */
+  private def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
+    putIndexedFixedProperties(
+      key,
+      Seq(NAME, TYPE),
+      nameAndType.map(t => Seq(t._1, t._2))
+    )
+  }
+
+  /**
+    * Adds an indexed sequence of properties (with sub-properties) under a common key.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.type = LONG, schema.fields.1.name = test2
+    *
+    * The arity of each propertyValue must match the arity of propertyKeys.
+    */
+  private def putIndexedFixedProperties(
+      key: String,
+      propertyKeys: Seq[String],
+      propertyValues: Seq[Seq[String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertyValues)
+    propertyValues.zipWithIndex.foreach { case (values, idx) =>
+      if (values.lengthCompare(propertyKeys.size) != 0) {
+        throw new ValidationException("Values must have same arity as keys.")
+      }
+      values.zipWithIndex.foreach { case (value, keyIdx) =>
+          put(s"$key.$idx.${propertyKeys(keyIdx)}", value)
+      }
+    }
+  }
+
+  /**
+    * Adds an indexed mapping of properties under a common key.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    *                             schema.fields.1.name = test2
+    *
+    * The arity of the propertySets can differ.
+    */
+  private def putIndexedVariableProperties(
+      key: String,
+      propertySets: Seq[Map[String, String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertySets)
+    propertySets.zipWithIndex.foreach { case (propertySet, idx) =>
+      propertySet.foreach { case (k, v) =>
+        put(s"$key.$idx.$k", v)
+      }
+    }
   }
 }
 
 object DescriptorProperties {
 
-  val TYPE = "type"
-  val NAME = "name"
+  private val emptyConsumer: Consumer[String] = new Consumer[String] {
+    override def accept(t: String): Unit = {
+      // nothing to do
+    }
+  }
+
+  val TYPE: String = "type"
+  val NAME: String = "name"
 
   // the string representation should be equal to SqlTypeName
   def normalizeTypeInfo(typeInfo: TypeInformation[_]): String = {
@@ -487,6 +1072,24 @@ object DescriptorProperties {
     }
   }
 
+  def deserialize[T](data: String, expected: Class[T]): T = {
+    try {
+      val byteData = Base64.decodeBase64(data)
+      val obj = InstantiationUtil.deserializeObject[T](
+        byteData,
+        Thread.currentThread.getContextClassLoader)
+      if (!expected.isAssignableFrom(obj.getClass)) {
+        throw new ValidationException(
+          s"Serialized data contains an object of unexpected type. " +
+          s"Expected '${expected.getName}' but was '${obj.getClass.getName}'")
+      }
+      obj
+    } catch {
+      case e: Exception =>
+        throw new ValidationException(s"Could not deserialize data: '$data'", e)
+    }
+  }
+
   def toString(keyOrValue: String): String = {
     StringEscapeUtils.escapeJava(keyOrValue)
   }
@@ -494,4 +1097,24 @@ object DescriptorProperties {
   def toString(key: String, value: String): String = {
     toString(key) + "=" + toString(value)
   }
+
+  // the following methods help for Scala <-> Java interfaces
+  // most of these methods are not necessary anymore once we upgrade to Scala 2.12
+
+  def toJava[T](option: Option[T]): Optional[T] = option match {
+    case Some(v) => Optional.of(v)
+    case None => Optional.empty()
+  }
+
+  def toScala[T](option: Optional[T]): Option[T] = Option(option.orElse(null.asInstanceOf[T]))
+
+  def toJava[T](func: Function[T, Unit]): Consumer[T] = new Consumer[T] {
+    override def accept(t: T): Unit = {
+      func.apply(t)
+    }
+  }
+
+  def toJava[T0, T1](tuple: (T0, T1)): JTuple2[T0, T1] = {
+    new JTuple2[T0, T1](tuple._1, tuple._2)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
index b1d900f..f306b5a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
@@ -23,7 +23,8 @@ import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, C
 /**
   * Connector descriptor for a file system.
   */
-class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+class FileSystem extends ConnectorDescriptor(
+    CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
 
@@ -43,8 +44,6 @@ class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1)
   override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
     path.foreach(properties.putString(CONNECTOR_PATH, _))
   }
-
-  override private[flink] def needsFormat() = true
 }
 
 /**
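
The same pattern applies to other connectors: whether a format is required is now declared via
the constructor instead of overriding needsFormat(). A hypothetical descriptor (class name and
connector type are made up) could look like this, assuming the (type, version, formatNeeded)
constructor shown above:

    import org.apache.flink.table.descriptors.{ConnectorDescriptor, DescriptorProperties}

    class MyConnector extends ConnectorDescriptor("my-connector", version = 1, formatNeeded = false) {

      override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
        // no connector-specific properties in this sketch
      }
    }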

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
index 86f6229..bca67c6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION}
 
 /**
   * Describes the format of data.
@@ -37,7 +37,7 @@ abstract class FormatDescriptor(
     */
   final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
     properties.putString(FORMAT_TYPE, tpe)
-    properties.putInt(FORMAT_VERSION, version)
+    properties.putInt(FORMAT_PROPERTY_VERSION, version)
     addFormatProperties(properties)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
index 1aaa399..301189a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
 
 /**
   * Validator for [[FormatDescriptor]].
@@ -27,13 +27,32 @@ class FormatDescriptorValidator extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
     properties.validateString(FORMAT_TYPE, isOptional = false, minLen = 1)
-    properties.validateInt(FORMAT_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+    properties.validateInt(FORMAT_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
   }
 }
 
 object FormatDescriptorValidator {
 
+  /**
+    * Key for describing the type of the format. Usually used for factory discovery.
+    */
   val FORMAT_TYPE = "format.type"
+
+  /**
+    *  Key for describing the property version. This property can be used for backwards
+    *  compatibility in case the property format changes.
+    */
+  val FORMAT_PROPERTY_VERSION = "format.property-version"
+
+  /**
+    * Key for describing the version of the format. This property can be used for different
+    * format versions (e.g. Avro 1.8.2 or Avro 2.0).
+    */
   val FORMAT_VERSION = "format.version"
 
+  /**
+    * Key for deriving the schema of the format from the table's schema.
+    */
+  val FORMAT_DERIVE_SCHEMA = "format.derive-schema"
+
 }
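
To illustrate the split between the two version keys, here is a sketch of the properties a format
descriptor might produce; the concrete format type and values are assumptions, not part of this
commit:

    import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator}
    import org.apache.flink.table.descriptors.FormatDescriptorValidator._

    val props = new DescriptorProperties()
    props.putString(FORMAT_TYPE, "avro")          // selects the format implementation
    props.putInt(FORMAT_PROPERTY_VERSION, 1)      // version of the property format (backwards compatibility)
    props.putString(FORMAT_VERSION, "1.8.2")      // version of the format itself, e.g. Avro 1.8.2
    props.putBoolean(FORMAT_DERIVE_SCHEMA, true)  // derive the format schema from the table schema

    new FormatDescriptorValidator().validate(props) // only format.type is mandatory here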

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
deleted file mode 100644
index cc46d9c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING, FORMAT_TYPE_VALUE}
-
-/**
-  * Encoding descriptor for JSON.
-  */
-class Json extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {
-
-  private var failOnMissingField: Option[Boolean] = None
-
-  private var schema: Option[String] = None
-
-  /**
-    * Sets flag whether to fail if a field is missing or not.
-    *
-    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
-    *                           If set to false, a missing field is set to null.
-    * @return The builder.
-    */
-  def failOnMissingField(failOnMissingField: Boolean): Json = {
-    this.failOnMissingField = Some(failOnMissingField)
-    this
-  }
-
-  /**
-    * Sets the JSON schema string with field names and the types according to the JSON schema
-    * specification [[http://json-schema.org/specification.html]]. Required.
-    *
-    * The schema might be nested.
-    *
-    * @param schema JSON schema
-    */
-  def schema(schema: String): Json = {
-    this.schema = Some(schema)
-    this
-  }
-
-  /**
-    * Internal method for format properties conversion.
-    */
-  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
-    // we distinguish between "schema string" and "schema" to allow parsing of a
-    // schema object in the future (such that the entire JSON schema can be defined in a YAML
-    // file instead of one large string)
-    schema.foreach(properties.putString(FORMAT_SCHEMA_STRING, _))
-    failOnMissingField.foreach(properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, _))
-  }
-}
-
-/**
-  * Encoding descriptor for JSON.
-  */
-object Json {
-
-  /**
-    * Encoding descriptor for JSON.
-    */
-  def apply(): Json = new Json()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
deleted file mode 100644
index 9f11caf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING}
-
-/**
-  * Validator for [[Json]].
-  */
-class JsonValidator extends FormatDescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    super.validate(properties)
-    properties.validateString(FORMAT_SCHEMA_STRING, isOptional = false, minLen = 1)
-    properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, isOptional = true)
-  }
-}
-
-object JsonValidator {
-
-  val FORMAT_TYPE_VALUE = "json"
-  val FORMAT_SCHEMA_STRING = "format.schema-string"
-  val FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field"
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
index a8d580c..6631e22 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
-import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_VERSION}
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_PROPERTY_VERSION}
 
 /**
   * Validator for [[Metadata]].
@@ -26,7 +26,7 @@ import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, M
 class MetadataValidator extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(METADATA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+    properties.validateInt(METADATA_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
     properties.validateString(METADATA_COMMENT, isOptional = true)
     properties.validateLong(METADATA_CREATION_TIME, isOptional = true)
     properties.validateLong(METADATA_LAST_ACCESS_TIME, isOptional = true)
@@ -35,7 +35,7 @@ class MetadataValidator extends DescriptorValidator {
 
 object MetadataValidator {
 
-  val METADATA_VERSION = "metadata.version"
+  val METADATA_PROPERTY_VERSION = "metadata.property-version"
   val METADATA_COMMENT = "metadata.comment"
   val METADATA_CREATION_TIME = "metadata.creation-time"
   val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time"

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
index a1c80f5..ed3854d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
@@ -19,11 +19,12 @@
 package org.apache.flink.table.descriptors
 
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_VERSION, normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.descriptors.RowtimeValidator.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
 import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Rowtime descriptor for describing an event time attribute in the schema.
@@ -111,12 +112,9 @@ class Rowtime extends Descriptor {
     */
   final override def addProperties(properties: DescriptorProperties): Unit = {
     val props = mutable.HashMap[String, String]()
-    props.put(ROWTIME_VERSION, "1")
     timestampExtractor.foreach(normalizeTimestampExtractor(_).foreach(e => props.put(e._1, e._2)))
     watermarkStrategy.foreach(normalizeWatermarkStrategy(_).foreach(e => props.put(e._1, e._2)))
-
-    // use a list for the rowtime to support multiple rowtime attributes in the future
-    properties.putIndexedVariableProperties(ROWTIME, Seq(props.toMap))
+    properties.putProperties(props.toMap.asJava)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
index 74e49f1..fdec820 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -18,58 +18,62 @@
 
 package org.apache.flink.table.descriptors
 
-import org.apache.flink.table.descriptors.DescriptorProperties.serialize
+import org.apache.flink.table.descriptors.DescriptorProperties.{serialize, toJava}
 import org.apache.flink.table.descriptors.RowtimeValidator._
 import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
 
+import scala.collection.JavaConverters._
+
 /**
   * Validator for [[Rowtime]].
   */
 class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(prefix + ROWTIME_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
-
-    val noValidation = () => {}
-
-    val timestampExistingField = () => {
-      properties.validateString(prefix + TIMESTAMPS_FROM, isOptional = false, minLen = 1)
+    val timestampExistingField = (_: String) => {
+      properties.validateString(
+        prefix + ROWTIME_TIMESTAMPS_FROM, isOptional = false, minLen = 1)
     }
 
-    val timestampCustom = () => {
-      properties.validateString(prefix + TIMESTAMPS_CLASS, isOptional = false, minLen = 1)
-      properties.validateString(prefix + TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
+    val timestampCustom = (_: String) => {
+      properties.validateString(
+        prefix + ROWTIME_TIMESTAMPS_CLASS, isOptional = false, minLen = 1)
+      properties.validateString(
+        prefix + ROWTIME_TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
     }
 
     properties.validateEnum(
-      prefix + TIMESTAMPS_TYPE,
+      prefix + ROWTIME_TIMESTAMPS_TYPE,
       isOptional = false,
       Map(
-        TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> timestampExistingField,
-        TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> noValidation,
-        TIMESTAMPS_TYPE_VALUE_CUSTOM -> timestampCustom
-      )
+        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
+        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(),
+        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom)
+      ).asJava
     )
 
-    val watermarkPeriodicBounding = () => {
-      properties.validateLong(prefix + WATERMARKS_DELAY, isOptional = false, min = 0)
+    val watermarkPeriodicBounded = (_: String) => {
+      properties.validateLong(
+        prefix + ROWTIME_WATERMARKS_DELAY, isOptional = false, min = 0)
     }
 
-    val watermarkCustom = () => {
-      properties.validateString(prefix + WATERMARKS_CLASS, isOptional = false, minLen = 1)
-      properties.validateString(prefix + WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
+    val watermarkCustom = (_: String) => {
+      properties.validateString(
+        prefix + ROWTIME_WATERMARKS_CLASS, isOptional = false, minLen = 1)
+      properties.validateString(
+        prefix + ROWTIME_WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
     }
 
     properties.validateEnum(
-      prefix + WATERMARKS_TYPE,
+      prefix + ROWTIME_WATERMARKS_TYPE,
       isOptional = false,
       Map(
-        WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> noValidation,
-        WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING -> watermarkPeriodicBounding,
-        WATERMARKS_TYPE_VALUE_FROM_SOURCE -> noValidation,
-        WATERMARKS_TYPE_VALUE_CUSTOM -> watermarkCustom
-      )
+        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> properties.noValidation(),
+        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
+        ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(),
+        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom)
+      ).asJava
     )
   }
 }
@@ -77,58 +81,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED = "periodic-bounded"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
+  val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay"
 
   // utilities
 
   def normalizeTimestampExtractor(extractor: TimestampExtractor): Map[String, String] =
     extractor match {
+
         case existing: ExistingField =>
           Map(
-            TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
-            TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0))
+            ROWTIME_TIMESTAMPS_TYPE -> ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
+            ROWTIME_TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0))
+
         case _: StreamRecordTimestamp =>
-          Map(TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
+          Map(ROWTIME_TIMESTAMPS_TYPE -> ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
+
         case _: TimestampExtractor =>
           Map(
-            TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_CUSTOM,
-            TIMESTAMPS_CLASS -> extractor.getClass.getName,
-            TIMESTAMPS_SERIALIZED -> serialize(extractor))
+            ROWTIME_TIMESTAMPS_TYPE -> ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM,
+            ROWTIME_TIMESTAMPS_CLASS -> extractor.getClass.getName,
+            ROWTIME_TIMESTAMPS_SERIALIZED -> serialize(extractor))
     }
 
   def normalizeWatermarkStrategy(strategy: WatermarkStrategy): Map[String, String] =
     strategy match {
+
       case _: AscendingTimestamps =>
-        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
-      case bounding: BoundedOutOfOrderTimestamps =>
+        Map(ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
+
+      case bounded: BoundedOutOfOrderTimestamps =>
         Map(
-          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING,
-          WATERMARKS_DELAY -> bounding.delay.toString)
+          ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED,
+          ROWTIME_WATERMARKS_DELAY -> bounded.delay.toString)
+
       case _: PreserveWatermarks =>
-        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_FROM_SOURCE)
+        Map(ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
+
       case _: WatermarkStrategy =>
         Map(
-          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_CUSTOM,
-          WATERMARKS_CLASS -> strategy.getClass.getName,
-          WATERMARKS_SERIALIZED -> serialize(strategy))
+          ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM,
+          ROWTIME_WATERMARKS_CLASS -> strategy.getClass.getName,
+          ROWTIME_WATERMARKS_SERIALIZED -> serialize(strategy))
     }
+
+  def getRowtimeComponents(properties: DescriptorProperties, prefix: String)
+    : Option[(TimestampExtractor, WatermarkStrategy)] = {
+
+    // create timestamp extractor
+    val t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE)
+    if (!t.isPresent) {
+      return None
+    }
+    val extractor: TimestampExtractor = t.get() match {
+
+      case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
+        val field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM)
+        new ExistingField(field)
+
+      case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE =>
+        new StreamRecordTimestamp
+
+      case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
+        val clazz = properties.getClass(
+          prefix + ROWTIME_TIMESTAMPS_CLASS,
+          classOf[TimestampExtractor])
+        DescriptorProperties.deserialize(
+          properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
+          clazz)
+    }
+
+    // create watermark strategy
+    val s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE)
+    val strategy: WatermarkStrategy = s match {
+
+      case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING =>
+        new AscendingTimestamps()
+
+      case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED =>
+        val delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY)
+        new BoundedOutOfOrderTimestamps(delay)
+
+      case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE =>
+        PreserveWatermarks.INSTANCE
+
+      case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM =>
+        val clazz = properties.getClass(
+          prefix + ROWTIME_WATERMARKS_CLASS,
+          classOf[WatermarkStrategy])
+        DescriptorProperties.deserialize(
+          properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED),
+          clazz)
+    }
+
+    Some((extractor, strategy))
+  }
 }
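
For orientation, a minimal sketch of the flattened properties that the renamed keys produce for a rowtime attribute on schema field 0, i.e. what getRowtimeComponents(properties, "schema.0.") would read back; the field name "ts" and the 2000 ms delay are assumed example values, not taken from this patch.

// Illustrative property layout only; values are assumptions.
val rowtimeProps = Map(
  "schema.0.rowtime.timestamps.type" -> "from-field",       // ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD
  "schema.0.rowtime.timestamps.from" -> "ts",                // ROWTIME_TIMESTAMPS_FROM, assumed source field
  "schema.0.rowtime.watermarks.type" -> "periodic-bounded",  // ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED
  "schema.0.rowtime.watermarks.delay" -> "2000"              // ROWTIME_WATERMARKS_DELAY in milliseconds
)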

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
index 2f3a389..fcbb2c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties.{normalizeTableSc
 import org.apache.flink.table.descriptors.SchemaValidator._
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Describes a schema of a table.
@@ -80,7 +81,7 @@ class Schema extends Descriptor {
     }
 
     val fieldProperties = mutable.LinkedHashMap[String, String]()
-    fieldProperties += (TYPE -> fieldType)
+    fieldProperties += (SCHEMA_TYPE -> fieldType)
 
     tableSchema += (fieldName -> fieldProperties)
 
@@ -100,7 +101,7 @@ class Schema extends Descriptor {
     lastField match {
       case None => throw new ValidationException("No field previously defined. Use field() before.")
       case Some(f) =>
-        tableSchema(f) += (FROM -> originFieldName)
+        tableSchema(f) += (SCHEMA_FROM -> originFieldName)
         lastField = None
     }
     this
@@ -115,7 +116,7 @@ class Schema extends Descriptor {
     lastField match {
       case None => throw new ValidationException("No field defined previously. Use field() before.")
       case Some(f) =>
-        tableSchema(f) += (PROCTIME -> PROCTIME_VALUE_TRUE)
+        tableSchema(f) += (SCHEMA_PROCTIME -> "true")
         lastField = None
     }
     this
@@ -132,7 +133,7 @@ class Schema extends Descriptor {
       case Some(f) =>
         val fieldProperties = new DescriptorProperties()
         rowtime.addProperties(fieldProperties)
-        tableSchema(f) ++= fieldProperties.asMap
+        tableSchema(f) ++= fieldProperties.asMap.asScala
         lastField = None
     }
     this
@@ -142,12 +143,11 @@ class Schema extends Descriptor {
     * Internal method for properties conversion.
     */
   final override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    properties.putInt(SCHEMA_VERSION, 1)
     properties.putIndexedVariableProperties(
       SCHEMA,
       tableSchema.toSeq.map { case (name, props) =>
-        Map(NAME -> name) ++ props
-      }
+        (Map(SCHEMA_NAME -> name) ++ props).asJava
+      }.asJava
     )
   }
 }
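
A hedged sketch of the builder-side usage that produces these per-field properties; Types refers to the Table API type helpers, and the Rowtime methods timestampsFromField / watermarksPeriodicBounded are assumed to mirror the renamed key values (they are not part of this hunk).

// Sketch only, under the assumptions above; field names and types are examples.
val schema = new Schema()
  .field("name", Types.STRING).from("origin_name")  // aliased field -> SCHEMA_FROM
  .field("proc", Types.SQL_TIMESTAMP).proctime()    // -> SCHEMA_PROCTIME = true
  .field("rt", Types.SQL_TIMESTAMP)
  .rowtime(new Rowtime()
    .timestampsFromField("ts")
    .watermarksPeriodicBounded(2000L))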

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index 19c0e41..0a23911 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -18,9 +18,17 @@
 
 package org.apache.flink.table.descriptors
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME
+import java.util
+import java.util.Optional
+
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala}
+import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_TIMESTAMPS_TYPE}
 import org.apache.flink.table.descriptors.SchemaValidator._
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
   * Validator for [[Schema]].
@@ -28,29 +36,39 @@ import org.apache.flink.table.descriptors.SchemaValidator._
 class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(SCHEMA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
-
-    val names = properties.getIndexedProperty(SCHEMA, NAME)
-    val types = properties.getIndexedProperty(SCHEMA, TYPE)
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+    val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE)
 
     if (names.isEmpty && types.isEmpty) {
-      throw new ValidationException(s"Could not find the required schema for property '$SCHEMA'.")
+      throw new ValidationException(
+        s"Could not find the required schema in property '$SCHEMA'.")
     }
 
+    var proctimeFound = false
+
     for (i <- 0 until Math.max(names.size, types.size)) {
-      properties.validateString(s"$SCHEMA.$i.$NAME", isOptional = false, minLen = 1)
-      properties.validateType(s"$SCHEMA.$i.$TYPE", isOptional = false)
-      properties.validateString(s"$SCHEMA.$i.$FROM", isOptional = true, minLen = 1)
+      properties
+        .validateString(s"$SCHEMA.$i.$SCHEMA_NAME", isOptional = false, minLen = 1)
+      properties
+        .validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", isOptional = false)
+      properties
+        .validateString(s"$SCHEMA.$i.$SCHEMA_FROM", isOptional = true, minLen = 1)
       // either proctime or rowtime
-      val proctime = s"$SCHEMA.$i.$PROCTIME"
+      val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME"
       val rowtime = s"$SCHEMA.$i.$ROWTIME"
-      if (properties.contains(proctime)) {
+      if (properties.containsKey(proctime)) {
+        // check the environment
         if (!isStreamEnvironment) {
           throw new ValidationException(
             s"Property '$proctime' is not allowed in a batch environment.")
         }
+        // check for only one proctime attribute
+        else if (proctimeFound) {
+          throw new ValidationException("A proctime attribute must only be defined once.")
+        }
         // check proctime
         properties.validateBoolean(proctime, isOptional = false)
+        proctimeFound = properties.getBoolean(proctime)
         // no rowtime
         properties.validatePrefixExclusion(rowtime)
       } else if (properties.hasPrefix(rowtime)) {
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+    * Finds the proctime attribute if defined.
+    */
+  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val isProctime = toScala(
+        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+      isProctime.foreach { isSet =>
+        if (isSet) {
+          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+        }
+      }
+    }
+    toJava(None)
+  }
+
+  /**
+    * Finds the rowtime attributes if defined.
+    */
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+    : util.List[RowtimeAttributeDescriptor] = {
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    val attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+    // check for rowtime in every field
+    for (i <- 0 until names.size) {
+      RowtimeValidator
+        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
+        .foreach { case (extractor, strategy) =>
+          // create descriptor
+          attributes += new RowtimeAttributeDescriptor(
+            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+            extractor,
+            strategy)
+        }
+    }
+
+    attributes.asJava
+  }
+
+  /**
+    * Finds a table source field mapping.
+    */
+  def deriveFieldMapping(
+      properties: DescriptorProperties,
+      sourceSchema: Optional[TableSchema])
+    : util.Map[String, String] = {
+
+    val mapping = mutable.Map[String, String]()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    // add all schema fields first for implicit mappings
+    schema.getColumnNames.foreach { name =>
+      mapping.put(name, name)
+    }
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
 
-  // per column properties
+        // add explicit mapping
+        case Some(source) =>
+          mapping.put(name, source)
 
-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+        // implicit mapping or time
+        case None =>
+          val isProctime = properties
+            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+            .orElse(false)
+          val isRowtime = properties
+            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+          // remove proctime/rowtime from mapping
+          if (isProctime || isRowtime) {
+            mapping.remove(name)
+          }
+          // check for invalid fields
+          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
+              s"from source. Please specify the source field from which it can be derived.")
+          }
+      }
+    }
 
+    mapping.toMap.asJava
+  }
+
+  /**
+    * Finds the fields that can be used for a format schema (without time attributes).
+    */
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
+
+    val builder = TableSchema.builder()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    schema.getColumnNames.zip(schema.getTypes).zipWithIndex.foreach { case ((n, t), i) =>
+      val isProctime = properties
+        .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+        .orElse(false)
+      val isRowtime = properties
+        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+      if (!isProctime && !isRowtime) {
+        // check for aliasing
+        val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
+          .orElse(n)
+        builder.field(fieldName, t)
+      }
+    }
+
+    builder.build()
+  }
 }
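
A minimal usage sketch of the utilities added above, as a streaming connector factory might call them; params is assumed to be a DescriptorProperties instance already populated with the schema.#.* keys, and sourceSchema is an assumed format-derived TableSchema.

// Sketch under the assumptions above; the signatures are the ones introduced in this file.
val proctimeAttribute: Optional[String] =
  SchemaValidator.deriveProctimeAttribute(params)
val rowtimeAttributes: util.List[RowtimeAttributeDescriptor] =
  SchemaValidator.deriveRowtimeAttributes(params)
val fieldMapping: util.Map[String, String] =
  SchemaValidator.deriveFieldMapping(params, Optional.of(sourceSchema))
val formatSchema: TableSchema =
  SchemaValidator.deriveFormatFields(params)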

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
index 3037286..f87a868 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
@@ -135,12 +135,12 @@ class Statistics extends Descriptor {
     * Internal method for properties conversion.
     */
   final override def addProperties(properties: DescriptorProperties): Unit = {
-    properties.putInt(STATISTICS_VERSION, 1)
+    properties.putInt(STATISTICS_PROPERTY_VERSION, 1)
     rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc))
     val namedStats = columnStats.map { case (name, stats) =>
       // name should not be part of the properties key
-      (stats + (NAME -> name)).toMap
-    }.toSeq
+      (stats + (NAME -> name)).toMap.asJava
+    }.toList.asJava
     properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
index a78e422..691cb21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.descriptors
 
 import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, STATISTICS_VERSION, validateColumnStats}
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_PROPERTY_VERSION, STATISTICS_ROW_COUNT, validateColumnStats}
 import org.apache.flink.table.plan.stats.ColumnStats
 
 import scala.collection.mutable
@@ -30,7 +31,7 @@ import scala.collection.mutable
 class StatisticsValidator extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(STATISTICS_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+    properties.validateInt(STATISTICS_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
     properties.validateLong(STATISTICS_ROW_COUNT, isOptional = true, min = 0)
     validateColumnStats(properties, STATISTICS_COLUMNS)
   }
@@ -38,7 +39,7 @@ class StatisticsValidator extends DescriptorValidator {
 
 object StatisticsValidator {
 
-  val STATISTICS_VERSION = "statistics.version"
+  val STATISTICS_PROPERTY_VERSION = "statistics.property-version"
   val STATISTICS_ROW_COUNT = "statistics.row-count"
   val STATISTICS_COLUMNS = "statistics.columns"
 
@@ -99,16 +100,16 @@ object StatisticsValidator {
     val columnCount = properties.getIndexedProperty(key, NAME).size
 
     val stats = for (i <- 0 until columnCount) yield {
-      val name = properties.getString(s"$key.$i.$NAME").getOrElse(
+      val name = toScala(properties.getOptionalString(s"$key.$i.$NAME")).getOrElse(
         throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'."))
 
       val stats = ColumnStats(
-        properties.getLong(s"$key.$i.$DISTINCT_COUNT").map(v => Long.box(v)).orNull,
-        properties.getLong(s"$key.$i.$NULL_COUNT").map(v => Long.box(v)).orNull,
-        properties.getDouble(s"$key.$i.$AVG_LENGTH").map(v => Double.box(v)).orNull,
-        properties.getInt(s"$key.$i.$MAX_LENGTH").map(v => Int.box(v)).orNull,
-        properties.getDouble(s"$key.$i.$MAX_VALUE").map(v => Double.box(v)).orNull,
-        properties.getDouble(s"$key.$i.$MIN_VALUE").map(v => Double.box(v)).orNull
+        properties.getOptionalLong(s"$key.$i.$DISTINCT_COUNT").orElse(null),
+        properties.getOptionalLong(s"$key.$i.$NULL_COUNT").orElse(null),
+        properties.getOptionalDouble(s"$key.$i.$AVG_LENGTH").orElse(null),
+        properties.getOptionalInt(s"$key.$i.$MAX_LENGTH").orElse(null),
+        properties.getOptionalDouble(s"$key.$i.$MAX_VALUE").orElse(null),
+        properties.getOptionalDouble(s"$key.$i.$MIN_VALUE").orElse(null)
       )
 
       name -> stats
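
For illustration, the flattened layout produced under the renamed statistics keys; the row count and column name are made-up values.

// Illustrative only; values are assumptions.
val statsProps = Map(
  "statistics.property-version" -> "1",   // STATISTICS_PROPERTY_VERSION
  "statistics.row-count" -> "1000",       // STATISTICS_ROW_COUNT
  "statistics.columns.0.name" -> "a"      // indexed per-column entry under STATISTICS_COLUMNS
)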

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
index 5e0b42a..8f2e473 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
@@ -46,7 +46,7 @@ class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: C
     * Searches for the specified table source, configures it accordingly, and returns it.
     */
   def toTableSource: TableSource[_] = {
-    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    val source = TableSourceFactoryService.findAndCreateTableSource(this)
     source match {
       case _: StreamTableSource[_] => source
       case _ => throw new TableException(

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
index a49a41b..5118489 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
 import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
 import org.apache.flink.table.plan.stats.TableStats
 
@@ -50,7 +51,7 @@ abstract class TableSourceDescriptor extends Descriptor {
   protected def getTableStats: Option[TableStats] = {
       val normalizedProps = new DescriptorProperties()
       addProperties(normalizedProps)
-      val rowCount = normalizedProps.getLong(STATISTICS_ROW_COUNT).map(v => Long.box(v))
+      val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
       rowCount match {
         case Some(cnt) =>
           val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
index bec4565..06d6bfb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -21,11 +21,12 @@ package org.apache.flink.table.sources
 import java.util
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
 import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
 import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
-import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
 import org.apache.flink.table.descriptors._
 import org.apache.flink.types.Row
 
@@ -38,9 +39,8 @@ class CsvTableSourceFactory extends TableSourceFactory[Row] {
     val context = new util.HashMap[String, String]()
     context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
     context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
-    context.put(CONNECTOR_VERSION, "1")
-    context.put(FORMAT_VERSION, "1")
-    context.put(SCHEMA_VERSION, "1")
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
     context
   }
 
@@ -76,33 +76,36 @@ class CsvTableSourceFactory extends TableSourceFactory[Row] {
     // build
     val csvTableSourceBuilder = new CsvTableSource.Builder
 
-    val tableSchema = params.getTableSchema(SCHEMA).get
-    val encodingSchema = params.getTableSchema(FORMAT_FIELDS)
+    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
+    val tableSchema = params.getTableSchema(SCHEMA)
 
     // the CsvTableSource needs some rework first
     // for now the schema must be equal to the encoding
-    if (!encodingSchema.contains(tableSchema)) {
+    if (!formatSchema.equals(tableSchema)) {
       throw new TableException(
         "Encodings that differ from the schema are not supported yet for CsvTableSources.")
     }
 
-    params.getString(CONNECTOR_PATH).foreach(csvTableSourceBuilder.path)
-    params.getString(FORMAT_FIELD_DELIMITER).foreach(csvTableSourceBuilder.fieldDelimiter)
-    params.getString(FORMAT_LINE_DELIMITER).foreach(csvTableSourceBuilder.lineDelimiter)
+    toScala(params.getOptionalString(CONNECTOR_PATH))
+      .foreach(csvTableSourceBuilder.path)
+    toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
+      .foreach(csvTableSourceBuilder.fieldDelimiter)
+    toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
+      .foreach(csvTableSourceBuilder.lineDelimiter)
 
-    encodingSchema.foreach { schema =>
-      schema.getColumnNames.zip(schema.getTypes).foreach { case (name, tpe) =>
-        csvTableSourceBuilder.field(name, tpe)
-      }
+    formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) =>
+      csvTableSourceBuilder.field(name, tpe)
     }
-    params.getCharacter(FORMAT_QUOTE_CHARACTER).foreach(csvTableSourceBuilder.quoteCharacter)
-    params.getString(FORMAT_COMMENT_PREFIX).foreach(csvTableSourceBuilder.commentPrefix)
-    params.getBoolean(FORMAT_IGNORE_FIRST_LINE).foreach { flag =>
+    toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
+      .foreach(csvTableSourceBuilder.quoteCharacter)
+    toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
+      .foreach(csvTableSourceBuilder.commentPrefix)
+    toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag =>
       if (flag) {
         csvTableSourceBuilder.ignoreFirstLine()
       }
     }
-    params.getBoolean(FORMAT_IGNORE_PARSE_ERRORS).foreach { flag =>
+    toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag =>
       if (flag) {
         csvTableSourceBuilder.ignoreParseErrors()
       }
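
A hedged sketch of a property set this factory would now match and translate; the connector/format type values follow the validators imported above, while the path, field name, and type string are assumptions.

// Illustrative only; as enforced above, the schema must mirror the format fields for now.
val csvProps = Map(
  "connector.type" -> "filesystem",        // CONNECTOR_TYPE_VALUE of the file system connector
  "connector.property-version" -> "1",
  "connector.path" -> "/tmp/input.csv",    // assumed path
  "format.type" -> "csv",
  "format.property-version" -> "1",
  "format.fields.0.name" -> "a",
  "format.fields.0.type" -> "VARCHAR",     // assumed type string
  "schema.0.name" -> "a",
  "schema.0.type" -> "VARCHAR"
)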

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
index f42d765..e5f6965 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
@@ -41,10 +41,10 @@ trait TableSourceFactory[T] {
     *   - connector.type
     *   - format.type
     *
-    * Specified versions allow the framework to provide backwards compatible properties in case of
-    * string format changes:
-    *   - connector.version
-    *   - format.version
+    * Specified property versions allow the framework to provide backwards compatible properties
+    * in case of string format changes:
+    *   - connector.property-version
+    *   - format.property-version
     *
     * An empty context means that the factory matches for all requests.
     */
@@ -61,7 +61,8 @@ trait TableSourceFactory[T] {
     *   - format.fields.#.type
     *   - format.fields.#.name
     *
-    * Note: Use "#" to denote an array of values where "#" represents one or more digits.
+    * Note: Use "#" to denote an array of values where "#" represents one or more digits. Property
+    * versions like "format.property-version" must not be part of the supported properties.
     */
   def supportedProperties(): util.List[String]
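
A hedged sketch of a custom factory implementing the two methods documented above; "my-system", "my-format", and "connector.path" are assumed values, and the source-creating method of the trait is deliberately left abstract here.

// Sketch only; the constants are the renamed *.property-version keys.
import java.util
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
import org.apache.flink.types.Row

abstract class MyTableSourceFactory extends TableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
    val context = new util.HashMap[String, String]()
    context.put(CONNECTOR_TYPE, "my-system")       // assumed connector type
    context.put(FORMAT_TYPE, "my-format")          // assumed format type
    context.put(CONNECTOR_PROPERTY_VERSION, "1")
    context.put(FORMAT_PROPERTY_VERSION, "1")
    context
  }

  override def supportedProperties(): util.List[String] = {
    val properties = new util.ArrayList[String]()
    properties.add("connector.path")               // assumed connector-specific key
    properties.add("format.fields.#.name")
    properties.add("format.fields.#.type")
    properties                                     // property versions are not listed here
  }
}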
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
index 1e8e836..877cb7b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
@@ -21,12 +21,10 @@ package org.apache.flink.table.sources
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
 import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_VERSION
-import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_VERSION
-import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_VERSION
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
+import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
+import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
 
@@ -40,13 +38,13 @@ object TableSourceFactoryService extends Logging {
 
   private lazy val loader = ServiceLoader.load(classOf[TableSourceFactory[_]])
 
-  def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
+  def findAndCreateTableSource(descriptor: TableSourceDescriptor): TableSource[_] = {
     val properties = new DescriptorProperties()
     descriptor.addProperties(properties)
-    findTableSourceFactory(properties.asMap)
+    findAndCreateTableSource(properties.asMap.asScala.toMap)
   }
 
-  def findTableSourceFactory(properties: Map[String, String]): TableSource[_] = {
+  def findAndCreateTableSource(properties: Map[String, String]): TableSource[_] = {
     var matchingFactory: Option[(TableSourceFactory[_], Seq[String])] = None
     try {
       val iter = loader.iterator()
@@ -73,12 +71,10 @@ object TableSourceFactoryService extends Logging {
         plainContext ++= requiredContext
         // we remove the versions for now until we have the first backwards compatibility case
         // with the version we can provide mappings in case the format changes
-        plainContext.remove(CONNECTOR_VERSION)
-        plainContext.remove(FORMAT_VERSION)
-        plainContext.remove(SCHEMA_VERSION)
-        plainContext.remove(ROWTIME_VERSION)
-        plainContext.remove(METADATA_VERSION)
-        plainContext.remove(STATISTICS_VERSION)
+        plainContext.remove(CONNECTOR_PROPERTY_VERSION)
+        plainContext.remove(FORMAT_PROPERTY_VERSION)
+        plainContext.remove(METADATA_PROPERTY_VERSION)
+        plainContext.remove(STATISTICS_PROPERTY_VERSION)
 
         // check if required context is met
         if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
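
A short usage sketch of the renamed entry points; descriptor and propertyMap are assumed inputs built elsewhere.

// Sketch: resolving a table source either from a descriptor or from a plain property map.
val fromDescriptor: TableSource[_] =
  TableSourceFactoryService.findAndCreateTableSource(descriptor)   // descriptor: TableSourceDescriptor (assumed)
val fromProperties: TableSource[_] =
  TableSourceFactoryService.findAndCreateTableSource(propertyMap)  // propertyMap: Map[String, String] (assumed)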

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
index 329f790..fcbd63f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
   *
   * Note: This extractor only works for StreamTableSources.
   */
-class StreamRecordTimestamp extends TimestampExtractor {
+final class StreamRecordTimestamp extends TimestampExtractor {
 
   /** No argument fields required. */
   override def getArgumentFields: Array[String] = Array()
@@ -42,5 +42,8 @@ class StreamRecordTimestamp extends TimestampExtractor {
   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
     org.apache.flink.table.expressions.StreamRecordTimestamp()
   }
+}
 
+object StreamRecordTimestamp {
+  val INSTANCE: StreamRecordTimestamp = new StreamRecordTimestamp
 }

