incubator-crunch-dev mailing list archives

From "Josh Wills (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-67) Multiple writes in a pipeline are not performed
Date Tue, 18 Sep 2012 23:56:08 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13458306#comment-13458306 ]

Josh Wills commented on CRUNCH-67:
----------------------------------

So this is an MR job that writes a map-side output (step 3) before the data goes into a
MapReduce that has a standard reduce-side output. We need to either be able to write a
side-output on the map side of an MR job, or detect this situation and break it up into a
map-only job that writes the first file, followed by an MR job that reads that file and does
the MapReduce. The latter is probably more feasible at this point.
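
To make the second option concrete, here is a minimal sketch in plain Scala (2.13+), not Crunch code. The names TwoStageSketch, cleanStage and countStage are hypothetical, and local files stand in for job outputs. It only illustrates the shape of the split: one stage that cleans and writes the first file, then a second stage that reads that file back and counts words.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical illustration only: plain Scala standing in for two chained jobs.
object TwoStageSketch {

  // Stage 1 (stands in for the map-only job): clean each line, write the result.
  def cleanStage(input: Path, cleaned: Path): Unit = {
    val lines = Files.readAllLines(input, StandardCharsets.UTF_8).asScala
    // Same regex as the issue's repro code.
    val out = lines.map(_.replaceAll("""[^A-Za-z\W]""", "").toLowerCase)
    Files.write(cleaned, out.asJava, StandardCharsets.UTF_8)
  }

  // Stage 2 (stands in for the MR job): read what stage 1 wrote, count words.
  def countStage(cleaned: Path): Map[String, Long] = {
    Files.readAllLines(cleaned, StandardCharsets.UTF_8).asScala
      .flatMap(_.split("""\W+"""))   // split the text into words
      .filter(_.nonEmpty)            // drop empty tokens
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }
  }
}
```

Because stage 2 takes the file written by stage 1 as its only input, the first write cannot be skipped: the word count has nothing to read until it exists.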
                
> Multiple writes in a pipeline are not performed
> -----------------------------------------------
>
>                 Key: CRUNCH-67
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-67
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core, Scrunch
>    Affects Versions: 0.4.0
>            Reporter: Kiyan Ahmadizadeh
>            Assignee: Josh Wills
>         Attachments: ShakesMultiWrite.scala
>
>
> Consider the following simple PipelineApp (in Scala) that:
> 1. Reads in a text source.
> 2. Cleans the text of non-alphabetic characters.
> 3. Writes the sanitized text to a text file.
> 4. Computes word counts from the text.
> 5. Writes the word counts to a text file.
> When this code is executed, the write from step 5 is performed successfully, but the
> write from step 3 is not.
> object ShakesMultiWrite extends PipelineApp {
>   val shakes = read(From.textFile("shakes.txt"))
>   // Now let's clean-up the text
>   val cleanShakes = shakes.map {line =>
>     val cleanText = line.replaceAll( """[^A-Za-z\W]""", "").toLowerCase()
>     cleanText
>   }
>   cleanShakes.write(To.textFile("shakesText/cleanShakes"))
>   // Count words
>   val wordCounts = cleanShakes.flatMap { line =>
>       line
>         .split( """\W+""") // Split the text into words.
>         .filter(w => !w.isEmpty()) // Get rid of any empty words created.
>   }.count()
>   wordCounts.write(To.textFile("shakesText/wordCounts"))
>   // Runs the pipeline
>   run()
> }

