flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [08/17] flink git commit: [FLINK-4728] [core, optimizer] Replace reference equality with object equality
Date Wed, 05 Oct 2016 22:16:58 GMT
[FLINK-4728] [core,optimizer] Replace reference equality with object equality

Some cases of testing Integer equality using == rather than
Integer.equals(Integer), and some additional cleanup.

This closes #2582


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9206b483
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9206b483
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9206b483

Branch: refs/heads/master
Commit: 9206b483b68bb41195bdf2da4f0b9c2de517c031
Parents: 10a42f9
Author: Greg Hogan <code@greghogan.com>
Authored: Mon Oct 3 13:59:57 2016 -0400
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/operators/Ordering.java    |  6 ++---
 .../api/common/operators/util/FieldList.java    |  6 ++---
 .../api/common/io/DelimitedInputFormatTest.java |  1 -
 .../api/common/io/EnumerateNestedFilesTest.java |  1 -
 .../api/common/io/FileInputFormatTest.java      |  2 --
 .../base/OuterJoinOperatorBaseTest.java         | 13 ++++-----
 .../typeutils/base/BigDecComparatorTest.java    |  2 +-
 .../typeutils/base/BigDecSerializerTest.java    |  1 -
 .../typeutils/base/DoubleSerializerTest.java    |  9 +++----
 .../typeutils/base/SqlDateComparatorTest.java   |  2 +-
 .../api/java/typeutils/EitherTypeInfoTest.java  |  2 +-
 .../typeutils/runtime/EitherSerializerTest.java |  2 +-
 .../runtime/TupleComparatorTTT1Test.java        |  5 ++--
 .../apache/flink/types/CopyableValueTest.java   |  6 ++---
 .../java/org/apache/flink/types/RecordTest.java |  2 +-
 .../api/java/functions/FunctionAnnotation.java  | 12 ++++-----
 .../api/java/operators/PartitionOperator.java   |  2 +-
 .../apache/flink/api/java/sca/TaggedValue.java  |  4 +--
 .../java/functions/SemanticPropUtilTest.java    |  7 +++--
 .../api/java/operator/CrossOperatorTest.java    | 16 +++++------
 .../api/java/operator/DistinctOperatorTest.java |  4 +--
 .../flink/api/java/operator/GroupingTest.java   |  4 +--
 .../api/java/operator/JoinOperatorTest.java     | 21 +++++++--------
 .../api/java/operator/MaxByOperatorTest.java    |  2 +-
 .../api/java/operator/MinByOperatorTest.java    |  2 +-
 .../api/java/operator/ReduceOperatorTest.java   |  3 ---
 .../api/java/operator/SortPartitionTest.java    |  4 +--
 .../translation/DistinctTranslationTest.java    |  2 +-
 .../api/java/sca/UdfAnalyzerExamplesTest.java   |  2 +-
 .../flink/api/java/sca/UdfAnalyzerTest.java     |  2 +-
 .../flink/optimizer/dag/BulkIterationNode.java  | 18 ++++++-------
 .../optimizer/dag/WorksetIterationNode.java     |  2 +-
 .../dataproperties/GlobalProperties.java        |  8 +++---
 .../apache/flink/optimizer/plan/PlanNode.java   | 16 +++++------
 .../plandump/PlanJSONDumpGenerator.java         | 28 ++++++++++----------
 .../optimizer/AdditionalOperatorsTest.java      |  2 +-
 .../optimizer/BranchingPlansCompilerTest.java   |  2 +-
 .../optimizer/PartitioningReusageTest.java      |  2 +-
 .../SemanticPropertiesAPIToPlanTest.java        |  2 +-
 .../optimizer/dag/GroupCombineNodeTest.java     |  1 -
 .../GlobalPropertiesMatchingTest.java           | 14 +++++-----
 41 files changed, 116 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index 7332698..afc659a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 
