crunch-user mailing list archives

From Masf <masfwo...@gmail.com>
Subject Re: Problem Avro to PTable
Date Mon, 01 Aug 2016 19:45:04 GMT
Hi.
I suppose it should be "Avros.tableOf(Avros.strings(), Avros.strings())"
instead of "Avros.tableOf(String, String)".
Right?
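
For reference, the shape of the KeyExtractor suggested in the quoted reply can be sketched in plain Java without any Crunch dependency. This is only an illustration under stated assumptions: the class name KeyExtractorSketch and the helper extractKey are hypothetical, java.util's Map.Entry stands in for Crunch's Pair, and taking the first comma-separated field as the key is an assumption, since the quoted code elides how the key is computed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the suggested KeyExtractor; no Crunch classes are used.
final class KeyExtractorSketch {

    // Assumption: the join key is the first comma-separated field of the line.
    // The quoted message does not say how the key is really derived.
    static String extractKey(String line) {
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    // Mirrors map(String input): emit the (key, input) pair, not only the key.
    static Map.Entry<String, String> map(String input) {
        return new SimpleImmutableEntry<>(extractKey(input), input);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> table = new ArrayList<>();
        for (String line : Arrays.asList("k1,a,b", "k2,c,d")) {
            table.add(map(line));
        }
        // Each entry holds the extracted key and the full original line.
        System.out.println(table);
    }
}
```

In the real pipeline the same logic would sit in the MapFn's map method, returning Pair.of(key, input) so that the table type can be declared entirely with Avros.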

Thanks.
Miguel.

On Mon, Aug 1, 2016 at 9:13 PM, Micah Whitacre <mkwhitacre@gmail.com> wrote:

> Try changing this from:
> final PTable<String, String> bhSide = bh.by(new BHExtractor(),
> Writables.strings());
>
> to
>
> final PTable<String, String> bhSide = bh.parallelDo(new KeyExtractor(),
> Avros.tableOf(String, String));
>
> public class KeyExtractor extends MapFn<String, Pair<String, String>> {
>     @Override
>     public Pair<String, String> map(String input) {
>         String key = ...;
>         return Pair.of(key, input);
>     }
> }
>
> This will let you avoid mixing PTypeFamilies.  I'm guessing you already
> have most of the code but instead of just emitting the Key you emit the
> pair.
>
> On Mon, Aug 1, 2016 at 2:06 PM, Masf <masfworld@gmail.com> wrote:
>
>> Hi.
>> Thanks for the reply.
>> As you said, it works when I call the "by" method with "Avros.strings()".
>> However, it fails when I try to build the join:
>>
>> final JoinStrategy<String, myAvro2, String> strategy = new
>> DefaultJoinStrategy<>();
>> final PTable<String, Pair<myAvro2, String>> joined = strategy.join(
>> positionSide, bhSide, JoinType.LEFT_OUTER_JOIN); <-- It fails
>>
>> The exception is thrown when the job executes the last statement:
>>
>> Caused by: java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>         at org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:831)
>>         at org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:818)
>>         at org.apache.crunch.types.avro.Avros.pairs(Avros.java:622)
>>         at org.apache.crunch.types.avro.AvroTypeFamily.pairs(AvroTypeFamily.java:116)
>>         at org.apache.crunch.lib.join.DefaultJoinStrategy.preJoin(DefaultJoinStrategy.java:84)
>>         at org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:73)
>>         at org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:52)
>>         at com.db.myapp.driver.myapp.run(myapp.java:66)
>>
>>
>>
>> On Mon, Aug 1, 2016 at 7:31 PM, Micah Whitacre <mkwhitacre@gmail.com>
>> wrote:
>>
>>> You cannot mix PTypeFamilies in a single PType.  In this case change:
>>>
>>> final PTable<String, myAvro2> positionSide = vp.by(new BHExtractorAvro(),
>>> Writables.strings());
>>>
>>> to
>>>
>>> final PTable<String, myAvro2> positionSide = vp.by(new BHExtractorAvro(),
>>> Avros.strings());
>>>
>>> I'm not sure if you'll hit the same problem when you join but in the
>>> code you provided that's the reason for the exception.
>>>
>>> On Mon, Aug 1, 2016 at 12:21 PM, Masf <masfworld@gmail.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I'm trying to join a CSV file with an Avro file. First, I read the CSV
>>>> into a PCollection:
>>>>
>>>> final PCollection<String> bh = pipeline.readTextFile("/pathcsv/");
>>>>
>>>>
>>>> Second, I read the Avro file and then apply a transformation:
>>>>
>>>> final PCollection<myAvro> gp = pipeline.read(From.avroFile(
>>>> inputPath, Avros.specifics(myAvro.class)));
>>>>
>>>> final PCollection<myAvro2> vp = gp.parallelDo("trans", new
>>>> MapTrasnf(), Avros.records(myAvro2.class));
>>>>
>>>> Before making the join, I extract the keys:
>>>>
>>>> final PTable<String, String> bhSide = bh.by(new BHExtractor(),
>>>> Writables.strings());
>>>>
>>>> final PTable<String, myAvro2> positionSide = vp.by(new
>>>> BHExtractorAvro(), Writables.strings());
>>>>
>>>> Applying the "by" method to the Avro PCollection throws an exception, and
>>>> I don't know why:
>>>>
>>>> Caused by: java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>>>         at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:895)
>>>>         at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:136)
>>>>         at org.apache.crunch.impl.dist.collect.PCollectionImpl.by(PCollectionImpl.java:270)
>>>>         at com.db.myapp.driver.myapp.run(myapp.java:62)
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>>
>>>> Regards.
>>>> Miguel Ángel
>>>>
>>>
>>>
>>
>>
>> --
>>
>>
>> Regards.
>> Miguel Ángel
>>
>
>


-- 


Regards.
Miguel Ángel
