apex-dev mailing list archives

From Thomas Weise <...@apache.org>
Subject Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
Date Mon, 27 Feb 2017 16:58:59 GMT
On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> I think my comments related to count based windows might be causing
> confusion. Let's not discuss count based scenarios for now.
>
> Just want to make sure we are on the same page wrt. the "each file is a
> batch" use case. As mentioned by Thomas, each tuple from the same file
> has the same timestamp (which is just a sequence number), and that helps
> keep tuples from each file in a separate window.
>

Yes, in this case it is a sequence number, but it could also be a timestamp,
depending on the file naming convention. And if it were event-time
processing, the watermark would be derived from records within the file.

Agreed, the source should have a mechanism to control timestamp
extraction, along with everything else pertaining to watermark
generation.
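
A minimal sketch of what such an extraction hook could look like, in plain
Java. The names TimestampExtractor and FileSequenceExtractor, and the
"part-<n>" file naming convention, are assumptions made for illustration;
they are not existing Malhar APIs.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FileTimestampSketch {

    /** Hypothetical hook: maps a source unit (here, a file name) to a window timestamp. */
    interface TimestampExtractor<T> {
        long extractTimestamp(T input);
    }

    /** Assumes files are named like "part-00042.txt"; the number serves as the sequence. */
    static class FileSequenceExtractor implements TimestampExtractor<String> {
        private static final Pattern SEQ = Pattern.compile("part-(\\d+)");

        @Override
        public long extractTimestamp(String fileName) {
            Matcher m = SEQ.matcher(fileName);
            if (!m.find()) {
                throw new IllegalArgumentException("No sequence number in file name: " + fileName);
            }
            return Long.parseLong(m.group(1));
        }
    }

    public static void main(String[] args) {
        TimestampExtractor<String> extractor = new FileSequenceExtractor();
        long ts = extractor.extractTimestamp("part-00042.txt");
        // Every tuple read from this file would carry ts. Once the file is fully read,
        // the source would emit a watermark with the same value to close its window.
        System.out.println("window timestamp / watermark = " + ts);
    }
}
```

With a different naming convention (for example, a date embedded in the file
name), only the extractor implementation would need to change.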


> We could also implement a "timestampExtractor" interface to identify the
> timestamp (sequence number) for a file.
>
> ~ Bhupesh
>
>
> _______________________________________________________
>
> Bhupesh Chawda
>
> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>
> www.datatorrent.com  |  apex.apache.org
>
>
>
> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <thw@apache.org> wrote:
>
> > I don't think this is a use case for count based window.
> >
> > We have multiple files that are retrieved in a sequence and there is no
> > knowledge of the number of records per file. The requirement is to
> > aggregate each file separately and emit the aggregate when the file is
> > read fully. There is no concept of "end of something" for an individual
> > key and global window isn't applicable.
> >
> > However, as already explained and implemented by Bhupesh, this can be
> > solved using watermark and window (in this case the window timestamp
> > isn't a timestamp, but a file sequence, but that doesn't matter).
> >
> > Thomas
> >
> >
> > On Mon, Feb 27, 2017 at 8:05 AM, David Yan <davidyan@gmail.com> wrote:
> >
> > > I don't think this is the way to go. Global Window only means the
> > > timestamp does not matter (or that there is no timestamp). It does not
> > > necessarily mean it's a large batch. Unless there is some notion of
> > > event time for each file, you don't want to embed the file into the
> > > window itself.
> > >
> > > If you want the result broken up by file name, and if the files are to
> > > be processed in parallel, I think making the file name be part of the
> > > key is the way to go. I think it's very confusing if we somehow make
> > > the file to be part of the window.
> > >
> > > For count-based window, it's not implemented yet and you're welcome to
> > > add that feature. In case of count-based windows, there would be no
> > > notion of time and you probably only trigger at the end of each window.
> > > In the case of count-based windows, the watermark only matters for
> > > batch since you need a way to know when the batch has ended (if the
> > > count is 10, the number of tuples in the batch is let's say 105, you
> > > need a way to end the last window with 5 tuples).
> > >
> > > David
> > >
> > > On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > >
> > > > Hi David,
> > > >
> > > > Thanks for your comments.
> > > >
> > > > The wordcount example that I created based on the windowed operator
> > > > does processing of word counts per file (each file as a separate
> > > > batch), i.e. process counts for each file and dump into separate files.
> > > > As I understand Global window is for one large batch; i.e. all incoming
> > > > data falls into the same batch. This could not be processed using
> > > > GlobalWindow option as we need more than one window. In this case, I
> > > > configured the windowed operator to have time windows of 1ms each and
> > > > passed data for each file with increasing timestamps: (file1, 1),
> > > > (file2, 2) and so on. Is there a better way of handling this scenario?
> > > >
> > > > Regarding (2 - count based windows), I think there is a trigger option
> > > > to process count based windows. In case I want to process every 1000
> > > > tuples as a batch, I could set the Trigger option to CountTrigger with
> > > > the accumulation set to Discarding. Is this correct?
> > > >
> > > > I agree that (4. Final Watermark) can be done using Global window.
> > > >
> > > > ~ Bhupesh
> > > >
> > > >
> > > > On Mon, Feb 27, 2017 at 12:18 PM, David Yan <davidyan@gmail.com> wrote:
> > > >
> > > > > I'm worried that we are making the watermark concept too complicated.
> > > > >
> > > > > Watermarks should simply just tell you what windows can be considered
> > > > > complete.
> > > > >
> > > > > Point 2 is basically a count-based window. Watermarks do not play a
> > > > > role here because the window is always complete at the n-th tuple.
> > > > >
> > > > > If I understand correctly, point 3 is for batch processing of files.
> > > > > Unless the files contain timed events, it sounds to me that this can
> > > > > be achieved with just a Global Window. For signaling EOF, a watermark
> > > > > with a +infinity timestamp can be used so that triggers will be fired
> > > > > upon receipt of that watermark.
> > > > >
> > > > > Point 4, just like what I mentioned above, can be achieved with a
> > > > > watermark with a +infinity timestamp.
> > > > >
> > > > > David
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > >
> > > > > > Hi Thomas,
> > > > > >
> > > > > > For an input operator which is supposed to generate watermarks for
> > > > > > downstream operators, I can think about the following watermarks
> > > > > > that the operator can emit:
> > > > > > 1. Time based watermarks (the high watermark / low watermark)
> > > > > > 2. Number of tuple based watermarks (Every n tuples)
> > > > > > 3. File based watermarks (Start file, end file)
> > > > > > 4. Final watermark
> > > > > >
> > > > > > File based watermarks seem to be applicable for batch (file based)
> > > > > > as well, and hence I thought of looking at these first. Does this
> > > > > > seem to be in line with the thought process?
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org> wrote:
> > > > > >
> > > > > > > I don't think this should be designed based on a simplistic file
> > > > > > > input-output scenario. It would be good to include a stateful
> > > > > > > transformation based on event time.
> > > > > > >
> > > > > > > More complex pipelines contain stateful transformations that depend
> > > > > > > on windowing and watermarks. I think we need a watermark concept
> > > > > > > that is based on progress in event time (or other monotonic
> > > > > > > increasing sequence) that other operators can generically work with.
> > > > > > >
> > > > > > > Note that even file input in many cases can produce time based
> > > > > > > watermarks, for example when you read part files that are bound by
> > > > > > > event time.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Thomas
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > >
> > > > > > > > For better understanding the use case for control tuples in batch,
> > > > > > > > I am creating a prototype for a batch application using File Input
> > > > > > > > and File Output operators.
> > > > > > > >
> > > > > > > > To enable basic batch processing for File IO operators, I am
> > > > > > > > proposing the following changes to File input and output operators:
> > > > > > > > 1. File Input operator emits a watermark each time it opens and
> > > > > > > > closes a file. These can be "start file" and "end file" watermarks
> > > > > > > > which include the corresponding file names. The "start file" tuple
> > > > > > > > should be sent before any of the data from that file flows.
> > > > > > > > 2. File Input operator can be configured to end the application
> > > > > > > > after a single or n scans of the directory (a batch). This is where
> > > > > > > > the operator emits the final watermark (the end of application
> > > > > > > > control tuple). This will also shutdown the application.
> > > > > > > > 3. The File output operator handles these control tuples. "Start
> > > > > > > > file" initializes the file name for the incoming tuples. "End file"
> > > > > > > > watermark forces a finalize on that file.
> > > > > > > >
> > > > > > > > The user would be able to enable the operators to send only those
> > > > > > > > watermarks that are needed in the application. If none of the
> > > > > > > > options are configured, the operators behave as in a streaming
> > > > > > > > application.
> > > > > > > >
> > > > > > > > There are a few challenges in the implementation where the input
> > > > > > > > operator is partitioned. In this case, the correlation between the
> > > > > > > > start/end for a file and the data tuples for that file is lost.
> > > > > > > > Hence we need to maintain the filename as part of each tuple in the
> > > > > > > > pipeline.
> > > > > > > >
> > > > > > > > The "start file" and "end file" control tuples in this example are
> > > > > > > > temporary names for watermarks. We can have generic "start batch" /
> > > > > > > > "end batch" tuples which could be used for other use cases as well.
> > > > > > > > The Final watermark is common and serves the same purpose in each
> > > > > > > > case.
> > > > > > > >
> > > > > > > > Please let me know your thoughts on this.
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > > >
> > > > > > > > > Yes, this can be part of operator configuration. Given this, for
> > > > > > > > > a user to define a batch application, would mean configuring the
> > > > > > > > > connectors (mostly the input operator) in the application for the
> > > > > > > > > desired behavior. Similarly, there can be other use cases that
> > > > > > > > > can be achieved other than batch.
> > > > > > > > >
> > > > > > > > > We may also need to take care of the following:
> > > > > > > > > 1. Make sure that the watermarks or control tuples are consistent
> > > > > > > > > across sources. Meaning an HDFS sink should be able to interpret
> > > > > > > > > the watermark tuple sent out by, say, a JDBC source.
> > > > > > > > > 2. In addition to I/O connectors, we should also look at the need
> > > > > > > > > for processing operators to understand some of the control tuples
> > > > > > > > > / watermarks. For example, we may want to reset the operator
> > > > > > > > > behavior on arrival of some watermark tuple.
> > > > > > > > >
> > > > > > > > > ~ Bhupesh
> > > > > > > > >
> > > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <thw@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > >> The HDFS source can operate in two modes, bounded or unbounded.
> > > > > > > > > >> If you scan only once, then it should emit the final watermark
> > > > > > > > > >> after it is done. Otherwise it would emit watermarks based on a
> > > > > > > > > >> policy (file names etc.). The mechanism to generate the marks
> > > > > > > > > >> may depend on the type of source and the user needs to be able
> > > > > > > > > >> to influence/configure it.
> > > > > > > > >>
> > > > > > > > >> Thomas
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hi Thomas,
> > > > > > > > > >> >
> > > > > > > > > >> > I am not sure that I completely understand your suggestion.
> > > > > > > > > >> > Are you suggesting to broaden the scope of the proposal to
> > > > > > > > > >> > treat all sources as bounded as well as unbounded?
> > > > > > > > > >> >
> > > > > > > > > >> > In case of Apex, we treat all sources as unbounded sources.
> > > > > > > > > >> > Even a bounded source like the HDFS file source is treated as
> > > > > > > > > >> > unbounded by means of scanning the input directory repeatedly.
> > > > > > > > > >> >
> > > > > > > > > >> > Let's consider HDFS file source for example:
> > > > > > > > > >> > In this case, if we treat it as a bounded source, we can
> > > > > > > > > >> > define hooks which allow us to detect the end of the file and
> > > > > > > > > >> > send the "final watermark". We could also consider HDFS file
> > > > > > > > > >> > source as a streaming source and define hooks which send
> > > > > > > > > >> > watermarks based on different kinds of windows.
> > > > > > > > > >> >
> > > > > > > > > >> > Please correct me if I misunderstand.
> > > > > > > > >> >
> > > > > > > > >> > ~ Bhupesh
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <thw@apache.org> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Bhupesh,
> > > > > > > > > >> > >
> > > > > > > > > >> > > Please see how that can be solved in a unified way using
> > > > > > > > > >> > > windows and watermarks. It is bounded data vs. unbounded
> > > > > > > > > >> > > data. In Beam for example, you can use the "global window"
> > > > > > > > > >> > > and the final watermark to accomplish what you are looking
> > > > > > > > > >> > > for. Batch is just a special case of streaming where the
> > > > > > > > > >> > > source emits the final watermark.
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks,
> > > > > > > > >> > > Thomas
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Yes, if the user needs to develop a batch application,
> > > > > > > > > >> > > > then batch aware operators need to be used in the
> > > > > > > > > >> > > > application.
> > > > > > > > > >> > > > The nature of the application is mostly controlled by the
> > > > > > > > > >> > > > input and the output operators used in the application.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > For example, consider an application which needs to filter
> > > > > > > > > >> > > > records in an input file and store the filtered records in
> > > > > > > > > >> > > > another file. The nature of this app is to end once the
> > > > > > > > > >> > > > entire file is processed. Following things are expected of
> > > > > > > > > >> > > > the application:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >    1. Once the input data is over, finalize the output
> > > > > > > > > >> > > >    file from .tmp files. - Responsibility of output
> > > > > > > > > >> > > >    operator
> > > > > > > > > >> > > >    2. End the application, once the data is read and
> > > > > > > > > >> > > >    processed - Responsibility of input operator
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > These functions are essential to allow the user to do
> > > > > > > > > >> > > > higher level operations like scheduling or running a
> > > > > > > > > >> > > > workflow of batch applications.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > I am not sure about intermediate (processing) operators,
> > > > > > > > > >> > > > as there is no change in their functionality for batch use
> > > > > > > > > >> > > > cases. Perhaps, allowing running multiple batches in a
> > > > > > > > > >> > > > single application may require similar changes in
> > > > > > > > > >> > > > processing operators as well.
> > > > > > > > >> > > >
> > > > > > > > >> > > > ~ Bhupesh
> > > > > > > > >> > > >
> > > > > > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <priyag@apache.org> wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > Will it make an impression on user that, if he has a
> > > > > > > > > >> > > > > batch usecase he has to use batch aware operators only?
> > > > > > > > > >> > > > > If so, is that what we expect? I am not aware of how do
> > > > > > > > > >> > > > > we implement batch scenario so this might be a basic
> > > > > > > > > >> > > > > question.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > -Priyanka
> > > > > > > > >> > > > >
> > > > > > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > > Hi All,
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > While design / implementation for custom control
> > > > > > > > > >> > > > > > tuples is ongoing, I thought it would be a good idea
> > > > > > > > > >> > > > > > to consider its usefulness in one of the use cases -
> > > > > > > > > >> > > > > > batch applications.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > This is a proposal to adapt / extend existing
> > > > > > > > > >> > > > > > operators in the Apache Apex Malhar library so that it
> > > > > > > > > >> > > > > > is easy to use them in batch use cases.
> > > > > > > > > >> > > > > > Naturally, this would be applicable for only a subset
> > > > > > > > > >> > > > > > of operators like File, JDBC and NoSQL databases.
> > > > > > > > > >> > > > > > For example, for a file based store, (say HDFS store),
> > > > > > > > > >> > > > > > we could have FileBatchInput and FileBatchOutput
> > > > > > > > > >> > > > > > operators which allow easy integration into a batch
> > > > > > > > > >> > > > > > application. These operators would be extended from
> > > > > > > > > >> > > > > > their existing implementations and would be "Batch
> > > > > > > > > >> > > > > > Aware", in that they may understand the meaning of
> > > > > > > > > >> > > > > > some specific control tuples that flow through the
> > > > > > > > > >> > > > > > DAG. Start batch and end batch seem to be the obvious
> > > > > > > > > >> > > > > > candidates that come to mind. On receipt of such
> > > > > > > > > >> > > > > > control tuples, they may try to modify the behavior of
> > > > > > > > > >> > > > > > the operator - to reinitialize some metrics or
> > > > > > > > > >> > > > > > finalize an output file for example.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > We can discuss the potential control tuples and
> > > > > > > > > >> > > > > > actions in detail, but first I would like to
> > > > > > > > > >> > > > > > understand the views of the community for this
> > > > > > > > > >> > > > > > proposal.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > ~ Bhupesh
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
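
To make the "each file is a batch" approach discussed in this thread concrete,
here is a rough sketch in plain Java of a per-file word count that emits a
file's aggregate when a watermark passes its window. It does not use the
Malhar windowed operator; the class and method names (PerFileWordCountSketch,
process, onWatermark) are hypothetical, and the rule "a watermark w completes
every window with timestamp <= w" is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerFileWordCountSketch {

    // Window timestamp (file sequence) -> word -> count. TreeMap keeps windows ordered.
    private final TreeMap<Long, Map<String, Integer>> windows = new TreeMap<>();

    /** Accumulate one word into the window identified by the file's sequence number. */
    public void process(long fileSeq, String word) {
        windows.computeIfAbsent(fileSeq, k -> new HashMap<>()).merge(word, 1, Integer::sum);
    }

    /** Assumed rule: watermark w completes all windows with timestamp <= w; emit and drop them. */
    public List<Map<String, Integer>> onWatermark(long watermark) {
        Map<Long, Map<String, Integer>> complete = windows.headMap(watermark, true);
        List<Map<String, Integer>> results = new ArrayList<>(complete.values());
        complete.clear(); // headMap is a view, so this removes the emitted windows
        return results;
    }

    public static void main(String[] args) {
        PerFileWordCountSketch op = new PerFileWordCountSketch();
        op.process(1, "apex");
        op.process(1, "apex");
        op.process(1, "malhar");
        System.out.println(op.onWatermark(1)); // file 1 fully read: its counts are emitted
        op.process(2, "apex");
        // A final watermark (+infinity, here Long.MAX_VALUE) flushes whatever is left.
        System.out.println(op.onWatermark(Long.MAX_VALUE));
    }
}
```

The same structure covers the bounded case: a source that scans once only
needs to send the final watermark, and every open window is emitted.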
