flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [11/50] flink git commit: [FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under one parent project
Date Thu, 14 Jan 2016 16:16:08 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
new file mode 100644
index 0000000..b0653ca
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of WordCount without using the
+ * Tuple2 type, but a custom class.
+ */
+@SuppressWarnings("serial")
+public class WordCountPojo {
+	
+	/**
+	 * This is the POJO (Plain Old Java Object) that is being used
+	 * for all the operations.
+	 * As long as all fields are public or have a getter/setter, the system can handle them
+	 */
+	public static class Word {
+		
+		// fields
+		private String word;
+		private int frequency;
+		
+		// constructors
+		public Word() {}
+		
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+		
+		// getters setters
+		public String getWord() {
+			return word;
+		}
+		
+		public void setWord(String word) {
+			this.word = word;
+		}
+		
+		public int getFrequency() {
+			return frequency;
+		}
+		
+		public void setFrequency(int frequency) {
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "Word="+word+" freq="+frequency;
+		}
+	}
+	
+	public static void main(String[] args) throws Exception {
+		if (!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Word> counts = 
+			// split up the lines into Word objects (with frequency = 1)
+			text.flatMap(new Tokenizer())
+			// group by the field word and sum up the frequency
+			.groupBy("word")
+			.reduce(new ReduceFunction<Word>() {
+				@Override
+				public Word reduce(Word value1, Word value2) throws Exception {
+					return new Word(value1.word,value1.frequency + value2.frequency);
+				}
+			});
+		
+		if (fileOutput) {
+			counts.writeAsText(outputPath, WriteMode.OVERWRITE);
+			// execute program
+			env.execute("WordCount-Pojo Example");
+		} else {
+			counts.print();
+		}
+
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into 
+	 * multiple Word objects.
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Word> {
+
+		@Override
+		public void flatMap(String value, Collector<Word> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Word(token, 1));
+				}
+			}
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return WordCountData.getDefaultTextLineDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java
new file mode 100644
index 0000000..52efee7
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.wordcount.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the WordCount example program.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class WordCountData {
+
+	public static final String[] WORDS = new String[] {
+		"To be, or not to be,--that is the question:--",
+		"Whether 'tis nobler in the mind to suffer",
+		"The slings and arrows of outrageous fortune",
+		"Or to take arms against a sea of troubles,",
+		"And by opposing end them?--To die,--to sleep,--",
+		"No more; and by a sleep to say we end",
+		"The heartache, and the thousand natural shocks",
+		"That flesh is heir to,--'tis a consummation",
+		"Devoutly to be wish'd. To die,--to sleep;--",
+		"To sleep! perchance to dream:--ay, there's the rub;",
+		"For in that sleep of death what dreams may come,",
+		"When we have shuffled off this mortal coil,",
+		"Must give us pause: there's the respect",
+		"That makes calamity of so long life;",
+		"For who would bear the whips and scorns of time,",
+		"The oppressor's wrong, the proud man's contumely,",
+		"The pangs of despis'd love, the law's delay,",
+		"The insolence of office, and the spurns",
+		"That patient merit of the unworthy takes,",
+		"When he himself might his quietus make",
+		"With a bare bodkin? who would these fardels bear,",
+		"To grunt and sweat under a weary life,",
+		"But that the dread of something after death,--",
+		"The undiscover'd country, from whose bourn",
+		"No traveller returns,--puzzles the will,",
+		"And makes us rather bear those ills we have",
+		"Than fly to others that we know not of?",
+		"Thus conscience does make cowards of us all;",
+		"And thus the native hue of resolution",
+		"Is sicklied o'er with the pale cast of thought;",
+		"And enterprises of great pith and moment,",
+		"With this regard, their currents turn awry,",
+		"And lose the name of action.--Soft you now!",
+		"The fair Ophelia!--Nymph, in thy orisons",
+		"Be all my sins remember'd."
+	};
+
+	public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+		return env.fromElements(WORDS);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties b/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..65bd0b8
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/resources/log4j.properties b/flink-examples/flink-examples-batch/src/main/resources/log4j.properties
new file mode 100644
index 0000000..da32ea0
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/resources/logback.xml b/flink-examples/flink-examples-batch/src/main/resources/logback.xml
new file mode 100644
index 0000000..95f2d04
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/resources/logback.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
new file mode 100644
index 0000000..08a3e62
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.clustering
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.examples.java.clustering.util.KMeansData
+
+import scala.collection.JavaConverters._
+
+/**
+ * This example implements a basic K-Means clustering algorithm.
+ *
+ * K-Means is an iterative clustering algorithm and works as follows:
+ * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster
+ * centers.
+ * In each iteration, the algorithm computes the distance of each data point to each cluster center.
+ * Each point is assigned to the cluster center which is closest to it.
+ * Subsequently, each cluster center is moved to the center (''mean'') of all points that have
+ * been assigned to it.
+ * The moved cluster centers are fed into the next iteration. 
+ * The algorithm terminates after a fixed number of iterations (as in this implementation) 
+ * or if cluster centers do not (significantly) move in an iteration.
+ * This is the Wikipedia entry for the [[http://en.wikipedia
+ * .org/wiki/K-means_clustering K-Means Clustering algorithm]].
+ *
+ * This implementation works on two-dimensional data points.
+ * It computes an assignment of data points to cluster centers, i.e., 
+ * each data point is annotated with the id of the final cluster (center) it belongs to.
+ *
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *  - Data points are represented as two double values separated by a blank character.
+ *    Data points are separated by newline characters.
+ *    For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3,
+ *    y=7.2).
+ *  - Cluster centers are represented by an integer id and a point value.
+ *    For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2,
+ *    y=3.2) and (id=2, x=2.9, y=5.7).
+ *
+ * Usage:
+ * {{{
+ *   KMeans <points path> <centers path> <result path> <num iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.examples.java.clustering.util.KMeansData]]
+ * and 10 iterations.
+ *
+ * This example shows how to use:
+ *
+ *  - Bulk iterations
+ *  - Broadcast variables in bulk iterations
+ *  - Custom Java objects (PoJos)
+ */
+object KMeans {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val points: DataSet[Point] = getPointDataSet(env)
+    val centroids: DataSet[Centroid] = getCentroidDataSet(env)
+
+    val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
+      val newCentroids = points
+        .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")
+        .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2")
+        .groupBy(0)
+        .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1")
+        .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id")
+      newCentroids
+    }
+
+    val clusteredPoints: DataSet[(Int, Point)] =
+      points.map(new SelectNearestCenter).withBroadcastSet(finalCentroids, "centroids")
+
+    if (fileOutput) {
+      clusteredPoints.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala KMeans Example")
+    }
+    else {
+      clusteredPoints.print()
+    }
+
+  }
+
+  private def parseParameters(programArguments: Array[String]): Boolean = {
+    if (programArguments.length > 0) {
+      fileOutput = true
+      if (programArguments.length == 4) {
+        pointsPath = programArguments(0)
+        centersPath = programArguments(1)
+        outputPath = programArguments(2)
+        numIterations = Integer.parseInt(programArguments(3))
+
+        true
+      }
+      else {
+        System.err.println("Usage: KMeans <points path> <centers path> <result path> <num " +
+          "iterations>")
+
+        false
+      }
+    }
+    else {
+      System.out.println("Executing K-Means example with default parameters and built-in default " +
+        "data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  We provide a data generator to create synthetic input files for this " +
+        "program.")
+      System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num " +
+        "iterations>")
+
+      true
+    }
+  }
+
+  private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = {
+    if (fileOutput) {
+      env.readCsvFile[(Double, Double)](
+        pointsPath,
+        fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+        .map { x => new Point(x._1, x._2)}
+    }
+    else {
+      val points = KMeansData.POINTS map {
+        case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double])
+      }
+      env.fromCollection(points)
+    }
+  }
+
+  private def getCentroidDataSet(env: ExecutionEnvironment): DataSet[Centroid] = {
+    if (fileOutput) {
+      env.readCsvFile[(Int, Double, Double)](
+        centersPath,
+        fieldDelimiter = " ",
+        includedFields = Array(0, 1, 2))
+        .map { x => new Centroid(x._1, x._2, x._3)}
+    }
+    else {
+      val centroids = KMeansData.CENTROIDS map {
+        case Array(id, x, y) =>
+          new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double])
+      }
+      env.fromCollection(centroids)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var pointsPath: String = null
+  private var centersPath: String = null
+  private var outputPath: String = null
+  private var numIterations: Int = 10
+
+  /**
+   * A simple two-dimensional point.
+   */
+  class Point(var x: Double, var y: Double) extends Serializable {
+    def this() {
+      this(0, 0)
+    }
+
+    def add(other: Point): Point = {
+      x += other.x
+      y += other.y
+      this
+    }
+
+    def div(other: Long): Point = {
+      x /= other
+      y /= other
+      this
+    }
+
+    def euclideanDistance(other: Point): Double = {
+      Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y))
+    }
+
+    def clear(): Unit = {
+      x = 0
+      y = 0
+    }
+
+    override def toString: String = {
+      x + " " + y
+    }
+  }
+
+  /**
+   * A simple two-dimensional centroid, basically a point with an ID.
+   */
+  class Centroid(var id: Int, x: Double, y: Double) extends Point(x, y) {
+    def this() {
+      this(0, 0, 0)
+    }
+
+    def this(id: Int, p: Point) {
+      this(id, p.x, p.y)
+    }
+
+    override def toString: String = {
+      id + " " + super.toString
+    }
+  }
+
+  /** Determines the closest cluster center for a data point. */
+  @ForwardedFields(Array("*->_2"))
+  final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] {
+    private var centroids: Traversable[Centroid] = null
+
+    /** Reads the centroid values from a broadcast variable into a collection. */
+    override def open(parameters: Configuration) {
+      centroids = getRuntimeContext.getBroadcastVariable[Centroid]("centroids").asScala
+    }
+
+    def map(p: Point): (Int, Point) = {
+      var minDistance: Double = Double.MaxValue
+      var closestCentroidId: Int = -1
+      for (centroid <- centroids) {
+        val distance = p.euclideanDistance(centroid)
+        if (distance < minDistance) {
+          minDistance = distance
+          closestCentroidId = centroid.id
+        }
+      }
+      (closestCentroidId, p)
+    }
+
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
new file mode 100644
index 0000000..3df9791
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
+import org.apache.flink.util.Collector
+
+/**
+ * An implementation of the connected components algorithm, using a delta iteration.
+ *
+ * Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the
+ * minimum of its own ID and its neighbors' IDs, as its new ID and tells its neighbors about its
+ * new ID. After the algorithm has completed, all vertices in the same component will have the same
+ * ID.
+ *
+ * A vertex whose component ID did not change needs not propagate its information in the next
+ * step. Because of that, the algorithm is easily expressible via a delta iteration. We here model
+ * the solution set as the vertices with their current component ids, and the workset as the changed
+ * vertices. Because we see all vertices initially as changed, the initial workset and the initial
+ * solution set are identical. Also, the delta to the solution set is consequently also the next
+ * workset.
+ * 
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *   - Vertices represented as IDs and separated by new-line characters. For example,
+ *     `"1\n2\n12\n42\n63"` gives five vertices (1), (2), (12), (42), and (63).
+ *   - Edges are represented as pairs for vertex IDs which are separated by space characters. Edges
+ *     are separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63"`
+ *     gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
+ *
+ * Usage:
+ * {{{
+ *   ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>
+ * }}}
+ *   
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations.
+ * 
+ *
+ * This example shows how to use:
+ *
+ *   - Delta Iterations
+ *   - Generic-typed Functions 
+ *   
+ */
+object ConnectedComponents {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read vertex and edge data
+    // assign the initial components (equal to the vertex id)
+    val vertices = getVerticesDataSet(env).map { id => (id, id) }.withForwardedFields("*->_1;*->_2")
+
+    // undirected edges by emitting for each input edge the input edges itself and an inverted
+    // version
+    val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
+
+    // open a delta iteration
+    val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array("_1")) {
+      (s, ws) =>
+
+        // apply the step logic: join with the edges
+        val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
+          (edge._2, vertex._2)
+        }.withForwardedFieldsFirst("_2->_2").withForwardedFieldsSecond("_2->_1")
+
+        // select the minimum neighbor
+        val minNeighbors = allNeighbors.groupBy(0).min(1)
+
+        // update if the component of the candidate is smaller
+        val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
+          (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
+            if (newVertex._2 < oldVertex._2) out.collect(newVertex)
+        }.withForwardedFieldsFirst("*")
+
+        // delta and new workset are identical
+        (updatedComponents, updatedComponents)
+    }
+    if (fileOutput) {
+      verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala Connected Components Example")
+    } else {
+      verticesWithComponents.print()
+    }
+
+  }
+ 
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 4) {
+        verticesPath = args(0)
+        edgesPath = args(1)
+        outputPath = args(2)
+        maxIterations = args(3).toInt
+
+        true
+      } else {
+        System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
+          " <max number of iterations>")
+
+        false
+      }
+    } else {
+      System.out.println("Executing Connected Components example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from a file.")
+      System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
+        " <max number of iterations>")
+
+      true
+    }
+  }
+
+  private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+       env.readCsvFile[Tuple1[Long]](
+        verticesPath,
+        includedFields = Array(0))
+        .map { x => x._1 }
+    }
+    else {
+      env.fromCollection(ConnectedComponentsData.VERTICES)
+    }
+  }
+  
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](
+        edgesPath,
+        fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+        .map { x => (x._1, x._2)}
+    }
+    else {
+      val edgeData = ConnectedComponentsData.EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var verticesPath: String = null
+  private var edgesPath: String = null
+  private var maxIterations: Int = 10
+  private var outputPath: String = null
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
new file mode 100644
index 0000000..41fb307
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import org.apache.flink.util.Collector
+
+/**
+ * A PageRank implementation using Flink's delta iterations: after seeding initial ranks,
+ * only rank changes (deltas) above [[THRESHOLD]] are kept in the workset, so the amount
+ * of work shrinks as the ranks converge. Runs on a hard-coded 5-vertex graph and prints
+ * the final (pageId, rank) pairs.
+ */
+object DeltaPageRank {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val NUM_VERTICES = 5
+  private final val INITIAL_RANK = 1.0 / NUM_VERTICES
+  private final val RANDOM_JUMP = (1 - DAMPENING_FACTOR) / NUM_VERTICES
+  private final val THRESHOLD = 0.0001 / NUM_VERTICES
+
+  /** (pageId, rank). */
+  type Page = (Long, Double)
+  /** (pageId, ids of the pages this page links to). */
+  type Adjacency = (Long, Array[Long])
+
+  def main(args: Array[String]) {
+
+    val maxIterations = 100
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // adjacency input: first token is the page id, remaining tokens are its out-links
+    val rawLines: DataSet[String] = env.fromElements(
+                                                      "1 2 3 4",
+                                                      "2 1",
+                                                      "3 5",
+                                                      "4 2 3",
+                                                      "5 2 4")
+    val adjacency: DataSet[Adjacency] = rawLines
+      .map(str => {
+        val elements = str.split(' ')
+        val id = elements(0).toLong
+        val neighbors = elements.slice(1, elements.length).map(_.toLong)
+        (id, neighbors)
+      })
+
+    // first full PageRank step: every page distributes its initial rank to its targets
+    val initialRanks: DataSet[Page] = adjacency.flatMap {
+      (adj, out: Collector[Page]) =>
+        {
+          val targets = adj._2
+          val rankPerTarget = INITIAL_RANK * DAMPENING_FACTOR / targets.length
+
+          // damped rank fraction to each target
+          for (target <- targets) {
+            out.collect((target, rankPerTarget))
+          }
+
+          // random jump to self
+          out.collect((adj._1, RANDOM_JUMP))
+        }
+    }
+      .groupBy(0).sum(1)
+
+    // initial workset: how far each rank moved away from the uniform initial rank
+    val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) }
+                                      .withForwardedFields("_1")
+
+    // delta iteration keyed on the page id (field 0): the solution set holds the
+    // current ranks, the workset holds the deltas still worth propagating
+    val iteration = initialRanks.iterateDelta(initialDeltas, maxIterations, Array(0)) {
+
+      (solutionSet, workset) =>
+        {
+          // forward each delta, damped and split across the page's out-links; sum per
+          // target and drop changes below the convergence threshold
+          val deltas = workset.join(adjacency).where(0).equalTo(0) {
+            (lastDeltas, adj, out: Collector[Page]) =>
+              {
+                val targets = adj._2
+                val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length
+
+                for (target <- targets) {
+                  out.collect((target, deltaPerTarget))
+                }
+              }
+          }
+            .groupBy(0).sum(1)
+            .filter(x => Math.abs(x._2) > THRESHOLD)
+
+          // apply the surviving deltas to the current ranks in the solution set
+          val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) {
+            (current, delta) => (current._1, current._2 + delta._2)
+          }.withForwardedFieldsFirst("_1")
+
+          (rankUpdates, deltas)
+        }
+    }
+
+    iteration.print()
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
new file mode 100644
index 0000000..170aa1d
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+
+import scala.collection.mutable
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * The algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ *  
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *  - Edges are represented as pairs of vertex IDs which are separated by space
+ *   characters. Edges are separated by new-line characters.
+ *   For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12),
+ *   (1)-(12), and (42)-(63) that include a triangle
+ *
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: 
+ * {{{
+ * EnumTriangleBasic <edge path> <result path>
+ * }}}
+ * <br>
+ * If no parameters are provided, the program is run with default data from 
+ * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
+ * 
+ * This example shows how to use:
+ *
+ *  - Scala case classes as custom data types
+ *  - Group Sorting
+ *
+ */
+object EnumTrianglesBasic {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val edges = getEdgeDataSet(env)
+    
+    // project edges by vertex id (smaller id first) so each undirected edge has a
+    // single canonical representation
+    val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
+    
+    val triangles = edgesById
+            // build triads
+            .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
+            // filter triads: keep only those whose open side (v2, v3) is an existing edge
+            .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }
+              .withForwardedFieldsFirst("*")
+    
+    // emit result
+    if (fileOutput) {
+      triangles.writeAsCsv(outputPath, "\n", ",")
+      // execute program
+      env.execute("TriangleEnumeration Example")
+    } else {
+      triangles.print()
+    }
+    
+
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** An undirected edge between vertices `v1` and `v2`. */
+  case class Edge(v1: Int, v2: Int) extends Serializable
+  /** Three vertices where (v1, v2) and (v1, v3) are known edges. */
+  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
+  
+    
+  // *************************************************************************
+  //     USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   *  Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex
+   *  of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes
+   *  that input edges share the first vertex and are in ascending order of the second vertex.
+   */
+  @ForwardedFields(Array("v1->v1"))
+  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+    val vertices = mutable.MutableList[Integer]()
+    
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
+      
+      // clear vertex list
+      vertices.clear()
+
+      // build and emit triads
+      for(e <- edges.asScala) {
+      
+        // combine vertex with all previously read vertices
+        for(v <- vertices) {
+          out.collect(Triad(e.v1, v, e.v2))
+        }
+        vertices += e.v2
+      }
+    }
+  }
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Parses the command-line arguments: either none (run with built-in default data) or
+   * exactly `<edge path> <result path>`. Returns false when the arguments are invalid
+   * and the program should exit.
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        edgePath = args(0)
+        outputPath = args(1)
+
+        true
+      } else {
+        System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
+
+        false
+      }
+    } else {
+      System.out.println("Executing Enum Triangles Basic example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
+
+      true
+    }
+  }
+
+  /** Reads edges from `edgePath` when file input is configured, else uses default data. */
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+    if (fileOutput) {
+      env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1))
+    } else {
+      val edges = EnumTrianglesData.EDGES.map {
+        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
+      }
+      env.fromCollection(edges)
+    }
+  }
+  
+  
+  // set by parseParameters; fileOutput is true when paths were supplied on the CLI
+  private var fileOutput: Boolean = false
+  private var edgePath: String = null
+  private var outputPath: String = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
new file mode 100644
index 0000000..060a5f9
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+
+import scala.collection.mutable
+
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ *
+ * The basic algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ *
+ * For a group of ''i'' edges that share a common vertex, the number of built triads is quadratic
+ * ''(n*(n-1))/2)''. Therefore, an optimization of the algorithm is to group edges on the vertex
+ * with the smaller output degree to reduce the number of triads.
+ * This implementation extends the basic algorithm by computing output degrees of edge vertices and 
+ * grouping on edges on the vertex with the smaller degree.
+ *
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *  - Edges are represented as pairs of vertex IDs which are separated by space
+ *    characters. Edges are separated by new-line characters.
+ *    For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12),
+ *    (1)-(12), and (42)-(63) that include a triangle
+ *
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ *
+ * Usage:
+ * {{{
+ *   EnumTriangleOpt <edge path> <result path>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]].
+ *
+ * This example shows how to use:
+ *
+ *  - Scala case classes as custom data types
+ *  - Group Sorting
+ *
+ */
+object EnumTrianglesOpt {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val edges = getEdgeDataSet(env)
+
+    val edgesWithDegrees = edges
+      // duplicate and switch edges
+      .flatMap(e => Seq(e, Edge(e.v2, e.v1)))
+      // add degree of first vertex
+      .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new DegreeCounter())
+      // join degrees of vertices
+      .groupBy("v1", "v2").reduce {
+        (e1, e2) =>
+          if (e1.d2 == 0) {
+            new EdgeWithDegrees(e1.v1, e1.d1, e1.v2, e2.d2)
+          } else {
+            new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2)
+          }
+      }.withForwardedFields("v1;v2")
+
+    // project edges by degrees, vertex with smaller degree comes first
+    val edgesByDegree = edgesWithDegrees
+      .map(e => if (e.d1 <= e.d2) Edge(e.v1, e.v2) else Edge(e.v2, e.v1))
+    // project edges by Id, vertex with smaller Id comes first
+    val edgesById = edgesByDegree
+      .map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1))
+
+    val triangles = edgesByDegree
+      // build triads
+      .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
+      // filter triads: keep only those whose open side (v2, v3) is an existing edge
+      .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t}
+        .withForwardedFieldsFirst("*")
+
+    // emit result
+    if (fileOutput) {
+      triangles.writeAsCsv(outputPath, "\n", ",")
+      // execute program
+      env.execute("TriangleEnumeration Example")
+    } else {
+      triangles.print()
+    }
+
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** An edge between vertices `v1` and `v2`. */
+  case class Edge(v1: Int, v2: Int) extends Serializable
+
+  /** Three vertices where (v1, v2) and (v1, v3) are known edges. */
+  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
+
+  /** An edge annotated with the degrees `d1` and `d2` of its two vertices. */
+  case class EdgeWithDegrees(v1: Int, d1: Int, v2: Int, d2: Int) extends Serializable
+
+
+  // *************************************************************************
+  //     USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Counts the number of edges that share a common vertex.
+   * Emits one edge for each input edge with a degree annotation for the shared vertex.
+   * For each emitted edge, the first vertex is the vertex with the smaller id.
+   */
+  class DegreeCounter extends GroupReduceFunction[Edge, EdgeWithDegrees] {
+
+    val vertices = mutable.MutableList[Integer]()
+    var groupVertex = 0
+
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[EdgeWithDegrees]) = {
+
+      // empty vertex list
+      vertices.clear()
+
+      // collect all distinct neighbor vertices (self-loops are skipped)
+      for (e <- edges.asScala) {
+        groupVertex = e.v1
+        if (!vertices.contains(e.v2) && e.v1 != e.v2) {
+          vertices += e.v2
+        }
+      }
+
+      // count vertices to obtain degree of groupVertex
+      val degree = vertices.length
+
+      // create and emit edges with degrees; the other vertex's degree is left as 0
+      // and is filled in by the subsequent "join degrees" reduce
+      for (v <- vertices) {
+        if (v < groupVertex) {
+          out.collect(new EdgeWithDegrees(v, 0, groupVertex, degree))
+        } else {
+          out.collect(new EdgeWithDegrees(groupVertex, degree, v, 0))
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds triads (triples of vertices) from pairs of edges that share a vertex.
+   * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by
+   * vertexId.
+   * Assumes that input edges share the first vertex and are in ascending order of the second
+   * vertex.
+   */
+  @ForwardedFields(Array("v1"))
+  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+    val vertices = mutable.MutableList[Integer]()
+
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
+
+      // clear vertex list
+      vertices.clear()
+
+      // build and emit triads
+      for (e <- edges.asScala) {
+        // combine vertex with all previously read vertices
+        for (v <- vertices) {
+          out.collect(Triad(e.v1, v, e.v2))
+        }
+        vertices += e.v2
+      }
+    }
+  }
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Parses the command-line arguments: either none (run with built-in default data) or
+   * exactly `<edge path> <result path>`. Returns false when the arguments are invalid
+   * and the program should exit.
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        edgePath = args(0)
+        outputPath = args(1)
+
+        true
+      } else {
+        System.err.println("Usage: EnumTriangleOpt <edge path> <result path>")
+
+        false
+      }
+    } else {
+      System.out.println("Executing Enum Triangles Optimized example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      // fixed: previously printed "EnumTriangleBasic", the usage line of the basic variant
+      System.out.println("  Usage: EnumTriangleOpt <edge path> <result path>")
+
+      true
+    }
+  }
+
+  /** Reads edges from `edgePath` when file input is configured, else uses default data. */
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+    if (fileOutput) {
+      env.readCsvFile[Edge](
+        edgePath,
+        fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = EnumTrianglesData.EDGES.map {
+        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])}
+      env.fromCollection(edges)
+    }
+  }
+
+
+  // set by parseParameters; fileOutput is true when paths were supplied on the CLI
+  private var fileOutput: Boolean = false
+  private var edgePath: String = null
+  private var outputPath: String = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
new file mode 100644
index 0000000..e1d4af6
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.api.java.aggregation.Aggregations.SUM
+
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+/**
+ * A basic implementation of the Page Rank algorithm using a bulk iteration.
+ * 
+ * This implementation requires a set of pages and a set of directed links as input and works as
+ * follows.
+ *
+ * In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
+ * page collects the partial ranks of all pages that point to it, sums them up, and applies a
+ * dampening factor to the sum. The result is the new rank of the page. A new iteration is started
+ * with the new ranks of all pages. This implementation terminates after a fixed number of
+ * iterations. This is the Wikipedia entry for the
+ * [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
+ * 
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *  - Pages represented as an (long) ID separated by new-line characters.
+ *    For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
+ *  - Links are represented as pairs of page IDs which are separated by space characters. Links
+ *    are separated by new-line characters.
+ *    For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12),
+ *    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
+ *    at least one incoming and one outgoing link (a page can point to itself).
+ *
+ * Usage:
+ * {{{
+ *   PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+ * 
+ * This example shows how to use:
+ *
+ *  - Bulk Iterations
+ *  - Default Join
+ *  - Configure user-defined functions using constructor parameters.
+ *
+ */
+object PageRankBasic {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  // minimum rank change that still counts as a significant update (convergence check)
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pages = getPagesDataSet(env)
+    val links = getLinksDataSet(env)
+
+    // assign initial ranks to pages
+    val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId")
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+        override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+      })
+
+    // start iteration; it ends after maxIterations or as soon as the termination data
+    // set of a round is empty, i.e. no rank changed by more than EPSILON
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks
+          // distribute ranks to target pages
+          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
+            (page, adjacent, out: Collector[Page]) =>
+              val targets = adjacent.targetIds
+              val len = targets.length
+              adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
+          }
+          // collect ranks and sum them up
+          .groupBy("pageId").aggregate(SUM, "rank")
+          // apply dampening factor
+          .map { p =>
+            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
+          }.withForwardedFields("pageId")
+
+        // terminate if no rank update was significant
+        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
+          (current, next, out: Collector[Int]) =>
+            // check for significant update
+            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
+        }
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+      // execute program
+      env.execute("Basic PageRank Example")
+    } else {
+      result.print()
+    }
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  /** A directed link from page `sourceId` to page `targetId`. */
+  case class Link(sourceId: Long, targetId: Long)
+
+  /** A page together with its current rank. */
+  case class Page(pageId: Long, rank: Double)
+
+  /** A page and the ids of all pages it links to. */
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Parses the command-line arguments: either none (built-in default data) or exactly
+   * `<pages path> <links path> <output path> <num pages> <num iterations>`.
+   * Returns false when the program should exit.
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+
+        true
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+          "pages> <num iterations>")
+
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
+        "default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+        "pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+
+      true
+    }
+  }
+
+  /** Reads page ids from `pagesInputPath`, or generates ids 1..15 for the default data. */
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  /** Reads links from `linksInputPath`, or uses the built-in default edge data. */
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  // set by parseParameters; fileOutput is true when paths were supplied on the CLI
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Long = 0
+  private var maxIterations: Int = 10
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
new file mode 100644
index 0000000..b7c0714
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
+import org.apache.flink.util.Collector
+
+/**
+ * A naive transitive closure computation: starting from the input edge set, each
+ * iteration extends every known path by one hop via a join with the edges, unions with
+ * the already-known paths, and de-duplicates. The iteration stops when no new paths
+ * are discovered or after `maxIterations` rounds.
+ */
+object  TransitiveClosureNaive {
+
+  def main (args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val edges = getEdgesDataSet(env)
+
+    val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] =>
+
+      // extend each path (a, b) by every edge (b, c) to (a, c), keep the old paths,
+      // and de-duplicate on both endpoints
+      val nextPaths = prevPaths
+        .join(edges)
+        .where(1).equalTo(0) {
+          (left, right) => (left._1,right._2)
+        }.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2")
+        .union(prevPaths)
+        .groupBy(0, 1)
+        .reduce((l, r) => l).withForwardedFields("_1; _2")
+
+      // paths discovered in this round only; the iteration terminates once this is empty
+      val terminate = prevPaths
+        .coGroup(nextPaths)
+        .where(0).equalTo(0) {
+          (prev, next, out: Collector[(Long, Long)]) => {
+            val prevPaths = prev.toSet
+            for (n <- next)
+              if (!prevPaths.contains(n)) out.collect(n)
+          }
+      }.withForwardedFieldsSecond("*")
+      (nextPaths, terminate)
+    }
+
+    if (fileOutput) {
+      paths.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala Transitive Closure Example")
+    } else {
+      paths.print()
+    }
+
+
+
+  }
+
+
+  // set by parseParameters; fileOutput is true when paths were supplied on the CLI
+  private var fileOutput: Boolean = false
+  private var edgesPath: String = null
+  private var outputPath: String = null
+  private var maxIterations: Int = 10
+
+  /**
+   * Parses the command-line arguments: either none (built-in default data) or exactly
+   * `<edges path> <result path> <max number of iterations>`. Returns false when the
+   * arguments are invalid and the program should exit.
+   */
+  private def parseParameters(programArguments: Array[String]): Boolean = {
+    if (programArguments.length > 0) {
+      fileOutput = true
+      if (programArguments.length == 3) {
+        edgesPath = programArguments(0)
+        outputPath = programArguments(1)
+        maxIterations = Integer.parseInt(programArguments(2))
+      }
+      else {
+        System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of " +
+          "iterations>")
+        return false
+      }
+    }
+    else {
+      System.out.println("Executing TransitiveClosure example with default parameters and " +
+        "built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of " +
+        "iterations>")
+    }
+    true
+  }
+
+  /** Reads (source, target) edges from `edgesPath`, or uses the built-in default data. */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](
+        edgesPath,
+        fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+        .map { x => (x._1, x._2)}
+    }
+    else {
+      val edgeData = ConnectedComponentsData.EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
new file mode 100644
index 0000000..3453ee8
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.misc
+
+import org.apache.flink.api.scala._
+
/**
 * Estimates Pi with a Monte-Carlo simulation: random points are drawn in the
 * unit square, and the fraction that lands inside the quarter circle of
 * radius 1 approximates Pi / 4.
 */
object PiEstimation {

  def main(args: Array[String]) {

    // optional first argument: number of random samples (default 1,000,000)
    val numSamples: Long = if (args.length > 0) args(0).toLong else 1000000

    val env = ExecutionEnvironment.getExecutionEnvironment

    // For every sample draw a random point (x, y) in the unit square and
    // emit 1 if it lies inside the quarter circle, 0 otherwise; summing
    // these counts the hits.
    val hits =
      env.generateSequence(1, numSamples)
        .map { _ =>
          val x = Math.random()
          val y = Math.random()
          if (x * x + y * y < 1) 1L else 0L
        }
        .reduce(_ + _)

    // hits / numSamples approximates the quarter-circle area (Pi / 4);
    // scaling by 4 therefore yields the Pi estimate itself.
    val pi = hits.map(_ * 4.0 / numSamples)

    println("We estimate Pi to be:")

    pi.print()
  }

}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
new file mode 100644
index 0000000..2a7b786
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.ml
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.examples.java.ml.util.LinearRegressionData
+
+import scala.collection.JavaConverters._
+
+/**
+ * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem
+ * using batch gradient descent algorithm.
+ *
+ * Linear Regression with BGD(batch gradient descent) algorithm is an iterative algorithm and
+ * works as follows:
+ *
+ * Given a data set and a target set, BGD tries to find the best parameters for the data set
+ * to fit the target set.
+ * In each iteration, the algorithm computes the gradient of the cost function and use it to
+ * update all the parameters.
+ * The algorithm terminates after a fixed number of iterations (as in this implementation).
+ * With enough iterations, the algorithm can minimize the cost function and find the best
+ * parameters.
+ * This is the Wikipedia entry for the
+ * [[http://en.wikipedia.org/wiki/Linear_regression Linear regression]] and
+ * [[http://en.wikipedia.org/wiki/Gradient_descent Gradient descent algorithm]].
+ *
+ * This implementation works on one-dimensional data and finds the best two-dimensional theta to
+ * fit the target.
+ *
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *  - Data points are represented as two double values separated by a blank character. The first
+ *    one represent the X(the training data) and the second represent the Y(target). Data points are
+ *    separated by newline characters.
+ *    For example `"-0.02 -0.04\n5.3 10.6\n"` gives two data points
+ *    (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
+ *
+ * This example shows how to use:
+ *
+ *  - Bulk iterations
+ *  - Broadcast variables in bulk iterations
+ */
object LinearRegression {

  def main(args: Array[String]) {
    if (!parseParameters(args)) {
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = getDataSet(env)
    val parameters = getParamsDataSet(env)

    // Run a fixed number of BGD iterations: each step computes one partial
    // parameter update per data point (via SubUpdate, which receives the
    // current parameters as a broadcast variable), sums the updates together
    // with their counts, and averages them to obtain the next parameters.
    val result = parameters.iterate(numIterations) { currentParameters =>
      val newParameters = data
        .map(new SubUpdate).withBroadcastSet(currentParameters, "parameters")
        .reduce { (p1, p2) =>
          // sum the partial updates and their counts for later averaging
          val result = p1._1 + p2._1
          (result, p1._2 + p2._2)
        }
        .map { x => x._1.div(x._2) }
      newParameters
    }

    if (fileOutput) {
      result.writeAsText(outputPath)
      env.execute("Scala Linear Regression example")
    }
    else {
      result.print()
    }
  }

  /**
   * A simple data sample: `x` is the input (training value), `y` the target.
   */
  case class Data(var x: Double, var y: Double)

  /**
   * The model parameters of y = theta0 + theta1 * x.
   */
  case class Params(theta0: Double, theta1: Double) {
    /** Component-wise division; used to average summed parameter updates. */
    def div(a: Int): Params = {
      Params(theta0 / a, theta1 / a)
    }

    /** Component-wise addition of two parameter sets. */
    def + (other: Params) = {
      Params(theta0 + other.theta0, theta1 + other.theta1)
    }
  }

  // *************************************************************************
  //     USER FUNCTIONS
  // *************************************************************************

  /**
   * Computes a single BGD-type update for each data point, paired with a
   * count of 1 so that the updates can be averaged in a subsequent reduce.
   */
  class SubUpdate extends RichMapFunction[Data, (Params, Int)] {

    // current model parameters; populated from the broadcast variable in open()
    private var parameter: Params = null

    /** Reads the parameters from the "parameters" broadcast variable. */
    override def open(config: Configuration) {
      // Fixed: the local value previously shadowed this method's
      // `parameters: Configuration` argument; it is now named distinctly.
      // The broadcast variable carries exactly one Params element per
      // iteration, so taking the head is safe here.
      val broadcastParams =
        getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
      parameter = broadcastParams.head
    }

    /** Gradient-descent step with a fixed learning rate of 0.01. */
    override def map(in: Data): (Params, Int) = {
      val theta0 =
        parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y)
      val theta1 =
        parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x)
      (Params(theta0, theta1), 1)
    }
  }

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************
  private var fileOutput: Boolean = false
  private var dataPath: String = null
  private var outputPath: String = null
  private var numIterations: Int = 10

  /**
   * Parses the program arguments. With three arguments (data path, result
   * path, iteration count) file input/output is enabled; with none the
   * built-in default data is used. Returns false on a usage error.
   */
  private def parseParameters(programArguments: Array[String]): Boolean = {
    if (programArguments.length > 0) {
      fileOutput = true
      if (programArguments.length == 3) {
        dataPath = programArguments(0)
        outputPath = programArguments(1)
        numIterations = programArguments(2).toInt

        true
      }
      else {
        System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")

        false
      }
    }
    else {
      System.out.println("Executing Linear Regression example with default parameters and " +
        "built-in default data.")
      System.out.println("  Provide parameters to read input data from files.")
      System.out.println("  See the documentation for the correct format of input files.")
      System.out.println("  We provide a data generator to create synthetic input files for this " +
        "program.")
      System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>")

      true
    }
  }

  /**
   * Creates the training DataSet: read from the CSV file at `dataPath` when
   * file input is enabled, otherwise built from the bundled default data.
   */
  private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {
    if (fileOutput) {
      env.readCsvFile[(Double, Double)](
        dataPath,
        fieldDelimiter = " ",
        includedFields = Array(0, 1))
        .map { t => Data(t._1, t._2) }  // case-class apply; no `new` needed
    }
    else {
      // DATA holds boxed values; unbox each pair into a Data sample.
      val data = LinearRegressionData.DATA map {
        case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
      }
      env.fromCollection(data)
    }
  }

  /** Creates the initial parameter DataSet from the bundled defaults. */
  private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
    val params = LinearRegressionData.PARAMS map {
      case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
    }
    env.fromCollection(params)
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
new file mode 100644
index 0000000..9d4d2ee
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * This program implements a modified version of the TPC-H query 10. 
+ * 
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 45).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *        c_custkey,
+ *        c_name, 
+ *        c_address,
+ *        n_name, 
+ *        c_acctbal,
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM   
+ *        customer, 
+ *        orders, 
+ *        lineitem, 
+ *        nation 
+ * WHERE 
+ *        c_custkey = o_custkey 
+ *        AND l_orderkey = o_orderkey 
+ *        AND YEAR(o_orderdate) > '1990' 
+ *        AND l_returnflag = 'R' 
+ *        AND c_nationkey = n_nationkey 
+ * GROUP BY 
+ *        c_custkey, 
+ *        c_name, 
+ *        c_acctbal, 
+ *        n_name, 
+ *        c_address
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not print 
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at 
+ * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+ *
+ * Usage: 
+ * {{{
+ *TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation path> <result path>
+ * }}}
+ *  
+ * This example shows how to use:
+ *  - tuple data types
+ *  - built-in aggregation functions
+ *  - join with size hints
+ *  
+ */
object TPCHQuery10 {

