flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [46/50] [abbrv] flink git commit: [FLINK-1201] [gelly] added licence headers; added gelly to addons pom file; fixed checkstyle errors
Date Wed, 11 Feb 2015 10:49:48 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
index f48c975..8694c66 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java
@@ -1,8 +1,28 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -14,7 +34,7 @@ import java.util.LinkedList;
 @RunWith(Parameterized.class)
 public class TestDegrees extends JavaProgramTestBase {
 
-    private static int NUM_PROGRAMS = 5;
+    private static int NUM_PROGRAMS = 6;
 
     private int curProgId = config.getInteger("ProgramId", -1);
     private String resultPath;
@@ -64,7 +84,7 @@ public class TestDegrees extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     graph.outDegrees().writeAsCsv(resultPath);
@@ -81,7 +101,7 @@ public class TestDegrees extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
 
                     graph.outDegrees().writeAsCsv(resultPath);
@@ -98,7 +118,7 @@ public class TestDegrees extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     graph.inDegrees().writeAsCsv(resultPath);
@@ -115,7 +135,7 @@ public class TestDegrees extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
 
                     graph.inDegrees().writeAsCsv(resultPath);
@@ -132,7 +152,7 @@ public class TestDegrees extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     graph.getDegrees().writeAsCsv(resultPath);
@@ -143,9 +163,25 @@ public class TestDegrees extends JavaProgramTestBase {
                             "4,2\n" +
                             "5,3\n";
                 }
+                case 6: {
+                /*
+				 * Test getDegrees() with disconnected data
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, NullValue, Long> graph =
+                            Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
+
+                    graph.outDegrees().writeAsCsv(resultPath);
+                    env.execute();
+                    return "1,2\n" +
+                            "2,1\n" +
+                            "3,0\n" +
+                            "4,1\n" +
+                            "5,0\n";
+                }
                 default:
                     throw new IllegalArgumentException("Invalid program id");
-
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
index f4977ff..063e58b 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java
@@ -1,8 +1,27 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.NullValue;
 import org.junit.runner.RunWith;
@@ -65,8 +84,8 @@ public class TestFromCollection extends JavaProgramTestBase {
 					 * Test fromCollection(vertices, edges):
 					 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                    Graph<Long, Long, Long> graph = Graph.fromCollection(env, TestGraphUtils.getLongLongVertices(env),
-                            TestGraphUtils.getLongLongEdges(env));
+                    Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+                            TestGraphUtils.getLongLongEdges(), env);
 
                     graph.getEdges().writeAsCsv(resultPath);
                     env.execute();
@@ -83,8 +102,8 @@ public class TestFromCollection extends JavaProgramTestBase {
                      * Test fromCollection(edges) with no initial value for the vertices
                      */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                    Graph<Long, NullValue, Long> graph = Graph.fromCollection(env,
-                            TestGraphUtils.getLongLongEdges(env));
+                    Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+                    		env);
 
                     graph.getVertices().writeAsCsv(resultPath);
                     env.execute();
@@ -100,12 +119,12 @@ public class TestFromCollection extends JavaProgramTestBase {
                      * function that takes the id and doubles it
                      */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                    Graph<Long, Long, Long> graph = Graph.fromCollection(env, TestGraphUtils.getLongLongEdges(env),
+                    Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
                             new MapFunction<Long, Long>() {
                                 public Long map(Long vertexId) {
                                     return vertexId * 2;
                                 }
-                            });
+                            }, env);
 
                     graph.getVertices().writeAsCsv(resultPath);
                     env.execute();
