flink-commits mailing list archives

From g...@apache.org
Subject flink git commit: [FLINK-3771] [gelly] Methods for translating Graphs
Date Mon, 02 May 2016 16:07:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master a0f539738 -> 08e4bf944


[FLINK-3771] [gelly] Methods for translating Graphs

Methods for translation of the type or value of graph IDs, vertex
values, and edge values.

This closes #1900


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08e4bf94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08e4bf94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08e4bf94

Branch: refs/heads/master
Commit: 08e4bf9440a566c874c2b8e74ae2127ff264c672
Parents: a0f5397
Author: Greg Hogan <code@greghogan.com>
Authored: Sat Apr 16 07:06:44 2016 -0400
Committer: EC2 Default User <ec2-user@ip-10-0-6-60.ec2.internal>
Committed: Mon May 2 16:06:53 2016 +0000

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  87 +++++
 .../org/apache/flink/graph/scala/Graph.scala    |  84 +++++
 .../main/java/org/apache/flink/graph/Graph.java |  39 +++
 .../graph/asm/translate/LongValueAddOffset.java |  49 +++
 .../asm/translate/LongValueToIntValue.java      |  47 +++
 .../asm/translate/LongValueToStringValue.java   |  39 +++
 .../flink/graph/asm/translate/Translate.java    | 346 +++++++++++++++++++
 .../asm/translate/TranslateEdgeValues.java      |  81 +++++
 .../graph/asm/translate/TranslateGraphIds.java  |  88 +++++
 .../asm/translate/TranslateVertexValues.java    |  81 +++++
 .../graph/asm/translate/TranslateTest.java      | 135 ++++++++
 11 files changed, 1076 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 53d628d..fb9bc43 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -433,6 +433,42 @@ val updatedGraph = graph.mapVertices(v => v.getValue + 1)
 </div>
 </div>
 
+* <strong>Translate</strong>: Gelly provides specialized methods for translating the value and/or type of vertex and edge IDs (`translateGraphIds`), vertex values (`translateVertexValues`), or edge values (`translateEdgeValues`). Translation is performed by a user-defined map function, several of which are provided in the `org.apache.flink.graph.asm.translate` package. The same `MapFunction` can be used for all three translate methods.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+
+// translate each vertex and edge ID to a String
+Graph<String, Long, Long> updatedGraph = graph.translateGraphIds(
+				new MapFunction<Long, String>() {
+					public String map(Long id) {
+						return id.toString();
+					}
+				});
+
+// translate vertex IDs, edge IDs, vertex values, and edge values to LongValue
+Graph<LongValue, LongValue, LongValue> translatedGraph = graph
+                .translateGraphIds(new LongToLongValue())
+                .translateVertexValues(new LongToLongValue())
+                .translateEdgeValues(new LongToLongValue());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val graph = Graph.fromDataSet(vertices, edges, env)
+
+// translate each vertex and edge ID to a String
+val updatedGraph = graph.translateGraphIds(id => id.toString)
+{% endhighlight %}
+</div>
+</div>
+
+
 * <strong>Filter</strong>: A filter transformation applies a user-defined filter function on the vertices or edges of the `Graph`. `filterOnEdges` will create a sub-graph of the original graph, keeping only the edges that satisfy the provided predicate. Note that the vertex dataset will not be modified. Respectively, `filterOnVertices` applies a filter on the vertices of the graph. Edges whose source and/or target do not satisfy the vertex predicate are removed from the resulting edge dataset. The `subgraph` method can be used to apply a filter function to the vertices and the edges at the same time.
 
 <div class="codetabs" markdown="1">
@@ -2013,6 +2049,57 @@ vertex and edge in the output graph stores the common group value and the number
 
 {% top %}
 
+Graph Algorithms
+-----------
+
+The logic blocks with which the `Graph` API and top-level algorithms are assembled are accessible in Gelly as graph
+algorithms in the `org.apache.flink.graph.asm` package. These algorithms provide optimization and tuning through
+configuration parameters and may provide implicit runtime reuse when processing the same input with a similar
+configuration.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Algorithm</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>TranslateGraphIds</strong></td>
+      <td>
+        <p>Translate vertex and edge IDs using the given <code>MapFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateGraphIds(new LongValueToStringValue()));
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>TranslateVertexValues</strong></td>
+      <td>
+        <p>Translate vertex values using the given <code>MapFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>TranslateEdgeValues</strong></td>
+      <td>
+        <p>Translate edge values using the given <code>MapFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateEdgeValues(new Nullify()));
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+{% top %}
+
 Graph Generators
 -----------
 

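The documentation change above notes that one map function can serve all of the translate methods. As a rough illustration outside of Flink, the plain-Java sketch below applies a single function to vertex IDs and to both endpoints of every edge, which is what keeps the translated edges pointing at translated vertices. It does not use the Gelly API: `TranslateIdsSketch`, `translateVertexIds`, and `translateEdgeIds` are hypothetical names, and `java.util.function.Function` stands in for Flink's `MapFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for ID translation; this is not Gelly code.
final class TranslateIdsSketch {

	// Apply the translator to every vertex ID.
	static <OLD, NEW> List<NEW> translateVertexIds(List<OLD> ids, Function<OLD, NEW> translator) {
		List<NEW> result = new ArrayList<>();
		for (OLD id : ids) {
			result.add(translator.apply(id));
		}
		return result;
	}

	// Apply the same translator to the source and target of every edge so that
	// edges still refer to existing (translated) vertex IDs.
	static <OLD, NEW> List<List<NEW>> translateEdgeIds(List<List<OLD>> edges, Function<OLD, NEW> translator) {
		List<List<NEW>> result = new ArrayList<>();
		for (List<OLD> edge : edges) {
			NEW source = translator.apply(edge.get(0));
			NEW target = translator.apply(edge.get(1));
			result.add(Arrays.asList(source, target));
		}
		return result;
	}
}
```

Using two different functions for vertices and edges in this sketch would leave edges referring to IDs that no vertex carries, which is presumably why the commit exposes a single `translateGraphIds` method on `Graph`.
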
http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 0699521..5e14319 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -407,6 +407,90 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
+   * Translate vertex and edge IDs using the given MapFunction.
+   *
+   * @param translator implements conversion from K to NEW
+   * @return graph with translated vertex and edge IDs
+   */
+  def translateGraphIds[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]):
+  Graph[NEW, VV, EV] = {
+    new Graph[NEW, VV, EV](jgraph.translateGraphIds(translator))
+  }
+
+  /**
+    * Translate vertex and edge IDs using the given function.
+    *
+    * @param fun implements conversion from K to NEW
+    * @return graph with translated vertex and edge IDs
+    */
+  def translateGraphIds[NEW: TypeInformation : ClassTag](fun: K => NEW):
+  Graph[NEW, VV, EV] = {
+    val mapper: MapFunction[K, NEW] = new MapFunction[K, NEW] {
+      val cleanFun = clean(fun)
+
+      def map(in: K): NEW = cleanFun(in)
+    }
+
+    new Graph[NEW, VV, EV](jgraph.translateGraphIds(mapper))
+  }
+
+  /**
+   * Translate vertex values using the given MapFunction.
+   *
+   * @param translator implements conversion from VV to NEW
+   * @return graph with translated vertex values
+   */
+  def translateVertexValues[NEW: TypeInformation : ClassTag](translator: MapFunction[VV, NEW]):
+  Graph[K, NEW, EV] = {
+    new Graph[K, NEW, EV](jgraph.translateVertexValues(translator))
+  }
+
+  /**
+    * Translate vertex values using the given function.
+    *
+    * @param fun implements conversion from VV to NEW
+    * @return graph with translated vertex values
+    */
+  def translateVertexValues[NEW: TypeInformation : ClassTag](fun: VV => NEW):
+  Graph[K, NEW, EV] = {
+    val mapper: MapFunction[VV, NEW] = new MapFunction[VV, NEW] {
+      val cleanFun = clean(fun)
+
+      def map(in: VV): NEW = cleanFun(in)
+    }
+
+    new Graph[K, NEW, EV](jgraph.translateVertexValues(mapper))
+  }
+
+  /**
+   * Translate edge values using the given MapFunction.
+   *
+   * @param translator implements conversion from EV to NEW
+   * @return graph with translated edge values
+   */
+  def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: MapFunction[EV, NEW]):
+  Graph[K, VV, NEW] = {
+    new Graph[K, VV, NEW](jgraph.translateEdgeValues(translator))
+  }
+
+  /**
+    * Translate edge values using the given function.
+    *
+    * @param fun implements conversion from EV to NEW
+    * @return graph with translated edge values
+    */
+  def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: EV => NEW):
+  Graph[K, VV, NEW] = {
+    val mapper: MapFunction[EV, NEW] = new MapFunction[EV, NEW] {
+      val cleanFun = clean(fun)
+
+      def map(in: EV): NEW = cleanFun(in)
+    }
+
+    new Graph[K, VV, NEW](jgraph.translateEdgeValues(mapper))
+  }
+
+  /**
    * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
    * a user-defined transformation on the values of the matched records.
    * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index aabc466..99faf44 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -46,6 +46,9 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.asm.translate.TranslateEdgeValues;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.TranslateVertexValues;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GSAConfiguration;
 import org.apache.flink.graph.gsa.GatherFunction;
@@ -547,6 +550,42 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
+	 * Translate {@link Vertex} and {@link Edge} IDs using the given {@link MapFunction}.
+	 *
+	 * @param translator implements conversion from {@code K} to {@code NEW}
+	 * @param <NEW> new ID type
+	 * @return graph with translated vertex and edge IDs
+	 * @throws Exception
+	 */
+	public <NEW> Graph<NEW, VV, EV> translateGraphIds(MapFunction<K, NEW> translator) throws Exception {
+		return run(new TranslateGraphIds<K, NEW, VV, EV>(translator));
+	}
+
+	/**
+	 * Translate {@link Vertex} values using the given {@link MapFunction}.
+	 *
+	 * @param translator implements conversion from {@code VV} to {@code NEW}
+	 * @param <NEW> new vertex value type
+	 * @return graph with translated vertex values
+	 * @throws Exception
+	 */
+	public <NEW> Graph<K, NEW, EV> translateVertexValues(MapFunction<VV, NEW> translator) throws Exception {
+		return run(new TranslateVertexValues<K, VV, NEW, EV>(translator));
+	}
+
+	/**
+	 * Translate {@link Edge} values using the given {@link MapFunction}.
+	 *
+	 * @param translator implements conversion from {@code EV} to {@code NEW}
+	 * @param <NEW> new edge value type
+	 * @return graph with translated edge values
+	 * @throws Exception
+	 */
+	public <NEW> Graph<K, VV, NEW> translateEdgeValues(MapFunction<EV, NEW> translator) throws Exception {
+		return run(new TranslateEdgeValues<K, VV, EV, NEW>(translator));
+	}
+
+	/**
 	 * Apply a function to the attribute of each edge in the graph.
 	 *
 	 * @param mapper the map function to apply.

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java
new file mode 100644
index 0000000..b21fe84
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link LongValue} by adding a constant offset value.
+ */
+public class LongValueAddOffset
+implements MapFunction<LongValue, LongValue> {
+
+	private final long offset;
+
+	private LongValue output = new LongValue();
+
+	/**
+	 * Translate {@link LongValue} by adding a constant offset value.
+	 *
+	 * @param offset value to be added to each element
+	 */
+	public LongValueAddOffset(long offset) {
+		this.offset = offset;
+	}
+
+	@Override
+	public LongValue map(LongValue value)
+			throws Exception {
+		output.setValue(offset + value.getValue());
+		return output;
+	}
+}

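`LongValueAddOffset` returns the same `output` instance from every call to `map`. That avoids one allocation per record, but it also means a caller should not hold on to the returned reference across calls. The plain-Java sketch below illustrates the aliasing effect with a hypothetical `MutableLong` holder and `AddOffset` class; neither is part of Flink, and the sketch takes a primitive `long` input for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the object-reuse pattern; not Flink code.
final class ObjectReuseSketch {

	// Minimal mutable holder standing in for a mutable value type.
	static final class MutableLong {
		long value;
	}

	// Adds a constant offset and returns the same holder on every call.
	static final class AddOffset {
		private final long offset;
		private final MutableLong output = new MutableLong();

		AddOffset(long offset) {
			this.offset = offset;
		}

		MutableLong map(long input) {
			output.value = offset + input;
			return output;
		}
	}

	// Unsafe: keeps references to the reused holder, so all entries alias it.
	static List<MutableLong> collectReferences(AddOffset mapper, long... inputs) {
		List<MutableLong> result = new ArrayList<>();
		for (long input : inputs) {
			result.add(mapper.map(input));
		}
		return result;
	}

	// Safe: copies the primitive value out before the next call overwrites it.
	static List<Long> collectValues(AddOffset mapper, long... inputs) {
		List<Long> result = new ArrayList<>();
		for (long input : inputs) {
			result.add(mapper.map(input).value);
		}
		return result;
	}
}
```

In this sketch every element returned by `collectReferences` reports the value of the last call, while `collectValues` preserves each individual result.
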
http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
new file mode 100644
index 0000000..c14fe04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToIntValue
+implements MapFunction<LongValue, IntValue> {
+
+	private IntValue output = new IntValue();
+
+	@Override
+	public IntValue map(LongValue value)
+			throws Exception {
+		long val = value.getValue();
+
+		if (val > Integer.MAX_VALUE || val < Integer.MIN_VALUE) {
+			throw new RuntimeException("LongValue input overflows IntValue output");
+		}
+
+		output.setValue((int) val);
+		return output;
+	}
+}

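A narrowing conversion from `long` to `int` can overflow in either direction: without a lower bound, a value below `Integer.MIN_VALUE` would be silently truncated by the cast. The plain-Java sketch below shows a range check that covers both ends. `NarrowingSketch` and its methods are hypothetical names, and `Math.toIntExact` (Java 8 and later) is included as the standard-library counterpart, which throws `ArithmeticException` rather than a plain `RuntimeException`.

```java
// Hypothetical illustration of a checked long-to-int conversion; not Flink code.
final class NarrowingSketch {

	// Convert to int, rejecting values outside the int range on either side.
	static int checkedLongToInt(long value) {
		if (value > Integer.MAX_VALUE || value < Integer.MIN_VALUE) {
			throw new RuntimeException("long input overflows int output: " + value);
		}
		return (int) value;
	}

	// Report whether the hand-written check rejects the value.
	static boolean overflows(long value) {
		try {
			checkedLongToInt(value);
			return false;
		} catch (RuntimeException e) {
			return true;
		}
	}

	// Report whether the standard-library conversion rejects the value.
	static boolean overflowsExact(long value) {
		try {
			Math.toIntExact(value);
			return false;
		} catch (ArithmeticException e) {
			return true;
		}
	}
}
```

Both helpers accept exactly the closed range from `Integer.MIN_VALUE` to `Integer.MAX_VALUE`.
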
http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java
new file mode 100644
index 0000000..3d9a5cb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Translate {@link LongValue} to {@link StringValue}.
+ */
+public class LongValueToStringValue
+implements MapFunction<LongValue, StringValue> {
+
+	private StringValue output = new StringValue();
+
+	@Override
+	public StringValue map(LongValue value)
+			throws Exception {
+		output.setValue(Long.toString(value.getValue()));
+		return output;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
new file mode 100644
index 0000000..585472d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
+
+/**
+ * Methods for translation of the type or modification of the data of graph
+ * IDs, vertex values, and edge values.
+ */
+public class Translate {
+
+	// --------------------------------------------------------------------------------------------
+	//  Translate vertex IDs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Translate {@link Vertex} IDs using the given {@link MapFunction}.
+	 *
+	 * @param vertices input vertices
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param <OLD> old vertex ID type
+	 * @param <NEW> new vertex ID type
+	 * @param <VV> vertex value type
+	 * @return translated vertices
+	 */
+	public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexIds(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator) {
+		return translateVertexIds(vertices, translator, PARALLELISM_UNKNOWN);
+	}
+
+	/**
+	 * Translate {@link Vertex} IDs using the given {@link MapFunction}.
+	 *
+	 * @param vertices input vertices
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param parallelism operator parallelism
+	 * @param <OLD> old vertex ID type
+	 * @param <NEW> new vertex ID type
+	 * @param <VV> vertex value type
+	 * @return translated vertices
+	 */
+	@SuppressWarnings("unchecked")
+	public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexIds(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator, int parallelism) {
+		Preconditions.checkNotNull(vertices);
+		Preconditions.checkNotNull(translator);
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class<? extends Vertex>) Vertex.class;
+		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
+		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1);
+
+		TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
+
+		return vertices
+			.map(new TranslateVertexId<OLD, NEW, VV>(translator))
+			.returns(returnType)
+				.setParallelism(parallelism)
+				.name("Translate vertex IDs");
+	}
+
+	/**
+	 * Translate {@link Vertex} IDs using the given {@link MapFunction}.
+	 *
+	 * @param <OLD> old vertex ID type
+	 * @param <NEW> new vertex ID type
+	 * @param <VV> vertex value type
+	 */
+	@ForwardedFields("1")
+	private static class TranslateVertexId<OLD, NEW, VV>
+	extends WrappingFunction<MapFunction<OLD, NEW>>
+	implements MapFunction<Vertex<OLD, VV>, Vertex<NEW, VV>> {
+		private Vertex<NEW, VV> vertex = new Vertex<>();
+
+		public TranslateVertexId(MapFunction<OLD, NEW> translator) {
+			super(translator);
+		}
+
+		@Override
+		public Vertex<NEW, VV> map(Vertex<OLD, VV> value)
+				throws Exception {
+			vertex.f0 = wrappedFunction.map(value.f0);
+			vertex.f1 = value.f1;
+
+			return vertex;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Translate edge IDs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Translate {@link Edge} IDs using the given {@link MapFunction}.
+	 *
+	 * @param edges input edges
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param <OLD> old edge ID type
+	 * @param <NEW> new edge ID type
+	 * @param <EV> edge value type
+	 * @return translated edges
+	 */
+	public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeIds(DataSet<Edge<OLD, EV>> edges, MapFunction<OLD, NEW> translator) {
+		return translateEdgeIds(edges, translator, PARALLELISM_UNKNOWN);
+	}
+
+	/**
+	 * Translate {@link Edge} IDs using the given {@link MapFunction}.
+	 *
+	 * @param edges input edges
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param parallelism operator parallelism
+	 * @param <OLD> old edge ID type
+	 * @param <NEW> new edge ID type
+	 * @param <EV> edge value type
+	 * @return translated edges
+	 */
+	@SuppressWarnings("unchecked")
+	public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeIds(DataSet<Edge<OLD, EV>> edges, MapFunction<OLD, NEW> translator, int parallelism) {
+		Preconditions.checkNotNull(edges);
+		Preconditions.checkNotNull(translator);
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>)(Class<? extends Edge>) Edge.class;
+		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
+		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(2);
+
+		TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
+
+		return edges
+			.map(new TranslateEdgeId<OLD, NEW, EV>(translator))
+			.returns(returnType)
+				.setParallelism(parallelism)
+				.name("Translate edge IDs");
+	}
+
+	/**
+	 * Translate {@link Edge} IDs using the given {@link MapFunction}.
+	 *
+	 * @param <OLD> old edge ID type
+	 * @param <NEW> new edge ID type
+	 * @param <EV> edge value type
+	 */
+	@ForwardedFields("2")
+	private static class TranslateEdgeId<OLD, NEW, EV>
+	extends WrappingFunction<MapFunction<OLD, NEW>>
+	implements MapFunction<Edge<OLD, EV>, Edge<NEW, EV>> {
+		private Edge<NEW, EV> edge = new Edge<>();
+
+		public TranslateEdgeId(MapFunction<OLD, NEW> translator) {
+			super(translator);
+		}
+
+		@Override
+		public Edge<NEW, EV> map(Edge<OLD, EV> value)
+				throws Exception {
+			edge.f0 = wrappedFunction.map(value.f0);
+			edge.f1 = wrappedFunction.map(value.f1);
+			edge.f2 = value.f2;
+
+			return edge;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Translate vertex values
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Translate {@link Vertex} values using the given {@link MapFunction}.
+	 *
+	 * @param vertices input vertices
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param <K> vertex ID type
+	 * @param <OLD> old vertex value type
+	 * @param <NEW> new vertex value type
+	 * @return translated vertices
+	 */
+	public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, MapFunction<OLD, NEW> translator) {
+		return translateVertexValues(vertices, translator, PARALLELISM_UNKNOWN);
+	}
+
+	/**
+	 * Translate {@link Vertex} values using the given {@link MapFunction}.
+	 *
+	 * @param vertices input vertices
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param parallelism operator parallelism
+	 * @param <K> vertex ID type
+	 * @param <OLD> old vertex value type
+	 * @param <NEW> new vertex value type
+	 * @return translated vertices
+	 */
+	@SuppressWarnings("unchecked")
+	public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, MapFunction<OLD, NEW> translator, int parallelism) {
+		Preconditions.checkNotNull(vertices);
+		Preconditions.checkNotNull(translator);
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>)(Class<? extends Vertex>) Vertex.class;
+		TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0);
+		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
+
+		TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType);
+
+		return vertices
+			.map(new TranslateVertexValue<K, OLD, NEW>(translator))
+			.returns(returnType)
+				.setParallelism(parallelism)
+				.name("Translate vertex values");
+	}
+
+	/**
+	 * Translate {@link Vertex} values using the given {@link MapFunction}.
+	 *
+	 * @param <K> vertex ID type
+	 * @param <OLD> old vertex value type
+	 * @param <NEW> new vertex value type
+	 */
+	@ForwardedFields("0")
+	private static class TranslateVertexValue<K, OLD, NEW>
+	extends WrappingFunction<MapFunction<OLD, NEW>>
+	implements MapFunction<Vertex<K, OLD>, Vertex<K, NEW>> {
+		private Vertex<K, NEW> vertex = new Vertex<>();
+
+		public TranslateVertexValue(MapFunction<OLD, NEW> translator) {
+			super(translator);
+		}
+
+		@Override
+		public Vertex<K, NEW> map(Vertex<K, OLD> value)
+				throws Exception {
+			vertex.f0 = value.f0;
+			vertex.f1 = wrappedFunction.map(value.f1);
+
+			return vertex;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Translate edge values
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Translate {@link Edge} values using the given {@link MapFunction}.
+	 *
+	 * @param edges input edges
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param <K> edge ID type
+	 * @param <OLD> old edge value type
+	 * @param <NEW> new edge value type
+	 * @return translated edges
+	 */
+	public static <K, OLD, NEW> DataSet<Edge<K, NEW>> translateEdgeValues(DataSet<Edge<K, OLD>> edges, MapFunction<OLD, NEW> translator) {
+		return translateEdgeValues(edges, translator, PARALLELISM_UNKNOWN);
+	}
+
+	/**
+	 * Translate {@link Edge} values using the given {@link MapFunction}.
+	 *
+	 * @param edges input edges
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 * @param parallelism operator parallelism
+	 * @param <K> vertex ID type
+	 * @param <OLD> old edge value type
+	 * @param <NEW> new edge value type
+	 * @return translated edges
+	 */
+	@SuppressWarnings("unchecked")
+	public static <K, OLD, NEW> DataSet<Edge<K, NEW>> translateEdgeValues(DataSet<Edge<K, OLD>> edges, MapFunction<OLD, NEW> translator, int parallelism) {
+		Preconditions.checkNotNull(edges);
+		Preconditions.checkNotNull(translator);
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>)(Class<? extends Edge>) Edge.class;
+		TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(0);
+		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
+
+		TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType);
+
+		return edges
+			.map(new TranslateEdgeValue<K, OLD, NEW>(translator))
+			.returns(returnType)
+				.setParallelism(parallelism)
+				.name("Translate edge values");
+	}
+
+	/**
+	 * Translate {@link Edge} values using the given {@link MapFunction}.
+	 *
+	 * @param <K> vertex ID type
+	 * @param <OLD> old edge value type
+	 * @param <NEW> new edge value type
+	 */
+	@ForwardedFields("0; 1")
+	private static class TranslateEdgeValue<K, OLD, NEW>
+	extends WrappingFunction<MapFunction<OLD, NEW>>
+	implements MapFunction<Edge<K, OLD>, Edge<K, NEW>> {
+		private Edge<K, NEW> edge = new Edge<>();
+
+		public TranslateEdgeValue(MapFunction<OLD, NEW> translator) {
+			super(translator);
+		}
+
+		@Override
+		public Edge<K, NEW> map(Edge<K, OLD> value)
+				throws Exception {
+			edge.f0 = value.f0;
+			edge.f1 = value.f1;
+			edge.f2 = wrappedFunction.map(value.f2);
+
+			return edge;
+		}
+	}
+}
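
Note on the wrapper above: TranslateEdgeValue copies both IDs unchanged, translates only the value, and returns the same Edge instance on every call. The following is a minimal sketch of that pattern in plain Java, not the committed code. SimpleEdge and ReusingValueMapper are hypothetical stand-ins for Gelly's Edge and the private wrapper class, and java.util.function.Function stands in for Flink's MapFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins only: SimpleEdge and ReusingValueMapper are not Flink or Gelly classes.
final class EdgeValueTranslationSketch {

	/** Stand-in for a three-field edge: source ID, target ID, value. */
	static final class SimpleEdge<K, V> {
		K f0;
		K f1;
		V f2;

		SimpleEdge() {
		}

		SimpleEdge(K f0, K f1, V f2) {
			this.f0 = f0;
			this.f1 = f1;
			this.f2 = f2;
		}

		@Override
		public String toString() {
			return "(" + f0 + "," + f1 + "," + f2 + ")";
		}
	}

	/** Copies both IDs unchanged and translates only the value, reusing one output object. */
	static final class ReusingValueMapper<K, OLD, NEW> {
		private final Function<OLD, NEW> translator;
		private final SimpleEdge<K, NEW> edge = new SimpleEdge<>();

		ReusingValueMapper(Function<OLD, NEW> translator) {
			this.translator = translator;
		}

		SimpleEdge<K, NEW> map(SimpleEdge<K, OLD> value) {
			edge.f0 = value.f0;
			edge.f1 = value.f1;
			edge.f2 = translator.apply(value.f2);
			return edge;
		}
	}

	/** Translates every edge value and renders each result before the next call overwrites it. */
	static List<String> translateAll(List<SimpleEdge<Long, Long>> edges) {
		ReusingValueMapper<Long, Long, String> mapper =
			new ReusingValueMapper<Long, Long, String>(v -> "v" + v);
		List<String> rendered = new ArrayList<>();
		for (SimpleEdge<Long, Long> e : edges) {
			rendered.add(mapper.map(e).toString());
		}
		return rendered;
	}

	/** Returns true because both calls hand back the same mutable instance. */
	static boolean returnsSameInstance() {
		ReusingValueMapper<Long, Long, String> mapper =
			new ReusingValueMapper<Long, Long, String>(v -> "v" + v);
		SimpleEdge<Long, String> first = mapper.map(new SimpleEdge<>(0L, 1L, 7L));
		SimpleEdge<Long, String> second = mapper.map(new SimpleEdge<>(1L, 2L, 8L));
		return first == second;
	}
}
```

Because the output object is reused, a caller of this sketch that wants to keep results has to copy or render each one before the next call, which is why translateAll converts to a String immediately.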

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
new file mode 100644
index 0000000..3dd0478
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
+import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
+
+/**
+ * Translate {@link Edge} values using the given {@link MapFunction}.
+ *
+ * @param <K> vertex ID type
+ * @param <VV> vertex value type
+ * @param <OLD> old edge value type
+ * @param <NEW> new edge value type
+ */
+public class TranslateEdgeValues<K, VV, OLD, NEW>
+implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
+
+	// Required configuration
+	private MapFunction<OLD, NEW> translator;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_UNKNOWN;
+
+	/**
+	 * Translate {@link Edge} values using the given {@link MapFunction}.
+	 *
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 */
+	public TranslateEdgeValues(MapFunction<OLD, NEW> translator) {
+		Preconditions.checkNotNull(translator);
+
+		this.translator = translator;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public TranslateEdgeValues<K, VV, OLD, NEW> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, VV, NEW> run(Graph<K, VV, OLD> input) throws Exception {
+		DataSet<Edge<K, NEW>> translatedEdges = translateEdgeValues(input.getEdges(), translator, parallelism);
+
+		return Graph.fromDataSet(input.getVertices(), translatedEdges, input.getContext());
+	}
+}
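
Note on setParallelism above: the check accepts any positive value plus the two sentinel constants, and the setter returns this so calls can be chained. The following is a minimal sketch of that check in plain Java. The class name is hypothetical, the constant values -1 and -2 are assumptions made for illustration rather than values taken from this commit, and IllegalArgumentException is assumed as the failure type of Preconditions.checkArgument.

```java
// Hypothetical stand-in: not a Flink class. Constant values are assumed for illustration.
final class ParallelismCheckSketch {

	static final int PARALLELISM_DEFAULT = -1;  // assumed sentinel: use the environment default
	static final int PARALLELISM_UNKNOWN = -2;  // assumed sentinel: leave the parallelism unset

	private int parallelism = PARALLELISM_UNKNOWN;

	/** Accepts positive values and the two sentinels; rejects everything else. */
	ParallelismCheckSketch setParallelism(int parallelism) {
		boolean valid = parallelism > 0
			|| parallelism == PARALLELISM_DEFAULT
			|| parallelism == PARALLELISM_UNKNOWN;
		if (!valid) {
			throw new IllegalArgumentException("The parallelism must be greater than zero.");
		}
		this.parallelism = parallelism;
		return this;
	}

	int getParallelism() {
		return parallelism;
	}

	/** Convenience for trying a value without handling the exception at the call site. */
	static boolean accepts(int parallelism) {
		try {
			new ParallelismCheckSketch().setParallelism(parallelism);
			return true;
		} catch (IllegalArgumentException e) {
			return false;
		}
	}
}
```

In this sketch zero and any negative value other than the two sentinels are rejected, so the error message describes only the common case.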

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
new file mode 100644
index 0000000..3e12880
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
+import static org.apache.flink.graph.asm.translate.Translate.translateEdgeIds;
+import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
+
+/**
+ * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link MapFunction}.
+ *
+ * @param <OLD> old graph ID type
+ * @param <NEW> new graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class TranslateGraphIds<OLD, NEW, VV, EV>
+implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
+
+	// Required configuration
+	private MapFunction<OLD, NEW> translator;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_UNKNOWN;
+
+	/**
+	 * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link MapFunction}.
+	 *
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 */
+	public TranslateGraphIds(MapFunction<OLD, NEW> translator) {
+		Preconditions.checkNotNull(translator);
+
+		this.translator = translator;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public TranslateGraphIds<OLD, NEW, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<NEW, VV, EV> run(Graph<OLD, VV, EV> input) throws Exception {
+		// Vertices
+		DataSet<Vertex<NEW, VV>> translatedVertices = translateVertexIds(input.getVertices(), translator, parallelism);
+
+		// Edges
+		DataSet<Edge<NEW, EV>> translatedEdges = translateEdgeIds(input.getEdges(), translator, parallelism);
+
+		// Graph
+		return Graph.fromDataSet(translatedVertices, translatedEdges, input.getContext());
+	}
+}
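
Note on run above: the same translator is applied to the vertex IDs and to both endpoints of every edge, which keeps edges pointing at existing vertices. The following is a minimal sketch of that idea in plain Java. Maps and lists stand in for Gelly's DataSet, Vertex and Edge, java.util.function.Function stands in for MapFunction, and every name in the block is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins only: plain collections replace Gelly's Graph, Vertex and Edge.
final class GraphIdTranslationSketch {

	/** Stand-in for an edge: source ID, target ID, value. */
	static final class SimpleEdge<K> {
		final K source;
		final K target;
		final long value;

		SimpleEdge(K source, K target, long value) {
			this.source = source;
			this.target = target;
			this.value = value;
		}
	}

	/** Re-keys the vertex map; vertex values are carried over unchanged. */
	static <OLD, NEW> Map<NEW, Long> translateVertexIds(
			Map<OLD, Long> vertices, Function<OLD, NEW> translator) {
		Map<NEW, Long> result = new LinkedHashMap<>();
		for (Map.Entry<OLD, Long> entry : vertices.entrySet()) {
			result.put(translator.apply(entry.getKey()), entry.getValue());
		}
		return result;
	}

	/** Translates both endpoints of each edge; edge values are carried over unchanged. */
	static <OLD, NEW> List<SimpleEdge<NEW>> translateEdgeIds(
			List<SimpleEdge<OLD>> edges, Function<OLD, NEW> translator) {
		List<SimpleEdge<NEW>> result = new ArrayList<>();
		for (SimpleEdge<OLD> edge : edges) {
			result.add(new SimpleEdge<>(
				translator.apply(edge.source), translator.apply(edge.target), edge.value));
		}
		return result;
	}

	/** True when every edge endpoint names a vertex that exists. */
	static <K> boolean isConsistent(Map<K, Long> vertices, List<SimpleEdge<K>> edges) {
		for (SimpleEdge<K> edge : edges) {
			if (!vertices.containsKey(edge.source) || !vertices.containsKey(edge.target)) {
				return false;
			}
		}
		return true;
	}
}
```

In this sketch a translator that maps two distinct old IDs to the same new ID would merge the two vertices into one map entry, so an injective translator is the safer choice when the vertex count must be preserved.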

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
new file mode 100644
index 0000000..7556e02
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
+import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
+
+/**
+ * Translate {@link Vertex} values using the given {@link MapFunction}.
+ *
+ * @param <K> vertex ID type
+ * @param <OLD> old vertex value type
+ * @param <NEW> new vertex value type
+ * @param <EV> edge value type
+ */
+public class TranslateVertexValues<K, OLD, NEW, EV>
+implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
+
+	// Required configuration
+	private MapFunction<OLD, NEW> translator;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_UNKNOWN;
+
+	/**
+	 * Translate {@link Vertex} values using the given {@link MapFunction}.
+	 *
+	 * @param translator implements conversion from {@code OLD} to {@code NEW}
+	 */
+	public TranslateVertexValues(MapFunction<OLD, NEW> translator) {
+		Preconditions.checkNotNull(translator);
+
+		this.translator = translator;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public TranslateVertexValues<K, OLD, NEW, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, NEW, EV> run(Graph<K, OLD, EV> input) throws Exception {
+		DataSet<Vertex<K, NEW>> translatedVertices = translateVertexValues(input.getVertices(), translator, parallelism);
+
+		return Graph.fromDataSet(translatedVertices, input.getEdges(), input.getContext());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e4bf94/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
new file mode 100644
index 0000000..2d1ab52
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
+import static org.junit.Assert.assertEquals;
+
+public class TranslateTest {
+
+	private Graph<LongValue, LongValue, LongValue> graph;
+
+	private String expectedVertexResult =
+		"(0,0)\n" +
+		"(1,1)\n" +
+		"(2,2)\n" +
+		"(3,3)\n" +
+		"(4,4)\n" +
+		"(5,5)\n" +
+		"(6,6)\n" +
+		"(7,7)\n" +
+		"(8,8)\n" +
+		"(9,9)";
+
+	private String expectedEdgeResult =
+		"(0,0,0)\n" +
+		"(1,1,1)\n" +
+		"(2,2,2)\n" +
+		"(3,3,3)\n" +
+		"(4,4,4)\n" +
+		"(5,5,5)\n" +
+		"(6,6,6)\n" +
+		"(7,7,7)\n" +
+		"(8,8,8)\n" +
+		"(9,9,9)";
+
+	@Before
+	public void setup() {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+		int count = 10;
+
+		List<Vertex<LongValue, LongValue>> vertexList = new LinkedList<>();
+		List<Edge<LongValue, LongValue>> edgeList = new LinkedList<>();
+
+		for (long l = 0 ; l < count ; l++) {
+			LongValue lv = new LongValue(l);
+			vertexList.add(new Vertex<>(lv, lv));
+			edgeList.add(new Edge<>(lv, lv, lv));
+		}
+
+		graph = Graph.fromCollection(vertexList, edgeList, env);
+	}
+
+	@Test
+	public void testTranslateGraphIds()
+			throws Exception {
+		Graph<StringValue, LongValue, LongValue> stringIdGraph = graph
+			.translateGraphIds(new LongValueToStringValue());
+
+		for (Vertex<StringValue, LongValue> vertex : stringIdGraph.getVertices().collect()) {
+			assertEquals(StringValue.class, vertex.f0.getClass());
+			assertEquals(LongValue.class, vertex.f1.getClass());
+		}
+
+		for (Edge<StringValue, LongValue> edge : stringIdGraph.getEdges().collect()) {
+			assertEquals(StringValue.class, edge.f0.getClass());
+			assertEquals(StringValue.class, edge.f1.getClass());
+			assertEquals(LongValue.class, edge.f2.getClass());
+		}
+
+		TestBaseUtils.compareResultAsText(stringIdGraph.getVertices().collect(), expectedVertexResult);
+		TestBaseUtils.compareResultAsText(stringIdGraph.getEdges().collect(), expectedEdgeResult);
+	}
+
+	@Test
+	public void testTranslateVertexValues()
+			throws Exception {
+		DataSet<Vertex<LongValue, StringValue>> vertexSet = graph
+			.translateVertexValues(new LongValueToStringValue())
+			.getVertices();
+
+		for (Vertex<LongValue, StringValue> vertex : vertexSet.collect()) {
+			assertEquals(LongValue.class, vertex.f0.getClass());
+			assertEquals(StringValue.class, vertex.f1.getClass());
+		}
+
+		TestBaseUtils.compareResultAsText(vertexSet.collect(), expectedVertexResult);
+	}
+
+	@Test
+	public void testTranslateEdgeValues()
+			throws Exception {
+		DataSet<Edge<LongValue, StringValue>> edgeSet = graph
+			.translateEdgeValues(new LongValueToStringValue())
+			.getEdges();
+
+		for (Edge<LongValue, StringValue> edge : edgeSet.collect()) {
+			assertEquals(LongValue.class, edge.f0.getClass());
+			assertEquals(LongValue.class, edge.f1.getClass());
+			assertEquals(StringValue.class, edge.f2.getClass());
+		}
+
+		TestBaseUtils.compareResultAsText(edgeSet.collect(), expectedEdgeResult);
+	}
+}

