Subject: Re: Logs meaning states
From: Stephan Ewen
To: user@flink.apache.org
Date: Mon, 29 Jun 2015 15:01:33 +0200

In general, avoid collect() if you can. collect() brings the data to the
client, where the computation is no longer parallel.

Try to do as much as possible on the DataSet.

On Mon, Jun 29, 2015 at 2:58 PM, Juan Fumero
<juan.jose.fumero.alfonso@oracle.com> wrote:

> Hi Stephan,
>   so should I use another method instead of collect? It seems
> multithreading is not working with this.
>
> Juan
>
> On Mon, 2015-06-29 at 14:51 +0200, Stephan Ewen wrote:
> > Hi Juan!
> >
> > This is an artifact of a workaround right now. The actual collect()
> > logic happens in the flatMap() and the sink is a dummy that executes
> > nothing. The flatMap writes the data to be collected to the
> > "accumulator" that delivers it back.
> >
> > Greetings,
> > Stephan
> >
> > On Mon, Jun 29, 2015 at 2:30 PM, Juan Fumero wrote:
> >     Hi,
> >       I am starting with Flink. I have tried to look for the
> >     documentation but I haven't found it clear.
> >
> >     I wonder what the difference is between these two states:
> >
> >     FlatMap RUNNING vs DataSink RUNNING.
> >
> >     Is FlatMap doing any data transformation? Compilation? At
> >     which point is the function provided in the MapFunction
> >     actually executed? How could I know exactly the time for the
> >     kernel computation?
> >
> >     It seems to use one thread in this step, even though I
> >     specified 16 threads in the createLocalEnvironment.
> >
> >     CHAIN DataSource (at applyFunction(ApplyFunction.java:96)
> >     (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >     (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >     (collect())(1/1) switched to RUNNING
> >
> >     Here it runs only one thread for almost 35 seconds.
> >
> >     The rest of the execution is very fast (less than one second
> >     for computing the square of an array of 500000 integer
> >     elements).
> >
> >     Thanks
> >     Juan
> >
> >     Here is the full log.
> >
> >     06/29/2015 14:13:25 Job execution switched to status RUNNING.
> >     06/29/2015 14:13:25 CHAIN DataSource (at
> >     applyFunction(ApplyFunction.java:96)
> >     (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >     (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >     (collect())(1/1) switched to SCHEDULED
> >     06/29/2015 14:13:25 CHAIN DataSource (at
> >     applyFunction(ApplyFunction.java:96)
> >     (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >     (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >     (collect())(1/1) switched to DEPLOYING
> >     06/29/2015 14:13:26 CHAIN DataSource (at
> >     applyFunction(ApplyFunction.java:96)
> >     (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >     (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >     (collect())(1/1) switched to RUNNING
> >     06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >     SCHEDULED
> >     06/29/2015 14:14:01 CHAIN DataSource (at
> >     applyFunction(ApplyFunction.java:96)
> >     (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >     (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >     (collect())(1/1) switched to FINISHED
> >     06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >     DEPLOYING
> >     06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >     RUNNING
> >     06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >     FINISHED
> >     06/29/2015 14:14:01 Job execution switched to status FINISHED.
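On the question of how long each step runs: one rough way is to read it off the state-change log itself. The following is only a minimal sketch in plain Java, not part of Flink. It assumes the log line layout shown in this thread (`MM/dd/yyyy HH:mm:ss <task> switched to <STATE>`), with each entry already joined onto a single line, and the class and method names (`TaskStateTimer`, `runningSeconds`) are made up for illustration. It reports the wall-clock time between a task's RUNNING and FINISHED entries at one-second resolution, so for the chained task it covers the source, the map, and the collect() flatMap together rather than the user function alone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TaskStateTimer {
    // Timestamp layout seen in the quoted log, e.g. "06/29/2015 14:13:26".
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss");

    // Groups: 1 = timestamp, 2 = task name, 3 = new state.
    private static final Pattern LINE = Pattern.compile(
            "^(\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2})\\s+(.*?)\\s+"
            + "switched to (?:status )?(\\w+)\\.?$");

    /** Seconds between the RUNNING and FINISHED entries of each task. */
    public static Map<String, Long> runningSeconds(List<String> lines) {
        Map<String, LocalDateTime> started = new LinkedHashMap<>();
        Map<String, Long> seconds = new LinkedHashMap<>();
        for (String raw : lines) {
            Matcher m = LINE.matcher(raw.trim());
            if (!m.matches()) {
                continue; // not a state-change line
            }
            LocalDateTime at = LocalDateTime.parse(m.group(1), STAMP);
            String task = m.group(2);
            String state = m.group(3);
            if (state.equals("RUNNING")) {
                started.put(task, at);
            } else if (state.equals("FINISHED") && started.containsKey(task)) {
                seconds.put(task,
                        Duration.between(started.get(task), at).getSeconds());
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        // Task name of the chained operator, copied from the log in this thread.
        String chain = "CHAIN DataSource (at applyFunction(ApplyFunction.java:96) "
                + "(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map "
                + "(Map at applyFunction(ApplyFunction.java:108)) -> FlatMap "
                + "(collect())(1/1)";
        List<String> log = List.of(
                "06/29/2015 14:13:25 Job execution switched to status RUNNING.",
                "06/29/2015 14:13:26 " + chain + " switched to RUNNING",
                "06/29/2015 14:14:01 " + chain + " switched to FINISHED",
                "06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to RUNNING",
                "06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to FINISHED",
                "06/29/2015 14:14:01 Job execution switched to status FINISHED.");
        runningSeconds(log).forEach(
                (task, secs) -> System.out.println(secs + " s  " + task));
    }
}
```

On the log from this thread, the chained task comes out at 35 seconds and the collect() sink at 0 seconds, which matches the explanation above: the work, including the collect() logic, happens in the chain, and the sink is a dummy.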