flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [08/11] flink git commit: [FLINK-6722] [table] Activate strict checkstyle
Date Thu, 01 Jun 2017 10:57:12 GMT
[FLINK-6722] [table] Activate strict checkstyle

This closes #4021.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceaf5b61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceaf5b61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceaf5b61

Branch: refs/heads/master
Commit: ceaf5b611090cec51dd1d2af1681eb496912b993
Parents: 2f8cacd
Author: Greg Hogan <code@greghogan.com>
Authored: Thu May 25 17:20:56 2017 -0400
Committer: zentol <chesnay@apache.org>
Committed: Thu Jun 1 11:15:06 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  34 +++
 .../flink/table/annotation/TableType.java       |   1 +
 .../flink/table/api/java/package-info.java      |   1 +
 .../org/apache/flink/table/explain/Node.java    |  60 ++++--
 .../flink/table/explain/PlanJsonParser.java     |  48 +++--
 .../resources/tableSourceConverter.properties   |   2 +-
 .../api/java/batch/TableEnvironmentITCase.java  |  49 +++--
 .../table/api/java/batch/TableSourceITCase.java |   4 +
 .../api/java/batch/sql/GroupingSetsITCase.java  |  10 +-
 .../table/api/java/batch/sql/SqlITCase.java     |  16 +-
 .../table/api/java/stream/sql/SqlITCase.java    |  32 +--
 .../api/java/stream/utils/StreamTestData.java   |   3 +
 .../api/java/utils/UserDefinedAggFunctions.java | 205 ++++++++++---------
 .../java/utils/UserDefinedScalarFunctions.java  |  22 +-
 .../java/utils/UserDefinedTableFunctions.java   |   7 +
 .../table/api/scala/batch/ExplainTest.scala     |   8 +-
 16 files changed, 325 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 15a9d07..855a520 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -251,6 +251,40 @@ under the License.
 				</configuration>
 			</plugin>
 
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 3845eae..2d2a7af 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.annotation;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.table.catalog.TableSourceConverter;
+
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
index 3dbf50f..50d41a2 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
@@ -61,6 +61,7 @@
  * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, java.lang.Class)}},
or
  * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, java.lang.Class)}}.
  */
+
 package org.apache.flink.table.api.java;
 
 import org.apache.flink.table.api.Table;

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
index 4616728..6317d0c 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
@@ -20,77 +20,93 @@ package org.apache.flink.table.explain;
 
 import java.util.List;
 
