crunch-user mailing list archives

From Lucy Chen <>
Subject question about join
Date Wed, 13 May 2015 19:22:11 GMT
Hi all,

I have a join step in my Crunch pipeline, and it looks like the ...

// get label data
PType<Labels> LabelsType = Avros.records(Labels.class);
PCollection<Labels> training_labels = input.parallelDo(new LabelDataParser(), LabelsType);
PTable<String, Labels> labels_data = training_labels.parallelDo(
    new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));

// get features
PType<Feats> FeatsType = Avros.records(Feats.class);
PCollection<Feats> training_feats = Feature.FeatLoader(pipeline, ...);
PTable<String, Feats> feats_data = training_feats.parallelDo(
    new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));

// join labels and features
JoinStrategy<String, Labels, Feats> strategy =
    new DefaultJoinStrategy<String, Labels, Feats>(20);
PTable<String, Pair<Labels, Feats>> joined_training =
    strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
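For reference, the keying and join steps above are expected to behave like the plain-Java model below: each record becomes a (sample_ID, record) pair, and an inner join emits, for every key present on both sides, one (label, feature) pair per matching combination, dropping keys that appear on only one side. This is a sketch under stated assumptions, not Crunch code: `JoinModel`, `keyBy` and `innerJoin` are hypothetical names, and `Map.Entry` stands in for `org.apache.crunch.Pair`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of "key each record, then inner-join on the key".
// Not Crunch code; Map.Entry plays the role of org.apache.crunch.Pair.
final class JoinModel {

    // Emit one (key, record) entry per record. In Crunch, a parallelDo with a
    // tableOf(...) output type likewise emits Pair<K, V> values, not bare keys.
    static <K, V> List<Map.Entry<K, V>> keyBy(List<V> records, Function<V, K> keyFn) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (V record : records) {
            out.add(new SimpleEntry<>(keyFn.apply(record), record));
        }
        return out;
    }

    // Inner join: for each key present on both sides, emit every (left, right) combination.
    static <K, L, R> List<Map.Entry<K, Map.Entry<L, R>>> innerJoin(
            List<Map.Entry<K, L>> left, List<Map.Entry<K, R>> right) {
        // Index the right side by key so each left record can find its matches.
        Map<K, List<R>> index = new HashMap<>();
        for (Map.Entry<K, R> e : right) {
            index.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        List<Map.Entry<K, Map.Entry<L, R>>> out = new ArrayList<>();
        for (Map.Entry<K, L> e : left) {
            List<R> matches = index.get(e.getKey());
            if (matches == null) {
                continue; // key missing on the right: an inner join drops it
            }
            for (R r : matches) {
                out.add(new SimpleEntry<K, Map.Entry<L, R>>(
                        e.getKey(), new SimpleEntry<L, R>(e.getValue(), r)));
            }
        }
        return out;
    }
}
```

In the Crunch job itself, the `20` passed to `DefaultJoinStrategy` is, as far as I know, the number of reducers for the join; it affects parallelism, not the shape of the joined output modeled here.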

// class Labels
public class Labels implements ..., Cloneable {

    private String class_ID;
    private String sample_ID;
    private int binary_ind;

    public Labels() {
        this(null, null, 0);
    }

    public Labels(String class_ID, String sample_ID, int ind) {
        this.class_ID = class_ID;
        this.sample_ID = sample_ID;
        this.binary_ind = ind;
    }
}

// class Feats
import java.util.Map;

public class Feats implements ..., Cloneable {

    private String sample_id;
    private String sample_name;
    private Map<String, Float> feat;

    public Feats() {
        this(null, null, null);
    }

    public Feats(String id, String name, Map<String, Float> feat) {
        this.sample_id = id;
        this.sample_name = name;
        this.feat = feat;
    }
}

The outputs of labels_data and feats_data both look fine, but the join
step throws the following exception:

Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
    at ...
    at org.apache.crunch.MapFn.process(...)
    at ...
    at org.apache.crunch.MapFn.process(...)
    at ...
    at org.apache.hadoop.mapred.MapTask.runNewMapper(...)
    at ...
    at org.apache.hadoop.mapred.YarnChild$...
    at ...
    at org.apache.hadoop.mapred.YarnChild.main(...)
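The exception means that, at runtime, an object that was a `String` reached code that cast it to `Pair`. Because Java generics are erased, such a value can travel through a collection or channel declared to carry pairs and fail only when it is first read, often far from where it was produced. The snippet below reproduces that general mechanism in plain Java; `SimplePair` and `CastDemo` are hypothetical stand-ins (not `org.apache.crunch.Pair` or Crunch internals), and it is not a diagnosis of this particular pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal pair type standing in for org.apache.crunch.Pair.
final class SimplePair<A, B> {
    final A first;
    final B second;

    SimplePair(A first, B second) {
        this.first = first;
        this.second = second;
    }
}

final class CastDemo {

    // A downstream stage that is declared to receive pairs and reads the first element.
    static String firstOf(List<SimplePair<String, Integer>> pairs) {
        // After erasure, get() returns Object; the compiler inserts a cast to SimplePair here.
        return pairs.get(0).first;
    }

    // Simulates an upstream stage that puts a bare key into a pair-typed channel.
    @SuppressWarnings("unchecked")
    static String run() {
        List<Object> raw = new ArrayList<>();
        raw.add("sample_1"); // a String, not a pair
        List<SimplePair<String, Integer>> pairs =
                (List<SimplePair<String, Integer>>) (List<?>) raw; // unchecked: not verified at runtime
        return firstOf(pairs); // throws ClassCastException (String cannot be cast to SimplePair)
    }
}
```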

This issue has been bothering me for a while. Has anyone run into a
similar problem? Is there another approach that would solve it?

By the way, I have already run the following join job successfully:

JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
    new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
    strategy.join(input_A, input_B, JoinType.INNER_JOIN);

So I suspect the issue may still be related to the Avro types that I ...

Thanks for your advice.

