apex-dev mailing list archives

From AJAY GUPTA <ajaygit...@gmail.com>
Subject Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
Date Fri, 28 Apr 2017 17:35:07 GMT
Vlad,

The approach you suggested doesn't work because the CsvParser output port
is typed as Object, irrespective of the POJO class being emitted.


Ajay
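
For readers following the thread, here is a minimal sketch of the "converter between the two operators" idea, written so that one generic class can serve any POJO type. It is plain Java and does not use the Apex or Malhar APIs: `Tuple`, `TimestampedTuple`, `PojoToTupleConverter` and `SalesRecord` are hypothetical stand-ins defined here for illustration, and the list of emitted tuples stands in for an output port. It assumes the parser hands over untyped objects and that a timestamp can be extracted from each POJO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for the tuple type a windowed operator expects.
// This is NOT the Malhar interface, only an illustration of its role.
interface Tuple<T> {
    T getValue();
}

// A tuple that carries the POJO together with a timestamp.
final class TimestampedTuple<T> implements Tuple<T> {
    private final long timestamp;
    private final T value;

    TimestampedTuple(long timestamp, T value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    long getTimestamp() {
        return timestamp;
    }

    @Override
    public T getValue() {
        return value;
    }
}

// One reusable converter: it accepts untyped objects, casts them to the
// configured POJO class and wraps them in a timestamped tuple.
final class PojoToTupleConverter<T> {
    private final Class<T> pojoClass;
    private final ToLongFunction<T> timestampExtractor;
    // Stand-in for an output port; a real operator would emit instead.
    private final List<TimestampedTuple<T>> emitted = new ArrayList<>();

    PojoToTupleConverter(Class<T> pojoClass, ToLongFunction<T> timestampExtractor) {
        this.pojoClass = pojoClass;
        this.timestampExtractor = timestampExtractor;
    }

    void process(Object input) {
        // Class.cast throws ClassCastException if the object has the wrong type.
        T pojo = pojoClass.cast(input);
        emitted.add(new TimestampedTuple<>(timestampExtractor.applyAsLong(pojo), pojo));
    }

    List<TimestampedTuple<T>> getEmitted() {
        return emitted;
    }
}

// Hypothetical POJO, standing in for whatever class the parser produces.
final class SalesRecord {
    final String product;
    final long eventTimeMillis;

    SalesRecord(String product, long eventTimeMillis) {
        this.product = product;
        this.eventTimeMillis = eventTimeMillis;
    }
}
```

A converter for a given application would then be configured rather than rewritten, for example `new PojoToTupleConverter<>(SalesRecord.class, r -> r.eventTimeMillis)`. Whether such a class fits the actual port types in Malhar is not something this sketch establishes.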

On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> Make your POJO class implement the WindowedOperator Tuple interface (it
> may return itself in getValue()).
>
> Thank you,
>
> Vlad
>
> On 4/28/17 02:44, AJAY GUPTA wrote:
>
>> Hi All,
>>
>> I am creating an application which uses the Windowed Operator. This
>> application involves a CsvParser operator emitting a POJO object which is
>> to be passed as input to the WindowedOperator. The WindowedOperator
>> requires an instance of the Tuple class as input:
>>
>>   public final transient DefaultInputPort<Tuple<InputT>> input =
>>       new DefaultInputPort<Tuple<InputT>>()
>>
>> Due to this, addStream cannot work, as the type of CsvParser's output
>> port is not compatible with the input port type of the WindowedOperator.
>> One way to solve this problem is to have an operator between the above two
>> operators as a converter.
>> I would like to know if there is any other, more generic approach to solve
>> this problem without writing a new operator for every new application
>> using Windowed Operators.
>>
>> Thanks,
>> Ajay
>>
>>
>>
>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I think we have some agreement on the way we should use control tuples
>>> for File I/O operators to support batch.
>>>
>>> In order to have more operators in Malhar support this paradigm, I think
>>> we should also look at store operators - JDBC, Cassandra, HBase etc.
>>> The case with these operators is simpler, as most of them do not poll the
>>> sources (except the JDBC poller operator) and just stop once they have
>>> read a fixed amount of data. In other words, these are inherently batch
>>> sources. The only change that we should add to these operators is to shut
>>> down the DAG once the reading of data is done. For a windowed operator
>>> this would mean a Global window with a final watermark before the DAG is
>>> shut down.
>>>
>>> ~ Bhupesh
>>>
>>>
>>> _______________________________________________________
>>>
>>> Bhupesh Chawda
>>>
>>> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>>>
>>> www.datatorrent.com  |  apex.apache.org
>>>
>>>
>>>
>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda
>>> <bhupesh@datatorrent.com> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> Even though the windowing operator is not just "event time", it seems
>>>> to depend too much on the "time" attribute of the incoming tuple. This
>>>> is the reason we had to model the file index as a timestamp to solve the
>>>> batch case for files.
>>>> Perhaps we should work on increasing the scope of the windowed operator
>>>> to consider other types of windows as well. The Sequence option
>>>> suggested by David seems to be something in that direction.
>>>>
>>>> ~ Bhupesh
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <thw@apache.org> wrote:
>>>>
>>>>> That's correct, we are looking at a generalized approach for state
>>>>> management vs. a series of special cases.
>>>>>
>>>>> And to be clear, windowing does not imply event time, otherwise it
>>>>> would be "EventTimeOperator" :-)
>>>>>
>>>>> Thomas
>>>>>
>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda
>>>>> <bhupesh@datatorrent.com> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> I went through the discussion, but it seems like it is more about
>>>>>> event time watermark handling as opposed to batches. What we are
>>>>>> trying to do is have watermarks serve the purpose of demarcating
>>>>>> batches using control tuples. Since each batch is separate from
>>>>>> others, we would like to have stateful processing within a batch, but
>>>>>> not across batches.
>>>>>> At the same time, we would like to do this in a manner which is
>>>>>> consistent with the windowing mechanism provided by the windowed
>>>>>> operator. This will allow us to treat a single batch as a (bounded)
>>>>>> stream and apply all the event time windowing concepts in that time
>>>>>> span.
>>>>>>
>>>>>> For example, let's say I need to process data for a day (24 hours) as
>>>>>> a single batch. The application is still streaming in nature: it would
>>>>>> end the batch after a day and start a new batch the next day. At the
>>>>>> same time, I would be able to have early trigger firings every minute
>>>>>> as well as drop any data which is, say, 5 mins late. All this within a
>>>>>> single day.
>>>>>>
>>>>>> ~ Bhupesh
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <davidyan@gmail.com> wrote:
>>>>>>
>>>>>>> There is a discussion in the Flink mailing list about key-based
>>>>>>> watermarks. I think it's relevant to our use case here.
>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda
>>>>>>> <bhupesh@datatorrent.com> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> If using a time window does not seem appropriate, we can have another
>>>>>>>> class which is more suited for such sequential and distinct windows.
>>>>>>>> Perhaps a CustomWindow option can be introduced which takes in a
>>>>>>>> window id. The purpose of this window option could be to translate
>>>>>>>> the window id into appropriate timestamps.
>>>>>>>>
>>>>>>>> Another option would be to go with a custom timestampExtractor for
>>>>>>>> such tuples, which translates each unique file name to a distinct
>>>>>>>> timestamp while using time windows in the windowed operator.
>>>>>>>>
>>>>>>>> ~ Bhupesh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <davidyan@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I now see your rationale on putting the filename in the window.
>>>>>>>>> As far as I understand, the reasons why the filename is not part of
>>>>>>>>> the key and the Global Window is not used are:
>>>>>>>>>
>>>>>>>>> 1) The files are processed in sequence, not in parallel
>>>>>>>>> 2) The windowed operator should not keep the state associated with
>>>>>>>>>    the file when the processing of the file is done
>>>>>>>>> 3) The trigger should be fired for the file when a file is done
>>>>>>>>>    processing.
>>>>>>>>>
>>>>>>>>> However, if the file is just a sequence that has nothing to do with
>>>>>>>>> a timestamp, assigning a timestamp to a file is not an intuitive
>>>>>>>>> thing to do and would just create confusion for users, especially
>>>>>>>>> when it's used as an example for new users.
>>>>>>>>>
>>>>>>>>> How about having a separate class called SequenceWindow? And perhaps
>>>>>>>>> TimeWindow can inherit from it?
>>>>>>>>>
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <thw@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda
>>>>>>>>>> <bhupesh@datatorrent.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think my comments related to count based windows might be
>>>>>>>>>>> causing confusion. Let's not discuss count based scenarios for
>>>>>>>>>>> now.
>>>>>>>>>>>
>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each file
>>>>>>>>>>> is a batch" use case. As mentioned by Thomas, each tuple from the
>>>>>>>>>>> same file has the same timestamp (which is just a sequence number)
>>>>>>>>>>> and that helps keep tuples from each file in a separate window.
>>>>>>>>>>
>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a time
>>>>>>>>>> stamp also, depending on the file naming convention. And if it was
>>>>>>>>>> event time processing, the watermark would be derived from records
>>>>>>>>>> within the file.
>>>>>>>>>>
>>>>>>>>>> Agreed, the source should have a mechanism to control the time
>>>>>>>>>> stamp extraction along with everything else pertaining to the
>>>>>>>>>> watermark generation.
>>>>>>>>>>
>>>>>>>>>>> We could also implement a "timestampExtractor" interface to
>>>>>>>>>>> identify the timestamp (sequence number) for a file.
>>>>>>>>>>>
>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <thw@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I don't think this is a use case for count based window.
>>>>>>>>>>>>
>>>>>>>>>>>> We have multiple files that are retrieved in a sequence and there
>>>>>>>>>>>> is no knowledge of the number of records per file. The
>>>>>>>>>>>> requirement is to aggregate each file separately and emit the
>>>>>>>>>>>> aggregate when the file is read fully. There is no concept of
>>>>>>>>>>>> "end of something" for an individual key and global window isn't
>>>>>>>>>>>> applicable.
>>>>>>>>>>>>
>>>>>>>>>>>> However, as already explained and implemented by Bhupesh, this
>>>>>>>>>>>> can be solved using watermark and window (in this case the window
>>>>>>>>>>>> timestamp isn't a timestamp, but a file sequence, but that
>>>>>>>>>>>> doesn't matter).
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <davidyan@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think this is the way to go. Global Window only means
>>>>>>>>>>>>> the timestamp does not matter (or that there is no timestamp).
>>>>>>>>>>>>> It does not necessarily mean it's a large batch. Unless there is
>>>>>>>>>>>>> some notion of event time for each file, you don't want to embed
>>>>>>>>>>>>> the file into the window itself.
>>>>>>>>>>>>> If you want the result broken up by file name, and if the files
>>>>>>>>>>>>> are to be processed in parallel, I think making the file name be
>>>>>>>>>>>>> part of the key is the way to go. I think it's very confusing if
>>>>>>>>>>>>> we somehow make the file to be part of the window.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For count-based window, it's not implemented yet and you're
>>>>>>>>>>>>> welcome to add that feature. In case of count-based windows,
>>>>>>>>>>>>> there would be no notion of time and you probably only trigger
>>>>>>>>>>>>> at the end of each window. In the case of count-based windows,
>>>>>>>>>>>>> the watermark only matters for batch since you need a way to
>>>>>>>>>>>>> know when the batch has ended (if the count is 10, the number of
>>>>>>>>>>>>> tuples in the batch is let's say 105, you need a way to end the
>>>>>>>>>>>>> last window with 5 tuples).
>>>>>>>>>>>>>
>>>>>>>>>>>>> David
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda
>>>>>>>>>>>>> <bhupesh@datatorrent.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The wordcount example that I created based on the windowed
>>>>>>>>>>>>>> operator does processing of word counts per file (each file as
>>>>>>>>>>>>>> a separate batch), i.e. process counts for each file and dump
>>>>>>>>>>>>>> into separate files. As I understand, Global window is for one
>>>>>>>>>>>>>> large batch; i.e. all incoming data falls into the same batch.
>>>>>>>>>>>>>> This could not be processed using the GlobalWindow option as we
>>>>>>>>>>>>>> need more than one window. In this case, I configured the
>>>>>>>>>>>>>> windowed operator to have time windows of 1ms each and passed
>>>>>>>>>>>>>> data for each file with increasing timestamps: (file1, 1),
>>>>>>>>>>>>>> (file2, 2) and so on. Is there a better way of handling this
>>>>>>>>>>>>>> scenario?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a trigger
>>>>>>>>>>>>>> option to process count based windows. In case I want to
>>>>>>>>>>>>>> process every 1000 tuples as a batch, I could set the Trigger
>>>>>>>>>>>>>> option to CountTrigger with the accumulation set to Discarding.
>>>>>>>>>>>>>> Is this correct?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using Global
>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <davidyan@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too
>>>>>>>>>>>>>>> complicated. Watermarks should simply just tell you what
>>>>>>>>>>>>>>> windows can be considered complete.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do not
>>>>>>>>>>>>>>> play a role here because the window is always complete at the
>>>>>>>>>>>>>>> n-th tuple.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing of
>>>>>>>>>>>>>>> files. Unless the files contain timed events, it sounds to me
>>>>>>>>>>>>>>> that this can be achieved with just a Global Window. For
>>>>>>>>>>>>>>> signaling EOF, a watermark with a +infinity timestamp can be
>>>>>>>>>>>>>>> used so that triggers will be fired upon receipt of that
>>>>>>>>>>>>>>> watermark.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For point 4, just like what I mentioned above, it can be
>>>>>>>>>>>>>>> achieved with a watermark with a +infinity timestamp.
>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda
>>>>>>>>>>>>>>> <bhupesh@datatorrent.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For an input operator which is supposed to generate
>>>>>>>>>>>>>>>> watermarks for downstream operators, I can think about the
>>>>>>>>>>>>>>>> following watermarks that the operator can emit:
>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low watermark)
>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (Every n tuples)
>>>>>>>>>>>>>>>> 3. File based watermarks (Start file, end file)
>>>>>>>>>>>>>>>> 4. Final watermark
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch (file
>>>>>>>>>>>>>>>> based) as well, and hence I thought of looking at these
>>>>>>>>>>>>>>>> first. Does this seem to be in line with the thought process?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't think this should be designed based on a simplistic
>>>>>>>>>>>>>>>>> file input-output scenario. It would be good to include a
>>>>>>>>>>>>>>>>> stateful transformation based on event time.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations that
>>>>>>>>>>>>>>>>> depend on windowing and watermarks. I think we need a
>>>>>>>>>>>>>>>>> watermark concept that is based on progress in event time
>>>>>>>>>>>>>>>>> (or other monotonically increasing sequence) that other
>>>>>>>>>>>>>>>>> operators can generically work with.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Note that even file input in many cases can produce time
>>>>>>>>>>>>>>>>> based watermarks, for example when you read part files that
>>>>>>>>>>>>>>>>> are bound by event time.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda
>>>>>>>>>>>>>>>>> <bhupesh@datatorrent.com>
