flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [51/60] Rewrite Java API Guide as Unified Programming Guide
Date Mon, 22 Sep 2014 12:29:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d630d5/docs/scala_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/scala_api_guide.md b/docs/scala_api_guide.md
deleted file mode 100644
index e46a898..0000000
--- a/docs/scala_api_guide.md
+++ /dev/null
@@ -1,1042 +0,0 @@
----
-title: "Scala API Programming Guide"
----
-
-<section id="top">
-Scala Programming Guide
-=======================
-
-This guide explains how to develop Flink programs with the Scala
-programming interface. 
-
-Here we will look at the general structure of a Scala job. You will learn how to
-write data sources, data sinks, and operators to create data flows that can be
-executed using the Flink system.
-
-Writing Scala jobs requires an understanding of Scala, there is excellent
-documentation available [here](http://scala-lang.org/documentation/). Most
-of the examples can be understood by someone with a good understanding
-of programming in general, though.
-
-[Back to top](#top)
-
-<section id="intro-example">
-Word Count Example
-------------------
-
-To start, let's look at a Word Count job implemented in Scala. This program is
-very simple but it will give you a basic idea of what a Scala job looks like.
-
-```scala
-import org.apache.flinkclient.LocalExecutor
-
-import org.apache.flinkapi.scala._
-import org.apache.flinkapi.scala.operators._
-
-object WordCount {
-  def main(args: Array[String]) {
-    val input = TextFile(textInput)
-
-    val words = input.flatMap { _.split(" ") map { (_, 1) } }
-
-    val counts = words.groupBy { case (word, _) => word }
-      .reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
-
-    val output = counts.write(wordsOutput, CsvOutputFormat())
-    val plan = new ScalaPlan(Seq(output))
-
-    LocalExecutor.execute(plan)
-  }
-}
-``` 
-
-Same as any Flink job a Scala job consists of one or several data
-sources, one or several data sinks and operators in between these that transform
-data. Together these parts are referred to as the data flow graph. It dictates
-the way data is passed when a job is executed.
-
-When using Scala in Flink an important concept to grasp is that of the
-`DataSet`. `DataSet` is an abstract concept that represents actual data sets at
-runtime and which has operations that transform data to create a new transformed
-data set. In this example the `TextFile("/some/input")` call creates a
-`DataSet[String]` that represents the lines of text from the input. The
-`flatMap` operation that looks like a regular Scala flatMap is in fact an
-operation on `DataSet` that passes (at runtime) the data items through the
-provided anonymous function to transform them. The result of the `flatMap`
-operation is a new `DataSet` that represents the transformed data. On this other
-operations be performed. Another such operation are `groupBy` and `reduce`, but
-we will go into details of those later in this guide.
-
-The `write` operation of `DataSet` is used to create a data sink. You provide it
-with a path where the data is to be written to and an output format. This is
-enough for now but we will discuss data formats (for sources and sinks) later.
-
-To execute a data flow graph one or several sinks have to wrapped in a `Plan`
-which can then be executed on a cluster using `RemoteExecutor`. Here, the
-`LocalExecutor` is used to run the flow on the local computer. This is useful
-for debugging your job before running it on an actual cluster.
-
-[Back to top](#top)
-
-<section id="intro-example">
-Project Setup
--------------
-
-We will only cover maven here but the concepts should work equivalently with
-other build systems such as Gradle or sbt. When wanting to develop a Scala job
-all that is needed as dependency is is `flink-scala` (and `flink-clients`, if
-you want to execute your jobs). So all that needs to be done is to add the
-following lines to your POM.
-
-
-```xml
-<dependencies>
-  <dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-scala</artifactId>
-    <version>{{site.FLINK_VERSION_STABLE}}</version>
-  </dependency>
-  <dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-clients</artifactId>
-    <version>{{site.FLINK_VERSION_STABLE}}</version>
-  </dependency>
-</dependencies>
-```
-
-To quickly get started you can use the Flink Scala quickstart available
-[here](scala_api_quickstart.html). This will give you a
-completeMaven project with some working example code that you can use to explore
-the system or as basis for your own projects.
-
-These imports are normally enough for any project:
-
-```scala
-import org.apache.flinkapi.scala._
-import org.apache.flinkapi.scala.operators._
-
-import org.apache.flinkclient.LocalExecutor
-import org.apache.flinkclient.RemoteExecutor
-```
-
-The first two imports contain things like `DataSet`, `Plan`, data sources, data
-sinks, and the operations. The last two imports are required if you want to run
-a data flow on your local machine, respectively cluster.
-
-[Back to top](#top)
-
-<section id="dataset">
-The DataSet Abstraction
------------------------
-
-As already alluded to in the introductory example you write Scala jobs by using
-operations on a `DataSet` to create new transformed `DataSet`. This concept is
-the core of the Flink Scala API so it merits some more explanation. A
-`DataSet` can look and behave like a regular Scala collection in your code but
-it does not contain any actual data but only represents data. For example: when
-you use `TextFile()` you get back a `DataSource[String]` that represents each
-line of text in the input as a `String`. No data is actually loaded or available
-at this point. The set is only used to apply further operations which themselves
-are not executed until the data flow is executed. An operation on `DataSet`
-creates a new `DataSet` that represents the transformation and has a pointer to
-the `DataSet` that represents the data to be transformed. In this way a tree of
-data sets is created that contains both the specification of the flow of data as
-well as all the transformations. This graph can be wrapped in a `Plan` and
-executed.
-
-Working with the system is like working with lazy collections, where execution
-is postponed until the user submits the job.
-
-`DataSet` has a generic parameter, this is the type of each data item or record
-that would be processed by further transformations. This is similar to how
-`List[A]` in Scala would behave. For example in:
-
-```scala
-val input: DataSet[(String, Int)] = ...
-val mapped = input map { a => (a._1, a._2 + 1)}
-```
-
-The anonymous function would retrieve in `a` tuples of type `(String, Int)`.
-
-[Back to top](#top)
-
-<section id="datatypes">
-Data Types
-----------
-
-There are some restrictions regarding the data types that can be used in Scala
-jobs (basically the generic parameter of `DataSet`). The usable types are
-the primitive Scala types, case classes (which includes tuples), and custom
-data types.
-
-Custom data types must implement the interface
-{% gh_link /flink-core/src/main/java/org/apache/flink/types/Value.java "Value" %}.
-For custom data types that should also be used as a grouping key or join key
-the {% gh_link /flink-core/src/main/java/org/apache/flink/types/Key.java "Key" %}
-interface must be implemented.
-
-[Back to top](#top)
-
-<section id="data-sources">
-Creating Data Sources
----------------------
-
-To get an initial `DataSet` on which to perform operations to build a data flow
-graph the following construct is used:
-
-```scala
-val input = DataSource("<file-path>", <input-format>)
-```
-
-The value `input` is now a `DataSet` with the generic type depending on the
-input format.
-
-The file path can be on of either `file:///some/file` to acces files on the
-local machine or `hdfs://some/path` to read files from HDFS. The input
-format can be one of our builtin formats or a custom input format. The builtin
-formats are:
-
-* [TextInputFormat](#text-input-format)
-* [CsvInputFormat](#csv-input-format)
-* [DelimitedInputFormat](#delimited-input-format)
-* [BinaryInputFormat](#binary-input-format)
-* [BinarySerializedInputFormat](#binary-serialized-input-format)
-* [FixedLengthInputFormat](#fixed-length-input-format)
-
-We will now have a look at each of them and show how they are employed and in
-which situations.
-
-[Back to top](#top)
-
-<section id="text-input-format">
-#### TextInputFormat
-
-This input format simply reads a text file line wise and creates a `String`
-for each line. It is used as:
-
-```scala
-TextInputFormat()
-```
-
-As you have already seen in the Word Count Example there is a shortcut for this.
-Instead of using a `DataSource` with `TextInputFormat` you can simply write:
-
-```scala
-val input = TextFile("<file-path>")
-```
-
-The `input` would then be a `DataSet[String]`.
-
-[Back to top](#top)
-
-<section id="csv-input-format">
-#### CsvInputFormat
-
-This input format is mainly used to read Csv-Files, as the name suggests. Input
-files must be text files. You can specify the `String` that should be used
-as the separator between individual records (this would often be newline) and
-also the separator between fields of a record (this would often be a comma).
-The `CsvInputFormat` will automatically read the records and create
-Scala tuples or custom case class objects for you. The format can be used
-in one of the following ways:
-
-```scala
-CsvInputFormat[Out]()
-CsvInputFormat[Out](recordDelim: String)
-CsvInputFormat[Out](recordDelim: String, fieldDelim: Char)
-
-CsvInputFormat[Out](fieldIndices: Seq[Int])
-CsvInputFormat[Out](fieldIndices: Seq[Int], recordDelim: String)
-CsvInputFormat[Out](fieldIndices: Seq[Int], recordDelim: String, fieldDelim: Char)
-```
-
-The default record delimiter is a newline, the default field delimiter is a
-comma. The type parameter `Out` must be a case class type, which also includes
-tuple types since they are internally case classes.
-
-Normally, all the fields of a record are read. If you want to explicitly
-specify which fields of the record should be read you can use one of the
-tree variants with a `fieldIndices` parameter. Here you give a list
-of the fields that should be read. Field indices start from zero.
-
-An example usage could look as follows:
-
-```scala
-val input = DataSource("file:///some/file", CsvInputFormat[(Int, Int, String)](Seq(1, 17,
42), "\n", ','))
-```
-
-Here only the specified fields would be read and 3-tuples created for you.
-The type of input would be `DataSet[(Int, Int, String)]`.
-
-[Back to top](#top)
-
-<section id="delimited-input-format">
-#### DelimitedInputFormat
-
-This input format is meant for textual records that are separated by
-some delimiter. The delimiter could be a newline, for example. It is used like
-this:
-
-```scala
-DelimitedInputFormat[Out](parseFunction: String => Out, delim: String = "\n")
-```
-
-The input files will be split on the supplied delimiter (or the default newline)
-and the supplied parse function must parse the textual representation in the
-`String` and return an object. The type of this object will then also be the
-type of the `DataSet` created by the `DataSource` operation.
-
-Just as with `BinaryInputFormat` the function can be an anonymous function, so
-you could have:
-
-```scala
-val input = DataSource("file:///some/file", BinaryInputFormat( { line =>
-  line match {
-    case EdgeInputPattern(from, to) => Path(from.toInt, to.toInt, 1)
-  }
-}))
-```
-
-In this example EdgeInputPattern is some regular expression used for parsing
-a line of text and `Path` is a custom case class that is used to represent
-the data. The type of input would in this case be `DataSet[Path]`.
-
-[Back to top](#top)
-
-<section id="binary-input-format">
-#### BinaryInputFormat
-
-This input format is best used when you have a custom binary format that
-you store the data in. It is created using one of the following:
-
-```scala
-BinaryInputFormat[Out](readFunction: DataInput => Out)
-BinaryInputFormat[Out](readFunction: DataInput => Out, blocksize: Long)
-```
-
-So you have to provide a function that gets a
-[java.io.DataInput](http://docs.oracle.com/javase/7/docs/api/java/io/DataInput.html)
-and returns the object that
-contains the data. The type of this object will then also be the type of the
-`DataSet` created by the `DataSource` operation.
-
-The provided function can also be an anonymous function, so you could
-have something like this:
-
-```scala
-val input = DataSource("file:///some/file", BinaryInputFormat( { input =>
-  val one = input.readInt
-  val two = input.readDouble
-  (one, two)  
-}))
-```
-
-Here `input` would be of type `DataSet[(Int, Double)]`.
-
-[Back to top](#top)
-
-<section id="binary-serialized-input-format">
-#### BinarySerializedInputFormat
-
-This input format is only meant to be used in conjunction with
-`BinarySerializedOutputFormat`. You can use these to write elements to files using a
-Flink-internal format that can efficiently be read again. You should only
-use this when output is only meant to be consumed by other Flink jobs.
-The format can be used on one of two ways:
-
-```scala
-BinarySerializedInputFormat[Out]()
-BinarySerializedInputFormat[Out](blocksize: Long)
-```
-
-So if input files contain elements of type `(String, Int)` (a tuple type) you
-could use:
-
-```scala
-val input = DataSource("file:///some/file", BinarySerializedInputFormat[(String, Int)]())
-```
-[Back to top](#top)
-
-<section id="fixed-length-input-format">
-#### FixedLengthInputFormat
-
-This input format is for cases where you want to read binary blocks
-of a fixed size. The size of a block must be specified and you must
-provide code that reads elements from a byte array.
-
-The format is used like this:
-
-```scala
-FixedLengthInputFormat[Out](readFunction: (Array[Byte], Int) => Out, recordLength: Int)
-```
-
-The specified function gets an array and a position at which it must start
-reading the array and returns the element read from the binary data.
-
-[Back to top](#top)
-
-<section id="operations">
-Operations on DataSet
----------------------
-
-As explained in [Java API](java_api_guide.html#transformations),
-a Flink job is a graph of operators that process data coming from
-sources that is finally written to sinks. When you use the Scala front end
-these operators as well as the graph is created behind the scenes. For example,
-when you write code like this:
-
-```scala
-val input = TextFile("file:///some/file")
-
-val words = input.map { x => (x, 1) }
-
-val output = counts.write(words, CsvOutputFormat()))
-
-val plan = new ScalaPlan(Seq(output))
-```
-
-What you get is a graph that has a data source, a map operator (that contains
-the code written inside the anonymous function block), and a data sink. You 
-do not have to know about this to be able to use the Scala front end but
-it helps to remember, that when you are using Scala you are building
-a data flow graph that processes data only when executed.
-
-There are operations on `DataSet` that correspond to all the types of operators
-that the Flink system supports. We will shortly go trough all of them with
-some examples.
-
-[Back to top](#top)
-
-<section id="operator-templates">
-#### Basic Operator Templates
-
-Most of the operations have three similar versions and we will
-explain them here for all of the operators together. The three versions are `map`,
-`flatMap`, and `filter`. All of them accept an anonymous function that
-defines what the operation does but the semantics are different.
-
-The `map` version is a simple one to one mapping. Take a look at the following
-code:
-
-```scala
-val input: DataSet[(String, Int)]
-
-val mapped = input.map { x => (x._1, x._2 + 3) }
-```
-
-This defines a map operator that operates on tuples of String and Int and just
-adds three to the Int (the second fields of the tuple). So, if the input set had
-the tuples (a, 1), (b, 2), and (c, 3) the result after the operator would be
-(a, 4), (b, 5), and (c, 6).
-
-The `flatMap` version works a bit differently,
-here you return something iterable from the anonymous function. The iterable
-could be a list or an array. The elements in this iterable are unnested.
-So for every element in the input data you get a list of elements. The
-concatenation of those is the result of the operator. If you had
-the following code:
-
-```scala
-val input: DataSet[(String, Int)]
-
-val mapped = input.flatMap { x => List( (x._1, x._2), (x._1, x._2 + 1) ) }
-```
-
-and as input the tuples (a, 1) and (b, 1) you would get (a, 1), (a, 2), (b, 1),
-and (b, 2) as result. It is one flat list, and not the individual lists returned
-from the anonymous function.
-
-The third template is `filter`. Here you give an anonymous function that
-returns a Boolean. The elements for which this Boolean is true are part of the
-result of the operation, the others are culled. An example for a filter is this
-code:
-
-
-```scala
-val input: DataSet[(String, Int)]
-
-val mapped = input.filter { x => x._2 >= 3 }
-```
-[Back to top](#top)
-
-<section id="key-selectors">
-#### Field/Key Selectors
-
-For some operations (group, join, and cogroup) it is necessary to specify which
-parts of a data type are to be considered the key. This key is used for grouping
-elements together for reduce and for joining in case of a join or cogroup.
-In Scala the key is specified using a special anonymous function called
-a key selector. The key selector has as input an element of the type of
-the `DataSet` and must return a single value or a tuple of values that should
-be considered the key. This will become clear with some examples: (Note that
-we use the reduce operation here as an example, we will have a look at
-that further down):
-
-```scala
-val input: DataSet[(String, Int)]
-val reduced = input groupBy { x => (x._1) } reduce { ... }
-val reduced2 = input groupBy { case (w, c) => w } reduce { ... }
-
-case class Test(a: String, b: Int, c: Int)
-val input2: DataSet[Test]
-val reduced3 = input2 groupBy { x => (x.a, x.b) } reduce { ... }
-val reduced4 = input2 groupBy { case Test(x,y,z) => (x,y) } reduce { ... }
-```
-
-The anonymous function block passed to `groupBy` is the key selector. The first
-two examples both specify the `String` field of the tuple as key. In the second
-set of examples we see a custom case class and here we select the first two
-fields as a compound key.
-
-It is worth noting that the key selector function is not actually executed 
-at runtime but it is parsed at job creation time where the key information is
-extracted and stored for efficient computation at runtime.
-
-#### Map Operation
-
-Map is an operation that gets one element at a time and can output one or
-several elements. The operations that result in a `MapOperator` in the graph are exactly
-those mentioned in the previous section. For completeness' sake we will mention
-their signatures here (in this and the following such lists `In` is the
-type of the input data set, `DataSet[In]`):
-
-```scala
-def map[Out](fun: In => Out): DataSet[Out]
-def flatMap[Out](fun: In => Iterator[Out]): DataSet[Out]
-def filter(fun: In => Boolean): DataSet[Out]
-```
-
-#### Reduce Operation
-
-Reduce is an operation that looks
-at groups of elements at a time and can, for one group, output one or several
-elements. To specify how elements should be grouped you need to give
-a key selection function, as explained [above](#key-selectors).
-
-The basic template of the reduce operation is:
-
-```scala
-input groupBy { <key selector> } reduce { <reduce function> }
-```
-
-The signature of the reduce function depends on the variety of reduce operation
-selected. There are right now three different versions:
-
-```scala
-def reduce(fun: (In, In) => In): DataSet[In]
-
-def reduceGroup[Out](fun: Iterator[In] => Out): DataSet[Out]
-def combinableReduceGroup(fun: Iterator[In] => In): DataSet[In]
-```
-
-The `reduce` variant is like a `reduceLeft` on a Scala collection with
-the limitation that the output data type must be the same as the input data
-type. You specify how to elements of the selection should be combined,
-this is then used to reduce the elements in one group (of the same key)
-down to one element. This can be used to implement aggregation operators,
-for example:
-
-```scala
-val words: DataSet[(String, Int)]
-val counts = words.groupBy { case (word, count) => word}
-  .reduce { (w1, w1) => (w1._1, w1._2 + w2._2) }
-```
-
-This would add up the Int fields of those tuples that have the same String
-in the first fields. As is for example required in Word Count.
-
-The `reduceGroup` variant can be used when more control is required. Here
-your reduce function gets an `Iterator` that can be used to iterate over
-all the elements in a group. With this type or reduce operation the
-output data type can be different from the input data type. An example
-of this kind of operation is this:
-
-```scala
-val words: DataSet[(String, Int)]
-val minCounts = words.groupBy { case (word, count) => word}
-  .reduceGroup { words => words.minBy { _._2 } }
-```
-
-Here we use the minBy function of Scala collections to determine the
-element with the minimum count in a group.
-
-The `combinableGroupReduce` works like the `groupReduce` with the difference
-that the reduce operation is combinable. This is an optimization one can use,
-please have a look at [Programming Model](pmodel.html "Programming Model") for
-the details.
-
-#### Join Operation
-
-The join operation is similar to a database equi-join. It is a two input
-iteration where you have to specify a key selector for each of the inputs
-and then the anonymous function is called for every pair of matching
-elements from the two input sides.
-
-The basic template is:
-
-```scala
-input1 join input2 where { <key selector 1> } isEqualTo { <key selector 2>} map
{ <join function> }
-```
-
-or, because lines will get to long fast:
-```scala
-input1.join(input2)
-  .where { <key selector 1> }
-  .isEqualTo { <key selector 2>}
-  .map { <join function> }
-```
-
-(Scala can sometimes be quite finicky about where you can omit dots and
-parentheses, so it's best to use dots in multi-line code like this.)
-
-As mentioned in [here](#operator-templates) there are three versions of
-this operator, so you can use one of these in the last position:
-
-```scala
-def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out]
-def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out]
-def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)]
-```
-
-One example where this can be used is database-style joining with projection:
-
-```scala
-input1.join(input2)
-  .where { case (a, b, c) => (a, b) }
-  .isEqualTo { case (a, b, c, d) => (c, d) }
-  .map { (left, right) => (left._3, right._1) }
-```
-
-Here the join key for the left input is a compound of the first two tuple fields
-while the key for the second input is a compound of the last two fields. We then
-pick one field each from both sides as the result of the operation.
-
-#### CoGroup Operation
-
-The cogroup operation is a cross between join and reduce. It has two inputs
-and you have to specify a key selector for each of them. This is where the
-similarities with join stop. Instead of having one invocation of your user
-code per pair of matching elements all elements from the left and from the right
-are grouped together for one single invocation. In your function you get
-an `Iterator` for the elements from the left input and another `Iterator`
-for the elements from the right input.
-
-The basic template is:
-
-```scala
-input1 cogroup input2 where { <key selector 1> } isEqualTo { <key selector 2>}
map { <cogroup function> }
-```
-
-or, because lines will get to long fast:
-```scala
-input1.cogroup(input2)
-  .where { <key selector 1> }
-  .isEqualTo { <key selector 2>}
-  .map { <cogroup function> }
-```
-
-There are to variants you can use, with the semantics explained
-[here](#operator-templates).
-
-```scala
-def map[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Out): DataSet[Out]
-def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Iterator[Out]): DataSet[Out]
-```
-
-#### Cross Operation
-
-The cross operation is used to form the Cartesian product of the elements
-from two inputs. The basic template is:
-
-```scala
-input1 cross input2 map { <cogroup function> }
-```
-
-Again there are three variants, with the semantics explained
-[here](#operator-templates).
-
-```scala
-def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out]
-def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out]
-def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)]
-```
-
-#### Union
-
-When you want to have the combination of several data sets as the input of
-an operation you can use a union to combine them. It is used like this
-
-```scala
-val input1: DataSet[String]
-val input2: DataSet[String]
-val unioned = input1.union(input2)
-```
-
-The signature of union is:
-
-```scala
-def union(secondInput: DataSet[A])
-```
-
-Where `A` is the generic type of the `DataSet` on which you execute the `union`.
-
-[Back to top](#top)
-
-<section id="iterations">
-Iterations
-----------
-
-Iterations allow you to implement *loops* in Flink programs.
-[This page](iterations.html) gives a
-general introduction to iterations. This section here provides quick examples
-of how to use the concepts using the Scala API.
-The iteration operators encapsulate a part of the program and execute it
-repeatedly, feeding back the result of one iteration (the partial solution) into
-the next iteration. Flink has two different types of iterations,
-*Bulk Iteration* and *Delta Iteration*.
-
-For both types of iterations you provide the iteration body as a function
-that has data sets as input and returns a new data set. The difference is
-that bulk iterations map from one data set two one new data set while
-delta iterations map two data sets to two new data sets.
-
-#### Bulk Iteration
-
-The signature of the bulk iterate method is this:
-
-```scala
-def iterate(n: Int, stepFunction: DataSet[A] => DataSet[A])
-```
-
-where `A` is the type of the `DataSet` on which `iterate` is called. The number
-of steps is given in `n`. This is how you use it in practice:
-
-```scala
-val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput))
-val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput))
-
-def kMeansStep(centers: DataSet[(Int, Point)]) = {
-
-  val distances = dataPoints cross centers map computeDistance
-  val nearestCenters = distances.groupBy { case (pid, _) => pid }
-    .reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled
-  val newCenters = nearestCenters.groupBy { case (cid, _) => cid }
-    .reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() }
-
-  newCenters
-}
-
-val finalCenters = clusterPoints.iterate(numIterations, kMeansStep)
-
-val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled))
-```
-
-Not that we use some functions here which we don't show. If you want, you
-can check out the complete code in our KMeans example.
-
-#### Delta Iteration
-
-The signature of the delta iterate method is this:
-
-```scala
-def iterateWithDelta(workset: DataSet[W], solutionSetKey: A => K, stepFunction: (DataSet[A],
DataSet[W]) => (DataSet[A], DataSet[W]), maxIterations: Int)
-```
-
-where `A` is the type of the `DataSet` on which `iterateWithDelta` is called,
-`W` is the type of the `DataSet` that represents the workset and `K` is the
-key type. The maximum number of iterations must always be given.
-
-For information on how delta iterations in general work on our system, please
-refer to [iterations](iterations.html). A working example job is
-available here:
-[Scala Connected Components Example](examples_scala.html#connected_components) 
-
-[Back to top](#top)
-
-<section id="data-sinks">
-Creating Data Sinks
--------------------
-
-The creation of data sinks is analog to the creation of data sources. `DataSet`
-has a `write` method that is used to create a sink that writes the output
-of the operation to a file in the local file system or HDFS. The general pattern
-is this:
-
-```scala
-val sink = out.write("<file-path>", <output-format>)
-```
-
-Where `out` is some `DataSet`. Just as for data sources, the file path can be
-on of either `file:///some/file` to acces files on the local machine or
-`hdfs://some/path` to read files from HDFS. The output format can be one of our
-builtin formats or a custom output format. The builtin formats are:
-
-* [DelimitedOutputFormat](#delimited-output-format)
-* [CsvOutputFormat](#csv-output-format)
-* [RawOutputFormat](#raw-output-format)
-* [BinaryOutputFormat](#binary-output-format)
-* [BinarySerializedOutputFormat](#binary-serialized-output-format)
-
-We will now have a look at each of them and show how they are employed and in
-which situations.
-
-[Back to top](#top)
-
-<section id="delimited-output-format">
-#### DelimitedOutputFormat
-
-This output format is meant for writing textual records that are separated by
-some delimiter. The delimiter could be a newline, for example. It is used like
-this:
-
-```scala
-DelimitedOutputFormat[In](formatFunction: In => String, delim: String = "\n")
-```
-
-For every element in the `DataSet` the formatting function is called and
-the result of that is appended to the output file. In between the elements
-the `delim` string is inserted.
-
-An example would be:
-
-```scala
-val out: DataSet[(String, Int)]
-val sink = out.write("file:///some/file", DelimitedOutputFormat( { elem =>
-  "%s|%d".format(elem._1, elem._2)
-}))
-```
-
-Here we use Scala String formatting to write the two fields of the tuple
-separated by a pipe character. The default newline delimiter will be inserted
-between the elements in the output files.
-
-[Back to top](#top)
-
-<section id="csv-output-format">
-#### CsvOutputFormat
-
-This output format can be used to automatically write fields of tuple
-elements or case classes to CSV files. You can specify what separator should
-be used between fields of an element and also the separator between elements.
-
-```scala
-CsvOutputFormat[In]()
-CsvOutputFormat[In](recordDelim: String)
-CsvOutputFormat[In](recordDelim: String, fieldDelim: Char)
-```
-
-The default record delimiter is a newline, the default field delimiter is a
-comma. 
-
-An example usage could look as follows:
-
-```scala
-val out: DataSet[(String, Int)]
-val sink = out.write("file:///some/file", CsvOutputFormat())
-```
-
-Notice how we don't need to specify the generic type here, it is inferred.
-
-[Back to top](#top)
-
-<section id="raw-output-format">
-#### RawOutputFormat
-
-This input format can be used when you want to have complete control over
-what gets written. You get an
-[OutputStream](http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html)
-and can write the elements of the `DataSet` exactly as you see fit.
-
-A `RawOutputFormat` is created like this:
-
-```scala
-RawOutputFormat[In](writeFunction: (In, OutputStream) => Unit)
-```
-
-The function you pass in gets one element from the `DataSet` and must
-write it to the given `OutputStream`. An example would be the following:
-
-```scala
-val out: DataSet[(String, Int)]
-val sink = out.write("file:///some/file", RawOutputFormat( { (elem, output) =>
-  /* write elem._1 and elem._2 to output */ 
-}))
-```
-
-<section id="binary-output-format">
-#### BinaryOutputFormat
-
-This format is very similar to the RawOutputFormat. The difference is that
-instead of an [OutputStream](http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html)
-you get a [DataOutput](http://docs.oracle.com/javase/7/docs/api/java/io/DataOutput.html)
-to which you can write binary data. You can also specify the block size for
-the binary output file. When you don't specify a block size some default
-is used.
-
-A `BinaryOutputFormat` is created like this:
-
-```scala
-BinaryOutputFormat[In](writeFunction: (In, DataOutput) => Unit)
-BinaryOutputFormat[In](writeFunction: (In, DataOutput) => Unit, blockSize: Long)
-```
-[Back to top](#top)
-
-<section id="binary-serialized-output-format">
-#### BinarySerializedOutputFormat
-
-This output format is only meant to be used in conjunction with
-`BinarySerializedInputFormat`. You can use these to write elements to files using a
-Flink-internal format that can efficiently be read again. You should only
-use this when output is only meant to be consumed by other Flink jobs.
-The output format can be used on one of two ways:
-
-```scala
-BinarySerializedOutputFormat[In]()
-BinarySerializedOutputFormat[In](blocksize: Long)
-```
-
-So to write elements of some `DataSet[A]` to a binary file you could use:
-
-```scala
-val out: DataSet[(String, Int)]
-val sink = out.write("file:///some/file", BinarySerializedInputFormat())
-```
-
-As you can see the type of the elements need not be specified, it is inferred
-by Scala.
-
-[Back to top](#top)
-
-<section id="execution">
-Executing Jobs
---------------
-
-To execute a data flow graph the sinks need to be wrapped in a {% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
"ScalaPlan" %} object like this:
-
-```scala
-val out: DataSet[(String, Int)]
-val sink = out.write("file:///some/file", CsvOutputFormat())
-
-val plan = new ScalaPlan(Seq(sink))
-```
-
-You can put several sinks into the `Seq` that is passed to the constructor.
-
-There are two ways one can execute a data flow plan: local execution and
-remote/cluster execution. When using local execution the plan is executed on
-the local computer. This is handy while developing jobs because you can
-easily debug your code and iterate quickly. When a job is ready to be
-used on bigger data sets it can be executed on a cluster. We will
-now give an example for each of the two execution modes.
-
-First up is local execution:
-
-```scala
-import org.apache.flinkclient.LocalExecutor
-
-...
-
-val plan: ScalaPlan = ...
-LocalExecutor.execute(plan)
-```
-
-This is all there is to it.
-
-Remote (or cluster) execution is a bit more complicated because you have
-to package your code in a jar file so that it can be distributed on the cluster.
-Have a look at the [scala quickstart](scala_api_quickstart.html) to see how you
-can set up a maven project that does the packaging. Remote execution is done
-using the {% gh_link /flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
"RemoteExecutor" %}, like this:
-
-```scala
-import org.apache.flinkclient.RemoteExecutor
-
-...
-
-val plan: ScalaPlan = ...
-val ex = new RemoteExecutor("<job manager ip address>", <job manager port>, "your.jar");
-ex.executePlan(plan);
-```
-
-The IP address and the port of the Flink job manager depend on your
-setup. Have a look at [cluster quickstart](/quickstart/setup.html) for a quick
-guide about how to set up a cluster. The default cluster port is 6123, so
-if you run a job manger on your local computer you can give this and "localhost"
-as the first to parameters to the `RemoteExecutor` constructor.
-
-[Back to top](#top)
-
-<section id="rich-functions">
-Rich Functions
---------------
-
-Sometimes having a single function that is passed to an operation is not enough.
-Using Rich Functions it is possible to have state inside your operations and
-have code executed before the first element is processed and after the last
-element is processed. For example, instead of a simple function as in this
-example:
-
-```scala
-val mapped = input map { x => x + 1 }
-```
-
-you can have a rich function like this:
-
-```scala
-val mapped = input map( new MapFunction[(String, Int), (String, Int)] {
-  val someState: SomeType = ...
-  override def open(config: Configuration) = {
-    // one-time initialization code
-  }
-
-  override def close() = {
-    // one-time clean-up code
-  }
-
-  override def apply(in: (String, Int)) = {
-    // do complex stuff
-    val result = ...
-    result
-  }
-})
-```
-
-You could also create a custom class that derives from `MapFunction`
-instead of the anonymous class we used here.
-
-There are rich functions for all the various operator types. The basic
-template is the some, though. The common interface that they implement 
-is {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
"Function" %}. The `open` and `close` methods can be overridden to run set-up
-and tear-down code. The other methods can be used in a rich function to
-work with the runtime context which gives information about the context
-of the operator. Your operation code must now reside in an `apply` method
-that has the same signature as the anonymous function you would normally
-supply.
-
-The rich functions reside in the package `org.apache.flinkapi.scala.functions`.
-This is a list of all the rich functions can can be used instead of
-simple functions in the respective operations:
-
-```scala
-abstract class MapFunction[In, Out] 
-abstract class FlatMapFunction[In, Out] 
-abstract class FilterFunction[In, Out] 
-
-abstract class ReduceFunction[In]
-abstract class GroupReduceFunction[In, Out]
-abstract class CombinableGroupReduceFunction[In, Out]
-
-abstract class JoinFunction[LeftIn, RightIn, Out]
-abstract class FlatJoinFunction[LeftIn, RightIn, Out]
-
-abstract class CoGroupFunction[LeftIn, RightIn, Out]
-abstract class FlatCoGroupFunction[LeftIn, RightIn, Out]
-
-abstract class CrossFunction[LeftIn, RightIn, Out]
-abstract class FlatCrossFunction[LeftIn, RightIn, Out]
-```
-
-Note that for all the rich stubs, you need to specify the generic type of
-the input (or inputs) and the output type.
-
-[Back to top](#top)
\ No newline at end of file


Mime
View raw message