From: sewen@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Mon, 23 Jan 2017 13:12:10 -0000
Subject: [5/6] flink git commit: [FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.

[FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.
This closes #3187
This closes #3188


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a246ec2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a246ec2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a246ec2a

Branch: refs/heads/master
Commit: a246ec2a558bcd9d4add718c429be648a947422f
Parents: 741e2f5
Author: Greg Hogan
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Stephan Ewen
Committed: Mon Jan 23 14:03:04 2017 +0100

----------------------------------------------------------------------
 .../graph/drivers/ClusteringCoefficient.java    |  2 +-
 .../apache/flink/graph/drivers/Graph500.java    | 37 ++++++++++++++++----
 .../flink/graph/drivers/GraphMetrics.java       | 11 +++---
 .../org/apache/flink/graph/drivers/HITS.java    | 11 +++---
 .../flink/graph/drivers/JaccardIndex.java       | 11 +++---
 .../flink/graph/drivers/TriangleListing.java    |  7 ++--
 .../flink/graph/AbstractGraphAnalytic.java      |  5 +--
 .../org/apache/flink/graph/AnalyticHelper.java  |  8 ++++-
 8 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index cd28ee4..79a17a4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -78,7 +78,7 @@ public class ClusteringCoefficient {
             .appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of"
                 + " the vertex, and the number of edges between vertex neighbors.", 80))
             .appendNewLine()
-            .appendln("usage: ClusteringCoefficient --directed --input --output ")
+            .appendln("usage: ClusteringCoefficient --directed --input --output ")
             .appendNewLine()
             .appendln("options:")
             .appendln("  --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
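The parametrization changes in the drivers below standardize on ParameterTool: required flags are validated up front and reported through ProgramParametrizationException, while optional flags keep their defaults. The following standalone sketch (not part of this commit; the class name and messages are illustrative) shows that pattern in isolation.

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.client.program.ProgramParametrizationException;

    public class ParameterExample {

        public static void main(String[] args) {
            ParameterTool parameters = ParameterTool.fromArgs(args);

            // Required boolean: absence is a usage error, as in the Graph500 change below
            if (!parameters.has("directed")) {
                throw new ProgramParametrizationException("must declare '--directed true' or '--directed false'");
            }
            boolean directed = parameters.getBoolean("directed");

            // Required string: getRequired() throws if the flag is missing
            String outputFilename = parameters.getRequired("output_filename");

            // Optional int with a default
            int scale = parameters.getInt("scale", 10);

            System.out.println(directed + " " + outputFilename + " " + scale);
        }
    }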
http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 3509d2e..c2abbf7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -53,8 +52,6 @@ public class Graph500 {
 
     private static final int DEFAULT_EDGE_FACTOR = 16;
 
-    private static final boolean DEFAULT_SIMPLIFY = false;
-
     private static final boolean DEFAULT_CLIP_AND_FLIP = true;
 
     private static String getUsage(String message) {
@@ -68,6 +65,9 @@ public class Graph500 {
             .appendNewLine()
             .appendln("Note: this does not yet implement permutation of vertex labels or edges.")
             .appendNewLine()
+            .appendln("usage: Graph500 --directed --simplify --output ")
+            .appendNewLine()
+            .appendln("options:")
             .appendln("  --output print")
             .appendln("  --output hash")
             .appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
@@ -84,6 +84,17 @@ public class Graph500 {
         ParameterTool parameters = ParameterTool.fromArgs(args);
         env.getConfig().setGlobalJobParameters(parameters);
 
+        if (! parameters.has("directed")) {
+            throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+        }
+        boolean directed = parameters.getBoolean("directed");
+
+        if (! parameters.has("simplify")) {
+            throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
+        }
+        boolean simplify = parameters.getBoolean("simplify");
+
+        // Generate RMat graph
         int scale = parameters.getInt("scale", DEFAULT_SCALE);
         int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
 
@@ -93,14 +104,23 @@ public class Graph500 {
         long vertexCount = 1L << scale;
         long edgeCount = vertexCount * edgeFactor;
 
-        boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
         boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
         Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
             .generate();
 
-        if (simplify) {
-            graph = graph.run(new Simplify(clipAndFlip));
+        if (directed) {
+            if (simplify) {
+                graph = graph
+                    .run(new org.apache.flink.graph.asm.simple.directed.Simplify());
+            }
+        } else {
+            if (simplify) {
+                graph = graph
+                    .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip));
+            } else {
+                graph = graph.getUndirected();
+            }
         }
 
         DataSet> edges = graph
@@ -110,15 +130,17 @@ public class Graph500 {
         // Print, hash, or write RMat graph to disk
         switch (parameters.get("output", "")) {
         case "print":
+            System.out.println();
             edges.print();
             break;
 
         case "hash":
+            System.out.println();
             System.out.println(DataSetUtils.checksumHashCode(edges));
             break;
 
         case "csv":
-            String filename = parameters.get("filename");
+            String filename = parameters.getRequired("output_filename");
 
             String lineDelimiter = StringEscapeUtils.unescapeJava(
                 parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -137,6 +159,7 @@ public class Graph500 {
         JobExecutionResult result = env.getLastJobExecutionResult();
 
         NumberFormat nf = NumberFormat.getInstance();
+        System.out.println();
         System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
     }
 }
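For context, the generate-then-simplify flow that the revised Graph500 driver follows can be written as a small standalone program. This is a hedged sketch, not the driver itself; the scale, edge factor, and flag values are hard-coded stand-ins for the command-line options.

    import org.apache.commons.math3.random.JDKRandomGenerator;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.generator.RMatGraph;
    import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
    import org.apache.flink.graph.generator.random.RandomGenerableFactory;
    import org.apache.flink.types.LongValue;
    import org.apache.flink.types.NullValue;

    public class RMatExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            boolean directed = false;
            boolean simplify = true;
            boolean clipAndFlip = true;   // stand-in for --clip_and_flip

            long vertexCount = 1L << 10;       // --scale 10
            long edgeCount = vertexCount * 16; // --edge_factor 16

            RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();

            Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
                .generate();

            if (directed) {
                if (simplify) {
                    // remove self-loops and duplicate edges, keeping edge direction
                    graph = graph.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
                }
            } else {
                if (simplify) {
                    // symmetrize and deduplicate the edge set
                    graph = graph.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
                } else {
                    graph = graph.getUndirected();
                }
            }

            System.out.println(graph.numberOfEdges() + " edges");
        }
    }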
http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 899ae66..9b246df 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -63,7 +63,7 @@ public class GraphMetrics {
             .appendNewLine()
             .appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
             .appendNewLine()
-            .appendln("usage: GraphMetrics --directed --input ")
+            .appendln("usage: GraphMetrics --directed --input ")
             .appendNewLine()
             .appendln("options:")
             .appendln("  --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -98,7 +98,7 @@ public class GraphMetrics {
                     parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
                 GraphCsvReader reader = Graph
-                    .fromCsvReader(parameters.get("input_filename"), env)
+                    .fromCsvReader(parameters.getRequired("input_filename"), env)
                     .ignoreCommentsEdges("#")
                     .lineDelimiterEdges(lineDelimiter)
                     .fieldDelimiterEdges(fieldDelimiter);
@@ -225,14 +225,17 @@ public class GraphMetrics {
         env.execute("Graph Metrics");
 
+        System.out.println();
         System.out.print("Vertex metrics:\n ");
         System.out.println(vm.getResult().toString().replace(";", "\n "));
 
-        System.out.print("\nEdge metrics:\n ");
+        System.out.println();
+        System.out.print("Edge metrics:\n ");
         System.out.println(em.getResult().toString().replace(";", "\n "));
 
         JobExecutionResult result = env.getLastJobExecutionResult();
 
         NumberFormat nf = NumberFormat.getInstance();
-        System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+        System.out.println();
+        System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
     }
 }
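The CSV input path shared by these drivers now reads the file name with getRequired() and unescapes the delimiters before use. A minimal sketch of that input path, assuming a placeholder command line and LongValue vertex keys, is shown below (not part of this commit).

    import org.apache.commons.lang3.StringEscapeUtils;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.CsvOutputFormat;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.GraphCsvReader;
    import org.apache.flink.types.LongValue;
    import org.apache.flink.types.NullValue;

    public class CsvInputExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            ParameterTool parameters = ParameterTool.fromArgs(args);

            // delimiters are given escaped on the command line, e.g. "\n" or "\t"
            String lineDelimiter = StringEscapeUtils.unescapeJava(
                parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));

            String fieldDelimiter = StringEscapeUtils.unescapeJava(
                parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));

            // getRequired() fails fast when --input_filename is not given
            GraphCsvReader reader = Graph
                .fromCsvReader(parameters.getRequired("input_filename"), env)
                .ignoreCommentsEdges("#")
                .lineDelimiterEdges(lineDelimiter)
                .fieldDelimiterEdges(fieldDelimiter);

            Graph<LongValue, NullValue, NullValue> graph = reader.keyType(LongValue.class);

            System.out.println(graph.numberOfVertices() + " vertices");
        }
    }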
A good \"hub\" links to good \"authorities\"" + " and good \"authorities\" are linked from good \"hubs\".", 80)) .appendNewLine() - .appendln("usage: HITS --input --output ") + .appendln("usage: HITS --input --output ") .appendNewLine() .appendln("options:") .appendln(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") @@ -104,7 +104,7 @@ public class HITS { parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) + .fromCsvReader(parameters.getRequired("input_filename"), env) .ignoreCommentsEdges("#") .lineDelimiterEdges(lineDelimiter) .fieldDelimiterEdges(fieldDelimiter); @@ -157,17 +157,19 @@ public class HITS { switch (parameters.get("output", "")) { case "print": + System.out.println(); for (Object e: hits.collect()) { System.out.println(((Result)e).toVerboseString()); } break; case "hash": + System.out.println(); System.out.println(DataSetUtils.checksumHashCode(hits)); break; case "csv": - String filename = parameters.get("output_filename"); + String filename = parameters.getRequired("output_filename"); String lineDelimiter = StringEscapeUtils.unescapeJava( parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); @@ -177,7 +179,7 @@ public class HITS { hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - env.execute(); + env.execute("HITS"); break; default: throw new ProgramParametrizationException(getUsage("invalid output type")); @@ -186,6 +188,7 @@ public class HITS { JobExecutionResult result = env.getLastJobExecutionResult(); NumberFormat nf = NumberFormat.getInstance(); + System.out.println(); System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index cb11af9..abb675a 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -75,7 +75,7 @@ public class JaccardIndex { .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + " number of shared neighbors, and the number of distinct neighbors.", 80)) .appendNewLine() - .appendln("usage: JaccardIndex --input --output ") + .appendln("usage: JaccardIndex --input --output ") .appendNewLine() .appendln("options:") .appendln(" --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") @@ -110,7 +110,7 @@ public class JaccardIndex { parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) + .fromCsvReader(parameters.getRequired("input_filename"), env) .ignoreCommentsEdges("#") .lineDelimiterEdges(lineDelimiter) .fieldDelimiterEdges(fieldDelimiter); @@ -137,7 +137,7 @@ public class JaccardIndex { if (parameters.getBoolean("simplify", false)) { graph = graph - 
http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index cb11af9..abb675a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -75,7 +75,7 @@ public class JaccardIndex {
             .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the"
                 + " number of shared neighbors, and the number of distinct neighbors.", 80))
             .appendNewLine()
-            .appendln("usage: JaccardIndex --input --output ")
+            .appendln("usage: JaccardIndex --input --output ")
             .appendNewLine()
             .appendln("options:")
             .appendln("  --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -110,7 +110,7 @@ public class JaccardIndex {
                     parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
                 GraphCsvReader reader = Graph
-                    .fromCsvReader(parameters.get("input_filename"), env)
+                    .fromCsvReader(parameters.getRequired("input_filename"), env)
                     .ignoreCommentsEdges("#")
                     .lineDelimiterEdges(lineDelimiter)
                     .fieldDelimiterEdges(fieldDelimiter);
@@ -137,7 +137,7 @@ public class JaccardIndex {
 
         if (parameters.getBoolean("simplify", false)) {
             graph = graph
-                .run(new org.apache.flink.graph.asm.simple.directed.Simplify()
+                .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false)
                     .setParallelism(little_parallelism));
         }
 
@@ -189,6 +189,7 @@ public class JaccardIndex {
 
         switch (parameters.get("output", "")) {
         case "print":
+            System.out.println();
             for (Object e: ji.collect()) {
                 Result result = (Result)e;
                 System.out.println(result.toVerboseString());
@@ -196,11 +197,12 @@ public class JaccardIndex {
             break;
 
         case "hash":
+            System.out.println();
             System.out.println(DataSetUtils.checksumHashCode(ji));
             break;
 
         case "csv":
-            String filename = parameters.get("output_filename");
+            String filename = parameters.getRequired("output_filename");
 
             String lineDelimiter = StringEscapeUtils.unescapeJava(
                 parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -220,6 +222,7 @@ public class JaccardIndex {
         JobExecutionResult result = env.getLastJobExecutionResult();
 
         NumberFormat nf = NumberFormat.getInstance();
+        System.out.println();
         System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 818b0d8..1fecc3d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -116,7 +116,7 @@ public class TriangleListing {
                     parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
                 GraphCsvReader reader = Graph
-                    .fromCsvReader(parameters.get("input_filename"), env)
+                    .fromCsvReader(parameters.getRequired("input_filename"), env)
                     .ignoreCommentsEdges("#")
                     .lineDelimiterEdges(lineDelimiter)
                     .fieldDelimiterEdges(fieldDelimiter);
@@ -284,6 +284,7 @@ public class TriangleListing {
 
         switch (parameters.get("output", "")) {
         case "print":
+            System.out.println();
             if (directedAlgorithm) {
                 for (Object e: tl.collect()) {
                     org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
@@ -296,11 +297,12 @@ public class TriangleListing {
             break;
 
         case "hash":
+            System.out.println();
             System.out.println(DataSetUtils.checksumHashCode(tl));
             break;
 
         case "csv":
-            String filename = parameters.get("output_filename");
+            String filename = parameters.getRequired("output_filename");
 
             String lineDelimiter = StringEscapeUtils.unescapeJava(
                 parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -324,6 +326,7 @@ public class TriangleListing {
         JobExecutionResult result = env.getLastJobExecutionResult();
 
         NumberFormat nf = NumberFormat.getInstance();
+        System.out.println();
         System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
     }
 }
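The JaccardIndex fix above swaps the directed Simplify for the undirected variant, which is the correct choice for an undirected algorithm. A hedged sketch of that corrected step, with a placeholder input file and parallelism value, is shown here (not part of this commit); see the Simplify javadoc for the exact meaning of the clipAndFlip flag.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.asm.simple.undirected.Simplify;
    import org.apache.flink.types.LongValue;
    import org.apache.flink.types.NullValue;

    public class SimplifyExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            int littleParallelism = 4; // stand-in for --little_parallelism

            Graph<LongValue, NullValue, NullValue> graph = Graph
                .fromCsvReader("/tmp/edges.csv", env) // placeholder path
                .keyType(LongValue.class);

            // undirected Simplify removes self-loops and duplicate edges;
            // clipAndFlip = false matches the JaccardIndex fix above
            graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(false)
                .setParallelism(littleParallelism));

            System.out.println(graph.numberOfEdges() + " edges after simplification");
        }
    }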
http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
index b13e82e..4d3d055 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -38,14 +38,12 @@ implements GraphAnalytic {
     public GraphAnalytic run(Graph input) throws Exception {
         env = input.getContext();
 
-        return null;
+        return this;
     }
 
     @Override
     public T execute() throws Exception {
-        Preconditions.checkNotNull(env);
-
         env.execute();
 
         return getResult();
     }
@@ -54,7 +52,6 @@ implements GraphAnalytic {
     public T execute(String jobName) throws Exception {
         Preconditions.checkNotNull(jobName);
-        Preconditions.checkNotNull(env);
 
         env.execute(jobName);
 
         return getResult();

http://git-wip-us.apache.org/repos/asf/flink/blob/a246ec2a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
index b07a8c3..dbe3e0c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.graph;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -74,6 +76,10 @@ extends RichOutputFormat {
      * @return The value of the accumulator with the given name
      */
     public A getAccumulator(ExecutionEnvironment env, String accumulatorName) {
-        return env.getLastJobExecutionResult().getAccumulatorResult(id + SEPARATOR + accumulatorName);
+        JobExecutionResult result = env.getLastJobExecutionResult();
+
+        Preconditions.checkNotNull(result, "No result found for job, was execute() called before getting the result?");
+
+        return result.getAccumulatorResult(id + SEPARATOR + accumulatorName);
     }
 }
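Taken together, the AbstractGraphAnalytic and AnalyticHelper changes make the analytic lifecycle explicit: run() wires the analytic into the program and returns this, execute() submits the job, and reading an accumulator before execution now fails with a clear Preconditions message. A minimal sketch of that lifecycle using VertexMetrics, with a placeholder input file, follows (not part of this commit).

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
    import org.apache.flink.types.LongValue;
    import org.apache.flink.types.NullValue;

    public class AnalyticExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            Graph<LongValue, NullValue, NullValue> graph = Graph
                .fromCsvReader("/tmp/edges.csv", env) // placeholder path
                .keyType(LongValue.class);

            // run() wires the analytic into the program and now returns this
            VertexMetrics<LongValue, NullValue, NullValue> metrics =
                new VertexMetrics<LongValue, NullValue, NullValue>();
            metrics.run(graph);

            // execute() submits the job; calling getResult() before execute()
            // would now fail with the Preconditions message added to AnalyticHelper
            metrics.execute("Vertex metrics");

            System.out.println(metrics.getResult());
        }
    }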