apex-dev mailing list archives

From AJAY GUPTA <ajaygit...@gmail.com>
Subject Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
Date Tue, 09 May 2017 12:25:38 GMT
After some discussion and after trying out the approach discussed above, it
seems we need to separate the concepts of watermarks and batch control
tuples. The windowed operator needs to be modified to understand batch
control tuples.

Even if we have watermark tuples which also include batch information, the
windowed operator will fail when the source data is event-time based. This
is because, in this scenario, there are two notions of time in the watermark:
1. The time used to denote the file / batch boundary
2. The event time in the data

For this reason, it makes sense to separate the concepts of batch tuples
(start something / end something) from the watermark tuples (which
essentially deal with event times).

One could argue for a watermark tuple that indicates the end of the batch: a
final watermark (with time = Long.MAX_VALUE) which would finalize all windows
in the windowed operator. However, if a next batch then needs to be processed
by the same windowed operator, we would need to reset the state of the
operator, because it has already moved ahead in the event time domain. The
batch control tuples can do this resetting of state (in other words,
preparation for processing a new batch of data).
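
To make the problem with a final watermark concrete, here is a minimal
sketch. It is not the Malhar WindowedOperator API: the class
FinalWatermarkSketch and its methods are hypothetical, and the lateness rule
(drop any record whose event time is behind the current watermark) is a
simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for an event-time windowed operator. */
class FinalWatermarkSketch {
    // Event-time progress seen so far; assumed to only move forward.
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> accepted = new ArrayList<>();

    /** A watermark never moves event time backwards. */
    void onWatermark(long timestamp) {
        currentWatermark = Math.max(currentWatermark, timestamp);
    }

    /** Accepts a record unless its event time is behind the watermark. */
    boolean onRecord(long eventTime) {
        if (eventTime < currentWatermark) {
            return false; // treated as late and dropped (assumed rule)
        }
        accepted.add(eventTime);
        return true;
    }

    int acceptedCount() {
        return accepted.size();
    }

    public static void main(String[] args) {
        FinalWatermarkSketch op = new FinalWatermarkSketch();
        System.out.println(op.onRecord(1_000L)); // true: record of the first batch
        op.onWatermark(Long.MAX_VALUE);          // "final" watermark ends the batch
        System.out.println(op.onRecord(1_000L)); // false: next batch looks late
    }
}
```

Once the watermark has reached Long.MAX_VALUE, every record of a following
batch is behind it, which is why some separate signal is needed to reset the
operator.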

As an example, consider telecom data logs for the same 24 hours from 2
regions (A and B), which are to be processed as a batch. After processing the
data records from region A, a "final" watermark would be emitted indicating
the end of all data from region A. Now, unless we clear the windowed
operator's state (current watermark, data storage), the data records from
region B will not be processed. In such a scenario, receiving an end batch
control tuple can tell the operator to reset its state.
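
The region A / region B case can be sketched as follows. This is only an
illustration under stated assumptions: BatchResetSketch, the BatchControl
enum and its START_BATCH / END_BATCH values are hypothetical names, not
existing Malhar classes, and the lateness check is deliberately simplistic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: batch control tuples kept separate from watermarks. */
class BatchResetSketch {
    /** Assumed control tuple kinds; the names are illustrative only. */
    enum BatchControl { START_BATCH, END_BATCH }

    private long currentWatermark = Long.MIN_VALUE;               // event-time progress
    private final List<String> storage = new ArrayList<>();       // per-batch data storage
    private final List<Integer> batchCounts = new ArrayList<>();  // records kept per finished batch

    void onWatermark(long timestamp) {
        currentWatermark = Math.max(currentWatermark, timestamp);
    }

    boolean onRecord(String region, long eventTime) {
        if (eventTime < currentWatermark) {
            return false; // behind the watermark: dropped as late
        }
        storage.add(region + "@" + eventTime);
        return true;
    }

    /** END_BATCH resets watermark and storage; START_BATCH needs no work here. */
    void onBatchControl(BatchControl control) {
        if (control == BatchControl.END_BATCH) {
            batchCounts.add(storage.size());
            storage.clear();
            currentWatermark = Long.MIN_VALUE;
        }
    }

    List<Integer> batchCounts() {
        return batchCounts;
    }

    public static void main(String[] args) {
        BatchResetSketch op = new BatchResetSketch();
        op.onBatchControl(BatchControl.START_BATCH);
        op.onRecord("A", 100L);
        op.onRecord("A", 200L);
        op.onWatermark(Long.MAX_VALUE);             // final watermark for region A
        op.onBatchControl(BatchControl.END_BATCH);  // reset before region B

        op.onBatchControl(BatchControl.START_BATCH);
        System.out.println(op.onRecord("B", 100L)); // true: region B is processed
    }
}
```

Without the END_BATCH reset, the region B record with event time 100 would be
behind the final watermark of region A and would be dropped.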


Ajay


On Sun, Apr 30, 2017 at 4:32 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> public static class Pojo implements Tuple
> {
>   @Override
>   public Object getValue()
>   {
>     return this;
>   }
> }
>
> @Override
> public void populateDAG(DAG dag, Configuration conf)
> {
>   CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
>   WindowedOperatorImpl<Pojo, Pojo, Pojo> windowedOperator =
>     dag.addOperator("windowOperator", WindowedOperatorImpl.class);
>   dag.addStream("csvToWindowed", csvParser.out,
>     new InputPort[]{windowedOperator.input});
> }
>
>
> Thank you,
>
> Vlad
>
> On 4/29/17 15:20, AJAY GUPTA wrote:
>
>> Even this will not work because the output port of CsvParser is of type
>> Object. Even though Customer extends Tuple<Object>, it will still fail to
>> work since Tuple<Object> gets output as Object.
>>
>> *DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();*
>>
>> The input port type at windowed operator with InputT = Object :
>> *DefaultInputPort<Tuple<Object>>*
>>
>>
>> Ajay
>>
>>
>> On Sun, Apr 30, 2017 at 1:45 AM, Vlad Rozov <v.rozov@datatorrent.com>
>> wrote:
>>
>>> Use Object in place of InputT in the WindowedOperatorImpl. Cast Object to
>>> the actual type of InputT at runtime. Introducing an operator just to do a
>>> cast is not a good design decision, IMO.
>>>
>>> Thank you,
>>> Vlad
>>>
>>> Sent from iPhone
>>>
>>> On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit158@gmail.com> wrote:
>>>>
>>>> I am using WindowedOperatorImpl and it is declared as follows.
>>>>
>>>> WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator
>>>> = new WindowedOperatorImpl<>();
>>>>
>>>> In my application scenario, the InputT is Customer POJO which is getting
>>>> output as an Object by CsvParser.
>>>>
>>>>
>>>> Ajay
>>>>
>>>> On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>> wrote:
>>>>
>>>>> How do you declare WindowedOperator?
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 4/28/17 10:35, AJAY GUPTA wrote:
>>>>>>
>>>>>> Vlad,
>>>>>>
>>>>>> The approach you suggested doesn't work because the CSVParser outputs
>>>>>> Object Data Type irrespective of the POJO class being emitted.
>>>>>>
>>>>>>
>>>>>> Ajay
>>>>>>
>>>>>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Make your POJO class implement WindowedOperator Tuple interface (it may
>>>>>>> return itself in getValue()).
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
>>>>>>>
>>>>>>> On 4/28/17 02:44, AJAY GUPTA wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am creating an application which is using Windowed Operator. This
>>>>>>>> application involves CsvParser operator emitting a POJO object which is to
>>>>>>>> be passed as input to WindowedOperator. The WindowedOperator requires an
>>>>>>>> instance of Tuple class as input :
>>>>>>>> *public final transient DefaultInputPort<Tuple<InputT>>
>>>>>>>> input = new DefaultInputPort<Tuple<InputT>>() *
>>>>>>>>
>>>>>>>> Due to this, the addStream cannot work as the output of CsvParser's output
>>>>>>>> port is not compatible with input port type of WindowedOperator.
>>>>>>>> One way to solve this problem is to have an operator between the above two
>>>>>>>> operators as a convertor.
>>>>>>>> I would like to know if there is any other more generic approach to solve
>>>>>>>> this problem without writing a new Operator for every new application using
>>>>>>>> Windowed Operators.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ajay
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <
>>>>>>>> bhupesh@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I think we have some agreement on the way we should use control tuples for
>>>>>>>>> File I/O operators to support batch.
>>>>>>>>>
>>>>>>>>> In order to have more operators in Malhar, support this paradigm, I think
>>>>>>>>> we should also look at store operators - JDBC, Cassandra, HBase etc.
>>>>>>>>> The case with these operators is simpler as most of these do not poll the
>>>>>>>>> sources (except JDBC poller operator) and just stop once they have read a
>>>>>>>>> fixed amount of data. In other words, these are inherently batch sources.
>>>>>>>>> The only change that we should add to these operators is to shut down the
>>>>>>>>> DAG once the reading of data is done. For a windowed operator this would
>>>>>>>>> mean a Global window with a final watermark before the DAG is shut down.
>>>>>>>>>
>>>>>>>>> ~ Bhupesh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________________
>>>>>>>>>
>>>>>>>>> Bhupesh Chawda
>>>>>>>>>
>>>>>>>>> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>>>>>>>>>
>>>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <
>>>>>>>>> bhupesh@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> Even though the windowing operator is not just "event time", it seems it
>>>>>>>>>> is too much dependent on the "time" attribute of the incoming tuple. This
>>>>>>>>>> is the reason we had to model the file index as a timestamp to solve the
>>>>>>>>>> batch case for files.
>>>>>>>>>> Perhaps we should work on increasing the scope of the windowed operator to
>>>>>>>>>> consider other types of windows as well. The Sequence option suggested by
>>>>>>>>>> David seems to be something in that direction.
>>>>>>>>>>
>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <thw@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> That's correct, we are looking at a generalized approach for state
>>>>>>>>>>> management vs. a series of special cases.
>>>>>>>>>>>
>>>>>>>>>>> And to be clear, windowing does not imply event time, otherwise it would be
>>>>>>>>>>> "EventTimeOperator" :-)
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>
>>>>>>>>>>>> I went through the discussion, but it seems like it is more on the event
>>>>>>>>>>>> time watermark handling as opposed to batches. What we are trying to do is
>>>>>>>>>>>> have watermarks serve the purpose of demarcating batches using control
>>>>>>>>>>>> tuples. Since each batch is separate from others, we would like to have
>>>>>>>>>>>> stateful processing within a batch, but not across batches.
>>>>>>>>>>>> At the same time, we would like to do this in a manner which is consistent
>>>>>>>>>>>> with the windowing mechanism provided by the windowed operator. This will
>>>>>>>>>>>> allow us to treat a single batch as a (bounded) stream and apply all the
>>>>>>>>>>>> event time windowing concepts in that time span.
>>>>>>>>>>>> For example, let's say I need to process data for a day (24 hours) as a
>>>>>>>>>>>> single batch. The application is still streaming in nature: it would end
>>>>>>>>>>>> the batch after a day and start a new batch the next day. At the same time,
>>>>>>>>>>>> I would be able to have early trigger firings every minute as well as drop
>>>>>>>>>>>> any data which is say, 5 mins late. All this within a single day.
>>>>>>>>>>>>
>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <davidyan@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> There is a discussion in the Flink mailing list about key-based
>>>>>>>>>>>>> watermarks. I think it's relevant to our use case here.
>>>>>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>>>>>>>>>>>>>
>>>>>>>>>>>>> David
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If using time window does not seem appropriate, we can have another class
>>>>>>>>>>>>>> which is more suited for such sequential and distinct windows. Perhaps, a
>>>>>>>>>>>>>> CustomWindow option can be introduced which takes in a window id. The
>>>>>>>>>>>>>> purpose of this window option could be to translate the window id into
>>>>>>>>>>>>>> appropriate timestamps.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another option would be to go with a custom timestampExtractor for such
>>>>>>>>>>>>>> tuples which translates the each unique file name to a distinct timestamp
>>>>>>>>>>>>>> while using time windows in the windowed operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <davidyan@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I now see your rationale on putting the filename in the window.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As far as I understand, the reasons why the filename is not part of the key
>>>>>>>>>>>>>>> and the Global Window is not used are:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) The files are processed in sequence, not in parallel
>>>>>>>>>>>>>>> 2) The windowed operator should not keep the state associated with the file
>>>>>>>>>>>>>>> when the processing of the file is done
>>>>>>>>>>>>>>> 3) The trigger should be fired for the file when a file is done processing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, if the file is just a sequence has nothing to do with a timestamp,
>>>>>>>>>>>>>>> assigning a timestamp to a file is not an intuitive thing to do and would
>>>>>>>>>>>>>>> just create confusions to the users, especially when it's used as an
>>>>>>>>>>>>>>> example for new users.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How about having a separate class called SequenceWindow? And perhaps
>>>>>>>>>>>>>>> TimeWindow can inherit from it?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <thw@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think my comments related to count based windows might be causing
>>>>>>>>>>>>>>>>> confusion. Let's not discuss count based scenarios for now.
>>>>>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each file is a
>>>>>>>>>>>>>>>>> batch" use case. As mentioned by Thomas, the each tuple from the same file
>>>>>>>>>>>>>>>>> has the same timestamp (which is just a sequence number) and that helps
>>>>>>>>>>>>>>>>> keep tuples from each file in a separate window.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a time stamp
>>>>>>>>>>>>>>>> also, depending on the file naming convention. And if it was event time
>>>>>>>>>>>>>>>> processing, the watermark would be derived from records within the file.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Agreed, the source should have a mechanism to control the time stamp
>>>>>>>>>>>>>>>> extraction along with everything else pertaining to the watermark
>>>>>>>>>>>>>>>> generation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to identify the
>>>>>>>>>>>>>>>>> timestamp (sequence number) for a file.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't think this is a use case for count based window.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence and there is no
>>>>>>>>>>>>>>>>>> knowledge of the number of records per file. The requirement is to
>>>>>>>>>>>>>>>>>> aggregate each file separately and emit the aggregate when the file is read
>>>>>>>>>>>>>>>>>> fully. There is no concept of "end of something" for an individual key and
>>>>>>>>>>>>>>>>>> global window isn't applicable.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh, this can be
>>>>>>>>>>>>>>>>>> solved using watermark and window (in this case the window timestamp isn't
>>>>>>>>>>>>>>>>>> a timestamp, but a file sequence, but that doesn't matter.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <davidyan@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't think this is the way to go. Global Window only means the timestamp
>>>>>>>>>>>>>>>>>>> does not matter (or that there is no timestamp). It does not necessarily
>>>>>>>>>>>>>>>>>>> mean it's a large batch. Unless there is some notion of event time for each
>>>>>>>>>>>>>>>>>>> file, you don't want to embed the file into the window itself.
>>>>>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the files are to be
>>>>>>>>>>>>>>>>>>> processed in parallel, I think making the file name be part of the key is
>>>>>>>>>>>>>>>>>>> the way to go. I think it's very confusing if we somehow make the file to
>>>>>>>>>>>>>>>>>>> be part of the window.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For count-based window, it's not implemented yet and you're welcome to add
>>>>>>>>>>>>>>>>>>> that feature. In case of count-based windows, there would be no notion of
>>>>>>>>>>>>>>>>>>> time and you probably only trigger at the end of each window. In the case
>>>>>>>>>>>>>>>>>>> of count-based windows, the watermark only matters for batch since you need
>>>>>>>>>>>>>>>>>>> a way to know when the batch has ended (if the count is 10, the number of
>>>>>>>>>>>>>>>>>>> tuples in the batch is let's say 105, you need a way to end the last window
>>>>>>>>>>>>>>>>>>> with 5 tuples).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed operator does
>>>>>>>>>>>>>>>>>>>> processing of word counts per file (each file as a separate batch), i.e.
>>>>>>>>>>>>>>>>>>>> process counts for each file and dump into separate files.
>>>>>>>>>>>>>>>>>>>> As I understand Global window is for one large batch; i.e. all incoming
>>>>>>>>>>>>>>>>>>>> data falls into the same batch. This could not be processed using
>>>>>>>>>>>>>>>>>>>> GlobalWindow option as we need more than one windows. In this case, I
>>>>>>>>>>>>>>>>>>>> configured the windowed operator to have time windows of 1ms each and
>>>>>>>>>>>>>>>>>>>> passed data for each file with increasing timestamps: (file1, 1),
>>>>>>>>>>>>>>>>>>>> (file2, 2) and so on. Is there a better way of handling this scenario?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a trigger option to
>>>>>>>>>>>>>>>>>>>> process count based windows. In case I want to process every 1000 tuples as
>>>>>>>>>>>>>>>>>>>> a batch, I could set the Trigger option to CountTrigger with the
>>>>>>>>>>>>>>>>>>>> accumulation set to Discarding. Is this correct?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using Global window.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <davidyan@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too complicated.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Watermarks should simply just tell you what windows can be considered
>>>>>>>>>>>>>>>>>>>>> complete.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do not play a role
>>>>>>>>>>>>>>>>>>>>> here because the window is always complete at the n-th tuple.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing of files. Unless
>>>>>>>>>>>>>>>>>>>>> the files contain timed events, it sounds to be that this can be achieved
>>>>>>>>>>>>>>>>>>>>> with just a Global Window. For signaling EOF, a watermark with a +infinity
>>>>>>>>>>>>>>>>>>>>> timestamp can be used so that triggers will be fired upon receipt of that
>>>>>>>>>>>>>>>>>>>>> watermark.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For point 4, just like what I mentioned above, can be achieved with a
>>>>>>>>>>>>>>>>>>>>> watermark with a +infinity timestamp.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For an input operator which is supposed to generate watermarks for
>>>>>>>>>>>>>>>>>>>>>> downstream operators, I can think about the following watermarks that the
>>>>>>>>>>>>>>>>>>>>>> operator can emit:
>>>>>>>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low watermark)
>>>>>>>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (Every n tuples)
>>>>>>>>>>>>>>>>>>>>>> 3. File based watermarks (Start file, end file)
>>>>>>>>>>>>>>>>>>>>>> 4. Final watermark
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch (file based) as well,
>>>>>>>>>>>>>>>>>>>>>> and hence I thought of looking at these first. Does this seem to be in line
>>>>>>>>>>>>>>>>>>>>>> with the thought process?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a simplistic file
>>>>>>>>>>>>>>>>>>>>>>> input-output scenario. It would be good to include a stateful
>>>>>>>>>>>>>>>>>>>>>>> transformation based on event time.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations that depend on
>>>>>>>>>>>>>>>>>>>>>>> windowing and watermarks. I think we need a watermark concept that is based
>>>>>>>>>>>>>>>>>>>>>>> on progress in event time (or other monotonic increasing sequence) that
>>>>>>>>>>>>>>>>>>>>>>> other operators can generically work with.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Note that even file input in many cases can produce time based watermarks,
>>>>>>>>>>>>>>>>>>>>>>> for example when you read part files that are bound by event time.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com
