crunch-dev mailing list archives

From "David Whiting (JIRA)" <>
Subject [jira] [Created] (CRUNCH-301) Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
Date Wed, 20 Nov 2013 15:45:37 GMT
David Whiting created CRUNCH-301:

             Summary: Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
                 Key: CRUNCH-301
             Project: Crunch
          Issue Type: Bug
          Components: Scrunch
    Affects Versions: 0.8.0
         Environment: Hadoop 2
            Reporter: David Whiting

Suppose you have three record types, Rec1, Rec2 and Rec3.
Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by key2. If you innerJoin
Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and then cogroup it against Rec1, then instead
of surfacing the n different (Rec2,Rec3) tuples applicable to the Rec1, it surfaces just one of
the (Rec2, Rec3) tuples multiple times.

This only happens when running with MRPipeline, and not with MemPipeline.

This is the simplest complete program I could come up with which will produce this unexpected
behaviour:

package testcases

import org.apache.crunch.impl.mr.MRPipeline
import org.apache.crunch.io.{From, To}
import org.apache.crunch.scrunch.PCollection
import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
import org.apache.avro.file.DataFileWriter
import org.apache.hadoop.fs.{Path, FSDataOutputStream}
import org.apache.hadoop.conf.Configuration

object IsolatedBug {

  case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
  case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
  case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)}
  def run() {
    val prefix = "/user/davw/tmp/isolation"

    val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
    val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7),
      Rec2(2, "c", 9.9))
    val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

    writeCollection(new Path(prefix + "/ones"), ones)
    writeCollection(new Path(prefix + "/twos"), twos)
    writeCollection(new Path(prefix + "/threes"), threes)

    val pipe = new MRPipeline(getClass)
    val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
    val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
    val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))
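
    // The join/cogroup lines are missing from this archived copy. The chain below is
    // reconstructed from the description above and may differ from the original code.
    val twoThreeF = twoF.by(_.k2).innerJoin(threeF.by(_.k2)).values()
    oneF.by(_.k)
      .cogroup(twoThreeF.by(_._1.k))
      .values()
      .map(
        {case (one, twothree) =>
          (one ++ twothree)
            .reduce((a,b) => a + "\t" + b)})
      .write(To.textFile(prefix + "/output"))

    pipe.done()
  }
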
  def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
  }

  @SuppressWarnings(Array("rawtypes", "unchecked"))
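  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
    // Derive the Avro schema by reflection from the first record.
    // (The right-hand side of `r` is missing from this archived copy; `records.head` is an assumption.)
    val r: AnyRef = records.head
    val schema = new ReflectDataFactory().getReflectData.getSchema(r.getClass)

    val writer = new ReflectDataFactory().getWriter[T](schema)
    val dataFileWriter = new DataFileWriter(writer)
    dataFileWriter.create(schema, outputStream)

    for (record <- records) {
      dataFileWriter.append(record)
    }
    dataFileWriter.close()
    outputStream.close()
  }
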
  def main(args: Array[String]) { run() }
}

The result that is produced is:
Rec1(1,tjena)	Rec1(1,hello)	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))
Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))

As you can see, there's a single (Rec2, Rec3) tuple repeated many times, instead of showing
all the distinct ones. This does not happen if you join against Rec2 on its own.
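
For comparison, here is a minimal sketch of the grouping one would expect for the same test data, using plain Scala collections only (no Crunch or Hadoop involved). The object name ExpectedCogroup and the helper cogrouped are made up for illustration, and the ordering of values within a group is not something an MR run would guarantee.

```scala
object ExpectedCogroup {
  case class Rec1(k: Int, v: String)
  case class Rec2(k: Int, k2: String, v: Double)
  case class Rec3(k2: String, v: Int)

  // Same test data as in the program above.
  val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
  val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7),
    Rec2(2, "c", 9.9))
  val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

  // Inner join of Rec2 and Rec3 on k2.
  val twoThrees: Seq[(Rec2, Rec3)] =
    for (two <- twos; three <- threes if two.k2 == three.k2) yield (two, three)

  // Cogroup: for every key, all Rec1 values and all (Rec2, Rec3) pairs with that key.
  def cogrouped: Map[Int, (Seq[Rec1], Seq[(Rec2, Rec3)])] = {
    val keys = (ones.map(_.k) ++ twoThrees.map(_._1.k)).distinct
    keys.map(k => (k, (ones.filter(_.k == k), twoThrees.filter(_._1.k == k)))).toMap
  }

  def main(args: Array[String]): Unit = {
    for ((_, (one, twothree)) <- cogrouped.toSeq.sortBy(_._1))
      println((one ++ twothree).mkString("\t"))
  }
}
```

With this data, key 1 groups four distinct pairs, (Rec2(1,a,0.4),Rec3(a,4)), (Rec2(1,a,0.5),Rec3(a,4)), (Rec2(1,b,0.6),Rec3(b,5)) and (Rec2(1,b,0.7),Rec3(b,5)), rather than one pair repeated.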

This message was sent by Atlassian JIRA