+import java.util.ArrayList;
+
 /**
  * This class represents an ordering on a set of fields. It specifies the fields and order direction
  * (ascending, descending).
@@ -145,7 +145,7 @@ public class Ordering implements Cloneable {
 		}
 		
 		for (int i = 0; i < this.indexes.size(); i++) {
-			if (this.indexes.get(i) != otherOrdering.indexes.get(i)) {
+			if (!this.indexes.get(i).equals(otherOrdering.indexes.get(i))) {
 				return false;
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index 15a993c..4cbde56 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.common.operators.util;
 
+import org.apache.flink.annotation.Internal;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.annotation.Internal;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -166,7 +166,7 @@ public class FieldList extends FieldSet {
 			return false;
 		} else {
 			for (int i = 0; i < this.size(); i++) {
-				if (this.get(i) != list.get(i)) {
+				if (!this.get(i).equals(list.get(i))) {
 					return false;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 8a31099..93d5f9f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 1076338..3ac17db 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.testutils.TestFileUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index f66bd76..3e5d309 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -40,8 +40,6 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 
-import static org.junit.Assert.*;
-
 /**
  * Tests for the FileInputFormat
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 69159f2..683e164 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -29,6 +25,11 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 
 @SuppressWarnings("serial")
@@ -67,7 +68,7 @@ public class OuterJoinOperatorBaseTest implements Serializable {
 
 	@Test
 	public void testFullOuterJoinWithEmptyLeftInput() throws Exception {
-		final List<String> leftInput = Arrays.asList();
+		final List<String> leftInput = Collections.emptyList();
 		final List<String> rightInput = Arrays.asList("foo", "bar", "foobar");
 		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
 		List<String> expected = Arrays.asList("null,bar", "null,foo", "null,foobar");
@@ -77,7 +78,7 @@ public class OuterJoinOperatorBaseTest implements Serializable {
 	@Test
 	public void testFullOuterJoinWithEmptyRightInput() throws Exception {
 		final List<String> leftInput = Arrays.asList("foo", "bar", "foobar");
-		final List<String> rightInput = Arrays.asList();
+		final List<String> rightInput = Collections.emptyList();
 		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
 		List<String> expected = Arrays.asList("bar,null", "foo,null", "foobar,null");
 		testOuterJoin(leftInput, rightInput, expected);

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
index ca5819e..5bfdca2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
+
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
index fd3cbd5..ff375d1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.Random;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
index 543c0e9..2e5de9f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import java.util.Random;
-
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+
+import java.util.Random;
 /**
  * A test for the {@link DoubleSerializer}.
  */
