crunch-dev mailing list archives

From "Gabriel Reid (JIRA)" <>
Subject [jira] [Updated] (CRUNCH-607) MemGroupedTable - iterator() can only be called once on this Iterable
Date Mon, 02 May 2016 15:36:13 GMT


Gabriel Reid updated CRUNCH-607:
    Attachment: CRUNCH-607.patch

Patch to fix this. 

The issue is that MemCollection creates a materialized copy of its underlying Iterable
of values in the constructor, which means that reusing a PGroupedTable to derive multiple
PCollections results in this exception. The fix is simply to not materialize the Iterable
unless it's actually needed.
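
To illustrate the general pattern (this is not the code from the patch), here is a minimal
sketch of eager versus deferred materialization over a single-use Iterable. The class names
OneShotIterable, EagerCollection and LazyCollection are hypothetical stand-ins and are not
Crunch classes; only the exception message is taken from the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LazyMaterializationSketch {

    /** Hypothetical stand-in for an Iterable that may only be traversed once. */
    static final class OneShotIterable<T> implements Iterable<T> {
        private final Iterable<T> delegate;
        private boolean used = false;

        OneShotIterable(Iterable<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Iterator<T> iterator() {
            if (used) {
                throw new IllegalStateException(
                    "iterator() can only be called once on this Iterable");
            }
            used = true;
            return delegate.iterator();
        }
    }

    /** Eager variant: copies the source in the constructor, consuming it immediately. */
    static final class EagerCollection<T> {
        private final List<T> values = new ArrayList<>();

        EagerCollection(Iterable<T> source) {
            for (T t : source) {
                values.add(t);
            }
        }

        List<T> values() {
            return values;
        }
    }

    /** Lazy variant: keeps a reference and copies only on first access to values(). */
    static final class LazyCollection<T> {
        private final Iterable<T> source;
        private List<T> values; // null until first access

        LazyCollection(Iterable<T> source) {
            this.source = source;
        }

        List<T> values() {
            if (values == null) {
                values = new ArrayList<>();
                for (T t : source) {
                    values.add(t);
                }
            }
            return values;
        }
    }

    /** Wrapping the same one-shot source twice eagerly fails on the second wrap. */
    public static boolean secondEagerWrapFails() {
        OneShotIterable<Integer> source = new OneShotIterable<>(Arrays.asList(1, 2, 3));
        new EagerCollection<>(source);
        try {
            new EagerCollection<>(source);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Wrapping twice lazily is harmless; only the wrapper that is read consumes the source. */
    public static List<Integer> lazyValues() {
        OneShotIterable<Integer> source = new OneShotIterable<>(Arrays.asList(1, 2, 3));
        LazyCollection<Integer> first = new LazyCollection<>(source);
        new LazyCollection<>(source); // never read, so it never touches the source
        return first.values();
    }

    public static void main(String[] args) {
        System.out.println("second eager wrap fails: " + secondEagerWrapFails());
        System.out.println("lazy values: " + lazyValues());
    }
}
```

Note that in this sketch deferral only helps wrappers that are never read directly: if two
lazy wrappers over the same one-shot source were both read, the second read would still throw.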

> MemGroupedTable - iterator() can only be called once on this Iterable
> ---------------------------------------------------------------------
>                 Key: CRUNCH-607
>                 URL:
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.8.4
>            Reporter: Nathan Schile
>            Assignee: Josh Wills
>         Attachments: CRUNCH-607.patch
> From the user mailing list:
> I'm running into a "java.lang.IllegalStateException: iterator() can only be called once
> on this Iterable" [1] when running a unit test that utilizes a MemCollection. The pipeline
> appears to run fine when running on a cluster. I have a PGroupedTable that I am running
> multiple operations (mapValues, ungroup) [2] on that is causing the exception. The mapValues
> and the ungroup operations are run in separate jobs on the cluster, so I don't believe I
> should be running into iterator issues. This constraint was introduced by CRUNCH-192 [3].
> Gabriel Reid stated "Looks like the MemPipeline is doing something wrong when multiple
> PCollections are created from a single PCollection." [4]
> [1]
> apache-crunch-0.8.4 branch
> iterator() can only be called once on this Iterable
> java.lang.IllegalStateException: iterator() can only be called once on this Iterable
> 	at org.apache.crunch.impl.SingleUseIterable.iterator(
> 	at org.apache.crunch.impl.mem.collect.MemGroupedTable$UngroupFn.process(
> 	at org.apache.crunch.impl.mem.collect.MemGroupedTable$UngroupFn.process(
> 	at org.apache.crunch.impl.mem.collect.MemCollection.parallelDo(
> 	at org.apache.crunch.impl.mem.collect.MemCollection.parallelDo(
> 	at org.apache.crunch.impl.mem.collect.MemGroupedTable.ungroup(
> [2]
> PGroupedTable<String, Item> itemsByPersonId =
>         AvroCollections.keyByAvroField(items, "person_id", Avros.strings()).groupByKey();
> PTable<String, Long> newestOrderByPersonId =
>         itemsByPersonId
>             .mapValues("Find latest order for each person", new MaxOrderMapFn(), Avros.longs());
> return itemsByPersonId.ungroup().join(newestOrderByPersonId).parallelDo(
>         "Filter old items, leaving only current order items",
>         new PreviousOrderItemFilter(),
>         Avros.tableOf(Avros.strings(), Avros.specifics(Item.class)));
> [3]
> [4]

This message was sent by Atlassian JIRA