  def main(args: Array[String]) {
    if (!parseParameters(args)) {
      return
    }

    // get execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // input relations:
    //   customers: (custkey, name, address, nationkey, acctbal)
    //   orders:    (orderkey, custkey, orderdate)
    //   lineitems: (orderkey, extendedprice, discount, returnflag)
    //   nations:   (nationkey, name)
    val customers = getCustomerDataSet(env)
    val orders = getOrdersDataSet(env)
    val lineitems = getLineitemDataSet(env)
    val nations = getNationDataSet(env)

    // keep only orders placed after 1990, projected to (orderkey, custkey)
    val recentOrders = orders
      .filter(o => o._3.substring(0, 4).toInt > 1990)
      .map(o => (o._1, o._2))

    // keep only returned line items, projected to (orderkey, revenue share)
    val returnedItems = lineitems
      .filter(l => l._4 == "R")
      .map(l => (l._1, l._2 * (1 - l._3)))

    // join the two and sum up the revenue per customer: (custkey, revenue);
    // the line-item side is hinted as the larger join input
    val revenueByCustomer = recentOrders
      .joinWithHuge(returnedItems).where(0).equalTo(0)
      .apply((o, l) => (o._2, l._2))
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1)

    // attach the nation name (small input, hinted as tiny) and the revenue
    // to every customer record
    val result = customers
      .joinWithTiny(nations).where(3).equalTo(0)
      .apply((c, n) => (c._1, c._2, c._3, n._2, c._5))
      .join(revenueByCustomer).where(0).equalTo(0)
      .apply((c, r) => (c._1, c._2, c._3, c._4, c._5, r._2))

