flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2030) Implement an online histogram with Merging and equalization features
Date Thu, 20 Aug 2015 13:38:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704896#comment-14704896 ]

ASF GitHub Bot commented on FLINK-2030:
---------------------------------------

Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/861#discussion_r37529224
  
    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
    @@ -119,4 +123,65 @@ object MLUtils {
     
         stringRepresentation.writeAsText(filePath)
       }
    +
    +  /** Create a [[ContinuousHistogram]] from the input data
    +    *
    +    * @param bins Number of bins required
    +    * @param data input [[DataSet]] of [[Double]]
    +    * @return [[ContinuousHistogram]] over the data
    +    */
    +  def createContinuousHistogram(data: DataSet[Double], bins: Int): DataSet[ContinuousHistogram] = {
    +    val min = data.reduce((x, y) => Math.min(x, y))
    +    val max = data.reduce((x, y) => Math.max(x, y))
    +
    +    val stats = min.mapWithBcVariable(max) {
    +      (minimum, maximum) => (minimum - 2 * (maximum - minimum), maximum + 2 * (maximum - minimum))
    +    }
    +
    +    data.mapPartition(new RichMapPartitionFunction[Double, ContinuousHistogram] {
    +      var statistics: (Double, Double) = _
    +
    +      override def open(configuration: Configuration): Unit = {
    +        statistics = getRuntimeContext.getBroadcastVariable(HISTOGRAM_STATS).get(0)
    +        val minimum = statistics._1
    +        val maximum = statistics._2
    +        statistics = (minimum - 2 * (maximum - minimum), maximum + 2 * (maximum - minimum))
    +      }
    +
    +      override def mapPartition(
    +          values: java.lang.Iterable[Double],
    +          out: Collector[ContinuousHistogram])
    +        : Unit = {
    +        val localHistogram = new ContinuousHistogram(bins, statistics._1, statistics._2)
    +        val iterator = values.iterator()
    +        while (iterator.hasNext) {
    +          localHistogram.add(iterator.next())
    +        }
    +        out.collect(localHistogram)
    +      }
    +    })
    +      .withBroadcastSet(stats, HISTOGRAM_STATS)
    +      .reduce((x, y) => x.merge(y, bins))
    +  }
    +
    +  /** Create a [[DiscreteHistogram]] from the input data
    +    *
    +    * @param data input [[DataSet]] of [[Double]]
    +    * @return [[DiscreteHistogram]] over the data
    +    */
    +  def createDiscreteHistogram(data: DataSet[Double]): DataSet[DiscreteHistogram] = {
    +    data.mapPartition(new RichMapPartitionFunction[Double, DiscreteHistogram] {
    --- End diff --
    
    Yes, but that would require creating a Histogram object for every element.
    This way we can work with just one object per partition.
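
To illustrate the trade-off, here is a minimal sketch in plain Scala collections, not Flink code. `CountingHistogram` and `PartitionDemo` are hypothetical names made up for this example, and the nested `Seq` only stands in for a partitioned `DataSet`. It counts how many histogram objects each approach allocates.

```scala
// Hypothetical stand-in for a mergeable histogram; NOT the ContinuousHistogram
// or DiscreteHistogram from the pull request.
final class CountingHistogram {
  CountingHistogram.created += 1 // record every allocation

  private var counts = Map.empty[Double, Int]

  def add(value: Double): Unit =
    counts = counts.updated(value, counts.getOrElse(value, 0) + 1)

  // Folds the other histogram's counts into this one and returns this.
  def merge(other: CountingHistogram): CountingHistogram = {
    other.counts.foreach { case (v, c) =>
      counts = counts.updated(v, counts.getOrElse(v, 0) + c)
    }
    this
  }

  def total: Int = counts.values.sum
}

object CountingHistogram {
  var created = 0 // allocation counter, reset before each run
}

object PartitionDemo {
  // Assumption: two partitions, modelled as nested sequences.
  val partitions: Seq[Seq[Double]] = Seq(Seq(1.0, 2.0, 2.0), Seq(3.0, 1.0))

  // map + reduce: one histogram per element, then merge them all.
  def perElement(): (CountingHistogram, Int) = {
    CountingHistogram.created = 0
    val result = partitions.flatten
      .map { v => val h = new CountingHistogram; h.add(v); h }
      .reduce(_ merge _)
    (result, CountingHistogram.created)
  }

  // mapPartition-style: one histogram per partition, then merge them.
  def perPartition(): (CountingHistogram, Int) = {
    CountingHistogram.created = 0
    val result = partitions
      .map { part => val h = new CountingHistogram; part.foreach(h.add); h }
      .reduce(_ merge _)
    (result, CountingHistogram.created)
  }
}
```

Both variants end with the same total count. The per-element variant allocates one object per input value, while the per-partition variant allocates one object per partition.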


> Implement an online histogram with Merging and equalization features
> --------------------------------------------------------------------
>
>                 Key: FLINK-2030
>                 URL: https://issues.apache.org/jira/browse/FLINK-2030
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Machine Learning Library
>            Reporter: Sachin Goel
>            Assignee: Sachin Goel
>            Priority: Minor
>              Labels: ML
>
> For the implementation of the decision tree in https://issues.apache.org/jira/browse/FLINK-1727,
> we need to implement a histogram with online updates, merging, and equalization features.
> A reference implementation is provided in [1].
> [1] http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
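
For readers unfamiliar with [1], the following is a rough sketch of the online update and merge steps as I understand them from that paper. It is not the code from the pull request. `Bin`, `StreamingHistogram`, and `shrink` are hypothetical names, and the equalization step is left out.

```scala
// A bin is a (center, count) pair, as in the histogram representation of [1].
final case class Bin(center: Double, count: Long)

// Hypothetical sketch of a bounded-size online histogram.
final case class StreamingHistogram(maxBins: Int, bins: Vector[Bin] = Vector.empty) {

  // Online update: insert the value as a bin of count 1, then shrink back to maxBins.
  // Simplification: an exact duplicate gets its own bin while there is room,
  // instead of incrementing the existing bin's count.
  def add(value: Double): StreamingHistogram =
    copy(bins = StreamingHistogram.shrink(bins :+ Bin(value, 1L), maxBins))

  // Merge: pool the bins of both histograms, then shrink back to maxBins.
  def merge(other: StreamingHistogram): StreamingHistogram =
    copy(bins = StreamingHistogram.shrink(bins ++ other.bins, maxBins))

  def total: Long = bins.map(_.count).sum
}

object StreamingHistogram {
  // Repeatedly replaces the two closest adjacent bins with their
  // count-weighted average until at most maxBins bins remain.
  private def shrink(raw: Vector[Bin], maxBins: Int): Vector[Bin] = {
    var sorted = raw.sortBy(_.center)
    while (sorted.length > maxBins && sorted.length > 1) {
      // Index of the left bin of the closest adjacent pair.
      val i = (0 until sorted.length - 1)
        .minBy(j => sorted(j + 1).center - sorted(j).center)
      val (a, b) = (sorted(i), sorted(i + 1))
      val n = a.count + b.count
      val merged = Bin((a.center * a.count + b.center * b.count) / n, n)
      sorted = sorted.patch(i, Vector(merged), 2)
    }
    sorted
  }
}
```

For example, adding 1.0, 2.0, and 10.0 to a histogram limited to two bins collapses the two closest values into a single bin centered at 1.5 with count 2, and keeps 10.0 as its own bin.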



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
