crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lucy Chen <>
Subject question about Crunch pipeline
Date Tue, 24 Mar 2015 23:56:27 GMT
Hi, I just recently started to use Crunch, and ran into an issue that really
confuses me. When I run the following code, result.succeeded() is 1;

 however, if I add one line  "labelData.write(At.textFile("/labelData"),
WriteMode.OVERWRITE);" before running the pipeline, result.succeeded() is
[value missing from the archived message].

I wonder why this happens and what potential issue caused it to fail.
Did I do something wrong?

Thanks for your help.


public class LabelDataCollector implements Serializable{

  // convert the raw text inputs to FeatObject objects

public PCollection<FeatObject> getFeatSamples(Pipeline pipeline, String


PCollection<String> rawInputs = pipeline.readTextFile(labelDataPath);

 PCollection<String> validLines = rawInputs.filter(new

 PType<FeatObject> featObjectType = Avros.reflects(FeatObject.class);

PCollection<FeatObject> featSamples = validLines.parallelDo(new
GenerateFeatSample(), featObjectType);

return featSamples;


public static void main(String args[]) throws IOException,
InterruptedException {

Configuration conf = new Configuration();

conf.set("", "file:///");

conf.set("mapred.job.tracker", "local");

Pipeline pipeline = new MRPipeline(LabelDataCollector.class,
"LabelDataCollector", conf);

LabelDataCollector ldc = new LabelDataCollector();

PCollection<FeatObject> labelData = ldc.getFeatSamples(pipeline,

// Execute the pipeline as a MapReduce.

                PipelineResult result = pipeline.done();

                System.out.println(result.succeeded() ? 0 : 1);



public class GenerateFeatSample extends DoFn<String, FeatObject>{

private final static Logger logger = Logger


private static final long serialVersionUID = 1L;

public void process(String input, Emitter<FeatObject> emitter){

 if (input == null || input.isEmpty()) {

logger.error("Input is null or empty");


    }"convert the input string to a feature object!");

emitter.emit(new FeatObject(input));



public class FeatObject implements, Cloneable{

 private static final long serialVersionUID = 1L;

private String labelID;

private String sampleID;

private int pos_neg_ind;

private Map<String, Double> feat_val_pair;

private int num_of_feat;

  public FeatObject(String input)





View raw message