flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1622) Add GroupReducePartial Operator
Date Tue, 10 Mar 2015 08:42:38 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14354554#comment-14354554 ]

ASF GitHub Bot commented on FLINK-1622:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/466#discussion_r26105226
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
    @@ -355,6 +355,63 @@ class GroupedDataSet[T: ClassTag](
       }
     
       /**
    +   * Partial variant of the reduceGroup transformation which operates only on the individual
    +   * partitions. This may lead to partially reduced results.
    +   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
    +   * of elements to the group reduce function. The function must output one element. The
    +   * concatenation of those will form the resulting [[DataSet]].
    +   */
    +  def reduceGroupPartially[R: TypeInformation: ClassTag](
    +                                                 fun: (Iterator[T]) => R): DataSet[R] = {
    +    Validate.notNull(fun, "Group reduce function must not be null.")
    +    val reducer = new GroupReduceFunction[T, R] {
    +      val cleanFun = set.clean(fun)
    +      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
    +        out.collect(cleanFun(in.iterator().asScala))
    +      }
    +    }
    +    wrap(
    +      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
    +        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
    +  }
    +
    +  /**
    +   * Partial variant of the reduceGroup transformation which operates only on the individual
    +   * partitions. This may lead to partially reduced results.
    +   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
    +   * of elements to the group reduce function. The function can output zero or more elements using
    +   * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
    +   */
    +  def reduceGroupPartially[R: TypeInformation: ClassTag](
    +                                          fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
    +    Validate.notNull(fun, "Group reduce function must not be null.")
    +    val reducer = new GroupReduceFunction[T, R] {
    +      val cleanFun = set.clean(fun)
    +      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
    +        cleanFun(in.iterator().asScala, out)
    +      }
    +    }
    +    wrap(
    +      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
    +        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
    +  }
    +
    +  /**
    +   * Partial variant of the reduceGroup transformation which operates only on the individual
    +   * partitions. This may lead to partially reduced results.
    +   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
    +   * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
    +   * concatenation of the emitted values will form the resulting [[DataSet]].
    +   */
    +  def reduceGroupPartially[R: TypeInformation: ClassTag](
    +      reducer: GroupReduceFunction[T, R]): DataSet[R] = {
    +    Validate.notNull(reducer, "GroupReduce function must not be null.")
    +    wrap(
    +      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
    +        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
    +  }
    +
    +  /**
    --- End diff --
    
    Again, why have a partial reduce on a grouped DataSet? That's what the regular GroupReduce is for.


> Add GroupReducePartial Operator
> -------------------------------
>
>                 Key: FLINK-1622
>                 URL: https://issues.apache.org/jira/browse/FLINK-1622
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 0.9
>            Reporter: Aljoscha Krettek
>
> This does what a Reduce or GroupReduce Operator does, except it is only performed on a local partition.
> This is also similar to an explicit combine that can output a type that is different from the input.
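
To make the issue description concrete, here is a minimal sketch of the semantics using plain Scala collections rather than Flink itself. Everything in it is an assumption for illustration: `Partitions`, `sumPerKey` and `mergePartials` are hypothetical names, and the `reduceGroupPartially` below is a stand-alone function that only borrows its name from the method in the diff. It shows that each partition is reduced on its own, that the output type may differ from the input type, and that the same key can therefore show up once per partition until a later step merges the partial results.

```scala
object PartialGroupReduceSketch {
  // Hypothetical stand-in for a partitioned data set: one inner Seq per partition.
  type Partitions[A] = Seq[Seq[A]]

  // Applies the group reduce function to each key group *within* each partition.
  // Nothing is moved between partitions, so results are only partially reduced.
  def reduceGroupPartially[T, K, R](parts: Partitions[T])(key: T => K)(
      fun: Iterator[T] => R): Partitions[R] =
    parts.map { part =>
      part.groupBy(key).values.map(group => fun(group.iterator)).toSeq
    }

  // Example reducer: sums Int values per key and emits Long sums,
  // i.e. the output type (String, Long) differs from the input type (String, Int).
  def sumPerKey(parts: Partitions[(String, Int)]): Partitions[(String, Long)] = {
    val partial = reduceGroupPartially(parts)(_._1) { it =>
      val elems = it.toSeq
      (elems.head._1, elems.map(_._2.toLong).sum)
    }
    // Sort inside each partition only to make the result deterministic.
    partial.map(_.sortBy(_._1))
  }

  // A later, global step is still needed to combine the per-partition results.
  def mergePartials(partial: Partitions[(String, Long)]): Map[String, Long] =
    partial.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val parts: Partitions[(String, Int)] = Seq(
      Seq(("a", 1), ("b", 2), ("a", 3)), // partition 0
      Seq(("a", 10), ("b", 5))           // partition 1
    )
    val partial = sumPerKey(parts)
    // Key "a" appears twice: sum 4 from partition 0 and sum 10 from partition 1.
    println(partial)
    // After merging, each key appears once: "a" sums to 14 and "b" to 7.
    println(mergePartials(partial))
  }
}
```

In this sketch the partial step behaves like an explicit combine: it shrinks each partition locally, and a regular grouped reduce (here `mergePartials`) is what produces one result per key.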



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
