flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [10/11] flink git commit: [hotfix] [gelly] Explicit type can be replaced with <>
Date Mon, 07 Aug 2017 13:17:25 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 57045a1..db87eb9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -52,7 +52,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(1)
 			.equalTo(0)
-			.with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+			.with(new JoinEdgeWithVertexDegree<>())
 				.setParallelism(parallelism)
 				.name("Edge target degrees");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 06a7fd2..a703789 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -93,18 +93,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 			throws Exception {
 		// s, t, bitmask
 		DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges()
-			.flatMap(new EmitAndFlipEdge<K, EV>())
+			.flatMap(new EmitAndFlipEdge<>())
 				.setParallelism(parallelism)
 				.name("Emit and flip edge")
 			.groupBy(0, 1)
-			.reduceGroup(new ReduceBitmask<K>())
+			.reduceGroup(new ReduceBitmask<>())
 				.setParallelism(parallelism)
 				.name("Reduce bitmask");
 
 		// s, d(s)
 		DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
 			.groupBy(0)
-			.reduceGroup(new DegreeCount<K>())
+			.reduceGroup(new DegreeCount<>())
 				.setParallelism(parallelism)
 				.name("Degree count");
 
@@ -113,7 +113,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 				.leftOuterJoin(vertexDegrees)
 				.where(0)
 				.equalTo(0)
-				.with(new JoinVertexWithVertexDegrees<K, VV>())
+				.with(new JoinVertexWithVertexDegrees<>())
 					.setParallelism(parallelism)
 					.name("Zero degree vertices");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index dc071cf..38c7995 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -84,14 +84,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		// t
 		DataSet<Vertex<K, LongValue>> targetIds = input
 			.getEdges()
-			.map(new MapEdgeToTargetId<K, EV>())
+			.map(new MapEdgeToTargetId<>())
 				.setParallelism(parallelism)
 				.name("Edge to target ID");
 
 		// t, d(t)
 		DataSet<Vertex<K, LongValue>> targetDegree = targetIds
 			.groupBy(0)
-			.reduce(new DegreeCount<K>())
+			.reduce(new DegreeCount<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
@@ -101,7 +101,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 				.leftOuterJoin(targetDegree)
 				.where(0)
 				.equalTo(0)
-				.with(new JoinVertexWithVertexDegree<K, VV>())
+				.with(new JoinVertexWithVertexDegree<>())
 					.setParallelism(parallelism)
 					.name("Zero degree vertices");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 4a4689b..ef9c781 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -84,14 +84,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		// s
 		DataSet<Vertex<K, LongValue>> sourceIds = input
 			.getEdges()
-			.map(new MapEdgeToSourceId<K, EV>())
+			.map(new MapEdgeToSourceId<>())
 				.setParallelism(parallelism)
 				.name("Edge to source ID");
 
 		// s, d(s)
 		DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
 			.groupBy(0)
-			.reduce(new DegreeCount<K>())
+			.reduce(new DegreeCount<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
@@ -101,7 +101,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 				.leftOuterJoin(sourceDegree)
 				.where(0)
 				.equalTo(0)
-				.with(new JoinVertexWithVertexDegree<K, VV>())
+				.with(new JoinVertexWithVertexDegree<>())
 					.setParallelism(parallelism)
 					.name("Zero degree vertices");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index ff4285f..6825295 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -89,7 +89,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(1)
 			.equalTo(0)
-			.with(new JoinEdgeDegreeWithVertexDegree<K, EV, LongValue>())
+			.with(new JoinEdgeDegreeWithVertexDegree<>())
 				.setParallelism(parallelism)
 				.name("Edge target degree");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index bd8ce3d..3fe05d9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(0)
 			.equalTo(0)
-			.with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
+			.with(new JoinEdgeWithVertexDegree<>())
 				.setParallelism(parallelism)
 				.name("Edge source degree");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index cb18d2c..6020ba3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(1)
 			.equalTo(0)
-			.with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
+			.with(new JoinEdgeWithVertexDegree<>())
 				.setParallelism(parallelism)
 				.name("Edge target degree");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index d2fad18..fee58a3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -103,7 +103,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ?
-			new MapEdgeToTargetId<K, EV>() : new MapEdgeToSourceId<K, EV>();
+			new MapEdgeToTargetId<>() : new MapEdgeToSourceId<>();
 
 		// v
 		DataSet<Vertex<K, LongValue>> vertexIds = input
@@ -115,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		// v, deg(v)
 		DataSet<Vertex<K, LongValue>> degree = vertexIds
 			.groupBy(0)
-			.reduce(new DegreeCount<K>())
+			.reduce(new DegreeCount<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
@@ -126,7 +126,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 				.leftOuterJoin(degree)
 				.where(0)
 				.equalTo(0)
-				.with(new JoinVertexWithVertexDegree<K, VV>())
+				.with(new JoinVertexWithVertexDegree<>())
 					.setParallelism(parallelism)
 					.name("Zero degree vertices");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index 522d39c..41dc64b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -138,7 +138,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
 		// u, d(u) if d(u) > maximumDegree
 		DataSet<Tuple1<K>> highDegreeVertices = vertexDegree
-			.flatMap(new DegreeFilter<K>(maximumDegree))
+			.flatMap(new DegreeFilter<>(maximumDegree))
 				.setParallelism(parallelism)
 				.name("Filter high-degree vertices");
 
@@ -150,7 +150,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 			.leftOuterJoin(highDegreeVertices, joinHint)
 			.where(0)
 			.equalTo(0)
-			.with(new ProjectVertex<K, VV>())
+			.with(new ProjectVertex<>())
 				.setParallelism(parallelism)
 				.name("Project low-degree vertices");
 
@@ -160,13 +160,13 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 			.leftOuterJoin(highDegreeVertices, joinHint)
 			.where(reduceOnTargetId.get() ? 1 : 0)
 			.equalTo(0)
-				.with(new ProjectEdge<K, EV>())
+				.with(new ProjectEdge<>())
 				.setParallelism(parallelism)
 				.name("Project low-degree edges by " + (reduceOnTargetId.get() ? "target" : "source"))
 			.leftOuterJoin(highDegreeVertices, joinHint)
 			.where(reduceOnTargetId.get() ? 0 : 1)
 			.equalTo(0)
-			.with(new ProjectEdge<K, EV>())
+			.with(new ProjectEdge<>())
 				.setParallelism(parallelism)
 				.name("Project low-degree edges by " + (reduceOnTargetId.get() ? "source" : "target"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 511840a..0d4fa1e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -41,7 +41,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 		// Edges
 		DataSet<Edge<K, EV>> edges = input
 			.getEdges()
-			.filter(new RemoveSelfLoops<K, EV>())
+			.filter(new RemoveSelfLoops<>())
 				.setParallelism(parallelism)
 				.name("Remove self-loops")
 			.distinct(0, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 21db233..f00a162 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -71,7 +71,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 		// Edges
 		DataSet<Edge<K, EV>> edges = input
 			.getEdges()
-			.flatMap(new SymmetrizeAndRemoveSelfLoops<K, EV>(clipAndFlip))
+			.flatMap(new SymmetrizeAndRemoveSelfLoops<>(clipAndFlip))
 				.setParallelism(parallelism)
 				.name("Remove self-loops")
 			.distinct(0, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 4cb4e01..6dcf766 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -88,7 +88,7 @@ public class Translate {
 		TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
 
 		return vertices
-			.map(new TranslateVertexId<OLD, NEW, VV>(translator))
+			.map(new TranslateVertexId<>(translator))
 			.returns(returnType)
 				.setParallelism(parallelism)
 				.name("Translate vertex IDs");
@@ -172,7 +172,7 @@ public class Translate {
 		TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
 
 		return edges
-			.map(new TranslateEdgeId<OLD, NEW, EV>(translator))
+			.map(new TranslateEdgeId<>(translator))
 			.returns(returnType)
 				.setParallelism(parallelism)
 				.name("Translate edge IDs");
@@ -257,7 +257,7 @@ public class Translate {
 		TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType);
 
 		return vertices
-			.map(new TranslateVertexValue<K, OLD, NEW>(translator))
+			.map(new TranslateVertexValue<>(translator))
 			.returns(returnType)
 				.setParallelism(parallelism)
 				.name("Translate vertex values");
@@ -341,7 +341,7 @@ public class Translate {
 		TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType);
 
 		return edges
-			.map(new TranslateEdgeValue<K, OLD, NEW>(translator))
+			.map(new TranslateEdgeValue<>(translator))
 			.returns(returnType)
 				.setParallelism(parallelism)
 				.name("Translate edge values");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
index 029e2c4..97c93e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
@@ -128,7 +128,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
 		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges = edges.join(edges)
 			.where(1)
 			.equalTo(1)
-			.with(new ProjectionTopSimple<KT, KB, EV>())
+			.with(new ProjectionTopSimple<>())
 				.name("Simple top projection");
 
 		return Graph.fromDataSet(topVertices, newEdges, context);
@@ -172,7 +172,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
 		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
 			.where(0)
 			.equalTo(0)
-			.with(new ProjectionBottomSimple<KT, KB, EV>())
+			.with(new ProjectionBottomSimple<>())
 			.name("Simple bottom projection");
 
 		return Graph.fromDataSet(bottomVertices, newEdges, context);
@@ -218,7 +218,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
 		DataSet<Edge<KT, Projection<KB, VVB, VVT, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices)
 			.where(1)
 			.equalTo(1)
-			.with(new ProjectionTopFull<KT, KB, EV, VVT, VVB>())
+			.with(new ProjectionTopFull<>())
 				.name("Full top projection");
 
 		return Graph.fromDataSet(topVertices, newEdges, context);
@@ -284,7 +284,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
 		DataSet<Edge<KB, Projection<KT, VVT, VVB, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices)
 			.where(0)
 			.equalTo(0)
-			.with(new ProjectionBottomFull<KT, KB, EV, VVT, VVB>())
+			.with(new ProjectionBottomFull<>())
 				.name("Full bottom projection");
 
 		return Graph.fromDataSet(bottomVertices, newEdges, context);

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index fca9d8b..d5a70f3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -91,7 +91,7 @@ public class GraphGeneratorUtils {
 	 */
 	public static <K, EV> DataSet<Vertex<K, NullValue>> vertexSet(DataSet<Edge<K, EV>> edges, int parallelism) {
 		DataSet<Vertex<K, NullValue>> vertexSet = edges
-			.flatMap(new EmitSrcAndTarget<K, EV>())
+			.flatMap(new EmitSrcAndTarget<>())
 				.setParallelism(parallelism)
 				.name("Emit source and target labels");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index d14d32c..1960aa3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -156,7 +156,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 			.rebalance()
 				.setParallelism(parallelism)
 				.name("Rebalance")
-			.flatMap(new GenerateEdges<T>(vertexCount, scale, a, b, c, noiseEnabled, noise))
+			.flatMap(new GenerateEdges<>(vertexCount, scale, a, b, c, noiseEnabled, noise))
 				.setParallelism(parallelism)
 				.name("RMat graph edges");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index 72e18ae..f09a890 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -58,7 +58,7 @@ public class GSAConfiguration extends IterationConfiguration {
 	 * @param data The data set to be broadcasted.
 	 */
 	public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) {
-		this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data));
+		this.bcVarsGather.add(new Tuple2<>(name, data));
 	}
 
 	/**
@@ -68,7 +68,7 @@ public class GSAConfiguration extends IterationConfiguration {
 	 * @param data The data set to be broadcasted.
 	 */
 	public void addBroadcastSetForSumFunction(String name, DataSet<?> data) {
-		this.bcVarsSum.add(new Tuple2<String, DataSet<?>>(name, data));
+		this.bcVarsSum.add(new Tuple2<>(name, data));
 	}
 
 	/**
@@ -78,7 +78,7 @@ public class GSAConfiguration extends IterationConfiguration {
 	 * @param data The data set to be broadcasted.
 	 */
 	public void addBroadcastSetForApplyFunction(String name, DataSet<?> data) {
-		this.bcVarsApply.add(new Tuple2<String, DataSet<?>>(name, data));
+		this.bcVarsApply.add(new Tuple2<>(name, data));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 12d4977..5f04b70 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -169,24 +169,24 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 			case OUT:
 				neighbors = iteration
 				.getWorkset().join(edgeDataSet)
-				.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+				.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<>());
 				break;
 			case IN:
 				neighbors = iteration
 				.getWorkset().join(edgeDataSet)
-				.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
+				.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<>());
 				break;
 			case ALL:
 				neighbors =  iteration
 						.getWorkset().join(edgeDataSet)
-						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<>()).union(iteration
 								.getWorkset().join(edgeDataSet)
-								.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+								.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<>()));
 				break;
 			default:
 				neighbors = iteration
 						.getWorkset().join(edgeDataSet)
-						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<>());
 				break;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 9846286..ccf2bb1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -73,14 +73,14 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr
 	public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 
 		DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
-			.map(new AddScoreToVertexValuesMapper<K>());
+			.map(new AddScoreToVertexValuesMapper<>());
 
 		Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
 			Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
 
-		return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<K>(),
-			new VertexLabelUpdater<K>(delta), maxIterations)
-				.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
+		return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<>(),
+			new VertexLabelUpdater<>(delta), maxIterations)
+				.mapVertices(new RemoveScoreFromVertexValuesMapper<>());
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index a3110ab..5cb8abe 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -73,12 +72,12 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
 		TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);
 
 		Graph<K, VV, NullValue> undirectedGraph = graph
-			.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
+			.mapEdges(new MapTo<>(NullValue.getInstance()))
 			.getUndirected();
 
 		return undirectedGraph.runScatterGatherIteration(
-			new CCMessenger<K, VV>(valueTypeInfo),
-			new CCUpdater<K, VV>(),
+			new CCMessenger<>(valueTypeInfo),
+			new CCUpdater<>(),
 			maxIterations).getVertices();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index 37e5cab..230f88e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -74,13 +73,13 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
 		TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);
 
 		Graph<K, VV, NullValue> undirectedGraph = graph
