From: Munagala Ramanath
Date: Sat, 3 Dec 2016 10:46:05 -0800
Subject: Re: Query
To: users@apex.apache.org

To further clarify Bhupesh's comment, suppose you determine in window N in the input operator that the
data reading phase is complete, and you send the control tuple on the dedicated port to the output
operator in window N+1. If the downstream operators P_i (including the output operator) are
processing their respective windows W_i, the output operator will
not actually see that control tuple until all the W_i have reached N+1.
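One way to picture this is as a barrier: the output operator can be no further along than its slowest upstream path, so a control tuple sent in window N+1 only becomes visible once every W_i has reached N+1. The sketch below is a plain-Java model of that rule, not Apex code; `WindowBarrier` and its methods are hypothetical names, and it assumes each upstream path simply reports the window it is currently processing.

```java
// Hypothetical plain-Java model of the window barrier; this is not part of the Apex API.
// Assumption: each upstream path P_i exposes the window W_i it is currently processing.
class WindowBarrier {
    private final long[] upstreamWindows; // W_i for each upstream path P_i

    WindowBarrier(long... initialWindows) {
        this.upstreamWindows = initialWindows.clone();
    }

    // Upstream path i has moved on to window w; windows never go backwards.
    void advance(int i, long w) {
        if (w < upstreamWindows[i]) {
            throw new IllegalArgumentException("windows only move forward");
        }
        upstreamWindows[i] = w;
    }

    // The output operator can be no further along than the slowest upstream path.
    long downstreamWindow() {
        long min = Long.MAX_VALUE;
        for (long w : upstreamWindows) {
            min = Math.min(min, w);
        }
        return min;
    }

    // A control tuple emitted in signalWindow is seen only once every W_i has reached it.
    boolean signalVisible(long signalWindow) {
        return downstreamWindow() >= signalWindow;
    }

    public static void main(String[] args) {
        long n = 10;                                                // reading finished in window N
        WindowBarrier barrier = new WindowBarrier(n + 1, n - 1, n); // input path already in N+1
        System.out.println(barrier.signalVisible(n + 1));           // false: two paths still lag
        barrier.advance(1, n + 1);
        barrier.advance(2, n + 1);
        System.out.println(barrier.signalVisible(n + 1));           // true: every W_i reached N+1
    }
}
```

The minimum over all W_i is what matters: a single slow path is enough to hold the signal back.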

Another option is to use the OperatorRequest mechanism to communicate among the operators
out-of-band; an example is at:

https://github.com/DataTorrent/examples/tree/master/tutorials/throttle

That example shows how to modulate the speed of upstream operators, but it can be adapted for
your scenario by checking and recording the "completion status" of all the operators.
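The bookkeeping suggested here could look roughly like the model below: every operator reports the last window it has fully processed, the input operator reports the window in which reading finished, and a single request is issued to the writer once every operator has caught up. This is a hypothetical plain-Java sketch (`CompletionTracker`, `report` and `inputFinished` are made-up names); in Apex the reports and the request would go through the stats and `OperatorRequest` machinery used in the linked example, whose exact API is not reproduced here. It assumes window IDs are aligned across operators, so an operator finishing window x implies it has processed everything the input emitted up to window x.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator that records each operator's "completion status" out-of-band.
// Assumption: window IDs are aligned across all operators in the DAG.
class CompletionTracker {
    private static final long UNKNOWN = Long.MAX_VALUE;

    private final Map<String, Long> completedWindow = new HashMap<>(); // last fully processed window
    private final Runnable writeRequest;    // stands in for the request sent to the writer
    private long lastInputWindow = UNKNOWN; // window in which the input finished reading
    private boolean requested = false;

    CompletionTracker(Iterable<String> operatorNames, Runnable writeRequest) {
        for (String name : operatorNames) {
            completedWindow.put(name, -1L); // nothing processed yet
        }
        this.writeRequest = writeRequest;
    }

    // The input operator reports that reading completed in window w.
    void inputFinished(long w) {
        lastInputWindow = w;
        maybeRequestWrite();
    }

    // An operator reports that it has fully processed window w.
    void report(String operator, long w) {
        if (!completedWindow.containsKey(operator)) {
            throw new IllegalArgumentException("unknown operator: " + operator);
        }
        completedWindow.merge(operator, w, Long::max);
        maybeRequestWrite();
    }

    boolean writeRequested() {
        return requested;
    }

    private void maybeRequestWrite() {
        if (requested || lastInputWindow == UNKNOWN) {
            return;
        }
        for (long w : completedWindow.values()) {
            if (w < lastInputWindow) {
                return; // some operator may still hold data from the reading phase
            }
        }
        requested = true; // issue the request exactly once
        writeRequest.run();
    }

    public static void main(String[] args) {
        CompletionTracker tracker = new CompletionTracker(
                Arrays.asList("parser", "aggregator"),
                () -> System.out.println("write requested"));
        tracker.inputFinished(7);
        tracker.report("parser", 7);
        tracker.report("aggregator", 6); // still behind: nothing happens
        tracker.report("aggregator", 7); // prints "write requested"
    }
}
```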

Ram

On Sat, Dec 3, 2016 at 5:10 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:

Hi Vishal,

A window is processed by an operator only when the previous window is completely processed. When you send the control tuple in a new window, you can be sure that all previous windows have been processed.
That is the reason I asked you to send the control tuple in a new window.

For shutdown, you can try throwing a ShutdownException() from the input operator. This will propagate through the entire DAG and shut down all the operators in sequence.
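A toy model of this in plain Java: the input operator throws once its data windows and the signal window have passed, and a stand-in engine catches the exception and shuts the operators down one after another in DAG order. In the Apex API the corresponding class is `Operator.ShutdownException`; the local `ShutdownException` below only stands in for it so the sketch compiles without Apex, and `FinishingInput`, `ToyEngine` and the one-call-per-window loop are hypothetical simplifications, not how the Apex engine is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Apex's Operator.ShutdownException, so the sketch needs no Apex dependency.
class ShutdownException extends RuntimeException {
}

// Hypothetical operator that only records when it is shut down.
class ToyOperator {
    final String name;

    ToyOperator(String name) {
        this.name = name;
    }

    void teardown(List<String> log) {
        log.add(name + " shut down");
    }
}

// Hypothetical input: data in windows 1..dataWindows, the writer signal in the next window,
// then it asks the engine to stop by throwing ShutdownException.
class FinishingInput extends ToyOperator {
    private final long dataWindows;

    FinishingInput(String name, long dataWindows) {
        super(name);
        this.dataWindows = dataWindows;
    }

    void emitTuples(long windowId) {
        if (windowId > dataWindows + 1) {
            throw new ShutdownException();
        }
        // otherwise: emit data (windowId <= dataWindows) or the signal (windowId == dataWindows + 1)
    }
}

// Toy engine: drives the input one window per call until it throws, then shuts every
// operator down in DAG order, starting from the input.
class ToyEngine {
    static List<String> run(FinishingInput input, List<ToyOperator> downstream) {
        List<String> log = new ArrayList<>();
        long window = 1;
        try {
            while (true) {
                input.emitTuples(window);
                window++;
            }
        } catch (ShutdownException e) {
            log.add("shutdown requested in window " + window);
            input.teardown(log);
            for (ToyOperator op : downstream) {
                op.teardown(log);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<ToyOperator> downstream = new ArrayList<>();
        downstream.add(new ToyOperator("parser"));
        downstream.add(new ToyOperator("writer"));
        ToyEngine.run(new FinishingInput("reader", 2), downstream).forEach(System.out::println);
    }
}
```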

~ Bhupesh


On Dec 3, 2016 18:15, "Vishal Agrawal" <vishal.agrawal123@gmail.com> wrote:
Thank you, Bhupesh.

Another catch is that just because the input operator has processed the last record doesn't mean all the intermediate operators have processed it as well. How can I ensure that all the operators have processed all the records before performing the write operation?

Also, is there a way to shut down the DAG programmatically once it has performed the write operation?


Thanks,
Vishal


On Fri, Dec 2, 2016 at 11:11 PM Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
Hi Vishal,

Support for such operations is currently being enhanced in Apex.

For now, you can do the following:
 - Have an additional output port in your input operator as well as an input port in the "Writer" operator.
 - Once the Input operator has read and emitted all the data that it wanted to, you can send a tuple on the new port that you have created. This tuple will act as your signal. Make sure to do this in a new window: ideally, if the input is done in window x, send this tuple in window x+1.
 - When you receive this tuple on the Writer operator, you can perform the write operation on the external system.
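The three steps above can be sketched as follows. To keep it self-contained and runnable without Apex, `Port` is a hypothetical stand-in for Apex's ports (`DefaultOutputPort`/`DefaultInputPort` in the real API), the reader feeds the writer directly with no intermediate operators, and the loop in `main` plays the engine by calling `beginWindow` and `emitTuples` a fixed number of times per window. `LineReader`, `ResultWriter` and `done` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an Apex port: emit() hands the tuple to every connected sink.
class Port<T> {
    private final List<Consumer<T>> sinks = new ArrayList<>();

    void connect(Consumer<T> sink) {
        sinks.add(sink);
    }

    void emit(T tuple) {
        for (Consumer<T> sink : sinks) {
            sink.accept(tuple);
        }
    }
}

// Input operator with an extra "done" port that carries the end-of-input signal.
class LineReader {
    final Port<String> output = new Port<>();
    final Port<Long> done = new Port<>();   // the additional output port
    private final Iterator<String> lines;
    private long currentWindow;
    private long finishedInWindow = -1;     // window x in which reading completed
    private boolean signalled = false;

    LineReader(List<String> lines) {
        this.lines = lines.iterator();
    }

    void beginWindow(long windowId) {
        currentWindow = windowId;
    }

    void emitTuples() {
        if (lines.hasNext()) {
            output.emit(lines.next());
        } else if (finishedInWindow < 0) {
            finishedInWindow = currentWindow;       // all data emitted: remember window x
        } else if (!signalled && currentWindow > finishedInWindow) {
            done.emit(currentWindow);               // signal sent in a later window (x + 1)
            signalled = true;
        }
    }
}

// Writer that buffers results and performs the external write when the signal arrives.
class ResultWriter {
    final List<String> written = new ArrayList<>(); // stands in for the external system
    long signalWindow = -1;
    private final List<String> pending = new ArrayList<>();
    final Consumer<String> input = pending::add;    // data input port
    final Consumer<Long> doneInput = windowId -> {  // the additional input port
        signalWindow = windowId;
        written.addAll(pending);                    // the write to the external system
        pending.clear();
    };
}

class SignalDemo {
    public static void main(String[] args) {
        LineReader reader = new LineReader(Arrays.asList("a", "b", "c"));
        ResultWriter writer = new ResultWriter();
        reader.output.connect(writer.input);
        reader.done.connect(writer.doneInput);
        for (long w = 1; w <= 4; w++) {             // toy engine: two emitTuples() calls per window
            reader.beginWindow(w);
            reader.emitTuples();
            reader.emitTuples();
        }
        System.out.println(writer.written + " written after signal in window " + writer.signalWindow);
    }
}
```

In a real DAG the writer's data would reach it through the intermediate operators rather than directly from the reader, which is where the window-ordering guarantee discussed earlier in the thread comes in.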

~ Bhupesh

On Sat, Dec 3, 2016 at 3:56 AM, Vishal Agrawal <vishal.agrawal123@gmail.com> wrote:
Hi,

I am performing a batch operation. My input operator is reading multiple files line by line, and then there are a bunch of operators manipulating the records to evaluate the result.
My output operator is supposed to write the final result to an external system once all the records from each of the files are processed.

On completion of reading all the files, how can I trigger an event which will inform my output operator to perform the write operation on the external system?


Thanks,
Vishal