crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kristoffer Sjögren <sto...@gmail.com>
Subject Re: How to count MemPipeline?
Date Wed, 11 Mar 2015 17:33:28 GMT
Makes sense. Cheers for the elaborate explanation.


On Wed, Mar 11, 2015 at 4:21 PM, David Ortiz <dpo5003@gmail.com> wrote:

> I believe it's because the Iterable type is not a Writable type.  It's
> like when you're doing a SQL group by.  Any field you aren't grouping by
> has to have an aggregate function to get an output.  In the case of Crunch,
> groupByKey does the first part of that, but you still need to define your
> aggregate function (max, min, first, sum, etc.) before the operation is
> completed and ready to write out.
>
> Another way to look at it, is whatever job you are doing, that
> PGroupedTable you get from groupByKey is the input to the Reducer, so you
> still need to actually do your reduce function before you get the final
> output.
>
> On Wed, Mar 11, 2015 at 11:01 AM Kristoffer Sjögren <stoffe@gmail.com>
> wrote:
>
>> Ah, that's it, thanks!
>>
>> Still, i'm not sure why PGroupedTable.count() fails complaining about
>> WritableType. It seems to be the natural thing to do? The error message is
>> also confusing since the key type i'm using is in fact WritableType.
>>
>>
>>
>> On Wed, Mar 11, 2015 at 3:39 PM, David Ortiz <dpo5003@gmail.com> wrote:
>>
>>> Oh!  If what you want is the count of each unique combination of
>>> key/input, try changing the output from tableOf to pairs, so you get a
>>> PCollection<Pair<String, String>>, then you can do a count on that
>>> collection to get the count of each unique pair.
>>>
>>> On Wed, Mar 11, 2015 at 10:15 AM David Ortiz <dpo5003@gmail.com> wrote:
>>>
>>>> Ah.  Fair enough.  To get that effect, you will need to do a combine
>>>> function I think.  Under the hood, that PGroupedTable groupByKey gives you
>>>> something like PCollection<String, Iterable<String>>.  Off hand,
I don't
>>>> know of a Writable type for Iterable, so my guess is you need to take care
>>>> of that before the count.
>>>>
>>>> On Wed, Mar 11, 2015 at 9:51 AM Kristoffer Sjögren <stoffe@gmail.com>
>>>> wrote:
>>>>
>>>>> The example is incomplete.
>>>>>
>>>>> In reality I parse keys from the string and want to count number of
>>>>> occurrences for each unique key combination.
>>>>>
>>>>> On Wed, Mar 11, 2015 at 2:44 PM, David Ortiz <dpo5003@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Kristoffer,
>>>>>>
>>>>>>       Based on that code snippet, why not just do:
>>>>>>
>>>>>> PCollection<String> lines = MemPipeline.typedCollectionOf(Writables.strings(),
>>>>>> input);
>>>>>> PTable<String, Long> lineCount = lines.count();
>>>>>>
>>>>>> Since the initial snippet is just creating a pair with two copies
of
>>>>>> the input string, I believe that would accomplish what you're after.
 If
>>>>>> you need the String twice with the count you could add a MapFn afterwards
>>>>>> to create whatever Tuple structure you need.
>>>>>>
>>>>>> Thanks,
>>>>>>      Dave
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 11, 2015 at 9:41 AM Kristoffer Sjögren <stoffe@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Micah
>>>>>>>
>>>>>>> Ah yes, i'm using the static import from Writables.string().
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Kristoffer
>>>>>>>
>>>>>>> On Wed, Mar 11, 2015 at 2:29 PM, Micah Whitacre <
>>>>>>> mkwhitacre@gmail.com> wrote:
>>>>>>>
>>>>>>>> Kristoffer,
>>>>>>>>   What PTypeFamily are you using for the "tableOf(strings(),
>>>>>>>> strings())"?  It looks like you are using Writables.strings()
up above but
>>>>>>>> looks like you are using static imports down below so wasn't
sure if you
>>>>>>>> had switched to AvroTypeFamily instead.
>>>>>>>>
>>>>>>>> Micah
>>>>>>>>
>>>>>>>> On Wed, Mar 11, 2015 at 8:17 AM, Kristoffer Sjögren <
>>>>>>>> stoffe@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> I'm trying to count the occurrence of a key in a grouped
table.
>>>>>>>>> But the following code snippet [1] fails [2] when calling
count() on a
>>>>>>>>> MemPipeline in version 0.8.2+71-cdh4.6.0.
>>>>>>>>>
>>>>>>>>> Am I using the API incorrectly or is this a bug?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Kristoffer
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>>
>>>>>>>>> PCollection<String> lines = MemPipeline.typedCollectionOf(Writables.strings(),
>>>>>>>>> input);
>>>>>>>>> lines.parallelDo(new DoFn<String, Pair<String,
String>>() {
>>>>>>>>>   @Override
>>>>>>>>>   public void process(String input, Emitter<Pair<String,
String>>
>>>>>>>>> emitter) {
>>>>>>>>>     emitter.emit(Pair.of(input, input));
>>>>>>>>>   }
>>>>>>>>> }, tableOf(strings(), strings()))
>>>>>>>>> .groupByKey()
>>>>>>>>> .count();
>>>>>>>>>
>>>>>>>>> [2]
>>>>>>>>>
>>>>>>>>> java.lang.IllegalArgumentException: Key type must be
of class
>>>>>>>>> WritableType
>>>>>>>>> at org.apache.crunch.types.writable.Writables.tableOf(
>>>>>>>>> Writables.java:351)
>>>>>>>>> at org.apache.crunch.types.writable.WritableTypeFamily.
>>>>>>>>> tableOf(WritableTypeFamily.java:95)
>>>>>>>>> at org.apache.crunch.lib.Aggregate.count(Aggregate.java:65)
>>>>>>>>> at org.apache.crunch.lib.Aggregate.count(Aggregate.java:56)
>>>>>>>>> at org.apache.crunch.impl.mem.collect.MemCollection.count(
>>>>>>>>> MemCollection.java:230)
>>>>>>>>> at mapred.functions.FunctionsTest.testGroupActionCount(
>>>>>>>>> FunctionsTest.java:79)
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>

Mime
View raw message