-			.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
+			.mapEdges(new MapTo<>(NullValue.getInstance()))
 			.getUndirected();
 
 		return undirectedGraph.runGatherSumApplyIteration(
 			new GatherNeighborIds<>(valueTypeInfo),
 			new SelectMinId<>(valueTypeInfo),
-			new UpdateComponentId<K, VV>(valueTypeInfo),
+			new UpdateComponentId<>(valueTypeInfo),
 			maxIterations).getVertices();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 2d0b8da..28e9168 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -51,9 +51,9 @@ public class GSASingleSourceShortestPaths<K, VV> implements
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> input) {
 
-		return input.mapVertices(new InitVerticesMapper<K, VV>(srcVertexId))
+		return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
 				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
-						new UpdateDistance<K>(), maxIterations)
+					new UpdateDistance<>(), maxIterations)
 						.getVertices();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 1e700f4..880a67b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -77,9 +76,9 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 		TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1);
 		// iteratively adopt the most frequent label among the neighbors of each vertex
 		return input
-			.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
+			.mapEdges(new MapTo<>(NullValue.getInstance()))
 			.runScatterGatherIteration(
-				new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations)
+				new SendNewLabelToNeighbors<>(valueType), new UpdateVertexLabel<>(), maxIterations)
 			.getVertices();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 15f0a84..8f41fa0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -51,8 +51,8 @@ public class SingleSourceShortestPaths<K, VV> implements GraphAlgorithm<K, VV, D
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> input) {
 
-		return input.mapVertices(new InitVerticesMapper<K, VV>(srcVertexId))
-				.runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
+		return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
+				.runScatterGatherIteration(new MinDistanceMessenger<>(), new VertexDistanceUpdater<>(),
 				maxIterations).getVertices();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
index 44ea988..a1498df 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -102,11 +102,11 @@ public class Summarization<K, VV, EV>
 		// group vertices by value and create vertex group items
 		DataSet<VertexGroupItem<K, VV>> vertexGroupItems = input.getVertices()
 				.groupBy(1)
-				.reduceGroup(new VertexGroupReducer<K, VV>());
+				.reduceGroup(new VertexGroupReducer<>());
 		// create super vertices
 		DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems
-				.filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
-				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
+				.filter(new VertexGroupItemToSummarizedVertexFilter<>())
+				.map(new VertexGroupItemToSummarizedVertexMapper<>());
 
 		// -------------------------
 		// build super edges
@@ -114,22 +114,22 @@ public class Summarization<K, VV, EV>
 
 		// create mapping between vertices and their representative
 		DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
-			.filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
-			.map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+			.filter(new VertexGroupItemToRepresentativeFilter<>())
+			.map(new VertexGroupItemToVertexWithRepresentativeMapper<>());
 		// join edges with vertex representatives and update source and target identifiers
 		DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
 				.join(vertexToRepresentativeMap)
 				.where(0) 	// source vertex id
 				.equalTo(0) // vertex id
-				.with(new SourceVertexJoinFunction<K, EV>())
+				.with(new SourceVertexJoinFunction<>())
 				.join(vertexToRepresentativeMap)
 				.where(1) 	// target vertex id
 				.equalTo(0) // vertex id
-				.with(new TargetVertexJoinFunction<K, EV>());
+				.with(new TargetVertexJoinFunction<>());
 		// create super edges
 		DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = edgesForGrouping
 				.groupBy(0, 1, 2) // group by source id (0), target id (1) and edge value (2)
-				.reduceGroup(new EdgeGroupReducer<K, EV>());
+				.reduceGroup(new EdgeGroupReducer<>());
 
 		return Graph.fromDataSet(summarizedVertices, summarizedEdges, input.getContext());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 2ae6120..23f942c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -65,21 +65,21 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 		DataSet<Edge<K, EV>> edges = input.getEdges();
 
 		// annotate edges with degrees
-		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
-				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
-				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
+		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
+				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
+				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
 
 		// project edges by degrees
-		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
+		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
 		// project edges by vertex id
-		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
+		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());
 
 		DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
 				// build triads
 				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
