crunch-user mailing list archives

From Lucy Chen <lucychen2014f...@gmail.com>
Subject Re: question about join
Date Wed, 13 May 2015 22:10:27 GMT
Hi,

I tried dumping the members of the Labels class into a TupleN, and only
sample_id and sample_name from the Feats class into a Pair, and then
joining the two:

JoinStrategy<String, TupleN, Pair<String, String>> strategy =
    new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);

PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
    strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
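
As a reference point for what the join above is expected to produce, here is a minimal in-memory sketch of inner-join semantics in plain Java. It is not Crunch code: InnerJoinSketch and innerJoin are hypothetical names, and the values are simplified to strings instead of TupleN and Pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of inner-join semantics; hypothetical names, not Crunch code.
final class InnerJoinSketch {

    // Emits "key: left | right" for every pairing of values whose key
    // appears on both sides. Keys present on only one side are dropped.
    static List<String> innerJoin(Map<String, List<String>> left,
                                  Map<String, List<String>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : left.entrySet()) {
            List<String> matches = right.get(e.getKey());
            if (matches == null) {
                continue; // no partner on the right side
            }
            for (String l : e.getValue()) {
                for (String r : matches) {
                    out.add(e.getKey() + ": " + l + " | " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // TreeMap keeps the key order deterministic for printing.
        Map<String, List<String>> labels = new TreeMap<>();
        labels.put("ID1", Arrays.asList("label1"));
        labels.put("ID2", Arrays.asList("label2"));

        Map<String, List<String>> feats = new TreeMap<>();
        feats.put("ID1", Arrays.asList("featsA"));

        for (String row : innerJoin(labels, feats)) {
            System.out.println(row);
        }
    }
}
```

In this sketch only ID1 survives the join, because ID2 has no match on the feats side.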


The data dump step works; I have already verified its outputs.

But I still get the same exception from the join, so it looks like the
Avro type that I defined is definitely not the issue. Now I am just
stuck here...


Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
    at org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
    at org.apache.crunch.MapFn.process(MapFn.java:34)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
    at org.apache.crunch.MapFn.process(MapFn.java:34)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
    at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
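
For anyone wondering how a String can reach a cast to Pair without a compile error: Java generics are erased at runtime, so a collection declared to hold pairs can physically contain other objects, and the failure shows up only where an element is first used as a pair. The sketch below reproduces that mechanism in plain Java. It illustrates the language behavior only and is not a diagnosis of this pipeline; ErasureDemo, SimplePair, mislabeled, and firstAccessThrows are hypothetical names, and SimplePair merely stands in for org.apache.crunch.Pair.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration; all names here are hypothetical.
final class ErasureDemo {

    // Stand-in for a pair type; not org.apache.crunch.Pair.
    static final class SimplePair<A, B> {
        final A first;
        final B second;

        SimplePair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // Claims to return pairs but actually holds a bare String.
    // Generics are erased, so neither the add nor the cast fails here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<SimplePair<String, Integer>> mislabeled() {
        List raw = new ArrayList();
        raw.add("ID1");
        return (List<SimplePair<String, Integer>>) raw;
    }

    // Returns true if using the first element as a pair throws
    // ClassCastException, false if the element really is a pair.
    static boolean firstAccessThrows(List<SimplePair<String, Integer>> rows) {
        try {
            // The compiler inserts a runtime cast to SimplePair here.
            SimplePair<String, Integer> p = rows.get(0);
            String key = p.first;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(firstAccessThrows(mislabeled()));
    }
}
```

The point of the sketch is that the exception surfaces at the consumer of the data, which may be far from the code that produced the wrongly typed element.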

On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Hi Josh,
>
> Those two functions attach the key to each PCollection so that they
> can be joined in the next step. Here is what the functions look like:
>
> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {
>
>   private String key;
>
>   public KeyOnLabels(String input) {
>     this.key = input;
>   }
>
>   @Override
>   public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
>     if (key.equalsIgnoreCase("class_ID"))
>       emitter.emit(Pair.of(input.getClassID(), input));
>     else if (key.equalsIgnoreCase("sample_ID"))
>       emitter.emit(Pair.of(input.getSampleID(), input));
>     else
>       emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
>   }
> }
>
> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {
>
>   private final static Logger logger =
>       Logger.getLogger(KeyOnFeats.class.getName());
>
>   private String key;
>
>   public KeyOnFeats(String input) {
>     this.key = input;
>   }
>
>   @Override
>   public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
>     if (key.equals("sample_ID"))
>       emitter.emit(Pair.of(input.getSampleID(), input));
>     else
>       logger.error("The key should be specified as sample_ID only!");
>   }
> }
>
>
> Meanwhile, I checked the two outputs and they looked normal, something
> like the following:
>
>
> {"key": "ID1", "value": Feats@14f5e86}
>
>
> {"key": "ID1", "value": Labels@489983f3}
>
>
>     That's why I feel something weird happened in the join step.
>
>
>     Thanks!
>
>
> Lucy
>
> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <josh.wills@gmail.com> wrote:
>
>> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
>> it looks like the labels_data PTable is actually a PCollection<String>,
>> which would happen if KeyOnLabels was somehow returning a String in some
>> situations despite claiming to return a Pair<String, Labels>.
>>
>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lucychen2014fall@gmail.com>
>> wrote:
>>
>>> Meanwhile, I just realized that one of my other join jobs works fine,
>>> and it also has an Avro type in it:
>>>
>>> JoinStrategy<String, Feats, Feats> strategy =
>>>     new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>
>>> PTable<String, Pair<Feats, Feats>> joined =
>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>
>>>
>>> So the Avro type probably is not the issue.
>>>
>>>
>>> Any advice?
>>>
>>>
>>> Lucy
>>>
>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>>          I had a join step in my crunch pipeline, and it looks like the
>>>> following:
>>>>
>>>> // get label data
>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>> PCollection<Labels> training_labels =
>>>>     input.parallelDo(new LabelDataParser(), LabelsType);
>>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>>
>>>> // get features
>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>> PCollection<Feats> training_feats =
>>>>     Feature.FeatLoader(pipeline, sample_features_inputs);
>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>
>>>> // join labels and features
>>>> JoinStrategy<String, Labels, Feats> strategy =
>>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> // class Labels
>>>> public class Labels implements java.io.Serializable, Cloneable {
>>>>
>>>>   private String class_ID;
>>>>   private String sample_ID;
>>>>   private int binary_ind;
>>>>
>>>>   public Labels() {
>>>>     this(null, null, 0);
>>>>   }
>>>>
>>>>   public Labels(String class_ID, String sample_ID, int ind) {
>>>>     this.class_ID = class_ID;
>>>>     this.sample_ID = sample_ID;
>>>>     this.binary_ind = ind;
>>>>   }
>>>>
>>>>   ...
>>>>
>>>> }
>>>>
>>>>
>>>> // class Feats
>>>> public class Feats implements java.io.Serializable, Cloneable {
>>>>
>>>>   private String sample_id;
>>>>   private String sample_name;
>>>>   private Map<String, Float> feat;
>>>>
>>>>   public Feats() {
>>>>     this(null, null, null);
>>>>   }
>>>>
>>>>   public Feats(String id, String name, Map<String, Float> feat) {
>>>>     this.sample_id = id;
>>>>     this.sample_name = name;
>>>>     this.feat = feat;
>>>>   }
>>>>
>>>>   ...
>>>>
>>>> }
>>>>
>>>>
>>>>    The outputs of labels_data and feats_data are both fine; but the
>>>> join step throws the following exception:
>>>>
>>>>
>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>>     at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>     at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>
>>>>
>>>> This issue has been bothering me for a while. Has anyone run into
>>>> something similar? Is there another approach that would solve it?
>>>>
>>>>
>>>>       Btw, I already successfully ran the following joining job:
>>>>
>>>>
>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>>>
>>>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> So I guess the issue may still be related to the Avro types that I
>>>> defined.
>>>>
>>>>
>>>>         Thanks for your advice.
>>>>
>>>>
>>>> Lucy
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