    // emit result
    result.writeAsCsv(outputPath, "\n", "|")

    // execute program
    env.execute("Scala TPCH Query 10 Example")
  }


  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************

  private var customerPath: String = null
  private var ordersPath: String = null
  private var lineitemPath: String = null
  private var nationPath: String = null
  private var outputPath: String = null

  /** Stores the five expected paths; prints usage and fails otherwise. */
  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length == 5) {
      customerPath = args(0)
      ordersPath = args(1)
      lineitemPath = args(2)
      nationPath = args(3)
      outputPath = args(4)
      true
    } else {
      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
          "  Due to legal restrictions, we can not ship generated data.\n" +
          "  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
          "  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " +
                                "<lineitem-csv path> <nation-csv path> <result path>")
      false
    }
  }

  /** Reads customers as (custkey, name, address, nationkey, acctbal). */
  private def getCustomerDataSet(env: ExecutionEnvironment)
      : DataSet[(Int, String, String, Int, Double)] = {
    env.readCsvFile[(Int, String, String, Int, Double)](
        customerPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 1, 2, 3, 5))
  }

  /** Reads orders as (orderkey, custkey, orderdate). */
  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[(Int, Int, String)] = {
    env.readCsvFile[(Int, Int, String)](
        ordersPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 1, 4))
  }

  /** Reads line items as (orderkey, extendedprice, discount, returnflag). */
  private def getLineitemDataSet(env: ExecutionEnvironment)
      : DataSet[(Int, Double, Double, String)] = {
    env.readCsvFile[(Int, Double, Double, String)](
        lineitemPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 5, 6, 8))
  }

  /** Reads nations as (nationkey, name). */
  private def getNationDataSet(env: ExecutionEnvironment): DataSet[(Int, String)] = {
    env.readCsvFile[(Int, String)](
        nationPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 1))
  }

}


Mime
View raw message