crunch-dev mailing list archives

From "Mikael Goldmann (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (CRUNCH-601) Short PCollections in SparkPipeline get length null.
Date Wed, 17 Aug 2016 20:44:20 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425337#comment-15425337 ]

Mikael Goldmann edited comment on CRUNCH-601 at 8/17/16 8:44 PM:
-----------------------------------------------------------------

Applied on top of the patch, this passes the test even for length zero. There may of course be
other corner cases for as long as the code trusts getSize() == 0 to mean "empty".
{code}
  // In crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
  public static <S> PObject<Long> length(PCollection<S> collect) {
    PTypeFamily tf = collect.getTypeFamily();
    PTable<Integer, Long> countTable = collect
        .parallelDo("Aggregate.count", new MapFn<S, Pair<Integer, Long>>() {
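          // Emit one (1, 1L) pair per input element.
          @Override
          public Pair<Integer, Long> map(S input) {
            return Pair.of(1, 1L);
          }
          // Emit a zero from each task so an empty input still contributes a value to the sum.
          @Override
          public void cleanup(Emitter<Pair<Integer, Long>> e) {
            e.emit(Pair.of(1, 0L));
          }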
        }, tf.tableOf(tf.ints(), tf.longs()))
        .groupByKey(GroupingOptions.builder().numReducers(1).build())
        .combineValues(Aggregators.SUM_LONGS());
    PCollection<Long> count = countTable.values();
    // Stuff below is what's new
    final FirstElementPObject<Long> first = new FirstElementPObject<>(count);
    return new PObject<Long>() {
      @Override
      public Long getValue() {
        final Long value = first.getValue();
        return value == null ? 0L : value;
      }
    };
  }
{code}
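
The counting idea in the snippet above can be tried without a cluster. What follows is a minimal plain-Java sketch, not Crunch code: `LengthSketch`, `mapPartition`, `sumOrNull`, `defaultToZero` and `length` are hypothetical names standing in for the MapFn, the single-key SUM_LONGS combine, and the PObject wrapper. It assumes that a missing first element shows up as null, as described in this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the counting trick; every name here is hypothetical.
class LengthSketch {

  // Models one map task: emit 1L per input element, then 0L from cleanup().
  static List<Long> mapPartition(List<String> partition) {
    List<Long> emitted = new ArrayList<>();
    for (int i = 0; i < partition.size(); i++) {
      emitted.add(1L);
    }
    emitted.add(0L); // the cleanup() emit: every task that runs produces output
    return emitted;
  }

  // Models the single-key sum; returns null when no task emitted anything.
  static Long sumOrNull(List<List<Long>> perTaskOutput) {
    Long total = null;
    for (List<Long> output : perTaskOutput) {
      for (long v : output) {
        total = (total == null ? 0L : total) + v;
      }
    }
    return total;
  }

  // Models the new PObject wrapper: a missing value is reported as 0.
  static Supplier<Long> defaultToZero(Supplier<Long> first) {
    return () -> {
      Long value = first.get();
      return value == null ? 0L : value;
    };
  }

  // Runs the whole modelled pipeline over the given partitions.
  static long length(List<List<String>> partitions) {
    List<List<Long>> perTaskOutput = new ArrayList<>();
    for (List<String> partition : partitions) {
      perTaskOutput.add(mapPartition(partition));
    }
    return defaultToZero(() -> sumOrNull(perTaskOutput)).get();
  }

  public static void main(String[] args) {
    List<List<String>> noTasks = Collections.emptyList();
    List<List<String>> twoTasks =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
    System.out.println(length(noTasks));
    System.out.println(length(twoTasks));
  }
}
```

The two guards are complementary in this model: the zero emitted at cleanup covers a task that ran over no input, and the null check covers the case where no task produced anything at all.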


was (Author: migoldmann):
This (on top of the patch) passes the test even for length zero (but there may of course be
other corner cases as long as one trusts getSize() == 0).
{code}
  // In crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
  public static <S> PObject<Long> length(PCollection<S> collect) {
    PTypeFamily tf = collect.getTypeFamily();
    PTable<Integer, Long> countTable = collect
        .parallelDo("Aggregate.count", new MapFn<S, Pair<Integer, Long>>() {
          public Pair<Integer, Long> map(S input) {
            return Pair.of(1, 1L);
          }
          public void cleanup(Emitter<Pair<Integer, Long>> e) {
            e.emit(Pair.of(1, 0L));
          }
        }, tf.tableOf(tf.ints(), tf.longs()))
        .groupByKey(GroupingOptions.builder().numReducers(1).build())
        .combineValues(Aggregators.SUM_LONGS());
    PCollection<Long> count = countTable.values();
    final FirstElementPObject<Long> first = new FirstElementPObject<>(count);
    return new PObject<Long>() {
      @Override
      public Long getValue() {
        final Long value = first.getValue();
        return value == null ? 0 : value;
      }
    };
  }
{code}

> Short PCollections in SparkPipeline get length null.
> ----------------------------------------------------
>
>                 Key: CRUNCH-601
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-601
>             Project: Crunch
>          Issue Type: Bug
>          Components: Spark
>    Affects Versions: 0.13.0
>         Environment: Running in local mode on Mac as well as in an Ubuntu 14.04 Docker container
>            Reporter: Mikael Goldmann
>            Priority: Minor
>         Attachments: CRUNCH-601.patch, SmallCollectionLengthTest.java
>
>
> I'll attach a file with a test that I would expect to pass but which fails.
> It creates five PCollection<String> of lengths 0, 1, 2, 3, 4, gets the lengths, runs the pipeline and prints the lengths. Finally it asserts that all lengths are non-null.
> I would expect it to print lengths 0, 1, 2, 3, 4 and pass.
> What it does is print lengths null, null, null, 3, 4 and fail.
> I think the underlying reason is the use of getSize() on an unmaterialized object, together with the assumption that when the estimate returned by getSize() is 0 the PCollection is guaranteed to be empty, which is false in some cases.
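
To make the suspected failure mode concrete, here is a minimal plain-Java sketch. It does not reproduce how Crunch computes getSize(): `SizeEstimateSketch`, `estimateSize`, `lengthTrustingEstimate` and `lengthByCounting` are hypothetical, and the truncating estimate is an assumption chosen only to show that an estimate of 0 does not imply an empty collection.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only; none of these names exist in Crunch.
class SizeEstimateSketch {

  // Hypothetical coarse estimate: integer division truncates small sizes to 0.
  static long estimateSize(List<String> items) {
    long chars = 0;
    for (String s : items) {
      chars += s.length();
    }
    return chars / 1024;
  }

  // Treats an estimate of 0 as "empty" and yields no value (null).
  static Long lengthTrustingEstimate(List<String> items) {
    if (estimateSize(items) == 0) {
      return null;
    }
    return (long) items.size();
  }

  // Ignores the estimate and counts the elements.
  static Long lengthByCounting(List<String> items) {
    return (long) items.size();
  }

  public static void main(String[] args) {
    List<String> small = Arrays.asList("a", "b");
    System.out.println(estimateSize(small));
    System.out.println(lengthTrustingEstimate(small));
    System.out.println(lengthByCounting(small));
  }
}
```

In this model a short, non-empty list has an estimate of 0, so any shortcut that treats the estimate as exact loses its length, while counting does not.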



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
