crunch-user mailing list archives

From Lucy Chen <lucychen2014f...@gmail.com>
Subject Re: question about join
Date Sat, 16 May 2015 00:13:25 GMT
Hi Josh,

         Thanks for your quick response. You are right: it should be
training_labels.write(At.avroFile(output_path+"/training_labels_avro",
LabelsType), WriteMode.OVERWRITE) instead. Sorry about the mistake. After
another round of diagnosis with toy data, the code suddenly started working
once I tried storing the Avro file. I don't know why it went from failing
to working, but it now works without any changes. Something odd probably
happened.

        Thanks again for your time and patience.

Best,
Lucy

On Fri, May 15, 2015 at 12:05 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Hi Josh,
>
>          It now looks like the issue is probably with the Labels class.
> I tried to store training_labels as an Avro output (text outputs are
> OK):
>
> training_labels.write(At.avroFile(output_path+"/training_labels_avro"),
> LabelsType, WriteMode.OVERWRITE);
>
>             but I get the following error:
>
> The method write(Target, Target.WriteMode) in the type PCollection<Labels>
> is not applicable for the arguments (SourceTarget<GenericData.Record>,
> PType<Labels>, Target.WriteMode)
>
>               I get a similar issue when storing labels_data, which is
> the first input of the join. It seems that Crunch does not recognize the
> type of training_labels. But why does it treat it as a GenericData.Record
> here?
>
>               Any suggestions?
>
>               Thanks!
>
> Lucy
>
> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lucychen2014fall@gmail.com>
> wrote:
>
>> Hi all,
>>
>>          I have a join step in my Crunch pipeline that looks like the
>> following:
>>
>> // get label data
>> PType<Labels> LabelsType = Avros.records(Labels.class);
>> PCollection<Labels> training_labels = input.parallelDo(new LabelDataParser(), LabelsType);
>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>
>> // get features
>> PType<Feats> FeatsType = Avros.records(Feats.class);
>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline, sample_features_inputs);
>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>
>> // join labels and features
>> JoinStrategy<String, Labels, Feats> strategy =
>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>> PTable<String, Pair<Labels, Feats>> joined_training =
>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
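The snippet above requests an inner join on sample_ID. As a point of reference, here is a plain-Java sketch (no Crunch dependency; the class name, method names and toy data are hypothetical) of what INNER_JOIN is expected to produce: every (label, feature) combination whose keys match, with keys present on only one side dropped. It only illustrates the semantics and is not how DefaultJoinStrategy is implemented.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of inner-join semantics using plain collections.
// This is NOT Crunch's DefaultJoinStrategy, just the result it should match.
class InnerJoinSketch {

  /** Groups values by the key keyFn extracts (analogous to keying a PCollection into a PTable). */
  static <K, V> Map<K, List<V>> keyBy(List<V> values, Function<V, K> keyFn) {
    Map<K, List<V>> grouped = new HashMap<>();
    for (V v : values) {
      grouped.computeIfAbsent(keyFn.apply(v), k -> new ArrayList<>()).add(v);
    }
    return grouped;
  }

  /** For every key present on BOTH sides, emits the cross product as (key, (left, right)). */
  static <K, L, R> List<Map.Entry<K, Map.Entry<L, R>>> innerJoin(
      Map<K, List<L>> left, Map<K, List<R>> right) {
    List<Map.Entry<K, Map.Entry<L, R>>> out = new ArrayList<>();
    for (Map.Entry<K, List<L>> e : left.entrySet()) {
      List<R> matches = right.get(e.getKey());
      if (matches == null) {
        continue; // key missing on the right side: an inner join drops it
      }
      for (L l : e.getValue()) {
        for (R r : matches) {
          Map.Entry<L, R> pair = new SimpleEntry<>(l, r);
          out.add(new SimpleEntry<>(e.getKey(), pair));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Hypothetical toy rows: labels as {class_ID, sample_ID}, features as {sample_ID, sample_name}.
    List<String[]> labels = Arrays.asList(
        new String[] {"c1", "s1"}, new String[] {"c2", "s1"}, new String[] {"c1", "s3"});
    List<String[]> feats = Arrays.asList(
        new String[] {"s1", "sampleOne"}, new String[] {"s2", "sampleTwo"});

    Map<String, List<String[]>> labelsByKey = keyBy(labels, row -> row[1]);
    Map<String, List<String[]>> featsByKey = keyBy(feats, row -> row[0]);

    for (Map.Entry<String, Map.Entry<String[], String[]>> row : innerJoin(labelsByKey, featsByKey)) {
      System.out.println(row.getKey() + " -> " + row.getValue().getKey()[0]
          + ", " + row.getValue().getValue()[1]);
    }
  }
}
```

With this toy data only s1 appears on both sides, so the sketch prints two rows, one per label for s1; s3 (labels only) and s2 (features only) are dropped.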
>>
>>
>> // class Labels
>> public class Labels implements java.io.Serializable, Cloneable {
>>   private String class_ID;
>>   private String sample_ID;
>>   private int binary_ind;
>>
>>   public Labels() {
>>     this(null, null, 0);
>>   }
>>
>>   public Labels(String class_ID, String sample_ID, int ind) {
>>     this.class_ID = class_ID;
>>     this.sample_ID = sample_ID;
>>     this.binary_ind = ind;
>>   }
>>
>>   ...
>> }
>>
>>
>> // class Feats
>> public class Feats implements java.io.Serializable, Cloneable {
>>   private String sample_id;
>>   private String sample_name;
>>   private Map<String, Float> feat;
>>
>>   public Feats() {
>>     this(null, null, null);
>>   }
>>
>>   public Feats(String id, String name, Map<String, Float> feat) {
>>     this.sample_id = id;
>>     this.sample_name = name;
>>     this.feat = feat;
>>   }
>>
>>   ...
>> }
>>
>>
>>    The outputs of labels_data and feats_data are both fine, but the join
>> step throws the following exception:
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>   at java.security.AccessController.doPrivileged(Native Method)
>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
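The trace says that a map function inside DefaultJoinStrategy received a java.lang.String where it expected an org.apache.crunch.Pair. The self-contained sketch below (hypothetical class and method names, no Crunch involved) illustrates the general Java mechanism behind this kind of error: because generics are erased, a collection whose declared element type does not match its contents can pass through generic code unchecked, and the failure only surfaces at the cast the compiler inserts where an element is finally used. It is an illustration of the mechanism, not a diagnosis of this pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical demo: a mistyped generic collection fails only at the use-site cast.
class ErasureCastDemo {

  /** Stand-in for a function that expects key/value pairs as its input. */
  static String firstKey(List<Map.Entry<String, Integer>> pairs) {
    // javac inserts a checkcast to Map.Entry here; this is where a String element fails.
    Map.Entry<String, Integer> first = pairs.get(0);
    return first.getKey();
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  static String run() {
    List<String> plainStrings = new ArrayList<>();
    plainStrings.add("sample_ID_1");
    // Unchecked conversion through a raw type: compiles, but the declared element type is now wrong.
    List<Map.Entry<String, Integer>> mislabeled = (List) plainStrings;
    try {
      return firstKey(mislabeled);
    } catch (ClassCastException e) {
      return "ClassCastException: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The exact exception message text differs between JDK versions, but in each case the error is reported at the cast in firstKey, not where the mistyped list was created.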
>>
>>
>>        This issue has been bothering me for a while. Has anyone run into
>> a similar issue? Is there another approach that would solve it?
>>
>>
>>       By the way, I have already successfully run the following join job:
>>
>>
>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>
>>
>>    So I suspect the issue is still related to the Avro types I
>> defined.
>>
>>
>>         Thanks for your advice.
>>
>>
>> Lucy
>>
>
