flink-commits mailing list archives

From: fhue...@apache.org
Subject: [1/7] flink git commit: [FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.
Date: Thu, 15 Dec 2016 10:49:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5dab9345c -> 4d27f8f2d
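
For context, the recurring change in this patch is the move from the Table API's own Row type (a Scala Product accessed via productElement/productArity) to org.apache.flink.types.Row, which exposes getField/getArity/setField. A minimal sketch of the new accessor style, illustrative only and not part of the patch, assuming a simple two-field row:

    import org.apache.flink.types.Row

    // before (old api.table.Row): row.productElement(i), row.productArity
    // after  (types.Row):         row.getField(i),        row.getArity
    val row = new Row(2)               // arity is fixed at construction
    row.setField(0, 42)                // fields are untyped (java.lang.Object)
    row.setField(1, "hello")
    val first: AnyRef = row.getField(0)
    val arity: Int = row.getArity      // 2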


http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
index 7ff2340..16f1608 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.math.BigDecimal
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 abstract class SumAggregate[T: Numeric]
   extends Aggregate[T] {
@@ -32,9 +32,9 @@ abstract class SumAggregate[T: Numeric]
   }
 
   override def merge(partial1: Row, buffer: Row): Unit = {
-    val partialValue = partial1.productElement(sumIndex).asInstanceOf[T]
+    val partialValue = partial1.getField(sumIndex).asInstanceOf[T]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T]
+      val bufferValue = buffer.getField(sumIndex).asInstanceOf[T]
       if (bufferValue != null) {
         buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
       } else {
@@ -44,7 +44,7 @@ abstract class SumAggregate[T: Numeric]
   }
 
   override def evaluate(buffer: Row): T = {
-    buffer.productElement(sumIndex).asInstanceOf[T]
+    buffer.getField(sumIndex).asInstanceOf[T]
   }
 
   override def prepare(value: Any, partial: Row): Unit = {
@@ -98,9 +98,9 @@ class DecimalSumAggregate extends Aggregate[BigDecimal] {
   }
 
   override def merge(partial1: Row, buffer: Row): Unit = {
-    val partialValue = partial1.productElement(sumIndex).asInstanceOf[BigDecimal]
+    val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(sumIndex).asInstanceOf[BigDecimal]
+      val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal]
       if (bufferValue != null) {
         buffer.setField(sumIndex, partialValue.add(bufferValue))
       } else {
@@ -110,7 +110,7 @@ class DecimalSumAggregate extends Aggregate[BigDecimal] {
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    buffer.productElement(sumIndex).asInstanceOf[BigDecimal]
+    buffer.getField(sumIndex).asInstanceOf[BigDecimal]
   }
 
   override def prepare(value: Any, partial: Row): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 9f1c23b..417c1f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.util.Collector
 
@@ -35,7 +35,7 @@ class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffse
 
   override def collect(record: Row): Unit = {
 
-    val lastFieldPos = record.productArity - 1
+    val lastFieldPos = record.getArity - 1
 
     if (windowStartOffset.isDefined) {
       record.setField(

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
index 7567ba8..5038d9b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.table.sinks
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
@@ -53,7 +53,7 @@ class CsvTableSink(
   }
 
   override def getOutputType: TypeInformation[Row] = {
-    new RowTypeInfo(getFieldTypes)
+    new RowTypeInfo(getFieldTypes: _*)
   }
 }
 
@@ -68,15 +68,15 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
     val builder = new StringBuilder
 
     // write first value
-    val v = row.productElement(0)
+    val v = row.getField(0)
     if (v != null) {
       builder.append(v.toString)
     }
 
     // write following values
-    for (i <- 1 until row.productArity) {
+    for (i <- 1 until row.getArity) {
       builder.append(fieldDelim)
-      val v = row.productElement(i)
+      val v = row.getField(i)
       if (v != null) {
         builder.append(v.toString)
       }
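
The other recurring change, visible in the CsvTableSink diff above, is RowTypeInfo: it now comes from org.apache.flink.api.java.typeutils and takes the field types as varargs rather than a Scala Seq, which is why call sites spread their arguments with ": _*". A short sketch of building the output type this way, illustrative only and not part of the patch:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    val fieldTypes: Array[TypeInformation[_]] = Array(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)

    // old: new RowTypeInfo(fieldTypes)     -- Seq-based constructor in api.table.typeutils
    // new: spread the types into the varargs constructor
    val rowType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)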

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
index 9cf4397..b60575a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
@@ -21,9 +21,10 @@ package org.apache.flink.api.table.sources
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.CsvInputFormat
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.{Row, TableException}
-import org.apache.flink.api.table.runtime.io.RowCsvInputFormat
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.TableException
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.io.RowCsvInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -71,7 +72,7 @@ class CsvTableSource(
     throw TableException("Number of field names and field types must be equal.")
   }
 
-  private val returnType = new RowTypeInfo(fieldTypes)
+  private val returnType = new RowTypeInfo(fieldTypes: _*)
 
   /**
     * Returns the data of the table as a [[DataSet]] of [[Row]].

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
index a162d9f..a81577c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
@@ -25,14 +25,15 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 
 object TypeConverter {
 
-  val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]]
+  val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
 
   /**
     * Determines the return type of Flink operators based on the logical fields, the expected
@@ -115,7 +116,7 @@ object TypeConverter {
 
       // Row is expected, create the arity for it
       case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
-        new RowTypeInfo(logicalFieldTypes)
+        new RowTypeInfo(logicalFieldTypes: _*)
 
       // no physical type
       // determine type based on logical fields and configuration parameters
@@ -123,7 +124,7 @@ object TypeConverter {
         // no need for efficient types -> use Row
         // we cannot use efficient types if row arity > tuple arity or nullable
         if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
-          new RowTypeInfo(logicalFieldTypes)
+          new RowTypeInfo(logicalFieldTypes: _*)
         }
         // use efficient type tuple or atomic type
         else {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
index 294cba2..75d964b 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.api.table.CalciteConfig;
 import org.apache.flink.api.table.CalciteConfigBuilder;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.TableException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java
index b634d51..7538808 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.scala.batch.GeneratingInputFormat;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.sources.BatchTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
index 1364cbd..89b0d50 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
index 02f6e0b..0856a70 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.ValidationException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
index 6fc8173..1d5c189 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
index 333953b..1139837 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
index 9676608..014c127 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.ValidationException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
index 10ae5d9..53a1a7d 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.table.StreamTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.scala.stream.utils.StreamITCase;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.streaming.api.datastream.DataStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
index 42b9de0..ddea3ba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
@@ -19,16 +19,17 @@
 package org.apache.flink.api.scala.batch
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
 import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -104,7 +105,7 @@ class TestProjectableTableSource(
   override def getFieldsNames: Array[String] = fieldNames
 
   /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes)
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
 
   /** Returns the number of fields of the table. */
   override def getNumberOfFields: Int = fieldNames.length

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
index 4c07615..b7c8bc0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
@@ -25,7 +25,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
index 08bee72..b5c8ada 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
@@ -28,10 +28,11 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.sources.{BatchTableSource, CsvTableSource}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -153,7 +154,7 @@ class TestBatchTableSource extends BatchTableSource[Row] {
   override def getFieldsNames: Array[String] = Array("name", "id", "amount")
 
   /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes)
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
 
   /** Returns the number of fields of the table. */
   override def getNumberOfFields: Int = 3

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
index 35bb7dc..d5d46ba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
+import org.apache.flink.api.table.{TableException, TableEnvironment}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
index 155833b..5037469 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
@@ -29,7 +29,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
index 68f63c3..074f70b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment, TableException}
+import org.apache.flink.api.table.{ValidationException, TableEnvironment, TableException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
index 7cdb746..42bd6e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index f345984..b94cd00 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -25,7 +25,8 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala._
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
+import org.apache.flink.api.table.{TableEnvironment, TableException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -46,6 +47,7 @@ class SortITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
       (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
 
@@ -55,7 +57,10 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -68,7 +73,7 @@ class SortITCase(
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
 
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      -x.productElement(0).asInstanceOf[Int])
+      - x.productElement(0).asInstanceOf[Int] )
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
@@ -76,7 +81,10 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results.
+      filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -89,7 +97,7 @@ class SortITCase(
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
 
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
+      x.productElement(0).asInstanceOf[Int] )
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
@@ -97,7 +105,10 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -107,10 +118,10 @@ class SortITCase(
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 LIMIT 5"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"
 
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
+      (x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
@@ -118,7 +129,10 @@ class SortITCase(
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
index a770a6e..d41f3e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
index 16c8ece..3f4e1e5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
index c3758a4..b011462 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
@@ -27,7 +27,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
index 67cac14..195027d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
@@ -24,7 +24,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
index 283ba10..0d32cb4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index d4a1d8d..b3cc054 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -24,7 +24,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -53,12 +54,15 @@ class SortITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.desc)
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int])
+      - x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -71,12 +75,15 @@ class SortITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.asc)
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
+      x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -87,14 +94,17 @@ class SortITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc)
+    val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      (x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
+      (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
 
     val expected = sortExpectedly(tupleDataSetStrings)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -107,12 +117,15 @@ class SortITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
+      x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -123,14 +136,17 @@ class SortITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5)
+    val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
+      - x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -143,12 +159,15 @@ class SortITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
     implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
+      x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+    val result = results
+      .filterNot(_.isEmpty)
+      .sortBy(_.head)(Ordering.by(f=> f.toString))
+      .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index 7e170d4..285a181 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -21,10 +21,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.table.utils.TableTestUtil._
 import org.apache.flink.api.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.api.table.{Row, TableEnvironment, Types}
+import org.apache.flink.api.table.{TableEnvironment, Types}
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.mockito.Mockito._
 
@@ -35,7 +36,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     // mock
     val ds = mock(classOf[DataSet[Row]])
     val jDs = mock(classOf[JDataSet[Row]])
-    val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
     when(ds.javaSet).thenReturn(jDs)
     when(jDs.getType).thenReturn(typeInfo)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
index c14ad97..1c93112 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
@@ -25,8 +25,9 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.stream.utils.StreamITCase
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.sources.{CsvTableSource, StreamTableSource}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -163,7 +164,7 @@ class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row]
   override def getFieldsNames: Array[String] = Array("name", "id", "amount")
 
   /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes)
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
 
   /** Returns the number of fields of the table. */
   override def getNumberOfFields: Int = 3

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
index 5b278c1..c4ca964 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
@@ -21,7 +21,8 @@ package org.apache.flink.api.scala.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase}
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
index 0753484..d398556 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
 import org.apache.flink.api.scala.stream.utils.StreamITCase
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.api.table._
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
index 578ad30..3eee4d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
+import org.apache.flink.api.table.{TableEnvironment, TableException}
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
index 131974e..5096b53 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
@@ -21,7 +21,8 @@ package org.apache.flink.api.scala.stream.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index b45ae8e..305f1db 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -17,12 +17,12 @@
  */
 package org.apache.flink.api.scala.stream.table
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table._
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{TableEnvironment, TableException, Types, ValidationException}
 import org.apache.flink.api.table.utils.TableTestUtil._
 import org.apache.flink.api.table.utils._
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
@@ -39,7 +39,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     // mock
     val ds = mock(classOf[DataStream[Row]])
     val jDs = mock(classOf[JDataStream[Row]])
-    val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
     when(ds.javaStream).thenReturn(jDs)
     when(jDs.getType).thenReturn(typeInfo)
 

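Alongside the import swap, the hunk above shows the constructor change that the package move brings with it: the old org.apache.flink.api.table.typeutils.RowTypeInfo was built from a Seq of field types, while the replacement org.apache.flink.api.java.typeutils.RowTypeInfo takes varargs, which is why existing Seq-based call sites are expanded with ": _*". A minimal sketch of the new-style construction, written only against what this patch itself shows (the value names are illustrative, not part of the commit):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    // varargs form: field types are passed directly
    val rowType: RowTypeInfo = new RowTypeInfo(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)

    // an existing Seq of field types can still be used by expanding it,
    // mirroring the ": _*" pattern applied throughout this patch
    val fieldTypes: Seq[TypeInformation[_]] =
      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    val rowType2 = new RowTypeInfo(fieldTypes: _*)
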
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
index 4860005..4fd3cd7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.stream.utils
 
 import java.util.Collections
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.junit.Assert._
 import scala.collection.mutable
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala
index 034ce0b..2ba76ad 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInforma
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Types, ValidationException}
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class ArrayTypeTest extends ExpressionTestBase {
@@ -342,7 +343,7 @@ class ArrayTypeTest extends ExpressionTestBase {
   }
 
   override def typeInfo: TypeInformation[Any] = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       Types.INT,
       Types.INT,
       PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
@@ -354,6 +355,6 @@ class ArrayTypeTest extends ExpressionTestBase {
       PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
       ObjectArrayTypeInfo.getInfoFor(Types.INT),
       ObjectArrayTypeInfo.getInfoFor(Types.INT)
-    )).asInstanceOf[TypeInformation[Any]]
+    ).asInstanceOf[TypeInformation[Any]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
index 3121c58..879f68d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
@@ -22,10 +22,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.apache.flink.api.table.{Types, ValidationException}
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo}
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.junit.Test
 
 
@@ -154,7 +155,7 @@ class CompositeAccessTest extends ExpressionTestBase {
   }
 
   def typeInfo = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       createTypeInformation[MyCaseClass],
       createTypeInformation[MyCaseClass2],
       createTypeInformation[(String, String)],
@@ -163,7 +164,7 @@ class CompositeAccessTest extends ExpressionTestBase {
       Types.INT,
       createTypeInformation[MyCaseClass2],
       createTypeInformation[Tuple1[Boolean]]
-      )).asInstanceOf[TypeInformation[Any]]
+      ).asInstanceOf[TypeInformation[Any]]
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala
index 20a8af8..a986365 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.api.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, Types}
+import org.apache.flink.api.table.Types
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.junit.Test
 
 class DecimalTypeTest extends ExpressionTestBase {
@@ -298,13 +299,13 @@ class DecimalTypeTest extends ExpressionTestBase {
   }
 
   def typeInfo = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       Types.DECIMAL,
       Types.DECIMAL,
       Types.INT,
       Types.DOUBLE,
       Types.DECIMAL,
-      Types.DECIMAL)).asInstanceOf[TypeInformation[Any]]
+      Types.DECIMAL).asInstanceOf[TypeInformation[Any]]
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
index de48849..0b39d4d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
@@ -20,9 +20,9 @@ package org.apache.flink.api.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.junit.{Ignore, Test}
 
 /**
@@ -85,5 +85,5 @@ class NonDeterministicTests extends ExpressionTestBase {
   override def testData: Any = new Row(0)
 
   override def typeInfo: TypeInformation[Any] =
-    new RowTypeInfo(Seq[TypeInformation[_]]()).asInstanceOf[TypeInformation[Any]]
+    new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 1d2a1b7..3ef02a9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -23,8 +23,9 @@ import java.sql.{Date, Time, Timestamp}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Types, ValidationException}
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class ScalarFunctionsTest extends ExpressionTestBase {
@@ -1134,7 +1135,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   }
 
   def typeInfo = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       Types.STRING,
       Types.BOOLEAN,
       Types.BYTE,
@@ -1159,7 +1160,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       Types.BOOLEAN,
       Types.DECIMAL,
       Types.STRING,
-      Types.STRING)).asInstanceOf[TypeInformation[Any]]
+      Types.STRING).asInstanceOf[TypeInformation[Any]]
 
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
index 7ad2212..86f884f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
@@ -21,8 +21,9 @@ package org.apache.flink.api.table.expressions
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Types, ValidationException}
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class ScalarOperatorsTest extends ExpressionTestBase {
@@ -201,7 +202,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
   }
 
   def typeInfo = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       Types.BYTE,
       Types.SHORT,
       Types.INT,
@@ -213,7 +214,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
       Types.INT,
       Types.INT,
       Types.STRING
-      )).asInstanceOf[TypeInformation[Any]]
+      ).asInstanceOf[TypeInformation[Any]]
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
index 52dc848..56f40ea 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.junit.{Ignore, Test}
 
 /**
@@ -166,5 +166,5 @@ class SqlExpressionTest extends ExpressionTestBase {
   override def testData: Any = new Row(0)
 
   override def typeInfo: TypeInformation[Any] =
-    new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]]
+    new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
index 0547552..bd771ba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala
@@ -23,8 +23,9 @@ import java.sql.{Date, Time, Timestamp}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.Types
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class TemporalTypesTest extends ExpressionTestBase {
@@ -556,7 +557,7 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   def typeInfo = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       Types.DATE,
       Types.TIME,
       Types.TIMESTAMP,
@@ -567,6 +568,6 @@ class TemporalTypesTest extends ExpressionTestBase {
       Types.INT,
       Types.LONG,
       Types.INTERVAL_MONTHS,
-      Types.INTERVAL_MILLIS)).asInstanceOf[TypeInformation[Any]]
+      Types.INTERVAL_MILLIS).asInstanceOf[TypeInformation[Any]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
index ffe3cd3..567cca1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -25,8 +25,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils._
 import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.Types
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class UserDefinedScalarFunctionTest extends ExpressionTestBase {
@@ -195,7 +196,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   }
 
   override def typeInfo: TypeInformation[Any] = {
-    new RowTypeInfo(Seq(
+    new RowTypeInfo(
       Types.INT,
       Types.STRING,
       Types.BOOLEAN,
@@ -205,7 +206,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       Types.TIMESTAMP,
       Types.INTERVAL_MONTHS,
       Types.INTERVAL_MILLIS
-    )).asInstanceOf[TypeInformation[Any]]
+    ).asInstanceOf[TypeInformation[Any]]
   }
 
   override def functions: Map[String, ScalarFunction] = Map(

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index 958fd25..3156ba8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.{After, Before}
 import org.mockito.Mockito._
@@ -96,7 +97,7 @@ abstract class ExpressionTestBase {
     val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)).toSeq
 
     // generate code
-    val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO))
+    val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*)
     val genExpr = generator.generateResultExpression(
       resultType,
       resultType.getFieldNames,
@@ -124,7 +125,7 @@ abstract class ExpressionTestBase {
       .zipWithIndex
       .foreach {
         case ((expr, expected), index) =>
-          val actual = result.productElement(index)
+          val actual = result.getField(index)
           assertEquals(
             s"Wrong result for: $expr",
             expected,

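Besides the imports, the ExpressionTestBase hunk above shows the access-method rename that the new row type requires: the table-API Row exposed fields through Product (productElement), whereas org.apache.flink.types.Row reads fields with getField and writes them with setField. A small, purely illustrative sketch of working with the new type, assuming only the Row(arity), getField and setField calls visible in this patch:

    import org.apache.flink.types.Row

    // create a row with three fields and populate it via setField
    val row = new Row(3)
    row.setField(0, 42)
    row.setField(1, 100L)
    row.setField(2, "hello")

    // fields are read back with getField and cast to the expected type,
    // mirroring the result.getField(index) call in the test base above
    val first = row.getField(0).asInstanceOf[Int]
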
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
index 54911a5..4e33a61 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import java.math.BigDecimal
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.Assert.assertEquals
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
index 32559f1..993347f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils._
 import org.apache.flink.api.table.utils._
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
index 70b0359..21fe157 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
@@ -21,7 +21,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.stream.utils.StreamITCase
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.utils.TableFunc0
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
index 3da3857..4291b29 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
@@ -21,9 +21,9 @@ import java.lang.Boolean
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple3
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 
 
 case class SimpleUser(name: String, age: Int)
@@ -66,8 +66,8 @@ class TableFunc2 extends TableFunction[Row] {
   }
 
   override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
-                        BasicTypeInfo.INT_TYPE_INFO))
+    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
+                    BasicTypeInfo.INT_TYPE_INFO)
   }
 }
 

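The TableFunc2 hunk above combines both halves of the migration: the function now emits org.apache.flink.types.Row and declares its result type with the varargs RowTypeInfo from flink-java. As a hedged sketch, a hypothetical table function written against the API as it appears after this patch might look as follows (SplitFunc and its eval logic are illustrative, not part of the commit):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.table.functions.TableFunction
    import org.apache.flink.types.Row

    // hypothetical UDTF: splits "name#age" strings into (name, age) rows
    class SplitFunc extends TableFunction[Row] {

      def eval(user: String): Unit = {
        val parts = user.split("#")
        val row = new Row(2)
        row.setField(0, parts(0))
        row.setField(1, parts(1).toInt)
        collect(row)
      }

      override def getResultType: TypeInformation[Row] =
        new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
    }
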
