flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [4/4] flink git commit: [FLINK-3632] [tableAPI] Clean up TableAPI exceptions.
Date Mon, 23 May 2016 14:30:14 GMT
[FLINK-3632] [tableAPI] Clean up TableAPI exceptions.

This closes #2015


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cc62966
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cc62966
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cc62966

Branch: refs/heads/master
Commit: 9cc629662a34bd9cc6310556a321dd5144a60439
Parents: 53949d1
Author: Yijie Shen <henry.yijieshen@gmail.com>
Authored: Wed May 18 23:57:42 2016 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon May 23 14:06:06 2016 +0200

----------------------------------------------------------------------
 .../api/scala/table/TableConversions.scala      |  2 +-
 .../flink/api/table/TableEnvironment.scala      | 29 +++++++++-----------
 .../api/table/plan/logical/operators.scala      |  3 +-
 .../table/plan/nodes/dataset/DataSetRel.scala   |  4 +--
 .../api/table/plan/rules/FlinkRuleSets.scala    |  2 +-
 .../api/table/plan/schema/FlinkTable.scala      | 12 ++++----
 .../api/table/sources/CsvTableSource.scala      | 10 +++----
 .../apache/flink/api/table/trees/TreeNode.scala |  5 ++--
 .../flink/api/table/typeutils/RowTypeInfo.scala |  6 ++--
 .../api/java/batch/table/FromDataSetITCase.java | 11 ++++----
 .../api/scala/batch/table/ToTableITCase.scala   | 12 ++++----
 .../api/scala/stream/table/SelectITCase.scala   | 12 ++++----
 12 files changed, 54 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
index 1fdcbc5..720dac0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -41,7 +41,7 @@ class TableConversions(table: Table) {
         tEnv.toDataSet(table)
       case _ =>
         throw new TableException(
-          "Only tables that orginate from Scala DataSets can be converted to Scala DataSets.")
+          "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 8aa9e10..1c592f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -236,8 +236,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       case c: CaseClassTypeInfo[A] => c.getFieldNames
       case p: PojoTypeInfo[A] => p.getFieldNames
       case tpe =>
-        throw new IllegalArgumentException(
-          s"Type $tpe requires explicit field naming.")
+        throw new TableException(s"Type $tpe lacks explicit field naming")
     }
     val fieldIndexes = fieldNames.indices.toArray
     (fieldNames, fieldIndexes)
@@ -259,12 +258,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     val indexedNames: Array[(Int, String)] = inputType match {
       case a: AtomicType[A] =>
         if (exprs.length != 1) {
-          throw new IllegalArgumentException("Atomic type may can only have a single field.")
+          throw new TableException("Table of atomic type can only have a single field.")
         }
         exprs.map {
           case UnresolvedFieldReference(name) => (0, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression expected.")
+          case _ => throw new TableException("Field reference expression expected.")
         }
       case t: TupleTypeInfo[A] =>
         exprs.zipWithIndex.map {
@@ -272,11 +270,11 @@ abstract class TableEnvironment(val config: TableConfig) {
           case (Alias(UnresolvedFieldReference(origName), name), _) =>
             val idx = t.getFieldIndex(origName)
             if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $t")
+              throw new TableException(s"$origName is not a field of type $t")
             }
             (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression or naming expression expected.")
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
         }
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex.map {
@@ -284,25 +282,24 @@ abstract class TableEnvironment(val config: TableConfig) {
           case (Alias(UnresolvedFieldReference(origName), name), _) =>
             val idx = c.getFieldIndex(origName)
             if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $c")
+              throw new TableException(s"$origName is not a field of type $c")
             }
             (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression or naming expression expected.")
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
         }
       case p: PojoTypeInfo[A] =>
         exprs.map {
           case Alias(UnresolvedFieldReference(origName), name) =>
             val idx = p.getFieldIndex(origName)
             if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $p")
+              throw new TableException(s"$origName is not a field of type $p")
             }
             (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field naming expression expected.")
+          case _ => throw new TableException("Alias on field reference expression expected.")
         }
-      case tpe => throw new IllegalArgumentException(
-        s"Type $tpe cannot be converted into Table.")
+      case tpe => throw new TableException(
+        s"Source of type $tpe cannot be converted into Table.")
     }
 
     val (fieldIndexes, fieldNames) = indexedNames.unzip

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index d347651..bd299b3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -46,7 +46,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode)
extend
             case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
             case other => Alias(other, s"_c$i")
           }
-          case _ => throw new IllegalArgumentException
+          case _ =>
+            throw new RuntimeException("This should never be called and probably points to
a bug.")
         }
     }
     Project(newProjectList, child)

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 8c14e9e..946dfc0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.FlinkRel
 import org.apache.flink.api.table.runtime.MapRunner
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
 
 import scala.collection.JavaConversions._
 
