crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: question about Crunch pipeline
Date Wed, 25 Mar 2015 06:20:45 GMT
Hey Lucy,

Without the write(To.textFile("/labeledData"), WriteMode.OVERWRITE) line,
the Crunch pipeline isn't actually doing anything; the succeeded() function
returns true in this case b/c Crunch "succeeded" at not running anything--
I know, it's kind of weird. A couple of guesses as to what's wrong:

1) The FeatObject class needs a no-arg constructor for Avro reflection to
work properly, or
2) The pipeline is failing when it tries to add the labeledData directory
under the root directory b/c of a filesystem permissions problem.
Try writing out to /tmp/labeledData and see if that does the trick.
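
To make guess 1 concrete, here is a minimal sketch that uses only the JDK (no Crunch or Avro on the classpath). It assumes that reflection-based deserialization has to create an empty instance through a no-arg constructor before filling in the fields, and it checks for that constructor the same way. FeatObjectSketch, StringOnlySketch, and NoArgConstructorCheck are hypothetical names made up for illustration; the real parsing logic in FeatObject(String) is not shown in the thread, so the String constructors below are placeholders.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, declared first so a single-file `java` launch runs its main.
class NoArgConstructorCheck {

    // Returns true if the class declares a constructor that takes no arguments.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(hasNoArgConstructor(FeatObjectSketch.class));
        System.out.println(hasNoArgConstructor(StringOnlySketch.class));

        // Build an empty instance reflectively, as a deserializer would (assumption).
        Constructor<FeatObjectSketch> ctor =
            FeatObjectSketch.class.getDeclaredConstructor();
        ctor.setAccessible(true);
        FeatObjectSketch empty = ctor.newInstance();
        System.out.println(empty.getFeatValPair().isEmpty());
    }
}

// Hypothetical stand-in for FeatObject with a no-arg constructor added.
class FeatObjectSketch {
    private String sampleID;
    private Map<String, Double> featValPair;

    // No-arg constructor: leaves the object in a valid, empty state.
    FeatObjectSketch() {
        this.featValPair = new HashMap<>();
    }

    // Placeholder only: the original parsing logic is elided in the thread.
    FeatObjectSketch(String input) {
        this();
        this.sampleID = input;
    }

    Map<String, Double> getFeatValPair() {
        return featValPair;
    }
}

// Hypothetical stand-in for the class as posted: only a String constructor.
class StringOnlySketch {
    private final String raw;

    StringOnlySketch(String input) {
        this.raw = input;
    }
}
```

If the check comes back false for your real FeatObject, adding an empty constructor next to the existing FeatObject(String) one is the first thing I would try.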

On my machine, log files from MR jobs run in local mode get written out
under /tmp/hadoop-$USER/mapred, so that's a good place to look. If you add
a keep.failed.task.files=true setting to the Configuration you're using for
your job, the log files for the failed tasks should stick around after the
job finishes so you can examine them to debug the problem.
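
If it helps, here is a small JDK-only sketch for finding those log files. It assumes the logs live under /tmp/hadoop-$USER/mapred as they do on my machine; the location may differ on yours, so you can pass another directory as the first argument. LocalJobLogLister and its methods are hypothetical names, not part of Hadoop or Crunch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: lists every regular file under a local-mode log directory.
class LocalJobLogLister {

    // Assumed default location: /tmp/hadoop-<current user>/mapred.
    static Path defaultLogRoot() {
        return Paths.get("/tmp", "hadoop-" + System.getProperty("user.name"), "mapred");
    }

    // Returns all regular files under root in sorted order,
    // or an empty list if root is not a directory.
    static List<Path> listFiles(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            return Collections.emptyList();
        }
        try (Stream<Path> walk = Files.walk(root)) {
            return walk.filter(Files::isRegularFile)
                       .sorted()
                       .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = args.length > 0 ? Paths.get(args[0]) : defaultLogRoot();
        for (Path file : listFiles(root)) {
            System.out.println(file);
        }
    }
}
```

Run it once after the job fails and look through the files it prints for the stack trace from the failed task.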

Best,
Josh


On Tue, Mar 24, 2015 at 1:56 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Hi, I recently started using Crunch and ran into an issue that really
> confuses me. When I run the following code, result.succeeded() is 1;
>
> however, if I add the line "labelData.write(At.textFile("/labelData"),
> WriteMode.OVERWRITE);" before running the pipeline, result.succeeded() is
> 0.
>
> I wonder why this happens and what might be causing the failure. Am I
> doing something wrong?
>
>
> Thanks for your help.
>
>
> Lu
>
>
> public class LabelDataCollector implements Serializable {
>
>   // Convert the raw text inputs to FeatObject objects.
>   public PCollection<FeatObject> getFeatSamples(Pipeline pipeline,
>       String labelDataPath) {
>     PCollection<String> rawInputs = pipeline.readTextFile(labelDataPath);
>     PCollection<String> validLines = rawInputs.filter(
>         new FeatValueNullFilter());
>     PType<FeatObject> featObjectType = Avros.reflects(FeatObject.class);
>     PCollection<FeatObject> featSamples = validLines.parallelDo(
>         new GenerateFeatSample(), featObjectType);
>     return featSamples;
>   }
>
>   public static void main(String args[]) throws IOException,
>       InterruptedException {
>     Configuration conf = new Configuration();
>     conf.set("fs.default.name", "file:///");
>     conf.set("mapred.job.tracker", "local");
>     Pipeline pipeline = new MRPipeline(LabelDataCollector.class,
>         "LabelDataCollector", conf);
>     LabelDataCollector ldc = new LabelDataCollector();
>     PCollection<FeatObject> labelData = ldc.getFeatSamples(pipeline,
>         "/feat_inputs");
>
>     // Execute the pipeline as a MapReduce.
>     PipelineResult result = pipeline.done();
>     System.out.println(result.succeeded() ? 0 : 1);
>   }
> }
>
>
> public class GenerateFeatSample extends DoFn<String, FeatObject> {
>
>   private final static Logger logger = Logger
>       .getLogger(FeatObject.class.getName());
>   private static final long serialVersionUID = 1L;
>
>   public void process(String input, Emitter<FeatObject> emitter) {
>     if (input == null || input.isEmpty()) {
>       logger.error("Input is null or empty");
>       return;
>     }
>     logger.info("convert the input string to a feature object!");
>     emitter.emit(new FeatObject(input));
>   }
> }
>
>
> public class FeatObject implements java.io.Serializable, Cloneable {
>
>   private static final long serialVersionUID = 1L;
>   private String labelID;
>   private String sampleID;
>   private int pos_neg_ind;
>   private Map<String, Double> feat_val_pair;
>   private int num_of_feat;
>
>   public FeatObject(String input) {
>     ...
>   }
> }
>
>
