flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [08/10] flink git commit: [FLINK-1523] [gelly] Added VertexWithDegrees as a subclass of Vertex
Date Tue, 19 May 2015 21:03:37 GMT
[FLINK-1523] [gelly] Added VertexWithDegrees as a subclass of Vertex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1720673
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1720673
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1720673

Branch: refs/heads/master
Commit: e17206737e3701838c74c89ff108f32b11c4656c
Parents: 1dfbcba
Author: andralungu <lungu.andra@gmail.com>
Authored: Tue Apr 28 20:08:35 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Tue May 19 22:38:04 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/graph/Vertex.java     | 34 +--------
 .../apache/flink/graph/VertexWithDegrees.java   | 72 ++++++++++++++++++++
 .../graph/example/IncrementalSSSPExample.java   |  3 +-
 .../flink/graph/spargel/MessagingFunction.java  |  3 +-
 .../graph/spargel/VertexCentricIteration.java   |  3 +-
 .../graph/spargel/VertexUpdateFunction.java     |  3 +-
 .../test/VertexCentricConfigurationITCase.java  | 10 +--
 ...CentricConfigurationWithExceptionITCase.java |  5 +-
 8 files changed, 89 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
index 9286443..c5eb973 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
@@ -31,19 +31,11 @@ public class Vertex<K, V> extends Tuple2<K, V> {
 
 	private static final long serialVersionUID = 1L;
 
-	private Long inDegree;
-	private Long outDegree;
-
-	public Vertex(){
-		inDegree = -1L;
-		outDegree = -1L;
-	}
+	public Vertex(){}
 
 	public Vertex(K k, V val) {
 		this.f0 = k;
 		this.f1 = val;
-		inDegree = 0L;
-		outDegree = 0L;
 	}
 
 	public K getId() {
@@ -61,28 +53,4 @@ public class Vertex<K, V> extends Tuple2<K, V> {
 	public void setValue(V val) {
 		this.f1 = val;
 	}
-
-	public Long getInDegree() throws Exception{
-		if(inDegree == -1) {
-			throw new InaccessibleMethodException("The degree option was not set. To access the degrees,
" +
-					"call iterationConfiguration.setOptDegrees(true).");
-		}
-		return inDegree;
-	}
-
-	public void setInDegree(Long inDegree) {
-		this.inDegree = inDegree;
-	}
-
-	public Long getOutDegree() throws Exception{
-		if(outDegree == -1) {
-			throw new InaccessibleMethodException("The degree option was not set. To access the degrees,
" +
-					"call iterationConfiguration.setOptDegrees(true).");
-		}
-		return outDegree;
-	}
-
-	public void setOutDegree(Long outDegree) {
-		this.outDegree = outDegree;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
new file mode 100644
index 0000000..b692475
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+/**
+ * Represents the graph's nodes. It carries an ID and a value as well as the vertex inDegree
and outDegree.
+ * For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value
type.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class VertexWithDegrees<K extends Comparable<K> & Serializable, V extends
Serializable>
+		extends Vertex<K, V> {
+
+	private long inDegree;
+
+	private long outDegree;
+
+	public VertexWithDegrees() {
+		super();
+		inDegree = -1l;
+		outDegree = -1l;
+	}
+
+	public VertexWithDegrees(K k, V v) {
+		super(k,v);
+		inDegree = 0l;
+		outDegree = 0l;
+	}
+
+	public Long getInDegree() throws Exception{
+		if(inDegree == -1) {
+			throw new InaccessibleMethodException("The degree option was not set. To access the degrees,
" +
+					"call iterationConfiguration.setOptDegrees(true).");
+		}
+		return inDegree;
+	}
+
+	public void setInDegree(Long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	public Long getOutDegree() throws Exception{
+		if(outDegree == -1) {
+			throw new InaccessibleMethodException("The degree option was not set. To access the degrees,
" +
+					"call iterationConfiguration.setOptDegrees(true).");
+		}
+		return outDegree;
+	}
+
+	public void setOutDegree(Long outDegree) {
+		this.outDegree = outDegree;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
index 6d209c4..ec43207 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.graph.example.utils.IncrementalSSSPData;
 import org.apache.flink.graph.spargel.IterationConfiguration;
 import org.apache.flink.graph.spargel.MessageIterator;
@@ -159,7 +160,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
 		@Override
 		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double>
inMessages) throws Exception {
 			if (inMessages.hasNext()) {
-				Long outDegree = vertex.getOutDegree() - 1;
+				Long outDegree = ((VertexWithDegrees)vertex).getOutDegree() - 1;
 				// check if the vertex has another SP-Edge
 				if (outDegree > 0) {
 					// there is another shortest path from the source to this vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index 69ba526..b3092fd 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -31,6 +31,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.InaccessibleMethodException;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
@@ -281,7 +282,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message,
EdgeVal
 	 */
 	void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue,
Long, Long>> newVertexState)
 			throws Exception {
-		Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(newVertexState.getId(),
+		VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey,
VertexValue>(newVertexState.getId(),
 				newVertexState.getValue().f0);
 		vertex.setInDegree(newVertexState.getValue().f1);
 		vertex.setOutDegree(newVertexState.getValue().f2);

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index ce82bf1..915a3ee 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -43,6 +43,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.util.Collector;
 
 import com.google.common.base.Preconditions;
@@ -630,7 +631,7 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message,
EdgeValue>
 									Tuple3<VertexKey, Long, Long> degrees,
 									Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>>
out) throws Exception {
 
-						out.collect(new Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
+						out.collect(new VertexWithDegrees<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
 								new Tuple3<VertexValue, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
 					}
 				});

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index b54042a..bc2e857 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.InaccessibleMethodException;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
@@ -205,7 +206,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue,
Message> impl
 	 */
 	void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue,
Long, Long>> vertexState,
 												MessageIterator<Message> inMessages) throws Exception {
-		Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(vertexState.getId(),
+		VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey,
VertexValue>(vertexState.getId(),
 				vertexState.getValue().f0);
 		vertex.setInDegree(vertexState.getValue().f1);
 		vertex.setOutDegree(vertexState.getValue().f2);

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 354c7b2..1c36906 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -442,7 +442,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase
{
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
 			try {
-				setNewVertexValue(vertex.getInDegree());
+				setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
 			} catch (Exception e) {
 				e.printStackTrace();
 			}
@@ -455,7 +455,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase
{
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
 			try {
-				setNewVertexValue(vertex.getOutDegree());
+				setNewVertexValue(((VertexWithDegrees) vertex).getOutDegree());
 			} catch (Exception e) {
 				e.printStackTrace();
 			}
@@ -523,7 +523,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase
{
 				count++;
 			}
 
-			setNewVertexValue(count == (vertex.getInDegree() + vertex.getOutDegree()));
+			setNewVertexValue(count == (((VertexWithDegrees)vertex).getInDegree() + ((VertexWithDegrees)vertex).getOutDegree()));
 		}
 	}
 
@@ -533,8 +533,8 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase
{
 		@Override
 		public void updateVertex(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex,
MessageIterator<Long> inMessages) {
 			try {
-				setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (vertex.getInDegree()
== vertex.getValue().f0)
-						&& (vertex.getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
+				setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (((VertexWithDegrees)vertex).getInDegree()
== vertex.getValue().f0)
+						&& (((VertexWithDegrees)vertex).getOutDegree() == vertex.getValue().f1) &&
vertex.getValue().f2));
 			} catch (Exception e) {
 				e.printStackTrace();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1720673/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
index 21b3b07..5c57f47 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -155,7 +156,7 @@ public class VertexCentricConfigurationWithExceptionITCase {
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
throws Exception {
-			setNewVertexValue(vertex.getInDegree());
+			setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
 		}
 	}
 
@@ -164,7 +165,7 @@ public class VertexCentricConfigurationWithExceptionITCase {
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
throws Exception {
-			setNewVertexValue(vertex.getOutDegree());
+			setNewVertexValue(((VertexWithDegrees)vertex).getOutDegree());
 		}
 	}
 


Mime
View raw message