@@ -61,7 +61,7 @@ trait DataSetRel extends RelNode with FlinkRel {
         case SqlTypeName.DOUBLE => s + 8
         case SqlTypeName.VARCHAR => s + 12
         case SqlTypeName.CHAR => s + 1
-        case _ => throw new IllegalArgumentException("Unsupported data type encountered")
+        case _ => throw new TableException("Unsupported data type encountered")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 4ce8e5f..a2ec08d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -108,7 +108,7 @@ object FlinkRuleSets {
   )
 
   /**
-  * RuleSet to optimize plans for batch / DataSet execution
+  * RuleSet to optimize plans for stream / DataStream execution
   */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
index 9414fae..7024ce2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
@@ -23,8 +23,8 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.streaming.api.datastream.DataStream
 
 abstract class FlinkTable[T](
     val typeInfo: TypeInformation[T],
@@ -33,13 +33,13 @@ abstract class FlinkTable[T](
   extends AbstractTable {
 
   if (fieldIndexes.length != fieldNames.length) {
-    throw new IllegalArgumentException(
+    throw new TableException(
       "Number of field indexes and field names must be equal.")
   }
 
   // check uniqueness of field names
   if (fieldNames.length != fieldNames.toSet.size) {
-    throw new IllegalArgumentException(
+    throw new TableException(
       "Table field names must be unique.")
   }
 
@@ -47,8 +47,8 @@ abstract class FlinkTable[T](
     typeInfo match {
       case cType: CompositeType[T] =>
         if (fieldNames.length != cType.getArity) {
-          throw new IllegalArgumentException(
-          s"Arity of DataStream type (" + cType.getFieldNames.deep + ") " +
+          throw new TableException(
+          s"Arity of type (" + cType.getFieldNames.deep + ") " +
             "not equal to number of field names " + fieldNames.deep + ".")
         }
         fieldIndexes
@@ -56,7 +56,7 @@ abstract class FlinkTable[T](
           .map(TypeConverter.typeInfoToSqlType(_))
       case aType: AtomicType[T] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new IllegalArgumentException(
+          throw new TableException(
             "Non-composite input type may have only a single field and its index must be
0.")
         }
         Array(TypeConverter.typeInfoToSqlType(aType))

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
index b9ccbf9..9296fe2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
@@ -21,9 +21,9 @@ package org.apache.flink.api.table.sources
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.TupleCsvInputFormat
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, TupleTypeInfo}
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.table.{Row, TableException}
 import org.apache.flink.core.fs.Path
 
 /**
@@ -52,11 +52,11 @@ class CsvTableSource(
   extends BatchTableSource[Tuple] {
 
   if (fieldNames.length != fieldTypes.length) {
-    throw new IllegalArgumentException("Number of field names and field types must be equal.")
+    throw new TableException("Number of field names and field types must be equal.")
   }
 
   if (fieldNames.length > 25) {
-    throw new IllegalArgumentException("Only up to 25 fields supported with this CsvTableSource.")
+    throw new TableException("Only up to 25 fields supported with this CsvTableSource.")
   }
 
   /** Returns the data of the table as a [[DataSet]] of [[Row]]. */

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
index 63c7013..5d3ffef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
@@ -107,8 +107,9 @@ abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A
=>
     try {
       defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
     } catch {
-      case e: java.lang.IllegalArgumentException =>
-        throw new IllegalArgumentException(s"Fail to copy treeNode ${getClass.getName}")
+      case e: Throwable =>
+        throw new RuntimeException(
+          s"Fail to copy treeNode ${getClass.getName}: ${e.getStackTraceString}")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
index f0f169f..d606a76 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 
 import scala.collection.mutable.ArrayBuffer
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{Row, TableException}
 
 /**
  * TypeInformation for [[Row]].
@@ -39,10 +39,10 @@ class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]], fieldNames: Seq[String])
 {
 
   if (fieldTypes.length != fieldNames.length) {
-    throw new IllegalArgumentException("Number of field types and names is different.")
+    throw new TableException("Number of field types and names is different.")
   }
   if (fieldNames.length != fieldNames.toSet.size) {
-    throw new IllegalArgumentException("Field names are not unique.")
+    throw new TableException("Field names are not unique.")
   }
 
   def this(fieldTypes: Seq[TypeInformation[_]]) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
index a3204f9..ecd916f 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -220,7 +221,7 @@ public class FromDataSetITCase extends TableProgramsTestBase {
 		compareResultAsText(results, expected);
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithToFewFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -229,7 +230,7 @@ public class FromDataSetITCase extends TableProgramsTestBase {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithToManyFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -238,7 +239,7 @@ public class FromDataSetITCase extends TableProgramsTestBase {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithAmbiguousFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -247,7 +248,7 @@ public class FromDataSetITCase extends TableProgramsTestBase {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithNonFieldReference1() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -256,7 +257,7 @@ public class FromDataSetITCase extends TableProgramsTestBase {
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = TableException.class)
 	public void testAsWithNonFieldReference2() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
index ed7b88f..84bdbb0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -101,7 +101,7 @@ class ToTableITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithToFewFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -111,7 +111,7 @@ class ToTableITCase(
       .toTable(tEnv, 'a, 'b)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithToManyFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -121,7 +121,7 @@ class ToTableITCase(
       .toTable(tEnv, 'a, 'b, 'c, 'd)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithAmbiguousFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -131,7 +131,7 @@ class ToTableITCase(
       .toTable(tEnv, 'a, 'b, 'b)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithNonFieldReference1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -141,7 +141,7 @@ class ToTableITCase(
       .toTable(tEnv, 'a + 1, 'b, 'c)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testToTableWithNonFieldReference2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/9cc62966/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
index e3eb19a..c6a2139 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.api.scala.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase}
+import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Assert._
@@ -108,7 +108,7 @@ class SelectITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testAsWithToFewFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -124,7 +124,7 @@ class SelectITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testAsWithToManyFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -140,7 +140,7 @@ class SelectITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testAsWithAmbiguousFields(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -157,7 +157,7 @@ class SelectITCase extends StreamingMultipleProgramsTestBase {
   }
 
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[TableException])
   def testOnlyFieldRefInAs(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment


Mime
View raw message