Subject: Re: Processing many map only collections in single pipeline with spark
From: Josh Wills
To: user@crunch.apache.org
Date: Sat, 16 Jul 2016 20:31:08 -0700

The TL;DR is that Spark doesn't really have a proper multiple-outputs model a la Crunch/MR: jobs aren't kicked off until you do some sort of write, and as soon as you do a write, Spark myopically executes all of the code that needs to happen for that write to be completed. You need to be fairly clever about sequencing your writes and doing intermediate caching to make sure your pipeline executes efficiently.

On the Crunch/MR side, I'm a little surprised that we're only executing the map-only jobs one at a time; I'm assuming you're not mucking with the crunch.max.running.jobs parameter in some way? (See: https://crunch.apache.org/user-guide.html#mrpipeline)

J

On Sat, Jul 16, 2016 at 7:29 PM, David Ortiz wrote:

> Hmm. Just out of curiosity, what if you do Pipeline.read in place of
> readTextFile?
>
> On Sat, Jul 16, 2016, 10:08 PM Ben Juhn wrote:
>
>> Nope, it queues up the jobs in series there too.
>>
>> On Sat, Jul 16, 2016, 5:36 PM David Ortiz wrote:
>>
>>> Just out of curiosity, if you use MRPipeline does it run in parallel?
>>> If so, the issue may be in Spark, since I believe Crunch leaves it to
>>> Spark to handle the best method of execution.
>>>
>>> On Sat, Jul 16, 2016, 4:29 PM Ben Juhn wrote:
>>>
>>>> Hey David,
>>>>
>>>> I have 100 active executors; each job typically only uses a few. It's
>>>> running on YARN.
>>>>
>>>> Thanks,
>>>> Ben
>>>>
>>>> On Jul 16, 2016, at 12:53 PM, David Ortiz wrote:
>>>>
>>>> What are the cluster resources available vs. what a single map uses?
>>>>
>>>> On Sat, Jul 16, 2016, 3:04 PM Ben Juhn wrote:
>>>>
>>>>> I enabled FAIR scheduling hoping that would help, but only one job is
>>>>> showing up at a time.
>>>>>
>>>>> Thanks,
>>>>> Ben
>>>>>
>>>>> On Jul 15, 2016, at 8:17 PM, Ben Juhn wrote:
>>>>>
>>>>> Each input is of a different format, and the DoFn implementation
>>>>> handles them depending on instantiation parameters.
>>>>>
>>>>> Thanks,
>>>>> Ben
>>>>>
>>>>> On Jul 15, 2016, at 7:09 PM, Stephen Durfey wrote:
>>>>>
>>>>> Instead of using readTextFile on the pipeline, try using the read
>>>>> method and use the TextFileSource, which can accept a collection of
>>>>> paths.
>>>>>
>>>>> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
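A minimal, untested sketch of Stephen's suggestion, assuming the List<Path> constructor in the TextFileSource file linked above ("paths" and "pipeline" are the same variables as in Ben's original job below):

import java.util.ArrayList;
import java.util.List;

import org.apache.crunch.PCollection;
import org.apache.crunch.io.text.TextFileSource;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;

// Gather every input into a single source so the planner sees one
// read instead of one independent job per path.
List<Path> inputs = new ArrayList<Path>();
for (String path : paths) {
    inputs.add(new Path(path));
}
PCollection<String> lines = pipeline.read(
    new TextFileSource<String>(inputs, Writables.strings()));

Note that this collapses all of the inputs into one PCollection, so it only helps if the per-path constructor parameters of MyDoFn can be recovered some other way; as Ben notes above, each input is of a different format.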
>>>>> On Fri, Jul 15, 2016 at 8:53 PM -0500, "Ben Juhn" wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a job configured the following way:
>>>>>>
>>>>>> for (String path : paths) {
>>>>>>     PCollection<String> col = pipeline.readTextFile(path);
>>>>>>     col.parallelDo(new MyDoFn(path), Writables.strings())
>>>>>>        .write(To.textFile("out/" + path), Target.WriteMode.APPEND);
>>>>>> }
>>>>>> pipeline.done();
>>>>>>
>>>>>> It results in one Spark job for each path, and the jobs run in sequence
>>>>>> even though there are no dependencies. Is it possible to have the jobs
>>>>>> run in parallel?
>>>>>>
>>>>>> Thanks,
>>>>>> Ben
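To make the intermediate-caching advice at the top of the thread concrete: under the Spark pipeline, each output becomes a Spark action when the pipeline runs, and everything upstream of an action is recomputed for it unless it was cached. An untested sketch ("raw" is an existing PCollection<String>; ParseFn and ErrorsOnlyFn are hypothetical DoFn/FilterFn implementations):

import org.apache.crunch.PCollection;
import org.apache.crunch.Target;
import org.apache.crunch.io.To;
import org.apache.crunch.types.writable.Writables;

// One upstream collection feeding two outputs.
PCollection<String> parsed =
    raw.parallelDo(new ParseFn(), Writables.strings());
parsed.cache(); // materialize once so the second write can reuse it

parsed.write(To.textFile("out/parsed"), Target.WriteMode.OVERWRITE);
parsed.filter(new ErrorsOnlyFn())
      .write(To.textFile("out/errors"), Target.WriteMode.OVERWRITE);

Without the cache() call, the work inside ParseFn would run once per output when the two writes execute.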
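For the crunch.max.running.jobs parameter Josh references: on an MRPipeline it is an ordinary Hadoop Configuration setting, so independent map-only jobs can be allowed to run concurrently. A sketch, assuming the key name documented in the user guide (MyJobDriver is a hypothetical driver class):

import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

// Raise the cap on concurrently running MapReduce jobs; the planner
// still decides which jobs are independent enough to run together.
Configuration conf = new Configuration();
conf.setInt("crunch.max.running.jobs", 10);
Pipeline pipeline = new MRPipeline(MyJobDriver.class, conf);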