crunch-user mailing list archives

From Jeff Quinn <j...@nuna.com>
Subject Re: Crunch Planner Hint to Not Combine Tasks
Date Wed, 02 Dec 2015 06:23:25 GMT
Hey Landon,

Taking a look at the original code you posted, the issue is that you are
not making any #write calls before you call #run. As a result, the call to
#run doesn't actually produce any MR jobs; the planner thinks there is no
work to do because no sink is defined for the pipeline at that point. I
would revise this code to add:

records_with_intervals.write(To.sequenceFile(new Path("/my/special/temp/path")),
    Target.WriteMode.CHECKPOINT);

before

pipeline.run();

And so on for the rest of the times you call #run. As you pointed out
in your follow-up, calling PCollection#cache basically accomplishes the
same thing (and may even be the preferred way?).

Anyway, as for the data spills, I see now that I did not fully appreciate
your problem. Indeed, scaleFactor only ever affects the number of reducers
for the job, so separating out the DoFns will never really help you in that
department. To adjust the number of mappers, I would look at the
parameters Ron Hashimshony provided earlier, which will force the creation
of more map tasks. Crunch will take care of the reducers for you because
you are setting scaleFactor correctly. Perhaps you do not even need to
bother with splitting up your DoFns :).
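To make the mapper/reducer distinction concrete, here is a simplified Java model. It is an illustrative assumption, not Crunch's planner source: the mapper count is taken to be the number of input splits, reducers are estimated as input bytes times scaleFactor divided by a per-reducer byte budget (the 1 GB budget below is an assumed figure), and map-side output per task follows the data's real growth. The inputs reuse numbers from this thread: 19 files of 10 MB (about 190 MB), 2 splits, 46x growth.

```java
/**
 * Simplified model of task counts. An illustrative assumption,
 * not Crunch's planner source.
 */
final class TaskEstimate {
    private TaskEstimate() {}

    /** Result of the model: task counts plus map-side output per map task. */
    record Plan(int mapTasks, int reduceTasks, long mapOutputBytesPerTask) {}

    /**
     * @param inputBytes      total bytes read by the job
     * @param inputSplits     number of input splits (one map task each)
     * @param scaleFactor     the DoFn's size hint, used only for the reducer estimate
     * @param actualGrowth    how much the data really grows inside the mapper
     * @param bytesPerReducer assumed per-reducer byte budget (hypothetical value)
     */
    static Plan plan(long inputBytes, int inputSplits, float scaleFactor,
                     double actualGrowth, long bytesPerReducer) {
        // Reducers: scale the size estimate, then divide by the per-reducer budget.
        double estimatedOutput = inputBytes * (double) scaleFactor;
        int reducers = (int) Math.max(1, Math.ceil(estimatedOutput / bytesPerReducer));

        // Mappers: fixed by the splits; scaleFactor does not appear here.
        int mappers = inputSplits;

        // Each map task emits its share of the input times the real growth,
        // no matter how many reducers were planned.
        long perMapOutput = Math.round(inputBytes / (double) inputSplits * actualGrowth);

        return new Plan(mappers, reducers, perMapOutput);
    }
}
```

With these inputs, raising scaleFactor from 1.0f to 46.0f moves the reducer estimate from 1 to 9, while each of the 2 map tasks still emits about 4.37 GB, which is the volume that ends up spilling to local disk.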


On Tue, Dec 1, 2015 at 2:11 PM, Robinson, Landon - Landon <
landon.t.robinson@lowes.com> wrote:

> On a side note, I was able to use a tip from the User Guide to force the
> Crunch planner not to combine jobs, using pcollection.cache() (you can also
> use .materialize()) between DoFn calls (similar to the earlier suggested
> approach).
>
> However, I’m *still having an issue with my job not using enough
> nodes/mappers for the step where the data grows exponentially.* Again, I
> have a DoFn where the data grows to 46 times its original size as it
> passes through. I’ve tried using a scaleFactor of 46.0f, but it doesn’t do
> much more than assign more reducer tasks.
>
> The problem is that about a third of the way through the job, all the data
> spills to the Linux node's local disk because the space fills up. I need
> the job to use more mappers on more nodes. Is there a way?
> Thank you immensely!
> ---------------------------------------------------------------------------
> Landon Robinson
> Big Data/Hadoop Engineer
> ---------------------------------------------------------------------------
>
> From: <Robinson>, LCI <landon.t.robinson@lowes.com>
> Reply-To: Apache Crunch Mailing List <user@crunch.apache.org>
> Date: Tuesday, December 1, 2015 at 2:33 PM
> To: Jeff Quinn <jeff@nuna.com>
>
> Cc: Apache Crunch Mailing List <user@crunch.apache.org>
> Subject: Re: Crunch Planner Hint to Not Combine Tasks
>
> More code, as requested! Hope this sheds some light on what I may be doing
> wrong…
> Thanks Jeff and Everett (and others) for all the help!
>
> *CODE*
>
> logger.info("Generating Map-Reduce Pipeline...");
> Pipeline pipeline = new MRPipeline(MyClass.class, "Crunch Pipeline",
> crunchConf);
>
> logger.info("Establishing OrcFile Target for Later Output...");
> OrcFileTarget target = new OrcFileTarget(new Path(outputPath));
> //</editor-fold>
>
> // =======================================================================
> // * INGEST DATA
> // * Ingest a text file, and for every row in it, create a Java object.
> // =======================================================================
>
> //<editor-fold desc="== Ingestion, Object Creation ==">
> logger.info("Reading file (" + inputPath + ") into PCollection...");
> PCollection<String> my_data = pipeline.readTextFile(inputPath);
> logger.info("# of lines ingested from (" + inputPath + "): " +
> my_data.length().getValue());
>
> logger.info("Converting Data to 'RecordStageOne' objects...");
> PCollection<RecordStageOne> records =
> my_data.parallelDo(DoFn_CreateJavaRecords(),
> Avros.records(RecordStageOne.class));
> logger.info("Records created: " + records.length().getValue() + " out of "
>         + my_data.length().getValue() + " rows in file.");
>
> //</editor-fold>
>
> // =======================================================================
> // * APPLY INTERVAL CALCULATIONS TO CURRENT RECORD SET
> // * For every record, Calculate and Apply Affected Intervals
> // =======================================================================
>
> //<editor-fold desc="== Apply Intervals ==">
> logger.info("Determining and Applying Working Intervals on data...");
> PCollection<RecordStageTwo> records_with_intervals =
> records.parallelDo(DoFn_ApplyIntervals(time_intervals),
>         Avros.records(RecordStageTwo.class));
>
> logger.info("Records successfully processed in Apply Intervals step: " +
> records_with_intervals.length().getValue() + "/" +
> records.length().getValue());
>
> pipeline.run();
> //</editor-fold>
>
> // It's at this point that the Crunch planner combines the rest of the
> // parallelDo calls into a single job, which I think overwhelms the single
> // worker the job runs on.
>
> // =======================================================================
> // * CALCULATE CONTRIBUTION RESULTS (THE 3 COUNTS)
> // * Report every record that matches a primary key and count its values.
> // =======================================================================
>
> //<editor-fold desc="== Calculate Count Contributions ==">
> logger.info("Calculating contributions to clock in count, employee count, "
>         + "and hours worked...");
> PTable<String, Tuple3<Integer, Integer, Integer>> calculatedSet =
> records_with_intervals.parallelDo(DoFn_CalculateResults(time_intervals),
>         tableOf(strings(), triples(ints(), ints(), ints())));
>
>
> pipeline.run();
>
> //</editor-fold>
>
> // =======================================================================
> // * GROUP AND AGGREGATE
> // * Group records by their key and sum their values (the 3 counts)
> // =======================================================================
>
> //<editor-fold desc="== Group and Aggregate ==">
> logger.info("Grouping Records By Key and Aggregating Their New "
>         + "Values...");
> PTable<String, Tuple3<Integer, Integer, Integer>> reducer = calculatedSet
>         .groupByKey()
>         .combineValues(Aggregators.tripAggregator(
>                 Aggregators.SUM_INTS(),
>                 Aggregators.SUM_INTS(),
>                 Aggregators.SUM_INTS()));
>
>
> pipeline.run();
>
> //</editor-fold>
>
> // =======================================================================
> // * PRODUCE FINAL RECORDS IN ORC FORMAT
> // * Convert final records to ORC format.
> // =======================================================================
>
> //<editor-fold desc="== Convert Data to ORC Records ==">
> logger.info("Producing Final ORC-Format Records...");
> PCollection<TupleN> finalDataOrc =
> reducer.parallelDo(DoFn_ProduceFinalRecords(), Orcs.tuples(
>         Writables.ints(),
>         Writables.ints(),
>         Writables.strings(),
>         Writables.strings(),
>         Writables.strings(),
>         Writables.ints(),
>         Writables.ints(),
>         Writables.ints(),
>         Writables.ints(),
>         Writables.strings()
> ));
>
> //</editor-fold>
>
> // =======================================================================
> // * WRITE TO ORC FILE AND EXECUTE PIPELINE
> // * Report results in final .orc file.
> // =======================================================================
>
> //<editor-fold desc="== ORC File Output, Pipeline Closure, and Exit ==">
> logger.info("Writing ORC file(s) to " + outputPath);
> pipeline.write(finalDataOrc, target, Target.WriteMode.OVERWRITE);
>
> PipelineResult result = pipeline.done();
> logger.info("Pipeline ending.");
>
> *END OF CODE*
>
> The issue remains: Crunch combines all the jobs halfway through the code,
> even with calls to pipeline.run() and with max jobs set to 1.
> Help/Guidance appreciated!
> ---------------------------------------------------------------------------
> Landon Robinson
> Big Data/Hadoop Engineer
> ---------------------------------------------------------------------------
>
> From: Jeff Quinn <jeff@nuna.com>
> Date: Tuesday, November 24, 2015 at 7:59 PM
> To: LCI <landon.t.robinson@lowes.com>
> Cc: Apache Crunch Mailing List <user@crunch.apache.org>
> Subject: Re: Crunch Planner Hint to Not Combine Tasks
>
> For the MRExecutor, each materialized PCollection is backed by a Path; if
> something deletes that Path, you are left with this error.
>
> So it looks like the Crunch temporary directory was cleaned up by the time
> you called #write. Can you post more code? Is there a call to Pipeline#done
> somewhere before this? (As far as I know, that is the only call that
> cleans up the Crunch tmp directory.)
>
> On Tue, Nov 24, 2015 at 4:54 PM, Robinson, Landon - Landon <
> landon.t.robinson@lowes.com> wrote:
>
>> Jeff/Everett,
>> Thanks *so* much! Though I am slightly confused... I implemented what I
>> believe you were going for: after a DoFn is processed, I used the write
>> command Everett mentioned, followed by a call to pipeline.run, but I
>> received this error:
>>
>> 2015-11-24 19:52:31,346 ERROR [main]
>> org.apache.crunch.materialize.MaterializableIterable: Could not
>> materialize: SeqFile(/tmp/crunch-245883570/p1)
>> java.io.IOException: No files found to materialize at:
>> /tmp/crunch-245883570/p1
>>
>> Any ideas?
>>
>> ---------------------------------------------------------------------------
>> Landon Robinson
>> Big Data/Hadoop Engineer
>>
>> ---------------------------------------------------------------------------
>>
>> From: Jeff Quinn <jeff@nuna.com>
>> Reply-To: Apache Crunch Mailing List <user@crunch.apache.org>
>> Date: Tuesday, November 24, 2015 at 1:36 PM
>> To: LCI <landon.t.robinson@lowes.com>
>> Cc: Apache Crunch Mailing List <user@crunch.apache.org>
>>
>> Subject: Re: Crunch Planner Hint to Not Combine Tasks
>>
>> Hey Landon,
>>
>> Happy to help!
>>
>> As Everett said, you can even skip the "Ingest that file into a new
>> Pcollection" step: as long as you write to a SequenceFile target, this
>> will be implicit.
>>
>> As for #run vs #done, what we do is call #run between each segment of
>> DoFns and then #done once at the end. #run and #done do basically the
>> same thing (#done calls #run), except that #done cleans up the Crunch
>> temporary directories after calling #run.
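The #run/#done difference, and the "No files found to materialize" error discussed in this thread, can be modeled with a toy Java class. ToyPipeline and its methods are hypothetical, not Crunch API: run() writes an intermediate file under a temporary directory (standing in for /tmp/crunch-NNN), and done() calls run() and then deletes that directory, after which the materialized path can no longer be read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model of Pipeline#run vs Pipeline#done. Hypothetical, not Crunch code. */
final class ToyPipeline {
    private final Path tmpDir;

    ToyPipeline() {
        try {
            // Stand-in for Crunch's temporary working directory.
            this.tmpDir = Files.createTempDirectory("crunch-toy-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** "Runs a job": materializes intermediate output under the temp directory. */
    Path run() {
        Path out = tmpDir.resolve("p1");
        try {
            Files.writeString(out, "intermediate records");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    /** Same work as run(), then deletes the temp directory and its contents. */
    void done() {
        run();
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(tmpDir)) {
            // Reverse order puts children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            for (Path p : paths) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads a materialized path; fails once done() has cleaned it up. */
    static String read(Path materialized) {
        if (!Files.exists(materialized)) {
            throw new IllegalStateException("No files found to materialize at: " + materialized);
        }
        try {
            return Files.readString(materialized);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the model is ordering: anything read from the temporary directory has to be consumed before done() runs, which is why #done belongs once at the very end.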
>>
>>
>>
>>
>> On Tue, Nov 24, 2015 at 10:00 AM, Robinson, Landon - Landon <
>> landon.t.robinson@lowes.com> wrote:
>>
>>> Apologies, pipeline.execute should be pipeline.done
>>>
>>> ---------------------------------------------------------------------------
>>> Landon Robinson
>>> Big Data/Hadoop Engineer
>>> Lowe’s Companies Inc. | IT Business Intelligence
>>>
>>> ---------------------------------------------------------------------------
>>>
>>> From: <Robinson>, LCI <landon.t.robinson@lowes.com>
>>> Reply-To: Apache Crunch Mailing List <user@crunch.apache.org>
>>> Date: Tuesday, November 24, 2015 at 12:55 PM
>>> To: Apache Crunch Mailing List <user@crunch.apache.org>, Jeff Quinn <
>>> jeff@nuna.com>
>>>
>>> Subject: Re: Crunch Planner Hint to Not Combine Tasks
>>>
>>> Jeff,
>>>
>>> Thanks for that awesome set of tips. Building on your solution, here’s
>>> some information about ours:
>>> Our program:
>>>
>>>    - Starts the MR pipeline
>>>    - Does some DoFn ParallelDos
>>>    - Calls pipeline.execute
>>>
>>> We never leveraged pipeline.run(). Going with your suggestion, would my
>>> course of action be:
>>>
>>>    - Start the MR pipeline
>>>    - Do some of the DoFn parallelDos
>>>    - Call pipeline.write
>>>    - Call pipeline.run
>>>    - Ingest that file into a new Pcollection
>>>    - Call another (or more) Dofns
>>>    - Call pipeline.execute
>>>
>>> Does that seem in line with your recommendation, Jeff? Let me know if my
>>> logic needs adjusting…
>>> - Landon
>>>
>>> ---------------------------------------------------------------------------
>>> Landon Robinson
>>> Big Data/Hadoop Engineer
>>>
>>> ---------------------------------------------------------------------------
>>>
>>> From: Jeff Quinn <jeff@nuna.com>
>>> Reply-To: Apache Crunch Mailing List <user@crunch.apache.org>
>>> Date: Tuesday, November 24, 2015 at 12:00 PM
>>> To: Apache Crunch Mailing List <user@crunch.apache.org>
>>> Subject: Re: Crunch Planner Hint to Not Combine Tasks
>>>
>>> Hey Landon,
>>>
>>> Our team has dealt with this exact problem before. Our solution was to
>>> call PCollection#write after each DoFn or string of DoFns, then call
>>> Pipeline#run, which does one iteration of Crunch planning / job
>>> submission. Then use that same PCollection object for the next round of
>>> DoFns, call #write, call #run again, etc.
>>>
>>> #parallelDo -> #write -> #run -> repeat
>>>
>>> Calling #write matters because the Crunch planner will not actually do
>>> any work unless it has seen at least one materialization of the data.
>>> This technique guarantees that your DoFns are segmented into different
>>> MR jobs, since DoFns cannot be combined once they have already completed.
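The #parallelDo -> #write -> #run pattern can be illustrated with a toy lazy planner in Java. LazyPlanner is hypothetical and far simpler than Crunch's planner; it only assumes the two behaviors described above: run() does nothing unless a sink has been requested, and all stages still pending when a job launches are fused into that one job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntUnaryOperator;

/**
 * Toy lazy planner (hypothetical). Models only "no sink, no job" and
 * "pending stages fuse into a single job".
 */
final class LazyPlanner {
    private final List<IntUnaryOperator> pending = new ArrayList<>();
    private int[] data;
    private boolean sinkRequested = false;
    private int jobsLaunched = 0;

    LazyPlanner(int[] input) {
        this.data = input.clone();
    }

    /** Records a stage; nothing executes yet. */
    LazyPlanner parallelDo(IntUnaryOperator fn) {
        pending.add(fn);
        return this;
    }

    /** Requests a sink for the current result (the analogue of #write). */
    LazyPlanner write() {
        sinkRequested = true;
        return this;
    }

    /** Launches one job containing every pending stage, but only if a sink exists. */
    void run() {
        if (!sinkRequested || pending.isEmpty()) {
            return; // nothing to materialize, so no job is planned
        }
        // Compose all pending stages and execute them together as one fused job.
        IntUnaryOperator fused = IntUnaryOperator.identity();
        for (IntUnaryOperator fn : pending) {
            fused = fused.andThen(fn);
        }
        data = Arrays.stream(data).map(fused).toArray();
        pending.clear();
        sinkRequested = false;
        jobsLaunched++;
    }

    int jobsLaunched() {
        return jobsLaunched;
    }

    int[] data() {
        return data.clone();
    }
}
```

Calling run() right after the first parallelDo launches zero jobs, so the three stages later fuse into one job; calling write() before each run() instead yields three separate jobs with identical output.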
>>>
>>> Hope this helps,
>>>
>>> Jeff
>>>
>>> On Tue, Nov 24, 2015 at 6:14 AM, Ron Hashimshony <
>>> ron.hashimshony@myheritage.com> wrote:
>>>
>>>> Try setting mapreduce.input.fileinputformat.split.minsize
>>>> & mapreduce.input.fileinputformat.split.maxsize to a value lower than
>>>> the default (usually 64 MB).
>>>> If you know of a specific DoFn in which this is required, it is better
>>>> to set it there, in that DoFn's configure method.
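To see why a smaller split size yields more map tasks, here is a Java sketch of the per-file split rule in Hadoop's FileInputFormat, written from memory and best treated as an approximation: split size = max(minSize, min(maxSize, blockSize)), and a file keeps being cut while the remaining bytes exceed 1.1 splits. It ignores small-file combining, which is presumably why 19 files of 10 MB came out to only 2 splits in Crunch, and it does not model the empty split Hadoop creates for zero-length files.

```java
/**
 * Approximate sketch of FileInputFormat's per-file split logic.
 * Small-file combining and zero-length files are not modeled.
 */
final class SplitMath {
    private static final double SPLIT_SLOP = 1.1; // allow the last split to be up to 10% larger

    private SplitMath() {}

    /** minSize acts as a floor; maxSize caps the split below the block size. */
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Number of splits (and thus map tasks) for one file of the given size. */
    static int splitsForFile(long fileBytes, long splitSize) {
        int splits = 0;
        long remaining = fileBytes;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // the tail becomes the final split
        }
        return splits;
    }

    /** Splits for a set of equally sized files, each split independently. */
    static int splitsForFiles(int fileCount, long bytesPerFile, long splitSize) {
        return fileCount * splitsForFile(bytesPerFile, splitSize);
    }
}
```

In this model, lowering maxsize is what produces extra splits (19 files of 10 MB go from 19 splits at 64 MB to 95 at 2 MB); minsize only acts as a floor on the split size.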
>>>>
>>>> On Tue, Nov 24, 2015 at 3:28 PM Robinson, Landon - Landon <
>>>> landon.t.robinson@lowes.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a Crunch job that tries to combine the last four tasks of my
>>>>> program into one M/R job.
>>>>> That’s normally not a problem, but my data *starts small and grows
>>>>> exponentially* in the biggest of those DoFn tasks, resulting in
>>>>> spills to disk (local, not HDFS).
>>>>>
>>>>> I’ve already:
>>>>>
>>>>>    - Implemented scaleFactor on the DoFn that emits more records than
>>>>>    it consumes; the value is 40.0f
>>>>>    - Set the io.sort.mb parameter to the cluster setting, which is 1792
>>>>>    - Implemented map-side compression with snappy
>>>>>
>>>>> The data set I’m ingesting comes from a previous MapReduce job and
>>>>> consists of 19 files of 10 MB each (which Crunch turns into 2 splits).
>>>>> Help?
>>>>>
>>>>> ---------------------------------------------------------------------------
>>>>> Landon Robinson
>>>>> Big Data/Hadoop Engineer
>>>>>
>>>>> ---------------------------------------------------------------------------
>>>>> NOTICE: All information in and attached to the e-mails below may be
>>>>> proprietary, confidential, privileged and otherwise protected from improper
>>>>> or erroneous disclosure. If you are not the sender's intended recipient,
>>>>> you are not authorized to intercept, read, print, retain, copy, forward, or
>>>>> disseminate this message. If you have erroneously received this
>>>>> communication, please notify the sender immediately by phone
>>>>> (704-758-1000) or by e-mail and destroy all copies of this message
>>>>> electronic, paper, or otherwise.
>>>>>
>>>>> *By transmitting documents via this email: Users, Customers, Suppliers
>>>>> and Vendors collectively acknowledge and agree the transmittal of
>>>>> information via email is voluntary, is offered as a convenience, and is not
>>>>> a secured method of communication; Not to transmit any payment information
>>>>> E.G. credit card, debit card, checking account, wire transfer information,
>>>>> passwords, or sensitive and personal information E.G. Driver's license,
>>>>> DOB, social security, or any other information the user wishes to remain
>>>>> confidential; To transmit only non-confidential information such as plans,
>>>>> pictures and drawings and to assume all risk and liability for and
>>>>> indemnify Lowe's from any claims, losses or damages that may arise from the
>>>>> transmittal of documents or including non-confidential information in the
>>>>> body of an email transmittal. Thank you. *
>>>>>
>>>>
>>>
>>> *DISCLAIMER:* The contents of this email, including any attachments,
>>> may contain information that is confidential, proprietary in nature,
>>> protected health information (PHI), or otherwise protected by law from
>>> disclosure, and is solely for the use of the intended recipient(s). If you
>>> are not the intended recipient, you are hereby notified that any use,
>>> disclosure or copying of this email, including any attachments, is
>>> unauthorized and strictly prohibited. If you have received this email in
>>> error, please notify the sender of this email. Please delete this and all
>>> copies of this email from your system. Any opinions either expressed or
>>> implied in this email and all attachments, are those of its author only,
>>> and do not necessarily reflect those of Nuna Health, Inc.
>>>
>>
>>
>>
>
>
>

