crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: question about join
Date Thu, 14 May 2015 01:09:32 GMT
Hey Lucy,

I tried to replicate the error by writing the following integration test
(inside AvroReflectIT). Does something like this fail when you run it? And
are you using Crunch 0.11-hadoop2, or something earlier?

@Test
public void testJoinReflectedData() throws Exception {
  Pipeline pipeline = new MRPipeline(AvroReflectIT.class,
      tmpDir.getDefaultConfiguration());
  AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
  PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
      new StringWrapper("josh"), new StringWrapper("wills"),
      new StringWrapper("foo")), atype);
  PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
      new StringWrapper("mike"), new StringWrapper("wills"),
      new StringWrapper("foo")), atype);
  PTable<String, StringWrapper> pt1 = p1.parallelDo(
      new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
        @Override
        public Pair<String, StringWrapper> map(StringWrapper input) {
          return Pair.of(input.getValue(), input);
        }
      }, Avros.tableOf(Avros.strings(), atype));
  PTable<String, StringWrapper> pt2 = p2.parallelDo(
      new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
        @Override
        public Pair<String, StringWrapper> map(StringWrapper input) {
          return Pair.of(input.getValue(), input);
        }
      }, Avros.tableOf(Avros.strings(), atype));
  JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy =
      new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
  PTable<String, Pair<StringWrapper, StringWrapper>> res =
      joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
  for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
    System.out.println(p);
  }
}
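
As an aside on the error quoted below (java.lang.String cannot be cast to
org.apache.crunch.Pair): because Java erases generic type parameters at
runtime, a value of the wrong class can travel through code that is declared
to carry Pairs and only fail at the first place something casts it. Here is
a minimal, Crunch-free sketch of that mechanism. SimplePair, ErasureCastDemo,
mislabeled, and readFirst are all made-up names for illustration; this is not
how DefaultJoinStrategy is implemented, and it does not pinpoint where the
String comes from in the real pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.crunch.Pair, so the sketch runs without Crunch.
final class SimplePair<A, B> {
    final A first;
    final B second;

    SimplePair(A first, B second) {
        this.first = first;
        this.second = second;
    }
}

class ErasureCastDemo {

    // Declared to return a list of pairs, but actually holds a bare String.
    // This compiles (with an unchecked warning, suppressed here) because the
    // element type of a List is not checked at runtime.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<SimplePair<String, Integer>> mislabeled() {
        List raw = new ArrayList();
        raw.add("ID1");
        return (List<SimplePair<String, Integer>>) raw;
    }

    // Reads the first element as a pair. The compiler inserts a cast to
    // SimplePair at the assignment, which is where a mislabeled String fails.
    static String readFirst(List<SimplePair<String, Integer>> values) {
        try {
            SimplePair<String, Integer> p = values.get(0);
            return "ok: " + p.first;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(readFirst(mislabeled()));
    }
}
```

The point of the sketch is only that the failure shows up far from the code
that produced the wrong value: mislabeled() returns without complaint, and
the exception appears later in readFirst(), much like the join's internal
MapFn is the first thing in the trace to treat its input as a Pair.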


On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Hi,
>
>       I tried to dump the members from Label class to a TupleN, and dump
> only sample_id and sample_name from Feats class to a Pair, and then do the
> joining,
>
> JoinStrategy<String, TupleN, Pair<String, String>> strategy
>     = new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>
> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>
>
> The data dump step is fine; I already verified the outputs.
>
> But I still get the same exception from the join, so it looks like the
> Avro type that I defined is definitely not the issue. Now I'm just stuck
> here...
>
>
> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>   at org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>
> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lucychen2014fall@gmail.com>
> wrote:
>
>> Hi Josh,
>>
>>          Those two functions attach the key to each PCollection so that
>> they can be joined in the next step. Here is what the functions look like:
>>
>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {
>>
>>   private String key;
>>
>>   public KeyOnLabels(String input) {
>>     this.key = input;
>>   }
>>
>>   @Override
>>   public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
>>     if (key.equalsIgnoreCase("class_ID"))
>>       emitter.emit(Pair.of(input.getClassID(), input));
>>     else if (key.equalsIgnoreCase("sample_ID"))
>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>     else
>>       emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
>>   }
>> }
>>
>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {
>>
>>   private final static Logger logger =
>>       Logger.getLogger(KeyOnFeats.class.getName());
>>
>>   private String key;
>>
>>   public KeyOnFeats(String input) {
>>     this.key = input;
>>   }
>>
>>   @Override
>>   public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
>>     if (key.equals("sample_ID"))
>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>     else
>>       logger.error("The key should be specified as sample_ID only!");
>>   }
>> }
>>
>>
>> Meanwhile, I checked the two outputs and they looked normal, something
>> like the following:
>>
>>
>> {"key": "ID1", "value": Feats@14f5e86}
>>
>>
>> {"key": "ID1", "value": Labels@489983f3}
>>
>>
>>     That's why I feel something weird happened in the join step.
>>
>>
>>     Thanks!
>>
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <josh.wills@gmail.com>
>> wrote:
>>
>>> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
>>> it looks like the labels_data PTable is actually a PCollection<String>,
>>> which would happen if KeyOnLabels was somehow returning a String in some
>>> situations despite claiming to return a Pair<String, Labels>.
>>>
>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>> wrote:
>>>
>>>> Meanwhile, I just realized that one of my other join jobs works fine,
>>>> and it also has an Avro type in it:
>>>>
>>>> JoinStrategy<String, Feats, Feats> strategy
>>>>     = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>
>>>> PTable<String, Pair<Feats, Feats>> joined =
>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> So the Avro type probably is not the issue.
>>>>
>>>>
>>>> Any advice?
>>>>
>>>>
>>>> Lucy
>>>>
>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>>          I have a join step in my Crunch pipeline that looks like
>>>>> the following:
>>>>>
>>>>> //get label data
>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>> PCollection<Labels> training_labels = input.parallelDo(
>>>>>     new LabelDataParser(), LabelsType);
>>>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>>>
>>>>> //get features
>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>>>     sample_features_inputs);
>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>
>>>>> //join labels and features
>>>>> JoinStrategy<String, Labels, Feats> strategy =
>>>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>
>>>>>
>>>>> //class Labels
>>>>> public class Labels implements java.io.Serializable, Cloneable {
>>>>>
>>>>>   private String class_ID;
>>>>>   private String sample_ID;
>>>>>   private int binary_ind;
>>>>>
>>>>>   public Labels() {
>>>>>     this(null, null, 0);
>>>>>   }
>>>>>
>>>>>   public Labels(String class_ID, String sample_ID, int ind) {
>>>>>     this.class_ID = class_ID;
>>>>>     this.sample_ID = sample_ID;
>>>>>     this.binary_ind = ind;
>>>>>   }
>>>>>
>>>>>   ...
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> //class Feats
>>>>> public class Feats implements java.io.Serializable, Cloneable {
>>>>>
>>>>>   private String sample_id;
>>>>>   private String sample_name;
>>>>>   private Map<String, Float> feat;
>>>>>
>>>>>   public Feats() {
>>>>>     this(null, null, null);
>>>>>   }
>>>>>
>>>>>   public Feats(String id, String name, Map<String, Float> feat) {
>>>>>     this.sample_id = id;
>>>>>     this.sample_name = name;
>>>>>     this.feat = feat;
>>>>>   }
>>>>>
>>>>>   ...
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>    The outputs of labels_data and feats_data are both fine; but the
>>>>> join step throws the following exception:
>>>>>
>>>>>
>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>
>>>>>
>>>>>        This issue has been bothering me for a while. Has anyone run
>>>>> into a similar issue? Is there another approach that would solve it?
>>>>>
>>>>>
>>>>>       Btw, I already successfully ran the following joining job:
>>>>>
>>>>>
>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>>>>
>>>>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>
>>>>>
>>>>>    So I guess the issue may be still related to the Avro types that I
>>>>> defined.
>>>>>
>>>>>
>>>>>         Thanks for your advice.
>>>>>
>>>>>
>>>>> Lucy
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
