Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8EDF5200C3A for ; Fri, 31 Mar 2017 18:35:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8D89C160B7C; Fri, 31 Mar 2017 16:35:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5E3D3160B9F for ; Fri, 31 Mar 2017 18:35:28 +0200 (CEST) Received: (qmail 91387 invoked by uid 500); 31 Mar 2017 16:35:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 91085 invoked by uid 99); 31 Mar 2017 16:35:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Mar 2017 16:35:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D3E8DFF7C; Fri, 31 Mar 2017 16:35:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Date: Fri, 31 Mar 2017 16:35:29 -0000 Message-Id: <76e45de2dc114ad584f900f1db15d86c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/6] flink git commit: [FLINK-5913] [gelly] Example drivers archived-at: Fri, 31 Mar 2017 16:35:30 -0000 [FLINK-5913] [gelly] Example drivers Replace existing and create new algorithm Driver implementations for each of the library methods. This closes #3635 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a48357db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a48357db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a48357db Branch: refs/heads/master Commit: a48357db8c4187fd08f3b17880899ebbcb5d3b5e Parents: ded25be Author: Greg Hogan Authored: Wed Oct 26 15:18:50 2016 -0400 Committer: Greg Hogan Committed: Fri Mar 31 11:17:26 2017 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Usage.java | 2 - .../apache/flink/graph/drivers/AdamicAdar.java | 71 ++++ .../graph/drivers/ClusteringCoefficient.java | 378 +++++-------------- .../graph/drivers/ConnectedComponents.java | 105 ++++++ .../apache/flink/graph/drivers/EdgeList.java | 92 +++++ .../apache/flink/graph/drivers/Graph500.java | 165 -------- .../flink/graph/drivers/GraphMetrics.java | 265 ++++--------- .../org/apache/flink/graph/drivers/HITS.java | 188 ++------- .../flink/graph/drivers/JaccardIndex.java | 224 ++--------- .../apache/flink/graph/drivers/PageRank.java | 74 ++++ .../flink/graph/drivers/SimpleDriver.java | 65 ++++ .../flink/graph/drivers/TriangleListing.java | 362 +++++------------- .../drivers/parameter/IterationConvergence.java | 89 +++++ .../graph/examples/ConnectedComponents.java | 141 ------- .../examples/GSASingleSourceShortestPaths.java | 4 +- .../parameter/IterationConvergenceTest.java | 66 ++++ .../examples/ConnectedComponentsITCase.java | 72 ---- .../main/java/org/apache/flink/graph/Graph.java | 16 +- .../graph/library/ConnectedComponents.java | 5 +- .../graph/library/GSAConnectedComponents.java | 8 +- .../flink/graph/library/LabelPropagation.java | 5 +- .../clustering/directed/TriangleListing.java | 2 +- .../undirected/LocalClusteringCoefficient.java | 2 +- .../graph/library/link_analysis/PageRank.java | 8 +- .../graph/library/similarity/AdamicAdar.java | 2 +- .../graph/library/similarity/JaccardIndex.java | 2 +- .../apache/flink/graph/utils/GraphUtils.java | 10 +- .../flink/graph/utils/NullValueEdgeMapper.java | 32 -- 28 files changed, 919 insertions(+), 1536 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java index d923bf0..642fe5b 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java @@ -28,7 +28,6 @@ public class Usage { private static final Class[] DRIVERS = new Class[]{ org.apache.flink.graph.drivers.ClusteringCoefficient.class, - org.apache.flink.graph.drivers.Graph500.class, org.apache.flink.graph.drivers.GraphMetrics.class, org.apache.flink.graph.drivers.HITS.class, org.apache.flink.graph.drivers.JaccardIndex.class, @@ -36,7 +35,6 @@ public class Usage { }; private static final Class[] EXAMPLES = new Class[]{ - org.apache.flink.graph.examples.ConnectedComponents.class, org.apache.flink.graph.examples.EuclideanGraphWeighing.class, org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class, org.apache.flink.graph.examples.IncrementalSSSP.class, http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java new file mode 100644 index 0000000..742c1de --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.commons.lang3.text.StrBuilder; +import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.graph.library.similarity.AdamicAdar.Result; +import org.apache.flink.types.CopyableValue; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}. + */ +public class AdamicAdar, VV, EV> +extends SimpleDriver> +implements Driver, CSV, Print { + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "similarity score weighted by centerpoint degree"; + } + + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("Adamic-Adar measures the similarity between vertex neighborhoods and is " + + "computed as the sum of the inverse logarithm of centerpoint degree over shared " + + "neighbors.") + .appendNewLine() + .append("The algorithm result contains two vertex IDs and the similarity score.") + .toString(), 80); + } + + @Override + public void plan(Graph graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); + + result = graph + .run(new org.apache.flink.graph.library.similarity.AdamicAdar() + .setLittleParallelism(lp)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java index 004390d..c463c0a 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java @@ -18,333 +18,127 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.types.CopyableValue; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Driver for the library implementations of Global and Local Clustering Coefficient. - * - * This example reads a simple directed or undirected graph from a CSV file or - * generates an RMat graph with the given scale and edge factor then calculates - * the local clustering coefficient for each vertex and the global clustering - * coefficient for the graph. + * Driver for directed and undirected clustering coefficient algorithm and analytics. * + * @see org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient + * @see org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient */ -public class ClusteringCoefficient { - - private static final int DEFAULT_SCALE = 10; - - private static final int DEFAULT_EDGE_FACTOR = 16; - - private static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" + - " vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." + - " Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" + - " is a clique).", 80)) - .appendNewLine() - .appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + - " the vertex, and the number of edges between vertex neighbors.", 80)) - .appendNewLine() - .appendln("usage: ClusteringCoefficient --directed --input --output ") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - if (! parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directedAlgorithm = parameters.getBoolean("directed"); - - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - - // global and local clustering coefficient results - GraphAnalytic gcc; - GraphAnalytic acc; - DataSet lcc; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph graph = reader - .keyType(LongValue.class); +public class ClusteringCoefficient & CopyableValue, VV, EV> +extends SimpleDriver +implements Driver, CSV, Hash, Print { - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify() - .setParallelism(little_parallelism)); - } + private static final String DIRECTED = "directed"; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false) - .setParallelism(little_parallelism)); - } + private static final String UNDIRECTED = "undirected"; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - } - } break; + private ChoiceParameter order = new ChoiceParameter(this, "order") + .addChoices(DIRECTED, UNDIRECTED); - case "string": { - Graph graph = reader - .keyType(StringValue.class); + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify() - .setParallelism(little_parallelism)); - } + private GraphAnalytic globalClusteringCoefficient; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false) - .setParallelism(little_parallelism)); - } + private GraphAnalytic averageClusteringCoefficient; - gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setParallelism(little_parallelism) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - Graph newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify() - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } else { - Graph newGraph = graph - .run(new TranslateGraphIds(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify() - .setParallelism(little_parallelism)); - - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } - } else { - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + @Override + public String getName() { + return this.getClass().getSimpleName(); + } - if (scale > 32) { - Graph newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip) - .setParallelism(little_parallelism)); + @Override + public String getShortDescription() { + return "measure the connectedness of vertex neighborhoods"; + } - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } else { - Graph newGraph = graph - .run(new TranslateGraphIds(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip) - .setParallelism(little_parallelism)); + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("The local clustering coefficient measures the connectedness of each " + + "vertex's neighborhood. The global clustering coefficient measures the " + + "connected of the graph. The average clustering coefficient is the mean local " + + "clustering coefficient. Each score ranges from 0.0 (no edges between vertex " + + "neighbors) to 1.0 (neighborhood or graph is a clique).") + .appendNewLine() + .append("The algorithm result contains the vertex ID, degree, and number of edges " + + "connecting neighbors.") + .toString(), 80); + } - gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - acc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient() - .setLittleParallelism(little_parallelism)); - lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false) - .setLittleParallelism(little_parallelism)); - } - } - } break; + @Override + public void plan(Graph graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); - } + switch (order.getValue()) { + case DIRECTED: + result = graph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() + .setLittleParallelism(lp)); - switch (parameters.get("output", "")) { - case "print": - if (directedAlgorithm) { - for (Object e: lcc.collect()) { - org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result = - (org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e; - System.out.println(result.toPrintableString()); - } - } else { - for (Object e: lcc.collect()) { - org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result = - (org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e; - System.out.println(result.toPrintableString()); - } - } - break; + globalClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() + .setLittleParallelism(lp)); - case "hash": - System.out.println(DataSetUtils.checksumHashCode(lcc)); + averageClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient() + .setLittleParallelism(lp)); break; - case "csv": - String filename = parameters.get("output_filename"); + case UNDIRECTED: + result = graph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + .setLittleParallelism(lp)); - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + globalClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() + .setLittleParallelism(lp)); - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Clustering Coefficient"); + averageClusteringCoefficient = graph + .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient() + .setLittleParallelism(lp)); break; - - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); } + } - System.out.println(gcc.getResult()); - System.out.println(acc.getResult()); + @Override + public void hash(String executionName) throws Exception { + super.hash(executionName); + printAnalytics(); + } - JobExecutionResult result = env.getLastJobExecutionResult(); + @Override + public void print(String executionName) throws Exception { + super.print(executionName); + printAnalytics(); + } + + @Override + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + super.writeCSV(filename, lineDelimiter, fieldDelimiter); + printAnalytics(); + } - NumberFormat nf = NumberFormat.getInstance(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + private void printAnalytics() { + System.out.println(globalClusteringCoefficient.getResult().toPrintableString()); + System.out.println(averageClusteringCoefficient.getResult().toPrintableString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java new file mode 100644 index 0000000..32263cf --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.graph.library.GSAConnectedComponents; + +import java.util.List; + +/** + * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}. + * + * The gather-sum-apply implementation is used because scatter-gather does not + * handle object reuse (see FLINK-5891). + */ +public class ConnectedComponents, VV, EV> +extends ParameterizedBase +implements Driver, CSV, Hash, Print { + + private DataSet> components; + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "ConnectedComponents"; + } + + @Override + public String getLongDescription() { + return "ConnectedComponents"; + } + + @Override + public void plan(Graph graph) throws Exception { + components = graph + .mapVertices(new MapVertices()) + .run(new GSAConnectedComponents(Integer.MAX_VALUE)); + } + + @Override + public void hash(String executionName) throws Exception { + Checksum checksum = new ChecksumHashCode>() + .run(components) + .execute(executionName); + + System.out.println(checksum); + } + + @Override + public void print(String executionName) throws Exception { + Collect> collector = new Collect<>(); + + // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 + List> records = collector.run(components).execute(executionName); + + for (Vertex result : records) { + System.out.println(result); + } + } + + @Override + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + components + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } + + private static final class MapVertices + implements MapFunction, T> { + @Override + public T map(Vertex value) throws Exception { + return value.f0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java new file mode 100644 index 0000000..85f32c3 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +import java.util.List; + +/** + * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s. + */ +public class EdgeList +extends ParameterizedBase +implements Driver, CSV, Hash, Print { + + private DataSet> edges; + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "the edge list"; + } + + @Override + public String getLongDescription() { + return "Pass-through of the graph's edge list."; + } + + @Override + public void plan(Graph graph) throws Exception { + edges = graph + .getEdges(); + } + + @Override + public void hash(String executionName) throws Exception { + Checksum checksum = new ChecksumHashCode>() + .run(edges) + .execute(executionName); + + System.out.println(checksum); + } + + @Override + public void print(String executionName) throws Exception { + Collect> collector = new Collect<>(); + + // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 + List> records = collector.run(edges).execute(executionName); + + for (Edge result : records) { + System.out.println(result); + } + + } + + @Override + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + edges + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java deleted file mode 100644 index c2abbf7..0000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.drivers; - -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.text.StrBuilder; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; - -import java.text.NumberFormat; - -/** - * Generate an RMat graph for Graph 500. - * - * Note that this does not yet implement permutation of vertex labels or edges. - * - * @see Graph 500 - */ -public class Graph500 { - - private static final int DEFAULT_SCALE = 10; - - private static final int DEFAULT_EDGE_FACTOR = 16; - - private static final boolean DEFAULT_CLIP_AND_FLIP = true; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln("A Graph500 generator using the Recursive Matrix (RMat) graph generator.") - .appendNewLine() - .appendln(WordUtils.wrap("The graph matrix contains 2^scale vertices although not every vertex will" + - " be represented in an edge. The number of edges is edge_factor * 2^scale edges" + - " although some edges may be duplicates.", 80)) - .appendNewLine() - .appendln("Note: this does not yet implement permutation of vertex labels or edges.") - .appendNewLine() - .appendln("usage: Graph500 --directed --simplify --output ") - .appendNewLine() - .appendln("options:") - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); - } - - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - if (! parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directed = parameters.getBoolean("directed"); - - if (! parameters.has("simplify")) { - throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'")); - } - boolean simplify = parameters.getBoolean("simplify"); - - - // Generate RMat graph - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (directed) { - if (simplify) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); - } - } else { - if (simplify) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); - } else { - graph = graph.getUndirected(); - } - } - - DataSet> edges = graph - .getEdges() - .project(0, 1); - - // Print, hash, or write RMat graph to disk - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - edges.print(); - break; - - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(edges)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - edges.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Graph500"); - break; - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } - - JobExecutionResult result = env.getLastJobExecutionResult(); - - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java index 9b246df..cc5a894 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java @@ -18,224 +18,109 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; -import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.ChoiceParameter; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.types.CopyableValue; /** - * Computes vertex and edge metrics on a directed or undirected graph. + * Driver for directed and undirected graph metrics analytics. * * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics * @see org.apache.flink.graph.library.metric.directed.VertexMetrics * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics */ -public class GraphMetrics { +public class GraphMetrics & CopyableValue, VV, EV> +extends ParameterizedBase +implements Driver, Hash, Print { - private static final int DEFAULT_SCALE = 10; + private static final String DIRECTED = "directed"; - private static final int DEFAULT_EDGE_FACTOR = 16; + private static final String UNDIRECTED = "undirected"; - private static final boolean DEFAULT_CLIP_AND_FLIP = true; + private ChoiceParameter order = new ChoiceParameter(this, "order") + .addChoices(DIRECTED, UNDIRECTED); - private static String getUsage(String message) { + private GraphAnalytic vertexMetrics; + + private GraphAnalytic edgeMetrics; + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "compute vertex and edge metrics"; + } + + @Override + public String getLongDescription() { return new StrBuilder() + .appendln("Computes metrics on a directed or undirected graph.") .appendNewLine() - .appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80)) - .appendNewLine() - .appendln("usage: GraphMetrics --directed --input ") + .appendln("Vertex metrics:") + .appendln("- number of vertices") + .appendln("- number of edges") + .appendln("- number of unidirectional edges (directed only)") + .appendln("- number of bidirectional edges (directed only)") + .appendln("- average degree") + .appendln("- number of triplets") + .appendln("- maximum degree") + .appendln("- maximum out degree (directed only)") + .appendln("- maximum in degree (directed only)") + .appendln("- maximum number of triplets") .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln("Usage error: " + message) + .appendln("Edge metrics:") + .appendln("- number of triangle triplets") + .appendln("- number of rectangle triplets") + .appendln("- maximum number of triangle triplets") + .append("- maximum number of rectangle triplets") .toString(); } - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); + @Override + public void plan(Graph graph) throws Exception { + switch (order.getValue()) { + case DIRECTED: + vertexMetrics = graph + .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); + edgeMetrics = graph + .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); + break; - if (! parameters.has("directed")) { - throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'")); - } - boolean directedAlgorithm = parameters.getBoolean("directed"); - - GraphAnalytic vm; - GraphAnalytic em; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph graph = reader - .keyType(LongValue.class); - - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); - em = graph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false)); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); - em = graph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); - } - } break; - - case "string": { - Graph graph = reader - .keyType(StringValue.class); - - if (directedAlgorithm) { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); - em = graph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); - } else { - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false)); - } - - vm = graph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); - em = graph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (directedAlgorithm) { - if (scale > 32) { - Graph newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); - } else { - Graph newGraph = graph - .run(new TranslateGraphIds(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); - } - } else { - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - if (scale > 32) { - Graph newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); - } else { - Graph newGraph = graph - .run(new TranslateGraphIds(new LongValueToUnsignedIntValue())) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); - - vm = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); - em = newGraph - .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); - } - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); + case UNDIRECTED: + vertexMetrics = graph + .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); + + edgeMetrics = graph + .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); + break; } + } - env.execute("Graph Metrics"); + @Override + public void hash(String executionName) throws Exception { + print(executionName); + } - System.out.println(); - System.out.print("Vertex metrics:\n "); - System.out.println(vm.getResult().toString().replace(";", "\n ")); - System.out.println(); - System.out.print("Edge metrics:\n "); - System.out.println(em.getResult().toString().replace(";", "\n ")); + @Override + public void print(String executionName) throws Exception { + vertexMetrics.execute(executionName); - JobExecutionResult result = env.getLastJobExecutionResult(); + System.out.print("Vertex metrics:\n "); + System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n ")); - NumberFormat nf = NumberFormat.getInstance(); System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + System.out.print("Edge metrics:\n "); + System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n ")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java index db27f0e..6081fea 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java @@ -18,177 +18,51 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.directed.Simplify; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.IterationConvergence; import org.apache.flink.graph.library.link_analysis.HITS.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; /** - * Driver for the library implementation of HITS (Hubs and Authorities). - * - * This example reads a simple, undirected graph from a CSV file or generates - * an undirected RMat graph with the given scale and edge factor then calculates - * hub and authority scores for each vertex. - * - * @see org.apache.flink.graph.library.link_analysis.HITS + * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}. */ -public class HITS { +public class HITS +extends SimpleDriver> +implements Driver, CSV, Print { private static final int DEFAULT_ITERATIONS = 10; - private static final int DEFAULT_SCALE = 10; + private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); - private static final int DEFAULT_EDGE_FACTOR = 16; - - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" + - " scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" + - " and good \"authorities\" are linked from good \"hubs\".", 80)) - .appendNewLine() - .appendln("usage: HITS --input --output ") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") - .appendNewLine() - .appendln("Usage error: " + message) - .toString(); + @Override + public String getName() { + return this.getClass().getSimpleName(); } - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS); - - DataSet hits; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - hits = reader - .keyType(LongValue.class) - .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); - } break; - - case "string": { - hits = reader - .keyType(StringValue.class) - .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate(); - - if (scale > 32) { - hits = graph - .run(new Simplify()) - .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); - } else { - hits = graph - .run(new TranslateGraphIds(new LongValueToUnsignedIntValue())) - .run(new Simplify()) - .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); - } - - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - for (Object e: hits.collect()) { - System.out.println(((Result)e).toPrintableString()); - } - break; - - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(hits)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("HITS"); - break; - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } + @Override + public String getShortDescription() { + return "score vertices as hubs and authorities"; + } - JobExecutionResult result = env.getLastJobExecutionResult(); + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("Hyperlink-Induced Topic Search computes two interdependent scores for " + + "each vertex in a directed graph. A good \"hub\" links to good \"authorities\" " + + "and good \"authorities\" are linked to from good \"hubs\".") + .appendNewLine() + .append("The result contains the vertex ID, hub score, and authority score.") + .toString(), 80); + } - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + @Override + public void plan(Graph graph) throws Exception { + result = graph + .run(new org.apache.flink.graph.library.link_analysis.HITS( + iterationConvergence.getValue().iterations, + iterationConvergence.getValue().convergenceThreshold)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index 09479a6..1c836ea 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -18,211 +18,57 @@ package org.apache.flink.graph.drivers; -import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphCsvReader; -import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.asm.translate.TranslateGraphIds; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; -import org.apache.flink.graph.generator.RMatGraph; -import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; -import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Hash; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.library.similarity.JaccardIndex.Result; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.StringValue; - -import java.text.NumberFormat; +import org.apache.flink.types.CopyableValue; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Driver for the library implementation of Jaccard Index. - * - * This example reads a simple, undirected graph from a CSV file or generates - * an undirected RMat graph with the given scale and edge factor then calculates - * all non-zero Jaccard Index similarity scores between vertices. - * - * @see org.apache.flink.graph.library.similarity.JaccardIndex + * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}. */ -public class JaccardIndex { +public class JaccardIndex, VV, EV> +extends SimpleDriver> +implements Driver, CSV, Hash, Print { - private static final int DEFAULT_SCALE = 10; + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); - private static final int DEFAULT_EDGE_FACTOR = 16; + @Override + public String getName() { + return this.getClass().getSimpleName(); + } - private static final boolean DEFAULT_CLIP_AND_FLIP = true; + @Override + public String getShortDescription() { + return "similarity score as fraction of common neighbors"; + } - private static String getUsage(String message) { - return new StrBuilder() - .appendNewLine() - .appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + - " neighborhoods and is computed as the number of shared neighbors divided by the number of" + - " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + - " shared).", 80)) - .appendNewLine() - .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + - " number of shared neighbors, and the number of distinct neighbors.", 80)) - .appendNewLine() - .appendln("usage: JaccardIndex --input --output ") - .appendNewLine() - .appendln("options:") - .appendln(" --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") - .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") - .appendNewLine() - .appendln(" --output print") - .appendln(" --output hash") - .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") + @Override + public String getLongDescription() { + return WordUtils.wrap(new StrBuilder() + .appendln("Jaccard Index measures the similarity between vertex neighborhoods and " + + "is computed as the number of shared neighbors divided by the number of " + + "distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all " + + "neighbors are shared).") .appendNewLine() - .appendln("Usage error: " + message) - .toString(); + .append("The result contains two vertex IDs, the number of shared neighbors, and " + + "the number of distinct neighbors.") + .toString(), 80); } - public static void main(String[] args) throws Exception { - // Set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - - ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); - - int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); - - DataSet ji; - - switch (parameters.get("input", "")) { - case "csv": { - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - GraphCsvReader reader = Graph - .fromCsvReader(parameters.getRequired("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter); - - switch (parameters.get("type", "")) { - case "integer": { - Graph graph = reader - .keyType(LongValue.class); - - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false) - .setParallelism(little_parallelism)); - } - - ji = graph - .run(new org.apache.flink.graph.library.similarity.JaccardIndex() - .setLittleParallelism(little_parallelism)); - } break; - - case "string": { - Graph graph = reader - .keyType(StringValue.class); - - if (parameters.getBoolean("simplify", false)) { - graph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false) - .setParallelism(little_parallelism)); - } - - ji = graph - .run(new org.apache.flink.graph.library.similarity.JaccardIndex() - .setLittleParallelism(little_parallelism)); - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid CSV type")); - } - } break; - - case "rmat": { - int scale = parameters.getInt("scale", DEFAULT_SCALE); - int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - - RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; - - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setParallelism(little_parallelism) - .generate(); - - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - - if (scale > 32) { - ji = graph - .run(new Simplify(clipAndFlip) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex() - .setLittleParallelism(little_parallelism)); - } else { - ji = graph - .run(new TranslateGraphIds(new LongValueToUnsignedIntValue()) - .setParallelism(little_parallelism)) - .run(new Simplify(clipAndFlip) - .setParallelism(little_parallelism)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex() - .setLittleParallelism(little_parallelism)); - } - } break; - - default: - throw new ProgramParametrizationException(getUsage("invalid input type")); - } - - switch (parameters.get("output", "")) { - case "print": - System.out.println(); - for (Object e: ji.collect()) { - Result result = (Result)e; - System.out.println(result.toPrintableString()); - } - break; - - case "hash": - System.out.println(); - System.out.println(DataSetUtils.checksumHashCode(ji)); - break; - - case "csv": - String filename = parameters.getRequired("output_filename"); - - String lineDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - - String fieldDelimiter = StringEscapeUtils.unescapeJava( - parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - - ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter); - - env.execute("Jaccard Index"); - break; - - default: - throw new ProgramParametrizationException(getUsage("invalid output type")); - } - - JobExecutionResult result = env.getLastJobExecutionResult(); + @Override + public void plan(Graph graph) throws Exception { + int lp = littleParallelism.getValue().intValue(); - NumberFormat nf = NumberFormat.getInstance(); - System.out.println(); - System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + result = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex() + .setLittleParallelism(lp)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java new file mode 100644 index 0000000..8cef077 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.output.CSV; +import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.DoubleParameter; +import org.apache.flink.graph.drivers.parameter.IterationConvergence; +import org.apache.flink.graph.library.link_analysis.PageRank.Result; + +/** + * @see org.apache.flink.graph.library.link_analysis.PageRank + */ +public class PageRank +extends SimpleDriver> +implements Driver, CSV, Print { + + private static final int DEFAULT_ITERATIONS = 10; + + private DoubleParameter dampingFactor = new DoubleParameter(this, "damping_factor") + .setDefaultValue(0.85) + .setMinimumValue(0.0, false) + .setMaximumValue(1.0, false); + + private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS); + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public String getShortDescription() { + return "score vertices by the number and quality of incoming links"; + } + + @Override + public String getLongDescription() { + return new StrBuilder() + .appendln("PageRank computes a per-vertex score which is the sum of PageRank scores " + + "transmitted over in-edges. Each vertex's score is divided evenly among " + + "out-edges. High-scoring vertices are linked to by other high-scoring vertices.") + .appendNewLine() + .append("The result contains the vertex ID and PageRank score.") + .toString(); + } + + @Override + public void plan(Graph graph) throws Exception { + result = graph + .run(new org.apache.flink.graph.library.link_analysis.PageRank( + dampingFactor.getValue(), + iterationConvergence.getValue().iterations, + iterationConvergence.getValue().convergenceThreshold)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java new file mode 100644 index 0000000..98bdfc5 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.asm.dataset.Collect; +import org.apache.flink.graph.asm.result.PrintableResult; +import org.apache.flink.graph.drivers.parameter.ParameterizedBase; + +import java.util.List; + +/** + * A base driver storing a single result {@link DataSet} with values + * implementing {@link PrintableResult}. + * + * @param algorithm's result type + */ +public abstract class SimpleDriver +extends ParameterizedBase { + + protected DataSet result; + + public void hash(String executionName) throws Exception { + Checksum checksum = new ChecksumHashCode() + .run((DataSet) result) + .execute(executionName); + + System.out.println(checksum); + } + + public void print(String executionName) throws Exception { + Collect collector = new Collect<>(); + + // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 + List records = collector.run((DataSet) result).execute(executionName); + + for (R result : records) { + System.out.println(result.toPrintableString()); + } + } + + public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { + result + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } +}