flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/2] flink git commit: [FLINK-4217] [gelly] Gelly drivers should read CSV values as strings
Date Fri, 22 Jul 2016 18:38:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 54f02ec7b -> b71ac354d


[FLINK-4217] [gelly] Gelly drivers should read CSV values as strings

The user must now specify the vertex ID type with "--type integer" or
"--type string" when reading a graph from a CSV file.

This closes #2250


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b71ac354
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b71ac354
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b71ac354

Branch: refs/heads/master
Commit: b71ac354d19dccdd8bfa837b92f6bc814c9d29c6
Parents: e2ef74e
Author: Greg Hogan <code@greghogan.com>
Authored: Thu Jul 14 10:02:23 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Fri Jul 22 14:07:19 2016 -0400

----------------------------------------------------------------------
 .../graph/examples/ClusteringCoefficient.java   | 59 +++++++++++++++-----
 .../org/apache/flink/graph/examples/HITS.java   | 36 ++++++++----
 .../flink/graph/examples/JaccardIndex.java      | 31 +++++++---
 .../flink/graph/examples/TriangleListing.java   | 48 ++++++++++++----
 4 files changed, 130 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b71ac354/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 547ef97..8641428 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -37,6 +38,7 @@ import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
@@ -73,7 +75,7 @@ public class ClusteringCoefficient {
 		System.out.println("usage: ClusteringCoefficient --directed <true | false> --input
<csv | rmat [options]> --output <print | hash | csv [options]");
 		System.out.println();
 		System.out.println("options:");
-		System.out.println("  --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER]
[--input_field_delimiter FIELD_DELIMITER]");
+		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME
[--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
 		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
 		System.out.println();
 		System.out.println("  --output print");
@@ -105,23 +107,50 @@ public class ClusteringCoefficient {
 				String fieldDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
-				Graph<LongValue, NullValue, NullValue> graph = Graph
+				GraphCsvReader reader = Graph
 					.fromCsvReader(parameters.get("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter)
-						.keyType(LongValue.class);
-
-				if (directedAlgorithm) {
-					gcc = graph
-						.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
NullValue, NullValue>());
-					lcc = graph
-						.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
NullValue, NullValue>());
-				} else {
-					gcc = graph
-						.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
NullValue, NullValue>());
-					lcc = graph
-						.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
NullValue, NullValue>());
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue,
NullValue, NullValue>());
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue,
NullValue, NullValue>());
+						} else {
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue,
NullValue, NullValue>());
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue,
NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue,
NullValue, NullValue>());
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue,
NullValue, NullValue>());
+						} else {
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue,
NullValue, NullValue>());
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue,
NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						printUsage();
+						return;
 				}
 			} break;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71ac354/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
index c67fa9c..c772a3a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.directed.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
@@ -38,6 +39,7 @@ import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
@@ -59,14 +61,14 @@ public class HITS {
 	public static final int DEFAULT_EDGE_FACTOR = 16;
 
 	private static void printUsage() {
-		System.out.println(WordUtils.wrap("", 80));
-		System.out.println();
-		System.out.println(WordUtils.wrap("", 80));
+		System.out.println(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent"
+
+			" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\""
+
+			" and good \"authorities\" are linked from good \"hubs\".", 80));
 		System.out.println();
 		System.out.println("usage: HITS --input <csv | rmat [options]> --output <print
| hash | csv [options]");
 		System.out.println();
 		System.out.println("options:");
-		System.out.println("  --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER]
[--input_field_delimiter FIELD_DELIMITER]");
+		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME
[--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
 		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
 		System.out.println();
 		System.out.println("  --output print");
@@ -92,15 +94,29 @@ public class HITS {
 				String fieldDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
-				Graph<LongValue, NullValue, NullValue> graph = Graph
+				GraphCsvReader reader = Graph
 					.fromCsvReader(parameters.get("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter)
-						.keyType(LongValue.class);
-
-				hits = graph
-					.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue,
NullValue>(iterations));
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						hits = reader
+							.keyType(LongValue.class)
+							.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue,
NullValue>(iterations));
+					} break;
+
+					case "string": {
+						hits = reader
+							.keyType(StringValue.class)
+							.run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue,
NullValue>(iterations));
+					} break;
+
+					default:
+						printUsage();
+						return;
+				}
 				} break;
 
 			case "rmat": {

http://git-wip-us.apache.org/repos/asf/flink/blob/b71ac354/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 454afac..2158fa2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
@@ -38,6 +39,7 @@ import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
@@ -70,7 +72,7 @@ public class JaccardIndex {
 		System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print
| hash | csv [options]");
 		System.out.println();
 		System.out.println("options:");
-		System.out.println("  --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER]
[--input_field_delimiter FIELD_DELIMITER]");
+		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME
[--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
 		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
 		System.out.println();
 		System.out.println("  --output print");
@@ -95,16 +97,29 @@ public class JaccardIndex {
 				String fieldDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
-				Graph<LongValue, NullValue, NullValue> graph = Graph
+				GraphCsvReader reader = Graph
 					.fromCsvReader(parameters.get("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter)
-						.keyType(LongValue.class);
-
-				ji = graph
-					.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue,
NullValue>());
-
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						ji = reader
+							.keyType(LongValue.class)
+							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue,
NullValue>());
+					} break;
+
+					case "string": {
+						ji = reader
+							.keyType(StringValue.class)
+							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue,
NullValue>());
+					} break;
+
+					default:
+						printUsage();
+						return;
+				}
 				} break;
 
 			case "rmat": {

http://git-wip-us.apache.org/repos/asf/flink/blob/b71ac354/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index 9bd41f0..cd06dde 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
@@ -37,6 +38,7 @@ import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
@@ -67,7 +69,7 @@ public class TriangleListing {
 		System.out.println("usage: TriangleListing --directed <true | false> --input <csv
| rmat [options]> --output <print | hash | csv [options]");
 		System.out.println();
 		System.out.println("options:");
-		System.out.println("  --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER]
[--input_field_delimiter FIELD_DELIMITER]");
+		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME
[--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
 		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
 		System.out.println();
 		System.out.println("  --output print");
@@ -97,21 +99,45 @@ public class TriangleListing {
 				String fieldDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
-				Graph<LongValue, NullValue, NullValue> graph = Graph
+				GraphCsvReader reader = Graph
 					.fromCsvReader(parameters.get("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter)
-						.keyType(LongValue.class);
-
-				if (directedAlgorithm) {
-					tl = graph
-						.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue,
NullValue, NullValue>());
-				} else {
-					tl = graph
-						.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue,
NullValue, NullValue>());
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue,
NullValue, NullValue>());
+						} else {
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue,
NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue,
NullValue, NullValue>());
+						} else {
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue,
NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						printUsage();
+						return;
 				}
 
+
 			} break;
 
 			case "rmat": {


Mime
View raw message