crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: question about join
Date Thu, 14 May 2015 13:27:09 GMT
We have some testing helper code in the project (the dependency is named
crunch-test-0.11.0-hadoop2) that we use for running integration tests in
the main project. The snippet I sent was a modification of this one:

https://github.com/apache/crunch/blob/master/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java

On Wed, May 13, 2015 at 11:02 PM, Lucy Chen <lucychen2014fall@gmail.com>
wrote:

> Hi Josh,
>
>          I am using crunch-core-0.11.0-hadoop2.jar; I did not find a class
> AvroReflectIT under org.apache.crunch.io.avro. Is it in a new version?
> Meanwhile how should I set the variable tmpDir in my code? What do you
> mean by "done inside of AvroReflectIT"? You mean copy paste the above
> code within the class AvroReflectIT and run it?
>
>          Thanks!
>
> Lucy
>
> On Wed, May 13, 2015 at 6:09 PM, Josh Wills <jwills@cloudera.com> wrote:
>
>> Hey Lucy,
>>
>> I tried to see if I could replicate the error by writing the following
>> integration test (done inside of AvroReflectIT)-- does something like this
>> fail when you run it? And you're using Crunch 0.11-hadoop2, or something
>> earlier?
>>
>> @Test
>> public void testJoinReflectedData() throws Exception {
>>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
>>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>>     new StringWrapper("josh"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>>     new StringWrapper("mike"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>   PTable<String, StringWrapper> pt1 = p1.parallelDo(
>>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>     @Override
>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>       return Pair.of(input.getValue(), input);
>>     }
>>   }, Avros.tableOf(Avros.strings(), atype));
>>   PTable<String, StringWrapper> pt2 = p2.parallelDo(
>>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>     @Override
>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>       return Pair.of(input.getValue(), input);
>>     }
>>   }, Avros.tableOf(Avros.strings(), atype));
>>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy =
>>       new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>>   PTable<String, Pair<StringWrapper, StringWrapper>> res =
>>       joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
>>     System.out.println(p);
>>   }
>> }
>>
>>
>> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lucychen2014fall@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I tried copying the members of the Labels class into a TupleN, and only
>>> sample_id and sample_name from the Feats class into a Pair, and then
>>> doing the join:
>>>
>>> JoinStrategy<String, TupleN, Pair<String, String>> strategy =
>>>     new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>>
>>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>
>>>
>>> The data dump step works; I have already verified the outputs.
>>>
>>> But I still get the same exception from the join, so it looks like the
>>> Avro type that I defined is definitely not the issue. Now I am just
>>> stuck here...
>>>
>>>
>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>
>>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> Those two functions attach a key to each element of the PCollections
>>>> so that they can be joined in the next step. Here is what the
>>>> functions look like:
>>>>
>>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {
>>>>
>>>>   private String key;
>>>>
>>>>   public KeyOnLabels(String input) {
>>>>     this.key = input;
>>>>   }
>>>>
>>>>   @Override
>>>>   public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
>>>>     if (key.equalsIgnoreCase("class_ID"))
>>>>       emitter.emit(Pair.of(input.getClassID(), input));
>>>>     else if (key.equalsIgnoreCase("sample_ID"))
>>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>>     else
>>>>       emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
>>>>   }
>>>> }
>>>>
>>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {
>>>>
>>>>   private final static Logger logger =
>>>>       Logger.getLogger(KeyOnFeats.class.getName());
>>>>
>>>>   private String key;
>>>>
>>>>   public KeyOnFeats(String input) {
>>>>     this.key = input;
>>>>   }
>>>>
>>>>   @Override
>>>>   public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
>>>>     if (key.equals("sample_ID"))
>>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>>     else
>>>>       logger.error("The key should be specified as sample_ID only!");
>>>>   }
>>>> }
>>>>
>>>>
>>>> Meanwhile I checked the two outputs and they looked normal, something
>>>> like the following:
>>>>
>>>>
>>>> {"key": "ID1", "value": Feats@14f5e86}
>>>>
>>>>
>>>> {"key": "ID1", "value": Labels@489983f3}
>>>>
>>>>
>>>>     That's why I feel something weird happened in the join step.
>>>>
>>>>
>>>>     Thanks!
>>>>
>>>>
>>>> Lucy
>>>>
>>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <josh.wills@gmail.com>
>>>> wrote:
>>>>
>>>>> What do the KeyOnLabels and KeyOnFeats functions return? From the
>>>>> error, it looks like the labels_data PTable is actually a
>>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>>> returning a String in some situations despite claiming to return a
>>>>> Pair<String, Labels>.
>>>>>
>>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <
>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>
>>>>>> Meanwhile, I just realized that one of my join jobs, which also has
>>>>>> an Avro type in it, runs fine:
>>>>>>
>>>>>> JoinStrategy<String, Feats, Feats> strategy =
>>>>>>     new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>>
>>>>>> PTable<String, Pair<Feats, Feats>> joined =
>>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>
>>>>>>
>>>>>> So the Avro type probably is not the issue.
>>>>>>
>>>>>>
>>>>>> Any advice?
>>>>>>
>>>>>>
>>>>>> Lucy
>>>>>>
>>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <
>>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a join step in my Crunch pipeline that looks like the
>>>>>>> following:
>>>>>>>
>>>>>>> // get label data
>>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>>> PCollection<Labels> training_labels =
>>>>>>>     input.parallelDo(new LabelDataParser(), LabelsType);
>>>>>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>>>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>>>>>
>>>>>>> // get features
>>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>>> PCollection<Feats> training_feats =
>>>>>>>     Feature.FeatLoader(pipeline, sample_features_inputs);
>>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>>>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>>
>>>>>>> // join labels and features
>>>>>>> JoinStrategy<String, Labels, Feats> strategy =
>>>>>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>>
>>>>>>> // class Labels
>>>>>>> public class Labels implements java.io.Serializable, Cloneable {
>>>>>>>
>>>>>>>   private String class_ID;
>>>>>>>   private String sample_ID;
>>>>>>>   private int binary_ind;
>>>>>>>
>>>>>>>   public Labels() {
>>>>>>>     this(null, null, 0);
>>>>>>>   }
>>>>>>>
>>>>>>>   public Labels(String class_ID, String sample_ID, int ind) {
>>>>>>>     this.class_ID = class_ID;
>>>>>>>     this.sample_ID = sample_ID;
>>>>>>>     this.binary_ind = ind;
>>>>>>>   }
>>>>>>>
>>>>>>>   ...
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> // class Feats
>>>>>>> public class Feats implements java.io.Serializable, Cloneable {
>>>>>>>
>>>>>>>   private String sample_id;
>>>>>>>   private String sample_name;
>>>>>>>   private Map<String, Float> feat;
>>>>>>>
>>>>>>>   public Feats() {
>>>>>>>     this(null, null, null);
>>>>>>>   }
>>>>>>>
>>>>>>>   public Feats(String id, String name, Map<String, Float> feat) {
>>>>>>>     this.sample_id = id;
>>>>>>>     this.sample_name = name;
>>>>>>>     this.feat = feat;
>>>>>>>   }
>>>>>>>
>>>>>>>   ...
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> The outputs of labels_data and feats_data are both fine, but the
>>>>>>> join step throws the following exception:
>>>>>>>
>>>>>>>
>>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>>>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>>
>>>>>>>
>>>>>>> This issue has been bothering me for a while. Has anyone run into
>>>>>>> a similar issue? Is there another option that would solve it?
>>>>>>>
>>>>>>>
>>>>>>>       Btw, I already successfully ran the following joining job:
>>>>>>>
>>>>>>>
>>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>>>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>>>>>>
>>>>>>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>>
>>>>>>> So I guess the issue may still be related to the Avro types that
>>>>>>> I defined.
>>>>>>>
>>>>>>>
>>>>>>>         Thanks for your advice.
>>>>>>>
>>>>>>>
>>>>>>> Lucy
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>
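
A note on the exception quoted in the thread: Java generics are erased at
runtime, so a value whose declared type is Pair<...> is only checked when it
is actually read as a Pair. The sketch below is plain Java with no Crunch
dependency. ErasureDemo, its nested Pair class, mislabeled() and failsAtUse()
are all hypothetical names made up for illustration, and this is not a
diagnosis of the pipeline above. It only shows how a String can sit in a
collection declared to hold pairs until a later step tries to use it, which
is the shape of failure Josh describes for labels_data.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {

    /** Minimal stand-in for a key/value pair (hypothetical; not org.apache.crunch.Pair). */
    static final class Pair<K, V> {
        final K first;
        final V second;

        Pair(K first, V second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Claims to return pairs, but a raw type lets a bare String slip in unchecked. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Pair<String, String>> mislabeled() {
        List raw = new ArrayList();
        raw.add("ID1"); // a String, not a Pair
        return raw;     // unchecked conversion: compiles, nothing is verified at runtime
    }

    /** Returns true if reading the first element as a Pair throws ClassCastException. */
    static boolean failsAtUse() {
        List<Pair<String, String>> data = mislabeled();
        try {
            // The compiler inserts a cast to Pair here; this is where the error surfaces.
            String key = data.get(0).first;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException raised: " + failsAtUse());
    }
}
```

The failure surfaces in the consumer (failsAtUse here, the join's MapFn in
the stack traces above), not in the code that produced the wrong value, so
the producer's declared types are worth checking first.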
