crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <josh.wi...@gmail.com>
Subject Re: question about join
Date Wed, 13 May 2015 19:34:54 GMT
What do the KeyOnLabels and KeyOnFeats functions return? From the error, it
looks like the labels_data PTable is actually a PCollection<String>, which
would happen if KeyOnLabels was somehow returning a String in some
situations despite claiming to return a Pair<String, Labels>.

On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Meanwhile, I just realized that one of my joining jobs look also OK, which
> also had the Avro type in it:
>
> JoinStrategy<String, Feats, Feats> strategy
>
> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>
> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A, input_B,
> JoinType.INNER_JOIN);
>
>
> So the Avro type probably is not the issue.
>
>
> Any advice?
>
>
> Lucy
>
> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lucychen2014fall@gmail.com>
> wrote:
>
>> Hi all,
>>
>>          I had a join step in my crunch pipeline, and it looks like the
>> following:
>>
>> //get label data
>>
>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>
>> PCollection<Labels> training_labels = input.parallelDo(new
>> LabelDataParser(), LabelsType);
>>
>> PTable<String, Labels> labels_data = training_labels.
>>
>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>> LabelsType));
>>
>> //get features
>>
>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>
>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>> sample_features_inputs);
>>
>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>
>>
>> //join labels and features
>>
>> JoinStrategy<String, Labels, Feats> strategy = new
>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>
>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>
>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>>
>> //class Labels
>>
>> public class Labels implements java.io.Serializable, Cloneable{
>>
>>  private String class_ID;
>>
>> private String sample_ID;
>>
>> private int binary_ind;
>>
>>  public Labels()
>>
>> {
>>
>> this(null, null, 0);
>>
>> }
>>
>>          public Labels(String class_ID, String sample_ID, int ind)
>>
>> {
>>
>> this.class_ID = class_ID;
>>
>> this.sample_ID = sample_ID;
>>
>> this.binary_ind = ind;
>>
>> }
>>
>>         ...
>>
>> }
>>
>>
>> //class Feats
>>
>>
>> public class *Feats* implements java.io.Serializable, Cloneable{
>>
>>  private String sample_id;
>>
>> private String sample_name;
>>
>> private Map<String, Float> feat;
>>
>>  public Feats()
>>
>> {
>>
>> this(null, null, null);
>>
>> }
>>
>>  public Feats(String id, String name, Map<String, Float> feat)
>>
>> {
>>
>> this.sample_id = id;
>>
>> this.sample_name = name;
>>
>> this.feat = feat;
>>
>>  }
>>
>>        ...
>>
>>
>> }
>>
>>
>>    The outputs of labels_data and feats_data are both fine; but the join
>> step throws the following exception:
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>> org.apache.crunch.Pair at
>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>> java.security.AccessController.doPrivileged(Native Method) at
>> javax.security.auth.Subject.doAs(Subject.java:415) at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>>
>>        This issue already bothered me for a while; Did any one get a
>> similar issue here? Is there another option that will solve it?
>>
>>
>>       Btw, I already successfully ran the following joining job:
>>
>>
>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
new
>> DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>
>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined
=
>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>
>>
>>    So I guess the issue may be still related to the Avro types that I
>> defined.
>>
>>
>>         Thanks for your advice.
>>
>>
>> Lucy
>>
>>
>>
>>
>>
>

Mime
View raw message