crunch-dev mailing list archives

From "Josh Wills (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-510) PCollection.materialize with Spark should use collect()
Date Wed, 08 Apr 2015 19:57:14 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485898#comment-14485898 ]

Josh Wills commented on CRUNCH-510:
-----------------------------------

I wrote a collect-based materialization scheme for Spark-Dataflow:

https://github.com/cloudera/spark-dataflow/blob/master/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java

...but you're right, I was lazy when I did SparkPipeline and re-used the MRPipeline materialization
logic. The main reason was actually map-side joins, which (right now) rely on a materialized
file existing in HDFS that can then be read into the distributed cache and passed to the
subsequent job runs. So for that logic to keep working, SparkPipeline.materialize would need
to be somewhat clever and decide how/where to materialize the data based on the client context.
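
For illustration only, here is a minimal sketch of that kind of context-aware materialize; the names materialize, neededAsHdfsFile, and writeToHdfsAndRead are hypothetical, not existing Crunch or Spark-Dataflow APIs:

{code}
// Hypothetical sketch: choose between collect() and the existing HDFS-backed
// path depending on whether a downstream consumer needs a file on the cluster.
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

public class MaterializeSketch {
  static <T> Iterable<T> materialize(JavaRDD<T> rdd, boolean neededAsHdfsFile) {
    if (neededAsHdfsFile) {
      // Map-side joins currently read a materialized file from the
      // distributed cache, so this case still has to persist to HDFS.
      return writeToHdfsAndRead(rdd); // hypothetical helper
    }
    // Otherwise collect() pulls the results straight back to the client,
    // with no HDFS round trip.
    List<T> collected = rdd.collect();
    return collected;
  }

  static <T> Iterable<T> writeToHdfsAndRead(JavaRDD<T> rdd) {
    // Placeholder for the existing MRPipeline-style materialization path.
    throw new UnsupportedOperationException("not sketched here");
  }
}
{code}

The point is just that the collect() path and the HDFS path would have to coexist for as long as map-side joins depend on a file in the distributed cache.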

> PCollection.materialize with Spark should use collect()
> -------------------------------------------------------
>
>                 Key: CRUNCH-510
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-510
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Micah Whitacre
>            Assignee: Josh Wills
>
> While troubleshooting some other code, I noticed that when using the SparkPipeline, code that
> forces a materialize() to be called...
> {code}
> delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
>   @Override
>   public Float map(Pair<String, PageRankData> input) {
>     PageRankData prd = input.second();
>     return Math.abs(prd.score - prd.lastScore);
>   }
> }, ptf.floats())).getValue();
> {code}
> ...actually results in the value being written out to HDFS:
> {noformat}
> 15/04/08 13:59:33 INFO DAGScheduler: Job 1 finished: saveAsNewAPIHadoopFile at SparkRuntime.java:332, took 0.223622 s
> {noformat}
> Since Spark's RDDs have a collect() method that accomplishes similar functionality, I wonder
> if we could switch to using it and cut down on the need to persist the data to HDFS. I think
> this is currently happening because MRPipeline and SparkPipeline share logic, and I have no
> context on how we could break that apart easily.
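
As a rough illustration of the suggested alternative, the following standalone sketch assumes plain Spark Java APIs (JavaSparkContext, JavaRDD), not Crunch internals; the class and variable names are invented:

{code}
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CollectExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("collect-demo").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Float> deltas = sc.parallelize(Arrays.asList(0.1f, 0.4f, 0.2f));

    // collect() returns the RDD's contents to the driver directly,
    // with no saveAsNewAPIHadoopFile / HDFS round trip.
    List<Float> values = deltas.collect();

    float max = Float.NEGATIVE_INFINITY;
    for (float v : values) {
      max = Math.max(max, v);
    }
    System.out.println("max delta = " + max);
    sc.stop();
  }
}
{code}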



