spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets from source files instead of hard code them
Date Sat, 02 Jul 2016 15:29:31 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f3a359939 -> 0d0b41609


[SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets
from source files instead of hard code them

## What changes were proposed in this pull request?

This change extracts six example programs from the GraphX programming guide into
standalone source files and replaces the hard-coded snippets in the guide with
`include_example` tags.

The 6 example programs are:
- AggregateMessagesExample.scala
- SSSPExample.scala
- TriangleCountingExample.scala
- ConnectedComponentsExample.scala
- ComprehensiveExample.scala
- PageRankExample.scala

Each example can be run with
`bin/run-example graphx.EXAMPLE_NAME`
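
For context on the `include_example` replacement described above: each new source file wraps the lines meant for the guide in `$example on$` / `$example off$` comment markers. The following is a minimal sketch of that marker-extraction idea only. It is not the docs build plugin itself; the object name `ExampleMarkerSketch` and the function `extractSnippet` are hypothetical, and the real plugin may do more (for example, adjust indentation).

```scala
// Hypothetical illustration of the "$example on$" / "$example off$" convention.
// This is NOT the actual include_example implementation used by the Spark docs build.
object ExampleMarkerSketch {
  private val On = "$example on$"
  private val Off = "$example off$"

  /** Keep only the lines that sit between an "on" marker and the next "off" marker. */
  def extractSnippet(lines: Seq[String]): Seq[String] = {
    val (kept, _) = lines.foldLeft((Vector.empty[String], false)) {
      case ((acc, inside), line) =>
        if (line.contains(On)) (acc, true)          // start collecting after this line
        else if (line.contains(Off)) (acc, false)   // stop collecting
        else if (inside) (acc :+ line, inside)      // line belongs to the snippet
        else (acc, inside)                          // boilerplate outside the markers
    }
    kept
  }

  def main(args: Array[String]): Unit = {
    // A shortened stand-in for one of the example files in this commit.
    val source = Seq(
      "// $example on$",
      "import org.apache.spark.graphx.GraphLoader",
      "// $example off$",
      "import org.apache.spark.sql.SparkSession",
      "    // $example on$",
      "    val graph = GraphLoader.edgeListFile(sc, \"data/graphx/followers.txt\")",
      "    // $example off$",
      "    spark.stop()"
    )
    extractSnippet(source).foreach(println)
  }
}
```

Under this convention the `SparkSession` setup and `spark.stop()` lines stay in the runnable file but are left out of the rendered guide, which is why the guide snippets can keep referring to `sc` directly.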

## How was this patch tested?

Manual.

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #14015 from WeichenXu123/graphx_example_plugin.

(cherry picked from commit 0bd7cd18bc4d535b0c4499913f6747b3f6315ac2)
Signed-off-by: Sean Owen <sowen@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d0b4160
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d0b4160
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d0b4160

Branch: refs/heads/branch-2.0
Commit: 0d0b416097a095fa771a7d5ae368546c26cb2d8b
Parents: f3a3599
Author: WeichenXu <WeichenXu123@outlook.com>
Authored: Sat Jul 2 16:29:00 2016 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sat Jul 2 16:29:26 2016 +0100

----------------------------------------------------------------------
 docs/graphx-programming-guide.md                | 133 +------------------
 .../graphx/AggregateMessagesExample.scala       |  72 ++++++++++
 .../examples/graphx/ComprehensiveExample.scala  |  80 +++++++++++
 .../graphx/ConnectedComponentsExample.scala     |  68 ++++++++++
 .../spark/examples/graphx/PageRankExample.scala |  61 +++++++++
 .../spark/examples/graphx/SSSPExample.scala     |  69 ++++++++++
 .../graphx/TriangleCountingExample.scala        |  70 ++++++++++
 7 files changed, 426 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index e376b66..2e9966c 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -603,29 +603,7 @@ slightly unreliable and instead opted for more explicit user control.
 In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
 compute the average age of the more senior followers of each user.
 
-{% highlight scala %}
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
-val graph: Graph[Double, Int] =
-  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
-// Compute the number of older followers and their total age
-val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
-  triplet => { // Map Function
-    if (triplet.srcAttr > triplet.dstAttr) {
-      // Send message to destination vertex containing counter and age
-      triplet.sendToDst(1, triplet.srcAttr)
-    }
-  },
-  // Add counter and age
-  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
-)
-// Divide total age by number of older followers to get average age of older followers
-val avgAgeOfOlderFollowers: VertexRDD[Double] =
-  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
-// Display the results
-avgAgeOfOlderFollowers.collect.foreach(println(_))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %}
 
 > The `aggregateMessages` operation performs optimally when the messages (and the sums of
 > messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
@@ -793,29 +771,7 @@ second argument list contains the user defined functions for receiving messages
 We can use the Pregel operator to express computation such as single source
 shortest path in the following example.
 
-{% highlight scala %}
-import org.apache.spark.graphx._
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// A graph with edge attributes containing distances
-val graph: Graph[Long, Double] =
-  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexId = 42 // The ultimate source
-// Initialize the graph such that all vertices except the root have distance infinity.
-val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
-val sssp = initialGraph.pregel(Double.PositiveInfinity)(
-  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
-  triplet => {  // Send Message
-    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
-      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
-    } else {
-      Iterator.empty
-    }
-  },
-  (a,b) => math.min(a,b) // Merge Message
-  )
-println(sssp.vertices.collect.mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %}
 
 <a name="graph_builders"></a>
 
@@ -1009,64 +965,19 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t
 
 GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows:
 
-{% highlight scala %}
-// Load the edges as a graph
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Run PageRank
-val ranks = graph.pageRank(0.0001).vertices
-// Join the ranks with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val ranksByUsername = users.join(ranks).map {
-  case (id, (username, rank)) => (username, rank)
-}
-// Print the result
-println(ranksByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %}
 
 ## Connected Components
 
 The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:
 
-{% highlight scala %}
-// Load the graph as in the PageRank example
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Find the connected components
-val cc = graph.connectedComponents().vertices
-// Join the connected components with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val ccByUsername = users.join(cc).map {
-  case (id, (username, cc)) => (username, cc)
-}
-// Print the result
-println(ccByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %}
 
 ## Triangle Counting
 
 A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*
 
-{% highlight scala %}
-// Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
-// Find the triangle count for each vertex
-val triCounts = graph.triangleCount().vertices
-// Join the triangle counts with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
-  (username, tc)
-}
-// Print the result
-println(triCountByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %}
 
 
 # Examples
@@ -1076,36 +987,4 @@ to important relationships and users, run page-rank on the sub-graph, and
 then finally return attributes associated with the top users.  I can do
 all of this in just a few lines with GraphX:
 
-{% highlight scala %}
-// Connect to the Spark cluster
-val sc = new SparkContext("spark://master.amplab.org", "research")
-
-// Load my user data and parse into tuples of user id and attribute list
-val users = (sc.textFile("data/graphx/users.txt")
-  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
-
-// Parse the edge data which is already in userId -> userId format
-val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-
-// Attach the user attributes
-val graph = followerGraph.outerJoinVertices(users) {
-  case (uid, deg, Some(attrList)) => attrList
-  // Some users may not have attributes so we set them as empty
-  case (uid, deg, None) => Array.empty[String]
-}
-
-// Restrict the graph to users with usernames and names
-val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
-
-// Compute the PageRank
-val pagerankGraph = subgraph.pageRank(0.001)
-
-// Get the attributes of the top pagerank users
-val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
-  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
-  case (uid, attrList, None) => (0.0, attrList.toList)
-}
-
-println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %}

http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
new file mode 100644
index 0000000..8f8262d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexRDD}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example that uses the [`aggregateMessages`][Graph.aggregateMessages] operator to
+ * compute the average age of the more senior followers of each user.
+ * Run with
+ * {{{
+ * bin/run-example graphx.AggregateMessagesExample
+ * }}}
+ */
+object AggregateMessagesExample {
+
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Create a graph with "age" as the vertex property.
+    // Here we use a random graph for simplicity.
+    val graph: Graph[Double, Int] =
+      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
+    // Compute the number of older followers and their total age
+    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
+      triplet => { // Map Function
+        if (triplet.srcAttr > triplet.dstAttr) {
+          // Send message to destination vertex containing counter and age
+          triplet.sendToDst(1, triplet.srcAttr)
+        }
+      },
+      // Add counter and age
+      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+    )
+    // Divide total age by number of older followers to get average age of older followers
+    val avgAgeOfOlderFollowers: VertexRDD[Double] =
+      olderFollowers.mapValues( (id, value) =>
+        value match { case (count, totalAge) => totalAge / count } )
+    // Display the results
+    avgAgeOfOlderFollowers.collect.foreach(println(_))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
new file mode 100644
index 0000000..6598863
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Suppose I want to build a graph from some text files, restrict the graph
+ * to important relationships and users, run page-rank on the sub-graph, and
+ * then finally return attributes associated with the top users.
+ * This example does all of this in just a few lines with GraphX.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ComprehensiveExample
+ * }}}
+ */
+object ComprehensiveExample {
+
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load my user data and parse into tuples of user id and attribute list
+    val users = (sc.textFile("data/graphx/users.txt")
+      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
+
+    // Parse the edge data which is already in userId -> userId format
+    val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+
+    // Attach the user attributes
+    val graph = followerGraph.outerJoinVertices(users) {
+      case (uid, deg, Some(attrList)) => attrList
+      // Some users may not have attributes so we set them as empty
+      case (uid, deg, None) => Array.empty[String]
+    }
+
+    // Restrict the graph to users with usernames and names
+    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
+
+    // Compute the PageRank
+    val pagerankGraph = subgraph.pageRank(0.001)
+
+    // Get the attributes of the top pagerank users
+    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
+      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
+      case (uid, attrList, None) => (0.0, attrList.toList)
+    }
+
+    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
new file mode 100644
index 0000000..5377ddb
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A connected components algorithm example.
+ * The connected components algorithm labels each connected component of the graph
+ * with the ID of its lowest-numbered vertex.
+ * For example, in a social network, connected components can approximate clusters.
+ * GraphX contains an implementation of the algorithm in the
+ * [`ConnectedComponents` object][ConnectedComponents],
+ * and we compute the connected components of the example social network dataset.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ConnectedComponentsExample
+ * }}}
+ */
+object ConnectedComponentsExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load the graph as in the PageRank example
+    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+    // Find the connected components
+    val cc = graph.connectedComponents().vertices
+    // Join the connected components with the usernames
+    val users = sc.textFile("data/graphx/users.txt").map { line =>
+      val fields = line.split(",")
+      (fields(0).toLong, fields(1))
+    }
+    val ccByUsername = users.join(cc).map {
+      case (id, (username, cc)) => (username, cc)
+    }
+    // Print the result
+    println(ccByUsername.collect().mkString("\n"))
+    // $example off$
+    spark.stop()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
new file mode 100644
index 0000000..9e9affc
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A PageRank example on a social network dataset.
+ * Run with
+ * {{{
+ * bin/run-example graphx.PageRankExample
+ * }}}
+ */
+object PageRankExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load the edges as a graph
+    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+    // Run PageRank
+    val ranks = graph.pageRank(0.0001).vertices
+    // Join the ranks with the usernames
+    val users = sc.textFile("data/graphx/users.txt").map { line =>
+      val fields = line.split(",")
+      (fields(0).toLong, fields(1))
+    }
+    val ranksByUsername = users.join(ranks).map {
+      case (id, (username, rank)) => (username, rank)
+    }
+    // Print the result
+    println(ranksByUsername.collect().mkString("\n"))
+    // $example off$
+    spark.stop()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
new file mode 100644
index 0000000..5e8b196
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexId}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example that uses the Pregel operator to express a computation
+ * such as single-source shortest path.
+ * Run with
+ * {{{
+ * bin/run-example graphx.SSSPExample
+ * }}}
+ */
+object SSSPExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // A graph with edge attributes containing distances
+    val graph: Graph[Long, Double] =
+      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
+    val sourceId: VertexId = 42 // The ultimate source
+    // Initialize the graph such that all vertices except the root have distance infinity.
+    val initialGraph = graph.mapVertices((id, _) =>
+        if (id == sourceId) 0.0 else Double.PositiveInfinity)
+    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
+      triplet => {  // Send Message
+        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+        } else {
+          Iterator.empty
+        }
+      },
+      (a, b) => math.min(a, b) // Merge Message
+    )
+    println(sssp.vertices.collect.mkString("\n"))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0d0b4160/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
new file mode 100644
index 0000000..b9bff69
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A vertex is part of a triangle when it has two adjacent vertices with an edge between them.
+ * GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount]
+ * that determines the number of triangles passing through each vertex,
+ * providing a measure of clustering.
+ * We compute the triangle count of the social network dataset.
+ *
+ * Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`)
+ * and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.TriangleCountingExample
+ * }}}
+ */
+object TriangleCountingExample {
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load the edges in canonical order and partition the graph for triangle count
+    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
+      .partitionBy(PartitionStrategy.RandomVertexCut)
+    // Find the triangle count for each vertex
+    val triCounts = graph.triangleCount().vertices
+    // Join the triangle counts with the usernames
+    val users = sc.textFile("data/graphx/users.txt").map { line =>
+      val fields = line.split(",")
+      (fields(0).toLong, fields(1))
+    }
+    val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
+      (username, tc)
+    }
+    // Print the result
+    println(triCountByUsername.collect().mkString("\n"))
+    // $example off$
+    spark.stop()
+  }
+}
+// scalastyle:on println



