edgent-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (QUARKS-230) Add timer triggered window aggregations
Date Fri, 15 Jul 2016 21:57:20 GMT

    [ https://issues.apache.org/jira/browse/QUARKS-230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15380189#comment-15380189 ]

ASF GitHub Bot commented on QUARKS-230:
---------------------------------------

Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/167#discussion_r71046560
  
    --- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
    @@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @return A stream that contains the latest aggregations of partitions in this window.
          */
         <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
    +
    +    /**
    +     * Declares a stream that is a continuous, sliding, 
    +     * timer triggered aggregation of
    +     * partitions in this window.
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the aggregator invocation.  
    +     * The list will be empty if the partition is empty.
    +     * </P>
    +     * <P> 
    +     * A non-null {@code aggregator} result is added to the returned stream.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     *
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the aggregator
    +     * @param unit TimeUnit for {@code period}
    +     * @param aggregator
    +     *            Logic to aggregate a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #aggregate(BiFunction)
    +     */
    +    <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
         
         /**
    +     * Declares a stream that represents a 
    +     * timer triggered batched aggregation of
    +     * partitions in this window. 
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code batcher.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the batcher invocation.
    +     * The list will be empty if the partition is empty.
    +     * <P>
    +     * A non-null {@code batcher} result is added to the returned stream.
    +     * The partition's contents are cleared after a batch is processed.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     * 
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the batcher
    +     * @param unit TimeUnit for {@code period}
    +     * @param batcher
    +     *            Logic to aggregate a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #batch(BiFunction)
    +     */
    +    <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
    --- End diff --
    
    A timedAggregate() invocation doesn't evict anything afterwards; only the window's
count/time config affects eviction, just as with the non-timed aggregate().  Hence a tuple
that isn't evicted between aggregator invocations will be included in multiple aggregations.
A batched aggregation, whether timer triggered or content change triggered, always evicts
everything, so a tuple will be included in at most one aggregation.
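
To make the difference concrete, here is a minimal sketch in plain Java.  It is not the
Quarks implementation: PartitionSketch, aggregateTick() and batchTick() are hypothetical
names, it models one partition with count-based eviction only, and it drops the key
argument that the real BiFunction receives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for one window partition; NOT the Quarks API. */
public class PartitionSketch<T> {
    private final int capacity;
    private final Deque<T> tuples = new ArrayDeque<>();

    public PartitionSketch(int capacity) {
        if (capacity < 1) throw new IllegalArgumentException("capacity must be >= 1");
        this.capacity = capacity;
    }

    /** Insert a tuple; only the window's count config evicts here. */
    public void insert(T tuple) {
        if (tuples.size() == capacity) tuples.removeFirst();
        tuples.addLast(tuple);
    }

    /** Timer tick in the style of timedAggregate(): contents stay in place. */
    public <U> U aggregateTick(Function<List<T>, U> aggregator) {
        return aggregator.apply(new ArrayList<>(tuples));
    }

    /** Timer tick in the style of timedBatch(): contents are cleared afterwards. */
    public <U> U batchTick(Function<List<T>, U> batcher) {
        U result = batcher.apply(new ArrayList<>(tuples));
        tuples.clear();
        return result;
    }

    public static void main(String[] args) {
        PartitionSketch<Integer> sliding = new PartitionSketch<>(3);
        PartitionSketch<Integer> batched = new PartitionSketch<>(3);
        sliding.insert(1); sliding.insert(2);
        batched.insert(1); batched.insert(2);

        List<Integer> s1 = sliding.aggregateTick(l -> l);
        List<Integer> b1 = batched.batchTick(l -> l);
        sliding.insert(3);
        batched.insert(3);
        List<Integer> s2 = sliding.aggregateTick(l -> l);
        List<Integer> b2 = batched.batchTick(l -> l);

        System.out.println("sliding: " + s1 + " then " + s2); // [1, 2] then [1, 2, 3]
        System.out.println("batched: " + b1 + " then " + b2); // [1, 2] then [3]
    }
}
```

In the sliding case tuples 1 and 2 show up in both aggregations because nothing evicted
them between ticks; in the batched case each tuple shows up exactly once.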


> Add timer triggered window aggregations
> ---------------------------------------
>
>                 Key: QUARKS-230
>                 URL: https://issues.apache.org/jira/browse/QUARKS-230
>             Project: Quarks
>          Issue Type: New Feature
>            Reporter: Dale LaBossiere
>            Assignee: Dale LaBossiere
>
> A recent use case involved the desire for "timer triggered" instead of "partition content change triggered" window aggregations.  E.g., "I want to trigger an aggregation every second over the last 10 tuples in a window partition" -- a count-based window with timer triggered aggregations.
> I propose adding 3 methods to TWindow.  Two in direct support of "timer triggered" aggregations (this processing model seems like it could be common enough to warrant making it conveniently available) and one to make it easier to use the lower level Window API to define and use other processing models.
> I'm submitting a PR with the details for review but the net is these additions to TWindow:
> ```
>     <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
>     <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
>     <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
> ```
> See https://github.com/apache/incubator-quarks/pull/167
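
The "aggregate every second over the last 10 tuples" use case quoted above can be
approximated outside Quarks as follows.  This is only a sketch under stated assumptions:
TimedLastN, tick(), start() and average() are hypothetical names, a single partition is
modelled, and a plain ScheduledExecutorService stands in for whatever timer the PR uses.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch, NOT the Quarks API: last-N window with timer triggered aggregation. */
public class TimedLastN<T, U> {
    private final int count;
    private final Function<List<T>, U> aggregator;
    private final Consumer<U> sink;
    private final Deque<T> window = new ArrayDeque<>();

    public TimedLastN(int count, Function<List<T>, U> aggregator, Consumer<U> sink) {
        if (count < 1) throw new IllegalArgumentException("count must be >= 1");
        this.count = count;
        this.aggregator = aggregator;
        this.sink = sink;
    }

    /** Inserting never triggers an aggregation; it only applies count-based eviction. */
    public synchronized void insert(T tuple) {
        if (window.size() == count) window.removeFirst();
        window.addLast(tuple);
    }

    /** One timer tick: aggregate a stable snapshot and forward a non-null result. */
    public void tick() {
        List<T> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(window);
        }
        U result = aggregator.apply(snapshot);
        if (result != null) sink.accept(result);
    }

    /** Schedule tick() at a fixed period; the caller shuts the returned executor down. */
    public ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::tick, period, period, unit);
        return timer;
    }

    /** Example aggregator: mean of the partition, or null (no output) when it is empty. */
    public static Double average(List<Integer> tuples) {
        if (tuples.isEmpty()) return null;
        double sum = 0;
        for (int t : tuples) sum += t;
        return sum / tuples.size();
    }

    public static void main(String[] args) throws InterruptedException {
        TimedLastN<Integer, Double> w =
                new TimedLastN<>(10, TimedLastN::average, avg -> System.out.println("avg=" + avg));
        ScheduledExecutorService timer = w.start(1, TimeUnit.SECONDS);
        for (int i = 1; i <= 25; i++) w.insert(i); // only 16..25 remain in the window
        Thread.sleep(1500);                        // allow at least one tick to fire
        timer.shutdownNow();
    }
}
```

Separating insert() from tick() is the point of the sketch: the aggregation rate is set by
the timer period, independent of how fast tuples arrive.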



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
