From: Vishal Agrawal
Date: Wed, 7 Dec 2016 22:46:27 -0500
Subject: Re: Query
To: users@apex.apache.org
Reply-To: users@apex.apache.org
Mailing-List: contact users-help@apex.apache.org; run by ezmlm
archived-at: Thu, 08 Dec 2016 03:47:41 -0000

Thank you everyone for your support!!

On Tue, Dec 6, 2016 at 9:57 PM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> I'd recommend using the additional output port solution outlined by
> Bhupesh. There are a few Apex applications in the field that leverage
> that solution.
>
> Thank you,
>
> Vlad
>
> On 12/4/16 11:45, Vishal Agrawal wrote:
>
> Thank you Bhupesh and Ram. Appreciate your quick response.
>
> I see that the ThrottlingStatsListener.processStats() method gets called
> whenever new stats are received from the operators.
> How frequently are these stats sent by the operators? Is it at the end
> of every window?
>
> Thanks,
> Vishal
>
> On Sat, Dec 3, 2016 at 1:46 PM, Munagala Ramanath <ram@datatorrent.com>
> wrote:
>
> To further clarify Bhupesh's comment, suppose you determine in window N
> in the input operator that the data reading phase is complete and send
> the control tuple on the dedicated port to the output operator in
> window N+1. If the downstream operators (including the output operator)
> P_i are processing respective windows W_i, the output operator will not
> actually see that control tuple until all the W_i have reached N+1.
>
> Another option is to use the OperatorRequest mechanism to communicate
> among the operators out-of-band; an example is at:
> https://github.com/DataTorrent/examples/tree/master/tutorials/throttle
>
> That example shows how to modulate the speed of upstream operators, but
> it can be adapted for your scenario by checking and recording the
> "completion status" of all the operators.
>
> Ram
>
> On Sat, Dec 3, 2016 at 5:10 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> Hi Vishal,
>
> A window is processed by an operator only when the previous window is
> completely processed. When you send the control tuple in a new window,
> you can be sure that all previous windows have been processed.
>
> That is the reason I asked you to send the control tuple in a new
> window.
>
> For shutdown, you can try throwing a ShutdownException() from the input
> operator. This will propagate through the entire DAG and shut down all
> the operators in sequence.
>
> ~ Bhupesh
>
> On Dec 3, 2016 18:15, "Vishal Agrawal" <vishal.agrawal123@gmail.com>
> wrote:
>
> Thank you Bhupesh.
>
> Another catch is that just because the input operator has processed the
> last record doesn't mean all the intermediate operators have processed
> it as well. How can I ensure that all the operators have processed all
> the records before performing the write operation?
>
> Also, is there a way to shut down the DAG programmatically once it has
> performed the write operation?
>
> Thanks,
> Vishal
>
> On Fri, Dec 2, 2016 at 11:11 PM Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> Hi Vishal,
>
> The support for such operations is currently being enhanced in Apex.
>
> For now, you can do the following:
> - Have an additional output port in your input operator as well as an
>   input port in the "Writer" operator.
> - Once the Input operator has read and emitted all the data that it
>   wanted to, you can send a tuple on the new port that you have created.
>   This tuple will act as your signal. Make sure to do this in a new
>   window - ideally, if the input is done in window x, send this tuple in
>   window x+1.
> - When you receive this tuple on the Writer operator, you can perform
>   the write operation on the external system.
>
> ~ Bhupesh
>
> On Sat, Dec 3, 2016 at 3:56 AM, Vishal Agrawal
> <vishal.agrawal123@gmail.com> wrote:
>
> Hi,
>
> I am performing a batch operation. My input operator is reading multiple
> files line by line, and then there are a bunch of operators manipulating
> the records to evaluate the result.
> My output operator is supposed to write the final result to an external
> system once all the records from each of the files are processed.
>
> On completion of reading all the files, how can I trigger an event which
> will inform my output operator to perform the write operation on the
> external system?
>
> Thanks,
> Vishal
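For readers following the thread, the signal-tuple approach Bhupesh outlines can be modeled in plain Java roughly as follows. This is only a sketch of the idea and does not use the Apex API: the class ControlTupleSketch, the Writer type, and the methods processData, processSignal, and run are all hypothetical names made up for illustration, and the upper-casing step is merely a stand-in for the intermediate operators. It assumes windows are handled strictly in order, as described in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Plain-Java model of "send the signal tuple in window x+1"; not Apex API code. */
class ControlTupleSketch {

    /** Hypothetical writer: buffers records and writes only when the signal arrives. */
    static class Writer {
        private final List<String> pending = new ArrayList<>();
        private final List<String> written = new ArrayList<>();

        void processData(String record) {
            pending.add(record);
        }

        void processSignal() {
            // Stand-in for the single write to the external system.
            written.addAll(pending);
            pending.clear();
        }

        List<String> written() {
            return written;
        }
    }

    /** Windows 0..x carry data; window x+1 carries only the signal tuple. */
    static List<String> run(List<List<String>> dataWindows) {
        Writer writer = new Writer();
        int signalWindow = dataWindows.size(); // this is window x+1
        for (int w = 0; w <= signalWindow; w++) {
            if (w < signalWindow) {
                for (String line : dataWindows.get(w)) {
                    // Stand-in for the intermediate operators.
                    writer.processData(line.toUpperCase(Locale.ROOT));
                }
            } else {
                // All earlier windows are complete by the time this runs.
                writer.processSignal();
            }
        }
        return writer.written();
    }

    public static void main(String[] args) {
        List<List<String>> windows =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(run(windows)); // prints [A, B, C]
    }
}
```

Because the signal travels in its own window, the writer never needs to count records or know how many files were read; it relies only on window ordering.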
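Ram's clarification about the windows W_i in the thread above can be expressed as a small check. This is a simplified plain-Java model, not Apex code: WindowProgressSketch and signalVisible are hypothetical names, and the array of current window numbers is assumed to be known here only for the sake of illustration.

```java
import java.util.Arrays;

/** Plain-Java model of when the output operator can see the control tuple. */
class WindowProgressSketch {

    /**
     * Returns true once every operator on the path (including the output
     * operator) is processing the signal window or a later one.
     */
    static boolean signalVisible(long signalWindow, long[] currentWindows) {
        return Arrays.stream(currentWindows).allMatch(w -> w >= signalWindow);
    }

    public static void main(String[] args) {
        long n = 10; // window in which the input operator finished reading
        // One operator is still in window 9, so the signal is not yet visible.
        System.out.println(signalVisible(n + 1, new long[] {11, 10, 9}));   // prints false
        // Every operator has reached window N+1.
        System.out.println(signalVisible(n + 1, new long[] {11, 11, 11})); // prints true
    }
}
```

In the real platform this condition is enforced by window ordering itself, so the application does not have to evaluate it; the sketch only makes the reasoning explicit.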