-				.reduceGroup(new TriadBuilder<K>())
+				.reduceGroup(new TriadBuilder<>())
 				// filter triads
-				.join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
+				.join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());
 
 		return triangles;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 981110f..55d3056 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -119,13 +119,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 		// u, edge count
 		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
-			.flatMap(new SplitTriangles<K>())
+			.flatMap(new SplitTriangles<>())
 				.name("Split triangle vertices");
 
 		// u, triangle count
 		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
 			.groupBy(0)
-			.reduce(new CountTriangles<K>())
+			.reduce(new CountTriangles<>())
 			.setCombineHint(CombineHint.HASH)
 				.name("Count triangles");
 
@@ -140,7 +140,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.leftOuterJoin(vertexTriangleCount)
 			.where(0)
 			.equalTo(0)
-			.with(new JoinVertexDegreeWithTriangleCount<K>())
+			.with(new JoinVertexDegreeWithTriangleCount<>())
 				.setParallelism(parallelism)
 				.name("Clustering coefficient");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 00b2210..52d3c10 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -84,11 +84,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		// u, v, bitmask where u < v
 		DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
 			.getEdges()
-			.map(new OrderByID<K, EV>())
+			.map(new OrderByID<>())
 				.setParallelism(parallelism)
 				.name("Order by ID")
 			.groupBy(0, 1)
