From: David Ortiz
Date: Sun, 17 Jul 2016 02:29:13 +0000
Subject: Re: Processing many map only collections in single pipeline with spark
To: user@crunch.apache.org

Hmm. Just out of curiosity, what if you do Pipeline.read in place of readTextFile?
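
A rough sketch of that swap (illustrative only, not code from the thread; the wrapper class is hypothetical, and it assumes plain-text input read through From.textFile):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.From;

    public class ReadViaSource {
      public static PCollection<String> readOne(Pipeline pipeline, String path) {
        // Pipeline.read takes an explicit Source instead of the readTextFile shortcut.
        return pipeline.read(From.textFile(path));
      }
    }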


On Sat, Jul 16, 2016, 10:08 PM Ben Juhn <benjijuhn@gmail.com> wrote:
Nope, it queues up the jobs in series there too.

On Jul 16, 2016, at 6:01 PM, David Ortiz <dpo5003@gmail.com> wrote:

*run in parallel


On Sat, Jul 16, 2016, 5:36 PM David Ortiz <dpo5003@gmail.com> wrote:

Just out of curiosity, if you use mrpipeline does it fun on parallel? If so, issue may be in spark since I believe crunch leaves it to spark to handle best method of execution.
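
For anyone wanting to run that comparison, only the pipeline construction changes; a sketch under the assumption of a YARN deployment (the class name, app name, and master string are illustrative, not taken from the thread):

    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.impl.spark.SparkPipeline;
    import org.apache.hadoop.conf.Configuration;

    public class PipelineFactory {
      public static Pipeline create(boolean useSpark, Configuration conf) {
        // The same downstream job code runs against either planner, which makes
        // the MRPipeline-vs-SparkPipeline experiment cheap to try.
        return useSpark
            ? new SparkPipeline("yarn-client", "multi-path-job")
            : new MRPipeline(PipelineFactory.class, conf);
      }
    }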


On Sat, Jul 16, 2016, 4:29 PM Ben Juhn <benjijuhn@gmail.com> wrote:
Hey David,

I have 100 active executors, each job typically only uses a few. It's running on yarn.

Thanks,
Ben

On Jul 16, 2016, at 12:53 PM, David Ortiz <dpo5003@gmail.com> wrote:

What are the cluster resources available vs what a single map uses?


On Sat, Jul 16, 2016, 3:04 PM Ben Juhn <benjijuhn@gmail.com> wrote:
I enabled FAIR scheduling hoping that would help but only one job is showing up at a time.

Thanks,
Ben
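
One note on the FAIR scheduling mentioned above: spark.scheduler.mode=FAIR only shares resources between jobs that are submitted to the SparkContext concurrently (for example from separate driver threads); it does not overlap jobs that a single thread submits one after another, which is consistent with seeing only one job at a time. Enabling it typically looks something like the sketch below (the class and app name are illustrative, and the SparkPipeline constructor taking a JavaSparkContext is assumed here):

    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.spark.SparkPipeline;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FairSchedulerPipeline {
      public static Pipeline create() {
        // FAIR mode lets concurrent jobs share executors, but the driver still
        // has to submit those jobs concurrently for anything to overlap.
        SparkConf conf = new SparkConf()
            .setAppName("multi-path-job")
            .set("spark.scheduler.mode", "FAIR");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        return new SparkPipeline(jsc, "multi-path-job");
      }
    }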

On Jul 15, 2016, at 8:17 PM, Ben Juhn <benjijuhn@gmail.com> wrote:

Each input is of a different format, and the DoFn implementation handles them depending on instantiation parameters.

Thanks,
Ben
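
MyDoFn itself is not shown anywhere in the thread; a hypothetical shape for a path-parameterized DoFn like the one described above could be:

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;

    // Hypothetical reconstruction for illustration; the real MyDoFn is not in the thread.
    public class MyDoFn extends DoFn<String, String> {
      private final String path;

      public MyDoFn(String path) {
        // The source path tells the function which input format it is parsing.
        this.path = path;
      }

      @Override
      public void process(String line, Emitter<String> emitter) {
        // Format-specific parsing would go here, keyed off the constructor argument.
        emitter.emit(path + "\t" + line);
      }
    }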

On Jul 15, 2016, at 7:09 PM, Stephen Durfey wrote:

Instead of using readTextFile on the pipeline, try using the read method and use the TextFileSource, which can accept a collection of paths.

https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
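
A rough sketch of that suggestion (the helper class is illustrative, not from the thread), with the caveat that it folds every path into one PCollection, so the per-path constructor argument used by MyDoFn would need to be replaced by some other way of telling the inputs apart:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.text.TextFileSource;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.fs.Path;

    public class MultiPathRead {
      public static PCollection<String> readAll(Pipeline pipeline, List<String> paths) {
        List<Path> inputs = new ArrayList<Path>();
        for (String p : paths) {
          inputs.add(new Path(p));
        }
        // One source over all paths yields a single PCollection, so the planner
        // sees one read rather than one independent job per path.
        return pipeline.read(new TextFileSource<String>(inputs, Writables.strings()));
      }
    }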




On Fri, Jul 15, 2016 at 8:53 PM -0500, "Ben Juhn" <benjijuhn@gmail.com> wrote:

Hello,

I have a job configured the following way:
for (String path : paths) {
    PCollection<String> col = pipeline.readTextFile(path);
    col.parallelDo(new MyDoFn(path), Writables.strings()).write(To.textFile("out/" + path), Target.WriteMode.APPEND);
}
pipeline.done();
It results in one spark job for each path, and the jobs run in sequence even though there are no dependencies. Is it possible to have the jobs run in parallel?
Thanks,
Ben



