From: trohrmann@apache.org
To: commits@flink.apache.org
Date: Tue, 17 Mar 2015 22:45:26 -0000
Message-Id: <03c583fe388f47ef9f90dd64f7adda9e@git.apache.org>
Subject: [7/8] flink git commit: [FLINK-1697] [ml] Adds web documentation for alternating least squares. Adds web documentation for polynomial base feature mapper.

[FLINK-1697] [ml] Adds web documentation for alternating least squares.
Adds web documentation for polynomial base feature mapper.

[ml] Adds comments

[ml] Set degree of parallelism of test suites to 2

[ml] Replaces FlatSpec tests with JUnit integration test cases in order to suppress the sysout output.
[ml] Adds missing clients-test jar

[docs] Sets jekyll's baseurl to http://ci.apache.org/projects/flink/flink-docs-master

[ml] Replaces JBlas by java netlib to avoid license issues of included fortran libraries

[ml] Adds com.github.fommil.netlib:core to license file

[ml] Adds Scala docs to FlinkTools

[ml] Adds comments to LabeledVector and the math package object

This closes #479.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e2d96f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e2d96f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e2d96f

Branch: refs/heads/master
Commit: 21e2d96f893e4460a8d85c501e31dc09ed2f0043
Parents: ff83c8c
Author: Till Rohrmann
Authored: Tue Mar 10 15:41:40 2015 +0100
Committer: Till Rohrmann
Committed: Tue Mar 17 23:28:34 2015 +0100

----------------------------------------------------------------------
 LICENSE                                         |   1 -
 docs/_config.yml                                |   4 +-
 docs/build_docs.sh                              |   2 +-
 docs/ml/alternating_least_squares.md            | 131 +++++++++++++++-
 docs/ml/multiple_linear_regression.md           |  57 ++++---
 docs/ml/polynomial_base_feature_extractor.md    |  28 ----
 docs/ml/polynomial_base_feature_mapper.md       |  91 +++++++++++
 flink-clients/pom.xml                           |  11 ++
 flink-dist/src/main/flink-bin/LICENSE           |   1 +
 .../apache/flink/runtime/client/JobClient.scala |   2 +-
 flink-staging/flink-ml/pom.xml                  |  14 +-
 .../apache/flink/ml/common/ChainedLearner.scala |  13 ++
 .../flink/ml/common/ChainedTransformer.scala    |  12 ++
 .../org/apache/flink/ml/common/FlinkTools.scala |  88 +++++++++--
 .../apache/flink/ml/common/LabeledVector.scala  |   6 +
 .../org/apache/flink/ml/common/Learner.scala    |  14 +-
 .../apache/flink/ml/common/Transformer.scala    |  24 ++-
 .../flink/ml/feature/PolynomialBase.scala       |   4 +-
 .../org/apache/flink/ml/math/DenseVector.scala  |   9 ++
 .../scala/org/apache/flink/ml/math/JBlas.scala  |  70 ---------
 .../scala/org/apache/flink/ml/math/Vector.scala |   7 +
 .../org/apache/flink/ml/math/package.scala      |   9 +-
 .../apache/flink/ml/recommendation/ALS.scala    |  98 ++++++------
 .../regression/MultipleLinearRegression.scala   |  79 ++++++----
 .../flink/ml/feature/PolynomialBaseITCase.scala | 132 ++++++++++++++++
 .../flink/ml/feature/PolynomialBaseSuite.scala  | 118 --------------
 .../flink/ml/recommendation/ALSITCase.scala     | 152 +++++++++++++++++++
 .../flink/ml/recommendation/ALSSuite.scala      | 141 -----------------
 .../MultipleLinearRegressionITCase.scala        | 115 ++++++++++++++
 .../MultipleLinearRegressionSuite.scala         | 100 ------------
 .../flink/ml/regression/RegressionData.scala    |   3 +-
 31 files changed, 956 insertions(+), 580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index e58d1a5..85d0d85 100644
--- a/LICENSE
+++ b/LICENSE
@@ -228,7 +228,6 @@ The Apache Flink project bundles the following files under the MIT License:
 - normalize.css v3.0.0 (http://git.io/normalize) - Copyright (c) Nicolas Gallagher and Jonathan Neal
 - Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy
 - D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt
- - scopt (http://github.com/scopt/scopt) All rights reserved.
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 612aa6f..d7cf349 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -30,7 +30,7 @@
 FLINK_SCALA_VERSION_SHORT: "2.10"
 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
 FLINK_GITHUB_URL: https://github.com/apache/flink
-FLINK_WEBSITE_URL: http://flink.apache.org/
+FLINK_WEBSITE_URL: http://flink.apache.org
 FLINK_DOWNLOAD_URL: http://flink.apache.org/downloads.html
 FLINK_DOWNLOAD_URL_HADOOP1_STABLE: http://www.apache.org/dyn/closer.cgi/flink/flink-0.8.1/flink-0.8.1-bin-hadoop1.tgz
@@ -59,3 +59,5 @@
 kramdown:
     toc_levels: 1..3
 host: localhost
+
+baseurl: http://ci.apache.org/projects/flink/flink-docs-master


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/build_docs.sh
----------------------------------------------------------------------
diff --git a/docs/build_docs.sh b/docs/build_docs.sh
index 4f8a7c9..b65f7c9 100755
--- a/docs/build_docs.sh
+++ b/docs/build_docs.sh
@@ -54,7 +54,7 @@ JEKYLL_CMD="build"
 while getopts ":p" opt; do
 	case $opt in
 		p)
-		JEKYLL_CMD="serve --watch"
+		JEKYLL_CMD="serve --baseurl "" --watch"
 		;;
 	esac
 done


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/alternating_least_squares.md
----------------------------------------------------------------------
diff --git a/docs/ml/alternating_least_squares.md b/docs/ml/alternating_least_squares.md
index bf97b1b..7a4a5d5 100644
--- a/docs/ml/alternating_least_squares.md
+++ b/docs/ml/alternating_least_squares.md
@@ -1,4 +1,5 @@
 ---
+mathjax: include
 title: Alternating Least Squares
 ---
-
-* This will be replaced by the TOC
-{:toc}
-
-## Description
-
-## Parameters
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/polynomial_base_feature_mapper.md
----------------------------------------------------------------------
diff --git a/docs/ml/polynomial_base_feature_mapper.md b/docs/ml/polynomial_base_feature_mapper.md
new file mode 100644
index 0000000..2964f04
--- /dev/null
+++ b/docs/ml/polynomial_base_feature_mapper.md
@@ -0,0 +1,91 @@
+---
+mathjax: include
+title: Polynomial Base Feature Mapper
+---
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+The polynomial base feature mapper maps a vector into the polynomial feature space of degree $d$.
+The dimension of the input vector determines the number of polynomial factors whose values are the respective vector entries.
+Given a vector $(x, y, z, \ldots)^T$ the resulting feature vector looks like:
+
+$$\left(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xy^2, xyz, xz^2, y^3, \ldots\right)^T$$
+
+Flink's implementation orders the polynomials in decreasing order of their degree.
+
+Given the vector $\left(3,2\right)^T$, the polynomial base feature vector of degree 3 would look like
+
+$$\left(3^3, 3^2\cdot2, 3\cdot2^2, 2^3, 3^2, 3\cdot2, 2^2, 3, 2\right)^T$$
+
+This transformer can be prepended to all `Transformer` and `Learner` implementations which expect an input of type `LabeledVector`.
+
+## Parameters
+
+The polynomial base feature mapper can be controlled by the following parameters:
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Parameters</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Degree</strong></td>
+      <td>
+        <p>
+          The maximum polynomial degree.
+          (Default value: <strong>10</strong>)
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+## Examples
+
+{% highlight scala %}
+// Obtain the training data set
+val trainingDS: DataSet[LabeledVector] = ...
+
+// Setup polynomial base feature extractor of degree 3
+val polyBase = PolynomialBase()
+.setDegree(3)
+
+// Setup the multiple linear regression learner
+val mlr = MultipleLinearRegression()
+
+// Control the learner via the parameter map
+val parameters = ParameterMap()
+.add(MultipleLinearRegression.Iterations, 20)
+.add(MultipleLinearRegression.Stepsize, 0.5)
+
+// Create pipeline PolynomialBase -> MultipleLinearRegression
+val chained = polyBase.chain(mlr)
+
+// Learn the model
+val model = chained.fit(trainingDS)
+{% endhighlight %}
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index d7dccad..95d17d7 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -116,6 +116,17 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
 				<version>2.4</version>


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index 89d8eca..d0b7fb4 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -302,6 +302,7 @@ The Apache Flink project bundles the following components under
 BSD-style licenses:

 [3-clause BSD license]
+ - core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
 - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
 - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
 - LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp.
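The ordering of the polynomial expansion described in the new documentation (monomials grouped by decreasing degree, with non-decreasing factor indices within a degree) can be reproduced with a few lines of plain Scala. This is an illustrative sketch only; `PolynomialFeatureSketch` is a hypothetical helper, not part of Flink's API (the actual implementation lives in `PolynomialBase.scala`):

```scala
object PolynomialFeatureSketch {
  // All monomials of exact degree d over v, keeping factor indices
  // non-decreasing: for v = (x, y) and d = 2 this yields x^2, xy, y^2.
  def monomials(v: Vector[Double], d: Int): Vector[Double] =
    if (d == 0) Vector(1.0)
    else v.indices.toVector.flatMap { i =>
      // Fix v(i) as the first factor and recurse on the suffix v(i..),
      // so v(i) may repeat but earlier entries may not reappear.
      monomials(v.drop(i), d - 1).map(_ * v(i))
    }

  // Full feature vector of maximum degree `degree`: monomials of degree
  // `degree`, then `degree` - 1, ..., down to 1.
  def features(v: Vector[Double], degree: Int): Vector[Double] =
    (degree to 1 by -1).toVector.flatMap(d => monomials(v, d))
}
```

For `features(Vector(3.0, 2.0), 3)` this reproduces the worked example above: `(27, 18, 12, 8, 9, 6, 4, 3, 2)`.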
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 19b3050..f1c6450 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -28,7 +28,7 @@ import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.jobgraph.{JobID, JobGraph} +import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait} import org.apache.flink.runtime.messages.JobManagerMessages._ http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml index 53188e1..24ba591 100644 --- a/flink-staging/flink-ml/pom.xml +++ b/flink-staging/flink-ml/pom.xml @@ -46,9 +46,9 @@ - org.jblas - jblas - 1.2.3 + com.github.fommil.netlib + core + 1.1.2 @@ -57,6 +57,14 @@ ${project.version} test + + + org.apache.flink + flink-clients + ${project.version} + test-jar + test + http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala index cd0f403..b1a0a2f 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala @@ -20,6 +20,19 @@ package org.apache.flink.ml.common import org.apache.flink.api.scala.DataSet +/** This class represents a [[org.apache.flink.ml.common.Learner]] which is chained to a + * [[Transformer]]. + * + * Calling the method `fit` on this object will pipe the input data through the given + * [[Transformer]], whose output is fed to the [[Learner]]. + * + * @param head Preceding [[Transformer]] pipeline + * @param tail [[Learner]] instance + * @tparam IN Type of the training data + * @tparam TEMP Type of the produced data by the transformer pipeline and input type to the + * [[Learner]] + * @tparam OUT Type of the trained model + */ class ChainedLearner[IN, TEMP, OUT](val head: Transformer[IN, TEMP], val tail: Learner[TEMP, OUT]) extends Learner[IN, OUT] { http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala index 9a262cb..3f108bf 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala @@ -20,6 +20,18 @@ package org.apache.flink.ml.common import org.apache.flink.api.scala.DataSet +/** This class represents a chain of multiple [[Transformer]]. 
+ * + * Calling the method `transform` on this object will first apply the preceding [[Transformer]] to + * the input data. The resulting output data is then fed to the succeeding [[Transformer]]. + * + * @param head Preceding [[Transformer]] + * @param tail Succeeding [[Transformer]] + * @tparam IN Type of incoming elements + * @tparam TEMP Type of output elements of the preceding [[Transformer]] and input type of + * succeeding [[Transformer]] + * @tparam OUT Type of outgoing elements + */ class ChainedTransformer[IN, TEMP, OUT](val head: Transformer[IN, TEMP], val tail: Transformer[TEMP, OUT]) extends Transformer[IN, OUT] { http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala index 2b12f30..22bbe82 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala @@ -27,18 +27,24 @@ import org.apache.flink.core.fs.Path import scala.reflect.ClassTag -/** - * Collection of convenience functions - */ +/** FlinkTools contains a set of convenience functions for Flink's machine learning library: + * + * - persist: + * Takes up to 5 [[DataSet]]s and file paths. Each [[DataSet]] is written to the specified + * path and subsequently re-read from disk. This method can be used to effectively split the + * execution graph at the given [[DataSet]]. Writing it to disk triggers its materialization + * and specifying it as a source will prevent the re-execution of it. 
+ */ object FlinkTools { - /** - * - * @param dataset - * @param path - * @tparam T - * @return - */ + /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent + * operations. + * + * @param dataset [[DataSet]] to write to disk + * @param path File path to write dataset to + * @tparam T Type of the [[DataSet]] elements + * @return [[DataSet]] reading the just written file + */ def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = { val env = dataset.getExecutionEnvironment val outputFormat = new TypeSerializerOutputFormat[T] @@ -57,6 +63,17 @@ object FlinkTools { env.createInput(inputFormat) } + /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for + * subsequent operations. + * + * @param ds1 First [[DataSet]] to write to disk + * @param ds2 Second [[DataSet]] to write to disk + * @param path1 Path for ds1 + * @param path2 Path for ds2 + * @tparam A Type of the first [[DataSet]]'s elements + * @tparam B Type of the second [[DataSet]]'s elements + * @return Tuple of [[DataSet]]s reading the just written files + */ def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation](ds1: DataSet[A], ds2: DataSet[B], path1: String, path2: String):(DataSet[A], DataSet[B]) = { val env = ds1.getExecutionEnvironment @@ -88,6 +105,20 @@ object FlinkTools { (env.createInput(if1), env.createInput(if2)) } + /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for + * subsequent operations. 
+ * + * @param ds1 First [[DataSet]] to write to disk + * @param ds2 Second [[DataSet]] to write to disk + * @param ds3 Third [[DataSet]] to write to disk + * @param path1 Path for ds1 + * @param path2 Path for ds2 + * @param path3 Path for ds3 + * @tparam A Type of first [[DataSet]]'s elements + * @tparam B Type of second [[DataSet]]'s elements + * @tparam C Type of third [[DataSet]]'s elements + * @return Tuple of [[DataSet]]s reading the just written files + */ def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation, C: ClassTag: TypeInformation](ds1: DataSet[A], ds2: DataSet[B], ds3: DataSet[C], path1: String, path2: String, path3: String): (DataSet[A], DataSet[B], DataSet[C]) = { @@ -131,6 +162,23 @@ object FlinkTools { (env.createInput(if1), env.createInput(if2), env.createInput(if3)) } + /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for + * subsequent operations. + * + * @param ds1 First [[DataSet]] to write to disk + * @param ds2 Second [[DataSet]] to write to disk + * @param ds3 Third [[DataSet]] to write to disk + * @param ds4 Fourth [[DataSet]] to write to disk + * @param path1 Path for ds1 + * @param path2 Path for ds2 + * @param path3 Path for ds3 + * @param path4 Path for ds4 + * @tparam A Type of first [[DataSet]]'s elements + * @tparam B Type of second [[DataSet]]'s elements + * @tparam C Type of third [[DataSet]]'s elements + * @tparam D Type of fourth [[DataSet]]'s elements + * @return Tuple of [[DataSet]]s reading the just written files + */ def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation, C: ClassTag: TypeInformation, D: ClassTag: TypeInformation](ds1: DataSet[A], ds2: DataSet[B], ds3: DataSet[C], ds4: DataSet[D], @@ -188,6 +236,26 @@ object FlinkTools { (env.createInput(if1), env.createInput(if2), env.createInput(if3), env.createInput(if4)) } + /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for + * subsequent operations. 
+ * + * @param ds1 First [[DataSet]] to write to disk + * @param ds2 Second [[DataSet]] to write to disk + * @param ds3 Third [[DataSet]] to write to disk + * @param ds4 Fourth [[DataSet]] to write to disk + * @param ds5 Fifth [[DataSet]] to write to disk + * @param path1 Path for ds1 + * @param path2 Path for ds2 + * @param path3 Path for ds3 + * @param path4 Path for ds4 + * @param path5 Path for ds5 + * @tparam A Type of first [[DataSet]]'s elements + * @tparam B Type of second [[DataSet]]'s elements + * @tparam C Type of third [[DataSet]]'s elements + * @tparam D Type of fourth [[DataSet]]'s elements + * @tparam E Type of fifth [[DataSet]]'s elements + * @return Tuple of [[DataSet]]s reading the just written files + */ def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation, C: ClassTag: TypeInformation, D: ClassTag: TypeInformation, E: ClassTag: TypeInformation] (ds1: DataSet[A], ds2: DataSet[B], ds3: DataSet[C], ds4: DataSet[D], ds5: DataSet[E], path1: http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala index 3c4a257..f3d6172 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala @@ -20,4 +20,10 @@ package org.apache.flink.ml.common import org.apache.flink.ml.math.Vector +/** This class represents a vector with an associated label as it is required for many supervised + * learning tasks. 
+ *
+ * @param vector Data point
+ * @param label Label of the data point
+ */
case class LabeledVector(vector: Vector, label: Double) {}


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
index 0d56dc8..c8082c7 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
@@ -19,8 +19,20 @@ package org.apache.flink.ml.common

 import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.WithParameters

+/** Base trait for an algorithm which trains a model based on some training data.
+ *
+ * The idea is that all algorithms which train a model implement this trait. That way
+ * they can be chained with [[Transformer]]s which act as a preprocessing step for the actual
+ * learning. In that sense, a [[Learner]] denotes the end of a pipeline and cannot be further
+ * chained.
+ *
+ * Every learner has to implement the `fit` method which takes the training data and learns
+ * a model from the data.
+ * + * @tparam IN Type of the training data + * @tparam OUT Type of the trained model + */ trait Learner[IN, OUT] extends WithParameters { def fit(input: DataSet[IN], parameters: ParameterMap = ParameterMap.Empty): OUT } http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala index 76abc62..02d63cf 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala @@ -20,14 +20,24 @@ package org.apache.flink.ml.common import org.apache.flink.api.scala.DataSet -/** - * A transformer represents - * - * @tparam IN Type of incoming elements - * @tparam OUT Type of outgoing elements - */ +/** Base trait for an algorithm which transforms the input data to some output data. + * + * A [[Transformer]] is used to transform input data to some output data. Transformations might + * be feature extractors, feature mappings, whitening or centralization just to name a few. + * + * [[Transformer]] can be chained with other [[Transformer]] creating a [[ChainedTransformer]], + * which again can be chained. Chaining a [[Transformer]] with a [[Learner]] creates a + * [[ChainedLearner]] which terminates a pipeline. + * + * A [[Transformer]] implementation has to implement the method `transform`, which defines how + * the input data is transformed into the output data. 
+ *
+ * @tparam IN Type of incoming elements
+ * @tparam OUT Type of outgoing elements
+ */
trait Transformer[IN, OUT] extends WithParameters {
-  def chain[CHAINED](transformer: Transformer[OUT, CHAINED]): ChainedTransformer[IN, OUT, CHAINED] = {
+  def chain[CHAINED](transformer: Transformer[OUT, CHAINED]):
+  ChainedTransformer[IN, OUT, CHAINED] = {
     new ChainedTransformer[IN, OUT, CHAINED](this, transformer)
   }


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
index 632ded6..04f698e 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
@@ -28,13 +28,13 @@ import org.apache.flink.api.scala._

 /** Maps a vector into the polynomial feature space.
  *
  * This transformer takes a vector of values `(x, y, z, ...)` and maps it into the
- * polynomial feature space of degree `n`. That is to say, it calculates the following
+ * polynomial feature space of degree `d`. That is to say, it calculates the following
  * representation:
  *
  * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T`
  *
  * This transformer can be prepended to all [[Transformer]] and
- * [[org.apache.flink.ml.commonLearner]] implementations which expect an input of
+ * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
  * [[LabeledVector]].
* * @example http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala index 8e0eed0..d407a70 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala @@ -60,6 +60,15 @@ case class DenseVector(val values: Array[Double]) extends Vector { case _ => false } } + + /** + * Copies the vector instance + * + * @return Copy of the vector instance + */ + override def copy: Vector = { + DenseVector(values.clone()) + } } object DenseVector { http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala deleted file mode 100644 index 5d6eca4..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.math - -import org.jblas.DoubleMatrix - -/** - * Convenience functions for the interaction with JBlas. If you want to use JBlas and allow an - * easy transition from Flink's matrix abstraction to JBlas's and vice versa, simply import - * all elements contained in the JBlas object. - */ -object JBlas { - - /** - * Implicit conversion from Flink's [[DenseMatrix]] to JBlas's [[DoubleMatrix]] - * - * @param matrix DenseMatrix to be converted - * @return DoubleMatrix resulting from the given matrix - */ - implicit def denseMatrix2JBlas(matrix: DenseMatrix): DoubleMatrix = { - new DoubleMatrix(matrix.numRows, matrix.numCols, matrix.values: _*) - } - - /** - * Implicit class to extends [[DoubleMatrix]] such that Flink's [[DenseMatrix]] and - * [[DenseVector]] can easily be retrieved from. 
- * @param matrix - */ - implicit class RichDoubleMatrix(matrix: DoubleMatrix) { - def fromJBlas: DenseMatrix = DenseMatrix(matrix.rows, matrix.columns, matrix.data) - - def fromJBlas2Vector: DenseVector = { - require(matrix.columns == 1, "The JBlas matrix contains more than 1 column.") - - DenseVector(matrix.data) - } - } - - /** - * Implicit conversion from Flink's [[Vector]] to JBlas's [[DoubleMatrix]] - * - * @param vector Vector to be converted - * @return DoubleMatrix resulting from the given vector - */ - implicit def vector2JBlas(vector: Vector): DoubleMatrix = { - vector match { - case x: DenseVector => denseVector2JBlas(x) - } - } - - private def denseVector2JBlas(vector: DenseVector): DoubleMatrix = { - new DoubleMatrix(vector.size, 1, vector.values: _*) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala index ddda003..20d820c 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala @@ -36,4 +36,11 @@ trait Vector { * @return element with index */ def apply(index: Int): Double + + /** + * Copies the vector instance + * + * @return Copy of the vector instance + */ + def copy: Vector } http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala index fce008a..e82e38f 100644 --- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala @@ -19,7 +19,8 @@ package org.apache.flink.ml /** - * Convenience to handle Flink's [[org.apache.flink.ml.math.Matrix]] and [[Vector]] abstraction. + * Convenience methods to handle Flink's [[org.apache.flink.ml.math.Matrix]] and [[Vector]] + * abstraction. */ package object math { implicit class RichMatrix(matrix: Matrix) extends Iterable[Double] { @@ -38,4 +39,10 @@ package object math { } } } + + implicit def vector2Array(vector: Vector): Array[Double] = { + vector match { + case dense: DenseVector => dense.values + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index 1051ae5..5ff59d1 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -13,11 +13,12 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.flink.ml.recommendation -import java.lang +import java.{util, lang} import org.apache.flink.api.scala._ import org.apache.flink.api.common.operators.Order @@ -28,7 +29,9 @@ import org.apache.flink.types.Value import org.apache.flink.util.Collector import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction} -import org.jblas.{Solve, SimpleBlas, DoubleMatrix} +import com.github.fommil.netlib.BLAS.{ getInstance => blas } +import com.github.fommil.netlib.LAPACK.{ getInstance => lapack } +import org.netlib.util.intW import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -43,21 +46,23 @@ import scala.util.Random * column of the item matrix is `v_i`. The matrix `R` is called the ratings matrix and * `(R)_{i,j} = r_{i,j}`. * - * In order to find the user and item matrix the following problem is solved: + * In order to find the user and item matrix, the following problem is solved: * * `argmin_{U,V} sum_(i,j\ with\ r_{i,j} != 0) (r_{i,j} - u_{i}^Tv_{j})^2 + * \lambda (sum_(i) n_{u_i} ||u_i||^2 + sum_(j) n_{v_j} ||v_j||^2)` * - * Overfitting is avoided by using a weighted-lambda-regularization scheme. + * with `\lambda` being the regularization factor, `n_{u_i}` being the number of items the user `i` + * has rated and `n_{v_j}` being the number of times the item `j` has been rated. This + * regularization scheme to avoid overfitting is called weighted-lambda-regularization. Details + * can be found in the work of [[http://dx.doi.org/10.1007/978-3-540-68880-8_32 Zhou et al.]]. * * By fixing one of the matrices `U` or `V` one obtains a quadratic form which can be solved. The * solution of the modified problem is guaranteed to decrease the overall cost function. By * applying this step alternately to the matrices `U` and `V`, we can iteratively improve the - * overall solution. 
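As context for the scaladoc above: the weighted-lambda-regularized cost that ALS minimizes can be sketched as a small plain-Java function. This is illustrative only — the names (`cost`, `users`, `items`) and the parallel-array encoding of the ratings are ours, not part of the Flink API:

```java
public class AlsCost {
    static double dot(double[] a, double[] b) {
        double s = 0.0;
        for (int k = 0; k < a.length; k++) s += a[k] * b[k];
        return s;
    }

    // users[n], items[n], values[n] describe the n-th observed rating r_{i,j}.
    // Returns sum_{(i,j)} (r_{i,j} - u_i^T v_j)^2
    //       + lambda * (sum_i n_{u_i} ||u_i||^2 + sum_j n_{v_j} ||v_j||^2)
    static double cost(double[][] u, double[][] v,
                       int[] users, int[] items, double[] values, double lambda) {
        double error = 0.0;
        int[] nu = new int[u.length]; // n_{u_i}: number of ratings by user i
        int[] nv = new int[v.length]; // n_{v_j}: number of ratings of item j
        for (int n = 0; n < values.length; n++) {
            double residual = values[n] - dot(u[users[n]], v[items[n]]);
            error += residual * residual;
            nu[users[n]]++;
            nv[items[n]]++;
        }
        double reg = 0.0;
        for (int i = 0; i < u.length; i++) reg += nu[i] * dot(u[i], u[i]);
        for (int j = 0; j < v.length; j++) reg += nv[j] * dot(v[j], v[j]);
        return error + lambda * reg;
    }
}
```

Fixing `U` (or `V`) makes this cost quadratic in the other matrix, which is what each ALS half-step solves in closed form.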
Details can be found in the work of - * [[http://dx.doi.org/10.1007/978-3-540-68880-8_32 Zhou et al.]]. + * matrix factorization. * * The matrix `R` is given in its sparse representation as a tuple of `(i, j, r)` where `i` is the - * row index, `j` is the column index and `r` is the matrix a position `(i,j)`. + * row index, `j` is the column index and `r` is the matrix value at position `(i,j)`. * * @example * {{{ @@ -68,7 +73,7 @@ import scala.util.Random * .setIterations(10) * .setNumFactors(10) * - * val model = als.fit(inputDS)) + * val model = als.fit(inputDS) * * val data2Predict: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData) * * @@ -79,20 +84,23 @@ import scala.util.Random * * - [[ALS.NumFactors]]: * The number of latent factors. It is the dimension of the calculated user and item vectors. + * (Default value: '''10''') * * - [[ALS.Lambda]]: * Regularization factor. Tune this value in order to avoid overfitting. + * (Default value: '''1''') * - * - [[ALS.Iterations]]: The number of iterations to perform. + * - [[ALS.Iterations]]: The number of iterations to perform. (Default value: '''10''') * * - [[ALS.Blocks]]: * The number of blocks into which the user and item matrix are grouped. The fewer * blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger * update messages which have to be stored on the Heap. If the algorithm fails because of - * an OutOfMemoryException, then try to increase the number of blocks. + * an OutOfMemoryException, then try to increase the number of blocks. (Default value: '''None''') * * - [[ALS.Seed]]: - * Random seed used to generate the initial item matrix for the algorithm + * Random seed used to generate the initial item matrix for the algorithm. + * (Default value: '''0''') * * - [[ALS.TemporaryPath]]: * Path to a temporary directory into which intermediate results are stored.
If @@ -103,7 +111,7 @@ import scala.util.Random * the individual steps are stored in the specified directory. By splitting the algorithm * into multiple smaller steps, Flink does not have to split the available memory amongst too many * operators. This allows the system to process bigger individual messages and improves the - * overall performance. + * overall performance. (Default value: '''None''') * * The ALS implementation is based on Spark's MLLib implementation of ALS which you can find * [[https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/ @@ -318,10 +326,10 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { // in order to save space, store only the upper triangle of the XtX matrix val triangleSize = (factors*factors - factors)/2 + factors - val matrix = DoubleMatrix.zeros(triangleSize) - val fullMatrix = DoubleMatrix.zeros(factors, factors) - val userXtX = new ArrayBuffer[DoubleMatrix]() - val userXy = new ArrayBuffer[DoubleMatrix]() + val matrix = Array.fill(triangleSize)(0.0) + val fullMatrix = Array.fill(factors * factors)(0.0) + val userXtX = new ArrayBuffer[Array[Double]]() + val userXy = new ArrayBuffer[Array[Double]]() val numRatings = new ArrayBuffer[Int]() override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])], @@ -341,8 +349,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { val oldLength = userXtX.length while(i < (numUsers - oldLength)) { - userXtX += DoubleMatrix.zeros(triangleSize) - userXy += DoubleMatrix.zeros(factors) + userXtX += Array.fill(triangleSize)(0.0) + userXy += Array.fill(factors)(0.0) numRatings.+=(0) i += 1 @@ -356,8 +364,9 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { i = 0 while(i < matricesToClear){ numRatings(i) = 0 - userXtX(i).fill(0.0f) - userXy(i).fill(0.0f) + + util.Arrays.fill(userXtX(i), 0.0) + util.Arrays.fill(userXy(i), 0.0) i += 1 } @@ -372,7 +381,8 @@ class ALS
extends Learner[(Int, Int, Double), ALSModel] with Serializable { var p = 0 while(p < blockFactors.length){ - val vector = new DoubleMatrix(blockFactors(p)) + val vector = blockFactors(p) + outerProduct(vector, matrix, factors) val (users, ratings) = inInfo.ratingsForBlock(itemBlock)(p) @@ -380,8 +390,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { var i = 0 while (i < users.length) { numRatings(users(i)) += 1 - userXtX(users(i)).addi(matrix) - SimpleBlas.axpy(ratings(i), vector, userXy(users(i))) + blas.daxpy(matrix.length, 1, matrix, 1, userXtX(users(i)), 1) + blas.daxpy(vector.length, ratings(i), vector, 1, userXy(users(i)), 1) i += 1 } @@ -401,12 +411,14 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { // add regularization constant while(f < factors){ - fullMatrix.data(f*factors + f) += lambda * numRatings(i) + fullMatrix(f*factors + f) += lambda * numRatings(i) f += 1 } // calculate new user vector - array(i) = Solve.solvePositive(fullMatrix, userXy(i)).data + val result = new intW(0) + lapack.dposv("U", factors, 1, fullMatrix, factors , userXy(i), factors, result) + array(i) = userXy(i) i += 1 } @@ -696,16 +708,13 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { // ================================ Math helper functions ======================================== - def outerProduct(vector: DoubleMatrix, matrix: DoubleMatrix, factors: Int): Unit = { - val vd = vector.data - val md = matrix.data - + def outerProduct(vector: Array[Double], matrix: Array[Double], factors: Int): Unit = { var row = 0 var pos = 0 while(row < factors){ var col = 0 while(col <= row){ - md(pos) = vd(row) * vd(col) + matrix(pos) = vector(row) * vector(col) col += 1 pos += 1 } @@ -714,24 +723,22 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable { } } - def generateFullMatrix(triangularMatrix: DoubleMatrix, fullMatrix: DoubleMatrix, + def 
generateFullMatrix(triangularMatrix: Array[Double], fullMatrix: Array[Double], factors: Int): Unit = { var row = 0 var pos = 0 - val fmd = fullMatrix.data - val tmd = triangularMatrix.data while(row < factors){ var col = 0 while(col < row){ - fmd(row*factors + col) = tmd(pos) - fmd(col*factors + row) = tmd(pos) + fullMatrix(row*factors + col) = triangularMatrix(pos) + fullMatrix(col*factors + row) = triangularMatrix(pos) pos += 1 col += 1 } - fmd(row*factors + row) = tmd(pos) + fullMatrix(row*factors + row) = triangularMatrix(pos) pos += 1 row += 1 @@ -893,7 +900,8 @@ object ALS { * @param itemFactors Calculated item matrix * @param lambda Regularization value used to calculate the model */ -class ALSModel(@transient val userFactors: DataSet[Factors],@transient val itemFactors: DataSet[Factors], +class ALSModel(@transient val userFactors: DataSet[Factors], + @transient val itemFactors: DataSet[Factors], val lambda: Double) extends Transformer[(Int, Int), (Int, Int, Double)] with Serializable{ @@ -905,10 +913,10 @@ Serializable{ triple => { val (((uID, iID), uFactors), iFactors) = triple - val uFactorsVector = new DoubleMatrix(uFactors.factors) - val iFactorsVector = new DoubleMatrix(iFactors.factors) + val uFactorsVector = uFactors.factors + val iFactorsVector = iFactors.factors - val prediction = SimpleBlas.dot(uFactorsVector, iFactorsVector) + val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1) (uID, iID, prediction) } @@ -925,13 +933,13 @@ Serializable{ triple => { val (((uID, iID), uFactors), iFactors) = triple - val uFactorsVector = new DoubleMatrix(uFactors.factors) - val iFactorsVector = new DoubleMatrix(iFactors.factors) + val uFactorsVector = uFactors.factors + val iFactorsVector = iFactors.factors - val squaredUNorm2 = uFactorsVector.dot(uFactorsVector) - val squaredINorm2 = iFactorsVector.dot(iFactorsVector) + val squaredUNorm2 = blas.ddot(uFactorsVector.length, uFactorsVector, 1, uFactorsVector, 1) + val squaredINorm2
= blas.ddot(iFactorsVector.length, iFactorsVector, 1, iFactorsVector, 1) - val prediction = SimpleBlas.dot(uFactorsVector, iFactorsVector) + val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1) (uID, iID, prediction, squaredUNorm2, squaredINorm2) } http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala index 523d132..8060d2b 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala @@ -23,11 +23,10 @@ import org.apache.flink.api.scala.DataSet import org.apache.flink.configuration.Configuration import org.apache.flink.ml.math.Vector import org.apache.flink.ml.common._ -import org.apache.flink.ml.math.JBlas._ import org.apache.flink.api.scala._ -import org.jblas.{SimpleBlas, DoubleMatrix} +import com.github.fommil.netlib.BLAS.{ getInstance => blas } /** Multiple linear regression using the ordinary least squares (OLS) estimator. * @@ -77,8 +76,9 @@ import org.jblas.{SimpleBlas, DoubleMatrix} * * - [[MultipleLinearRegression.Stepsize]]: * Initial step size for the gradient descent method. - * This value controls how far the gradient descent method moves in the opposite direction of the gradient. - * Tuning this parameter might be crucial to make it stable and to obtain a better performance. + * This value controls how far the gradient descent method moves in the opposite direction of the + * gradient. 
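The packed upper-triangular storage used in the ALS hunks above (`triangleSize`, `outerProduct`, `generateFullMatrix`) can be mirrored in a few lines of plain Java. This is an illustrative re-implementation of the same indexing scheme, not the Flink code itself:

```java
public class PackedTriangle {
    // Entries needed for the upper triangle (incl. diagonal) of a
    // factors x factors matrix, as in the ALS diff above.
    static int triangleSize(int factors) {
        return (factors * factors - factors) / 2 + factors;
    }

    // Write v v^T into the packed triangle (rows outer, col <= row),
    // mirroring outerProduct(...) from the diff.
    static void outerProduct(double[] v, double[] packed, int factors) {
        int pos = 0;
        for (int row = 0; row < factors; row++)
            for (int col = 0; col <= row; col++)
                packed[pos++] = v[row] * v[col];
    }

    // Expand the packed triangle into a dense symmetric matrix stored
    // row-major, mirroring generateFullMatrix(...) from the diff.
    static void generateFullMatrix(double[] packed, double[] full, int factors) {
        int pos = 0;
        for (int row = 0; row < factors; row++) {
            for (int col = 0; col < row; col++) {
                full[row * factors + col] = packed[pos];
                full[col * factors + row] = packed[pos];
                pos++;
            }
            full[row * factors + row] = packed[pos++];
        }
    }
}
```

For `factors = 3` and `v = (1, 2, 3)` the packed triangle is `[1, 2, 4, 3, 6, 9]`, which expands to the symmetric matrix `v v^T`. Storing only the triangle roughly halves the per-user buffer size, which is why the diff does it.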
Tuning this parameter might be crucial to make it stable and to obtain better + * performance. * * - [[MultipleLinearRegression.ConvergenceThreshold]]: * Threshold for relative change of sum of squared residuals until convergence. @@ -113,7 +113,11 @@ with Serializable { val convergenceThreshold = map.get(ConvergenceThreshold) // calculate dimension of the feature vectors - val dimension = input.map{_.vector.size}.reduce { math.max(_, _) } + val dimension = input.map{_.vector.size}.reduce { + (a, b) => + require(a == b, "All input vectors must have the same dimension.") + a + } // initial weight vector is set to 0 val initialWeightVector = createInitialWeightVector(dimension) @@ -150,7 +154,9 @@ with Serializable { val (leftBetas, leftBeta0, leftCount) = left val (rightBetas, rightBeta0, rightCount) = right - (leftBetas.add(rightBetas), leftBeta0 + rightBeta0, leftCount + rightCount) + blas.daxpy(leftBetas.length, 1.0, rightBetas, 1, leftBetas, 1) + + (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount) }.map { new LinearRegressionWeightsUpdate(stepsize) }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST) @@ -197,7 +203,8 @@ with Serializable { val (leftBetas, leftBeta0, leftCount) = left val (rightBetas, rightBeta0, rightCount) = right - (leftBetas.add(rightBetas), leftBeta0 + rightBeta0, leftCount + rightCount) + blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1) + (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount) }.map { new LinearRegressionWeightsUpdate(stepsize) }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST) @@ -216,11 +223,11 @@ with Serializable { * @return DataSet of a zero vector of dimension d */ private def createInitialWeightVector(dimensionDS: DataSet[Int]): - DataSet[(DoubleMatrix, Double)] = { + DataSet[(Array[Double], Double)] = { dimensionDS.map { dimension => val values = Array.fill(dimension)(0.0) - (new DoubleMatrix(dimension, 1, values: _*), 0.0) + (values, 0.0) } } } @@ -261,13 +268,13 @@ object
MultipleLinearRegression { private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] { import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST - var weightVector: DoubleMatrix = null + var weightVector: Array[Double] = null var weight0: Double = 0.0 @throws(classOf[Exception]) override def open(configuration: Configuration): Unit = { val list = this.getRuntimeContext. - getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST) + getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST) val weightsPair = list.get(0) @@ -279,7 +286,9 @@ private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] { val vector = value.vector val label = value.label - val residual = weightVector.dot(vector) + weight0 - label + val dotProduct = blas.ddot(weightVector.length, weightVector, 1, vector, 1) + + val residual = dotProduct + weight0 - label residual*residual } @@ -294,17 +303,17 @@ private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] { * The weight vector is received as a broadcast variable. */ private class LinearRegressionGradientDescent extends -RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] { +RichMapFunction[LabeledVector, (Array[Double], Double, Int)] { import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST - var weightVector: DoubleMatrix = null + var weightVector: Array[Double] = null var weight0: Double = 0.0 @throws(classOf[Exception]) override def open(configuration: Configuration): Unit = { val list = this.getRuntimeContext. 
- getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST) + getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST) val weightsPair = list.get(0) @@ -312,13 +321,19 @@ RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] { weight0 = weightsPair._2 } - override def map(value: LabeledVector): (DoubleMatrix, Double, Int) = { + override def map(value: LabeledVector): (Array[Double], Double, Int) = { val x = value.vector val label = value.label - val error = weightVector.dot(x) + weight0 - label + val dotProduct = blas.ddot(weightVector.length, weightVector, 1, x, 1) + + val error = dotProduct + weight0 - label + + // reuse vector x + val weightsGradient = x + + blas.dscal(weightsGradient.length, 2*error, weightsGradient, 1) - val weightsGradient = x.mul(2 * error) val weight0Gradient = 2 * error (weightsGradient, weight0Gradient, 1) @@ -332,17 +347,17 @@ RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] { * @param stepsize Initial value of the step size used to update the weight vector */ private class LinearRegressionWeightsUpdate(val stepsize: Double) extends -RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] { +RichMapFunction[(Array[Double], Double, Int), (Array[Double], Double)] { import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST - var weights: DoubleMatrix = null + var weights: Array[Double] = null var weight0: Double = 0.0 @throws(classOf[Exception]) override def open(configuration: Configuration): Unit = { val list = this.getRuntimeContext. 
- getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST) + getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST) val weightsPair = list.get(0) @@ -350,8 +365,10 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] { weight0 = weightsPair._2 } - override def map(value: (DoubleMatrix, Double, Int)): (DoubleMatrix, Double) = { - val weightsGradient = value._1.div(value._3) + override def map(value: (Array[Double], Double, Int)): (Array[Double], Double) = { + val weightsGradient = value._1 + blas.dscal(weightsGradient.length, 1.0/value._3, weightsGradient, 1) + val weight0Gradient = value._2 / value._3 val iteration = getIterationRuntimeContext.getSuperstepNumber @@ -360,9 +377,8 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] { // decreasing val effectiveStepsize = stepsize/math.sqrt(iteration) - val newWeights = new DoubleMatrix(weights.rows, weights.columns) - newWeights.copy(weights) - SimpleBlas.axpy( -effectiveStepsize, weightsGradient, newWeights) + val newWeights = weights.clone + blas.daxpy(newWeights.length, -effectiveStepsize, weightsGradient, 1, newWeights, 1) val newWeight0 = weight0 - effectiveStepsize * weight0Gradient (newWeights, newWeight0) @@ -383,7 +399,7 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] { * @param weights DataSet containing the calculated weight vector */ class MultipleLinearRegressionModel private[regression] -(val weights: DataSet[(DoubleMatrix, Double)]) extends +(val weights: DataSet[(Array[Double], Double)]) extends Transformer[ Vector, LabeledVector ] { import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST @@ -403,13 +419,14 @@ Transformer[ Vector, LabeledVector ] { } private class LinearRegressionPrediction extends RichMapFunction[Vector, LabeledVector] { - private var weights: DoubleMatrix = null + private var weights: Array[Double] = null private var weight0: Double = 0 @throws(classOf[Exception]) override def 
open(configuration: Configuration): Unit = { - val t = getRuntimeContext.getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST) + val t = getRuntimeContext + .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST) val weightsPair = t.get(0) @@ -418,7 +435,9 @@ Transformer[ Vector, LabeledVector ] { } override def map(value: Vector): LabeledVector = { - val prediction = weights.dot(value) + weight0 + val dotProduct = blas.ddot(weights.length, weights, 1, value, 1) + + val prediction = dotProduct + weight0 LabeledVector(value, prediction) } http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala new file mode 100644 index 0000000..28fdfa6 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
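Taken together, the `LinearRegressionGradientDescent` and `LinearRegressionWeightsUpdate` hunks above amount to one batch gradient-descent step with a `1/sqrt(iteration)` step-size decay. A hedged plain-Java sketch of that step (BLAS calls replaced by explicit loops; the method and parameter names are ours, not Flink's):

```java
import java.util.Arrays;

public class OlsGradientStep {
    static double dot(double[] a, double[] b) {
        double s = 0.0;
        for (int k = 0; k < a.length; k++) s += a[k] * b[k];
        return s;
    }

    // One gradient-descent step for OLS regression over the batch (xs, ys).
    // Returns {newWeights..., newWeight0}; the gradient of each sample is
    // 2 * error * x (and 2 * error for the intercept), averaged over the
    // batch, with effective step size stepsize / sqrt(iteration).
    static double[] step(double[] weights, double weight0,
                         double[][] xs, double[] ys,
                         double stepsize, int iteration) {
        int d = weights.length;
        double[] grad = new double[d];
        double grad0 = 0.0;
        for (int n = 0; n < xs.length; n++) {
            double error = dot(weights, xs[n]) + weight0 - ys[n];
            for (int k = 0; k < d; k++) grad[k] += 2 * error * xs[n][k];
            grad0 += 2 * error;
        }
        double eta = stepsize / Math.sqrt(iteration);
        double[] out = Arrays.copyOf(weights, d + 1);
        for (int k = 0; k < d; k++) out[k] -= eta * grad[k] / xs.length;
        out[d] = weight0 - eta * grad0 / xs.length;
        return out;
    }
}
```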
+ */ + +package org.apache.flink.ml.feature + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.client.CliFrontendTestUtils +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.DenseVector +import org.junit.{BeforeClass, Test} +import org.scalatest.ShouldMatchers + +import org.apache.flink.api.scala._ + +class PolynomialBaseITCase extends ShouldMatchers { + + @Test + def testMapElementToPolynomialVectorSpace (): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setDegreeOfParallelism (2) + + val input = Seq ( + LabeledVector (DenseVector (1), 1.0), + LabeledVector (DenseVector (2), 2.0) + ) + + val inputDS = env.fromCollection (input) + + val transformer = PolynomialBase () + .setDegree (3) + + val transformedDS = transformer.transform (inputDS) + + val expectedMap = List ( + (1.0 -> DenseVector (1.0, 1.0, 1.0) ), + (2.0 -> DenseVector (8.0, 4.0, 2.0) ) + ) toMap + + val result = transformedDS.collect + + for (entry <- result) { + expectedMap.contains (entry.label) should be (true) + entry.vector should equal (expectedMap (entry.label) ) + } + } + + @Test + def testMapVectorToPolynomialVectorSpace(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setDegreeOfParallelism(2) + + val input = Seq( + LabeledVector(DenseVector(2, 3), 1.0), + LabeledVector(DenseVector(2, 3, 4), 2.0) + ) + + val expectedMap = List( + (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)), + (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0, + 9.0, 12.0, 16.0, 2.0, 3.0, 4.0)) + ) toMap + + val inputDS = env.fromCollection(input) + + val transformer = PolynomialBase() + .setDegree(3) + + val transformedDS = transformer.transform(inputDS) + + val result = transformedDS.collect + + for(entry <- result) { + expectedMap.contains(entry.label) should be(true) + entry.vector should equal(expectedMap(entry.label)) + } + } + + @Test + 
def testReturnEmptyVectorIfDegreeIsZero(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setDegreeOfParallelism(2) + + val input = Seq( + LabeledVector(DenseVector(2, 3), 1.0), + LabeledVector(DenseVector(2, 3, 4), 2.0) + ) + + val inputDS = env.fromCollection(input) + + val transformer = PolynomialBase() + .setDegree(0) + + val transformedDS = transformer.transform(inputDS) + + val result = transformedDS.collect + + val expectedMap = List( + (1.0 -> DenseVector()), + (2.0 -> DenseVector()) + ) toMap + + for(entry <- result) { + expectedMap.contains(entry.label) should be(true) + entry.vector should equal(expectedMap(entry.label)) + } + } +} + +object PolynomialBaseITCase { + @BeforeClass + def setup(): Unit = { + CliFrontendTestUtils.pipeSystemOutToNull() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala deleted file mode 100644 index 8da822f..0000000 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
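The expected vectors in the `PolynomialBase` tests above appear to enumerate all monomials of the input entries, highest degree first, with indices taken in non-decreasing order within each degree. A hypothetical plain-Java sketch that reproduces that ordering (illustrative only; `transform` and `monomials` are our names, not the Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class PolynomialFeatures {
    // All monomials of exactly degree d over the entries of x, taking index
    // multisets in non-decreasing order, e.g. for x = (2, 3) and d = 3:
    // 2*2*2, 2*2*3, 2*3*3, 3*3*3.
    static void monomials(double[] x, int d, int start, double acc, List<Double> out) {
        if (d == 0) {
            out.add(acc);
            return;
        }
        for (int i = start; i < x.length; i++)
            monomials(x, d - 1, i, acc * x[i], out);
    }

    // Map x into the polynomial feature space: degree-`degree` monomials
    // first, then degree-(degree-1), ..., down to degree 1; an empty
    // vector for degree 0, matching testReturnEmptyVectorIfDegreeIsZero.
    static List<Double> transform(double[] x, int degree) {
        List<Double> out = new ArrayList<>();
        for (int d = degree; d >= 1; d--) monomials(x, d, 0, 1.0, out);
        return out;
    }
}
```

For `x = (2, 3)` and degree 3 this yields `[8, 12, 18, 27, 4, 6, 9, 2, 3]`, matching the expected map in `testMapVectorToPolynomialVectorSpace`.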
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.feature - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.math.DenseVector -import org.scalatest.{ShouldMatchers, FlatSpec} - -import org.apache.flink.api.scala._ - -class PolynomialBaseSuite extends FlatSpec with ShouldMatchers { - behavior of "A PolynomialBase" - - it should "map an element into a polynomial vector space" in { - val env = ExecutionEnvironment.getExecutionEnvironment - - val input = Seq( - LabeledVector(DenseVector(1), 1.0), - LabeledVector(DenseVector(2), 2.0) - ) - - val inputDS = env.fromCollection(input) - - val transformer = PolynomialBase() - .setDegree(3) - - val transformedDS = transformer.transform(inputDS) - - val expectedMap = List( - (1.0 -> DenseVector(1.0, 1.0, 1.0)), - (2.0 -> DenseVector(8.0, 4.0, 2.0)) - ) toMap - - val result = transformedDS.collect - - for(entry <- result) { - expectedMap.contains(entry.label) should be(true) - entry.vector should equal(expectedMap(entry.label)) - } - - } - - it should "map a vector into a polynomial vector space" in { - val env = ExecutionEnvironment.getExecutionEnvironment - - val input = Seq( - LabeledVector(DenseVector(2, 3), 1.0), - LabeledVector(DenseVector(2, 3, 4), 2.0) - ) - - val expectedMap = List( - (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)), - (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0, - 9.0, 12.0, 16.0, 2.0, 3.0, 4.0)) - ) toMap - - val inputDS = env.fromCollection(input) - 
- val transformer = PolynomialBase() - .setDegree(3) - - val transformedDS = transformer.transform(inputDS) - - val result = transformedDS.collect - - for(entry <- result) { - expectedMap.contains(entry.label) should be(true) - entry.vector should equal(expectedMap(entry.label)) - } - - println(result) - } - - it should "return an empty vector if the polynomial degree is set to 0" in { - val env = ExecutionEnvironment.getExecutionEnvironment - - val input = Seq( - LabeledVector(DenseVector(2, 3), 1.0), - LabeledVector(DenseVector(2, 3, 4), 2.0) - ) - - val inputDS = env.fromCollection(input) - - val transformer = PolynomialBase() - .setDegree(0) - - val transformedDS = transformer.transform(inputDS) - - val result = transformedDS.collect - - val expectedMap = List( - (1.0 -> DenseVector()), - (2.0 -> DenseVector()) - ) toMap - - for(entry <- result) { - expectedMap.contains(entry.label) should be(true) - entry.vector should equal(expectedMap(entry.label)) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala new file mode 100644 index 0000000..f2c52d3 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class ALSITCase extends ShouldMatchers {
+
+  @Test
+  def testMatrixFactorization(): Unit = {
+    import ALSData._
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(4)
+      .setNumFactors(numFactors)
+
+    val inputDS = env.fromCollection(data)
+
+    val model = als.fit(inputDS)
+
+    val testData = env.fromCollection(expectedResult.map{
+      case (userID, itemID, rating) => (userID, itemID)
+    })
+
+    val predictions = model.transform(testData).collect
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResult map {
+      case (uID, iID, value) => (uID, iID) -> value
+    } toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt(((uID, iID))) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 0.1)
+      }
+    }
+
+    val risk = model.empiricalRisk(inputDS).collect(0)
+
+    risk should be(expectedEmpiricalRisk +- 1)
+  }
+}
+
+object ALSITCase {
+
+  @BeforeClass
+  def setup(): Unit = {
+    CliFrontendTestUtils.pipeSystemOutToNull()
+  }
+}
+
+object ALSData {
+
+  val iterations = 9
+  val lambda = 1.0
+  val numFactors = 5
+
+  val data: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2,13,534.3937734561154),
+      (6,14,509.63176469621936),
+      (4,14,515.8246770897443),
+      (7,3,495.05234565105),
+      (2,3,532.3281786219485),
+      (5,3,497.1906356844367),
+      (3,3,512.0640508585093),
+      (10,3,500.2906742233019),
+      (1,4,521.9189079662882),
+      (2,4,515.0734651491396),
+      (1,7,522.7532725967008),
+      (8,4,492.65683825096403),
+      (4,8,492.65683825096403),
+      (10,8,507.03319667905413),
+      (7,1,522.7532725967008),
+      (1,1,572.2230209271174),
+      (2,1,563.5849190220224),
+      (6,1,518.4844061038742),
+      (9,1,529.2443732217674),
+      (8,1,543.3202505434103),
+      (7,2,516.0188923307859),
+      (1,2,563.5849190220224),
+      (1,11,515.1023793011227),
+      (8,2,536.8571133978352),
+      (2,11,507.90776961762225),
+      (3,2,532.3281786219485),
+      (5,11,476.24185144363304),
+      (4,2,515.0734651491396),
+      (4,11,469.92049343738233),
+      (3,12,509.4713776280098),
+      (4,12,494.6533165132021),
+      (7,5,482.2907867916308),
+      (6,5,477.5940040923741),
+      (4,5,480.9040684364228),
+      (1,6,518.4844061038742),
+      (6,6,470.6605085832807),
+      (8,6,489.6360564705307),
+      (4,6,472.74052954447046),
+      (7,9,482.5837650471611),
+      (5,9,487.00175463269863),
+      (9,9,500.69514584780944),
+      (4,9,477.71644808419325),
+      (7,10,485.3852917539852),
+      (8,10,507.03319667905413),
+      (3,10,500.2906742233019),
+      (5,15,488.08215944254437),
+      (6,15,480.16929757607346)
+    )
+  }
+
+  val expectedResult: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2, 2, 526.1037),
+      (5, 9, 468.5680),
+      (10, 3, 484.8975),
+      (5, 13, 451.6228),
+      (1, 15, 493.4956),
+      (4, 11, 456.3862)
+    )
+  }
+
+  val expectedEmpiricalRisk = 505374.1877
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
deleted file mode 100644
index 770d4d2..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.recommendation
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.scalatest.{ShouldMatchers, FlatSpec}
-
-import org.apache.flink.api.scala._
-
-class ALSSuite extends FlatSpec with ShouldMatchers {
-
-  behavior of "ALS"
-
-  it should "factorize a given matrix" in {
-    import ALSData._
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val als = ALS()
-      .setIterations(iterations)
-      .setLambda(lambda)
-      .setBlocks(4)
-      .setNumFactors(numFactors)
-
-    val inputDS = env.fromCollection(data)
-
-    val model = als.fit(inputDS)
-
-    val testData = env.fromCollection(expectedResult.map{
-      case (userID, itemID, rating) => (userID, itemID)
-    })
-
-    val predictions = model.transform(testData).collect
-
-    predictions.length should equal(expectedResult.length)
-
-    val resultMap = expectedResult map {
-      case (uID, iID, value) => (uID, iID) -> value
-    } toMap
-
-    predictions foreach {
-      case (uID, iID, value) => {
-        resultMap.isDefinedAt(((uID, iID))) should be(true)
-
-        value should be(resultMap((uID, iID)) +- 0.1)
-      }
-    }
-
-    val risk = model.empiricalRisk(inputDS).collect(0)
-
-    risk should be(expectedEmpiricalRisk +- 1)
-  }
-}
-
-object ALSData {
-
-  val iterations = 9
-  val lambda = 1.0
-  val numFactors = 5
-
-  val data: Seq[(Int, Int, Double)] = {
-    Seq(
-      (2,13,534.3937734561154),
-      (6,14,509.63176469621936),
-      (4,14,515.8246770897443),
-      (7,3,495.05234565105),
-      (2,3,532.3281786219485),
-      (5,3,497.1906356844367),
-      (3,3,512.0640508585093),
-      (10,3,500.2906742233019),
-      (1,4,521.9189079662882),
-      (2,4,515.0734651491396),
-      (1,7,522.7532725967008),
-      (8,4,492.65683825096403),
-      (4,8,492.65683825096403),
-      (10,8,507.03319667905413),
-      (7,1,522.7532725967008),
-      (1,1,572.2230209271174),
-      (2,1,563.5849190220224),
-      (6,1,518.4844061038742),
-      (9,1,529.2443732217674),
-      (8,1,543.3202505434103),
-      (7,2,516.0188923307859),
-      (1,2,563.5849190220224),
-      (1,11,515.1023793011227),
-      (8,2,536.8571133978352),
-      (2,11,507.90776961762225),
-      (3,2,532.3281786219485),
-      (5,11,476.24185144363304),
-      (4,2,515.0734651491396),
-      (4,11,469.92049343738233),
-      (3,12,509.4713776280098),
-      (4,12,494.6533165132021),
-      (7,5,482.2907867916308),
-      (6,5,477.5940040923741),
-      (4,5,480.9040684364228),
-      (1,6,518.4844061038742),
-      (6,6,470.6605085832807),
-      (8,6,489.6360564705307),
-      (4,6,472.74052954447046),
-      (7,9,482.5837650471611),
-      (5,9,487.00175463269863),
-      (9,9,500.69514584780944),
-      (4,9,477.71644808419325),
-      (7,10,485.3852917539852),
-      (8,10,507.03319667905413),
-      (3,10,500.2906742233019),
-      (5,15,488.08215944254437),
-      (6,15,480.16929757607346)
-    )
-  }
-
-  val expectedResult: Seq[(Int, Int, Double)] = {
-    Seq(
-      (2, 2, 526.1037),
-      (5, 9, 468.5680),
-      (10, 3, 484.8975),
-      (5, 13, 451.6228),
-      (1, 15, 493.4956),
-      (4, 11, 456.3862)
-    )
-  }
-
-  val expectedEmpiricalRisk = 505374.1877
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
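Both the old FlatSpec suite and the new JUnit test case verify predictions with ScalaTest's `+-` tolerance matcher against a map of expected ratings. As a stdlib-only sketch of what that check amounts to (no Flink or ScalaTest dependencies; `ToleranceCheck` and its members are illustrative names, not part of this patch):

```scala
// Plain-Scala sketch of the tolerance check the suites perform with
// ScalaTest's `+-` matcher: every prediction must have a counterpart in the
// expected results and agree with it within an absolute tolerance.
object ToleranceCheck {
  def within(actual: Double, expected: Double, tol: Double): Boolean =
    math.abs(actual - expected) <= tol

  def checkPredictions(
      predictions: Seq[(Int, Int, Double)],
      expected: Seq[(Int, Int, Double)],
      tol: Double): Boolean = {
    // Same construction as the test's resultMap: (userID, itemID) -> rating.
    val resultMap = expected.map { case (uID, iID, v) => (uID, iID) -> v }.toMap
    predictions.size == expected.size &&
      predictions.forall { case (uID, iID, v) =>
        resultMap.get((uID, iID)).exists(within(v, _, tol))
      }
  }

  def main(args: Array[String]): Unit = {
    val expected = Seq((2, 2, 526.1037), (5, 9, 468.5680))
    val predicted = Seq((2, 2, 526.15), (5, 9, 468.51))
    println(checkPredictions(predicted, expected, 0.1)) // prints "true"
  }
}
```

The matcher-based version in the suites additionally reports which entry failed, which a plain boolean check does not.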
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
new file mode 100644
index 0000000..eb825b9
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.apache.flink.ml.common.ParameterMap
+import org.apache.flink.ml.feature.PolynomialBase
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class MultipleLinearRegressionITCase extends ShouldMatchers {
+
+  @Test
+  def testEstimationOfLinearFunction(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val learner = MultipleLinearRegression()
+
+    import RegressionData._
+
+    val parameters = ParameterMap()
+
+    parameters.add(MultipleLinearRegression.Stepsize, 1.0)
+    parameters.add(MultipleLinearRegression.Iterations, 10)
+    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+    val inputDS = env.fromCollection(data)
+    val model = learner.fit(inputDS, parameters)
+
+    val weightList = model.weights.collect
+
+    weightList.size should equal(1)
+
+    val (weights, weight0) = weightList(0)
+
+    expectedWeights zip weights foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 1)
+    }
+    weight0 should be (expectedWeight0 +- 0.4)
+
+    val srs = model.squaredResidualSum(inputDS).collect(0)
+
+    srs should be (expectedSquaredResidualSum +- 2)
+  }
+
+  @Test
+  def testEstimationOfCubicFunction(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val polynomialBase = PolynomialBase()
+    val learner = MultipleLinearRegression()
+
+    val pipeline = polynomialBase.chain(learner)
+
+    val inputDS = env.fromCollection(RegressionData.polynomialData)
+
+    val parameters = ParameterMap()
+      .add(PolynomialBase.Degree, 3)
+      .add(MultipleLinearRegression.Stepsize, 0.002)
+      .add(MultipleLinearRegression.Iterations, 100)
+
+    val model = pipeline.fit(inputDS, parameters)
+
+    val weightList = model.weights.collect
+
+    weightList.size should equal(1)
+
+    val (weights, weight0) = weightList(0)
+
+    RegressionData.expectedPolynomialWeights.zip(weights) foreach {
+      case (expectedWeight, weight) =>
+        weight should be(expectedWeight +- 0.1)
+    }
+
+    weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1)
+
+    val transformedInput = polynomialBase.transform(inputDS, parameters)
+
+    val srs = model.squaredResidualSum(transformedInput).collect(0)
+
+    srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
+  }
+}
+
+object MultipleLinearRegressionITCase{
+
+  @BeforeClass
+  def setup(): Unit = {
+    CliFrontendTestUtils.pipeSystemOutToNull()
+  }
+}
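The cubic-function test builds its pipeline with `polynomialBase.chain(learner)`: the transformer maps features first, and the chained learner fits on the transformed input. A plain-Scala sketch of that chaining idea follows; it assumes nothing about Flink's actual `Transformer`/`Learner` interfaces, and every name in it (`SimpleTransformer`, `polyDegree2`, `meanOfSquares`) is hypothetical:

```scala
// Illustrative sketch (not Flink's actual API) of the transformer-learner
// chaining pattern: fitting the chained pipeline first transforms the input,
// then hands the transformed data to the learner.
trait SimpleTransformer[I, O] {
  def transform(input: Seq[I]): Seq[O]

  // Chain this transformer in front of a "learner" function: the resulting
  // pipeline applies transform before fitting.
  def chain[M](learner: Seq[O] => M): Seq[I] => M =
    input => learner(transform(input))
}

object ChainingExample {
  // Hypothetical polynomial feature mapper of degree 2: x => (x, x^2).
  val polyDegree2 = new SimpleTransformer[Double, (Double, Double)] {
    def transform(input: Seq[Double]): Seq[(Double, Double)] =
      input.map(x => (x, x * x))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical "learner" that just averages the squared feature.
    val meanOfSquares: Seq[(Double, Double)] => Double =
      feats => feats.map(_._2).sum / feats.size

    val pipeline = polyDegree2.chain(meanOfSquares)
    println(pipeline(Seq(1.0, 2.0, 3.0))) // prints "4.666666666666667"
  }
}
```

In the actual patch, chaining additionally threads the shared `ParameterMap` (e.g. `PolynomialBase.Degree`) through both stages, which this sketch omits.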