-			.reduceGroup(new ReduceBitmask<K>())
+			.reduceGroup(new ReduceBitmask<>())
 				.setParallelism(parallelism)
 				.name("Flatten by ID");
 
@@ -99,11 +99,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 
 		// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
-			.map(new OrderByDegree<K, EV>())
+			.map(new OrderByDegree<>())
 				.setParallelism(parallelism)
 				.name("Order by degree")
 			.groupBy(0, 1)
-			.reduceGroup(new ReduceBitmask<K>())
+			.reduceGroup(new ReduceBitmask<>())
 				.setParallelism(parallelism)
 				.name("Flatten by degree");
 
@@ -111,7 +111,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
-			.reduceGroup(new GenerateTriplets<K>())
+			.reduceGroup(new GenerateTriplets<>())
 				.name("Generate triplets");
 
 		// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
@@ -119,16 +119,16 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 			.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
 			.where(1, 2)
 			.equalTo(0, 1)
-			.with(new ProjectTriangles<K>())
+			.with(new ProjectTriangles<>())
 				.name("Triangle listing");
 
 		if (permuteResults) {
 			triangles = triangles
-				.flatMap(new PermuteResult<K>())
+				.flatMap(new PermuteResult<>())
 					.name("Permute triangle vertices");
 		} else if (sortTriangleVertices.get()) {
 			triangles = triangles
-				.map(new SortTriangleVertices<K>())
+				.map(new SortTriangleVertices<>())
 					.name("Sort triangle vertices");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 0beb989..e1c7655 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -118,13 +118,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 		// u, 1
 		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
-			.flatMap(new SplitTriangles<K>())
+			.flatMap(new SplitTriangles<>())
 				.name("Split triangle vertices");
 
 		// u, triangle count
 		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
 			.groupBy(0)
-			.reduce(new CountTriangles<K>())
+			.reduce(new CountTriangles<>())
 			.setCombineHint(CombineHint.HASH)
 				.name("Count triangles");
 
@@ -139,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.leftOuterJoin(vertexTriangleCount)
 			.where(0)
 			.equalTo(0)
-			.with(new JoinVertexDegreeWithTriangleCount<K>())
+			.with(new JoinVertexDegreeWithTriangleCount<>())
 				.setParallelism(parallelism)
 				.name("Clustering coefficient");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 2472744..3f1b00a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -83,7 +83,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		// u, v where u < v
 		DataSet<Tuple2<K, K>> filteredByID = input
 			.getEdges()
-			.flatMap(new FilterByID<K, EV>())
+			.flatMap(new FilterByID<>())
 				.setParallelism(parallelism)
 				.name("Filter by ID");
 
@@ -94,7 +94,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 
 		// u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple2<K, K>> filteredByDegree = pairDegree
-			.flatMap(new FilterByDegree<K, EV>())
+			.flatMap(new FilterByDegree<>())
 				.setParallelism(parallelism)
 				.name("Filter by degree");
 
@@ -102,7 +102,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		DataSet<Tuple3<K, K, K>> triplets = filteredByDegree
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
-			.reduceGroup(new GenerateTriplets<K>())
+			.reduceGroup(new GenerateTriplets<>())
 				.name("Generate triplets");
 
 		// u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w
@@ -110,16 +110,16 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 			.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
 			.where(1, 2)
 			.equalTo(0, 1)
-			.with(new ProjectTriangles<K>())
+			.with(new ProjectTriangles<>())
 				.name("Triangle listing");
 
 		if (permuteResults) {
 			triangles = triangles
-				.flatMap(new PermuteResult<K>())
+				.flatMap(new PermuteResult<>())
 					.name("Permute triangle vertices");
 		} else if (sortTriangleVertices.get()) {
 			triangles = triangles
-				.map(new SortTriangleVertices<K>())
+				.map(new SortTriangleVertices<>())
 					.name("Sort triangle vertices");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index 6b41ee4..e59240b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -129,17 +129,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			throws Exception {
 		DataSet<Tuple2<K, K>> edges = input
 			.getEdges()
-			.map(new ExtractEdgeIDs<K, EV>())
+			.map(new ExtractEdgeIDs<>())
 				.setParallelism(parallelism)
 				.name("Extract edge IDs");
 
 		// ID, hub, authority
 		DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges
-			.map(new InitializeScores<K>())
+			.map(new InitializeScores<>())
 				.setParallelism(parallelism)
 				.name("Initial scores")
 			.groupBy(0)
-			.reduce(new SumScores<K>())
+			.reduce(new SumScores<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
@@ -153,18 +153,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.coGroup(edges)
 			.where(0)
 			.equalTo(1)
-			.with(new Hubbiness<K>())
+			.with(new Hubbiness<>())
 				.setParallelism(parallelism)
 				.name("Hub")
 			.groupBy(0)
-			.reduce(new SumScore<K>())
+			.reduce(new SumScore<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
 		// sum-of-hubbiness-squared
 		DataSet<DoubleValue> hubbinessSumSquared = hubbiness
-			.map(new Square<K>())
+			.map(new Square<>())
 				.setParallelism(parallelism)
 				.name("Square")
 			.reduce(new Sum())
@@ -177,18 +177,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.coGroup(edges)
 			.where(0)
 			.equalTo(0)
-			.with(new Authority<K>())
+			.with(new Authority<>())
 				.setParallelism(parallelism)
 				.name("Authority")
 			.groupBy(0)
-			.reduce(new SumScore<K>())
+			.reduce(new SumScore<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
 		// sum-of-authority-squared
 		DataSet<DoubleValue> authoritySumSquared = authority
-			.map(new Square<K>())
+			.map(new Square<>())
 				.setParallelism(parallelism)
 				.name("Square")
 			.reduce(new Sum())
@@ -201,7 +201,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE)
 			.where(0)
 			.equalTo(0)
-			.with(new JoinAndNormalizeHubAndAuthority<K>())
+			.with(new JoinAndNormalizeHubAndAuthority<>())
 			.withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED)
 			.withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED)
 				.setParallelism(parallelism)
@@ -214,7 +214,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE)
 				.where(0)
 				.equalTo(0)
-				.with(new ChangeInScores<K>())
+				.with(new ChangeInScores<>())
 					.setParallelism(parallelism)
 					.name("Change in scores");
 
@@ -225,7 +225,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 		return iterative
 			.closeWith(passThrough)
-			.map(new TranslateResult<K>())
+			.map(new TranslateResult<>())
 				.setParallelism(parallelism)
 				.name("Map result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index af56e50..71c37aa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -151,20 +151,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
 			.run(new EdgeSourceDegrees<K, VV, EV>()
 				.setParallelism(parallelism))
-			.map(new ExtractSourceDegree<K, EV>())
+			.map(new ExtractSourceDegree<>())
 				.setParallelism(parallelism)
 				.name("Extract source degree");
 
 		// vertices with zero in-edges
 		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
-			.flatMap(new InitializeSourceVertices<K>())
+			.flatMap(new InitializeSourceVertices<>())
 			.withBroadcastSet(vertexCount, VERTEX_COUNT)
 				.setParallelism(parallelism)
 				.name("Initialize source vertex scores");
 
 		// s, initial pagerank(s)
 		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
-			.map(new InitializeVertexScores<K>())
+			.map(new InitializeVertexScores<>())
 			.withBroadcastSet(vertexCount, VERTEX_COUNT)
 				.setParallelism(parallelism)
 				.name("Initialize scores");
@@ -178,18 +178,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.coGroup(edgeSourceDegree)
 			.where(0)
 			.equalTo(0)
-			.with(new SendScore<K>())
+			.with(new SendScore<>())
 				.setParallelism(parallelism)
 				.name("Send score")
 			.groupBy(0)
-			.reduce(new SumScore<K>())
+			.reduce(new SumScore<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
 		// ignored ID, total pagerank
 		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
-			.reduce(new SumVertexScores<K>())
+			.reduce(new SumVertexScores<>())
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.union(sourceVertices)
 				.setParallelism(parallelism)
 				.name("Union with source vertices")
-			.map(new AdjustScores<K>(dampingFactor))
+			.map(new AdjustScores<>(dampingFactor))
 				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
 				.withBroadcastSet(vertexCount, VERTEX_COUNT)
 					.setParallelism(parallelism)
@@ -211,7 +211,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.join(adjustedScores)
 				.where(0)
 				.equalTo(0)
-				.with(new ChangeInScores<K>())
+				.with(new ChangeInScores<>())
 					.setParallelism(parallelism)
 					.name("Change in scores");
 
@@ -222,7 +222,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 		return iterative
 			.closeWith(passThrough)
-			.map(new TranslateResult<K>())
+			.map(new TranslateResult<>())
 				.setParallelism(parallelism)
 				.name("Map result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index c88e401..7294fd1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -92,15 +92,15 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
-			.flatMap(new EdgeStats<K, EV>())
+			.flatMap(new EdgeStats<>())
 				.setParallelism(parallelism)
 				.name("Edge stats")
 			.groupBy(0, 1)
-			.reduceGroup(new ReduceEdgeStats<K>())
+			.reduceGroup(new ReduceEdgeStats<>())
 				.setParallelism(parallelism)
 				.name("Reduce edge stats")
 			.groupBy(0)
-			.reduce(new SumEdgeStats<K>())
+			.reduce(new SumEdgeStats<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum edge stats");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 4c0d654..8c520e6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -103,11 +103,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
-			.map(new EdgeStats<K, EV>())
+			.map(new EdgeStats<>())
 				.setParallelism(parallelism)
 				.name("Edge stats")
 			.groupBy(0)
-			.reduce(new SumEdgeStats<K>())
+			.reduce(new SumEdgeStats<>())
 			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum edge stats");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index d761f60..752e206 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -153,7 +153,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
 			.run(new VertexDegree<K, VV, EV>()
 				.setParallelism(parallelism))
-			.map(new VertexInverseLogDegree<K>())
+			.map(new VertexInverseLogDegree<>())
 				.setParallelism(parallelism)
 				.name("Vertex score");
 
@@ -172,7 +172,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple4<IntValue, K, K, FloatValue>> groupSpans = sourceInverseLogDegree
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
-			.reduceGroup(new GenerateGroupSpans<K>())
+			.reduceGroup(new GenerateGroupSpans<>())
 				.setParallelism(parallelism)
 				.name("Generate group spans");
 
@@ -181,7 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.rebalance()
 				.setParallelism(parallelism)
 				.name("Rebalance")
-			.flatMap(new GenerateGroups<K>())
+			.flatMap(new GenerateGroups<>())
 				.setParallelism(parallelism)
 				.name("Generate groups");
 
@@ -189,19 +189,19 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple3<K, K, FloatValue>> twoPaths = groups
 			.groupBy(0, 1)
 			.sortGroup(2, Order.ASCENDING)
-			.reduceGroup(new GenerateGroupPairs<K>())
+			.reduceGroup(new GenerateGroupPairs<>())
 				.name("Generate group pairs");
 
 		// t, u, adamic-adar score
 		GroupReduceOperator<Tuple3<K, K, FloatValue>, Result<K>> scores = twoPaths
 			.groupBy(0, 1)
-			.reduceGroup(new ComputeScores<K>(minimumScore, minimumRatio))
+			.reduceGroup(new ComputeScores<>(minimumScore, minimumRatio))
 				.name("Compute scores");
 
 		if (minimumRatio > 0.0f) {
 			// total score, number of pairs of neighbors
 			DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
-				.map(new ComputeScoreFromVertex<K>())
+				.map(new ComputeScoreFromVertex<>())
 					.setParallelism(parallelism)
 					.name("Average score")
 				.sum(0)
@@ -213,7 +213,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 		if (mirrorResults) {
 			return scores
-				.flatMap(new MirrorResult<K, Result<K>>())
+				.flatMap(new MirrorResult<>())
 					.name("Mirror results");
 		} else {
 			return scores;

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 8e820ac..92bf9e3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -199,7 +199,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans = neighborDegree
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
-			.reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
+			.reduceGroup(new GenerateGroupSpans<>(groupSize))
 				.setParallelism(parallelism)
 				.name("Generate group spans");
 
@@ -208,7 +208,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.rebalance()
 				.setParallelism(parallelism)
 				.name("Rebalance")
-			.flatMap(new GenerateGroups<K>())
+			.flatMap(new GenerateGroups<>())
 				.setParallelism(parallelism)
 				.name("Generate groups");
 
@@ -216,20 +216,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple3<K, K, IntValue>> twoPaths = groups
 			.groupBy(0, 1)
 			.sortGroup(2, Order.ASCENDING)
-			.reduceGroup(new GenerateGroupPairs<K>(groupSize))
+			.reduceGroup(new GenerateGroupPairs<>(groupSize))
 				.name("Generate group pairs");
 
 		// t, u, intersection, union
 		DataSet<Result<K>> scores = twoPaths
 			.groupBy(0, 1)
-			.reduceGroup(new ComputeScores<K>(unboundedScores,
+			.reduceGroup(new ComputeScores<>(unboundedScores,
 					minimumScoreNumerator, minimumScoreDenominator,
 					maximumScoreNumerator, maximumScoreDenominator))
 				.name("Compute scores");
 
 		if (mirrorResults) {
 			scores = scores
-				.flatMap(new MirrorResult<K, Result<K>>())
+				.flatMap(new MirrorResult<>())
 					.name("Mirror results");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index 69fcc52..39b9bcf 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -48,7 +48,7 @@ public class VertexCentricConfiguration extends IterationConfiguration {
 	 * @param data The data set to be broadcasted.
 	 */
 	public void addBroadcastSet(String name, DataSet<?> data) {
-		this.bcVars.add(new Tuple2<String, DataSet<?>>(name, data));
+		this.bcVars.add(new Tuple2<>(name, data));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index c30b1a7..6c06a3a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -176,9 +176,9 @@ public class VertexCentricIteration<K, VV, EV, Message>
 		DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs =
 				iteration.getSolutionSet().join(iteration.getWorkset())
 				.where(0).equalTo(0)
-				.with(new AppendVertexState<K, VV, Message>())
-				.returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(
-						vertexType, nullableMsgTypeInfo));
+				.with(new AppendVertexState<>())
+				.returns(new TupleTypeInfo<>(
+					vertexType, nullableMsgTypeInfo));
 
 		VertexComputeUdf<K, VV, EV, Message> vertexUdf =
 			new VertexComputeUdf<>(computeFunction, intermediateTypeInfo);
@@ -190,11 +190,11 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
 		// compute the solution set delta
 		DataSet<Vertex<K, VV>> solutionSetDelta = superstepComputation.flatMap(
-				new ProjectNewVertexValue<K, VV, Message>()).returns(vertexType);
+			new ProjectNewVertexValue<>()).returns(vertexType);
 
 		// compute the inbox of each vertex for the next superstep (new workset)
 		DataSet<Tuple2<K, Either<NullValue, Message>>> allMessages = superstepComputation.flatMap(
-				new ProjectMessages<K, VV, Message>()).returns(workSetTypeInfo);
+			new ProjectMessages<>()).returns(workSetTypeInfo);
 
 		DataSet<Tuple2<K, Either<NullValue, Message>>> newWorkSet = allMessages;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
index 6a62847..0422f13 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -59,7 +59,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
 	 * @param data The data set to be broadcasted.
 	 */
 	public void addBroadcastSetForScatterFunction(String name, DataSet<?> data) {
-		this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, data));
+		this.bcVarsScatter.add(new Tuple2<>(name, data));
 	}
 
 	/**
@@ -69,7 +69,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
 	 * @param data The data set to be broadcasted.
 	 */
 	public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) {
-		this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data));
+		this.bcVarsGather.add(new Tuple2<>(name, data));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 3e2ac23..8082cd9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -46,7 +46,7 @@ public class GraphUtils {
 	 */
 	public static <T> DataSet<LongValue> count(DataSet<T> input) {
 		return input
-			.map(new MapTo<T, LongValue>(new LongValue(1)))
+			.map(new MapTo<>(new LongValue(1)))
 				.returns(LONG_VALUE_TYPE_INFO)
 				.name("Emit 1")
 			.reduce(new AddLongValue())

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 57aa987..b85b6e6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -48,11 +48,11 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
 	@Override
 	public boolean validate(Graph<K, VV, EV> graph) throws Exception {
 		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
-				.flatMap(new MapEdgeIds<K, EV>()).distinct();
+				.flatMap(new MapEdgeIds<>()).distinct();
 		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
-				.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
+				.equalTo(0).with(new GroupInvalidIds<>()).first(1);
 
-		return invalidIds.map(new KToTupleMap<K>()).count() == 0;
+		return invalidIds.map(new KToTupleMap<>()).count() == 0;
 	}
 
 	private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index f89d4f5..1afb5da 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -120,7 +120,7 @@ public class AsmTestBase {
 
 		return new RMatGraph<>(env, new JDKRandomGeneratorFactory(), vertexCount, edgeCount)
 			.generate()
-			.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+			.run(new org.apache.flink.graph.asm.simple.directed.Simplify<>());
 	}
 
 	/**
@@ -149,6 +149,6 @@ public class AsmTestBase {
 
 		return new RMatGraph<>(env, new JDKRandomGeneratorFactory(), vertexCount, edgeCount)
 			.generate()
-			.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+			.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<>(false));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 22b47fe..63bf133 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -53,7 +53,7 @@ extends AsmTestBase {
 			"(5,3,((null),(1,1,0),(4,2,2)))";
 
 		DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedSimpleGraph
-			.run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
+			.run(new EdgeDegreesPair<>());
 
 		TestBaseUtils.compareResultAsText(degreesPair.collect(), expectedResult);
 	}
@@ -62,7 +62,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph(10, 16)
-			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>());
+			.run(new EdgeDegreesPair<>());
 
 		Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>>()
 			.run(degreesPair)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index f0d51d2..967cfb2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -53,7 +53,7 @@ extends AsmTestBase {
 			"(5,3,((null),(1,1,0)))";
 
 		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedSimpleGraph
-				.run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
+				.run(new EdgeSourceDegrees<>());
 
 		TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult);
 	}
@@ -62,7 +62,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph(10, 16)
-			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>());
+			.run(new EdgeSourceDegrees<>());
 
 		Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>()
 			.run(sourceDegrees)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 6d58bb0..abb76c4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -53,7 +53,7 @@ extends AsmTestBase {
 			"(5,3,((null),(4,2,2)))";
 
 		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedSimpleGraph
-				.run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
+				.run(new EdgeTargetDegrees<>());
 
 		TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult);
 	}
@@ -62,7 +62,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph(10, 16)
-			.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>());
+			.run(new EdgeTargetDegrees<>());
 
 		Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>()
 			.run(targetDegrees)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index 5214282..91f354f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -43,7 +43,7 @@ extends AsmTestBase {
 	public void testWithSimpleDirectedGraph()
 			throws Exception {
 		DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
-			.run(new VertexDegrees<IntValue, NullValue, NullValue>());
+			.run(new VertexDegrees<>());
 
 		String expectedResult =
 			"(0,(2,2,0))\n" +
@@ -60,7 +60,7 @@ extends AsmTestBase {
 	public void testWithSimpleUndirectedGraph()
 			throws Exception {
 		DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph
-			.run(new VertexDegrees<IntValue, NullValue, NullValue>());
+			.run(new VertexDegrees<>());
 
 		String expectedResult =
 			"(0,(2,2,2))\n" +
@@ -100,7 +100,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 	throws Exception {
 		DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph(10, 16)
-			.run(new VertexDegrees<LongValue, NullValue, NullValue>());
+			.run(new VertexDegrees<>());
 
 		Checksum checksum = new ChecksumHashCode<Vertex<LongValue, Degrees>>()
 			.run(degrees)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 5f492e4..1cae2e7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -59,7 +59,7 @@ extends AsmTestBase {
 			"(5,3,((null),1,4))";
 
 		DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
-			.run(new EdgeDegreePair<IntValue, NullValue, NullValue>());
+			.run(new EdgeDegreePair<>());
 
 		TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult);
 
@@ -74,7 +74,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16)
-			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>());
+			.run(new EdgeDegreePair<>());
 
 		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
 			.run(degreePairOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 393220d..2d8b2e3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -59,7 +59,7 @@ extends AsmTestBase {
 			"(5,3,((null),1))";
 
 		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph
-			.run(new EdgeSourceDegree<IntValue, NullValue, NullValue>());
+			.run(new EdgeSourceDegree<>());
 
 		TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult);
 
@@ -74,7 +74,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph(10, 16)
-			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>());
+			.run(new EdgeSourceDegree<>());
 
 		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
 			.run(sourceDegreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index 782296a..a7c88a1 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -59,7 +59,7 @@ extends AsmTestBase {
 			"(5,3,((null),4))";
 
 		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph
-				.run(new EdgeTargetDegree<IntValue, NullValue, NullValue>());
+				.run(new EdgeTargetDegree<>());
 
 		TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult);
 
@@ -74,7 +74,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph(10, 16)
-			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>());
+			.run(new EdgeSourceDegree<>());
 
 		Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
 			.run(targetDegreeOnTargetId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index 192782d..bc76bff 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -50,7 +50,7 @@ extends AsmTestBase {
 			"(5,1)";
 
 		DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
-			.run(new VertexDegree<IntValue, NullValue, NullValue>());
+			.run(new VertexDegree<>());
 
 		TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);
 
@@ -67,7 +67,7 @@ extends AsmTestBase {
 		long expectedDegree = completeGraphVertexCount - 1;
 
 		DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph
-			.run(new VertexDegree<LongValue, NullValue, NullValue>());
+			.run(new VertexDegree<>());
 
 		for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) {
 			assertEquals(expectedDegree, vertex.getValue().getValue());
@@ -109,7 +109,7 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph(10, 16)
-			.run(new VertexDegree<LongValue, NullValue, NullValue>());
+			.run(new VertexDegree<>());
 
 		Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
 			.run(degreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index f03d82c..51e7712 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.metric.ChecksumHashCode;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
 import org.junit.Test;
@@ -41,7 +40,7 @@ extends AsmTestBase {
 	public void testWithSimpleGraph()
 			throws Exception {
 		Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph
-			.run(new MaximumDegree<IntValue, NullValue, NullValue>(3));
+			.run(new MaximumDegree<>(3));
 
 		String expectedVerticesResult =
 			"(0,(null))\n" +
@@ -67,8 +66,8 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		Checksum checksum = undirectedRMatGraph(10, 16)
-			.run(new MaximumDegree<LongValue, NullValue, NullValue>(16))
-			.run(new ChecksumHashCode<LongValue, NullValue, NullValue>())
+			.run(new MaximumDegree<>(16))
+			.run(new ChecksumHashCode<>())
 			.execute();
 
 		assertEquals(805, checksum.getCount());

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index a3aad4b..751d030 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -70,7 +70,7 @@ public class SimplifyTest {
 			"(1,0,(null))";
 
 		Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-			.run(new Simplify<IntValue, NullValue, NullValue>());
+			.run(new Simplify<>());
 
 		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
index 6ff4292..68b4e0c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -71,7 +71,7 @@ public class SimplifyTest {
 			"(2,0,(null))";
 
 		Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-			.run(new Simplify<IntValue, NullValue, NullValue>(false));
+			.run(new Simplify<>(false));
 
 		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
 	}
@@ -84,7 +84,7 @@ public class SimplifyTest {
 			"(1,0,(null))";
 
 		Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-			.run(new Simplify<IntValue, NullValue, NullValue>(true));
+			.run(new Simplify<>(true));
 
 		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
 	}


Mime
View raw message