flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2073) Add contribution guide for FlinkML
Date Tue, 26 May 2015 18:22:29 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559554#comment-14559554 ]

ASF GitHub Bot commented on FLINK-2073:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/727#discussion_r31063469
  
    --- Diff: docs/libs/ml/contribution_guide.md ---
    @@ -20,7 +21,329 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    +The Flink community highly appreciates all sorts of contributions to FlinkML.
    +FlinkML offers people interested in machine learning the opportunity to work on a highly active open source project which makes scalable ML a reality.
    +The following document describes how to contribute to FlinkML.
    +
     * This will be replaced by the TOC
     {:toc}
     
    -Coming soon. In the meantime, check our list of [open issues on JIRA](https://issues.apache.org/jira/browse/FLINK-1748?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC)
    +## Getting Started
    +
    +In order to get started, first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html).
    +Everything from this guide also applies to FlinkML.
    +
    +## Pick a Topic
    +
    +If you are looking for some new ideas, then you should check out the list of [unresolved
issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
    +Once you decide to contribute to one of these issues, you should take ownership of it
and track your progress with this issue.
    +That way, the other contributors know the state of the different issues and redundant
work is avoided.
    +
    +If you already know what you want to contribute to FlinkML, all the better.
    +It is still advisable to create a JIRA issue for your idea to tell the Flink community what you want to do, though.
    +
    +## Testing
    +
    +New contributions should come with tests to verify the correct behavior of the algorithm.
    +The tests help to maintain the algorithm's correctness throughout code changes, e.g.
refactorings.
    +
    +We distinguish between unit tests, which are executed during maven's test phase, and
integration tests, which are executed during maven's verify phase.
    +Maven automatically makes this distinction by using the following naming rules:
    +All test cases whose class name ends with a suffix fulfilling the regular expression
`(IT|Integration)(Test|Suite|Case)`, are considered integration tests.
    +The rest are considered unit tests and should only test behavior which is local to the
component under test.
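
The naming rule above can be sketched in plain Scala. This is only an illustration of how the quoted suffix expression separates class names; the object and method names are hypothetical, and anchoring the expression at the end of the name is an assumption about how the build applies it.

```scala
object TestNaming {
  // Suffix expression quoted in the guide. The trailing `$` (match at the
  // end of the class name) is an assumption made for this sketch.
  private val IntegrationSuffix = "(IT|Integration)(Test|Suite|Case)$".r

  /** Returns true if the class name would count as an integration test. */
  def isIntegrationTest(className: String): Boolean =
    IntegrationSuffix.findFirstIn(className).isDefined
}
```

Under this sketch, `ExampleITSuite` is classified as an integration test, while a class named `MeanTransformerSuite` is treated as a unit test.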
    +
    +An integration test is a test which requires the full Flink system to be started.
    +In order to do that properly, all integration test cases have to mix in the trait `FlinkTestBase`.
    +This trait will set the right `ExecutionEnvironment` so that the test will be executed
on a special `FlinkMiniCluster` designated for testing purposes.
    +Thus, an integration test could look as follows:
    +
    +{% highlight scala %}
    +class ExampleITSuite extends FlatSpec with FlinkTestBase {
    +  behavior of "An example algorithm"
    +  
    +  it should "do something" in {
    +    ...
    +  }
    +}
    +{% endhighlight %}
    +
    +The test style does not have to be `FlatSpec` but can be any other ScalaTest `Suite` subclass.
    +
    +## Documentation
    +
    +When contributing new algorithms, it is required to add code comments describing how the algorithm works and the parameters with which the user can control its behavior.
    +Additionally, we would like to encourage contributors to add this information to the online documentation.
    +The online documentation for FlinkML's components can be found in the directory `docs/libs/ml`.
    +
    +Every new algorithm is described by a single markdown file.
    +This file should contain at least the following points:
    +
    +1. What does the algorithm do
    +2. How does the algorithm work (or reference to description) 
    +3. Parameter description with default values
    +4. Code snippet showing how the algorithm is used
    +
    +In order to use LaTeX syntax in the markdown file, you have to include `mathjax: include` in the YAML front matter.
    + 
    +{% highlight yaml %}
    +---
    +mathjax: include
    +title: Example title
    +---
    +{% endhighlight %}
    +
    +In order to use displayed mathematics, you have to put your LaTeX code in `$$ ... $$`.
    +For in-line mathematics, use `$ ... $`.
    +Additionally, some predefined LaTeX commands are available within the scope of your markdown file.
    +See `docs/_include/latex_commands.html` for the complete list of predefined LaTeX commands.
    +
    +## Contributing
    +
    +Once you have implemented the algorithm with adequate test coverage and added documentation,
you are ready to open a pull request.
    +Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).

    +
    +## How to Implement a Pipeline Operator
    +
    +FlinkML follows the principle of making machine learning as easy and accessible as possible.
    +Therefore, it supports a flexible pipelining mechanism which allows users to quickly
define their analysis pipelines consisting of a multitude of different components.
    +A pipeline operator is either a `Transformer` or a `Predictor`.
    +A `Transformer` can be fitted to training data and transforms data from one format into
another format.
    +A scaler which changes the mean and variance of its input data according to the mean and variance of some training data is an example of a `Transformer`.
    +In contrast, a `Predictor` encapsulates a data model and the corresponding logic to train
it.
    +Once a `Predictor` has trained the model, it can be used to make new predictions.
    +A support vector machine which is first trained to obtain the support vectors and then used to classify data points is an example of a `Predictor`.
    +A general description of FlinkML's pipelining can be found [here]({{site.baseurl}}/libs/ml/pipelines.html).
    +In order to support the pipelining, algorithms have to adhere to a certain design pattern,
which we will describe next.
    +
    +Let's assume that we want to implement a pipeline operator which changes the mean of the data.
    +First, we have to consider which type of pipeline operator it is.
    +Since centering data is a common preprocessing step in any analysis pipeline, we will implement it as a `Transformer`.
    +Therefore, we first create a `MeanTransformer` class which inherits from `Transformer`:
    +
    +{% highlight scala %}
    +class MeanTransformer extends Transformer[MeanTransformer] {}
    +{% endhighlight %}
    +
    +Since we want to be able to configure the mean of the resulting data, we have to add
a configuration parameter.
    +
    +{% highlight scala %}
    +class MeanTransformer extends Transformer[MeanTransformer] {
    +  // Returns this.type so that setter calls can be chained
    +  def setMean(mean: Double): this.type = {
    +    parameters.add(MeanTransformer.Mean, mean)
    +    this
    +  }
    +}
    +
    +object MeanTransformer {
    +  case object Mean extends Parameter[Double] {
    +    override val defaultValue: Option[Double] = Some(0.0)
    +  }
    +  
    +  def apply(): MeanTransformer = new MeanTransformer
    +}
    +{% endhighlight %}
    +
    +Parameters are defined in the companion object of the transformer class and extend the
`Parameter` class.
    +The default value will be used if no other value has been set by the user of this component.
    +If no default value has been specified, meaning that `defaultValue = None`, then the
algorithm has to handle this situation accordingly.
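
The default-value behaviour described above can be illustrated with a small, self-contained sketch. The types `Param` and `ParamMap` below are hypothetical stand-ins written for this illustration; they are not FlinkML's `Parameter` and `ParameterMap`, whose actual implementation may differ.

```scala
object ParameterSketch {
  // Hypothetical stand-in for a typed parameter key with an optional default.
  trait Param[T] { def defaultValue: Option[T] }

  // Hypothetical immutable parameter map keyed by parameter objects.
  class ParamMap(private val values: Map[Param[_], Any] = Map.empty) {
    def add[T](param: Param[T], value: T): ParamMap =
      new ParamMap(values.updated(param, value))

    // An explicitly set value wins; otherwise fall back to the default, if any.
    def get[T](param: Param[T]): Option[T] =
      values.get(param).map(_.asInstanceOf[T]).orElse(param.defaultValue)
  }

  case object Mean extends Param[Double] {
    val defaultValue: Option[Double] = Some(0.0)
  }

  case object NoDefault extends Param[Int] {
    val defaultValue: Option[Int] = None
  }
}
```

Looking up `NoDefault` in an empty map yields `None`, which is the case an algorithm would have to handle itself.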
    +
    +We can now instantiate a `MeanTransformer` object and set the mean value of the transformed
data.
    +But we still have to implement how the transformation works.
    +The workflow can be separated into two phases.
    +Within the first phase, the transformer learns the mean of the given training data.
    +This knowledge can then be used in the second phase to transform the provided data with
respect to the configured resulting mean value.
    +
    +The learning of the mean can be implemented within the `fit` operation of a `Transformer`.
    +Within the `fit` operation, a pipeline component is trained with respect to the given
training data.
    +The algorithm is, however, **not** implemented by overriding the `fit` method but by
providing an implementation of a corresponding `FitOperation` for the correct type.
    +Taking a look at the definition of the `fit` method in `Estimator`, which is the parent class of `Transformer`, reveals why this is the case.
    +
    +{% highlight scala %}
    +trait Estimator[Self] extends WithParameters with Serializable {
    +  that: Self =>
    +
    +  def fit[Training](
    +      training: DataSet[Training],
    +      fitParameters: ParameterMap = ParameterMap.Empty)
    +      (implicit fitOperation: FitOperation[Self, Training]): Unit = {
    +    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
    +    fitOperation.fit(this, fitParameters, training)
    +  }
    +}
    +{% endhighlight %}
    +
    +We see that the `fit` method is called with an input data set of type `Training`, an optional parameter map and, in a second parameter list, an implicit parameter of type `FitOperation`.
    +Within the body of the function, first some machine learning types are registered and then the `fit` method of the `FitOperation` parameter is called.
    +The instance passes itself, the parameter map and the training data set as parameters to the method.
    +Thus, all the program logic takes place within the `FitOperation`.
    +
    +The `FitOperation` has two type parameters.
    +The first defines the pipeline operator type for which this `FitOperation` shall work
and the second type parameter defines the type of the data set elements.
    +If we first wanted to implement the `MeanTransformer` to work on `DenseVector`, we would,
thus, have to provide an implementation for `FitOperation[MeanTransformer, DenseVector]`.
    + 
    +{% highlight scala %}
    +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
    +  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]): Unit = {
    +    import org.apache.flink.ml.math.Breeze._
    +    val meanTrainingData: DataSet[DenseVector] = input
    +      .map{ x => (x.asBreeze, 1) }
    +      .reduce{
    +        (left, right) => 
    +          (left._1 + right._1, left._2 + right._2) 
    +      }
    +      .map{ p => (p._1/p._2).fromBreeze }
    +  }
    +}
    +{% endhighlight %}
    +
    +A `FitOperation[T, I]` has a `fit` method which is called with an instance of type `T`,
a parameter map and an input `DataSet[I]`.
    +In our case `T=MeanTransformer` and `I=DenseVector`.
    +The parameter map is necessary if our fit step depends on some parameter values which
were not given directly at creation time of the `Transformer`.
    +The `FitOperation` of the `MeanTransformer` sums up the `DenseVector` instances of the given input data set and divides the result by the total number of vectors.
    +That way, we obtain a `DataSet[DenseVector]` with a single element which is the mean
value.
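
The sum-and-count reduction described above can be tried out without a cluster. The following sketch uses plain Scala collections and `Array[Double]` as stand-ins for `DataSet[DenseVector]`; it illustrates the reduce pattern only and is not Flink code.

```scala
object MeanSketch {
  /** Computes the element-wise mean by pairing every vector with a count of 1
    * and reducing the (sum, count) pairs, as in the fit operation above. */
  def mean(vectors: Seq[Array[Double]]): Array[Double] = {
    require(vectors.nonEmpty, "cannot compute the mean of an empty data set")
    val (sum, count) = vectors
      .map(v => (v, 1))
      .reduce { (left, right) =>
        // Add the partial sums element-wise and add up the counts
        (left._1.zip(right._1).map { case (a, b) => a + b }, left._2 + right._2)
      }
    sum.map(_ / count)
  }
}
```

Because the pairwise combination is associative, the same function can be applied in any grouping, which is what makes the pattern suitable for a distributed reduce.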
    +
    +But if we look closely at the implementation, we see that the result of the mean computation
is never stored anywhere.
    +If we want to use this knowledge in a later step to adjust the mean of some other input,
we have to keep it around.
    +And here is where the parameter of type `MeanTransformer` which is given to the `fit`
method comes into play.
    +We can use this instance to store state, which is used by a subsequent `transform` operation
which works on the same object.
    +But first we have to extend `MeanTransformer` by a member field and then adjust the `FitOperation`
implementation.
    +
    +{% highlight scala %}
    +class MeanTransformer extends Transformer[MeanTransformer] {
    +  // State learned by fit and read by a subsequent transform
    +  var meanOption: Option[DataSet[DenseVector]] = None
    +
    +  def setMean(mean: Double): this.type = {
    +    parameters.add(MeanTransformer.Mean, mean)
    +    this
    +  }
    +}
    +
    +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
    +  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]): Unit = {
    +    import org.apache.flink.ml.math.Breeze._
    +    
    +    instance.meanOption = Some(input
    +      .map{ x => (x.asBreeze, 1) }
    +      .reduce{
    +        (left, right) => 
    +          (left._1 + right._1, left._2 + right._2) 
    +      }
    +      .map{ p => (p._1/p._2).fromBreeze })
    +  }
    +}
    +{% endhighlight %}
    +
    +If we look at the `transform` method in `Transformer`, we will see that we also need
an implementation of `TransformOperation`.
    +A possible mean transforming implementation could look as follows.
    +
    +{% highlight scala %}
    +
    +val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] {
    +  override def transform(
    +      instance: MeanTransformer, 
    +      transformParameters: ParameterMap, 
    +      input: DataSet[DenseVector]) 
    +    : DataSet[DenseVector] = {
    +    // Combine the instance's parameters with those passed to transform
    +    val resultingParameters = instance.parameters ++ transformParameters
    +    
    +    val resultingMean = resultingParameters(MeanTransformer.Mean)
    +    
    +    instance.meanOption match {
    +      case Some(trainingMean) => {
    +        input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean")
    +      }
    +      case None => throw new RuntimeException("MeanTransformer has not been fitted to data.")
    +    }
    +  }
    +}
    +
    +class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] {
    +  var trainingMean: DenseVector = null
    +
    +  override def open(parameters: Configuration): Unit = {
    +    trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0)
    +  }
    +  
    +  override def map(vector: DenseVector): DenseVector = {
    +    import org.apache.flink.ml.math.Breeze._
    +    
    +    val result = vector.asBreeze - trainingMean.asBreeze + resultingMean
    +    
    +    result.fromBreeze
    +  }
    +}
    +{% endhighlight %}
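
The arithmetic performed in `map` above (subtract the training mean, then add the configured target mean) can be checked in isolation. The following is a minimal sketch on plain Scala collections; the names are hypothetical and `Array[Double]` stands in for `DenseVector`.

```scala
object CenteringSketch {
  /** Shifts every vector so that data with mean `trainingMean`
    * ends up with mean `targetMean` in each component. */
  def recenter(
      data: Seq[Array[Double]],
      trainingMean: Array[Double],
      targetMean: Double): Seq[Array[Double]] =
    data.map { vector =>
      vector.zip(trainingMean).map { case (x, m) => x - m + targetMean }
    }
}
```

For the two vectors `(1, 2)` and `(3, 4)` with training mean `(2, 3)` and target mean `1.0`, this yields `(0, 0)` and `(2, 2)`, whose mean is `1.0` in every component.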
    +
    +Now we have everything implemented to fit our `MeanTransformer` to a training data set
of `DenseVector` instances and to transform them.
    +However, when we execute the `fit` operation
    +
    +{% highlight scala %}
    +val trainingData: DataSet[DenseVector] = ...
    +val meanTransformer = MeanTransformer()
    +
    +meanTransformer.fit(trainingData)
    +{% endhighlight %}
    +
    +we receive the following error at runtime: `"There is no FitOperation defined for class
MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"`.
    +The reason is that the Scala compiler could not find a fitting `FitOperation` value with
the right type parameters for the implicit parameter of the `fit` method.
    +Therefore, it chose a fallback implicit value which gives you this error message at runtime.
    +In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of the `MeanTransformer`'s companion object.
    +
    +{% highlight scala %}
    +object MeanTransformer{
    +  implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ...
    +  
    +  implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ...
    +}
    +{% endhighlight %}
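
The interplay between a fallback implicit and an implicit defined in a companion object can be reproduced in a few lines of plain Scala. The sketch below uses hypothetical stand-in types (`FitOp`, `Centerer`) and plain `Seq` instead of `DataSet`; where FlinkML actually defines its fallback value is not shown in this guide, so the placement in a low-priority trait is an assumption made for illustration.

```scala
object ImplicitSketch {
  // Hypothetical stand-in for FitOperation[Self, Training].
  trait FitOp[Self, Training] {
    def fit(instance: Self, input: Seq[Training]): Unit
  }

  trait LowPriorityFitOps {
    // Fallback: resolvable for every type combination, fails only when invoked.
    implicit def fallbackFitOp[Self, Training]: FitOp[Self, Training] =
      new FitOp[Self, Training] {
        def fit(instance: Self, input: Seq[Training]): Unit =
          throw new RuntimeException("There is no FitOp defined for this type combination")
      }
  }

  object FitOp extends LowPriorityFitOps

  class Centerer {
    var mean: Option[Double] = None

    def fit[T](input: Seq[T])(implicit op: FitOp[Centerer, T]): Unit =
      op.fit(this, input)
  }

  object Centerer {
    // Found via the companion object and preferred over the generic fallback.
    implicit val doubleFitOp: FitOp[Centerer, Double] =
      new FitOp[Centerer, Double] {
        def fit(instance: Centerer, input: Seq[Double]): Unit =
          instance.mean = Some(input.sum / input.size)
      }
  }
}
```

Calling `fit` with a `Seq[Double]` picks up `doubleFitOp`, whereas calling it with a `Seq[String]` compiles but resolves to the fallback and throws when executed.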
    +
    +Now we can call `fit` and `transform` of our `MeanTransformer` with `DataSet[DenseVector]`
as input.
    +Furthermore, we can now use this transformer as part of an analysis pipeline where we
have a `DenseVector` as input and expected output.
    +
    +{% highlight scala %}
    +val trainingData: DataSet[DenseVector] = ...
    +
    +val mean = MeanTransformer().setMean(1.0)
    +val polyFeatures = PolynomialFeatures().setDegree(3)
    +
    +val pipeline = mean.chainTransformer(polyFeatures)
    +
    +pipeline.fit(trainingData)
    +{% endhighlight %}
    +
    +It is noteworthy that there is no additional code needed to enable chaining.
    +The system automatically constructs the pipeline logic using the operations of the individual
components.
    +
    +So far everything works fine with `DenseVector`.
    +But what happens if we call our transformer with `LabeledVector` instead?
    +{% highlight scala %}
    +val trainingData: DataSet[LabeledVector] = ...
    +
    +val mean = MeanTransformer()
    +
    +mean.fit(trainingData)
    +{% endhighlight %}
    +
    +As before we see the following exception upon execution of the program: `"There is no
FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"`.
    +It is noteworthy that this exception is thrown in the pre-flight phase, which means that the job has not been submitted to the runtime system.
    +This has the advantage that you won't see a job which runs for a couple of days and then
fails because of an incompatible pipeline component.
    +Type compatibility is, thus, checked at the very beginning for the complete job.
    +
    +In order to make the `MeanTransformer` work on `LabeledVector` as well, we have to provide
the corresponding operations.
    +Consequently, we have to define a `FitOperation[MeanTransformer, LabeledVector]` and `TransformOperation[MeanTransformer, LabeledVector, LabeledVector]` as implicit values in the scope of the `MeanTransformer`'s companion object.
    +
    +{% highlight scala %}
    +object MeanTransformer {
    +  implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ...
    +  
    +  implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ...
    +}
    +{% endhighlight %}
    +
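    +If we wanted to implement a `Predictor` instead of a `Transformer`, then we would have to follow the same pattern and provide the operations it needs as implicit values in the scope of its companion object.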
    --- End diff --
    
    agreed


> Add contribution guide for FlinkML
> ----------------------------------
>
>                 Key: FLINK-2073
>                 URL: https://issues.apache.org/jira/browse/FLINK-2073
>             Project: Flink
>          Issue Type: New Feature
>          Components: Documentation, Machine Learning Library
>            Reporter: Theodore Vasiloudis
>            Assignee: Till Rohrmann
>             Fix For: 0.9
>
>
> We need a guide for contributions to FlinkML in order to encourage the extension of the library, and provide guidelines for developers.
> One thing that should be included is a step-by-step guide to create a transformer, or other Estimator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
