crunch-dev mailing list archives

From "Tycho Lamerigts (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-485) groupByKey on Spark incorrect if key is Avro record with defined sort order
Date Wed, 07 Jan 2015 08:54:34 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14267428#comment-14267428 ]

Tycho Lamerigts commented on CRUNCH-485:
----------------------------------------

I did try requireSortedKeys(), but it merely sorts the results *after* grouping. In
other words, the grouping itself is unaffected and is still incorrect.

Note that this is not about sorting per se. groupByKey needs to determine whether key objects
are _equal_, and Avro records use the schema's sort order to determine equality too: fields
declared with order "ignore" are skipped in the comparison. See the example below.
Crunch-on-MapReduce takes this into account, whereas Crunch-on-Spark does not. I do not know
whether the difference is intentional, but it is definitely inconsistent, and I would argue that
the Crunch-on-MapReduce behavior is the desired one.

{code}
package example;

import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Field.Order;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecordBuilder;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class AvroEqualsTest {

    @Test
    public void fieldsWithIgnoredSortOrderAreNotUsedInEquals() {
        // field1 takes part in ordering (ASCENDING); field2 is excluded (IGNORE).
        Schema mySchema = Schema.createRecord(Lists.newArrayList(new Field("field1",
                Schema.create(Type.STRING),
                null,
                JsonNodeFactory.instance.textNode(""),
                Order.ASCENDING), new Field("field2",
                Schema.create(Type.STRING),
                null,
                JsonNodeFactory.instance.textNode(""),
                Order.IGNORE)));

        // The two records differ only in field2, the ignored field.
        GenericRecordBuilder myGRB = new GenericRecordBuilder(mySchema);
        Record myRecord1 = myGRB.set("field1", "hello").set("field2", "world").build();
        Record myRecord2 = myGRB.set("field1", "hello").set("field2", "there").build();
        // Avro treats them as equal, so groupByKey should place them in one group.
        assertEquals(myRecord1, myRecord2);
    }
}

{code}
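
To make the consequence for grouping concrete, here is a minimal, library-free sketch of the two behaviors. It is not Crunch, Spark, or Avro code: DemoKey, GroupingDemo, toBlob, groupByBlob, and groupByOrder are hypothetical names, and the comparator merely stands in for an Avro comparison that skips a field with order "ignore".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for an Avro record key: field1 is ordered,
// field2 plays the role of a field declared with order "ignore".
final class DemoKey {
    final String field1;
    final String field2;

    DemoKey(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    // Serializes every field, whether or not it is ignored for ordering.
    ByteBuffer toBlob() {
        byte[] bytes = (field1 + '\u0000' + field2).getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.wrap(bytes);
    }
}

final class GroupingDemo {
    // Compares only the ordered field, as an order-aware comparison would.
    static final Comparator<DemoKey> ORDER_AWARE = Comparator.comparing(k -> k.field1);

    // Groups values by the serialized form of the key (identical blobs only).
    static Map<ByteBuffer, List<String>> groupByBlob(List<DemoKey> keys, List<String> values) {
        Map<ByteBuffer, List<String>> groups = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            groups.computeIfAbsent(keys.get(i).toBlob(), k -> new ArrayList<>()).add(values.get(i));
        }
        return groups;
    }

    // Groups values by keys that compare as equal under the order-aware comparator.
    static Map<DemoKey, List<String>> groupByOrder(List<DemoKey> keys, List<String> values) {
        Map<DemoKey, List<String>> groups = new TreeMap<>(ORDER_AWARE);
        for (int i = 0; i < keys.size(); i++) {
            groups.computeIfAbsent(keys.get(i), k -> new ArrayList<>()).add(values.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<DemoKey> keys = Arrays.asList(
                new DemoKey("hello", "world"),
                new DemoKey("hello", "there"));
        List<String> values = Arrays.asList("a", "b");

        System.out.println(groupByBlob(keys, values).size());  // prints 2
        System.out.println(groupByOrder(keys, values).size()); // prints 1
    }
}
```

Grouping on blobs yields two groups because the ignored field still changes the bytes, while the order-aware grouping yields one.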

> groupByKey on Spark incorrect if key is Avro record with defined sort order
> ---------------------------------------------------------------------------
>
>                 Key: CRUNCH-485
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-485
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.11.0
>            Reporter: Tycho Lamerigts
>            Assignee: Josh Wills
>
> groupByKey on Spark is incorrect if the key type is an Avro record with a defined sort
> order (http://avro.apache.org/docs/1.7.7/spec.html#order).
> Rather than comparing keys by that sort order, it serializes the entire Avro record to a
> binary blob (byte array) and groups identical blobs, so records that differ only in ignored
> fields land in separate groups. By contrast, groupByKey on MapReduce works as expected: it
> takes Avro's sort order into account.
> The culprit is probably the following code from org.apache.crunch.impl.spark.collect.PGroupedTableImpl#getJavaRDDLikeInternal:
> {code}
> groupedRDD = parentRDD.map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext()))
>           .mapToPair(new MapOutputFunction(keySerde, valueSerde))
>           .groupByKey(numPartitions);
> {code}
> where MapOutputFunction simply converts the entire key object to a binary blob, without
> taking sort order into account.
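
Hash-based grouping relies on the key's equals and hashCode, so one conceivable direction is to group on a key whose equality skips the same fields that the schema's sort order ignores. The sketch below is an assumption for illustration only, not the fix adopted in Crunch; OrderAwareKey and WrapperGroupingDemo are hypothetical names, and no Crunch, Spark, or Avro classes are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key wrapper: equality and hashing consider only field1,
// mirroring a schema in which field2 has order "ignore".
final class OrderAwareKey {
    final String field1; // participates in equality and hashing
    final String field2; // carried along, but ignored for equality

    OrderAwareKey(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof OrderAwareKey
                && field1.equals(((OrderAwareKey) other).field1);
    }

    @Override
    public int hashCode() {
        // Must agree with equals: ignored fields may not influence the hash.
        return field1.hashCode();
    }
}

final class WrapperGroupingDemo {
    // Hash-based grouping that now follows the order-aware notion of equality.
    static Map<OrderAwareKey, List<String>> group(List<OrderAwareKey> keys, List<String> values) {
        Map<OrderAwareKey, List<String>> groups = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            groups.computeIfAbsent(keys.get(i), k -> new ArrayList<>()).add(values.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<OrderAwareKey> keys = Arrays.asList(
                new OrderAwareKey("hello", "world"),
                new OrderAwareKey("hello", "there"),
                new OrderAwareKey("bye", "world"));
        List<String> values = Arrays.asList("a", "b", "c");

        System.out.println(group(keys, values).size()); // prints 2
    }
}
```

The two "hello" keys share a group even though field2 differs, which matches the equality demonstrated by the Avro test.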



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
