crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lucy Chen <lucychen2014f...@gmail.com>
Subject question about Crunch pipeline
Date Tue, 24 Mar 2015 23:56:27 GMT
Hi, I recently started using Crunch and ran into an issue that really confused
me. When I run the following code, result.succeeded() is 1;

However, if I add the line "labelData.write(At.textFile("/labelData"),
WriteMode.OVERWRITE);" before running the pipeline, result.succeeded() is
0.

I wonder why this happens and what could be causing it to fail.
Did I do something wrong?


Thanks for your help.


Lu


/**
 * Builds a Crunch pipeline that turns raw label-data text lines into {@link FeatObject}s.
 */
public class LabelDataCollector implements Serializable {

  /**
   * Converts the raw text inputs into {@link FeatObject}s.
   *
   * <p>Reads the text at {@code labelDataPath}, keeps only the lines accepted by
   * {@code FeatValueNullFilter}, and maps each remaining line through
   * {@code GenerateFeatSample}. This only adds steps to the pipeline's plan. Crunch evaluates
   * lazily, so nothing runs until a target is written and the pipeline is run or done.
   *
   * @param pipeline the pipeline to attach the read and the transformations to
   * @param labelDataPath path of the raw text input
   * @return the (lazily evaluated) collection of parsed feature samples
   */
  public PCollection<FeatObject> getFeatSamples(Pipeline pipeline, String labelDataPath) {
    PCollection<String> rawInputs = pipeline.readTextFile(labelDataPath);
    PCollection<String> validLines = rawInputs.filter(new FeatValueNullFilter());

    // NOTE(review): Avro's reflect-based (de)serialization generally instantiates records
    // through a no-argument constructor (private is fine). FeatObject's visible constructor
    // takes a String; confirm a no-arg constructor exists before jobs serialize FeatObject.
    PType<FeatObject> featObjectType = Avros.reflects(FeatObject.class);

    return validLines.parallelDo(new GenerateFeatSample(), featObjectType);
  }

  /**
   * Runs the pipeline locally against {@code /feat_inputs}.
   *
   * <p>Prints {@code 0} on success and {@code 1} on failure. This follows the shell exit-code
   * convention and is NOT the boolean value of {@code succeeded()}. The JVM then exits with
   * that same status so scripts can test it directly.
   */
  public static void main(String[] args) throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    // Run against the local filesystem with the local job runner. These are Hadoop 1-style
    // key names; confirm the equivalents if running on a Hadoop 2 cluster.
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");

    Pipeline pipeline = new MRPipeline(LabelDataCollector.class, "LabelDataCollector", conf);
    LabelDataCollector ldc = new LabelDataCollector();
    PCollection<FeatObject> labelData = ldc.getFeatSamples(pipeline, "/feat_inputs");

    // NOTE: labelData is never written to a target, so Crunch has no output to produce and
    // done() has nothing to execute; the read/filter/parallelDo above never actually run.
    // Write it (e.g. labelData.write(At.textFile(...), Target.WriteMode.OVERWRITE)) to
    // really execute the job.

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();
    int status = result.succeeded() ? 0 : 1;
    System.out.println(status);
    // Propagate the conventional status (0 = success) to the calling shell.
    System.exit(status);
  }
}


public class GenerateFeatSample extends DoFn<String, FeatObject>{


private final static Logger logger = Logger

      .getLogger(FeatObject.class.getName());

private static final long serialVersionUID = 1L;


public void process(String input, Emitter<FeatObject> emitter){

 if (input == null || input.isEmpty()) {

logger.error("Input is null or empty");

      return;

    }

        logger.info("convert the input string to a feature object!");

emitter.emit(new FeatObject(input));

 }


}


/**
 * Value object holding one parsed feature sample. GenerateFeatSample constructs it from a
 * single input line, and the pipeline carries it using the {@code Avros.reflects} PType.
 *
 * NOTE(review): the constructor body is elided ("...") in this listing, so the input-line
 * format it parses is not visible here.
 * NOTE(review): Avro's reflect-based (de)serialization generally instantiates records through
 * a no-argument constructor (it may be private). Only FeatObject(String) is visible here. If
 * no no-arg constructor exists, jobs that actually serialize FeatObject may fail, which is a
 * plausible cause of the failure seen once an output target is added. Confirm.
 * NOTE(review): implements Cloneable, but no clone() override is visible. Object.clone() is
 * protected, so outside callers cannot clone instances as written.
 */
public class FeatObject implements java.io.Serializable, Cloneable{

 private static final long serialVersionUID = 1L;

// presumably the label identifier — confirm
private String labelID;

// presumably the sample identifier — confirm
private String sampleID;

// presumably a positive/negative class indicator — confirm encoding
private int pos_neg_ind;

// presumably feature name -> feature value — confirm
private Map<String, Double> feat_val_pair;

// presumably the number of features (possibly feat_val_pair.size()) — confirm
private int num_of_feat;

  // Parses one raw input line into this object; body elided in this listing.
  public FeatObject(String input)

       {

         ...

       }

}

Mime
View raw message