Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA2F01855B for ; Wed, 3 Feb 2016 18:30:19 +0000 (UTC) Received: (qmail 53736 invoked by uid 500); 3 Feb 2016 18:23:39 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 53644 invoked by uid 500); 3 Feb 2016 18:23:39 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 53480 invoked by uid 99); 3 Feb 2016 18:23:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Feb 2016 18:23:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E3BE5E35C6; Wed, 3 Feb 2016 18:23:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Date: Wed, 03 Feb 2016 18:23:42 -0000 Message-Id: <75053c49b4094b8fa126ebee50175b6b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/6] flink git commit: [docs] Move libraries to batch and streaming guides http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/als.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/als.md b/docs/apis/batch/libs/ml/als.md new file mode 100644 index 0000000..5cfa159 --- /dev/null +++ b/docs/apis/batch/libs/ml/als.md @@ -0,0 +1,178 @@ +--- +mathjax: include +title: Alternating Least Squares + +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: ALS +--- + + +* This will be replaced by the TOC +{:toc} + +## Description + +The alternating least squares (ALS) algorithm factorizes a given matrix $R$ into two factors $U$ and $V$ such that $R \approx U^TV$. +The unknown row dimension is given as a parameter to the algorithm and is called latent factors. +Since matrix factorization can be used in the context of recommendation, the matrices $U$ and $V$ can be called user and item matrix, respectively. +The $i$th column of the user matrix is denoted by $u_i$ and the $i$th column of the item matrix is $v_i$. +The matrix $R$ can be called the ratings matrix with $$(R)_{i,j} = r_{i,j}$$. + +In order to find the user and item matrix, the following problem is solved: + +$$\arg\min_{U,V} \sum_{\{i,j\mid r_{i,j} \not= 0\}} \left(r_{i,j} - u_{i}^Tv_{j}\right)^2 + +\lambda \left(\sum_{i} n_{u_i} \left\lVert u_i \right\rVert^2 + \sum_{j} n_{v_j} \left\lVert v_j \right\rVert^2 \right)$$ + +with $\lambda$ being the regularization factor, $$n_{u_i}$$ being the number of items the user $i$ has rated and $$n_{v_j}$$ being the number of times the item $j$ has been rated. +This regularization scheme to avoid overfitting is called weighted-$\lambda$-regularization. +Details can be found in the work of [Zhou et al.](http://dx.doi.org/10.1007/978-3-540-68880-8_32). + +By fixing one of the matrices $U$ or $V$, we obtain a quadratic form which can be solved directly. +The solution of the modified problem is guaranteed to monotonically decrease the overall cost function. 
+By applying this step alternately to the matrices $U$ and $V$, we can iteratively improve the matrix factorization. + +The matrix $R$ is given in its sparse representation as a tuple of $(i, j, r)$ where $i$ denotes the row index, $j$ the column index and $r$ is the matrix value at position $(i,j)$. + +## Operations + +`ALS` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +ALS is trained on the sparse representation of the rating matrix: + +* `fit: DataSet[(Int, Int, Double)] => Unit` + +### Predict + +ALS predicts for each tuple of row and column index the rating: + +* `predict: DataSet[(Int, Int)] => DataSet[(Int, Int, Double)]` + +## Parameters + +The alternating least squares implementation can be controlled by the following parameters: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+| Parameters | Description |
+| ---------- | ----------- |
+| **NumFactors** | The number of latent factors to use for the underlying model. It is equivalent to the dimension of the calculated user and item vectors. (Default value: 10) |
+| **Lambda** | Regularization factor. Tune this value in order to avoid overfitting or poor performance due to strong generalization. (Default value: 1) |
+| **Iterations** | The maximum number of iterations. (Default value: 10) |
+| **Blocks** | The number of blocks into which the user and item matrices are grouped. The fewer blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger update messages which have to be stored on the heap. If the algorithm fails because of an OutOfMemoryException, then try to increase the number of blocks. (Default value: None) |
+| **Seed** | Random seed used to generate the initial item matrix for the algorithm. (Default value: 0) |
+| **TemporaryPath** | Path to a temporary directory into which intermediate results are stored. If this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration and a post-processing step which calculates a last ALS half-step. The preprocessing steps calculate the `OutBlockInformation` and `InBlockInformation` for the given rating matrix. The results of the individual steps are stored in the specified directory. By splitting the algorithm into multiple smaller steps, Flink does not have to split the available memory amongst too many operators. This allows the system to process bigger individual messages and improves the overall performance. (Default value: None) |
+ +## Examples + +{% highlight scala %} +// Read input data set from a csv file +val inputDS: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)]( + pathToTrainingFile) + +// Setup the ALS learner +val als = ALS() +.setIterations(10) +.setNumFactors(10) +.setBlocks(100) +.setTemporaryPath("hdfs://tempPath") + +// Set the other parameters via a parameter map +val parameters = ParameterMap() +.add(ALS.Lambda, 0.9) +.add(ALS.Seed, 42L) + +// Calculate the factorization +als.fit(inputDS, parameters) + +// Read the testing data set from a csv file +val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData) + +// Calculate the ratings according to the matrix factorization +val predictedRatings = als.predict(testingDS) +{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/contribution_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/contribution_guide.md b/docs/apis/batch/libs/ml/contribution_guide.md new file mode 100644 index 0000000..30df530 --- /dev/null +++ b/docs/apis/batch/libs/ml/contribution_guide.md @@ -0,0 +1,110 @@ +--- +mathjax: include +title: How to Contribute + +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: How To Contribute +--- + + +The Flink community highly appreciates all sorts of contributions to FlinkML. +FlinkML offers people interested in machine learning to work on a highly active open source project which makes scalable ML reality. +The following document describes how to contribute to FlinkML. + +* This will be replaced by the TOC +{:toc} + +## Getting Started + +In order to get started first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html). +Everything from this guide also applies to FlinkML. + +## Pick a Topic + +If you are looking for some new ideas you should first look into our [roadmap](https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap), then you should check out the list of [unresolved issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC). +Once you decide to contribute to one of these issues, you should take ownership of it and track your progress with this issue. +That way, the other contributors know the state of the different issues and redundant work is avoided. + +If you already know what you want to contribute to FlinkML all the better. +It is still advisable to create a JIRA issue for your idea to tell the Flink community what you want to do, though. + +## Testing + +New contributions should come with tests to verify the correct behavior of the algorithm. +The tests help to maintain the algorithm's correctness throughout code changes, e.g. refactorings. + +We distinguish between unit tests, which are executed during Maven's test phase, and integration tests, which are executed during maven's verify phase. +Maven automatically makes this distinction by using the following naming rules: +All test cases whose class name ends with a suffix fulfilling the regular expression `(IT|Integration)(Test|Suite|Case)`, are considered integration tests. +The rest are considered unit tests and should only test behavior which is local to the component under test. + +An integration test is a test which requires the full Flink system to be started. 
+In order to do that properly, all integration test cases have to mix in the trait `FlinkTestBase`. +This trait will set the right `ExecutionEnvironment` so that the test will be executed on a special `FlinkMiniCluster` designated for testing purposes. +Thus, an integration test could look the following: + +{% highlight scala %} +class ExampleITSuite extends FlatSpec with FlinkTestBase { + behavior of "An example algorithm" + + it should "do something" in { + ... + } +} +{% endhighlight %} + +The test style does not have to be `FlatSpec` but can be any other scalatest `Suite` subclass. +See [ScalaTest testing styles](http://scalatest.org/user_guide/selecting_a_style) for more information. + +## Documentation + +When contributing new algorithms, it is required to add code comments describing the way the algorithm works and its parameters with which the user can control its behavior. +Additionally, we would like to encourage contributors to add this information to the online documentation. +The online documentation for FlinkML's components can be found in the directory `docs/libs/ml`. + +Every new algorithm is described by a single markdown file. +This file should contain at least the following points: + +1. What does the algorithm do +2. How does the algorithm work (or reference to description) +3. Parameter description with default values +4. Code snippet showing how the algorithm is used + +In order to use latex syntax in the markdown file, you have to include `mathjax: include` in the YAML front matter. + +{% highlight java %} +--- +mathjax: include +htmlTitle: FlinkML - Example title +title: FlinkML - Example title +--- +{% endhighlight %} + +In order to use displayed mathematics, you have to put your latex code in `$$ ... $$`. +For in-line mathematics, use `$ ... $`. +Additionally some predefined latex commands are included into the scope of your markdown file. +See `docs/_include/latex_commands.html` for the complete list of predefined latex commands. + +## Contributing + +Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request. +Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation). http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/distance_metrics.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/distance_metrics.md b/docs/apis/batch/libs/ml/distance_metrics.md new file mode 100644 index 0000000..303de4a --- /dev/null +++ b/docs/apis/batch/libs/ml/distance_metrics.md @@ -0,0 +1,111 @@ +--- +mathjax: include +title: Distance Metrics + +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: Distance Metrics +--- + + +* This will be replaced by the TOC +{:toc} + +## Description + +Different metrics of distance are convenient for different types of analysis. Flink ML provides +built-in implementations for many standard distance metrics. You can create custom +distance metrics by implementing the `DistanceMetric` trait. + +## Built-in Implementations + +Currently, FlinkML supports the following metrics: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+| Metric | Description |
+| ------ | ----------- |
+| **Euclidean Distance** | $$d(\x, \y) = \sqrt{\sum_{i=1}^n \left(x_i - y_i \right)^2}$$ |
+| **Squared Euclidean Distance** | $$d(\x, \y) = \sum_{i=1}^n \left(x_i - y_i \right)^2$$ |
+| **Cosine Similarity** | $$d(\x, \y) = 1 - \frac{\x^T \y}{\Vert \x \Vert \Vert \y \Vert}$$ |
+| **Chebyshev Distance** | $$d(\x, \y) = \max_{i}\left(\left \vert x_i - y_i \right\vert \right)$$ |
+| **Manhattan Distance** | $$d(\x, \y) = \sum_{i=1}^n \left\vert x_i - y_i \right\vert$$ |
+| **Minkowski Distance** | $$d(\x, \y) = \left( \sum_{i=1}^{n} \left( x_i - y_i \right)^p \right)^{\rfrac{1}{p}}$$ |
+| **Tanimoto Distance** | $$d(\x, \y) = 1 - \frac{\x^T\y}{\Vert \x \Vert^2 + \Vert \y \Vert^2 - \x^T\y}$$ with $\x$ and $\y$ being bit-vectors |
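+
+All of the metrics above are provided as built-in implementations of the `DistanceMetric` trait.
+The snippet below is only a usage sketch: the `EuclideanDistanceMetric()` factory and the
+`DenseVector(Array(...))` construction are assumed names for illustration; they follow the same
+`distance(a, b)` signature as the custom metric shown below.
+
+{% highlight scala %}
+// Usage sketch with assumed class names (see the note above)
+val euclidean = EuclideanDistanceMetric()
+
+val a = DenseVector(Array(0.0, 0.0))
+val b = DenseVector(Array(3.0, 4.0))
+
+// distance(a, b) = sqrt(3^2 + 4^2) = 5.0
+val dist = euclidean.distance(a, b)
+{% endhighlight %}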
+ +## Custom Implementation + +You can create your own distance metric by implementing the `DistanceMetric` trait. + +{% highlight scala %} +class MyDistance extends DistanceMetric { + override def distance(a: Vector, b: Vector) = ... // your implementation for distance metric +} + +object MyDistance { + def apply() = new MyDistance() +} + +val myMetric = MyDistance() +{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/index.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/index.md b/docs/apis/batch/libs/ml/index.md new file mode 100644 index 0000000..c3b6316 --- /dev/null +++ b/docs/apis/batch/libs/ml/index.md @@ -0,0 +1,138 @@ +--- +title: "FlinkML - Machine Learning for Flink" +# Top navigation +top-nav-group: libs +top-nav-pos: 2 +top-nav-title: Machine Learning +# Sub navigation +sub-nav-group: batch +sub-nav-id: flinkml +sub-nav-pos: 2 +sub-nav-parent: libs +sub-nav-title: Machine Learning +--- + + +FlinkML is the Machine Learning (ML) library for Flink. It is a new effort in the Flink community, +with a growing list of algorithms and contributors. With FlinkML we aim to provide +scalable ML algorithms, an intuitive API, and tools that help minimize glue code in end-to-end ML +systems. You can see more details about our goals and where the library is headed in our [vision +and roadmap here](https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap). + +* This will be replaced by the TOC +{:toc} + +## Supported Algorithms + +FlinkML currently supports the following algorithms: + +### Supervised Learning + +* [SVM using Communication efficient distributed dual coordinate ascent (CoCoA)](svm.html) +* [Multiple linear regression](multiple_linear_regression.html) +* [Optimization Framework](optimization.html) + +### Data Preprocessing + +* [Polynomial Features](polynomial_features.html) +* [Standard Scaler](standard_scaler.html) +* [MinMax Scaler](min_max_scaler.html) + +### Recommendation + +* [Alternating Least Squares (ALS)](als.html) + +### Utilities + +* [Distance Metrics](distance_metrics.html) + +## Getting Started + +You can check out our [quickstart guide](quickstart.html) for a comprehensive getting started +example. + +If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/apis/batch/index.html#linking-with-flink). +Next, you have to add the FlinkML dependency to the `pom.xml` of your project. + +{% highlight xml %} + + org.apache.flink + flink-ml{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that FlinkML is currently not part of the binary distribution. +See linking with it for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +Now you can start solving your analysis task. +The following code snippet shows how easy it is to train a multiple linear regression model. + +{% highlight scala %} +// LabeledVector is a feature vector with a label (class or real value) +val trainingData: DataSet[LabeledVector] = ... +val testingData: DataSet[Vector] = ... 
+ +val mlr = MultipleLinearRegression() + .setStepsize(1.0) + .setIterations(100) + .setConvergenceThreshold(0.001) + +mlr.fit(trainingData) + +// The fitted model can now be used to make predictions +val predictions: DataSet[LabeledVector] = mlr.predict(testingData) +{% endhighlight %} + +## Pipelines + +A key concept of FlinkML is its [scikit-learn](http://scikit-learn.org) inspired pipelining mechanism. +It allows you to quickly build complex data analysis pipelines how they appear in every data scientist's daily work. +An in-depth description of FlinkML's pipelines and their internal workings can be found [here](pipelines.html). + +The following example code shows how easy it is to set up an analysis pipeline with FlinkML. + +{% highlight scala %} +val trainingData: DataSet[LabeledVector] = ... +val testingData: DataSet[Vector] = ... + +val scaler = StandardScaler() +val polyFeatures = PolynomialFeatures().setDegree(3) +val mlr = MultipleLinearRegression() + +// Construct pipeline of standard scaler, polynomial features and multiple linear regression +val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr) + +// Train pipeline +pipeline.fit(trainingData) + +// Calculate predictions +val predictions: DataSet[LabeledVector] = pipeline.predict(testingData) +{% endhighlight %} + +One can chain a `Transformer` to another `Transformer` or a set of chained `Transformers` by calling the method `chainTransformer`. +If one wants to chain a `Predictor` to a `Transformer` or a set of chained `Transformers`, one has to call the method `chainPredictor`. + + +## How to contribute + +The Flink community welcomes all contributors who want to get involved in the development of Flink and its libraries. +In order to get quickly started with contributing to FlinkML, please read our official +[contribution guide]({{site.baseurl}}/libs/ml/contribution_guide.html). http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/min_max_scaler.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/min_max_scaler.md b/docs/apis/batch/libs/ml/min_max_scaler.md new file mode 100644 index 0000000..2948a96 --- /dev/null +++ b/docs/apis/batch/libs/ml/min_max_scaler.md @@ -0,0 +1,116 @@ +--- +mathjax: include +title: MinMax Scaler + +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: MinMax Scaler +--- + + +* This will be replaced by the TOC +{:toc} + +## Description + + The MinMax scaler scales the given data set, so that all values will lie between a user specified range [min,max]. + In case the user does not provide a specific minimum and maximum value for the scaling range, the MinMax scaler transforms the features of the input data set to lie in the [0,1] interval. + Given a set of input data $x_1, x_2,... x_n$, with minimum value: + + $$x_{min} = min({x_1, x_2,..., x_n})$$ + + and maximum value: + + $$x_{max} = max({x_1, x_2,..., x_n})$$ + +The scaled data set $z_1, z_2,...,z_n$ will be: + + $$z_{i}= \frac{x_{i} - x_{min}}{x_{max} - x_{min}} \left ( max - min \right ) + min$$ + +where $\textit{min}$ and $\textit{max}$ are the user specified minimum and maximum values of the range to scale. + +## Operations + +`MinMaxScaler` is a `Transformer`. +As such, it supports the `fit` and `transform` operation. 
+ +### Fit + +MinMaxScaler is trained on all subtypes of `Vector` or `LabeledVector`: + +* `fit[T <: Vector]: DataSet[T] => Unit` +* `fit: DataSet[LabeledVector] => Unit` + +### Transform + +MinMaxScaler transforms all subtypes of `Vector` or `LabeledVector` into the respective type: + +* `transform[T <: Vector]: DataSet[T] => DataSet[T]` +* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]` + +## Parameters + +The MinMax scaler implementation can be controlled by the following two parameters: + + + + + + + + + + + + + + + + + + + +
+| Parameters | Description |
+| ---------- | ----------- |
+| **Min** | The minimum value of the range for the scaled data set. (Default value: 0.0) |
+| **Max** | The maximum value of the range for the scaled data set. (Default value: 1.0) |
+ +## Examples + +{% highlight scala %} +// Create MinMax scaler transformer +val minMaxscaler = MinMaxScaler() + .setMin(-1.0) + +// Obtain data set to be scaled +val dataSet: DataSet[Vector] = ... + +// Learn the minimum and maximum values of the training data +minMaxscaler.fit(dataSet) + +// Scale the provided data set to have min=-1.0 and max=1.0 +val scaledDS = minMaxscaler.transform(dataSet) +{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/multiple_linear_regression.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/multiple_linear_regression.md b/docs/apis/batch/libs/ml/multiple_linear_regression.md new file mode 100644 index 0000000..436f95b --- /dev/null +++ b/docs/apis/batch/libs/ml/multiple_linear_regression.md @@ -0,0 +1,164 @@ +--- +mathjax: include +title: Multiple linear regression + +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: Multiple Linear Regression +--- + + +* This will be replaced by the TOC +{:toc} + +## Description + + Multiple linear regression tries to find a linear function which best fits the provided input data. + Given a set of input data with its value $(\mathbf{x}, y)$, multiple linear regression finds + a vector $\mathbf{w}$ such that the sum of the squared residuals is minimized: + + $$ S(\mathbf{w}) = \sum_{i=1} \left(y - \mathbf{w}^T\mathbf{x_i} \right)^2$$ + + Written in matrix notation, we obtain the following formulation: + + $$\mathbf{w}^* = \arg \min_{\mathbf{w}} (\mathbf{y} - X\mathbf{w})^2$$ + + This problem has a closed form solution which is given by: + + $$\mathbf{w}^* = \left(X^TX\right)^{-1}X^T\mathbf{y}$$ + + However, in cases where the input data set is so huge that a complete parse over the whole data + set is prohibitive, one can apply stochastic gradient descent (SGD) to approximate the solution. + SGD first calculates for a random subset of the input data set the gradients. The gradient + for a given point $\mathbf{x}_i$ is given by: + + $$\nabla_{\mathbf{w}} S(\mathbf{w}, \mathbf{x_i}) = 2\left(\mathbf{w}^T\mathbf{x_i} - + y\right)\mathbf{x_i}$$ + + The gradients are averaged and scaled. The scaling is defined by $\gamma = \frac{s}{\sqrt{j}}$ + with $s$ being the initial step size and $j$ being the current iteration number. The resulting gradient is subtracted from the + current weight vector giving the new weight vector for the next iteration: + + $$\mathbf{w}_{t+1} = \mathbf{w}_t - \gamma \frac{1}{n}\sum_{i=1}^n \nabla_{\mathbf{w}} S(\mathbf{w}, \mathbf{x_i})$$ + + The multiple linear regression algorithm computes either a fixed number of SGD iterations or terminates based on a dynamic convergence criterion. + The convergence criterion is the relative change in the sum of squared residuals: + + $$\frac{S_{k-1} - S_k}{S_{k-1}} < \rho$$ + +## Operations + +`MultipleLinearRegression` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +MultipleLinearRegression is trained on a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +MultipleLinearRegression predicts for all subtypes of `Vector` the corresponding regression value: + +* `predict[T <: Vector]: DataSet[T] => DataSet[LabeledVector]` + +If we call predict with a `DataSet[LabeledVector]`, we make a prediction on the regression value +for each example, and return a `DataSet[(Double, Double)]`. 
In each tuple the first element +is the true value, as was provided from the input `DataSet[LabeledVector]` and the second element +is the predicted value. You can then use these `(truth, prediction)` tuples to evaluate +the algorithm's performance. + +* `predict: DataSet[LabeledVector] => DataSet[(Double, Double)]` + +## Parameters + + The multiple linear regression implementation can be controlled by the following parameters: + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+| Parameters | Description |
+| ---------- | ----------- |
+| **Iterations** | The maximum number of iterations. (Default value: 10) |
+| **Stepsize** | Initial step size for the gradient descent method. This value controls how far the gradient descent method moves in the opposite direction of the gradient. Tuning this parameter might be crucial to make the method stable and to obtain better performance. (Default value: 0.1) |
+| **ConvergenceThreshold** | Threshold for the relative change of the sum of squared residuals until the iteration is stopped. (Default value: None) |
+| **LearningRateMethod** | Learning rate method used to calculate the effective learning rate for each iteration. See the list of supported learning rate methods. (Default value: LearningRateMethod.Default) |
+ +## Examples + +{% highlight scala %} +// Create multiple linear regression learner +val mlr = MultipleLinearRegression() +.setIterations(10) +.setStepsize(0.5) +.setConvergenceThreshold(0.001) + +// Obtain training and testing data set +val trainingDS: DataSet[LabeledVector] = ... +val testingDS: DataSet[Vector] = ... + +// Fit the linear model to the provided data +mlr.fit(trainingDS) + +// Calculate the predictions for the test data +val predictions = mlr.predict(testingDS) +{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/optimization.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/optimization.md b/docs/apis/batch/libs/ml/optimization.md new file mode 100644 index 0000000..ccb7e45 --- /dev/null +++ b/docs/apis/batch/libs/ml/optimization.md @@ -0,0 +1,385 @@ +--- +mathjax: include +title: Optimization +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: Optimization +--- + + +* Table of contents +{:toc} + +## Mathematical Formulation + +The optimization framework in FlinkML is a developer-oriented package that can be used to solve +[optimization](https://en.wikipedia.org/wiki/Mathematical_optimization) +problems common in Machine Learning (ML) tasks. In the supervised learning context, this usually +involves finding a model, as defined by a set of parameters $w$, that minimize a function $f(\wv)$ +given a set of $(\x, y)$ examples, +where $\x$ is a feature vector and $y$ is a real number, which can represent either a real value in +the regression case, or a class label in the classification case. In supervised learning, the +function to be minimized is usually of the form: + + +\begin{equation} \label{eq:objectiveFunc} + f(\wv) := + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) + + \lambda\, R(\wv) + \ . +\end{equation} + + +where $L$ is the loss function and $R(\wv)$ the regularization penalty. We use $L$ to measure how +well the model fits the observed data, and we use $R$ in order to impose a complexity cost to the +model, with $\lambda > 0$ being the regularization parameter. + +### Loss Functions + +In supervised learning, we use loss functions in order to measure the model fit, by +penalizing errors in the predictions $p$ made by the model compared to the true $y$ for each +example. Different loss functions can be used for regression (e.g. Squared Loss) and classification +(e.g. Hinge Loss) tasks. + +Some common loss functions are: + +* Squared Loss: $ \frac{1}{2} \left(\wv^T \cdot \x - y\right)^2, \quad y \in \R $ +* Hinge Loss: $ \max \left(0, 1 - y ~ \wv^T \cdot \x\right), \quad y \in \{-1, +1\} $ +* Logistic Loss: $ \log\left(1+\exp\left( -y ~ \wv^T \cdot \x\right)\right), \quad y \in \{-1, +1\}$ + +### Regularization Types + +[Regularization](https://en.wikipedia.org/wiki/Regularization_(mathematics)) in machine learning +imposes penalties to the estimated models, in order to reduce overfitting. The most common penalties +are the $L_1$ and $L_2$ penalties, defined as: + +* $L_1$: $R(\wv) = \norm{\wv}_1$ +* $L_2$: $R(\wv) = \frac{1}{2}\norm{\wv}_2^2$ + +The $L_2$ penalty penalizes large weights, favoring solutions with more small weights rather than +few large ones. +The $L_1$ penalty can be used to drive a number of the solution coefficients to 0, thereby +producing sparse solutions. 
+The regularization constant $\lambda$ in $\eqref{eq:objectiveFunc}$ determines the amount of regularization applied to the model, +and is usually determined through model cross-validation. +A good comparison of regularization types can be found in [this](http://www.robotics.stanford.edu/~ang/papers/icml04-l1l2.pdf) paper by Andrew Ng. +Which regularization type is supported depends on the actually used optimization algorithm. + +## Stochastic Gradient Descent + +In order to find a (local) minimum of a function, Gradient Descent methods take steps in the +direction opposite to the gradient of the function $\eqref{eq:objectiveFunc}$ taken with +respect to the current parameters (weights). +In order to compute the exact gradient we need to perform one pass through all the points in +a dataset, making the process computationally expensive. +An alternative is Stochastic Gradient Descent (SGD) where at each iteration we sample one point +from the complete dataset and update the parameters for each point, in an online manner. + +In mini-batch SGD we instead sample random subsets of the dataset, and compute the gradient +over each batch. At each iteration of the algorithm we update the weights once, based on +the average of the gradients computed from each mini-batch. + +An important parameter is the learning rate $\eta$, or step size, which can be determined by one of five methods, listed below. The setting of the initial step size can significantly affect the performance of the +algorithm. For some practical tips on tuning SGD see Leon Botou's +"[Stochastic Gradient Descent Tricks](http://research.microsoft.com/pubs/192769/tricks-2012.pdf)". + +The current implementation of SGD uses the whole partition, making it +effectively a batch gradient descent. Once a sampling operator has been introduced in Flink, true +mini-batch SGD will be performed. + +### Regularization + +FlinkML supports Stochastic Gradient Descent with L1, L2 and no regularization. +The following list contains a mapping between the implementing classes and the regularization function. + + + + + + + + + + + + + + + + + + + + + + +
+| Class Name | Regularization function $R(\wv)$ |
+| ---------- | -------------------------------- |
+| `SimpleGradient` | $R(\wv) = 0$ |
+| `GradientDescentL1` | $R(\wv) = \norm{\wv}_1$ |
+| `GradientDescentL2` | $R(\wv) = \frac{1}{2}\norm{\wv}_2^2$ |
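+
+Since the regularization scheme is tied to the solver class, switching between the variants in the
+table above is just a matter of instantiating a different solver. A minimal sketch, assuming each
+class offers a parameterless factory like the `GradientDescentL1()` call used in the Examples
+section below:
+
+{% highlight scala %}
+// Sketch: the regularization function R(w) is selected by choosing the solver class
+// (factory-style construction assumed, mirroring GradientDescentL1() from the Examples section)
+val noRegularization = SimpleGradient()      // R(w) = 0
+
+val l1Solver = GradientDescentL1()           // R(w) = ||w||_1
+  .setRegularizationConstant(0.2)
+
+val l2Solver = GradientDescentL2()           // R(w) = 0.5 * ||w||_2^2
+  .setRegularizationConstant(0.2)
+{% endhighlight %}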
+ +### Parameters + + The stochastic gradient descent implementation can be controlled by the following parameters: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+| Parameter | Description |
+| --------- | ----------- |
+| **LossFunction** | The loss function to be optimized. (Default value: None) |
+| **RegularizationConstant** | The amount of regularization to apply. (Default value: 0.1) |
+| **Iterations** | The maximum number of iterations. (Default value: 10) |
+| **LearningRate** | Initial learning rate for the gradient descent method. This value controls how far the gradient descent method moves in the opposite direction of the gradient. (Default value: 0.1) |
+| **ConvergenceThreshold** | When set, iterations stop if the relative change in the value of the objective function $\eqref{eq:objectiveFunc}$ is less than the provided threshold, $\tau$. The convergence criterion is defined as follows: $\left\vert \frac{f(\wv)_{i-1} - f(\wv)_i}{f(\wv)_{i-1}}\right\vert < \tau$. (Default value: None) |
+| **LearningRateMethod** | Learning rate method used to calculate the effective learning rate for each iteration. See the list of supported learning rate methods below. (Default value: LearningRateMethod.Default) |
+| **Decay** | Decay constant $\tau$ for the learning rate methods that decrease the learning rate over the course of the iterations (see the Effective Learning Rate section below). (Default value: 0.0) |
+ +### Loss Function + +The loss function which is minimized has to implement the `LossFunction` interface, which defines methods to compute the loss and the gradient of it. +Either one defines ones own `LossFunction` or one uses the `GenericLossFunction` class which constructs the loss function from an outer loss function and a prediction function. +An example can be seen here + +```Scala +val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction) +``` + +The full list of supported outer loss functions can be found [here](#partial-loss-function-values). +The full list of supported prediction functions can be found [here](#prediction-function-values). + +#### Partial Loss Function Values ## + + + + + + + + + + + + + + + + + + +
+| Function Name | Description | Loss | Loss Derivative |
+| ------------- | ----------- | ---- | --------------- |
+| **SquaredLoss** | Loss function most commonly used for regression tasks. | $\frac{1}{2} (\wv^T \cdot \x - y)^2$ | $\wv^T \cdot \x - y$ |
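+
+To make the table concrete, the sketch below spells out the squared loss as a pair of functions.
+It is only an illustration: the `PartialLossFunction` trait name and its `loss`/`derivative`
+signature are assumptions inferred from the `GenericLossFunction(SquaredLoss, LinearPrediction)`
+example above, not a verbatim copy of the library code.
+
+{% highlight scala %}
+// Illustrative sketch only: an outer loss implementing the squared-loss formulas from the table.
+// The PartialLossFunction trait and its method signatures are assumed for this example.
+object MySquaredLoss extends PartialLossFunction {
+
+  // L(p, y) = 1/2 * (p - y)^2
+  override def loss(prediction: Double, label: Double): Double =
+    0.5 * (prediction - label) * (prediction - label)
+
+  // dL/dp = p - y
+  override def derivative(prediction: Double, label: Double): Double =
+    prediction - label
+}
+
+// It could then be combined with a prediction function, analogous to the earlier example:
+// val lossFunction = GenericLossFunction(MySquaredLoss, LinearPrediction)
+{% endhighlight %}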
+ +#### Prediction Function Values ## + + + + + + + + + + + + + + + + + + +
+| Function Name | Description | Prediction | Prediction Gradient |
+| ------------- | ----------- | ---------- | ------------------- |
+| **LinearPrediction** | The function most commonly used for linear models, such as linear regression and linear classifiers. | $\x^T \cdot \wv$ | $\x$ |
+ +#### Effective Learning Rate ## + +Where: + +- $j$ is the iteration number + +- $\eta_j$ is the step size on step $j$ + +- $\eta_0$ is the initial step size + +- $\lambda$ is the regularization constant + +- $\tau$ is the decay constant, which causes the learning rate to be a decreasing function of $j$, that is to say as iterations increase, learning rate decreases. The exact rate of decay is function specific, see **Inverse Scaling** and **Wei Xu's Method** (which is an extension of the **Inverse Scaling** method). + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+| Function Name | Description | Function | Called As |
+| ------------- | ----------- | -------- | --------- |
+| **Default** | The default method used for determining the step size. This is equivalent to the inverse scaling method for $\tau$ = 0.5. This special case is kept as the default to maintain backwards compatibility. | $\eta_j = \eta_0/\sqrt{j}$ | `LearningRateMethod.Default` |
+| **Constant** | The step size is constant throughout the learning task. | $\eta_j = \eta_0$ | `LearningRateMethod.Constant` |
+| **Leon Bottou's Method** | This is the 'optimal' method of sklearn. The optimal initial value $t_0$ has to be provided. Sklearn uses the following heuristic: $t_0 = \max(1.0, L^\prime(-\beta, 1.0) / (\alpha \cdot \beta))$ with $\beta = \sqrt{\frac{1}{\sqrt{\alpha}}}$ and $L^\prime(prediction, truth)$ being the derivative of the loss function. | $\eta_j = 1 / (\lambda \cdot (t_0 + j - 1))$ | `LearningRateMethod.Bottou` |
+| **Inverse Scaling** | A very common method for determining the step size. | $\eta_j = \eta_0 / j^{\tau}$ | `LearningRateMethod.InvScaling` |
+| **Wei Xu's Method** | Method proposed by Wei Xu in *Towards Optimal One Pass Large Scale Learning with Averaged Stochastic Gradient Descent*. | $\eta_j = \eta_0 \cdot (1 + \lambda \cdot \eta_0 \cdot j)^{-\tau}$ | `LearningRateMethod.Xu` |
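+
+As a quick illustration of how these schedules behave, the plain-Scala snippet below evaluates the
+default rule $\eta_j = \eta_0/\sqrt{j}$ and the constant rule from the table for the first few
+iterations; no Flink API is involved.
+
+{% highlight scala %}
+// Worked example of the step size schedules from the table above (plain Scala arithmetic)
+val eta0 = 0.1
+
+// Default: eta_j = eta_0 / sqrt(j)
+val defaultSchedule = (1 to 4).map(j => eta0 / math.sqrt(j))
+// Vector(0.1, 0.0707..., 0.0577..., 0.05)
+
+// Constant: eta_j = eta_0
+val constantSchedule = (1 to 4).map(_ => eta0)
+// Vector(0.1, 0.1, 0.1, 0.1)
+{% endhighlight %}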
+ +### Examples + +In the Flink implementation of SGD, given a set of examples in a `DataSet[LabeledVector]` and +optionally some initial weights, we can use `GradientDescentL1.optimize()` in order to optimize +the weights for the given data. + +The user can provide an initial `DataSet[WeightVector]`, +which contains one `WeightVector` element, or use the default weights which are all set to 0. +A `WeightVector` is a container class for the weights, which separates the intercept from the +weight vector. This allows us to avoid applying regularization to the intercept. + + + +{% highlight scala %} +// Create stochastic gradient descent solver +val sgd = GradientDescentL1() + .setLossFunction(SquaredLoss()) + .setRegularizationConstant(0.2) + .setIterations(100) + .setLearningRate(0.01) + .setLearningRateMethod(LearningRateMethod.Xu(-0.75)) + + +// Obtain data +val trainingDS: DataSet[LabeledVector] = ... + +// Optimize the weights, according to the provided data +val weightDS = sgd.optimize(trainingDS) +{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/pipelines.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/pipelines.md b/docs/apis/batch/libs/ml/pipelines.md new file mode 100644 index 0000000..a6cce45 --- /dev/null +++ b/docs/apis/batch/libs/ml/pipelines.md @@ -0,0 +1,445 @@ +--- +mathjax: include +title: Looking under the hood of pipelines +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: Pipelines +--- + + +* This will be replaced by the TOC +{:toc} + +## Introduction + +The ability to chain together different transformers and predictors is an important feature for +any Machine Learning (ML) library. In FlinkML we wanted to provide an intuitive API, +and at the same +time utilize the capabilities of the Scala language to provide +type-safe implementations of our pipelines. What we hope to achieve then is an easy to use API, +that protects users from type errors at pre-flight (before the job is launched) time, thereby +eliminating cases where long +running jobs are submitted to the cluster only to see them fail due to some +error in the series of data transformations that commonly happen in an ML pipeline. + +In this guide then we will describe the choices we made during the implementation of chainable +transformers and predictors in FlinkML, and provide guidelines on how developers can create their +own algorithms that make use of these capabilities. + +## The what and the why + +So what do we mean by "ML pipelines"? Pipelines in the ML context can be thought of as chains of +operations that have some data as input, perform a number of transformations to that data, +and +then output the transformed data, either to be used as the input (features) of a predictor +function, such as a learning model, or just output the transformed data themselves, to be used in +some other task. The end learner can of course be a part of the pipeline as well. +ML pipelines can often be complicated sets of operations ([in-depth explanation](http://research.google.com/pubs/pub43146.html)) and +can become sources of errors for end-to-end learning systems. + +The purpose of ML pipelines is then to create a +framework that can be used to manage the complexity introduced by these chains of operations. 
+Pipelines should make it easy for developers to define chained transformations that can be +applied to the +training data, in order to create the end features that will be used to train a +learning model, and then perform the same set of transformations just as easily to unlabeled +(test) data. Pipelines should also simplify cross-validation and model selection on +these chains of operations. + +Finally, by ensuring that the consecutive links in the pipeline chain "fit together" we also +avoid costly type errors. Since each step in a pipeline can be a computationally-heavy operation, +we want to avoid running a pipelined job, unless we are sure that all the input/output pairs in a +pipeline "fit". + +## Pipelines in FlinkML + +The building blocks for pipelines in FlinkML can be found in the `ml.pipeline` package. +FlinkML follows an API inspired by [sklearn](http://scikit-learn.org) which means that we have +`Estimator`, `Transformer` and `Predictor` interfaces. For an in-depth look at the design of the +sklearn API the interested reader is referred to [this](http://arxiv.org/abs/1309.0238) paper. +In short, the `Estimator` is the base class from which `Transformer` and `Predictor` inherit. +`Estimator` defines a `fit` method, and `Transformer` also defines a `transform` method and +`Predictor` defines a `predict` method. + +The `fit` method of the `Estimator` performs the actual training of the model, for example +finding the correct weights in a linear regression task, or the mean and standard deviation of +the data in a feature scaler. +As evident by the naming, classes that implement +`Transformer` are transform operations like [scaling the input](standard_scaler.html) and +`Predictor` implementations are learning algorithms such as [Multiple Linear Regression]({{site.baseurl}}/libs/ml/multiple_linear_regression.html). +Pipelines can be created by chaining together a number of Transformers, and the final link in a pipeline can be a Predictor or another Transformer. +Pipelines that end with Predictor cannot be chained any further. +Below is an example of how a pipeline can be formed: + +{% highlight scala %} +// Training data +val input: DataSet[LabeledVector] = ... +// Test data +val unlabeled: DataSet[Vector] = ... + +val scaler = StandardScaler() +val polyFeatures = PolynomialFeatures() +val mlr = MultipleLinearRegression() + +// Construct the pipeline +val pipeline = scaler + .chainTransformer(polyFeatures) + .chainPredictor(mlr) + +// Train the pipeline (scaler and multiple linear regression) +pipeline.fit(input) + +// Calculate predictions for the testing data +val predictions: DataSet[LabeledVector] = pipeline.predict(unlabeled) + +{% endhighlight %} + +As we mentioned, FlinkML pipelines are type-safe. +If we tried to chain a transformer with output of type `A` to another with input of type `B` we +would get an error at pre-flight time if `A` != `B`. FlinkML achieves this kind of type-safety +through the use of Scala's implicits. + +### Scala implicits + +If you are not familiar with Scala's implicits we can recommend [this excerpt](https://www.artima.com/pins1ed/implicit-conversions-and-parameters.html) +from Martin Odersky's "Programming in Scala". In short, implicit conversions allow for ad-hoc +polymorphism in Scala by providing conversions from one type to another, and implicit values +provide the compiler with default values that can be supplied to function calls through implicit parameters. 
+The combination of implicit conversions and implicit parameters is what allows us to chain transform +and predict operations together in a type-safe manner. + +### Operations + +As we mentioned, the trait (abstract class) `Estimator` defines a `fit` method. The method has two +parameter lists +(i.e. is a [curried function](http://docs.scala-lang.org/tutorials/tour/currying.html)). The +first parameter list +takes the input (training) `DataSet` and the parameters for the estimator. The second parameter +list takes one `implicit` parameter, of type `FitOperation`. `FitOperation` is a class that also +defines a `fit` method, and this is where the actual logic of training the concrete Estimators +should be implemented. The `fit` method of `Estimator` is essentially a wrapper around the fit +method of `FitOperation`. The `predict` method of `Predictor` and the `transform` method of +`Transform` are designed in a similar manner, with a respective operation class. + +In these methods the operation object is provided as an implicit parameter. +Scala will [look for implicits](http://docs.scala-lang.org/tutorials/FAQ/finding-implicits.html) +in the companion object of a type, so classes that implement these interfaces should provide these +objects as implicit objects inside the companion object. + +As an example we can look at the `StandardScaler` class. `StandardScaler` extends `Transformer`, so it has access to its `fit` and `transform` functions. +These two functions expect objects of `FitOperation` and `TransformOperation` as implicit parameters, +for the `fit` and `transform` methods respectively, which `StandardScaler` provides in its companion +object, through `transformVectors` and `fitVectorStandardScaler`: + +{% highlight scala %} +class StandardScaler extends Transformer[StandardScaler] { + ... +} + +object StandardScaler { + + ... + + implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] { + override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T]) + : Unit = { + ... + } + + implicit def transformVectors[T <: Vector: VectorConverter: TypeInformation: ClassTag] = { + new TransformOperation[StandardScaler, T, T] { + override def transform( + instance: StandardScaler, + transformParameters: ParameterMap, + input: DataSet[T]) + : DataSet[T] = { + ... + } + +} + +{% endhighlight %} + +Note that `StandardScaler` does **not** override the `fit` method of `Estimator` or the `transform` +method of `Transformer`. Rather, its implementations of `FitOperation` and `TransformOperation` +override their respective `fit` and `transform` methods, which are then called by the `fit` and +`transform` methods of `Estimator` and `Transformer`. Similarly, a class that implements +`Predictor` should define an implicit `PredictOperation` object inside its companion object. + +#### Types and type safety + +Apart from the `fit` and `transform` operations that we listed above, the `StandardScaler` also +provides `fit` and `transform` operations for input of type `LabeledVector`. +This allows us to use the algorithm for input that is labeled or unlabeled, and this happens +automatically, depending on the type of the input that we give to the fit and transform +operations. The correct implicit operation is chosen by the compiler, depending on the input type. + +If we try to call the `fit` or `transform` methods with types that are not supported we will get a +runtime error before the job is launched. 
+While it would be possible to catch these kinds of errors at compile time as well, the error +messages that we are able to provide the user would be much less informative, which is why we chose +to throw runtime exceptions instead. + +### Chaining + +Chaining is achieved by calling `chainTransformer` or `chainPredictor` on an object +of a class that implements `Transformer`. These methods return a `ChainedTransformer` or +`ChainedPredictor` object respectively. As we mentioned, `ChainedTransformer` objects can be +chained further, while `ChainedPredictor` objects cannot. These classes take care of applying +fit, transform, and predict operations for a pair of successive transformers or +a transformer and a predictor. They also act recursively if the length of the +chain is larger than two, since every `ChainedTransformer` defines a `transform` and `fit` +operation that can be further chained with more transformers or a predictor. + +It is important to note that developers and users do not need to worry about chaining when +implementing their algorithms, all this is handled automatically by FlinkML. + +### How to Implement a Pipeline Operator + +In order to support FlinkML's pipelining, algorithms have to adhere to a certain design pattern, which we will describe in this section. +Let's assume that we want to implement a pipeline operator which changes the mean of your data. +Since centering data is a common pre-processing step in many analysis pipelines, we will implement it as a `Transformer`. +Therefore, we first create a `MeanTransformer` class which inherits from `Transformer` + +{% highlight scala %} +class MeanTransformer extends Transformer[MeanTransformer] {} +{% endhighlight %} + +Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter. + +{% highlight scala %} +class MeanTransformer extends Transformer[MeanTransformer] { + def setMean(mean: Double): this.type = { + parameters.add(MeanTransformer.Mean, mean) + this + } +} + +object MeanTransformer { + case object Mean extends Parameter[Double] { + override val defaultValue: Option[Double] = Some(0.0) + } + + def apply(): MeanTransformer = new MeanTransformer +} +{% endhighlight %} + +Parameters are defined in the companion object of the transformer class and extend the `Parameter` class. +Since the parameter instances are supposed to act as immutable keys for a parameter map, they should be implemented as `case objects`. +The default value will be used if no other value has been set by the user of this component. +If no default value has been specified, meaning that `defaultValue = None`, then the algorithm has to handle this situation accordingly. + +We can now instantiate a `MeanTransformer` object and set the mean value of the transformed data. +But we still have to implement how the transformation works. +The workflow can be separated into two phases. +Within the first phase, the transformer learns the mean of the given training data. +This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value. + +The learning of the mean can be implemented within the `fit` operation of our `Transformer`, which it inherited from `Estimator`. +Within the `fit` operation, a pipeline component is trained with respect to the given training data. +The algorithm is, however, **not** implemented by overriding the `fit` method but by providing an implementation of a corresponding `FitOperation` for the correct type. 
+Taking a look at the definition of the `fit` method in `Estimator`, which is the parent class of `Transformer`, reveals what why this is the case. + +{% highlight scala %} +trait Estimator[Self] extends WithParameters with Serializable { + that: Self => + + def fit[Training]( + training: DataSet[Training], + fitParameters: ParameterMap = ParameterMap.Empty) + (implicit fitOperation: FitOperation[Self, Training]): Unit = { + FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment) + fitOperation.fit(this, fitParameters, training) + } +} +{% endhighlight %} + +We see that the `fit` method is called with an input data set of type `Training`, an optional parameter list and in the second parameter list with an implicit parameter of type `FitOperation`. +Within the body of the function, first some machine learning types are registered and then the `fit` method of the `FitOperation` parameter is called. +The instance gives itself, the parameter map and the training data set as a parameters to the method. +Thus, all the program logic takes place within the `FitOperation`. + +The `FitOperation` has two type parameters. +The first defines the pipeline operator type for which this `FitOperation` shall work and the second type parameter defines the type of the data set elements. +If we first wanted to implement the `MeanTransformer` to work on `DenseVector`, we would, thus, have to provide an implementation for `FitOperation[MeanTransformer, DenseVector]`. + +{% highlight scala %} +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] { + override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = { + import org.apache.flink.ml.math.Breeze._ + val meanTrainingData: DataSet[DenseVector] = input + .map{ x => (x.asBreeze, 1) } + .reduce{ + (left, right) => + (left._1 + right._1, left._2 + right._2) + } + .map{ p => (p._1/p._2).fromBreeze } + } +} +{% endhighlight %} + +A `FitOperation[T, I]` has a `fit` method which is called with an instance of type `T`, a parameter map and an input `DataSet[I]`. +In our case `T=MeanTransformer` and `I=DenseVector`. +The parameter map is necessary if our fit step depends on some parameter values which were not given directly at creation time of the `Transformer`. +The `FitOperation` of the `MeanTransformer` sums the `DenseVector` instances of the given input data set up and divides the result by the total number of vectors. +That way, we obtain a `DataSet[DenseVector]` with a single element which is the mean value. + +But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere. +If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around. +And here is where the parameter of type `MeanTransformer` which is given to the `fit` method comes into play. +We can use this instance to store state, which is used by a subsequent `transform` operation which works on the same object. +But first we have to extend `MeanTransformer` by a member field and then adjust the `FitOperation` implementation. 
+ +{% highlight scala %} +class MeanTransformer extends Transformer[Centering] { + var meanOption: Option[DataSet[DenseVector]] = None + + def setMean(mean: Double): Mean = { + parameters.add(MeanTransformer.Mean, mu) + } +} + +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] { + override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = { + import org.apache.flink.ml.math.Breeze._ + + instance.meanOption = Some(input + .map{ x => (x.asBreeze, 1) } + .reduce{ + (left, right) => + (left._1 + right._1, left._2 + right._2) + } + .map{ p => (p._1/p._2).fromBreeze }) + } +} +{% endhighlight %} + +If we look at the `transform` method in `Transformer`, we will see that we also need an implementation of `TransformOperation`. +A possible mean transforming implementation could look like the following. + +{% highlight scala %} + +val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] { + override def transform( + instance: MeanTransformer, + transformParameters: ParameterMap, + input: DataSet[DenseVector]) + : DataSet[DenseVector] = { + val resultingParameters = parameters ++ transformParameters + + val resultingMean = resultingParameters(MeanTransformer.Mean) + + instance.meanOption match { + case Some(trainingMean) => { + input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean") + } + case None => throw new RuntimeException("MeanTransformer has not been fitted to data.") + } + } +} + +class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] { + var trainingMean: DenseVector = null + + override def open(parameters: Configuration): Unit = { + trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0) + } + + override def map(vector: DenseVector): DenseVector = { + import org.apache.flink.ml.math.Breeze._ + + val result = vector.asBreeze - trainingMean.asBreeze + resultingMean + + result.fromBreeze + } +} +{% endhighlight %} + +Now we have everything implemented to fit our `MeanTransformer` to a training data set of `DenseVector` instances and to transform them. +However, when we execute the `fit` operation + +{% highlight scala %} +val trainingData: DataSet[DenseVector] = ... +val meanTransformer = MeanTransformer() + +meanTransformer.fit(trainingData) +{% endhighlight %} + +we receive the following error at runtime: `"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"`. +The reason is that the Scala compiler could not find a fitting `FitOperation` value with the right type parameters for the implicit parameter of the `fit` method. +Therefore, it chose a fallback implicit value which gives you this error message at runtime. +In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of the `MeanTransformer's` companion object. + +{% highlight scala %} +object MeanTransformer{ + implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ... + + implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ... +} +{% endhighlight %} + +Now we can call `fit` and `transform` of our `MeanTransformer` with `DataSet[DenseVector]` as input. 
+Furthermore, we can now use this transformer as part of an analysis pipeline where we have a `DenseVector` as input and expected output. + +{% highlight scala %} +val trainingData: DataSet[DenseVector] = ... + +val mean = MeanTransformer.setMean(1.0) +val polyFeaturs = PolynomialFeatures().setDegree(3) + +val pipeline = mean.chainTransformer(polyFeatures) + +pipeline.fit(trainingData) +{% endhighlight %} + +It is noteworthy that there is no additional code needed to enable chaining. +The system automatically constructs the pipeline logic using the operations of the individual components. + +So far everything works fine with `DenseVector`. +But what happens, if we call our transformer with `LabeledVector` instead? +{% highlight scala %} +val trainingData: DataSet[LabeledVector] = ... + +val mean = MeanTransformer() + +mean.fit(trainingData) +{% endhighlight %} + +As before we see the following exception upon execution of the program: `"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"`. +It is noteworthy, that this exception is thrown in the pre-flight phase, which means that the job has not been submitted to the runtime system. +This has the advantage that you won't see a job which runs for a couple of days and then fails because of an incompatible pipeline component. +Type compatibility is, thus, checked at the very beginning for the complete job. + +In order to make the `MeanTransformer` work on `LabeledVector` as well, we have to provide the corresponding operations. +Consequently, we have to define a `FitOperation[MeanTransformer, LabeledVector]` and `TransformOperation[MeanTransformer, LabeledVector, LabeledVector]` as implicit values in the scope of `MeanTransformer`'s companion object. + +{% highlight scala %} +object MeanTransformer { + implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ... + + implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ... +} +{% endhighlight %} + +If we wanted to implement a `Predictor` instead of a `Transformer`, then we would have to provide a `FitOperation`, too. +Moreover, a `Predictor` requires a `PredictOperation` which implements how predictions are calculated from testing data. + + http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/polynomial_features.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/ml/polynomial_features.md b/docs/apis/batch/libs/ml/polynomial_features.md new file mode 100644 index 0000000..9ef7654 --- /dev/null +++ b/docs/apis/batch/libs/ml/polynomial_features.md @@ -0,0 +1,111 @@ +--- +mathjax: include +title: Polynomial Features +# Sub navigation +sub-nav-group: batch +sub-nav-parent: flinkml +sub-nav-title: Polynomial Features +--- + + +* This will be replaced by the TOC +{:toc} + +## Description + +The polynomial features transformer maps a vector into the polynomial feature space of degree $d$. +The dimension of the input vector determines the number of polynomial factors whose values are the respective vector entries. +Given a vector $(x, y, z, \ldots)^T$ the resulting feature vector looks like: + +$$\left(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xy^2, xyz, xz^2, y^3, \ldots\right)^T$$ + +Flink's implementation orders the polynomials in decreasing order of their degree. 
+
+Given the vector $\left(3,2\right)^T$, the polynomial features vector of degree 3 would look like:
+
+$$\left(3^3, 3^2\cdot2, 3\cdot2^2, 2^3, 3^2, 3\cdot2, 2^2, 3, 2\right)^T$$
+
+This transformer can be prepended to all `Transformer` and `Predictor` implementations which expect an input of type `LabeledVector` or any sub-type of `Vector`.
+
+## Operations
+
+`PolynomialFeatures` is a `Transformer`.
+As such, it supports the `fit` and `transform` operations.
+
+### Fit
+
+`PolynomialFeatures` is not trained on data and, thus, supports all types of input data.
+
+### Transform
+
+`PolynomialFeatures` transforms all subtypes of `Vector` and `LabeledVector` into their respective types:
+
+* `transform[T <: Vector]: DataSet[T] => DataSet[T]`
+* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]`
+
+## Parameters
+
+The polynomial features transformer can be controlled by the following parameters:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Parameters</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Degree</strong></td>
+      <td>
+        <p>
+          The maximum polynomial degree.
+          (Default value: <strong>10</strong>)
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
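+
+The degree can either be set through the transformer's setter or supplied at call time via a `ParameterMap`; a call-time parameter overrides the value configured on the instance.
+The following is a minimal sketch of both options; it assumes the parameter is exposed as `PolynomialFeatures.Degree` (as suggested by the table above) and uses illustrative values.
+
+{% highlight scala %}
+val input: DataSet[LabeledVector] = ...
+
+val polyFeatures = PolynomialFeatures()
+
+// Option 1: configure the transformer instance directly
+polyFeatures.setDegree(3)
+
+// Option 2: pass the parameter when calling transform
+val transformParameters = ParameterMap().add(PolynomialFeatures.Degree, 3)
+
+polyFeatures.fit(input) // no training is performed for this transformer
+val expandedInput = polyFeatures.transform(input, transformParameters)
+{% endhighlight %}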
+
+## Examples
+
+{% highlight scala %}
+// Obtain the training data set
+val trainingDS: DataSet[LabeledVector] = ...
+
+// Setup polynomial feature transformer of degree 3
+val polyFeatures = PolynomialFeatures()
+  .setDegree(3)
+
+// Setup the multiple linear regression learner
+val mlr = MultipleLinearRegression()
+
+// Control the learner via the parameter map
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Iterations, 20)
+  .add(MultipleLinearRegression.Stepsize, 0.5)
+
+// Create pipeline PolynomialFeatures -> MultipleLinearRegression
+val pipeline = polyFeatures.chainPredictor(mlr)
+
+// Train the model, passing the parameter map to the fit call
+pipeline.fit(trainingDS, parameters)
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/quickstart.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/ml/quickstart.md b/docs/apis/batch/libs/ml/quickstart.md
new file mode 100644
index 0000000..60f505e
--- /dev/null
+++ b/docs/apis/batch/libs/ml/quickstart.md
@@ -0,0 +1,244 @@
+---
+mathjax: include
+title: Quickstart Guide
+# Sub navigation
+sub-nav-group: batch
+sub-nav-parent: flinkml
+sub-nav-title: Quickstart Guide
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Introduction
+
+FlinkML is designed to make learning from your data a straightforward process, abstracting away
+the complexities that usually come with big data learning tasks. In this
+quickstart guide we will show just how easy it is to solve a simple supervised learning problem
+using FlinkML. But first, some basics; feel free to skip the next few lines if you're already
+familiar with Machine Learning (ML).
+
+As defined by Murphy [[1]](#murphy), ML deals with detecting patterns in data and using those
+learned patterns to make predictions about the future. We can categorize most ML algorithms into
+two major categories: Supervised and Unsupervised Learning.
+
+* **Supervised Learning** deals with learning a function (mapping) from a set of inputs
+(features) to a set of outputs. The learning is done using a *training set* of (input,
+output) pairs that we use to approximate the mapping function. Supervised learning problems are
+further divided into classification and regression problems. In classification problems we try to
+predict the *class* that an example belongs to, for example whether a user is going to click on
+an ad or not. Regression problems, on the other hand, are about predicting (real) numerical
+values, often called the dependent variable, for example what the temperature will be tomorrow.
+
+* **Unsupervised Learning** deals with discovering patterns and regularities in the data. An example
+of this would be *clustering*, where we try to discover groupings of the data from the
+descriptive features. Unsupervised learning can also be used for dimensionality reduction and
+feature extraction, for example through [principal components analysis](https://en.wikipedia.org/wiki/Principal_component_analysis).
+
+## Linking with FlinkML
+
+In order to use FlinkML in your project, first you have to
+[set up a Flink program](http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#linking-with-flink).
+Next, you have to add the FlinkML dependency to the `pom.xml` of your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+## Loading data
+
+To load data to be used with FlinkML we can use the ETL capabilities of Flink, or specialized
+functions for formatted data, such as the LibSVM format. For supervised learning problems it is
+common to use the `LabeledVector` class to represent the `(label, features)` examples. A `LabeledVector`
+object has a FlinkML `Vector` member representing the features of the example and a `Double`
+member which represents the label; the label could be the class in a classification problem or
+the dependent variable in a regression problem.
+
+As an example, we can use Haberman's Survival Data Set, which you can
+[download from the UCI ML repository](http://archive.ics.uci.edu/ml/machine-learning-databases/haberman/haberman.data).
+This dataset *"contains cases from a study conducted on the survival of patients who had undergone
+surgery for breast cancer"*. The data comes in a comma-separated file, where the first 3 columns
+are the features and the 4th column is the class label, which indicates whether the patient
+survived 5 years or longer (label 1) or died within 5 years (label 2). You can check the [UCI
+page](https://archive.ics.uci.edu/ml/datasets/Haberman%27s+Survival) for more information on the data.
+
+We can first load the data as a `DataSet` of `String` tuples:
+
+{% highlight scala %}
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val survival = env.readCsvFile[(String, String, String, String)]("/path/to/haberman.data")
+
+{% endhighlight %}
+
+We can now transform the data into a `DataSet[LabeledVector]`. This will allow us to use the
+dataset with the FlinkML classification algorithms. We know that the 4th element of the dataset
+is the class label, and the rest are features, so we can build `LabeledVector` elements like this:
+
+{% highlight scala %}
+
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+
+val survivalLV = survival
+  .map{tuple =>
+    val list = tuple.productIterator.toList
+    val numList = list.map(_.asInstanceOf[String].toDouble)
+    LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
+  }
+
+{% endhighlight %}
+
+We could then use this data to train a learner. We will, however, use another dataset to exemplify
+building a learner; that will also allow us to show how to import other dataset formats.
+
+**LibSVM files**
+
+A common format for ML datasets is the LibSVM format, and a number of datasets using that format can be
+found [in the LibSVM datasets website](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/). FlinkML provides utilities for loading
+datasets in the LibSVM format through the `readLibSVM` function available in the `MLUtils` object.
+You can also save datasets in the LibSVM format using the `writeLibSVM` function; a short sketch of this is shown below.
+Let's import the svmguide1 dataset. You can download the
+[training set here](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/svmguide1)
+and the [test set here](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/svmguide1.t).
+This is an astroparticle binary classification dataset, used by Hsu et al. [[3]](#hsu) in their
+practical Support Vector Machine (SVM) guide. It contains 4 numerical features, and the class label.
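+
+As an aside, writing a `DataSet[LabeledVector]` back out in LibSVM format works the other way around.
+The following is a minimal, hypothetical sketch that exports the Haberman data set prepared above; the output path is made up, and the exact `writeLibSVM` signature should be checked against the `MLUtils` object.
+
+{% highlight scala %}
+
+import org.apache.flink.ml.MLUtils
+
+// Hypothetical example: persist the labeled Haberman data in LibSVM format
+// (assuming a writeLibSVM(path, data) helper in MLUtils)
+MLUtils.writeLibSVM("/path/to/haberman.libsvm", survivalLV)
+
+{% endhighlight %}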
+
+We can now import the svmguide1 dataset:
+
+{% highlight scala %}
+
+import org.apache.flink.ml.MLUtils
+
+val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM("/path/to/svmguide1")
+val astroTest: DataSet[LabeledVector] = MLUtils.readLibSVM("/path/to/svmguide1.t")
+
+{% endhighlight %}
+
+This gives us two `DataSet[LabeledVector]` objects that we will use in the following section to
+create a classifier.
+
+## Classification
+
+Once we have imported the dataset we can train a `Predictor` such as a linear SVM classifier.
+We can set a number of parameters for the classifier. Here we set the `Blocks` parameter,
+which determines the number of blocks the input is split into for the underlying CoCoA
+algorithm [[2]](#jaggi). The regularization parameter determines the amount of $l_2$ regularization
+applied, which is used to avoid overfitting. The step size determines the contribution of the
+weight vector updates to the next weight vector value; this parameter sets the initial step size.
+
+{% highlight scala %}
+
+import org.apache.flink.ml.classification.SVM
+
+val svm = SVM()
+  .setBlocks(env.getParallelism)
+  .setIterations(100)
+  .setRegularization(0.001)
+  .setStepsize(0.1)
+  .setSeed(42)
+
+svm.fit(astroTrain)
+
+{% endhighlight %}
+
+We can now make predictions on the test set.
+
+{% highlight scala %}
+
+val predictionPairs = svm.predict(astroTest)
+
+{% endhighlight %}
+
+Next, we will see how we can pre-process our data and use the ML pipeline capabilities of FlinkML.
+
+## Data pre-processing and pipelines
+
+A pre-processing step that is often encouraged [[3]](#hsu) when using SVM classification is scaling
+the input features to the [0, 1] range, in order to avoid features with extreme values
+dominating the rest.
+FlinkML has a number of `Transformers` such as `MinMaxScaler` that are used to pre-process data,
+and a key feature is the ability to chain `Transformers` and `Predictors` together. This allows
+us to run the same pipeline of transformations and make predictions on the train and test data in
+a straightforward and type-safe manner. You can read more on the pipeline system of FlinkML
+[in the pipelines documentation](pipelines.html).
+
+Let us first create a normalizing transformer for the features in our dataset, and chain it to a
+new SVM classifier.
+
+{% highlight scala %}
+
+import org.apache.flink.ml.preprocessing.MinMaxScaler
+
+val scaler = MinMaxScaler()
+
+val scaledSVM = scaler.chainPredictor(svm)
+
+{% endhighlight %}
+
+We can now use our newly created pipeline to make predictions on the test set.
+First we call `fit` again, to train the scaler and the SVM classifier.
+The data of the test set will then be automatically scaled before being passed on to the SVM to
+make predictions.
+
+{% highlight scala %}
+
+scaledSVM.fit(astroTrain)
+
+val predictionPairsScaled: DataSet[(Double, Double)] = scaledSVM.predict(astroTest)
+
+{% endhighlight %}
+
+The scaled inputs should give us better prediction performance.
+The result of the prediction on `LabeledVector`s is a data set of tuples where the first entry denotes the true label value and the second entry is the predicted label value.
+
+## Where to go from here
+
+This quickstart guide can act as an introduction to the basic concepts of FlinkML, but there's a lot
+more you can do.
+We recommend going through the [FlinkML documentation](index.html) and trying out the different
+algorithms.
+A very good way to get started is to play around with interesting datasets from the UCI ML
+repository and the LibSVM datasets.
+Tackling an interesting problem from a website like [Kaggle](https://www.kaggle.com) or
+[DrivenData](http://www.drivendata.org/) is also a great way to learn by competing with other
+data scientists.
+If you would like to contribute some new algorithms, take a look at our
+[contribution guide](contribution_guide.html).
+
+**References**
+
+<a name="murphy"></a>[1] Murphy, Kevin P. *Machine learning: a probabilistic perspective.* MIT
+press, 2012.
+
+<a name="jaggi"></a>[2] Jaggi, Martin, et al. *Communication-efficient distributed dual
+coordinate ascent.* Advances in Neural Information Processing Systems. 2014.
+
+<a name="hsu"></a>[3] Hsu, Chih-Wei, Chih-Chung Chang, and Chih-Jen Lin.
+*A practical guide to support vector classification.* 2003.

http://git-wip-us.apache.org/repos/asf/flink/blob/35ec26cd/docs/apis/batch/libs/ml/standard_scaler.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/ml/standard_scaler.md b/docs/apis/batch/libs/ml/standard_scaler.md
new file mode 100644
index 0000000..3a9cd4b
--- /dev/null
+++ b/docs/apis/batch/libs/ml/standard_scaler.md
@@ -0,0 +1,116 @@
+---
+mathjax: include
+title: Standard Scaler
+# Sub navigation
+sub-nav-group: batch
+sub-nav-parent: flinkml
+sub-nav-title: Standard Scaler
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+The standard scaler scales the given data set so that all features have a user-specified mean and standard deviation.
+If the user does not provide specific values, the standard scaler transforms the features of the input data set to have a mean of 0 and a standard deviation of 1.
+Given a set of input data $x_1, x_2, \ldots, x_n$ with mean:
+
+$$\bar{x} = \frac{1}{n}\sum_{i=1}^{n}x_{i}$$
+
+and standard deviation:
+
+$$\sigma_{x}=\sqrt{ \frac{1}{n} \sum_{i=1}^{n}(x_{i}-\bar{x})^{2}}$$
+
+the scaled data set $z_1, z_2, \ldots, z_n$ will be:
+
+$$z_{i}= \textit{std} \cdot \frac{x_{i} - \bar{x}}{\sigma_{x}} + \textit{mean}$$
+
+where $\textit{std}$ and $\textit{mean}$ are the user-specified values for the standard deviation and mean.
+
+## Operations
+
+`StandardScaler` is a `Transformer`.
+As such, it supports the `fit` and `transform` operations.
+
+### Fit
+
+`StandardScaler` is trained on all subtypes of `Vector` or `LabeledVector`:
+
+* `fit[T <: Vector]: DataSet[T] => Unit`
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Transform
+
+`StandardScaler` transforms all subtypes of `Vector` or `LabeledVector` into the respective type:
+
+* `transform[T <: Vector]: DataSet[T] => DataSet[T]`
+* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]`
+
+## Parameters
+
+The standard scaler implementation can be controlled by the following two parameters:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Parameters</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Mean</strong></td>
+      <td>
+        <p>
+          The mean of the scaled data set. (Default value: <strong>0.0</strong>)
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Std</strong></td>
+      <td>
+        <p>
+          The standard deviation of the scaled data set. (Default value: <strong>1.0</strong>)
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+## Examples
+
+{% highlight scala %}
+// Create standard scaler transformer
+val scaler = StandardScaler()
+  .setMean(10.0)
+  .setStd(2.0)
+
+// Obtain data set to be scaled
+val dataSet: DataSet[Vector] = ...
+
+// Learn the mean and standard deviation of the training data
+scaler.fit(dataSet)
+
+// Scale the provided data set to have mean=10.0 and std=2.0
+val scaledDS = scaler.transform(dataSet)
+{% endhighlight %}
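+
+Because `fit` stores the mean and standard deviation of the training data in the scaler, the fitted transformer can be reused on other data sets.
+The following minimal sketch scales a separate test set with the scaler fitted above; `testSet` is a hypothetical `DataSet[Vector]` that is not defined in this guide.
+
+{% highlight scala %}
+// A separate (hypothetical) data set that should be scaled with the
+// statistics learned from the training data above
+val testSet: DataSet[Vector] = ...
+
+// Reuse the already fitted scaler; no second fit call is needed
+val scaledTestDS = scaler.transform(testSet)
+{% endhighlight %}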