From: Thomas Weise
Date: Wed, 15 Feb 2017 21:07:03 -0800
Subject: Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
To: dev@apex.apache.org
I don't think this should be designed based on a simplistic file
input-output scenario. It would be good to include a stateful
transformation based on event time.

More complex pipelines contain stateful transformations that depend on
windowing and watermarks. I think we need a watermark concept that is
based on progress in event time (or another monotonically increasing
sequence) that other operators can generically work with. Note that even
file input in many cases can produce time-based watermarks, for example
when you read part files that are bound by event time.

Thanks,
Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda wrote:

> For a better understanding of the use case for control tuples in batch,
> I am creating a prototype for a batch application using the File Input
> and File Output operators.
>
> To enable basic batch processing for File IO operators, I am proposing
> the following changes to the File input and output operators:
> 1. The File Input operator emits a watermark each time it opens and
> closes a file. These can be "start file" and "end file" watermarks which
> include the corresponding file names. The "start file" tuple should be
> sent before any of the data from that file flows.
> 2. The File Input operator can be configured to end the application
> after a single or n scans of the directory (a batch). This is where the
> operator emits the final watermark (the end-of-application control
> tuple). This will also shut down the application.
> 3. The File Output operator handles these control tuples. "Start file"
> initializes the file name for the incoming tuples. The "end file"
> watermark forces a finalize on that file.
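The "start file" / "end file" / final watermarks proposed above could be modeled roughly as follows. This is an illustrative sketch only; the class and field names (`FileWatermark`, `Kind`) are hypothetical and not part of the existing Malhar API:

```java
// Hypothetical model of the proposed file watermarks. The input operator
// would emit START_FILE before any data from a file, END_FILE after the
// last tuple of that file, and FINAL once the configured scans are done.
public class FileWatermark {
    public enum Kind { START_FILE, END_FILE, FINAL }

    public final Kind kind;
    public final String fileName; // null for the final watermark

    public FileWatermark(Kind kind, String fileName) {
        this.kind = kind;
        this.fileName = fileName;
    }

    @Override
    public String toString() {
        return kind + (fileName != null ? "(" + fileName + ")" : "");
    }
}
```

A downstream operator could then switch on `kind` to decide whether to initialize a new output file, finalize the current one, or shut down.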
>
> The user would be able to enable the operators to send only those
> watermarks that are needed in the application. If none of the options
> are configured, the operators behave as in a streaming application.
>
> There are a few challenges in the implementation where the input
> operator is partitioned. In this case, the correlation between the
> start/end for a file and the data tuples for that file is lost. Hence we
> need to maintain the filename as part of each tuple in the pipeline.
>
> The "start file" and "end file" control tuples in this example are
> temporary names for watermarks. We can have generic "start batch" /
> "end batch" tuples which could be used for other use cases as well. The
> final watermark is common and serves the same purpose in each case.
>
> Please let me know your thoughts on this.
>
> ~ Bhupesh
>
>
>
> On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda wrote:
>
> > Yes, this can be part of the operator configuration. Given this, for a
> > user, defining a batch application would mean configuring the
> > connectors (mostly the input operator) in the application for the
> > desired behavior. Similarly, there can be other use cases, beyond
> > batch, that can be achieved this way.
> >
> > We may also need to take care of the following:
> > 1. Make sure that the watermarks or control tuples are consistent
> > across sources. Meaning, an HDFS sink should be able to interpret the
> > watermark tuple sent out by, say, a JDBC source.
> > 2. In addition to I/O connectors, we should also look at the need for
> > processing operators to understand some of the control tuples /
> > watermarks. For example, we may want to reset the operator behavior on
> > arrival of some watermark tuple.
> >
> > ~ Bhupesh
> >
> > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise wrote:
> >
> >> The HDFS source can operate in two modes, bounded or unbounded. If
> >> you scan only once, then it should emit the final watermark after it
> >> is done.
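The per-tuple filename tagging mentioned above (for correlating data with file watermarks when the input operator is partitioned) could be sketched as a simple wrapper. The name `TaggedTuple` is illustrative, not existing Malhar API:

```java
// Hypothetical wrapper carrying the source file name on every data tuple,
// so that a partitioned pipeline can still correlate each tuple with the
// "start file" / "end file" watermarks for that file.
public class TaggedTuple<T> {
    public final String fileName;
    public final T payload;

    public TaggedTuple(String fileName, T payload) {
        this.fileName = fileName;
        this.payload = payload;
    }
}
```

A sink would key its open files by `fileName`, so tuples arriving from different partitions still land in the right output file.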
> >> Otherwise it would emit watermarks based on a policy (file names
> >> etc.). The mechanism to generate the marks may depend on the type of
> >> source, and the user needs to be able to influence/configure it.
> >>
> >> Thomas
> >>
> >>
> >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda
> >> <bhupesh@datatorrent.com> wrote:
> >>
> >> > Hi Thomas,
> >> >
> >> > I am not sure that I completely understand your suggestion. Are you
> >> > suggesting to broaden the scope of the proposal to treat all
> >> > sources as bounded as well as unbounded?
> >> >
> >> > In the case of Apex, we treat all sources as unbounded sources.
> >> > Even bounded sources like the HDFS file source are treated as
> >> > unbounded by means of scanning the input directory repeatedly.
> >> >
> >> > Let's consider the HDFS file source, for example:
> >> > In this case, if we treat it as a bounded source, we can define
> >> > hooks which allow us to detect the end of the file and send the
> >> > "final watermark". We could also consider the HDFS file source as a
> >> > streaming source and define hooks which send watermarks based on
> >> > different kinds of windows.
> >> >
> >> > Please correct me if I misunderstand.
> >> >
> >> > ~ Bhupesh
> >> >
> >> >
> >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise wrote:
> >> >
> >> > > Bhupesh,
> >> > >
> >> > > Please see how that can be solved in a unified way using windows
> >> > > and watermarks. It is bounded data vs. unbounded data. In Beam,
> >> > > for example, you can use the "global window" and the final
> >> > > watermark to accomplish what you are looking for. Batch is just a
> >> > > special case of streaming where the source emits the final
> >> > > watermark.
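The bounded mode described above (scan the directory a fixed number of times, then emit the final watermark) can be sketched as follows. All names here are illustrative and the real Apex operator lifecycle is not modeled; this only shows the scan-counting logic:

```java
import java.util.List;

// Hypothetical sketch of a bounded-mode file source: after each complete
// directory scan it emits a per-scan watermark, and once the configured
// number of scans is reached it emits the final watermark instead.
public class BoundedScanSource {
    private final List<String> files;
    private final int maxScans;
    private int scansDone = 0;

    public BoundedScanSource(List<String> files, int maxScans) {
        this.files = files;
        this.maxScans = maxScans;
    }

    /** Performs one scan and returns the control tuple to emit after it. */
    public String scanOnce() {
        for (String f : files) {
            // read file f and emit its data tuples ...
        }
        scansDone++;
        return scansDone >= maxScans ? "FINAL_WATERMARK" : "SCAN_WATERMARK";
    }

    /** True once the final watermark has been emitted; the app can shut down. */
    public boolean isDone() {
        return scansDone >= maxScans;
    }
}
```

With `maxScans = 1` this behaves as a classic bounded (batch) source; a larger value, or no limit, recovers the unbounded streaming behavior Bhupesh describes.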
> >> > >
> >> > > Thanks,
> >> > > Thomas
> >> > >
> >> > >
> >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda
> >> > > <bhupesh@datatorrent.com> wrote:
> >> > >
> >> > > > Yes, if the user needs to develop a batch application, then
> >> > > > batch-aware operators need to be used in the application.
> >> > > > The nature of the application is mostly controlled by the input
> >> > > > and the output operators used in the application.
> >> > > >
> >> > > > For example, consider an application which needs to filter
> >> > > > records in an input file and store the filtered records in
> >> > > > another file. The nature of this app is to end once the entire
> >> > > > file is processed. The following things are expected of the
> >> > > > application:
> >> > > >
> >> > > > 1. Once the input data is over, finalize the output file from
> >> > > > .tmp files. - Responsibility of the output operator
> >> > > > 2. End the application once the data is read and processed. -
> >> > > > Responsibility of the input operator
> >> > > >
> >> > > > These functions are essential to allow the user to do
> >> > > > higher-level operations like scheduling or running a workflow
> >> > > > of batch applications.
> >> > > >
> >> > > > I am not sure about intermediate (processing) operators, as
> >> > > > there is no change in their functionality for batch use cases.
> >> > > > Perhaps allowing multiple batches to run in a single
> >> > > > application may require similar changes in processing operators
> >> > > > as well.
> >> > > >
> >> > > > ~ Bhupesh
> >> > > >
> >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale
> >> > > > <priyag@apache.org> wrote:
> >> > > >
> >> > > > > Will it give the user the impression that, if he has a batch
> >> > > > > use case, he has to use only batch-aware operators? If so, is
> >> > > > > that what we expect?
> >> > > > > I am not aware of how we implement the batch scenario, so
> >> > > > > this might be a basic question.
> >> > > > >
> >> > > > > -Priyanka
> >> > > > >
> >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda
> >> > > > > <bhupesh@datatorrent.com> wrote:
> >> > > > >
> >> > > > > > Hi All,
> >> > > > > >
> >> > > > > > While design / implementation for custom control tuples is
> >> > > > > > ongoing, I thought it would be a good idea to consider its
> >> > > > > > usefulness in one of the use cases - batch applications.
> >> > > > > >
> >> > > > > > This is a proposal to adapt / extend existing operators in
> >> > > > > > the Apache Apex Malhar library so that it is easy to use
> >> > > > > > them in batch use cases. Naturally, this would be
> >> > > > > > applicable for only a subset of operators like File, JDBC
> >> > > > > > and NoSQL databases.
> >> > > > > > For example, for a file-based store (say, an HDFS store),
> >> > > > > > we could have FileBatchInput and FileBatchOutput operators
> >> > > > > > which allow easy integration into a batch application.
> >> > > > > > These operators would be extended from their existing
> >> > > > > > implementations and would be "batch aware", in that they
> >> > > > > > may understand the meaning of some specific control tuples
> >> > > > > > that flow through the DAG. Start batch and end batch seem
> >> > > > > > to be the obvious candidates that come to mind. On receipt
> >> > > > > > of such control tuples, they may modify the behavior of the
> >> > > > > > operator - to reinitialize some metrics or finalize an
> >> > > > > > output file, for example.
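The "batch aware" output behavior sketched above (react to start batch by opening a .tmp file, react to end batch by finalizing it) could look roughly like this. File I/O is simulated with an in-memory action log so the sketch stays self-contained; `BatchAwareWriter` and its method names are hypothetical, not Malhar API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a batch-aware writer: "start batch" opens a
// temporary file, data tuples are written to it, and "end batch"
// finalizes the file by renaming away the .tmp suffix.
public class BatchAwareWriter {
    private final List<String> actions = new ArrayList<>();
    private String currentTmpFile;

    public void onStartBatch(String fileName) {
        currentTmpFile = fileName + ".tmp";
        actions.add("open " + currentTmpFile);
    }

    public void onData(String record) {
        actions.add("write " + record + " -> " + currentTmpFile);
    }

    public void onEndBatch() {
        // Finalize: rename the .tmp file to its permanent name.
        String finalName = currentTmpFile.replace(".tmp", "");
        actions.add("rename " + currentTmpFile + " -> " + finalName);
        currentTmpFile = null;
    }

    public List<String> actions() {
        return actions;
    }
}
```

Driving it with one start/data/end cycle produces an open, a write, and a finalizing rename, which matches the .tmp-finalization responsibility Bhupesh assigns to the output operator earlier in the thread.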
> >> > > > > >
> >> > > > > > We can discuss the potential control tuples and actions in
> >> > > > > > detail, but first I would like to understand the views of
> >> > > > > > the community on this proposal.
> >> > > > > >
> >> > > > > > ~ Bhupesh
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >