Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 79789187B1 for ; Tue, 26 May 2015 18:22:29 +0000 (UTC) Received: (qmail 76275 invoked by uid 500); 26 May 2015 18:22:29 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 76226 invoked by uid 500); 26 May 2015 18:22:29 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 76217 invoked by uid 99); 26 May 2015 18:22:29 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 May 2015 18:22:29 +0000 Date: Tue, 26 May 2015 18:22:29 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-2073) Add contribution guide for FlinkML MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/FLINK-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559554#comment-14559554 ] ASF GitHub Bot commented on FLINK-2073: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/727#discussion_r31063469 --- Diff: docs/libs/ml/contribution_guide.md --- @@ -20,7 +21,329 @@ specific language governing permissions and limitations under the License. --> +The Flink community highly appreciates all sorts of contributions to FlinkML. +FlinkML offers people interested in machine learning to work on a highly active open source project which makes scalable ML reality. +The following document describes how to contribute to FlinkML. + * This will be replaced by the TOC {:toc} -Coming soon. In the meantime, check our list of [open issues on JIRA](https://issues.apache.org/jira/browse/FLINK-1748?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC) +## Getting Started + +In order to get started first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html). +Everything from this guide also applies to FlinkML. + +## Pick a Topic + +If you are looking for some new ideas, then you should check out the list of [unresolved issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC). +Once you decide to contribute to one of these issues, you should take ownership of it and track your progress with this issue. +That way, the other contributors know the state of the different issues and redundant work is avoided. + +If you already know what you want to contribute to FlinkML all the better. +It is still advisable to create a JIRA issue for your idea to tell the Flink community what you want to do, though. + +## Testing + +New contributions should come with tests to verify the correct behavior of the algorithm. +The tests help to maintain the algorithm's correctness throughout code changes, e.g. refactorings. + +We distinguish between unit tests, which are executed during maven's test phase, and integration tests, which are executed during maven's verify phase. +Maven automatically makes this distinction by using the following naming rules: +All test cases whose class name ends with a suffix fulfilling the regular expression `(IT|Integration)(Test|Suite|Case)`, are considered integration tests. +The rest are considered unit tests and should only test behavior which is local to the component under test. + +An integration test is a test which requires the full Flink system to be started. +In order to do that properly, all integration test cases have to mix in the trait `FlinkTestBase`. +This trait will set the right `ExecutionEnvironment` so that the test will be executed on a special `FlinkMiniCluster` designated for testing purposes. +Thus, an integration test could look the following: + +{% highlight scala %} +class ExampleITSuite extends FlatSpec with FlinkTestBase { + behavior of "An example algorithm" + + it should "do something" in { + ... + } +} +{% endhighlight %} + +The test style does not have to be `FlatSpec` but can be any other scalatest `Suite` subclass. + +## Documentation + +When contributing new algorithms, it is required to add code comments describing the functioning of the algorithm and its parameters with which the user can control its behavior. +Additionally, we would like to encourage contributors to add this information to the online documentation. +The online documentation for FlinkML's components can be found in the directory `docs/libs/ml`. + +Every new algorithm is described by a single markdown file. +This file should contain at least the following points: + +1. What does the algorithm do +2. How does the algorithm work (or reference to description) +3. Parameter description with default values +4. Code snippet showing how the algorithm is used + +In order to use latex syntax in the markdown file, you have to include `mathjax: include` in the YAML front matter. + +{% highlight java %} +--- +mathjax: include +title: Example title +--- +{% endhighlight %} + +In order to use displayed mathematics, you have to put your latex code in `$$ ... $$`. +For in-line mathematics, use `$ ... $`. +Additionally some predefined latex commands are included into the scope of your markdown file. +See `docs/_include/latex_commands.html` for the complete list of predefined latex commands. + +## Contributing + +Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request. +Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation). + +## How to Implement a Pipeline Operator + +FlinkML follows the principle to make machine learning as easy and accessible as possible. +Therefore, it supports a flexible pipelining mechanism which allows users to quickly define their analysis pipelines consisting of a multitude of different components. +A pipeline operator is either a `Transformer` or a `Predictor`. +A `Transformer` can be fitted to training data and transforms data from one format into another format. +A scaler which changes the mean and variance of its input data according to the mean and variance of some training data is an example for a `Transformer`. +In contrast, a `Predictor` encapsulates a data model and the corresponding logic to train it. +Once a `Predictor` has trained the model, it can be used to make new predictions. +A support vector machine which is first trained to obtain the support vectors and then used to classify data points is an example for a `Predictor`. +A general description of FlinkML's pipelining can be found [here]({{site.baseurl}}/libs/ml/pipelines.html). +In order to support the pipelining, algorithms have to adhere to a certain design pattern, which we will describe next. + +Let's assume that we want to implement a pipeline operator which changes the mean of your data. +At first, we have to reflect which type of pipeline operator it is. +Since centering data is a common preprocessing step in any analysis pipeline, we will implement it as a `Transformer`. +Therefore, we first create a `MeanTransformer` class which inherits from `Transformer` + +{% highlight scala %} +class MeanTransformer extends Transformer[Centering] {} +{% endhighlight %} + +Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter. + +{% highlight scala %} +class MeanTransformer extends Transformer[Centering] { + def setMean(mean: Double): Mean = { + parameters.add(MeanTransformer.Mean, mu) + } +} + +object MeanTransformer { + case object Mean extends Parameter[Double] { + override val defaultValue: Option[Double] = Some(0.0) + } + + def apply(): MeanTransformer = new MeanTransformer +} +{% endhighlight %} + +Parameters are defined in the companion object of the transformer class and extend the `Parameter` class. +The default value will be used if no other value has been set by the user of this component. +If no default value has been specified, meaning that `defaultValue = None`, then the algorithm has to handle this situation accordingly. + +We can now instantiate a `MeanTransformer` object and set the mean value of the transformed data. +But we still have to implement how the transformation works. +The workflow can be separated into two phases. +Within the first phase, the transformer learns the mean of the given training data. +This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value. + +The learning of the mean can be implemented within the `fit` operation of a `Transformer`. +Within the `fit` operation, a pipeline component is trained with respect to the given training data. +The algorithm is, however, **not** implemented by overriding the `fit` method but by providing an implementation of a corresponding `FitOperation` for the correct type. +Taking a look at the definition of the `fit` method in `Estimator`, which is the parent class of `Transformer`, reveals what why this is the case. + +{% highlight scala %} +trait Estimator[Self] extends WithParameters with Serializable { + that: Self => + + def fit[Training]( + training: DataSet[Training], + fitParameters: ParameterMap = ParameterMap.Empty) + (implicit fitOperation: FitOperation[Self, Training]): Unit = { + FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment) + fitOperation.fit(this, fitParameters, training) + } +} +{% endhighlight %} + +We see that the `fit` method is called with an input data set of type `Training`, an optional parameter list and in the second parameter list with an implicit parameter of type `FitOperation`. +Within the body of the function, first some machine learning types are registered and then the `fit` method of the `FitOperation` parameter is called. +The instance gives itself, the parameter map and the training data set as a parameters to the method. +Thus, all the program logic takes place within the `FitOperation`. + +The `FitOperation` has two type parameters. +The first defines the pipeline operator type for which this `FitOperation` shall work and the second type parameter defines the type of the data set elements. +If we first wanted to implement the `MeanTransformer` to work on `DenseVector`, we would, thus, have to provide an implementation for `FitOperation[MeanTransformer, DenseVector]`. + +{% highlight scala %} +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] { + override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = { + import org.apache.flink.ml.math.Breeze._ + val meanTrainingData: DataSet[DenseVector] = input + .map{ x => (x.asBreeze, 1) } + .reduce{ + (left, right) => + (left._1 + right._1, left._2 + right._2) + } + .map{ p => (p._1/p._2).fromBreeze } + } +} +{% endhighlight %} + +A `FitOperation[T, I]` has a `fit` method which is called with an instance of type `T`, a parameter map and an input `DataSet[I]`. +In our case `T=MeanTransformer` and `I=DenseVector`. +The parameter map is necessary if our fit step depends on some parameter values which were not given directly at creation time of the `Transformer`. +The `FitOperation` of the `MeanTransformer` sums the `DenseVector` instances of the given input data set up and divides the result by the total number of vectors. +That way, we obtain a `DataSet[DenseVector]` with a single element which is the mean value. + +But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere. +If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around. +And here is where the parameter of type `MeanTransformer` which is given to the `fit` method comes into play. +We can use this instance to store state, which is used by a subsequent `transform` operation which works on the same object. +But first we have to extend `MeanTransformer` by a member field and then adjust the `FitOperation` implementation. + +{% highlight scala %} +class MeanTransformer extends Transformer[Centering] { + var meanOption: Option[DataSet[DenseVector]] = None + + def setMean(mean: Double): Mean = { + parameters.add(MeanTransformer.Mean, mu) + } +} + +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] { + override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = { + import org.apache.flink.ml.math.Breeze._ + + instance.meanOption = Some(input + .map{ x => (x.asBreeze, 1) } + .reduce{ + (left, right) => + (left._1 + right._1, left._2 + right._2) + } + .map{ p => (p._1/p._2).fromBreeze }) + } +} +{% endhighlight %} + +If we look at the `transform` method in `Transformer`, we will see that we also need an implementation of `TransformOperation`. +A possible mean transforming implementation could look the following. + +{% highlight scala %} + +val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] { + override def transform( + instance: MeanTransformer, + transformParameters: ParameterMap, + input: DataSet[DenseVector]) + : DataSet[DenseVector] = { + val resultingParameters = parameters ++ transformParameters + + val resultingMean = resultingParameters(MeanTransformer.Mean) + + instance.meanOption match { + case Some(trainingMean) => { + input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean") + } + case None => throw new RuntimeException("MeanTransformer has not been fitted to data.") + } + } +} + +class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] { + var trainingMean: DenseVector = null + + override def open(parameters: Configuration): Unit = { + trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0) + } + + override def map(vector: DenseVector): DenseVector = { + import org.apache.flink.ml.math.Breeze._ + + val result = vector.asBreeze - trainingMean.asBreeze + resultingMean + + result.fromBreeze + } +} +{% endhighlight %} + +Now we have everything implemented to fit our `MeanTransformer` to a training data set of `DenseVector` instances and to transform them. +However, when we execute the `fit` operation + +{% highlight scala %} +val trainingData: DataSet[DenseVector] = ... +val meanTransformer = MeanTransformer() + +meanTransformer.fit(trainingData) +{% endhighlight %} + +we receive the following error at runtime: `"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"`. +The reason is that the Scala compiler could not find a fitting `FitOperation` value with the right type parameters for the implicit parameter of the `fit` method. +Therefore, it chose a fallback implicit value which gives you this error message at runtime. +In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of the `MeanTransformer's` companion object. + +{% highlight scala %} +object MeanTransformer{ + implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ... + + implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ... +} +{% endhighlight %} + +Now we can call `fit` and `transform` of our `MeanTransformer` with `DataSet[DenseVector]` as input. +Furthermore, we can now use this transformer as part of an analysis pipeline where we have a `DenseVector` as input and expected output. + +{% highlight scala %} +val trainingData: DataSet[DenseVector] = ... + +val mean = MeanTransformer.setMean(1.0) +val polyFeaturs = PolynomialFeatures().setDegree(3) + +val pipeline = mean.chainTransformer(polyFeatures) + +pipeline.fit(trainingData) +{% endhighlight %} + +It is noteworthy that there is no additional code needed to enable chaining. +The system automatically constructs the pipeline logic using the operations of the individual components. + +So far everything works fine with `DenseVector`. +But what happens, if we call our transformer with `LabeledVector` instead? +{% highlight scala %} +val trainingData: DataSet[LabeledVector] = ... + +val mean = MeanTransformer() + +mean.fit(trainingData) +{% endhighlight %} + +As before we see the following exception upon execution of the program: `"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"`. +It is noteworthy, that this exception is thrown in the pre-flight phase, which means that the job has not been submitted to the runtime system. +This has the advantage that you won't see a job which runs for a couple of days and then fails because of an incompatible pipeline component. +Type compatibility is, thus, checked at the very beginning for the complete job. + +In order to make the `MeanTransformer` work on `LabeledVector` as well, we have to provide the corresponding operations. +Consequently, we have to define a `FitOperation[MeanTransformer, LabeledVector]` and `TransformOperation[MeanTransformer, LabeledVector, LabeledVector]` as implicit values in the scope of `MeanTransformer's` companion object. + +{% highlight scala %} +object MeanTransformer { + implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ... + + implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ... +} +{% endhighlight %} + +If we wanted to implement a `Predictor` instead of a `Transformer`, then we would have to do the same. --- End diff -- agreed > Add contribution guide for FlinkML > ---------------------------------- > > Key: FLINK-2073 > URL: https://issues.apache.org/jira/browse/FLINK-2073 > Project: Flink > Issue Type: New Feature > Components: Documentation, Machine Learning Library > Reporter: Theodore Vasiloudis > Assignee: Till Rohrmann > Fix For: 0.9 > > > We need a guide for contributions to FlinkML in order to encourage the extension of the library, and provide guidelines for developers. > One thing that should be included is a step-by-step guide to create a transformer, or other Estimator -- This message was sent by Atlassian JIRA (v6.3.4#6332)