edgent-dev mailing list archives

From dlaboss <...@git.apache.org>
Subject [GitHub] incubator-quarks pull request #167: [WIP] [COMMENTS?] [QUARKS-230] Add timer...
Date Fri, 15 Jul 2016 22:06:04 GMT
Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/167#discussion_r71047423
  
    --- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
    @@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @return A stream that contains the latest aggregations of partitions in this window.
          */
         <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
    +
    +    /**
    +     * Declares a stream that is a continuous, sliding, 
    +     * timer triggered aggregation of
    +     * partitions in this window.
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the aggregator invocation.  
    +     * The list will be empty if the partition is empty.
    +     * </P>
    +     * <P> 
    +     * A non-null {@code aggregator} result is added to the returned stream.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     *
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the aggregator
    +     * @param unit TimeUnit for {@code period}
    +     * @param aggregator
    +     *            Logic to aggregate a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #aggregate(BiFunction)
    +     */
    +    <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
         
         /**
    +     * Declares a stream that represents a 
    +     * timer triggered batched aggregation of
    +     * partitions in this window. 
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code batcher.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the batcher invocation.
    +     * The list will be empty if the partition is empty.
    +     * <P>
    +     * A non-null {@code batcher} result is added to the returned stream.
    +     * The partition's contents are cleared after a batch is processed.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     * 
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the batcher
    +     * @param unit TimeUnit for {@code period}
    +     * @param batcher
    +     *            Logic to aggregate a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #batch(BiFunction)
    +     */
    +    <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
    +    
    +    /**
    +     * Declares a stream that represents an aggregation of
    +     * partitions in this window using the specified {@link Window}.
    +     * <P>
    +     * This method makes it easier to create aggregation streams
    +     * using the lower level {@code Window} API to construct and
    +     * supply windows with configurations not directly exposed by {@code TWindow}.
    +     * 
    +     * @param window the window implementation
    +     * @param aggregator the aggregation function
    +     *  
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     */
    +    <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
    --- End diff --
    
    Uh... :-)  Yeah, something's not quite right in that the TWindow.last(...) established
    the time-based/count-based nature of the TWindow and, as presented above, the supplied
    Window can violate that.  Maybe it's more like TStream.last(window, processor)?  Mostly
    I was just trying to expose a way to make it easier to use the Window API without having
    to know about other impl details like synchronizing the processor, creating an Aggregator
    oplet, ...
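
    For anyone skimming the Javadoc in the diff, the intended difference between the two timer
    triggered methods can be sketched in plain Java.  This is only an illustration, not the
    Quarks implementation: TimedWindowSketch, insert, fireAggregate, fireBatch and size are
    hypothetical names, java.util.function.BiFunction stands in for the Quarks function type,
    and a direct method call stands in for one timer tick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration only; this is NOT the Quarks TWindow implementation.
public class TimedWindowSketch {
    // Contents of a single partition, in insertion order (oldest to newest).
    private final List<Integer> partition = new ArrayList<>();
    private final String key;

    public TimedWindowSketch(String key) {
        this.key = key;
    }

    public synchronized void insert(int tuple) {
        partition.add(tuple);
    }

    // One tick of a timedAggregate-style trigger: the function sees a stable
    // copy of the partition, and the partition contents are kept.
    public synchronized <U> U fireAggregate(BiFunction<List<Integer>, String, U> aggregator) {
        return aggregator.apply(new ArrayList<>(partition), key);
    }

    // One tick of a timedBatch-style trigger: same call, but the partition
    // is cleared once the batch has been processed.
    public synchronized <U> U fireBatch(BiFunction<List<Integer>, String, U> batcher) {
        U result = batcher.apply(new ArrayList<>(partition), key);
        partition.clear();
        return result;
    }

    public synchronized int size() {
        return partition.size();
    }

    public static void main(String[] args) {
        // Sums the tuples; an empty list (empty partition) sums to 0.
        BiFunction<List<Integer>, String, Integer> sum = (tuples, k) -> {
            int s = 0;
            for (int t : tuples) {
                s += t;
            }
            return s;
        };

        TimedWindowSketch w = new TimedWindowSketch("sensor-1");
        w.insert(1);
        w.insert(2);
        System.out.println(w.fireAggregate(sum)); // prints 3
        w.insert(3);
        System.out.println(w.fireAggregate(sum)); // prints 6, earlier tuples are still there
        System.out.println(w.fireBatch(sum));     // prints 6, then the partition is cleared
        System.out.println(w.fireBatch(sum));     // prints 0, the list was empty
    }
}
```

    Per the Javadoc, only a non-null result would be added to the returned stream; the sketch
    just returns the value and leaves that filtering to the caller.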


