apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
Date Thu, 23 Feb 2017 08:02:48 GMT
Hi Thomas,

My response inline:

On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise <thw@apache.org> wrote:

> Hi Bhupesh,
>
> This looks great. You use the watermark as a measure of completeness and the
> window to isolate the state, which is how it should work.
>
> Questions/comments:
>
> Why does the count operator have a 2ms window when this should be driven by
> the watermark from the input operator?
>
>
In this example, we trigger at the watermark, so the windowed count
operator accumulates the state until the watermark and then emits all the
accumulated counts.
The 2 ms window is not necessary; we can make it 1 ms. But in this case,
it's not the time duration that matters. The file input operator makes sure
all tuples belonging to a file become part of the same window by giving
those tuples the same timestamp. So all tuples in the first file go out
with timestamp 0, those in the second file with timestamp 1, and so on.

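To make the per-file windowing concrete, here is a minimal standalone Java sketch. It does not use the Malhar WindowedOperator API; `PerFileWordCounter` and its methods are hypothetical. It assumes each word arrives with its file's sequence number as the timestamp (0 for the first file, 1 for the second, and so on) and that a watermark with timestamp t means every window up to and including t is complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a windowed word-count operator (not the Malhar API).
// State is isolated per window, where the window is the file's sequence number.
class PerFileWordCounter {
  // window timestamp -> (word -> count)
  private final TreeMap<Long, Map<String, Long>> state = new TreeMap<>();

  // Data tuple: every word from file N arrives with timestamp N.
  void processTuple(long timestamp, String word) {
    state.computeIfAbsent(timestamp, t -> new TreeMap<>()).merge(word, 1L, Long::sum);
  }

  // Watermark: windows with timestamp <= watermark are complete, so emit and discard them.
  List<String> processWatermark(long watermark) {
    List<String> emitted = new ArrayList<>();
    NavigableMap<Long, Map<String, Long>> complete = state.headMap(watermark, true);
    for (Map.Entry<Long, Map<String, Long>> window : complete.entrySet()) {
      for (Map.Entry<String, Long> count : window.getValue().entrySet()) {
        emitted.add(window.getKey() + ":" + count.getKey() + "=" + count.getValue());
      }
    }
    complete.clear(); // clearing the view also removes these windows from state
    return emitted;
  }
}
```

Note that the counter never sees a file name; it relies only on timestamps and watermarks, so the window length (2 ms or 1 ms) plays no role as long as each file maps to its own timestamp.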

> I don't think there should be separate "windowed" connector operators. The
> watermark support needs to be incorporated into existing operators and
> configurable. Windowing is a concept that the entire library needs to be
> aware of. I see no reason to arrange classes in separate "window" packages
> except those that are really specific to windowing support such as the
> watermark tuple.
>

I think making changes to the existing operators would make them too
heavy and complex. I suggest we extend the existing operators and add new
classes containing just the watermark logic. This will also help keep bugs
resulting from the new implementations isolated.
We can keep these in the same package as the existing operators. Only the
window-specific classes (like watermarks) will go into the window package.

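A minimal sketch of the extension approach, assuming a hypothetical existing operator `FileLineReader` in place of the real Malhar file input operator. The subclass overrides two hooks, one to tag each tuple with a per-file timestamp and one to emit a watermark at each file boundary, while the base class stays unchanged. Output is collected in a list instead of going to an Apex output port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an existing streaming file input operator.
class FileLineReader {
  protected final List<String> output = new ArrayList<>();

  void readFile(String fileName, List<String> lines) {
    for (String line : lines) {
      emitLine(line);
    }
  }

  protected void emitLine(String line) {
    output.add(line);
  }
}

// Hypothetical watermark-aware subclass: only the watermark logic lives here.
class WatermarkingFileLineReader extends FileLineReader {
  private long fileSequence = 0; // monotonically increasing per-file timestamp

  @Override
  protected void emitLine(String line) {
    output.add(fileSequence + "|" + line); // every tuple of a file shares its timestamp
  }

  @Override
  void readFile(String fileName, List<String> lines) {
    super.readFile(fileName, lines);         // unchanged base behavior
    output.add("WATERMARK " + fileSequence); // all tuples of this file have been emitted
    fileSequence++;
  }
}
```

Because the base class is untouched, a bug in the watermark logic can only affect applications that opt into the subclass.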

>
> Why does the control tuple have an operatorId in it?
>

The operator id is not used in the current example, but it may help the
user identify the originating partition of a watermark tuple. This is
useful in scenarios where we cannot otherwise distinguish between watermark
tuples from different partitions, unlike file-based watermarks, where the
file name is the distinguishing property.

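One way a downstream operator could use the operator id, sketched under assumptions that are not in the thread: it keeps the latest watermark reported by each upstream partition and treats the minimum as its combined watermark, so a slow partition holds back progress. `PartitionWatermark` and `WatermarkMerger` are hypothetical names, and the number of upstream partitions is assumed to be known.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical watermark control tuple carrying the id of the emitting partition.
class PartitionWatermark {
  final int operatorId;
  final long timestamp;

  PartitionWatermark(int operatorId, long timestamp) {
    this.operatorId = operatorId;
    this.timestamp = timestamp;
  }
}

// Hypothetical downstream tracker: combined watermark = minimum over all partitions.
class WatermarkMerger {
  private final Map<Integer, Long> latestByPartition = new HashMap<>();
  private final int expectedPartitions;

  WatermarkMerger(int expectedPartitions) {
    this.expectedPartitions = expectedPartitions;
  }

  long onWatermark(PartitionWatermark wm) {
    // A partition's watermark never moves backwards, so keep the larger value.
    latestByPartition.merge(wm.operatorId, wm.timestamp, Long::max);
    if (latestByPartition.size() < expectedPartitions) {
      return Long.MIN_VALUE; // not every partition has reported yet
    }
    long combined = Long.MAX_VALUE;
    for (long ts : latestByPartition.values()) {
      combined = Math.min(combined, ts);
    }
    return combined;
  }
}
```
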
>
> Once you make the changes to the operators, please also augment the
> documentation and examples (in this case wordcount demo).
>

Sure.



> Thanks,
> Thomas
>
>
>
> On Wed, Feb 22, 2017 at 4:51 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > Hi Thomas,
> >
> > Sorry for the delay.
> > I agree that the watermark concept is general and is understood by
> > intermediate transformations. The file name is additional information in
> > the watermark which helps the start and end operators do batch-related
> > work.
> > As suggested, I have created a wordcount application which uses
> > watermarks to create separate windows for each file by means of a long
> > (timestamp).
> > I am linking the source for reference:
> >
> > Watermarks:
> > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/FileWatermark.java
> >
> > Extended File Input and Output operators:
> > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/WindowedFileInputOperator.java
> > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/WindowedFileOutputOperator.java
> >
> >
> > WordCount Application:
> >
> > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/test/java/org/apache/apex/malhar/lib/window/windowable/WindowedWordCount.java
> >
> >
> > The input operator attaches a timestamp to each file, which allows the
> > WindowedOperator to identify each file and keep its state in a distinct
> > window.
> >
> > Additionally, using the extra file information, the application can
> > store the counts in similarly named files at the destination.
> >
> >
> > Thanks.
> >
> > _______________________________________________________
> >
> > Bhupesh Chawda
> >
> > Software Engineer
> >
> > E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
> >
> > www.datatorrent.com  |  apex.apache.org
> >
> >
> >
> > On Sat, Feb 18, 2017 at 10:24 PM, Thomas Weise <thw@apache.org> wrote:
> >
> > > Hi Bhupesh,
> > >
> > > I think this needs a generic watermark concept that is independent of
> > > source and destination and can be understood by intermediate
> > > transformations. File names don't meet this criterion.
> > >
> > > One possible approach is to have a monotonically increasing file sequence
> > > (instead of time, if time is not applicable) that can be mapped to the
> > > watermark. You can still tag the file name onto the control tuple as
> > > extra information so that a file output operator that understands it can
> > > do whatever it wants with it. But it should also work without it, let's
> > > say when we write the output to the console.
> > >
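A minimal sketch of that suggestion, with hypothetical class names: the watermark carries a monotonically increasing sequence number, the file name is optional extra information, a sink uses the name only when it is present (falling back to the console otherwise), and an intermediate operator looks only at the sequence.

```java
import java.util.Optional;

// Hypothetical generic watermark: progress is a monotonically increasing sequence;
// the file name is optional extra information that only some operators use.
class SequenceWatermark {
  final long sequence;
  final Optional<String> fileName;

  SequenceWatermark(long sequence, String fileNameOrNull) {
    this.sequence = sequence;
    this.fileName = Optional.ofNullable(fileNameOrNull);
  }
}

// Hypothetical sink: names the output after the source file if it knows it,
// and still works (writing to the console) when no file name is attached.
class CountSink {
  String targetFor(SequenceWatermark wm) {
    return wm.fileName.map(name -> name + ".counts").orElse("console");
  }
}

// Hypothetical intermediate operator: it only compares sequences and never reads fileName.
class WindowCloser {
  boolean isComplete(long windowSequence, SequenceWatermark wm) {
    return windowSequence <= wm.sequence;
  }
}
```
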
> > > The key here is that you can demonstrate that an intermediate stateful
> > > transformation will work. I would suggest trying wordcount per input file
> > > with the window operator emitting the counts at the file boundary,
> > > without knowing anything about files.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > wrote:
> > >
> > > > Hi Thomas,
> > > >
> > > > For an input operator which is supposed to generate watermarks for
> > > > downstream operators, I can think of the following watermarks that the
> > > > operator can emit:
> > > > 1. Time-based watermarks (the high watermark / low watermark)
> > > > 2. Tuple-count-based watermarks (every n tuples)
> > > > 3. File-based watermarks (start file, end file)
> > > > 4. Final watermark
> > > >
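As an illustration of kinds 2 and 4 in the list above, here is a hypothetical standalone generator that emits a tuple-count watermark after every n tuples and a final watermark at the end of input. Encoding the final watermark as `Long.MAX_VALUE` is an assumption made for the sketch, and tuples and watermarks are simply collected as strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input-side generator: a tuple-count watermark every n tuples,
// plus a final watermark when the input is exhausted.
class CountWatermarkGenerator {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // assumption: "final" = max timestamp

  private final int n;
  private long tuplesSeen = 0;
  final List<String> emitted = new ArrayList<>();

  CountWatermarkGenerator(int n) {
    this.n = n;
  }

  void emitTuple(String tuple) {
    emitted.add(tuple);
    tuplesSeen++;
    if (tuplesSeen % n == 0) {
      emitted.add("WATERMARK " + tuplesSeen); // progress measured in tuples, not time
    }
  }

  void endOfInput() {
    emitted.add("WATERMARK " + FINAL_WATERMARK);
  }
}
```
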
> > > > File-based watermarks seem to be applicable for batch (file based) as
> > > > well, and hence I thought of looking at these first. Is this in line
> > > > with your thought process?
> > > >
> > > > ~ Bhupesh
> > > >
> > > >
> > > >
> > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org>
> > > > wrote:
> > > >
> > > > > I don't think this should be designed based on a simplistic file
> > > > > input-output scenario. It would be good to include a stateful
> > > > > transformation based on event time.
> > > > >
> > > > > More complex pipelines contain stateful transformations that depend on
> > > > > windowing and watermarks. I think we need a watermark concept that is
> > > > > based on progress in event time (or another monotonically increasing
> > > > > sequence) that other operators can work with generically.
> > > > >
> > > > > Note that even file input can in many cases produce time-based
> > > > > watermarks, for example when you read part files that are bounded by
> > > > > event time.
> > > > >
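A hypothetical sketch of that note: if each part file is known to contain only events in a half-open range [start, end) and the files are read in order, then once a part file has been read completely the reader can emit a time-based watermark of end - 1. The `PartFile` class and the in-order assumption are illustrative, not part of any Malhar operator.

```java
// Hypothetical part file whose records all have event times in [startMillis, endMillis).
class PartFile {
  final String name;
  final long startMillis;
  final long endMillis;

  PartFile(String name, long startMillis, long endMillis) {
    this.name = name;
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }
}

// Derives event-time watermarks from completed part files (assumes files are read in order).
class PartFileWatermarker {
  private long watermark = Long.MIN_VALUE;

  // Call after every record of the part file has been emitted downstream.
  long onPartFileDone(PartFile file) {
    // Under the in-order assumption, later files start at or after file.endMillis,
    // so all event times up to endMillis - 1 are complete. Math.max keeps it monotonic.
    watermark = Math.max(watermark, file.endMillis - 1);
    return watermark;
  }
}
```
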
> > > > > Thanks,
> > > > > Thomas
> > > > >
> > > > >
> > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > For a better understanding of the use case for control tuples in batch,
> > > > > > I am creating a prototype for a batch application using the File Input
> > > > > > and File Output operators.
> > > > > >
> > > > > > To enable basic batch processing for the File I/O operators, I am
> > > > > > proposing the following changes to the File input and output operators:
> > > > > > 1. The File Input operator emits a watermark each time it opens and
> > > > > > closes a file. These can be "start file" and "end file" watermarks
> > > > > > which include the corresponding file names. The "start file" tuple
> > > > > > should be sent before any of the data from that file flows.
> > > > > > 2. The File Input operator can be configured to end the application
> > > > > > after a single scan or n scans of the directory (a batch). This is
> > > > > > where the operator emits the final watermark (the end-of-application
> > > > > > control tuple). This will also shut down the application.
> > > > > > 3. The File output operator handles these control tuples. "Start
> > > > > > file" initializes the file name for the incoming tuples. The "end
> > > > > > file" watermark forces a finalize on that file.
> > > > > >
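The output-side handling in item 3 can be sketched as a small state machine. This is a standalone illustration with hypothetical method names (`onStartFile`, `onData`, `onEndFile`); in-memory lists stand in for the files a real output operator would write and finalize.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical file output operator reacting to "start file" / "end file" control tuples.
class BatchFileSink {
  private String currentFile; // set by "start file"
  private final Map<String, List<String>> open = new LinkedHashMap<>();
  final Map<String, List<String>> finalized = new LinkedHashMap<>();

  // "Start file": initializes the file name for the incoming tuples.
  void onStartFile(String fileName) {
    currentFile = fileName;
    open.put(fileName, new ArrayList<>());
  }

  // Data tuple: appended to the file named by the last "start file".
  void onData(String line) {
    if (currentFile == null) {
      throw new IllegalStateException("data received before a start-file watermark");
    }
    open.get(currentFile).add(line);
  }

  // "End file": forces a finalize on that file (here: move it from open to finalized).
  void onEndFile(String fileName) {
    List<String> contents = open.remove(fileName);
    if (contents == null) {
      throw new IllegalStateException("end-file for unknown file " + fileName);
    }
    finalized.put(fileName, contents);
    if (fileName.equals(currentFile)) {
      currentFile = null;
    }
  }
}
```

This only works while the tuples of one file arrive contiguously between its start and end markers, which is exactly what breaks when the input operator is partitioned.
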
> > > > > > The user would be able to configure the operators to send only those
> > > > > > watermarks that are needed in the application. If none of the options
> > > > > > are configured, the operators behave as in a streaming application.
> > > > > >
> > > > > > There are a few challenges in the implementation when the input
> > > > > > operator is partitioned. In this case, the correlation between the
> > > > > > start/end of a file and the data tuples for that file is lost. Hence
> > > > > > we need to maintain the file name as part of each tuple in the
> > > > > > pipeline.
> > > > > >
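A sketch of that workaround, assuming a hypothetical `FileTuple` type: because each tuple carries its own file name, the sink can group data by file even when tuples from several partitions arrive interleaved, without relying on the order of start/end markers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical data tuple that carries its source file name through the pipeline.
class FileTuple {
  final String fileName;
  final String line;

  FileTuple(String fileName, String line) {
    this.fileName = fileName;
    this.line = line;
  }
}

// Hypothetical sink that routes by each tuple's own file name, so interleaving is harmless.
class FileRoutingSink {
  final Map<String, List<String>> byFile = new TreeMap<>();

  void onData(FileTuple tuple) {
    byFile.computeIfAbsent(tuple.fileName, name -> new ArrayList<>()).add(tuple.line);
  }
}
```

The trade-off is the extra payload of a file name in every tuple.
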
> > > > > > The "start file" and "end file" control tuples in this example are
> > > > > > temporary names for watermarks. We can have generic "start batch" /
> > > > > > "end batch" tuples which could be used for other use cases as well.
> > > > > > The final watermark is common and serves the same purpose in each
> > > > > > case.
> > > > > >
> > > > > > Please let me know your thoughts on this.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Yes, this can be part of the operator configuration. Given this,
> > > > > > > defining a batch application would mean configuring the connectors
> > > > > > > (mostly the input operator) in the application for the desired
> > > > > > > behavior. Similarly, other use cases besides batch can be achieved.
> > > > > > >
> > > > > > > We may also need to take care of the following:
> > > > > > > 1. Make sure that the watermarks or control tuples are consistent
> > > > > > > across sources, meaning an HDFS sink should be able to interpret the
> > > > > > > watermark tuple sent out by, say, a JDBC source.
> > > > > > > 2. In addition to I/O connectors, we should also look at the need for
> > > > > > > processing operators to understand some of the control tuples /
> > > > > > > watermarks. For example, we may want to reset the operator behavior
> > > > > > > on arrival of some watermark tuple.
> > > > > > >
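Both points can be illustrated with one hypothetical sketch: a single control-tuple type shared by all connectors (so a sink does not care whether a JDBC or an HDFS source produced it) and a processing operator that resets its metric when a start-batch tuple arrives. `BatchControl` and `RecordCounter` are illustrative names, not existing Malhar classes.

```java
// Hypothetical source-independent control tuple, shared by every connector.
enum BatchControl { START_BATCH, END_BATCH, FINAL }

// Hypothetical processing operator that changes its behavior on control tuples.
class RecordCounter {
  private long count = 0;
  private long lastBatchCount = -1; // -1 until the first batch has ended

  void onData(Object record) {
    count++;
  }

  void onControl(BatchControl control) {
    switch (control) {
      case START_BATCH:
        count = 0;              // reset the metric at the start of every batch
        break;
      case END_BATCH:
        lastBatchCount = count; // publish the metric for the batch that just ended
        break;
      case FINAL:
        break;                  // nothing extra to do in this sketch
    }
  }

  long lastBatchCount() {
    return lastBatchCount;
  }
}
```
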
> > > > > > > ~ Bhupesh
> > > > > > >
> > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <thw@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> The HDFS source can operate in two modes, bounded or unbounded. If
> > > > > > >> you scan only once, then it should emit the final watermark after it
> > > > > > >> is done. Otherwise it would emit watermarks based on a policy (file
> > > > > > >> names etc.). The mechanism to generate the marks may depend on the
> > > > > > >> type of source, and the user needs to be able to influence/configure
> > > > > > >> it.
> > > > > > >>
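A hypothetical sketch of the two modes, with a scan limit as the user-facing configuration: a limit of 0 keeps the source unbounded, while a positive limit makes it emit a final watermark after the last scan and stop. The class, field, and marker names are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical directory-scanning source with a configurable number of scans.
class ScanningSource {
  static final String FINAL_WATERMARK = "FINAL_WATERMARK";

  private final int maxScans; // <= 0: unbounded (streaming); n > 0: bounded, stop after n scans
  private int scansDone = 0;
  private boolean finished = false;
  final List<String> emitted = new ArrayList<>();

  ScanningSource(int maxScans) {
    this.maxScans = maxScans;
  }

  // One scan of the input directory; newFiles are the files discovered in this scan.
  void scan(List<String> newFiles) {
    if (finished) {
      return;
    }
    emitted.addAll(newFiles);
    scansDone++;
    if (maxScans > 0 && scansDone >= maxScans) {
      emitted.add(FINAL_WATERMARK); // bounded mode: downstream can flush, the app can end
      finished = true;
    }
  }

  boolean isFinished() {
    return finished;
  }
}
```
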
> > > > > > >> Thomas
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Hi Thomas,
> > > > > > >> >
> > > > > > >> > I am not sure that I completely understand your suggestion. Are you
> > > > > > >> > suggesting broadening the scope of the proposal to treat all
> > > > > > >> > sources as bounded as well as unbounded?
> > > > > > >> >
> > > > > > >> > In the case of Apex, we treat all sources as unbounded sources.
> > > > > > >> > Even bounded sources like the HDFS file source are treated as
> > > > > > >> > unbounded by means of scanning the input directory repeatedly.
> > > > > > >> >
> > > > > > >> > Let's consider the HDFS file source, for example:
> > > > > > >> > In this case, if we treat it as a bounded source, we can define
> > > > > > >> > hooks which allow us to detect the end of the file and send the
> > > > > > >> > "final watermark". We could also consider the HDFS file source as
> > > > > > >> > a streaming source and define hooks which send watermarks based on
> > > > > > >> > different kinds of windows.
> > > > > > >> >
> > > > > > >> > Please correct me if I misunderstand.
> > > > > > >> >
> > > > > > >> > ~ Bhupesh
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <thw@apache.org>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Bhupesh,
> > > > > > >> > >
> > > > > > >> > > Please see how that can be solved in a unified way using windows
> > > > > > >> > > and watermarks. It is bounded data vs. unbounded data. In Beam,
> > > > > > >> > > for example, you can use the "global window" and the final
> > > > > > >> > > watermark to accomplish what you are looking for. Batch is just a
> > > > > > >> > > special case of streaming where the source emits the final
> > > > > > >> > > watermark.
> > > > > > >> > >
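A minimal sketch of "batch as a special case of streaming", using hypothetical names rather than the Beam or Malhar APIs: all data goes into one global window, intermediate watermarks do not trigger output, and the result is emitted only when the final watermark (assumed here to be encoded as `Long.MAX_VALUE`) arrives.

```java
// Hypothetical global-window aggregation: one window spans the whole input.
class GlobalWindowSum {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // assumption about the encoding

  private long sum = 0;

  void onData(long value) {
    sum += value;
  }

  // Returns the result on the final watermark, and null for any intermediate watermark.
  Long onWatermark(long timestamp) {
    return timestamp == FINAL_WATERMARK ? sum : null;
  }
}
```

The same operator runs unchanged on an unbounded source; it simply never emits, because the final watermark never arrives.
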
> > > > > > >> > > Thanks,
> > > > > > >> > > Thomas
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Yes, if the user needs to develop a batch application, then
> > > > > > >> > > > batch-aware operators need to be used in the application.
> > > > > > >> > > > The nature of the application is mostly controlled by the input
> > > > > > >> > > > and the output operators used in the application.
> > > > > > >> > > >
> > > > > > >> > > > For example, consider an application which needs to filter
> > > > > > >> > > > records in an input file and store the filtered records in
> > > > > > >> > > > another file. The nature of this app is to end once the entire
> > > > > > >> > > > file is processed. The following things are expected of the
> > > > > > >> > > > application:
> > > > > > >> > > >
> > > > > > >> > > >    1. Once the input data is over, finalize the output file from
> > > > > > >> > > >    .tmp files. - Responsibility of the output operator
> > > > > > >> > > >    2. End the application once the data is read and processed.
> > > > > > >> > > >    - Responsibility of the input operator
> > > > > > >> > > >
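Responsibility 1 in the list above can be illustrated on the local file system with `java.nio.file`: the operator writes to `<name>.tmp` while the batch runs and renames it to the final name once the input is over. This is only a sketch; the Malhar file output operators manage their own temporary files through the Hadoop file system API, and the `.tmp` naming and the `TmpFileFinalizer` class here are assumptions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Local-file-system sketch: write to "<name>.tmp" while the batch runs, then rename.
class TmpFileFinalizer {
  // Hypothetical naming convention for the in-progress file.
  static Path tmpPathFor(Path finalPath) {
    return finalPath.resolveSibling(finalPath.getFileName() + ".tmp");
  }

  // Called once the input data is over: publish the file under its final name in one step.
  static void finalizeFile(Path finalPath) throws IOException {
    Files.move(tmpPathFor(finalPath), finalPath, StandardCopyOption.ATOMIC_MOVE);
  }
}
```

`ATOMIC_MOVE` makes readers see either no file or the complete file; `Files.move` throws `AtomicMoveNotSupportedException` where the file system cannot guarantee that, for example across file stores.
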
> > > > > > >> > > > These functions are essential to allow the user to do
> > > > > > >> > > > higher-level operations like scheduling or running a workflow
> > > > > > >> > > > of batch applications.
> > > > > > >> > > >
> > > > > > >> > > > I am not sure about intermediate (processing) operators, as
> > > > > > >> > > > there is no change in their functionality for batch use cases.
> > > > > > >> > > > Perhaps allowing multiple batches to run in a single
> > > > > > >> > > > application may require similar changes in processing
> > > > > > >> > > > operators as well.
> > > > > > >> > > >
> > > > > > >> > > > ~ Bhupesh
> > > > > > >> > > >
> > > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <priyag@apache.org>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Will it give the user the impression that, if he has a batch
> > > > > > >> > > > > use case, he has to use only batch-aware operators? If so, is
> > > > > > >> > > > > that what we expect? I am not aware of how we implement the
> > > > > > >> > > > > batch scenario, so this might be a basic question.
> > > > > > >> > > > >
> > > > > > >> > > > > -Priyanka
> > > > > > >> > > > >
> > > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Hi All,
> > > > > > >> > > > > >
> > > > > > >> > > > > > While the design / implementation for custom control tuples
> > > > > > >> > > > > > is ongoing, I thought it would be a good idea to consider
> > > > > > >> > > > > > their usefulness in one of the use cases: batch
> > > > > > >> > > > > > applications.
> > > > > > >> > > > > >
> > > > > > >> > > > > > This is a proposal to adapt / extend existing operators in
> > > > > > >> > > > > > the Apache Apex Malhar library so that it is easy to use
> > > > > > >> > > > > > them in batch use cases. Naturally, this would be applicable
> > > > > > >> > > > > > to only a subset of operators, like File, JDBC and NoSQL
> > > > > > >> > > > > > databases.
> > > > > > >> > > > > > For example, for a file-based store (say, an HDFS store), we
> > > > > > >> > > > > > could have FileBatchInput and FileBatchOutput operators which
> > > > > > >> > > > > > allow easy integration into a batch application. These
> > > > > > >> > > > > > operators would be extended from their existing
> > > > > > >> > > > > > implementations and would be "Batch Aware", in that they may
> > > > > > >> > > > > > understand the meaning of some specific control tuples that
> > > > > > >> > > > > > flow through the DAG. Start batch and end batch seem to be
> > > > > > >> > > > > > the obvious candidates that come to mind. On receipt of such
> > > > > > >> > > > > > control tuples, they may try to modify the behavior of the
> > > > > > >> > > > > > operator, for example to reinitialize some metrics or
> > > > > > >> > > > > > finalize an output file.
> > > > > > >> > > > > >
> > > > > > >> > > > > > We can discuss the potential control tuples and actions in
> > > > > > >> > > > > > detail, but first I would like to understand the views of
> > > > > > >> > > > > > the community on this proposal.
> > > > > > >> > > > > >
> > > > > > >> > > > > > ~ Bhupesh
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
