From: Bhupesh Chawda
Date: Thu, 23 Mar 2017 17:25:21 +0530
Subject: Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases
To: dev

Hi All,

I think we have some agreement on the way we should use control tuples for File I/O operators to support batch.

In order to have more operators in Malhar support this paradigm, I think we should also look at store operators - JDBC, Cassandra, HBase etc. The case with these operators is simpler, as most of them do not poll the sources (except the JDBC poller operator) and just stop once they have read a fixed amount of data. In other words, these are inherently batch sources. The only change that we should add to these operators is to shut down the DAG once the reading of data is done. For a windowed operator this would mean a Global window with a final watermark before the DAG is shut down.

~ Bhupesh

_______________________________________________________
Bhupesh Chawda
E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda wrote:

> Hi Thomas,
>
> Even though the windowing operator is not just "event time", it seems it is too much dependent on the "time" attribute of the incoming tuple. This is the reason we had to model the file index as a timestamp to solve the batch case for files.
> Perhaps we should work on increasing the scope of the windowed operator to consider other types of windows as well. The Sequence option suggested by David seems to be something in that direction.
>
> ~ Bhupesh
>
> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise wrote:
>
>> That's correct, we are looking at a generalized approach for state management vs. a series of special cases.
>>
>> And to be clear, windowing does not imply event time, otherwise it would be "EventTimeOperator" :-)
>>
>> Thomas
>>
>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda wrote:
>>
>> > Hi David,
>> >
>> > I went through the discussion, but it seems like it is more on the event time watermark handling as opposed to batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples. Since each batch is separate from others, we would like to have stateful processing within a batch, but not across batches.
>> > At the same time, we would like to do this in a manner which is consistent with the windowing mechanism provided by the windowed operator. This will allow us to treat a single batch as a (bounded) stream and apply all the event time windowing concepts in that time span.
>> >
>> > For example, let's say I need to process data for a day (24 hours) as a single batch. The application is still streaming in nature: it would end the batch after a day and start a new batch the next day. At the same time, I would be able to have early trigger firings every minute as well as drop any data which is, say, 5 mins late. All this within a single day.
>> >
>> > ~ Bhupesh
>> >
>> > On Tue, Feb 28, 2017 at 9:27 PM, David Yan wrote:
>> >
>> > > There is a discussion in the Flink mailing list about key-based watermarks.
>> > > I think it's relevant to our use case here.
>> > > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>> > >
>> > > David
>> > >
>> > > On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > >
>> > > > Hi David,
>> > > >
>> > > > If using time window does not seem appropriate, we can have another class which is more suited for such sequential and distinct windows. Perhaps a CustomWindow option can be introduced which takes in a window id. The purpose of this window option could be to translate the window id into appropriate timestamps.
>> > > >
>> > > > Another option would be to go with a custom timestampExtractor for such tuples, which translates each unique file name to a distinct timestamp while using time windows in the windowed operator.
>> > > >
>> > > > ~ Bhupesh
>> > > >
>> > > > On Tue, Feb 28, 2017 at 12:28 AM, David Yan wrote:
>> > > >
>> > > > > I now see your rationale on putting the filename in the window.
>> > > > > As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are:
>> > > > >
>> > > > > 1) The files are processed in sequence, not in parallel
>> > > > > 2) The windowed operator should not keep the state associated with the file when the processing of the file is done
>> > > > > 3) The trigger should be fired for the file when a file is done processing.
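
The three requirements listed above can be illustrated with a small simulation. This is only a sketch and does not use the Malhar WindowedOperator API: the class SequenceWindowedCounter and its methods process_tuple and process_watermark are hypothetical names, and the assumption is that each file is assigned an increasing sequence number and that a watermark carrying that number is sent when the file has been read fully.

```python
from collections import Counter


class SequenceWindowedCounter:
    """Hypothetical windowed word counter; windows are file sequence numbers."""

    def __init__(self):
        self.state = {}    # sequence number -> Counter of words seen so far
        self.emitted = []  # (sequence number, final counts), in firing order

    def process_tuple(self, seq, word):
        # Every tuple carries the sequence number of the file it came from.
        self.state.setdefault(seq, Counter())[word] += 1

    def process_watermark(self, seq):
        # A watermark at `seq` marks every window up to and including `seq`
        # as complete: fire its trigger once, then drop its state.
        for done in sorted(s for s in self.state if s <= seq):
            self.emitted.append((done, dict(self.state.pop(done))))


op = SequenceWindowedCounter()
for word in ["a", "b", "a"]:   # contents of the first file
    op.process_tuple(1, word)
op.process_watermark(1)        # first file is done
op.process_tuple(2, "b")       # contents of the second file
op.process_watermark(2)        # second file is done
print(op.emitted)
print(op.state)
```

Nothing in the sketch depends on the sequence number being a time, which is the point raised about a SequenceWindow-style option.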
>> > > > >
>> > > > > However, if the file is just a sequence that has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just create confusion for users, especially when it's used as an example for new users.
>> > > > >
>> > > > > How about having a separate class called SequenceWindow? And perhaps TimeWindow can inherit from it?
>> > > > >
>> > > > > David
>> > > > >
>> > > > > On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise wrote:
>> > > > >
>> > > > > > On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > > > > >
>> > > > > > > I think my comments related to count based windows might be causing confusion. Let's not discuss count based scenarios for now.
>> > > > > > >
>> > > > > > > Just want to make sure we are on the same page wrt. the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number) and that helps keep tuples from each file in a separate window.
>> > > > > >
>> > > > > > Yes, in this case it is a sequence number, but it could be a time stamp also, depending on the file naming convention. And if it was event time processing, the watermark would be derived from records within the file.
>> > > > > >
>> > > > > > Agreed, the source should have a mechanism to control the time stamp extraction along with everything else pertaining to the watermark generation.
>> > > > > >
>> > > > > > > We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
>> > > > > > >
>> > > > > > > ~ Bhupesh
>> > > > > > >
>> > > > > > > On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise wrote:
>> > > > > > >
>> > > > > > > > I don't think this is a use case for count based window.
>> > > > > > > >
>> > > > > > > > We have multiple files that are retrieved in a sequence and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file is read fully. There is no concept of "end of something" for an individual key and global window isn't applicable.
>> > > > > > > >
>> > > > > > > > However, as already explained and implemented by Bhupesh, this can be solved using watermark and window (in this case the window timestamp isn't a timestamp, but a file sequence, but that doesn't matter).
>> > > > > > > >
>> > > > > > > > Thomas
>> > > > > > > >
>> > > > > > > > On Mon, Feb 27, 2017 at 8:05 AM, David Yan <davidyan@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp). It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.
>> > > > > > > > >
>> > > > > > > > > If you want the result broken up by file name, and if the files are to be processed in parallel, I think making the file name be part of the key is the way to go. I think it's very confusing if we somehow make the file to be part of the window.
>> > > > > > > > >
>> > > > > > > > > For count-based window, it's not implemented yet and you're welcome to add that feature. In case of count-based windows, there would be no notion of time and you probably only trigger at the end of each window. In the case of count-based windows, the watermark only matters for batch since you need a way to know when the batch has ended (if the count is 10, the number of tuples in the batch is let's say 105, you need a way to end the last window with 5 tuples).
>> > > > > > > > >
>> > > > > > > > > David
>> > > > > > > > >
>> > > > > > > > > On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi David,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for your comments.
>> > > > > > > > > >
>> > > > > > > > > > The wordcount example that I created based on the windowed operator does processing of word counts per file (each file as a separate batch), i.e. process counts for each file and dump into separate files.
>> > > > > > > > > > As I understand, Global window is for one large batch; i.e. all incoming data falls into the same batch. This could not be processed using the GlobalWindow option as we need more than one window. In this case, I configured the windowed operator to have time windows of 1ms each and passed data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way of handling this scenario?
>> > > > > > > > > >
>> > > > > > > > > > Regarding (2 - count based windows), I think there is a trigger option to process count based windows. In case I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?
>> > > > > > > > > >
>> > > > > > > > > > I agree that (4. Final Watermark) can be done using Global window.
>> > > > > > > > > >
>> > > > > > > > > > ~ Bhupesh
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Feb 27, 2017 at 12:18 PM, David Yan <davidyan@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > I'm worried that we are making the watermark concept too complicated.
>> > > > > > > > > > >
>> > > > > > > > > > > Watermarks should simply just tell you what windows can be considered complete.
>> > > > > > > > > > >
>> > > > > > > > > > > Point 2 is basically a count-based window. Watermarks do not play a role here because the window is always complete at the n-th tuple.
>> > > > > > > > > > >
>> > > > > > > > > > > If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me that this can be achieved with just a Global Window. For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.
>> > > > > > > > > > >
>> > > > > > > > > > > Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.
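
The Global Window plus +infinity watermark idea can be sketched as follows. This is an illustration under assumptions, not Apex or Malhar code: GlobalWindowSum, process_tuple, process_watermark and the FINAL_WATERMARK constant are hypothetical names, and the aggregate is a plain sum.

```python
import math

FINAL_WATERMARK = math.inf  # assumed sentinel meaning "no more data will arrive"


class GlobalWindowSum:
    """Hypothetical aggregator over a single global window."""

    def __init__(self):
        self.total = 0
        self.result = None  # stays None until the trigger fires

    def process_tuple(self, value):
        self.total += value

    def process_watermark(self, timestamp):
        # The global window spans all time, so an ordinary watermark never
        # completes it; only the +infinity watermark fires the trigger.
        if timestamp == FINAL_WATERMARK:
            self.result = self.total


op = GlobalWindowSum()
for value in [1, 2, 3]:
    op.process_tuple(value)
op.process_watermark(100)              # ordinary watermark: nothing is emitted
before_final = op.result
op.process_watermark(FINAL_WATERMARK)  # end of input: the aggregate is emitted
after_final = op.result
print(before_final, after_final)
```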
>> > > > > > > > > > >
>> > > > > > > > > > > David
>> > > > > > > > > > >
>> > > > > > > > > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Thomas,
>> > > > > > > > > > > >
>> > > > > > > > > > > > For an input operator which is supposed to generate watermarks for downstream operators, I can think about the following watermarks that the operator can emit:
>> > > > > > > > > > > > 1. Time based watermarks (the high watermark / low watermark)
>> > > > > > > > > > > > 2. Number of tuple based watermarks (Every n tuples)
>> > > > > > > > > > > > 3. File based watermarks (Start file, end file)
>> > > > > > > > > > > > 4. Final watermark
>> > > > > > > > > > > >
>> > > > > > > > > > > > File based watermarks seem to be applicable for batch (file based) as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
>> > > > > > > > > > > >
>> > > > > > > > > > > > ~ Bhupesh
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <thw@apache.org> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or other monotonic increasing sequence) that other operators can generically work with.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Note that even file input in many cases can produce time based watermarks, for example when you read part files that are bound by event time.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Thomas
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > For better understanding the use case for control tuples in batch, I am creating a prototype for a batch application using File Input and File Output operators.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > To enable basic batch processing for File IO operators, I am proposing the following changes to File input and output operators:
>> > > > > > > > > > > > > > 1. File Input operator emits a watermark each time it opens and closes a file. These can be "start file" and "end file" watermarks which include the corresponding file names. The "start file" tuple should be sent before any of the data from that file flows.
>> > > > > > > > > > > > > > 2. File Input operator can be configured to end the application after a single or n scans of the directory (a batch). This is where the operator emits the final watermark (the end of application control tuple). This will also shutdown the application.
>> > > > > > > > > > > > > > 3. The File output operator handles these control tuples. "Start file" initializes the file name for the incoming tuples. "End file" watermark forces a finalize on that file.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The user would be able to enable the operators to send only those watermarks that are needed in the application. If none of the options are configured, the operators behave as in a streaming application.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > There are a few challenges in the implementation where the input operator is partitioned. In this case, the correlation between the start/end for a file and the data tuples for that file is lost. Hence we need to maintain the filename as part of each tuple in the pipeline.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The "start file" and "end file" control tuples in this example are temporary names for watermarks. We can have generic "start batch" / "end batch" tuples which could be used for other use cases as well. The Final watermark is common and serves the same purpose in each case.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Please let me know your thoughts on this.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > ~ Bhupesh
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Yes, this can be part of operator configuration. Given this, for a user to define a batch application would mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, there can be other use cases that can be achieved other than batch.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > We may also need to take care of the following:
>> > > > > > > > > > > > > > > 1. Make sure that the watermarks or control tuples are consistent across sources. Meaning an HDFS sink should be able to interpret the watermark tuple sent out by, say, a JDBC source.
>> > > > > > > > > > > > > > > 2. In addition to I/O connectors, we should also look at the need for processing operators to understand some of the control tuples / watermarks. For example, we may want to reset the operator behavior on arrival of some watermark tuple.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > ~ Bhupesh
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <thw@apache.org> wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >> The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (files names etc.). The mechanism to generate the marks may depend on the type of source and the user needs to be able to influence/configure it.
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> Thomas
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> > Hi Thomas,
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > I am not sure that I completely understand your suggestion. Are you suggesting to broaden the scope of the proposal to treat all sources as bounded as well as unbounded?
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > In case of Apex, we treat all sources as unbounded sources. Even bounded sources like the HDFS file source are treated as unbounded by means of scanning the input directory repeatedly.
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > Let's consider HDFS file source for example:
>> > > > > > > > > > > > > > >> > In this case, if we treat it as a bounded source, we can define hooks which allow us to detect the end of the file and send the "final watermark". We could also consider HDFS file source as a streaming source and define hooks which send watermarks based on different kinds of windows.
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > Please correct me if I misunderstand.
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > ~ Bhupesh
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <thw@apache.org> wrote:
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > > Bhupesh,
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data. In Beam for example, you can use the "global window" and the final watermark to accomplish what you are looking for. Batch is just a special case of streaming where the source emits the final watermark.
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > >
> > > > Yes, if the user needs to develop a batch application, then batch-aware
> > > > operators need to be used in the application.
> > > > The nature of the application is mostly controlled by the input and the
> > > > output operators used in the application.
> > > >
> > > > For example, consider an application which needs to filter records in an
> > > > input file and store the filtered records in another file. The nature of
> > > > this app is to end once the entire file is processed. The following
> > > > things are expected of the application:
> > > >
> > > >    1. Once the input data is over, finalize the output file from .tmp
> > > >    files. - Responsibility of output operator
> > > >    2. End the application, once the data is read and processed -
> > > >    Responsibility of input operator
> > > >
> > > > These functions are essential to allow the user to do higher level
> > > > operations like scheduling or running a workflow of batch applications.
> > > >
> > > > I am not sure about intermediate (processing) operators, as there is no
> > > > change in their functionality for batch use cases. Perhaps, allowing
> > > > running multiple batches in a single application may require similar
> > > > changes in processing operators as well.
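The two responsibilities listed above (the input operator signals the end of
the data, the output operator finalizes its .tmp file) can be sketched in
plain Python. This is only an illustration under stated assumptions, not the
Malhar file operators: DATA, END_BATCH, batch_file_input, BatchFileOutput and
run_filter_app are hypothetical names, and the "end batch" control tuple is
modelled as a tagged pair.

```python
import os
from typing import Callable, Iterator, Tuple

# Hypothetical tuple kinds for this sketch.
DATA = "DATA"
END_BATCH = "END_BATCH"


def batch_file_input(path: str) -> Iterator[Tuple[str, str]]:
    """Input side: emit each line, then an end-of-batch control tuple."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            yield DATA, line.rstrip("\n")
    yield END_BATCH, ""


class BatchFileOutput:
    """Output side: write to a .tmp file and finalize it on END_BATCH."""

    def __init__(self, final_path: str) -> None:
        self.final_path = final_path
        self.tmp_path = final_path + ".tmp"
        self._file = open(self.tmp_path, "w", encoding="utf-8")
        self.finalized = False

    def process(self, kind: str, payload: str) -> None:
        if kind == DATA:
            self._file.write(payload + "\n")
        elif kind == END_BATCH:
            # Finalize: close the .tmp file and move it to its final name.
            self._file.close()
            os.replace(self.tmp_path, self.final_path)
            self.finalized = True


def run_filter_app(src: str, dst: str, keep: Callable[[str], bool]) -> bool:
    """Filter records from src into dst; return True once output is finalized."""
    out = BatchFileOutput(dst)
    for kind, payload in batch_file_input(src):
        if kind == DATA and not keep(payload):
            continue  # the filter step drops data tuples, never control tuples
        out.process(kind, payload)
    return out.finalized
```

In this sketch the filter step in the middle needs no batch-specific logic
beyond passing the control tuple through, which is consistent with the remark
that intermediate operators do not change for batch use cases. A scheduler
could treat the return value of run_filter_app as the signal that the
application is done.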
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <priyag@apache.org> wrote:
> > > >
> > > > > Will it make an impression on the user that, if he has a batch use
> > > > > case, he has to use batch aware operators only? If so, is that what
> > > > > we expect? I am not aware of how we implement the batch scenario, so
> > > > > this might be a basic question.
> > > > >
> > > > > -Priyanka
> > > > >
> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > While design / implementation for custom control tuples is ongoing, I
> > > > > > thought it would be a good idea to consider its usefulness in one of
> > > > > > the use cases - batch applications.
> > > > > >
> > > > > > This is a proposal to adapt / extend existing operators in the Apache
> > > > > > Apex Malhar library so that it is easy to use them in batch use cases.
> > > > > > Naturally, this would be applicable for only a subset of operators like
> > > > > > File, JDBC and NoSQL databases.
> > > > > > For example, for a file based store, (say HDFS store), we could have
> > > > > > FileBatchInput and FileBatchOutput operators which allow easy
> > > > > > integration into a batch application. These operators would be
> > > > > > extended from their existing implementations and would be "Batch
> > > > > > Aware", in that they may understand the meaning of some specific
> > > > > > control tuples that flow through the DAG. Start batch and end batch
> > > > > > seem to be the obvious candidates that come to mind.
> > > > > > On receipt of such control tuples, they may try to modify the behavior
> > > > > > of the operator - to reinitialize some metrics or finalize an output
> > > > > > file for example.
> > > > > >
> > > > > > We can discuss the potential control tuples and actions in detail, but
> > > > > > first I would like to understand the views of the community for this
> > > > > > proposal.
> > > > > >
> > > > > > ~ Bhupesh

--94eb2c12359274d937054b648e5d--