crunch-user mailing list archives

From David Ortiz <dpo5...@gmail.com>
Subject Re: How to count MemPipeline?
Date Wed, 11 Mar 2015 15:21:49 GMT
I believe it's because the Iterable type is not a Writable type. It's like
doing a SQL GROUP BY: any field you aren't grouping by needs an aggregate
function before it can appear in the output. In Crunch, groupByKey does the
first part of that, but you still need to define your aggregate function
(max, min, first, sum, etc.) before the operation is complete and ready to
write out.

Another way to look at it is that, whatever job you are doing, the
PGroupedTable you get from groupByKey is the input to the Reducer, so you
still need to actually apply your reduce function before you get the final
output.
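A minimal plain-Java sketch of the GROUP BY analogy, using only java.util and not the Crunch API (the class GroupThenAggregate and its methods are hypothetical names). Grouping by key produces a key-to-list map, which stands in for the Iterable inside a PGroupedTable. Each key ends up with a single plain value only after an aggregate such as a count or a max is applied.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of "group by key, then aggregate".
// This is NOT the Crunch API; it only mirrors the shape of the data.
final class GroupThenAggregate {

  // Step 1 (like groupByKey): collect every value under its key.
  // The result is key -> list of values, analogous to Pair<K, Iterable<V>>.
  static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> pairs) {
    Map<String, List<String>> grouped = new TreeMap<>();
    for (Map.Entry<String, String> p : pairs) {
      grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
    }
    return grouped;
  }

  // Step 2a (one aggregate): reduce each group to its size.
  static Map<String, Long> countPerKey(Map<String, List<String>> grouped) {
    Map<String, Long> counts = new TreeMap<>();
    grouped.forEach((k, vs) -> counts.put(k, (long) vs.size()));
    return counts;
  }

  // Step 2b (another aggregate): reduce each group to its maximum value.
  static Map<String, String> maxPerKey(Map<String, List<String>> grouped) {
    Map<String, String> maxes = new TreeMap<>();
    grouped.forEach((k, vs) -> maxes.put(k, Collections.max(vs)));
    return maxes;
  }
}
```

The grouped map on its own is only the intermediate "reducer input". The count or max maps are what you would actually write out.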

On Wed, Mar 11, 2015 at 11:01 AM Kristoffer Sjögren <stoffe@gmail.com>
wrote:

> Ah, that's it, thanks!
>
> Still, I'm not sure why PGroupedTable.count() fails with a complaint about
> WritableType. Isn't it the natural thing to do? The error message is also
> confusing, since the key type I'm using is in fact a WritableType.
>
>
>
> On Wed, Mar 11, 2015 at 3:39 PM, David Ortiz <dpo5003@gmail.com> wrote:
>
>> Oh! If what you want is the count of each unique key/input combination,
>> try changing the output type from tableOf to pairs, so you get a
>> PCollection<Pair<String, String>>. Then you can call count() on that
>> collection to get the count of each unique pair.
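As a plain-Java illustration only (java.util.stream, not Crunch; the class PairCount is a hypothetical name), counting a collection of pairs amounts to grouping by the whole pair and counting each group. That is the result the pairs-then-count suggestion is after, with a Map.Entry standing in for Crunch's Pair.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of count() over a collection of pairs:
// each distinct (key, value) pair maps to the number of times it occurs.
final class PairCount {
  static Map<Map.Entry<String, String>, Long> countPairs(List<Map.Entry<String, String>> pairs) {
    return pairs.stream()
        // Group by the entire pair (identity), then count each group.
        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }
}
```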
>>
>> On Wed, Mar 11, 2015 at 10:15 AM David Ortiz <dpo5003@gmail.com> wrote:
>>
>>> Ah. Fair enough. To get that effect, I think you will need a combine
>>> function. Under the hood, the PGroupedTable from groupByKey gives you
>>> something like PCollection<Pair<String, Iterable<String>>>. Offhand, I
>>> don't know of a Writable type for Iterable, so my guess is you need to
>>> take care of that before the count.
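A plain-Java sketch of the combine idea, not the Crunch API (KeyCombiner is a hypothetical name, and the key-extraction function stands in for whatever parsing the real job does). Instead of carrying an Iterable per key, each record contributes a 1 for its key and the ones are summed as they arrive. Every key is left with a single long, the kind of value a Writable type can hold.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical plain-Java model of a sum combiner: each input line
// contributes (key, 1L) and the ones are merged per key as they arrive.
final class KeyCombiner {
  static Map<String, Long> countByKey(List<String> lines, Function<String, String> keyOf) {
    Map<String, Long> counts = new TreeMap<>();
    for (String line : lines) {
      // merge() adds 1 to the running total, or stores 1 for a new key.
      counts.merge(keyOf.apply(line), 1L, Long::sum);
    }
    return counts;
  }
}
```

For example, with a made-up key rule that takes everything before the first `:`, `countByKey(List.of("a:1", "a:2", "b:3"), s -> s.substring(0, s.indexOf(':')))` yields a count of 2 for `a` and 1 for `b`.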
>>>
>>> On Wed, Mar 11, 2015 at 9:51 AM Kristoffer Sjögren <stoffe@gmail.com>
>>> wrote:
>>>
>>>> The example is incomplete.
>>>>
>>>> In reality I parse keys from the string and want to count the number of
>>>> occurrences of each unique key combination.
>>>>
>>>> On Wed, Mar 11, 2015 at 2:44 PM, David Ortiz <dpo5003@gmail.com> wrote:
>>>>
>>>>> Kristoffer,
>>>>>
>>>>>       Based on that code snippet, why not just do:
>>>>>
>>>>> PCollection<String> lines = MemPipeline.typedCollectionOf(Writables.strings(),
>>>>> input);
>>>>> PTable<String, Long> lineCount = lines.count();
>>>>>
>>>>> Since the initial snippet just creates a pair with two copies of the
>>>>> input string, I believe that would accomplish what you're after. If you
>>>>> need the String twice alongside the count, you could add a MapFn
>>>>> afterwards to create whatever Tuple structure you need.
>>>>>
>>>>> Thanks,
>>>>>      Dave
>>>>>
>>>>>
>>>>> On Wed, Mar 11, 2015 at 9:41 AM Kristoffer Sjögren <stoffe@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Micah
>>>>>>
>>>>>> Ah yes, I'm using the static import from Writables.strings().
>>>>>>
>>>>>> Cheers,
>>>>>> -Kristoffer
>>>>>>
>>>>>> On Wed, Mar 11, 2015 at 2:29 PM, Micah Whitacre <mkwhitacre@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Kristoffer,
>>>>>>>   What PTypeFamily are you using for the "tableOf(strings(),
>>>>>>> strings())"? It looks like you are using Writables.strings() up above,
>>>>>>> but static imports down below, so I wasn't sure if you had switched to
>>>>>>> AvroTypeFamily instead.
>>>>>>>
>>>>>>> Micah
>>>>>>>
>>>>>>> On Wed, Mar 11, 2015 at 8:17 AM, Kristoffer Sjögren <
>>>>>>> stoffe@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> I'm trying to count the occurrences of a key in a grouped table, but
>>>>>>>> the following code snippet [1] fails [2] when calling count() on a
>>>>>>>> MemPipeline in version 0.8.2+71-cdh4.6.0.
>>>>>>>>
>>>>>>>> Am I using the API incorrectly or is this a bug?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Kristoffer
>>>>>>>>
>>>>>>>> [1]
>>>>>>>>
>>>>>>>> PCollection<String> lines = MemPipeline.typedCollectionOf(Writables.strings(),
>>>>>>>> input);
>>>>>>>> lines.parallelDo(new DoFn<String, Pair<String, String>>() {
>>>>>>>>   @Override
>>>>>>>>   public void process(String input, Emitter<Pair<String, String>> emitter) {
>>>>>>>>     emitter.emit(Pair.of(input, input));
>>>>>>>>   }
>>>>>>>> }, tableOf(strings(), strings()))
>>>>>>>> .groupByKey()
>>>>>>>> .count();
>>>>>>>>
>>>>>>>> [2]
>>>>>>>>
>>>>>>>> java.lang.IllegalArgumentException: Key type must be of class WritableType
>>>>>>>> at org.apache.crunch.types.writable.Writables.tableOf(Writables.java:351)
>>>>>>>> at org.apache.crunch.types.writable.WritableTypeFamily.tableOf(WritableTypeFamily.java:95)
>>>>>>>> at org.apache.crunch.lib.Aggregate.count(Aggregate.java:65)
>>>>>>>> at org.apache.crunch.lib.Aggregate.count(Aggregate.java:56)
>>>>>>>> at org.apache.crunch.impl.mem.collect.MemCollection.count(MemCollection.java:230)
>>>>>>>> at mapred.functions.FunctionsTest.testGroupActionCount(FunctionsTest.java:79)
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>
