crunch-user mailing list archives

From Lucy Chen <lucychen2014f...@gmail.com>
Subject Re: question about join
Date Fri, 15 May 2015 19:05:30 GMT
Hi Josh,

It now looks like the issue is probably with the Labels class. I tried to
store training_labels as an Avro output (text outputs are fine),

training_labels.write(At.avroFile(output_path+"/training_labels_avro"), LabelsType, WriteMode.OVERWRITE);


but I get the following error:

The method write(Target, Target.WriteMode) in the type PCollection<Labels>
is not applicable for the arguments (SourceTarget<GenericData.Record>,
PType<Labels>, Target.WriteMode)


I get a similar error when storing labels_data, which is the first input
of the join. It seems that Crunch does not recognize the type of
training_labels, but why does it treat it as a GenericData.Record here?
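
The compiler message lists three arguments for write(), namely
(SourceTarget<GenericData.Record>, PType<Labels>, Target.WriteMode), while
write() takes only two. That suggests LabelsType was passed to write()
instead of to At.avroFile(...), so the one-argument At.avroFile overload,
whose result is a generic-record target, was selected. Assuming the Crunch
release in use offers an At.avroFile(String, PType) overload (worth checking
in the At javadoc), moving the type inside that call, as in
training_labels.write(At.avroFile(output_path + "/training_labels_avro", LabelsType), WriteMode.OVERWRITE),
should at least type-check. The plain-Java model below only illustrates that
overload selection; ModelType, ModelTarget, ModelAt and LabelsStandIn are
hypothetical stand-ins, not Crunch classes.

```java
// Plain-Java model of overload selection only. ModelType, ModelTarget,
// ModelAt and LabelsStandIn are hypothetical stand-ins, NOT Crunch classes.
final class LabelsStandIn {}

// Stand-in for a PType<T>: it just remembers the element class.
final class ModelType<T> {
    final Class<T> clazz;
    ModelType(Class<T> clazz) { this.clazz = clazz; }
}

// Stand-in for a file target that records what it thinks its elements are.
final class ModelTarget<T> {
    final String path;
    final String recordType;
    ModelTarget(String path, String recordType) {
        this.path = path;
        this.recordType = recordType;
    }
}

final class ModelAt {
    // Path only: no element type is available, so the target falls back to
    // generic records (compare SourceTarget<GenericData.Record> in the error).
    static ModelTarget<Object> avroFile(String path) {
        return new ModelTarget<Object>(path, "GenericData.Record");
    }

    // Path plus type: the element type travels with the target.
    static <T> ModelTarget<T> avroFile(String path, ModelType<T> type) {
        return new ModelTarget<T>(path, type.clazz.getSimpleName());
    }
}
```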


              Any suggestions?


              Thanks!


Lucy

On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Hi all,
>
> I have a join step in my Crunch pipeline, and it looks like the
> following:
>
> // get label data
> PType<Labels> LabelsType = Avros.records(Labels.class);
> PCollection<Labels> training_labels = input.parallelDo(
>     new LabelDataParser(), LabelsType);
> PTable<String, Labels> labels_data = training_labels.parallelDo(
>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>
> // get features
> PType<Feats> FeatsType = Avros.records(Feats.class);
> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>     sample_features_inputs);
> PTable<String, Feats> feats_data = training_feats.parallelDo(
>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>
> // join labels and features
> JoinStrategy<String, Labels, Feats> strategy =
>     new DefaultJoinStrategy<String, Labels, Feats>(20);
> PTable<String, Pair<Labels, Feats>> joined_training =
>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>
>
> // class Labels
> public class Labels implements java.io.Serializable, Cloneable {
>   private String class_ID;
>   private String sample_ID;
>   private int binary_ind;
>
>   public Labels() {
>     this(null, null, 0);
>   }
>
>   public Labels(String class_ID, String sample_ID, int ind) {
>     this.class_ID = class_ID;
>     this.sample_ID = sample_ID;
>     this.binary_ind = ind;
>   }
>
>   ...
> }
>
> // class Feats
> public class Feats implements java.io.Serializable, Cloneable {
>   private String sample_id;
>   private String sample_name;
>   private Map<String, Float> feat;
>
>   public Feats() {
>     this(null, null, null);
>   }
>
>   public Feats(String id, String name, Map<String, Float> feat) {
>     this.sample_id = id;
>     this.sample_name = name;
>     this.feat = feat;
>   }
>
>   ...
> }
>
>
> The outputs of labels_data and feats_data are both fine, but the join
> step throws the following exception:
>
>
> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>
>
> This issue has been bothering me for a while. Has anyone run into a
> similar issue? Is there another option that would solve it?
>
>
> By the way, I have already successfully run the following join job:
>
>
> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>
>
> So I guess the issue may still be related to the Avro types that I
> defined.
>
>
>         Thanks for your advice.
>
>
> Lucy
>
>
>
>
>
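
The quoted pipeline keys both inputs on the sample ID and then runs an
INNER_JOIN, which should emit one (Labels, Feats) pair for every label and
feature record sharing an ID and drop IDs found on only one side. The sketch
below models that expected output in plain, in-memory Java so it can be
compared against a small test run. LabelRow, FeatRow, JoinedRow and
InnerJoinSketch are hypothetical, simplified stand-ins; the sketch does not
use Crunch and does not reproduce or diagnose the ClassCastException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Labels and Feats classes above.
final class LabelRow {
    final String classId;
    final String sampleId;
    final int binaryInd;
    LabelRow(String classId, String sampleId, int binaryInd) {
        this.classId = classId;
        this.sampleId = sampleId;
        this.binaryInd = binaryInd;
    }
}

final class FeatRow {
    final String sampleId;
    final String sampleName;
    final Map<String, Float> feat;
    FeatRow(String sampleId, String sampleName, Map<String, Float> feat) {
        this.sampleId = sampleId;
        this.sampleName = sampleName;
        this.feat = feat;
    }
}

// One joined record: the shared key plus the matching label and features.
final class JoinedRow {
    final String key;
    final LabelRow label;
    final FeatRow feats;
    JoinedRow(String key, LabelRow label, FeatRow feats) {
        this.key = key;
        this.label = label;
        this.feats = feats;
    }
}

final class InnerJoinSketch {
    // In-memory inner join on sample ID: every label is paired with every
    // feature row that has the same ID; IDs found on only one side are dropped.
    static List<JoinedRow> innerJoin(List<LabelRow> labels, List<FeatRow> feats) {
        // Group the right-hand side by key first.
        Map<String, List<FeatRow>> featsById = new HashMap<String, List<FeatRow>>();
        for (FeatRow f : feats) {
            List<FeatRow> bucket = featsById.get(f.sampleId);
            if (bucket == null) {
                bucket = new ArrayList<FeatRow>();
                featsById.put(f.sampleId, bucket);
            }
            bucket.add(f);
        }
        // Probe with each label and emit the cross product per key.
        List<JoinedRow> joined = new ArrayList<JoinedRow>();
        for (LabelRow l : labels) {
            List<FeatRow> matches = featsById.get(l.sampleId);
            if (matches == null) {
                continue; // no feature row for this sample ID
            }
            for (FeatRow f : matches) {
                joined.add(new JoinedRow(l.sampleId, l, f));
            }
        }
        return joined;
    }
}
```

DefaultJoinStrategy performs this join through the MapReduce shuffle rather
than in memory; only the expected set of output pairs is meant to match.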
