crunch-user mailing list archives

From: Micah Whitacre <mkwhita...@gmail.com>
Subject: Re: Problem Avro to PTable
Date: Mon, 01 Aug 2016 19:48:42 GMT
Correct.  That's what I get for writing code in an email.  :)

On Mon, Aug 1, 2016 at 2:45 PM, Masf <masfworld@gmail.com> wrote:

> Hi.
> I suppose it should be "Avros.tableOf(Avros.strings(), Avros.strings())"
> instead of "Avros.tableOf(String, String)".
> Right?
>
> Thanks.
> Miguel.
>
> On Mon, Aug 1, 2016 at 9:13 PM, Micah Whitacre <mkwhitacre@gmail.com>
> wrote:
>
>> Try changing this from:
>> final PTable<String, String> bhSide = bh.by(new BHExtractor(),
>> Writables.strings());
>>
>> to
>>
>> final PTable<String, String> bhSide = bh.parallelDo(new KeyExtractor(),
>> Avros.tableOf(String, String));
>>
>> public class KeyExtractor extends MapFn<String, Pair<String, String>>{
>> public Pair<String, String> map(String input){
>>     String key = ...;
>>     return Pair.of(key, input);
>> }
>> }
>>
>> This will let you avoid mixing PTypeFamilies.  I'm guessing you already
>> have most of the code but instead of just emitting the Key you emit the
>> pair.
>>
>> On Mon, Aug 1, 2016 at 2:06 PM, Masf <masfworld@gmail.com> wrote:
>>
>>> Hi.
>>> Thanks for the reply.
>>> As you said, it works when I call the "by" method with "Avros.strings()".
>>> However, it fails when I try to build the join:
>>>
>>> final JoinStrategy<String, myAvro2, String> strategy = new
>>> DefaultJoinStrategy<>();
>>> final PTable<String, Pair<myAvro2, String>> joined = strategy.join(
>>> positionSide, bhSide, JoinType.LEFT_OUTER_JOIN); <-- It fails
>>>
>>> The exception is thrown when the job executes the last statement:
>>>
>>> Caused by: java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>>         at org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:831)
>>>         at org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:818)
>>>         at org.apache.crunch.types.avro.Avros.pairs(Avros.java:622)
>>>         at org.apache.crunch.types.avro.AvroTypeFamily.pairs(AvroTypeFamily.java:116)
>>>         at org.apache.crunch.lib.join.DefaultJoinStrategy.preJoin(DefaultJoinStrategy.java:84)
>>>         at org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:73)
>>>         at org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:52)
>>>         at com.db.myapp.driver.myapp.run(myapp.java:66)
>>>
>>>
>>>
>>> On Mon, Aug 1, 2016 at 7:31 PM, Micah Whitacre <mkwhitacre@gmail.com>
>>> wrote:
>>>
>>>> You cannot mix PTypeFamilies in a single PType.  In this case change:
>>>>
>>>> final PTable<String, myAvro2> positionSide = vp.by(new BHExtractorAvro(),
>>>> Writables.strings());
>>>>
>>>> to
>>>>
>>>> final PTable<String, myAvro2> positionSide = vp.by(new BHExtractorAvro(),
>>>> Avros.strings());
>>>>
>>>> I'm not sure if you'll hit the same problem when you join but in the
>>>> code you provided that's the reason for the exception.
>>>>
>>>> On Mon, Aug 1, 2016 at 12:21 PM, Masf <masfworld@gmail.com> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> I'm trying to join a CSV file with an Avro file. First, I read the CSV
>>>>> into a PCollection:
>>>>>
>>>>> final PCollection<String> bh = pipeline.readTextFile("/pathcsv/");
>>>>>
>>>>>
>>>>> Second, I read the Avro file and then apply a transformation:
>>>>>
>>>>> final PCollection<MyAvro> gp = pipeline.read(From.avroFile(
>>>>> inputPath,Avros.specifics(myAvro.class)));
>>>>>
>>>>> final PCollection<myAvro2> vp = gp.parallelDo("trans", new
>>>>> MapTrasnf(), Avros.records(myAvro2.class));
>>>>>
>>>>> Before making the join, I extract the keys:
>>>>>
>>>>> final PTable<String, String> bhSide = bh.by(new BHExtractor(),
>>>>> Writables.strings());
>>>>>
>>>>> final PTable<String, myAvro2> positionSide = vp.by(new
>>>>> BHExtractorAvro(), Writables.strings());
>>>>>
>>>>> Applying the "by" method to the Avro PCollection throws an exception and
>>>>> I don't know why:
>>>>>
>>>>> Caused by: java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>>>>         at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:895)
>>>>>         at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:136)
>>>>>         at org.apache.crunch.impl.dist.collect.PCollectionImpl.by(PCollectionImpl.java:270)
>>>>>         at com.db.myapp.driver.myapp.run(myapp.java:62)
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>> Regards.
>>>>> Miguel Ángel
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>>
>>> Regards.
>>> Miguel Ángel
>>>
>>
>>
>
>
> --
>
>
> Regards.
> Miguel Ángel
>
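
Both stack traces in this thread show an Avro factory (Avros.tableOf, Avros.createTupleSchema) casting a component type to AvroType and failing because it was handed a WritableType. A minimal sketch of that mechanism follows. It is a toy model only: PType, WritableType, AvroType, avroTableOf and mixesFamilies below are hypothetical stand-ins written for illustration, not the Apache Crunch classes or their real signatures.

```java
public class PTypeFamilyDemo {
    // Hypothetical stand-in for a serialization type; NOT the Crunch PType.
    interface PType { String family(); }

    static final class WritableType implements PType {
        public String family() { return "writable"; }
    }

    static final class AvroType implements PType {
        public String family() { return "avro"; }
    }

    // Models the behaviour suggested by the stack trace: the Avro table
    // factory assumes both components already belong to the Avro family
    // and casts them without checking.
    static String avroTableOf(PType key, PType value) {
        AvroType k = (AvroType) key;     // ClassCastException for a WritableType
        AvroType v = (AvroType) value;
        return "table<" + k.family() + "," + v.family() + ">";
    }

    // Returns true when building the table fails because the families differ.
    static boolean mixesFamilies(PType key, PType value) {
        try {
            avroTableOf(key, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Writable key with an Avro value: the mix reported in the thread.
        System.out.println(mixesFamilies(new WritableType(), new AvroType()));
        // Avro key with an Avro value: the combination suggested as the fix.
        System.out.println(mixesFamilies(new AvroType(), new AvroType()));
    }
}
```

In the toy model, as in the thread, the failure disappears once the key type comes from the same family as the value type, which is why both sides of the join need Avro-based types.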
