Subject: Re: Flink Iterations vs. While loop
From: Greg Hogan
Date: Fri, 2 Sep 2016 10:39:28 -0400
To: user@flink.apache.org

Hi Dan,

Where are you reading the 200 GB of "data" from? How much memory is available per node? If the DataSet is read from a distributed filesystem, and if Flink must spill to disk during the iterations, then I wouldn't expect much difference. Roughly how many iterations run in the 30 minutes? I don't know that this is reported explicitly, but if your convergence function only has one input record per iteration, then the reported total is the iteration count.

One other thought: we should soon have support for object reuse with arrays (FLINK-3695). This would be implemented as DoubleValueArray or ValueArray<DoubleValue> rather than double[], but it would be interesting to test for a change in performance.
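In the meantime, a minimal sketch of the object reuse switch that already exists in the DataSet API. This is not the FLINK-3695 ValueArray work, only the general setting, and the usual caveat applies that user functions must not cache input objects once it is enabled:

import org.apache.flink.api.java.ExecutionEnvironment;

// Minimal sketch: enable object reuse so Flink may pass the same record
// instance between chained operators instead of creating defensive copies.
// This is independent of FLINK-3695; user functions must not hold on to
// input objects across invocations once reuse is enabled.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();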
Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <drewes@campus.tu-berlin.de> wrote:

> Hi,
>
> For my bachelor thesis I'm testing an implementation of the L-BFGS algorithm
> with Flink Iterations against a version that uses an ordinary while loop
> instead. Both programs use the same Map and Reduce transformations in each
> iteration. I expected the Flink Iterations version to scale better with
> increasing input size. However, the measured results on an IBM POWER cluster
> are very similar for both versions, e.g. around 30 minutes for 200 GB of
> data. The cluster has 8 nodes, was configured with 4 slots per node, and I
> used a total parallelism of 32.
> In every iteration of the while loop a new Flink job is started, and I
> assumed that the data would also be redistributed over the network in each
> iteration, which should take a significant and measurable amount of time. Is
> that assumption wrong, or what is the computational overhead of the Flink
> iterations that cancels out this disadvantage?
> I include the relevant parts of both programs and also attach the generated
> execution plans.
> Thank you for any ideas, as I could not find much about this issue in the
> Flink docs.
>
> Best, Dan
>
> *Flink Iterations:*
>
> DataSet<double[]> data = ...
>
> State state = initialState(m, initweights, 0, new double[initweights.length]);
> DataSet<State> statedataset = env.fromElements(state);
>
> // start of iteration section
> IterativeDataSet<State> loop = statedataset.iterate(niter);
>
> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>       .reduce(accumulate)
>       .map(new NormLossGradient(datasize))
>       .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>       .map(new LBFGS());
>
> DataSet<State> converged = statewithnewlossgradient.filter(
>    new FilterFunction<State>() {
>       @Override
>       public boolean filter(State value) throws Exception {
>          if (value.getIflag()[0] == 0) {
>             return false;
>          }
>          return true;
>       }
>    }
> );
>
> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>
> *While loop:*
>
> DataSet<double[]> data = ...
> State state = initialState(m, initweights, 0, new double[initweights.length]);
> DataSet<State> statedataset = env.fromElements(state);
>
> int cnt = 0;
> do {
>    LBFGS lbfgs = new LBFGS();
>    statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>       .reduce(accumulate)
>       .map(new NormLossGradient(datasize))
>       .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>       .map(lbfgs);
>    cnt++;
> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
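As a follow-up on the iteration count question above: a small sketch of a pass-through rich map that could be chained after .map(new LBFGS()) inside the iteration to report the current superstep. SuperstepLogger is a hypothetical helper, not part of Dan's program:

import org.apache.flink.api.common.functions.RichMapFunction;

// Hypothetical helper, not part of the original program: a pass-through map
// that prints the superstep number. getIterationRuntimeContext() is available
// to rich functions that run inside an IterativeDataSet.
public class SuperstepLogger extends RichMapFunction<State, State> {
    @Override
    public State map(State state) throws Exception {
        System.out.println("superstep = "
                + getIterationRuntimeContext().getSuperstepNumber());
        return state;
    }
}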
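And for completeness, the broadcast set registered under "state" in both snippets would typically be read inside a rich function. difffunction itself is not shown in the thread, so the following is only an assumed shape of such a function, not Dan's actual code:

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Assumed shape of difffunction, for illustration only: read the single
// broadcast State record in open() and use it while mapping each data block.
public class DiffFunction extends RichMapFunction<double[], State> {

    private State currentState;

    @Override
    public void open(Configuration parameters) throws Exception {
        List<State> broadcast = getRuntimeContext().getBroadcastVariable("state");
        this.currentState = broadcast.get(0);
    }

    @Override
    public State map(double[] block) throws Exception {
        // Placeholder: compute this block's contribution to the loss and
        // gradient against currentState; the real logic is not shown in the
        // thread.
        return partialLossAndGradient(currentState, block);
    }

    private State partialLossAndGradient(State state, double[] block) {
        return state; // stand-in for the actual computation
    }
}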