@@ -46,8 +45,8 @@ public class DoubleSerializerTest extends SerializerTestBase<Double> {
 	@Override
 	protected Double[] getTestData() {
 		Random rnd = new Random(874597969123412341L);
-		Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
-		
+		double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
+
 		return new Double[] {Double.valueOf(0), Double.valueOf(1), Double.valueOf(-1),
 							Double.valueOf(Double.MAX_VALUE), Double.valueOf(Double.MIN_VALUE),
 							Double.valueOf(rndDouble), Double.valueOf(-rndDouble),

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
index cedefe7..7133b01 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import java.sql.Date;
-import java.sql.Timestamp;
+
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
index 78c10b1..a3d4568 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
@@ -32,7 +32,7 @@ public class EitherTypeInfoTest extends TestLogger {
 
 	Either<Integer, String> intEither = Either.Left(1);
 	Either<Integer, String> stringEither = Either.Right("boo");
-	Either<Integer, Tuple2<Double, Long>> tuple2Either = new Right<>(new Tuple2<Double, Long>(42.0, 2l));
+	Either<Integer, Tuple2<Double, Long>> tuple2Either = new Right<>(new Tuple2<Double, Long>(42.0, 2L));
 
 	@Test
 	public void testEitherTypeEquality() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
index d636d5e..acf0d2e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
@@ -60,7 +60,7 @@ public class EitherSerializerTest {
 	public void testEitherWithTuple() {
 
 	Either<Tuple2<Long, Long>, Double>[] testData = new Either[] {
-			Either.Left(new Tuple2<>(2l, 9l)),
+			Either.Left(new Tuple2<>(2L, 9L)),
 			new Left<>(new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE)),
 			new Right<>(32.0),
 			Right(Double.MIN_VALUE),

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
index cf73be2..9db5cd9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
@@ -17,14 +17,11 @@
  */
 package org.apache.flink.api.java.typeutils.runtime;
 
-import static org.junit.Assert.assertEquals;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleComparator;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -32,6 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 
+import static org.junit.Assert.assertEquals;
+
 public class TupleComparatorTTT1Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
index 76bdece..29dc436 100644
--- a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.types;
 
-import static org.junit.Assert.*;
-
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class CopyableValueTest {
 
 	@Test
@@ -33,7 +33,7 @@ public class CopyableValueTest {
 			new DoubleValue(3.1415926535897932),
 			new FloatValue((float) 3.14159265),
 			new IntValue(42),
-			new LongValue(42l),
+			new LongValue(42L),
 			new NullValue(),
 			new ShortValue((short) 42),
 			new StringValue("QED")

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RecordTest.java b/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
index d7e3edd..a081e8e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
@@ -285,7 +285,7 @@ public class RecordTest {
 			record.setNull(mask);
 	
 			for (int i = 0; i < 58; i++) {
-				if (((1l << i) & mask) != 0) {
+				if (((1L << i) & mask) != 0) {
 					assertTrue(record.getField(i, IntValue.class) == null);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index d6fd913..f01d9d8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.InvalidProgramException;
+
 import java.lang.annotation.Annotation;
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.lang.annotation.Retention;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.InvalidProgramException;
-
 /**
  * This class defines Java annotations for semantic assertions that can be added to Flink functions.
  * Semantic annotations can help the Flink optimizer to generate more efficient execution plans for Flink programs.

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index 2ed0300..b3234b8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -106,7 +106,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 
 	/**
 	 * Sets the order of keys for range partitioning.
-	 * NOTE: Only valid for {@link PartitionMethod.RANGE}.
+	 * NOTE: Only valid for {@link PartitionMethod#RANGE}.
 	 *
 	 * @param orders array of orders for each specified partition key
 	 * @return The partitioneOperator with properly set orders for given keys

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
index 43450f7..cf0716d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
@@ -44,7 +44,7 @@ public class TaggedValue extends BasicValue {
 		INPUT_1_ITERABLE, INPUT_2_ITERABLE, INPUT_1_ITERATOR, INPUT_2_ITERATOR, // input iterators
 		ITERATOR_TRUE_ASSUMPTION, // boolean value that is "true" at least once
 		NULL // null
-	};
+	}
 
 	public static enum Input {
 		INPUT_1(0), INPUT_2(1);
@@ -58,7 +58,7 @@ public class TaggedValue extends BasicValue {
 		public int getId() {
 			return id;
 		}
-	};
+	}
 
 	private Tag tag;
 	// only inputs can set this to true

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
index 372c0f7..b845e73 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
@@ -19,20 +19,19 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Assert;
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
index 474563d..59d2d61 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 public class CrossOperatorTest {
 
@@ -450,7 +450,7 @@ public class CrossOperatorTest {
 		public long myLong;
 		public String myString;
 
-		public CustomType() {};
+		public CustomType() {}
 
 		public CustomType(int i, long l, String s) {
 			myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index 2e9bdf7..0bbeeb2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -214,8 +214,8 @@ public class DistinctOperatorTest {
 		public long myLong;
 		public String myString;
 		
-		public CustomType() {};
-		
+		public CustomType() {}
+
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 9220095..18b17b5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -606,8 +606,8 @@ public class GroupingTest {
 		public String myString;
 		public Nest nested;
 		
-		public CustomType() {};
-		
+		public CustomType() {}
+
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index ae23382..0246f60 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.SemanticProperties;
@@ -32,7 +28,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.JoinOperator;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,6 +39,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
@@ -1167,7 +1166,7 @@ public class JoinOperatorTest {
 
 		public int myInt;
 
-		public Nested() {};
+		public Nested() {}
 
 		public Nested(int i, long l, String s) {
 			myInt = i;
@@ -1188,7 +1187,7 @@ public class JoinOperatorTest {
 		public String myString;
 		public Nested nest;
 		
-		public NestedCustomType() {};
+		public NestedCustomType() {}
 
 		public NestedCustomType(int i, long l, String s) {
 			myInt = i;
@@ -1214,8 +1213,8 @@ public class JoinOperatorTest {
 		public List<String> countries;
 		public Writable interfaceTest;
 		
-		public CustomType() {};
-		
+		public CustomType() {}
+
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;
@@ -1242,8 +1241,8 @@ public class JoinOperatorTest {
 		public String myString;
 		public Tuple2<Integer, String> intByString;
 		
-		public CustomTypeWithTuple() {};
-		
+		public CustomTypeWithTuple() {}
+
 		public CustomTypeWithTuple(int i, long l, String s) {
 			myInt = i;
 			myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
index 2af8a8c..cbb7690 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
@@ -211,7 +211,7 @@ public class MaxByOperatorTest {
 		public String myString;
 
 		public CustomType() {
-		};
+		}
 
 		public CustomType(int i, long l, String s) {
 			myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
index 5d9c938..b9659c0 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
@@ -212,7 +212,7 @@ public class MinByOperatorTest {
 		public String myString;
 
 		public CustomType() {
-		};
+		}
 
 		public CustomType(int i, long l, String s) {
 			myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
index dafc1f2..b7f7555 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -26,12 +25,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
index 3540e6a..c3307ec 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
@@ -263,8 +263,8 @@ public class SortPartitionTest {
 		public String myString;
 		public Nest nested;
 		
-		public CustomType() {};
-		
+		public CustomType() {}
+
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index cbdac4a..27c7b2f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -261,7 +261,7 @@ public class DistinctTranslationTest {
 		private static final long serialVersionUID = 1L;
 		public int myInt;
 
-		public CustomType() {};
+		public CustomType() {}
 
 		public CustomType(int i) {
 			myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index 01dc070..72a27b1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -204,7 +204,7 @@ public class UdfAnalyzerExamplesTest {
 				edge = edges.next();
 				Integer otherVertex = edge.getSecondVertex();
 				// collect unique vertices
-				if(!otherVertices.contains(otherVertex) && otherVertex != groupVertex) {
+				if(!otherVertices.contains(otherVertex) && !otherVertex.equals(groupVertex)) {
 					this.otherVertices.add(otherVertex);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index ac35793..c371082 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -751,7 +751,7 @@ public class UdfAnalyzerTest {
 		}
 
 		private MyPojo recursiveFunction(MyPojo value) {
-			if (value.field == "xyz") {
+			if (value.field.equals("xyz")) {
 				value.field = value.field + "x";
 				return recursiveFunction(value);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 556e2e3..bf3c4ed 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -18,19 +18,12 @@
 
 package org.apache.flink.optimizer.dag;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -45,12 +38,19 @@ import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.NamedChannel;
 import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Visitor;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
 /**
  * A node in the optimizer's program representation for a bulk iteration.
  */
@@ -314,7 +314,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 				
 				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
 				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-					; // depends only through broadcast variable on the partial solution
+					// depends only through broadcast variable on the partial solution
 				}
 				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
 					// attach a no-op node through which we create the properties of the original input

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 7969a94..d7ccaca 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -361,7 +361,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 																							atEndGlobal, atEndLocal);
 
 				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-					; // depends only through broadcast variable on the workset solution
+					// depends only through broadcast variable on the workset solution
 				}
 				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
 					// attach a no-op node through which we create the properties of the original input

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index e64782f..654b054 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.optimizer.dataproperties;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -37,6 +34,9 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * This class represents global properties of the data at a certain point in the plan.
  * Global properties are properties that describe data across different partitions, such as
@@ -224,7 +224,7 @@ public class GlobalProperties implements Cloneable {
 			}
 			
 			for (int i = 0; i < this.ordering.getNumberOfFields(); i++) {
-				if (this.ordering.getFieldNumber(i) != o.getFieldNumber(i)) {
+				if (!this.ordering.getFieldNumber(i).equals(o.getFieldNumber(i))) {
 					return false;
 				}
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 9505a57..b30fa36 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.optimizer.plan;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.CompilerException;
@@ -39,6 +33,12 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.util.Visitable;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * The representation of a data exchange between to operators. The data exchange can realize a shipping strategy, 
  * which established global properties, and a local strategy, which establishes local properties.
@@ -547,7 +547,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	// --------------------------------------------------------------------------------------------
 	
 	public static enum SourceAndDamReport {
-		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
+		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM
 	}
 	
 	
@@ -568,6 +568,6 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		MET,
 		
 		/** Indicates that the question whether the properties are met has been determined false */
-		NOT_MET;
+		NOT_MET
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 536e425..16bb47b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -18,19 +18,6 @@
 
 package org.apache.flink.optimizer.plandump;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.flink.api.common.operators.CompilerHints;
 import org.apache.flink.optimizer.CompilerException;
@@ -55,6 +42,19 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.StringUtils;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
 
 public class PlanJSONDumpGenerator {
 	
@@ -160,7 +160,7 @@ public class PlanJSONDumpGenerator {
 			//to set first to false!
 			if (visit(child, writer, first)) {
 				first = false;
-			};
+			}
 		}
 		
 		// check if this node should be skipped from the dump

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 656a323..b9b6d8a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -77,7 +77,7 @@ public class AdditionalOperatorsTest extends CompilerTestBase {
 		DataSet<Long> set2 = env.generateSequence(0,1);
 
 		set1.crossWithHuge(set2).name("Cross")
-				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());;
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 
 		try {
 			Plan plan = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 1f8904a..2b3136a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -505,7 +505,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 
 		loopRes.output(new DiscardingOutputFormat<Long>());
 		loopRes.map(new IdentityMapper<Long>())
-				.output(new DiscardingOutputFormat<Long>());;
+				.output(new DiscardingOutputFormat<Long>());
 
 		Plan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index e4cf1c8..ac4c090 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index df13da8..fe79704 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
index f4776a0..f3e124b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.optimizer.dag;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
index 52826d6..ddfa074 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.optimizer.dataproperties;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -28,6 +26,10 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class GlobalPropertiesMatchingTest {
 
 	@Test
@@ -180,12 +182,12 @@ public class GlobalPropertiesMatchingTest {
 			assertFalse(req.isMetBy(gp3));
 
 			GlobalProperties gp4 = new GlobalProperties();
-			gp3.setAnyPartitioning(new FieldList(6, 1));
-			assertFalse(req.isMetBy(gp3));
+			gp4.setAnyPartitioning(new FieldList(6, 1));
+			assertFalse(req.isMetBy(gp4));
 
 			GlobalProperties gp5 = new GlobalProperties();
-			gp4.setAnyPartitioning(new FieldList(2));
-			assertFalse(req.isMetBy(gp4));
+			gp5.setAnyPartitioning(new FieldList(2));
+			assertFalse(req.isMetBy(gp5));
 		}
 
 		// match hash partitioning


Mime
View raw message