+/**
+ * Field hierarchy of an execution plan.
+ */
 public class Node {
 	private int id;
 	private String type;
 	private String pact;
 	private String contents;
 	private int parallelism;
-	private String driver_strategy;
+	private String driverStrategy;
 	private List<Predecessors> predecessors;
-	private List<Global_properties> global_properties;
-	private List<LocalProperty> local_properties;
+	private List<GlobalProperties> globalProperties;
+	private List<LocalProperty> localProperties;
 	private List<Estimates> estimates;
 	private List<Costs> costs;
-	private List<Compiler_hints> compiler_hints;
+	private List<CompilerHints> compilerHints;
 
 	public int getId() {
 		return id;
 	}
+
 	public String getType() {
 		return type;
 	}
+
 	public String getPact() {
 		return pact;
 	}
+
 	public String getContents() {
 		return contents;
 	}
+
 	public int getParallelism() {
 		return parallelism;
 	}
-	public String getDriver_strategy() {
-		return driver_strategy;
+
+	public String getDriverStrategy() {
+		return driverStrategy;
 	}
+
 	public List<Predecessors> getPredecessors() {
 		return predecessors;
 	}
-	public List<Global_properties> getGlobal_properties() {
-		return global_properties;
+
+	public List<GlobalProperties> getGlobalProperties() {
+		return globalProperties;
 	}
-	public List<LocalProperty> getLocal_properties() {
-		return local_properties;
+
+	public List<LocalProperty> getLocalProperties() {
+		return localProperties;
 	}
+
 	public List<Estimates> getEstimates() {
 		return estimates;
 	}
+
 	public List<Costs> getCosts() {
 		return costs;
 	}
-	public List<Compiler_hints> getCompiler_hints() {
-		return compiler_hints;
+
+	public List<CompilerHints> getCompilerHints() {
+		return compilerHints;
 	}
 }
 
 class Predecessors {
-	private String ship_strategy;
-	private String exchange_mode;
+	private String shipStrategy;
+	private String exchangeMode;
 
-	public String getShip_strategy() {
-		return ship_strategy;
+	public String getShipStrategy() {
+		return shipStrategy;
 	}
-	public String getExchange_mode() {
-		return exchange_mode;
+
+	public String getExchangeMode() {
+		return exchangeMode;
 	}
 }
 
-class Global_properties {
+class GlobalProperties {
 	private String name;
 	private String value;
 
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
@@ -103,6 +119,7 @@ class LocalProperty {
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
@@ -115,6 +132,7 @@ class Estimates {
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
@@ -127,18 +145,20 @@ class Costs {
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
 }
 
-class Compiler_hints {
+class CompilerHints {
 	private String name;
 	private String value;
 
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
index f13c042..ee9b9da 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.table.explain;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+/**
+ * Utility for converting an execution plan from JSON to a human-readable string.
+ */
 public class PlanJsonParser {
 
 	public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
 		ObjectMapper objectMapper = new ObjectMapper();
+		objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
 
-		//not every node is same, ignore the unknown field
+		// not every node is same, ignore the unknown field
 		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
 		PlanTree tree = objectMapper.readValue(t, PlanTree.class);
@@ -43,7 +48,7 @@ public class PlanJsonParser {
 		for (int index = 0; index < tree.getNodes().size(); index++) {
 			Node tempNode = tree.getNodes().get(index);
 
-			//input with operation such as join or union is coordinate, keep the same indent 
+			// input with operation such as join or union is coordinate, keep the same indent
 			if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact())))
{
 				tabCount = map.get(tempNode.getPact());
 			}
@@ -57,15 +62,15 @@ public class PlanJsonParser {
 			printTab(tabCount + 1, pw);
 			String content = tempNode.getContents();
 
-			//drop the hashcode of object instance
+			// drop the hashcode of object instance
 			int dele = tempNode.getContents().indexOf("@");
 			if (dele > -1) {
 				content = tempNode.getContents().substring(0, dele);
 			}
 
-			//replace with certain content if node is dataSource to pass
-			//unit tests, because java and scala use different api to
-			//get input element
+			// replace with certain content if node is dataSource to pass
+			// unit tests, because java and scala use different api to
+			// get input element
 			if (tempNode.getPact().equals("Data Source")) {
 				content = "collect elements with CollectionInputFormat";
 			}
@@ -74,35 +79,35 @@ public class PlanJsonParser {
 			List<Predecessors> predecessors = tempNode.getPredecessors();
 			if (predecessors != null) {
 				printTab(tabCount + 1, pw);
-				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+				pw.print("ship_strategy : " + predecessors.get(0).getShipStrategy() + "\n");
 
-				String mode = predecessors.get(0).getExchange_mode();
+				String mode = predecessors.get(0).getExchangeMode();
 				if (mode != null) {
 					printTab(tabCount + 1, pw);
 					pw.print("exchange_mode : " + mode + "\n");
 				}
 			}
 
-			if (tempNode.getDriver_strategy() != null) {
+			if (tempNode.getDriverStrategy() != null) {
 				printTab(tabCount + 1, pw);
-				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+				pw.print("driver_strategy : " + tempNode.getDriverStrategy() + "\n");
 			}
 
-			if (tempNode.getGlobal_properties() != null) {
+			if (tempNode.getGlobalProperties() != null) {
 				printTab(tabCount + 1, pw);
-				pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
-					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+				pw.print(tempNode.getGlobalProperties().get(0).getName() + " : "
+					+ tempNode.getGlobalProperties().get(0).getValue() + "\n");
 			}
 
 			if (extended) {
-				List<Global_properties> globalProperties = tempNode.getGlobal_properties();
+				List<GlobalProperties> globalProperties = tempNode.getGlobalProperties();
 				for (int i = 1; i < globalProperties.size(); i++) {
 					printTab(tabCount + 1, pw);
 					pw.print(globalProperties.get(i).getName() + " : "
 					+ globalProperties.get(i).getValue() + "\n");
 				}
 
-				List<LocalProperty> localProperties = tempNode.getLocal_properties();
+				List<LocalProperty> localProperties = tempNode.getLocalProperties();
 				for (int i = 0; i < localProperties.size(); i++) {
 					printTab(tabCount + 1, pw);
 					pw.print(localProperties.get(i).getName() + " : "
@@ -123,11 +128,11 @@ public class PlanJsonParser {
 					+ costs.get(i).getValue() + "\n");
 				}
 
-				List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
-				for (int i = 0; i < compilerHintses.size(); i++) {
+				List<CompilerHints> compilerHints = tempNode.getCompilerHints();
+				for (int i = 0; i < compilerHints.size(); i++) {
 					printTab(tabCount + 1, pw);
-					pw.print(compilerHintses.get(i).getName() + " : "
-					+ compilerHintses.get(i).getValue() + "\n");
+					pw.print(compilerHints.get(i).getName() + " : "
+					+ compilerHints.get(i).getValue() + "\n");
 				}
 			}
 			tabCount++;
@@ -138,8 +143,9 @@ public class PlanJsonParser {
 	}
 
 	private static void printTab(int tabCount, PrintWriter pw) {
-		for (int i = 0; i < tabCount; i++)
+		for (int i = 0; i < tabCount; i++) {
 			pw.print("\t");
+		}
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
index 1632e12..d548f48 100644
--- a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
+++ b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
@@ -26,4 +26,4 @@
 # which offers converters instead of put all information into the
 # tableSourceConverter.properties of flink-table module.
 ################################################################################
-scan.packages=org.apache.flink.table.sources
\ No newline at end of file
+scan.packages=org.apache.flink.table.sources

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index 3bb283f..aac7e11 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -18,38 +18,43 @@
 
 package org.apache.flink.table.api.java.batch;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.calcite.tools.RuleSets;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.types.Row;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfigBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.apache.calcite.tools.RuleSets;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Integration tests for {@link BatchTableEnvironment}.
+ */
 @RunWith(Parameterized.class)
 public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 
@@ -401,7 +406,7 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase
{
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		// use null value the enforce GenericType
-		DataSet<Row> dataSet = env.fromElements(Row.of((Integer)null));
+		DataSet<Row> dataSet = env.fromElements(Row.of((Integer) null));
 		assertTrue(dataSet.getType() instanceof GenericTypeInfo);
 		assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
 
@@ -482,10 +487,16 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase
{
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Non-static class.
+	 */
 	public class MyNonStatic {
 		public int number;
 	}
 
+	/**
+	 * Small POJO.
+	 */
 	@SuppressWarnings("unused")
 	public static class SmallPojo {
 
@@ -506,6 +517,9 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase
{
 		public Integer[] roles;
 	}
 
+	/**
+	 * POJO with generic fields.
+	 */
 	@SuppressWarnings("unused")
 	public static class PojoWithGeneric {
 		public String name;
@@ -531,6 +545,9 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase
{
 		}
 	}
 
+	/**
+	 * Small POJO with private fields.
+	 */
 	@SuppressWarnings("unused")
 	public static class PrivateSmallPojo {
 
@@ -581,6 +598,9 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase
{
 		}
 	}
 
+	/**
+	 * Another small POJO.
+	 */
 	@SuppressWarnings("unused")
 	public static class SmallPojo2 {
 
@@ -606,6 +626,9 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase
{
 		}
 	}
 
+	/**
+	 * Another small POJO with private fields.
+	 */
 	@SuppressWarnings("unused")
 	public static class PrivateSmallPojo2 {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index a7ccb7e..864d4f8 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -27,12 +27,16 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestB
 import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.utils.CommonTestData;
 import org.apache.flink.types.Row;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.List;
 
+/**
+ * Integration tests for {@link BatchTableSource}.
+ */
 @RunWith(Parameterized.class)
 public class TableSourceITCase extends TableProgramsCollectionTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 3f611d5..6c1a753 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
-import java.util.Comparator;
-import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -33,11 +31,15 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Comparator;
+import java.util.List;
+
 /**
  * This test should be replaced by a DataSetAggregateITCase.
  * We should only perform logical unit tests here.
@@ -46,8 +48,8 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 
-	private final static String TABLE_NAME = "MyTable";
-	private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
+	private static final String TABLE_NAME = "MyTable";
+	private static final String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
 	private BatchTableEnvironment tableEnv;
 
 	public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 114226c..f4e5daf 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -22,17 +22,18 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -42,6 +43,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Integration tests for batch SQL.
+ */
 @RunWith(Parameterized.class)
 public class SqlITCase extends TableProgramsCollectionTestBase {
 
@@ -136,7 +140,7 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 
 		tableEnv.registerDataSet("t1", ds1, "a, b, c");
-		tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
+		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
 
 		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
 		Table result = tableEnv.sql(sqlQuery);

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index d827cd6..9270221 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -18,27 +18,31 @@
 
 package org.apache.flink.table.api.java.stream.sql;
 
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Integration tests for streaming SQL.
+ */
 public class SqlITCase extends StreamingMultipleProgramsTestBase {
-	
+
 	@Test
 	public void testRowRegisterRowWithNames() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -49,17 +53,17 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 		data.add(Row.of(1, 1L, "Hi"));
 		data.add(Row.of(2, 2L, "Hello"));
 		data.add(Row.of(3, 2L, "Hello world"));
-		
+
 		TypeInformation<?>[] types = {
 				BasicTypeInfo.INT_TYPE_INFO,
 				BasicTypeInfo.LONG_TYPE_INFO,
 				BasicTypeInfo.STRING_TYPE_INFO};
-		String names[] = {"a","b","c"};
-		
+		String[] names = {"a", "b", "c"};
+
 		RowTypeInfo typeInfo = new RowTypeInfo(types, names);
-		
+
 		DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
-		
+
 		Table in = tableEnv.fromDataStream(ds, "a,b,c");
 		tableEnv.registerTable("MyTableRow", in);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
index 139801f..a23bc5a 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
@@ -27,6 +27,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Test data.
+ */
 public class StreamTestData {
 
 	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment
env) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
index a51a4af..94c5c90 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -22,98 +23,116 @@ import org.apache.flink.table.functions.AggregateFunction;
 
 import java.util.Iterator;
 
+/**
+ * Test aggregator functions.
+ */
 public class UserDefinedAggFunctions {
-    // Accumulator for test requiresOver
-    public static class Accumulator0 extends Tuple2<Long, Integer>{}
-
-    // Test for requiresOver
-    public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
-        @Override
-        public Accumulator0 createAccumulator() {
-            return new Accumulator0();
-        }
-
-        @Override
-        public Long getValue(Accumulator0 accumulator) {
-            return 1L;
-        }
-
-        //Overloaded accumulate method
-        public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
-        }
-
-        @Override
-        public boolean requiresOver() {
-            return true;
-        }
-    }
-
-    // Accumulator for WeightedAvg
-    public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
-        public long sum = 0;
-        public int count = 0;
-    }
-
-    // Base class for WeightedAvg
-    public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
-        @Override
-        public WeightedAvgAccum createAccumulator() {
-            return new WeightedAvgAccum();
-        }
-
-        @Override
-        public Long getValue(WeightedAvgAccum accumulator) {
-            if (accumulator.count == 0)
-                return null;
-            else
-                return accumulator.sum/accumulator.count;
-        }
-
-        //Overloaded accumulate method
-        public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight) {
-            accumulator.sum += iValue * iWeight;
-            accumulator.count += iWeight;
-        }
-
-        //Overloaded accumulate method
-        public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
-            accumulator.sum += iValue * iWeight;
-            accumulator.count += iWeight;
-        }
-    }
-
-    // A WeightedAvg class with merge method
-    public static class WeightedAvgWithMerge extends WeightedAvg {
-        public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
-            Iterator<WeightedAvgAccum> iter = it.iterator();
-            while (iter.hasNext()) {
-                WeightedAvgAccum a = iter.next();
-                acc.count += a.count;
-                acc.sum += a.sum;
-            }
-        }
-    }
-
-    // A WeightedAvg class with merge and reset method
-    public static class WeightedAvgWithMergeAndReset extends WeightedAvgWithMerge {
-        public void resetAccumulator(WeightedAvgAccum acc) {
-            acc.count = 0;
-            acc.sum = 0L;
-        }
-    }
-
-    // A WeightedAvg class with retract method
-    public static class WeightedAvgWithRetract extends WeightedAvg {
-        //Overloaded retract method
-        public void retract(WeightedAvgAccum accumulator, long iValue, int iWeight) {
-            accumulator.sum -= iValue * iWeight;
-            accumulator.count -= iWeight;
-        }
-
-        //Overloaded retract method
-        public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
-            accumulator.sum -= iValue * iWeight;
-            accumulator.count -= iWeight;
-        }
-    }
+	/**
+	 * Accumulator for test requiresOver.
+ 	 */
+	public static class Accumulator0 extends Tuple2<Long, Integer>{}
+
+	/**
+	 * Test for requiresOver.
+	 */
+	public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
+		@Override
+		public Accumulator0 createAccumulator() {
+			return new Accumulator0();
+		}
+
+		@Override
+		public Long getValue(Accumulator0 accumulator) {
+			return 1L;
+		}
+
+		//Overloaded accumulate method
+		public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
+		}
+
+		@Override
+		public boolean requiresOver() {
+			return true;
+		}
+	}
+
+	/**
+	 * Accumulator for WeightedAvg.
+	 */
+	public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
+		public long sum = 0;
+		public int count = 0;
+	}
+
+	/**
+	 * Base class for WeightedAvg.
+	 */
+	public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
+		@Override
+		public WeightedAvgAccum createAccumulator() {
+			return new WeightedAvgAccum();
+		}
+
+		@Override
+		public Long getValue(WeightedAvgAccum accumulator) {
+			if (accumulator.count == 0) {
+				return null;
+			} else {
+				return accumulator.sum / accumulator.count;
+			}
+		}
+
+		// overloaded accumulate method
+		public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+			accumulator.sum += iValue * iWeight;
+			accumulator.count += iWeight;
+		}
+
+		//Overloaded accumulate method
+		public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+			accumulator.sum += iValue * iWeight;
+			accumulator.count += iWeight;
+		}
+	}
+
+	/**
+	 * A WeightedAvg class with merge method.
+	 */
+	public static class WeightedAvgWithMerge extends WeightedAvg {
+		public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
+			Iterator<WeightedAvgAccum> iter = it.iterator();
+			while (iter.hasNext()) {
+				WeightedAvgAccum a = iter.next();
+				acc.count += a.count;
+				acc.sum += a.sum;
+			}
+		}
+	}
+
+	/**
+	 * A WeightedAvg class with merge and reset method.
+	 */
+	public static class WeightedAvgWithMergeAndReset extends WeightedAvgWithMerge {
+		public void resetAccumulator(WeightedAvgAccum acc) {
+			acc.count = 0;
+			acc.sum = 0L;
+		}
+	}
+
+	/**
+	 * A WeightedAvg class with retract method.
+	 */
+	public static class WeightedAvgWithRetract extends WeightedAvg {
+		//Overloaded retract method
+		public void retract(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+			accumulator.sum -= iValue * iWeight;
+			accumulator.count -= iWeight;
+		}
+
+		//Overloaded retract method
+		public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+			accumulator.sum -= iValue * iWeight;
+			accumulator.count -= iWeight;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
index 1e5fabe..214dbea 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
@@ -15,25 +15,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
-import java.util.Arrays;
 import org.apache.flink.table.functions.ScalarFunction;
 
+import java.util.Arrays;
+
+/**
+ * Test scalar functions.
+ */
 public class UserDefinedScalarFunctions {
 
+	/**
+	 * Increment input.
+	 */
 	public static class JavaFunc0 extends ScalarFunction {
 		public long eval(Long l) {
 			return l + 1;
 		}
 	}
 
+	/**
+	 * Concatenate inputs as strings.
+	 */
 	public static class JavaFunc1 extends ScalarFunction {
 		public String eval(Integer a, int b,  Long c) {
 			return a + " and " + b + " and " + c;
 		}
 	}
 
+	/**
+	 * Append product to string.
+	 */
 	public static class JavaFunc2 extends ScalarFunction {
 		public String eval(String s, Integer... a) {
 			int m = 1;
@@ -44,6 +58,9 @@ public class UserDefinedScalarFunctions {
 		}
 	}
 
+	/**
+	 * Test overloading.
+	 */
 	public static class JavaFunc3 extends ScalarFunction {
 		public int eval(String a, int... b) {
 			return b.length;
@@ -54,6 +71,9 @@ public class UserDefinedScalarFunctions {
 		}
 	}
 
+	/**
+	 * Concatenate arrays as strings.
+	 */
 	public static class JavaFunc4 extends ScalarFunction {
 		public String eval(Integer[] a, String[] b) {
 			return Arrays.toString(a) + " and " + Arrays.toString(b);

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
index 3af8646..63c07ed 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
@@ -15,12 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
 import org.apache.flink.table.functions.TableFunction;
 
+/**
+ * Test functions.
+ */
 public class UserDefinedTableFunctions {
 
+	/**
+	 * Emit inputs as long.
+	 */
 	public static class JavaTableFunc0 extends TableFunction<Long> {
 		public void eval(Integer a, Long b, Long c) {
 			collect(a.longValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
index 1a6b314..10af4d7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
@@ -42,7 +42,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -57,7 +57,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -102,7 +102,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -117,7 +117,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
 }


Mime
View raw message