@@ -120,5 +139,4 @@ public class TestFromCollection extends JavaProgramTestBase {
             }
         }
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
index 8bbad9f..241a0f5 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -9,15 +27,17 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.NullValue;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import flink.graphs.TestGraphUtils.DummyCustomParameterizedType;
-import flink.graphs.validation.InvalidVertexIdsValidator;
-
 @RunWith(Parameterized.class)
 public class TestGraphCreation extends JavaProgramTestBase {
 
@@ -71,7 +91,7 @@ public class TestGraphCreation extends JavaProgramTestBase {
 				 * Test create() with edge dataset and no vertex values
 		         */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					Graph<Long, NullValue, Long> graph = Graph.create(TestGraphUtils.getLongLongEdgeData(env), env);
+					Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
 
 					graph.getVertices().writeAsCsv(resultPath);
 					env.execute();
@@ -86,7 +106,7 @@ public class TestGraphCreation extends JavaProgramTestBase {
 				 * Test create() with edge dataset and a mapper that assigns the id as value
 		         */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongEdgeData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
 							new MapFunction<Long, Long>() {
 								public Long map(Long vertexId) {
 									return vertexId;
@@ -106,7 +126,7 @@ public class TestGraphCreation extends JavaProgramTestBase {
 				 * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.create(
+					Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
 							TestGraphUtils.getLongLongEdgeData(env),
 							new MapFunction<Long, DummyCustomParameterizedType<Double>>() {
 
@@ -135,7 +155,7 @@ public class TestGraphCreation extends JavaProgramTestBase {
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 					DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
 					DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
-					Graph<Long, Long, Long> graph = new Graph<Long, Long, Long>(vertices, edges, env);
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
 					DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
 					result.writeAsText(resultPath);
 					env.execute();
@@ -150,7 +170,7 @@ public class TestGraphCreation extends JavaProgramTestBase {
 					DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
 					DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
-					Graph<Long, Long, Long> graph = new Graph<Long, Long, Long>(vertices, edges, env);
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
 					DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
 					result.writeAsText(resultPath);
 					env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
index 73ecb7d..edf9e26 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -9,13 +27,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import flink.graphs.TestGraphUtils.DummyCustomType;
-
 @RunWith(Parameterized.class)
 public class TestGraphCreationWithMapper extends JavaProgramTestBase {
 
@@ -69,7 +87,7 @@ public class TestGraphCreationWithMapper extends JavaProgramTestBase {
 				 * Test create() with edge dataset and a mapper that assigns a double constant as value
 		         */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					Graph<Long, Double, Long> graph = Graph.create(TestGraphUtils.getLongLongEdgeData(env),
+					Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
 							new MapFunction<Long, Double>() {
 								public Double map(Long value) {
 									return 0.1d;
@@ -89,7 +107,7 @@ public class TestGraphCreationWithMapper extends JavaProgramTestBase {
 				 * Test create() with edge dataset and a mapper that assigns a Tuple2 as value
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.create(
+					Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
 							TestGraphUtils.getLongLongEdgeData(env), new MapFunction<Long, Tuple2<Long, Long>>() {
 								public Tuple2<Long, Long> map(Long vertexId) {
 									return new Tuple2<Long, Long>(vertexId*2, 42l);
@@ -110,7 +128,7 @@ public class TestGraphCreationWithMapper extends JavaProgramTestBase {
 				 * and a mapper that assigns a double constant as value
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					Graph<String, Double, Long> graph = Graph.create(TestGraphUtils.getStringLongEdgeData(env),
+					Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
 							new MapFunction<String, Double>() {
 								public Double map(String value) {
 									return 0.1d;
@@ -130,7 +148,7 @@ public class TestGraphCreationWithMapper extends JavaProgramTestBase {
 					 * Test create() with edge dataset and a mapper that assigns a custom vertex value
 					 */
 						final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-						Graph<Long, DummyCustomType, Long> graph = Graph.create(
+						Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
 								TestGraphUtils.getLongLongEdgeData(env), new MapFunction<Long, DummyCustomType>() {
 									public DummyCustomType map(Long vertexId) {
 										return new DummyCustomType(vertexId.intValue()-1, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
index deeab04..b8f1af2 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -9,6 +27,9 @@ import java.util.List;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,7 +89,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
@@ -94,7 +115,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
@@ -120,7 +141,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
 				graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
@@ -142,7 +163,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
 				graph.getEdges().writeAsCsv(resultPath);
@@ -161,7 +182,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
 				graph.getEdges().writeAsCsv(resultPath);
@@ -182,7 +203,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
 						61L);
@@ -205,7 +226,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
 						12L);
@@ -228,7 +249,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
 				graph.getEdges().writeAsCsv(resultPath);
@@ -249,7 +270,7 @@ public class TestGraphMutations extends JavaProgramTestBase {
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env), env);
 				graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
 				graph.getEdges().writeAsCsv(resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
index cb285c0..9482997 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -10,6 +28,9 @@ import java.util.List;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -69,7 +90,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 
 					graph.getUndirected().getEdges().writeAsCsv(resultPath);
@@ -88,7 +109,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 
 					graph.reverse().getEdges().writeAsCsv(resultPath);
@@ -107,7 +128,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
 									   public boolean filter(Vertex<Long, Long> vertex) throws Exception {
@@ -130,7 +151,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
 						public boolean filter(Vertex<Long, Long> vertex) throws Exception {
@@ -149,7 +170,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
 						public boolean filter(Edge<Long, Long> edge) throws Exception {
@@ -168,7 +189,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.numberOfVertices().writeAsText(resultPath);
 
@@ -181,7 +202,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.numberOfEdges().writeAsText(resultPath);
 
@@ -194,7 +215,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.getVertexIds().writeAsText(resultPath);
 
@@ -207,7 +228,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 					graph.getEdgeIds().writeAsCsv(resultPath);
 
@@ -224,7 +245,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+					Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
 
 					List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
@@ -233,7 +254,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 					vertices.add(new Vertex<Long, Long>(6L, 6L));
 					edges.add(new Edge<Long, Long>(6L, 1L, 61L));
 
-					graph = graph.union(graph.fromCollection(vertices, edges));
+					graph = graph.union(Graph.fromCollection(vertices, edges, env));
 
 					graph.getEdges().writeAsCsv(resultPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index d5062c5..9f6569b 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -8,6 +26,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
 
 public class TestGraphUtils {
 
@@ -93,7 +113,7 @@ public class TestGraphUtils {
 
 	public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
 			ExecutionEnvironment env) {
-		List<Tuple3<Long, Long, Long>> tuples = new ArrayList<>();
+		List<Tuple3<Long, Long, Long>> tuples = new ArrayList<Tuple3<Long, Long, Long>>();
 		tuples.add(new Tuple3<Long, Long, Long>(1L, 2L, 12L));
 		tuples.add(new Tuple3<Long, Long, Long>(1L, 3L, 13L));
 		tuples.add(new Tuple3<Long, Long, Long>(2L, 3L, 23L));
@@ -154,7 +174,8 @@ public class TestGraphUtils {
 
 	public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
 			ExecutionEnvironment env) {
-		List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
+		List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = 
+				new ArrayList<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>>();
 		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 2L,
 				new DummyCustomParameterizedType<Float>(10, 10f)));
 		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 3L,
@@ -201,9 +222,8 @@ public class TestGraphUtils {
 	/**
 	 * Function that produces an ArrayList of vertices
 	 */
-	public static final List<Vertex<Long, Long>> getLongLongVertices(
-			ExecutionEnvironment env) {
-		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+	public static final List<Vertex<Long, Long>> getLongLongVertices() {
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
 		vertices.add(new Vertex<Long, Long>(1L, 1L));
 		vertices.add(new Vertex<Long, Long>(2L, 2L));
 		vertices.add(new Vertex<Long, Long>(3L, 3L));
@@ -227,8 +247,7 @@ public class TestGraphUtils {
 	/**
 	 * Function that produces an ArrayList of edges
 	 */
-	public static final List<Edge<Long, Long>> getLongLongEdges(
-			ExecutionEnvironment env) {
+	public static final List<Edge<Long, Long>> getLongLongEdges() {
 		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
 		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
 		edges.add(new Edge<Long, Long>(1L, 3L, 13L));

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
index 7375d0c..87f137f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
@@ -1,10 +1,30 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,7 +88,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
@@ -105,7 +125,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
@@ -142,7 +162,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
@@ -184,7 +204,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
@@ -213,7 +233,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
     			 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
@@ -241,7 +261,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges()
@@ -277,7 +297,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
@@ -313,7 +333,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
@@ -353,7 +373,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
@@ -382,7 +402,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
     			 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
@@ -410,7 +430,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges()
@@ -446,7 +466,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
@@ -485,7 +505,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
@@ -525,7 +545,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
@@ -554,7 +574,7 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
     			 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
index d2b7ba6..f10140b 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java
@@ -1,15 +1,34 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import flink.graphs.TestGraphUtils.DummyCustomParameterizedType;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -69,7 +88,7 @@ public class TestJoinWithVertices extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices()
@@ -103,7 +122,7 @@ public class TestJoinWithVertices extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
@@ -137,7 +156,7 @@ public class TestJoinWithVertices extends JavaProgramTestBase {
 				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
@@ -176,7 +195,7 @@ public class TestJoinWithVertices extends JavaProgramTestBase {
     				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
@@ -201,7 +220,7 @@ public class TestJoinWithVertices extends JavaProgramTestBase {
     				 */
                     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                             TestGraphUtils.getLongLongEdgeData(env), env);
 
                     Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
index 7a3a695..bcba7e7 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -10,14 +28,15 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import flink.graphs.TestGraphUtils.DummyCustomParameterizedType;
-import flink.graphs.TestGraphUtils.DummyCustomType;
-
 @RunWith(Parameterized.class)
 public class TestMapEdges extends JavaProgramTestBase {
 
@@ -72,8 +91,8 @@ public class TestMapEdges extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, Long>() {
 					public Long map(Edge<Long, Long> edge) throws Exception {
@@ -97,8 +116,8 @@ public class TestMapEdges extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, String>() {
 					public String map(Edge<Long, Long> edge) throws Exception {
@@ -122,8 +141,8 @@ public class TestMapEdges extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, 
 						Tuple1<Long>>() {
@@ -150,8 +169,8 @@ public class TestMapEdges extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, 
 						DummyCustomType>() {
@@ -178,8 +197,8 @@ public class TestMapEdges extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
 						new MapFunction<Edge<Long, Long>, DummyCustomParameterizedType<Double>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
index 896d73d..31bd48b 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -10,14 +28,15 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import flink.graphs.TestGraphUtils.DummyCustomParameterizedType;
-import flink.graphs.TestGraphUtils.DummyCustomType;
-
 @RunWith(Parameterized.class)
 public class TestMapVertices extends JavaProgramTestBase {
 
@@ -72,8 +91,8 @@ public class TestMapVertices extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Long>() {
 					public Long map(Vertex<Long, Long> value) throws Exception {
@@ -95,8 +114,8 @@ public class TestMapVertices extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, String>() {
 					public String map(Vertex<Long, Long> vertex) throws Exception {
@@ -137,8 +156,8 @@ public class TestMapVertices extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Tuple1<Long>>() {
 					public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
@@ -162,8 +181,8 @@ public class TestMapVertices extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, DummyCustomType>() {
 					public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
@@ -187,8 +206,8 @@ public class TestMapVertices extends JavaProgramTestBase {
 				 */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env));
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+						TestGraphUtils.getLongLongEdgeData(env), env);
 				
 				DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
 						new MapFunction<Vertex<Long, Long>, DummyCustomParameterizedType<Double>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestPGA.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestPGA.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestPGA.java
deleted file mode 100644
index 4d68604..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestPGA.java
+++ /dev/null
@@ -1,122 +0,0 @@
-//package flink.graphs;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import org.apache.flink.api.common.functions.CoGroupFunction;
-//import org.apache.flink.api.common.functions.FlatJoinFunction;
-//import org.apache.flink.api.common.functions.GroupReduceFunction;
-//import org.apache.flink.api.java.DataSet;
-//import org.apache.flink.api.java.ExecutionEnvironment;
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.api.java.tuple.Tuple3;
-//import org.apache.flink.util.Collector;
-//import org.junit.Before;
-//import org.junit.Test;
-//
-//public class TestPGA {
-//
-//    // Assume existing graph object
-//    // Tuple2 ids and values: 0,1,2,3
-//    // Edges: 0->1, 1->3, 0->3, 1->2
-//
-//    static Graph<Integer, Integer, Integer> graph;
-//    static ExecutionEnvironment env;
-//
-//    @Before
-//    public void testSetUp() {
-//        env = ExecutionEnvironment.getExecutionEnvironment();
-//        setUpGraph();
-//    }
-//
-//
-//    public static void setUpGraph() {
-//
-//        List<Tuple2<Integer, Integer>> Tuple2List = new ArrayList<Tuple2<Integer, Integer>>();
-//
-//        for (int i = 0; i < 4; i++) {
-//            Tuple2<Integer, Integer> v = new Tuple2<Integer, Integer>(i, i);
-//            Tuple2List.add(v);
-//        }
-//
-//
-//        List<Tuple3<Integer, Integer, Integer>> edgeList = new ArrayList<>();
-//
-//        edgeList.add(new Tuple3<Integer, Integer, Integer>(0, 1, 0));
-//        edgeList.add(new Tuple3<Integer, Integer, Integer>(1, 3, 0));
-//        edgeList.add(new Tuple3<Integer, Integer, Integer>(0, 3, 0));
-//        edgeList.add(new Tuple3<Integer, Integer, Integer>(1, 2, 0));
-//
-//        DataSet<Tuple2<Integer, Integer>> vertices = env.fromCollection(Tuple2List);
-//        DataSet<Tuple3<Integer, Integer, Integer>> edges = env.fromCollection(edgeList);
-//
-//        graph = new Graph<Integer, Integer, Integer>(vertices, edges, env);
-//    }
-//    @SuppressWarnings("serial")
-//	@Test
-//    public void testPga() throws Exception {
-//        // Test pga by running connected components
-//        // Expected output is that all vertices end up with the same attribute, 0
-//
-//        // Send the vertex attribute to all neighbors
-//        CoGroupFunction<Tuple2<Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple2<Integer, Integer>>
-//                sendAttribute =
-//                new CoGroupFunction<Tuple2<Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple2<Integer, Integer>>() {
-//            @Override
-//            public void coGroup(Iterable<Tuple2<Integer, Integer>> vertices,
-//                                Iterable<Tuple3<Integer, Integer, Integer>> edges,
-//                                Collector<Tuple2<Integer, Integer>> tuple2Collector) throws Exception {
-//                for (Tuple2<Integer, Integer> vertex : vertices) {
-//                    for (Tuple3<Integer, Integer, Integer> edge: edges) {
-//                        tuple2Collector.collect(new Tuple2<Integer, Integer>(edge.f1, vertex.f1));
-//                    }
-//                }
-//            }
-//        };
-//
-//        // Gather all messages and keep the message with the smallest attribute
-//        GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
-//                gatherAttributes =
-//                new GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
-//                    @Override
-//                    public void reduce(Iterable<Tuple2<Integer, Integer>> messages,
-//                                       Collector<Tuple2<Integer, Integer>> msgCollector) throws Exception {
-//
-//                        Tuple2<Integer, Integer> minTuple = new Tuple2<Integer, Integer>(Integer.MAX_VALUE, Integer.MAX_VALUE);
-//                        for (Tuple2<Integer, Integer> message : messages) {
-//                            if (message.f1 < minTuple.f1) {
-//                                minTuple = message.copy();
-//                            }
-//                        }
-//                        msgCollector.collect(minTuple);
-//                    }
-//                };
-//
-//        // Check if the produced message is smaller than the current vertex attribute, if yes change attribute
-//        FlatJoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
-//                apply =
-//                new FlatJoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
-//                    @Override
-//                    public void join(Tuple2<Integer, Integer> msg,
-//                                     Tuple2<Integer, Integer> vertex,
-//                                     Collector<Tuple2<Integer, Integer>> vertexCollector) throws Exception {
-//                        if (msg.f1 < vertex.f1) {
-//                            vertexCollector.collect(msg.copy());
-//                        }
-//                    }
-//                };
-//
-//
-//        // Run the pga iterations
-//        Graph<Integer, Integer, Integer> connected = graph.pga(sendAttribute, gatherAttributes, apply, 100);
-//
-//        DataSet<Tuple2<Integer, Integer>> conVerts = connected.getVertices();
-//
-//        // All vertices should end up with attribute 0
-//        conVerts.print();
-//        //TODO(thvasilo): Automate correctness testing
-//
-//        env.execute();
-//
-//    }
-//}

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
index 1081bd4..89ef7c1 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -9,6 +27,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,7 +92,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
@@ -104,7 +128,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
@@ -140,7 +164,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
 				 * of a vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
@@ -173,7 +197,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
@@ -212,7 +236,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
@@ -251,7 +275,7 @@ public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
 				 * of a vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
index fec9b12..2624960 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -11,6 +29,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -70,7 +94,7 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
@@ -100,7 +124,7 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
 				 * times the edge weights for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithSum = 
@@ -131,7 +155,7 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
@@ -161,7 +185,7 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
@@ -195,7 +219,7 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
 				 * times the edge weights for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithSum = 
@@ -230,7 +254,7 @@ public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
 				 * for each vertex
 		         */
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
+				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 						TestGraphUtils.getLongLongEdgeData(env), env);
 
 				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
index 30ac49b..614ddd5 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java
@@ -17,7 +17,7 @@
  */
 
 
-package flink.graphs;
+package org.apache.flink.graph.test;
 
 import java.io.BufferedReader;
 
@@ -25,14 +25,16 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.NullValue;
 
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
-
 @SuppressWarnings("serial")
 public class TestVertexCentricConnectedComponents extends JavaProgramTestBase {
 
@@ -60,7 +62,7 @@ public class TestVertexCentricConnectedComponents extends JavaProgramTestBase {
 		DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
 		
 		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-		Graph<Long, Long, NullValue> graph = Graph.create(initialVertices, edges, env); 
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); 
 		
 		Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
 		


Mime
View raw message