crunch-user mailing list archives

From Lucy Chen <lucychen2014f...@gmail.com>
Subject question about join
Date Wed, 13 May 2015 19:22:11 GMT
Hi all,

I have a join step in my Crunch pipeline that looks like the following:

// get label data
PType<Labels> LabelsType = Avros.records(Labels.class);
PCollection<Labels> training_labels = input.parallelDo(new LabelDataParser(), LabelsType);
PTable<String, Labels> labels_data = training_labels.parallelDo(
    new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));

// get features
PType<Feats> FeatsType = Avros.records(Feats.class);
PCollection<Feats> training_feats = Feature.FeatLoader(pipeline, sample_features_inputs);
PTable<String, Feats> feats_data = training_feats.parallelDo(
    new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));

// join labels and features
JoinStrategy<String, Labels, Feats> strategy =
    new DefaultJoinStrategy<String, Labels, Feats>(20);
PTable<String, Pair<Labels, Feats>> joined_training =
    strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);


// class Labels
public class Labels implements java.io.Serializable, Cloneable {
    private String class_ID;
    private String sample_ID;
    private int binary_ind;

    public Labels() {
        this(null, null, 0);
    }

    public Labels(String class_ID, String sample_ID, int ind) {
        this.class_ID = class_ID;
        this.sample_ID = sample_ID;
        this.binary_ind = ind;
    }

    ...
}


// class Feats
public class Feats implements java.io.Serializable, Cloneable {
    private String sample_id;
    private String sample_name;
    private Map<String, Float> feat;

    public Feats() {
        this(null, null, null);
    }

    public Feats(String id, String name, Map<String, Float> feat) {
        this.sample_id = id;
        this.sample_name = name;
        this.feat = feat;
    }

    ...
}


The outputs of labels_data and feats_data both look fine, but the join step
throws the following exception:


Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
    at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
    at org.apache.crunch.MapFn.process(MapFn.java:34)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
    at org.apache.crunch.MapFn.process(MapFn.java:34)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
    at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
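
For context on how an error of this shape can arise in general: Java generics are erased at runtime, so a collection declared to hold pairs can actually contain Strings, and the cast only fails where an element is first used as a pair. The sketch below is plain Java, not Crunch code, and it does not identify the cause in the pipeline above. The class name ErasureCastDemo and its nested Pair type are hypothetical stand-ins introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureCastDemo {

    // Hypothetical stand-in for a pair type; this is NOT org.apache.crunch.Pair.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // Reads the first record as a Pair. After erasure, get(0) returns Object,
    // so the compiler inserts a runtime cast to Pair on this assignment.
    static String firstKey(List<Pair<String, Integer>> records) {
        Pair<String, Integer> p = records.get(0);
        return p.first;
    }

    // Builds a list whose declared element type (Pair) disagrees with its
    // actual contents (String), then reports what happens on first use.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String demo() {
        List raw = new ArrayList();
        raw.add("sample_ID_1"); // a String, not a Pair
        List<Pair<String, Integer>> typed = raw; // unchecked conversion: compiles with a warning
        try {
            firstKey(typed);
            return "no exception";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is only that the declared element type and the actual runtime records can disagree without any compile-time error, and the failure then shows up at the first consumer that treats a record as a pair.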


This issue has been bothering me for a while. Has anyone run into a similar
issue? Is there another option that would solve it?


Btw, I have already successfully run the following join job:


JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
    new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
    strategy.join(input_A, input_B, JoinType.INNER_JOIN);


So I guess the issue may still be related to the Avro types that I defined.


        Thanks for your advice.


Lucy
