crunch-user mailing list archives

From Lucy Chen <lucychen2014f...@gmail.com>
Subject Re: question about join
Date Thu, 14 May 2015 23:38:42 GMT
Hi Josh,

       Thanks for your advice. I ran your test case and the join works
fine; no exception was thrown. Let me do another round of diagnosis and
see whether I can find the issue.

Lucy

[foo,[StringWrapper [value=foo],StringWrapper [value=foo]]]

[wills,[StringWrapper [value=wills],StringWrapper [value=wills]]]

On Thu, May 14, 2015 at 6:27 AM, Josh Wills <josh.wills@gmail.com> wrote:

> We have some testing helper code in the project (the dependency is named
> crunch-test-0.11.0-hadoop2) that we use for running integration tests in
> the main project. The snippet I sent was a modification of this one:
>
>
> https://github.com/apache/crunch/blob/master/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
>
> On Wed, May 13, 2015 at 11:02 PM, Lucy Chen <lucychen2014fall@gmail.com>
> wrote:
>
>> Hi Josh,
>>
>>          I am using crunch-core-0.11.0-hadoop2.jar, and I did not find a
>> class AvroReflectIT under org.apache.crunch.io.avro. Is it in a newer
>> version? Also, how should I set the variable tmpDir in my code? What
>> do you mean by "done inside of AvroReflectIT"? Do you mean copying and
>> pasting the above code into the class AvroReflectIT and running it?
>>
>>          Thanks!
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 6:09 PM, Josh Wills <jwills@cloudera.com> wrote:
>>
>>> Hey Lucy,
>>>
>>> I tried to see if I could replicate the error by writing the following
>>> integration test (done inside of AvroReflectIT)-- does something like this
>>> fail when you run it? And you're using Crunch 0.11-hadoop2, or something
>>> earlier?
>>>
>>> @Test
>>> public void testJoinReflectedData() throws Exception {
>>>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
>>>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>>>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>>>       new StringWrapper("josh"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>>>       new StringWrapper("mike"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>>   PTable<String, StringWrapper> pt1 = p1.parallelDo(
>>>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>>     @Override
>>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>>       return Pair.of(input.getValue(), input);
>>>     }
>>>   }, Avros.tableOf(Avros.strings(), atype));
>>>   PTable<String, StringWrapper> pt2 = p2.parallelDo(
>>>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>>     @Override
>>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>>       return Pair.of(input.getValue(), input);
>>>     }
>>>   }, Avros.tableOf(Avros.strings(), atype));
>>>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy =
>>>       new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>>>   PTable<String, Pair<StringWrapper, StringWrapper>> res =
>>>       joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>>>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
>>>     System.out.println(p);
>>>   }
>>> }
>>>
>>>
>>> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>       I tried dumping the members of the Label class into a TupleN, and
>>>> only sample_id and sample_name from the Feats class into a Pair, and
>>>> then did the join:
>>>> JoinStrategy<String, TupleN, Pair<String, String>> strategy =
>>>>     new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>>>
>>>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> The data dump step works; I have already verified the outputs.
>>>>
>>>> But I still get the same exception from the join. It looks like the
>>>> Avro type that I defined is definitely not the issue. Now I am just
>>>> stuck here...
>>>>
>>>>
>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>
>>>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lucychen2014fall@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>>          Those two functions attach the key to each PCollection so
>>>>> that the two can be joined in the next step. Here is what the
>>>>> functions look like:
>>>>>
>>>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {
>>>>>
>>>>>   private String key;
>>>>>
>>>>>   public KeyOnLabels(String input) {
>>>>>     this.key = input;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
>>>>>     if (key.equalsIgnoreCase("class_ID"))
>>>>>       emitter.emit(Pair.of(input.getClassID(), input));
>>>>>     else if (key.equalsIgnoreCase("sample_ID"))
>>>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>>>     else
>>>>>       emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
>>>>>   }
>>>>> }
>>>>>
>>>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {
>>>>>
>>>>>   private final static Logger logger =
>>>>>       Logger.getLogger(KeyOnFeats.class.getName());
>>>>>
>>>>>   private String key;
>>>>>
>>>>>   public KeyOnFeats(String input) {
>>>>>     this.key = input;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
>>>>>     if (key.equals("sample_ID"))
>>>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>>>     else
>>>>>       logger.error("The key should be specified as sample_ID only!");
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> Meanwhile, I checked the two outputs and they looked normal,
>>>>> something like the following:
>>>>>
>>>>>
>>>>> {"key": "ID1", "value": Feats@14f5e86}
>>>>>
>>>>>
>>>>> {"key": "ID1", "value": Labels@489983f3}
>>>>>
>>>>>
>>>>>     That's why I feel something weird happened in the join step.
>>>>>
>>>>>
>>>>>     Thanks!
>>>>>
>>>>>
>>>>> Lucy
>>>>>
>>>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <josh.wills@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> What do the KeyOnLabels and KeyOnFeats functions return? From the
>>>>>> error, it looks like the labels_data PTable is actually a
>>>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>>>> returning a String in some situations despite claiming to return a
>>>>>> Pair<String, Labels>.
>>>>>>
>>>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <
>>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>>
>>>>>>> Meanwhile, I just realized that one of my other join jobs, which
>>>>>>> also has an Avro type in it, runs OK:
>>>>>>>
>>>>>>> JoinStrategy<String, Feats, Feats> strategy =
>>>>>>>     new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>>>
>>>>>>> PTable<String, Pair<Feats, Feats>> joined =
>>>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>>
>>>>>>> So the Avro type probably is not the issue.
>>>>>>>
>>>>>>>
>>>>>>> Any advice?
>>>>>>>
>>>>>>>
>>>>>>> Lucy
>>>>>>>
>>>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <
>>>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>>          I have a join step in my Crunch pipeline, and it looks
>>>>>>>> like the following:
>>>>>>>>
>>>>>>>> // get label data
>>>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>>>> PCollection<Labels> training_labels =
>>>>>>>>     input.parallelDo(new LabelDataParser(), LabelsType);
>>>>>>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>>>>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>>>>>>
>>>>>>>> // get features
>>>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>>>> PCollection<Feats> training_feats =
>>>>>>>>     Feature.FeatLoader(pipeline, sample_features_inputs);
>>>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>>>>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>>>
>>>>>>>> // join labels and features
>>>>>>>> JoinStrategy<String, Labels, Feats> strategy =
>>>>>>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>>>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>>>
>>>>>>>>
>>>>>>>> // class Labels
>>>>>>>> public class Labels implements java.io.Serializable, Cloneable {
>>>>>>>>
>>>>>>>>   private String class_ID;
>>>>>>>>   private String sample_ID;
>>>>>>>>   private int binary_ind;
>>>>>>>>
>>>>>>>>   public Labels() {
>>>>>>>>     this(null, null, 0);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   public Labels(String class_ID, String sample_ID, int ind) {
>>>>>>>>     this.class_ID = class_ID;
>>>>>>>>     this.sample_ID = sample_ID;
>>>>>>>>     this.binary_ind = ind;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   ...
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> // class Feats
>>>>>>>> public class Feats implements java.io.Serializable, Cloneable {
>>>>>>>>
>>>>>>>>   private String sample_id;
>>>>>>>>   private String sample_name;
>>>>>>>>   private Map<String, Float> feat;
>>>>>>>>
>>>>>>>>   public Feats() {
>>>>>>>>     this(null, null, null);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   public Feats(String id, String name, Map<String, Float> feat) {
>>>>>>>>     this.sample_id = id;
>>>>>>>>     this.sample_name = name;
>>>>>>>>     this.feat = feat;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   ...
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>    The outputs of labels_data and feats_data are both fine, but
>>>>>>>> the join step throws the following exception:
>>>>>>>>
>>>>>>>>
>>>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>>>>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>>>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>>>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>>>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>>>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>>>
>>>>>>>>
>>>>>>>>        This issue has been bothering me for a while. Has anyone
>>>>>>>> run into a similar issue? Is there another option that would
>>>>>>>> solve it?
>>>>>>>>
>>>>>>>>
>>>>>>>>       Btw, I have already successfully run the following join job:
>>>>>>>>
>>>>>>>>
>>>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>>>>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>>>>>>>
>>>>>>>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>>>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>>>
>>>>>>>>
>>>>>>>>    So I guess the issue may still be related to the Avro types
>>>>>>>> that I defined.
>>>>>>>>
>>>>>>>>
>>>>>>>>         Thanks for your advice.
>>>>>>>>
>>>>>>>>
>>>>>>>> Lucy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>
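
Josh's diagnosis in the thread above (a String arriving where the join's
MapFn expects a Pair) can be illustrated without Crunch. The following is
a minimal sketch, not code from the thread: the local Pair class is a
hypothetical stand-in for org.apache.crunch.Pair, and mislabeled() is an
assumed example of a source whose declared element type does not match
what it really holds. Because Java generics are erased at runtime, the
mismatch compiles and only fails later, at the first point where an
element is actually used as a Pair.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureCastDemo {

  // Hypothetical stand-in for org.apache.crunch.Pair so the sketch runs without Crunch.
  static final class Pair<A, B> {
    final A first;
    final B second;

    Pair(A first, B second) {
      this.first = first;
      this.second = second;
    }
  }

  // Assumed example: claims to hold Pair<String, String> but really holds a bare String.
  @SuppressWarnings("unchecked")
  static List<Pair<String, String>> mislabeled() {
    List<Object> raw = new ArrayList<>();
    raw.add("ID1"); // a String, not a Pair
    // Unchecked cast: accepted by the compiler because generics are erased.
    return (List<Pair<String, String>>) (List<?>) raw;
  }

  // Downstream consumer, playing the role of the join's internal map function.
  static String firstKey(List<Pair<String, String>> rows) {
    // The compiler inserts a cast to Pair here; this is where the failure surfaces.
    Pair<String, String> p = rows.get(0);
    return p.first;
  }

  // Returns true if the mislabeled data triggers a ClassCastException downstream.
  static boolean failsDownstream() {
    try {
      firstKey(mislabeled());
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("ClassCastException thrown downstream: " + failsDownstream());
  }
}
```

The point of the sketch is that the exception is thrown in the consumer,
far from the code that produced the wrong element, which is consistent
with the stack traces above pointing into DefaultJoinStrategy rather than
into the user's own functions.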
