From: David Yan
To: dev@apex.apache.org
Date: Tue, 28 Feb 2017 07:57:09 -0800
Subject: Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

There is a discussion in the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.

https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda wrote:
> Hi David,
>
> If using a time window does not seem appropriate, we can have another class that is better suited to such sequential and distinct windows. Perhaps a CustomWindow option can be introduced which takes in a window id. The purpose of this window option would be to translate the window id into appropriate timestamps.
>
> Another option would be to go with a custom timestampExtractor for such tuples, which translates each unique file name to a distinct timestamp while using time windows in the windowed operator.
>
> ~ Bhupesh
>
> _______________________________________________________
>
> Bhupesh Chawda
>
> E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>
> www.datatorrent.com | apex.apache.org

On Tue, Feb 28, 2017 at 12:28 AM, David Yan wrote:
> I now see your rationale for putting the file name in the window. As far as I understand, the reasons why the file name is not part of the key and the Global Window is not used are:
>
> 1) The files are processed in sequence, not in parallel.
> 2) The windowed operator should not keep the state associated with a file once the processing of that file is done.
> 3) The trigger should be fired for a file when that file is done processing.
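A minimal standalone sketch of points 1) to 3), assuming each tuple carries its file's sequence number as its window and that the input operator emits a watermark equal to the sequence number of the last fully read file. `PerFileWordCounter`, `onTuple` and `onWatermark` are hypothetical names, not the Malhar WindowedOperator API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not the Malhar API: a file's sequence number is its window,
// and a watermark w means "every file with sequence number <= w has been read".
class PerFileWordCounter {
  // window (file sequence number) -> word -> count, ordered so windows close in sequence
  private final TreeMap<Long, Map<String, Long>> state = new TreeMap<>();
  private final List<String> emitted = new ArrayList<>();

  void onTuple(long fileSeq, String word) {
    state.computeIfAbsent(fileSeq, k -> new HashMap<>()).merge(word, 1L, Long::sum);
  }

  // Fire for every window the watermark has passed, then drop that window's state.
  void onWatermark(long watermark) {
    while (!state.isEmpty() && state.firstKey() <= watermark) {
      Map.Entry<Long, Map<String, Long>> done = state.pollFirstEntry();
      emitted.add("file#" + done.getKey() + " " + new TreeMap<>(done.getValue()));
    }
  }

  int openWindows() {
    return state.size();
  }

  List<String> emitted() {
    return emitted;
  }
}
```

Because files arrive in sequence, normally only the current file's window holds state; the watermark for file n fires that file's result and drops its state in the same step.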
>
> However, if the file is just a sequence and has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just confuse users, especially when it's used as an example for new users.
>
> How about having a separate class called SequenceWindow? And perhaps TimeWindow can inherit from it?
>
> David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise wrote:
> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>
> > I think my comments related to count-based windows might be causing confusion. Let's not discuss count-based scenarios for now.
> >
> > I just want to make sure we are on the same page with respect to the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number), and that helps keep tuples from each file in a separate window.
>
> Yes, in this case it is a sequence number, but it could also be a timestamp, depending on the file naming convention. And if it was event-time processing, the watermark would be derived from records within the file.
>
> Agreed, the source should have a mechanism to control the timestamp extraction along with everything else pertaining to the watermark generation.
>
> > We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
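A sketch of the "timestampExtractor" idea, assuming it is simply a function from a file name to the value used as the window timestamp. `TimestampExtractor` and both implementations are hypothetical; the first yields a pure sequence number, the second illustrates Thomas's point that the value could be a real timestamp when the file naming convention (assumed here to embed a number such as a date) provides one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical "timestampExtractor": maps a file name to the value used as the
// window timestamp. This is not an existing Apex interface.
interface TimestampExtractor extends ToLongFunction<String> {}

// Pure sequence number: 1, 2, 3, ... in the order files are first seen.
class SequenceExtractor implements TimestampExtractor {
  private final Map<String, Long> seen = new HashMap<>();

  @Override
  public long applyAsLong(String fileName) {
    Long seq = seen.get(fileName);
    if (seq == null) {
      seq = (long) seen.size() + 1;
      seen.put(fileName, seq);
    }
    return seq;
  }
}

// Real timestamp from an assumed naming convention that embeds a number,
// e.g. "events-20170227.log" -> 20170227.
class FileNameNumberExtractor implements TimestampExtractor {
  private static final Pattern NUMBER = Pattern.compile("\\d+");

  @Override
  public long applyAsLong(String fileName) {
    Matcher m = NUMBER.matcher(fileName);
    if (!m.find()) {
      throw new IllegalArgumentException("no number in file name: " + fileName);
    }
    return Long.parseLong(m.group());
  }
}
```

The sequence variant carries no meaning beyond ordering, which is why David questions calling it a timestamp; the naming-convention variant only applies when file names actually encode time.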
> >
> > ~ Bhupesh

On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise wrote:
> I don't think this is a use case for a count-based window.
>
> We have multiple files that are retrieved in a sequence, and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file has been read fully. There is no concept of "end of something" for an individual key, and the global window isn't applicable.
>
> However, as already explained and implemented by Bhupesh, this can be solved using a watermark and a window (in this case the window timestamp isn't a timestamp but a file sequence, but that doesn't matter).
>
> Thomas

On Mon, Feb 27, 2017 at 8:05 AM, David Yan wrote:
> I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp). It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.
>
> If you want the result broken up by file name, and if the files are to be processed in parallel, I think making the file name part of the key is the way to go. I think it's very confusing if we somehow make the file part of the window.
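If the file name is made part of the key instead, as suggested here for files processed in parallel, the aggregation could look like the sketch below. `FileWordKey` and `KeyedWordCount` are hypothetical names; the point is only that tuples from different files never share state, even within a single global window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Composite key: the file name travels with every tuple, so tuples from files
// processed in parallel never mix, even inside one (global) window.
final class FileWordKey {
  final String fileName;
  final String word;

  FileWordKey(String fileName, String word) {
    this.fileName = fileName;
    this.word = word;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FileWordKey)) {
      return false;
    }
    FileWordKey k = (FileWordKey) o;
    return fileName.equals(k.fileName) && word.equals(k.word);
  }

  @Override
  public int hashCode() {
    return Objects.hash(fileName, word);
  }
}

// Hypothetical keyed aggregation: one running count per (file, word) pair.
class KeyedWordCount {
  private final Map<FileWordKey, Long> counts = new HashMap<>();

  void onTuple(String fileName, String word) {
    counts.merge(new FileWordKey(fileName, word), 1L, Long::sum);
  }

  long count(String fileName, String word) {
    return counts.getOrDefault(new FileWordKey(fileName, word), 0L);
  }
}
```

What the key alone does not provide is a signal that a particular file is finished, which is the missing "end of something" for an individual key that Thomas points out in his reply above.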
>
> Count-based windows are not implemented yet, and you're welcome to add that feature. In the case of count-based windows, there would be no notion of time and you would probably only trigger at the end of each window. The watermark then only matters for batch, since you need a way to know when the batch has ended (if the count is 10 and the number of tuples in the batch is, let's say, 105, you need a way to end the last window with 5 tuples).
>
> David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> Hi David,
>
> Thanks for your comments.
>
> The wordcount example that I created based on the windowed operator computes word counts per file (each file as a separate batch), i.e. it processes the counts for each file and dumps them into separate files.
> As I understand it, the Global Window is for one large batch, i.e. all incoming data falls into the same batch. This could not be processed using the GlobalWindow option, as we need more than one window. In this case, I configured the windowed operator to have time windows of 1 ms each and passed the data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way of handling this scenario?
>
> Regarding (2 - count-based windows), I think there is a trigger option to process count-based windows.
> In case I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?
>
> I agree that (4. Final Watermark) can be done using the Global Window.
>
> ~ Bhupesh

On Mon, Feb 27, 2017 at 12:18 PM, David Yan <davidyan@gmail.com> wrote:
> I'm worried that we are making the watermark concept too complicated.
>
> Watermarks should simply tell you which windows can be considered complete.
>
> Point 2 is basically a count-based window. Watermarks do not play a role here because the window is always complete at the n-th tuple.
>
> If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me like this can be achieved with just a Global Window. For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.
>
> Point 4, as mentioned above, can also be achieved with a watermark with a +infinity timestamp.
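The count-based example above (a count of 10 and a batch of 105 tuples) can be checked with a small sketch: windows close on their n-th tuple without any watermark, and only a final watermark, modeled here as `Long.MAX_VALUE` standing in for the +infinity timestamp, closes the last window of 5 tuples. `CountWindowSum` is a hypothetical class, not the Apex CountTrigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based window: a window is complete at its n-th tuple, so no
// watermark is needed for it; only the final watermark (Long.MAX_VALUE as
// "+infinity") is needed to close a last, partial window at the end of a batch.
class CountWindowSum {
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private final int size;
  private long sum;
  private int inWindow;
  final List<String> fired = new ArrayList<>();

  CountWindowSum(int size) {
    this.size = size;
  }

  void onTuple(long value) {
    sum += value;
    if (++inWindow == size) {
      fire();  // the window is complete at the n-th tuple
    }
  }

  // Ordinary watermarks are ignored; only the final one flushes a partial window.
  void onWatermark(long watermark) {
    if (watermark == FINAL_WATERMARK && inWindow > 0) {
      fire();
    }
  }

  private void fire() {
    fired.add(inWindow + " tuples, sum=" + sum);
    sum = 0;  // start the next window empty
    inWindow = 0;
  }
}
```

Resetting the sum after each firing roughly corresponds to the Discarding accumulation asked about above. With a window size of 10, 105 tuples produce ten full windows as they arrive and an eleventh, 5-tuple window only when the final watermark comes in.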
>
> David

On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> Hi Thomas,
>
> For an input operator which is supposed to generate watermarks for downstream operators, I can think of the following watermarks that the operator can emit:
> 1. Time-based watermarks (the high watermark / low watermark)
> 2. Tuple-count-based watermarks (every n tuples)
> 3. File-based watermarks (start file, end file)
> 4. Final watermark
>
> File-based watermarks seem to be applicable to batch (file based) as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
>
> ~ Bhupesh

On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org> wrote:
> I don't think this should be designed based on a simplistic file input-output scenario.
> It would be good to include a stateful transformation based on event time.
>
> More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can generically work with.
>
> Note that even file input can in many cases produce time-based watermarks, for example when you read part files that are bound by event time.
>
> Thanks,
> Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> To better understand the use case for control tuples in batch, I am creating a prototype for a batch application using the File Input and File Output operators.
>
> To enable basic batch processing for the File IO operators, I am proposing the following changes to the File input and output operators:
> 1. The File Input operator emits a watermark each time it opens and closes a file. These can be "start file" and "end file" watermarks which include the corresponding file names.
>    The "start file" tuple should be sent before any of the data from that file flows.
> 2. The File Input operator can be configured to end the application after a single scan or n scans of the directory (a batch). This is where the operator emits the final watermark (the end-of-application control tuple). This will also shut down the application.
> 3. The File Output operator handles these control tuples. "Start file" initializes the file name for the incoming tuples. The "end file" watermark forces a finalize on that file.
>
> The user would be able to enable the operators to send only those watermarks that are needed in the application. If none of the options are configured, the operators behave as in a streaming application.
>
> There are a few challenges in the implementation when the input operator is partitioned. In this case, the correlation between the start/end of a file and the data tuples for that file is lost. Hence we need to maintain the file name as part of each tuple in the pipeline.
>
> The "start file" and "end file" control tuples in this example are temporary names for watermarks.
> We can have generic "start batch" / "end batch" tuples which could be used for other use cases as well. The final watermark is common and serves the same purpose in each case.
>
> Please let me know your thoughts on this.
>
> ~ Bhupesh

On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> Yes, this can be part of operator configuration. Given this, defining a batch application would mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, use cases other than batch can be achieved.
>
> We may also need to take care of the following:
> 1. Make sure that the watermarks or control tuples are consistent across sources, meaning an HDFS sink should be able to interpret the watermark tuple sent out by, say, a JDBC source.
> 2. In addition to I/O connectors, we should also look at the need for processing operators to understand some of the control tuples / watermarks.
>    For example, we may want to reset the operator behavior on arrival of some watermark tuple.
>
> ~ Bhupesh

On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <thw@apache.org> wrote:
> The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (file names etc.). The mechanism to generate the marks may depend on the type of source, and the user needs to be able to influence/configure it.
>
> Thomas

On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> Hi Thomas,
>
> I am not sure that I completely understand your suggestion. Are you suggesting to broaden the scope of the proposal to treat all sources as bounded as well as unbounded?
>
> In the case of Apex, we treat all sources as unbounded sources.
> Even bounded sources like the HDFS file source are treated as unbounded by means of scanning the input directory repeatedly.
>
> Let's consider the HDFS file source, for example:
> In this case, if we treat it as a bounded source, we can define hooks which allow us to detect the end of the file and send the "final watermark". We could also consider the HDFS file source as a streaming source and define hooks which send watermarks based on different kinds of windows.
>
> Please correct me if I misunderstand.
>
> ~ Bhupesh

On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <thw@apache.org> wrote:
> Bhupesh,
>
> Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data. In Beam, for example, you can use the "global window" and the final watermark to accomplish what you are looking for.
> Batch is just a special case of streaming where the source emits the final watermark.
>
> Thanks,
> Thomas

On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> Yes, if the user needs to develop a batch application, then batch-aware operators need to be used in the application.
> The nature of the application is mostly controlled by the input and output operators used in the application.
>
> For example, consider an application which needs to filter records in an input file and store the filtered records in another file. The nature of this app is to end once the entire file is processed. The following things are expected of the application:
>
>    1. Once the input data is over, finalize the output file from .tmp files.
>       - Responsibility of the output operator
>    2. End the application once the data is read and processed.
>       - Responsibility of the input operator
>
> These functions are essential to allow the user to do higher-level operations like scheduling or running a workflow of batch applications.
>
> I am not sure about intermediate (processing) operators, as there is no change in their functionality for batch use cases. Perhaps allowing multiple batches to run in a single application may require similar changes in processing operators as well.
>
> ~ Bhupesh

On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <priyag@apache.org> wrote:
> Will it give users the impression that, if they have a batch use case, they have to use batch-aware operators only? If so, is that what we expect?
> I am not aware of how we implement the batch scenario, so this might be a basic question.
>
> -Priyanka

On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> Hi All,
>
> While the design / implementation for custom control tuples is ongoing, I thought it would be a good idea to consider its usefulness in one of the use cases: batch applications.
>
> This is a proposal to adapt / extend existing operators in the Apache Apex Malhar library so that it is easy to use them in batch use cases. Naturally, this would be applicable to only a subset of operators, like File, JDBC and NoSQL databases.
> For example, for a file-based store (say, an HDFS store), we could have FileBatchInput and FileBatchOutput operators which allow easy integration into a batch application. These operators would be extended from their existing implementations and would be "Batch Aware", in that they may understand the meaning of some specific control tuples that flow through the DAG. Start batch and end batch seem to be the obvious candidates that come to mind. On receipt of such control tuples, they may modify the behavior of the operator, for example to reinitialize some metrics or finalize an output file.
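A sketch of how a "Batch Aware" output operator might react to start batch / end batch control tuples, assuming each batch is written to `<name>.tmp` and finalized by renaming it, in the spirit of the .tmp finalization mentioned above. `BatchControl` and `BatchAwareFileWriter` are hypothetical names, not existing Malhar classes, and the rename-based finalize is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical control tuples; the proposal only names "start batch" and "end batch".
enum BatchControl { START_BATCH, END_BATCH }

// Sketch of a "Batch Aware" file output operator (not an existing Malhar class):
// START_BATCH reinitializes metrics and opens <name>.tmp; END_BATCH finalizes
// the batch by renaming <name>.tmp to <name>.
class BatchAwareFileWriter {
  private final Path dir;
  private String batchName;
  private Path tmpFile;
  long tuplesInBatch;  // per-batch metric, reset on START_BATCH

  BatchAwareFileWriter(Path dir) {
    this.dir = dir;
  }

  void onControl(BatchControl control, String name) throws IOException {
    switch (control) {
      case START_BATCH:
        batchName = name;
        tuplesInBatch = 0;
        tmpFile = dir.resolve(name + ".tmp");
        Files.deleteIfExists(tmpFile);  // discard leftovers of an earlier, unfinished attempt
        Files.createFile(tmpFile);
        break;
      case END_BATCH:
        if (tmpFile == null || !name.equals(batchName)) {
          throw new IllegalStateException("END_BATCH without matching START_BATCH: " + name);
        }
        Files.move(tmpFile, dir.resolve(name), StandardCopyOption.REPLACE_EXISTING);
        tmpFile = null;
        break;
    }
  }

  void onTuple(String line) throws IOException {
    if (tmpFile == null) {
      throw new IllegalStateException("tuple received outside a batch");
    }
    Files.write(tmpFile, (line + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    tuplesInBatch++;
  }
}
```

Keeping the metric reset and the finalize inside the control-tuple handler is one way to keep the data path unchanged, which fits the idea of extending the existing operators rather than replacing them.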
>
> We can discuss the potential control tuples and actions in detail, but first I would like to understand the views of the community on this proposal.
>
> ~ Bhupesh
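The input-side ordering from the Feb 15 proposal above (a "start file" watermark before any data from a file, an "end file" watermark after it, the file name kept on every data tuple because partitioning loses the correlation, and a final watermark only when the source is bounded) can be sketched as a single scan producing an ordered list of elements. `Element`, `FileBatchSource` and `scan` are hypothetical names for illustration, not Apex classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stream element: a data tuple or a control tuple. Data tuples also
// carry the file name, so the file/data correlation survives partitioning.
final class Element {
  enum Kind { START_FILE, DATA, END_FILE, FINAL }

  final Kind kind;
  final String fileName;  // null only for FINAL
  final String payload;   // null for control tuples

  Element(Kind kind, String fileName, String payload) {
    this.kind = kind;
    this.fileName = fileName;
    this.payload = payload;
  }

  @Override
  public String toString() {
    return kind + (fileName == null ? "" : "(" + fileName + ")") + (payload == null ? "" : ":" + payload);
  }
}

// One directory scan: bracket each file's data with START_FILE / END_FILE and,
// in bounded mode, finish with the FINAL watermark (end of application).
class FileBatchSource {
  static List<Element> scan(Map<String, List<String>> filesInOrder, boolean bounded) {
    List<Element> out = new ArrayList<>();
    for (Map.Entry<String, List<String>> file : filesInOrder.entrySet()) {
      String name = file.getKey();
      out.add(new Element(Element.Kind.START_FILE, name, null));  // before any data of this file
      for (String line : file.getValue()) {
        out.add(new Element(Element.Kind.DATA, name, line));
      }
      out.add(new Element(Element.Kind.END_FILE, name, null));
    }
    if (bounded) {
      out.add(new Element(Element.Kind.FINAL, null, null));
    }
    return out;
  }
}
```

With `bounded` set to false the same scan yields the file brackets without FINAL, matching the proposal's point that the operators behave as in a streaming application when the batch options are not configured.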