apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
Date Sat, 18 Feb 2017 16:04:51 GMT
Hi Thomas,

For an input operator which is supposed to generate watermarks for
downstream operators, I can think of the following watermarks that the
operator can emit:
1. Time based watermarks (the high watermark / low watermark)
2. Tuple count based watermarks (every n tuples)
3. File based watermarks (Start file, end file)
4. Final watermark

File based watermarks seem to be applicable for batch (file based) as well,
and hence I thought of looking at these first. Does this seem to be in line
with the thought process?
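
To make these concrete, here is a minimal plain-Java sketch of how the four
kinds could be modelled (all class and field names are hypothetical, since the
custom control tuple API is still under design):

    // Hypothetical sketch only -- not an existing Apex/Malhar API.
    public interface Watermark {

        // 1. Time based: event time has progressed up to this timestamp.
        class TimeWatermark implements Watermark {
            public final long timestampMillis;
            public TimeWatermark(long timestampMillis) { this.timestampMillis = timestampMillis; }
        }

        // 2. Tuple count based: emitted every n tuples.
        class TupleCountWatermark implements Watermark {
            public final long tuplesSeen;
            public TupleCountWatermark(long tuplesSeen) { this.tuplesSeen = tuplesSeen; }
        }

        // 3. File based: marks the start or end of a named file.
        class FileWatermark implements Watermark {
            public enum Boundary { START_FILE, END_FILE }
            public final Boundary boundary;
            public final String fileName;
            public FileWatermark(Boundary boundary, String fileName) {
                this.boundary = boundary;
                this.fileName = fileName;
            }
        }

        // 4. Final watermark: signals that no more data will follow.
        class FinalWatermark implements Watermark { }
    }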

~ Bhupesh



_______________________________________________________

Bhupesh Chawda

Software Engineer

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org> wrote:

> I don't think this should be designed based on a simplistic file
> input-output scenario. It would be good to include a stateful
> transformation based on event time.
>
> More complex pipelines contain stateful transformations that depend on
> windowing and watermarks. I think we need a watermark concept that is based
> on progress in event time (or other monotonic increasing sequence) that
> other operators can generically work with.
>
> Note that even file input in many cases can produce time based watermarks,
> for example when you read part files that are bound by event time.
>
> Thanks,
> Thomas
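
To illustrate the kind of stateful, event-time based transformation Thomas
describes, here is a small self-contained Java sketch of a keyed count per
event-time window that emits results only once a watermark passes the end of a
window (the types and method names are illustrative assumptions, not existing
Malhar classes):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Sketch: a windowed counter driven purely by event-time watermarks.
    public class WindowedCounter {
        private final long windowSizeMillis;
        // window start timestamp -> (key -> count)
        private final Map<Long, Map<String, Long>> windows = new HashMap<>();

        public WindowedCounter(long windowSizeMillis) {
            this.windowSizeMillis = windowSizeMillis;
        }

        // Called for every data tuple: assign it to its event-time window.
        public void process(String key, long eventTimeMillis) {
            long windowStart = eventTimeMillis - (eventTimeMillis % windowSizeMillis);
            windows.computeIfAbsent(windowStart, w -> new HashMap<>())
                   .merge(key, 1L, Long::sum);
        }

        // Called when a watermark arrives: emit and drop every completed window.
        public void onWatermark(long watermarkMillis) {
            Iterator<Map.Entry<Long, Map<String, Long>>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Map<String, Long>> entry = it.next();
                long windowEnd = entry.getKey() + windowSizeMillis;
                if (windowEnd <= watermarkMillis) {
                    System.out.println("window ending " + windowEnd + ": " + entry.getValue());
                    it.remove();
                }
            }
        }
    }

With this shape, bounded and unbounded inputs differ only in which watermarks
arrive; a final watermark of Long.MAX_VALUE would flush all remaining windows.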
>
>
> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > To better understand the use case for control tuples in batch, I am
> > creating a prototype for a batch application using File Input and File
> > Output operators.
> >
> > To enable basic batch processing for File IO operators, I am proposing the
> > following changes to the File input and output operators:
> > 1. The File Input operator emits a watermark each time it opens and closes a
> > file. These can be "start file" and "end file" watermarks which include the
> > corresponding file names. The "start file" tuple should be sent before any
> > of the data from that file flows.
> > 2. The File Input operator can be configured to end the application after a
> > single scan or n scans of the directory (a batch). This is where the
> > operator emits the final watermark (the end-of-application control tuple),
> > which will also shut down the application.
> > 3. The File Output operator handles these control tuples: "start file"
> > initializes the file name for the incoming tuples, and the "end file"
> > watermark forces a finalization of that file.
> >
> > The user would be able to enable the operators to send only those
> > watermarks that are needed in the application. If none of the options are
> > configured, the operators behave as in a streaming application.
> >
> > There are a few challenges in the implementation where the input operator
> > is partitioned. In this case, the correlation between the start/end for a
> > file and the data tuples for that file is lost. Hence we need to maintain
> > the filename as part of each tuple in the pipeline.
> >
> > The "start file" and "end file" control tuples in this example are
> > temporary names for watermarks. We can have generic "start batch" / "end
> > batch" tuples which could be used for other use cases as well. The Final
> > watermark is common and serves the same purpose in each case.
> >
> > Please let me know your thoughts on this.
> >
> > ~ Bhupesh
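
A rough sketch of what the reader side of that proposal could look like,
including the per-tuple file name that becomes necessary once the input
operator is partitioned (the marker strings and wrapper class are placeholders,
not the eventual control tuple API):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.function.Consumer;

    // Sketch: read one file, bracketing its data with start/end file markers and
    // tagging every line with the file name so that a partitioned pipeline can
    // still correlate data tuples with the right file.
    public class FileBatchReader {

        public static class FileLine {
            public final String fileName;  // carried on every data tuple
            public final String line;
            public FileLine(String fileName, String line) {
                this.fileName = fileName;
                this.line = line;
            }
        }

        public static void readFile(Path file, Consumer<Object> out) throws IOException {
            String name = file.getFileName().toString();
            out.accept("START_FILE:" + name);             // "start file" mark, before any data
            try (BufferedReader reader = Files.newBufferedReader(file)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    out.accept(new FileLine(name, line)); // data tuple tagged with its file
                }
            }
            out.accept("END_FILE:" + name);               // "end file" mark, after all data
        }
    }

A downstream writer would then key its open output files by FileLine.fileName,
which keeps the start/end correlation even when several readers run in parallel.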
> >
> >
> >
> > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > wrote:
> >
> > > Yes, this can be part of operator configuration. Given this, for a user,
> > > defining a batch application would mean configuring the connectors (mostly
> > > the input operator) in the application for the desired behavior. Similarly,
> > > there can be other use cases besides batch that can be achieved this way.
> > >
> > > We may also need to take care of the following:
> > > 1. Make sure that the watermarks or control tuples are consistent across
> > > sources, meaning an HDFS sink should be able to interpret the watermark
> > > tuple sent out by, say, a JDBC source.
> > > 2. In addition to I/O connectors, we should also look at the need for
> > > processing operators to understand some of the control tuples /
> > > watermarks. For example, we may want to reset the operator behavior on
> > > arrival of some watermark tuple.
> > >
> > > ~ Bhupesh
> > >
> > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <thw@apache.org> wrote:
> > >
> > >> The HDFS source can operate in two modes, bounded or unbounded. If you
> > >> scan only once, then it should emit the final watermark after it is done.
> > >> Otherwise it would emit watermarks based on a policy (file names etc.).
> > >> The mechanism to generate the marks may depend on the type of source and
> > >> the user needs to be able to influence/configure it.
> > >>
> > >> Thomas
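
One possible shape for such a user-configurable policy, sketched in plain Java
(the interface and its methods are assumptions for illustration, not part of
Malhar):

    // Sketch: a pluggable policy deciding when a source emits watermarks and
    // whether it emits the final watermark after a bounded scan.
    public interface WatermarkPolicy {

        // Invoked after each unit of input (e.g. a file); true means emit a watermark.
        boolean shouldEmitWatermark(String unitName);

        // Invoked after a directory scan completes; true means emit the final
        // watermark and shut the source down (bounded mode).
        boolean isDone(int completedScans);

        // Bounded: watermark per file, finish after a fixed number of scans.
        static WatermarkPolicy bounded(int maxScans) {
            return new WatermarkPolicy() {
                public boolean shouldEmitWatermark(String unitName) { return true; }
                public boolean isDone(int completedScans) { return completedScans >= maxScans; }
            };
        }

        // Unbounded: watermark per file, never emit the final watermark.
        static WatermarkPolicy unbounded() {
            return new WatermarkPolicy() {
                public boolean shouldEmitWatermark(String unitName) { return true; }
                public boolean isDone(int completedScans) { return false; }
            };
        }
    }

A single-scan batch would then correspond to WatermarkPolicy.bounded(1), while
today's streaming behavior corresponds to WatermarkPolicy.unbounded().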
> > >>
> > >>
> > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >> wrote:
> > >>
> > >> > Hi Thomas,
> > >> >
> > >> > I am not sure that I completely understand your suggestion. Are you
> > >> > suggesting to broaden the scope of the proposal to treat all sources as
> > >> > bounded as well as unbounded?
> > >> >
> > >> > In the case of Apex, we treat all sources as unbounded. Even a bounded
> > >> > source like the HDFS file source is treated as unbounded, by means of
> > >> > scanning the input directory repeatedly.
> > >> >
> > >> > Let's consider the HDFS file source as an example:
> > >> > If we treat it as a bounded source, we can define hooks which allow us
> > >> > to detect the end of the file and send the "final watermark". We could
> > >> > also consider the HDFS file source as a streaming source and define
> > >> > hooks which send watermarks based on different kinds of windows.
> > >> >
> > >> > Please correct me if I misunderstand.
> > >> >
> > >> > ~ Bhupesh
> > >> >
> > >> >
> > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <thw@apache.org> wrote:
> > >> >
> > >> > > Bhupesh,
> > >> > >
> > >> > > Please see how that can be solved in a unified way using windows and
> > >> > > watermarks. It is bounded data vs. unbounded data. In Beam, for
> > >> > > example, you can use the "global window" and the final watermark to
> > >> > > accomplish what you are looking for. Batch is just a special case of
> > >> > > streaming where the source emits the final watermark.
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > >
> > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Yes, if the user needs to develop a batch application, then batch
> > >> > > > aware operators need to be used in the application. The nature of
> > >> > > > the application is mostly controlled by the input and the output
> > >> > > > operators used in the application.
> > >> > > >
> > >> > > > For example, consider an application which needs to filter records
> > >> > > > in an input file and store the filtered records in another file.
> > >> > > > The nature of this app is to end once the entire file is processed.
> > >> > > > The following things are expected of the application:
> > >> > > >
> > >> > > >    1. Once the input data is over, finalize the output file from
> > >> > > >    .tmp files - responsibility of the output operator
> > >> > > >    2. End the application once the data is read and processed -
> > >> > > >    responsibility of the input operator
> > >> > > >
> > >> > > > These functions are essential to allow the user to do higher level
> > >> > > > operations like scheduling or running a workflow of batch
> > >> > > > applications.
> > >> > > >
> > >> > > > I am not sure about intermediate (processing) operators, as there
> > >> > > > is no change in their functionality for batch use cases. Perhaps
> > >> > > > allowing multiple batches to run in a single application may
> > >> > > > require similar changes in processing operators as well.
> > >> > > >
> > >> > > > ~ Bhupesh
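
A compact sketch of the output operator's share of those two responsibilities,
assuming a placeholder end-of-data callback (none of these names come from
Malhar; they only illustrate the division of work):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    // Sketch: the output side appends to a .tmp file and finalizes it when told
    // that the input data is over; deciding when that happens (and ending the
    // application) remains the input operator's responsibility.
    public class BatchFileWriter {
        private final Path tmpFile;
        private final Path finalFile;

        public BatchFileWriter(Path finalFile) {
            this.finalFile = finalFile;
            this.tmpFile = finalFile.resolveSibling(finalFile.getFileName() + ".tmp");
        }

        public void write(String record) throws IOException {
            Files.write(tmpFile, (record + System.lineSeparator()).getBytes(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }

        // Responsibility 1: finalize the output file once the input data is over.
        public void onEndOfData() throws IOException {
            Files.move(tmpFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
        }
    }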
> > >> > > >
> > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <priyag@apache.org>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Will it give the user the impression that, if he has a batch
> > >> > > > > use case, he has to use batch aware operators only? If so, is
> > >> > > > > that what we expect? I am not aware of how we implement the
> > >> > > > > batch scenario, so this might be a basic question.
> > >> > > > >
> > >> > > > > -Priyanka
> > >> > > > >
> > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi All,
> > >> > > > > >
> > >> > > > > > While design / implementation for custom control tuples is
> > >> > > > > > ongoing, I thought it would be a good idea to consider its
> > >> > > > > > usefulness in one of the use cases - batch applications.
> > >> > > > > >
> > >> > > > > > This is a proposal to adapt / extend existing operators in the
> > >> > > > > > Apache Apex Malhar library so that it is easy to use them in
> > >> > > > > > batch use cases. Naturally, this would be applicable for only
> > >> > > > > > a subset of operators like File, JDBC and NoSQL databases.
> > >> > > > > > For example, for a file based store (say an HDFS store), we
> > >> > > > > > could have FileBatchInput and FileBatchOutput operators which
> > >> > > > > > allow easy integration into a batch application. These
> > >> > > > > > operators would be extended from their existing
> > >> > > > > > implementations and would be "batch aware", in that they may
> > >> > > > > > understand the meaning of some specific control tuples that
> > >> > > > > > flow through the DAG. Start batch and end batch seem to be the
> > >> > > > > > obvious candidates that come to mind. On receipt of such
> > >> > > > > > control tuples, they may try to modify the behavior of the
> > >> > > > > > operator - to reinitialize some metrics or finalize an output
> > >> > > > > > file, for example.
> > >> > > > > >
> > >> > > > > > We can discuss the potential control tuples and actions in
> > >> > > > > > detail, but first I would like to understand the views of the
> > >> > > > > > community on this proposal.
> > >> > > > > >
> > >> > > > > > ~ Bhupesh
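
For discussion purposes, a bare-bones sketch of what the "batch aware"
extension point could look like, with start/end batch callbacks layered onto
an extended operator (all names are hypothetical, pending the actual control
tuple design):

    // Sketch: a mix-in style interface an extended Malhar operator could
    // implement to react to batch boundary control tuples flowing through the DAG.
    public interface BatchAware {

        // Called on a "start batch" control tuple, e.g. to reset metrics or
        // open a fresh output file for the batch.
        void startBatch(long batchId);

        // Called on an "end batch" control tuple, e.g. to flush state and
        // finalize the batch's output file.
        void endBatch(long batchId);
    }

    // Example: a hypothetical batch-aware output reacting to the boundaries.
    class LoggingBatchOutput implements BatchAware {
        @Override
        public void startBatch(long batchId) {
            System.out.println("start batch " + batchId + ": reinitialize per-batch metrics");
        }

        @Override
        public void endBatch(long batchId) {
            System.out.println("end batch " + batchId + ": finalize the output file");
        }
    }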
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
