crunch-user mailing list archives

From Lucy Chen <lucychen2014f...@gmail.com>
Subject Re: question about join
Date Thu, 14 May 2015 06:02:10 GMT
Hi Josh,

         I am using crunch-core-0.11.0-hadoop2.jar, and I could not find a class
AvroReflectIT under org.apache.crunch.io.avro. Is it in a newer version?
Also, how should I set the variable tmpDir in my code? What do you mean
by "done inside of AvroReflectIT"? Do you mean I should copy and paste the
above code into the class AvroReflectIT and run it?

         Thanks!

Lucy

On Wed, May 13, 2015 at 6:09 PM, Josh Wills <jwills@cloudera.com> wrote:

> Hey Lucy,
>
> I tried to see if I could replicate the error by writing the following
> integration test (done inside of AvroReflectIT). Does something like this
> fail when you run it? And are you using Crunch 0.11-hadoop2, or something
> earlier?
>
> @Test
> public void testJoinReflectedData() throws Exception {
>   // tmpDir is a field of the enclosing integration-test class (AvroReflectIT)
>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>       new StringWrapper("josh"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>       new StringWrapper("mike"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>   PTable<String, StringWrapper> pt1 = p1.parallelDo(
>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>         @Override
>         public Pair<String, StringWrapper> map(StringWrapper input) {
>           return Pair.of(input.getValue(), input);
>         }
>       }, Avros.tableOf(Avros.strings(), atype));
>   PTable<String, StringWrapper> pt2 = p2.parallelDo(
>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>         @Override
>         public Pair<String, StringWrapper> map(StringWrapper input) {
>           return Pair.of(input.getValue(), input);
>         }
>       }, Avros.tableOf(Avros.strings(), atype));
>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy =
>       new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>   PTable<String, Pair<StringWrapper, StringWrapper>> res =
>       joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
>     System.out.println(p);
>   }
> }
>
>
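For reference, an inner join of the two inputs in the test above should produce one joined row for each key present on both sides, here "wills" and "foo". The plain-Java sketch below illustrates the general reduce-side join pattern: tag each record with the side it came from, group by key, then pair every left value with every right value within a key. My understanding is that DefaultJoinStrategy follows this pattern on MapReduce, but this is not Crunch's code; `ReduceSideJoinSketch`, `Tagged`, `kv`, and `innerJoin` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of a reduce-side inner join; it does not use Crunch classes.
class ReduceSideJoinSketch {

  // A value tagged with the side it came from: 0 = left table, 1 = right table.
  static final class Tagged {
    final int side;
    final String value;

    Tagged(int side, String value) {
      this.side = side;
      this.value = value;
    }
  }

  // Helper to build a (key, value) record as a two-element array.
  static String[] kv(String key, String value) {
    return new String[] {key, value};
  }

  static List<String> innerJoin(List<String[]> left, List<String[]> right) {
    // "Map" + "shuffle": tag each record with its side and group by key.
    // A TreeMap keeps keys sorted, loosely mimicking the sorted shuffle.
    Map<String, List<Tagged>> groups = new TreeMap<>();
    for (String[] r : left) {
      groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(new Tagged(0, r[1]));
    }
    for (String[] r : right) {
      groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(new Tagged(1, r[1]));
    }
    // "Reduce": within each key, emit every (left, right) combination.
    // Keys present on only one side emit nothing, which makes this an inner join.
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<Tagged>> group : groups.entrySet()) {
      List<String> lefts = new ArrayList<>();
      List<String> rights = new ArrayList<>();
      for (Tagged t : group.getValue()) {
        if (t.side == 0) {
          lefts.add(t.value);
        } else {
          rights.add(t.value);
        }
      }
      for (String l : lefts) {
        for (String r : rights) {
          out.add(group.getKey() + " -> (" + l + ", " + r + ")");
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Same keys as the test above: each value is keyed by its own string.
    List<String[]> left = Arrays.asList(kv("josh", "josh"), kv("wills", "wills"), kv("foo", "foo"));
    List<String[]> right = Arrays.asList(kv("mike", "mike"), kv("wills", "wills"), kv("foo", "foo"));
    for (String row : innerJoin(left, right)) {
      System.out.println(row);
    }
  }
}
```

Running `main` prints `foo -> (foo, foo)` followed by `wills -> (wills, wills)`; "josh" and "mike" appear on only one side and are dropped.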
> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lucychen2014fall@gmail.com>
> wrote:
>
>> Hi,
>>
>>       I tried copying the members of the Labels class into a TupleN, and
>> only sample_id and sample_name from the Feats class into a Pair, and then
>> doing the join:
>>
>> JoinStrategy<String, TupleN, Pair<String, String>> strategy =
>>     new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>
>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>>
>> The conversion step works; I have already verified its outputs.
>>
>> But I still get the same exception from the join, so it looks like the
>> Avro types I defined are definitely not the issue. Now I'm just stuck
>> here...
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>     at org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>     at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lucychen2014fall@gmail.com>
>> wrote:
>>
>>> Hi Josh,
>>>
>>>          Those two functions attach a key to each record in the two
>>> PCollections so that they can be joined in the next step. Here is what the
>>> functions look like:
>>>
>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {
>>>
>>>   private String key;
>>>
>>>   public KeyOnLabels(String input) {
>>>     this.key = input;
>>>   }
>>>
>>>   @Override
>>>   public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
>>>     if (key.equalsIgnoreCase("class_ID"))
>>>       emitter.emit(Pair.of(input.getClassID(), input));
>>>     else if (key.equalsIgnoreCase("sample_ID"))
>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>     else
>>>       emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
>>>   }
>>> }
>>>
>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {
>>>
>>>   private final static Logger logger = Logger
>>>       .getLogger(KeyOnFeats.class.getName());
>>>
>>>   private String key;
>>>
>>>   public KeyOnFeats(String input) {
>>>     this.key = input;
>>>   }
>>>
>>>   @Override
>>>   public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
>>>     if (key.equals("sample_ID"))
>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>     else
>>>       logger.error("The key should be specified as sample_ID only!");
>>>   }
>>> }
>>>
>>>
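As written, both functions interpret the key string while processing records: KeyOnFeats logs an error and emits nothing for any key other than "sample_ID", and KeyOnLabels falls back to the binary indicator for any key it does not recognize. A hypothetical alternative is to validate the key name once, for example in the DoFn's constructor, which runs when the pipeline is built, so a typo fails immediately instead of silently changing or emptying the output. The plain-Java sketch below shows only that validation logic; `LabelsStub`, `LabelKey`, and the "binary_indicator" key name are assumptions, not part of the original code.

```java
import java.util.Locale;

// Hypothetical stand-in for the thread's Labels class (only the getters used here).
class LabelsStub {
  private final String classId;
  private final String sampleId;
  private final int binaryInd;

  LabelsStub(String classId, String sampleId, int binaryInd) {
    this.classId = classId;
    this.sampleId = sampleId;
    this.binaryInd = binaryInd;
  }

  String getClassID() { return classId; }
  String getSampleID() { return sampleId; }
  int getBinaryIndicator() { return binaryInd; }
}

// Hypothetical key selector: the key name is parsed once and rejected if unknown.
enum LabelKey {
  CLASS_ID, SAMPLE_ID, BINARY_INDICATOR;

  // Case-insensitive, like the equalsIgnoreCase checks in KeyOnLabels.
  static LabelKey parse(String name) {
    switch (name.toLowerCase(Locale.ROOT)) {
      case "class_id":
        return CLASS_ID;
      case "sample_id":
        return SAMPLE_ID;
      case "binary_indicator":
        return BINARY_INDICATOR;
      default:
        throw new IllegalArgumentException("Unsupported key: " + name);
    }
  }

  // Extract the join key from a record as a String.
  String extract(LabelsStub labels) {
    switch (this) {
      case CLASS_ID:
        return labels.getClassID();
      case SAMPLE_ID:
        return labels.getSampleID();
      default:
        return Integer.toString(labels.getBinaryIndicator());
    }
  }
}
```

A DoFn built this way would store the parsed `LabelKey` (enums are serializable) and call `extract` in `process`, so every input record produces exactly one keyed output.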
>>> I also checked the two outputs, and they looked normal, something like the
>>> following:
>>>
>>>
>>> {"key": "ID1", "value": Feats@14f5e86}
>>>
>>>
>>> {"key": "ID1", "value": Labels@489983f3}
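Values such as `Feats@14f5e86` and `Labels@489983f3` are what `java.lang.Object`'s default `toString()` prints (class name, `@`, hash code in hex), so they confirm that records exist but not what they contain. One way to make such dumps checkable is to override `toString()`. The sketch below uses a hypothetical trimmed-down copy of Feats named `FeatsWithToString`; the output format is an assumption. Since it only adds a method, it should not change the fields that Avro's reflection-based schema is derived from.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, trimmed-down copy of the thread's Feats class with a toString() override.
class FeatsWithToString {
  private String sample_id;
  private String sample_name;
  private Map<String, Float> feat;

  public FeatsWithToString() {
    this(null, null, null); // no-arg constructor kept, as in the original class
  }

  public FeatsWithToString(String id, String name, Map<String, Float> feat) {
    this.sample_id = id;
    this.sample_name = name;
    this.feat = feat;
  }

  // Print the field values instead of Object's default "ClassName@hashCode".
  @Override
  public String toString() {
    return "Feats{sample_id=" + sample_id + ", sample_name=" + sample_name
        + ", feat=" + feat + "}";
  }

  public static void main(String[] args) {
    Map<String, Float> feat = new TreeMap<>();
    feat.put("f1", 0.5f);
    // prints: Feats{sample_id=ID1, sample_name=sample one, feat={f1=0.5}}
    System.out.println(new FeatsWithToString("ID1", "sample one", feat));
  }
}
```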
>>>
>>>
>>>     That's why I think something strange is happening in the join step.
>>>
>>>
>>>     Thanks!
>>>
>>>
>>> Lucy
>>>
>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <josh.wills@gmail.com>
>>> wrote:
>>>
>>>> What do the KeyOnLabels and KeyOnFeats functions return? From the
>>>> error, it looks like the labels_data PTable is actually a
>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>> returning a String in some situations despite claiming to return a
>>>> Pair<String, Labels>.
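Josh's diagnosis relies on Java's type erasure: generic type arguments are not checked when an element is stored, so a String can end up in a collection declared to hold pairs, and the error only surfaces where the element is finally cast, which matches the `DefaultJoinStrategy` frame at the top of the stack traces in this thread. The plain-Java sketch below reproduces that effect in miniature with a hypothetical `SimplePair` stand-in and hypothetical `ErasureDemo` methods. It is an analogy, not Crunch code, and the thread does not establish that this is the exact path in Lucy's pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical minimal pair type, standing in for a real Pair class.
final class SimplePair<A, B> {
  final A first;
  final B second;

  SimplePair(A first, B second) {
    this.first = first;
    this.second = second;
  }
}

class ErasureDemo {

  // Declared to return pairs, but a raw-typed path lets plain Strings through.
  // Nothing fails here, because generic types are erased at runtime.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static List<SimplePair<String, String>> keyRecords(List<String> records) {
    List raw = new ArrayList();
    for (String r : records) {
      if (r.contains(",")) {
        String[] parts = r.split(",", 2);
        raw.add(new SimplePair<String, String>(parts[0], parts[1]));
      } else {
        raw.add(r); // the bug: a String is stored where a pair is expected
      }
    }
    return (List<SimplePair<String, String>>) raw;
  }

  // Downstream consumer: the compiler inserts a cast to SimplePair when reading
  // each element, so the ClassCastException is thrown here, not in keyRecords.
  static String joinKeys(List<SimplePair<String, String>> table) {
    StringBuilder keys = new StringBuilder();
    for (SimplePair<String, String> p : table) {
      keys.append(p.first).append(';');
    }
    return keys.toString();
  }

  public static void main(String[] args) {
    List<SimplePair<String, String>> table = keyRecords(Arrays.asList("ID1,a", "oops"));
    System.out.println("built a table of size " + table.size());
    try {
      joinKeys(table);
    } catch (ClassCastException e) {
      System.out.println("failed downstream: " + e.getMessage());
    }
  }
}
```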
>>>>
>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>>> wrote:
>>>>
>>>>> Also, I just realized that another of my join jobs, which also uses an
>>>>> Avro type, works fine:
>>>>>
>>>>> JoinStrategy<String, Feats, Feats> strategy =
>>>>>     new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>
>>>>> PTable<String, Pair<Feats, Feats>> joined =
>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>
>>>>>
>>>>> So the Avro type probably is not the issue.
>>>>>
>>>>>
>>>>> Any advice?
>>>>>
>>>>>
>>>>> Lucy
>>>>>
>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <
>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>>          I have a join step in my Crunch pipeline, and it looks like
>>>>>> the following:
>>>>>>
>>>>>> // get label data
>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>> PCollection<Labels> training_labels =
>>>>>>     input.parallelDo(new LabelDataParser(), LabelsType);
>>>>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>>>>
>>>>>> // get features
>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>> PCollection<Feats> training_feats =
>>>>>>     Feature.FeatLoader(pipeline, sample_features_inputs);
>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>
>>>>>> // join labels and features
>>>>>> JoinStrategy<String, Labels, Feats> strategy =
>>>>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>
>>>>>>
>>>>>> // class Labels
>>>>>> public class Labels implements java.io.Serializable, Cloneable {
>>>>>>
>>>>>>   private String class_ID;
>>>>>>   private String sample_ID;
>>>>>>   private int binary_ind;
>>>>>>
>>>>>>   public Labels() {
>>>>>>     this(null, null, 0);
>>>>>>   }
>>>>>>
>>>>>>   public Labels(String class_ID, String sample_ID, int ind) {
>>>>>>     this.class_ID = class_ID;
>>>>>>     this.sample_ID = sample_ID;
>>>>>>     this.binary_ind = ind;
>>>>>>   }
>>>>>>
>>>>>>   ...
>>>>>> }
>>>>>>
>>>>>> // class Feats
>>>>>> public class Feats implements java.io.Serializable, Cloneable {
>>>>>>
>>>>>>   private String sample_id;
>>>>>>   private String sample_name;
>>>>>>   private Map<String, Float> feat;
>>>>>>
>>>>>>   public Feats() {
>>>>>>     this(null, null, null);
>>>>>>   }
>>>>>>
>>>>>>   public Feats(String id, String name, Map<String, Float> feat) {
>>>>>>     this.sample_id = id;
>>>>>>     this.sample_name = name;
>>>>>>     this.feat = feat;
>>>>>>   }
>>>>>>
>>>>>>   ...
>>>>>> }
>>>>>>
>>>>>>
>>>>>>    The outputs of labels_data and feats_data are both fine, but the
>>>>>> join step throws the following exception:
>>>>>>
>>>>>>
>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>>>>     at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>     at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>>>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>>>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>>>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>
>>>>>>
>>>>>>        This issue has bothered me for a while. Has anyone run into a
>>>>>> similar issue? Is there another option that would solve it?
>>>>>>
>>>>>>
>>>>>>       By the way, I have already successfully run the following join job:
>>>>>>
>>>>>>
>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>>>>>
>>>>>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>
>>>>>>
>>>>>>    So I guess the issue may still be related to the Avro types that
>>>>>> I defined.
>>>>>>
>>>>>>
>>>>>>         Thanks for your advice.
>>>>>>
>>>>>>
>>>>>> Lucy
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
