crunch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "David Whiting (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CRUNCH-301) Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
Date Wed, 20 Nov 2013 15:45:37 GMT

     [ https://issues.apache.org/jira/browse/CRUNCH-301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

David Whiting updated CRUNCH-301:
---------------------------------

    Description: 
Suppose you have three record types, Rec1, Rec2 and Rec3.
Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by key2. If you innerJoin
Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and then cogroup it against Rec1, then instead
of surfacing n different (Rec2,Rec3) tuples applicable to the Rec1, it surfaces just one of
the (Rec2, Rec3) tuples multiple times.

This only happens when running with MRPipeline, and not with MemPipeline.

This is the simplest complete program I could come up with which will produce this unexpected
result:

{code}
package testcases

import org.apache.crunch.impl.mr.MRPipeline
import org.apache.crunch.io.{From, To}
import org.apache.crunch.scrunch.PCollection
import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
import org.apache.avro.file.DataFileWriter
import org.apache.hadoop.fs.{Path, FSDataOutputStream}
import org.apache.hadoop.conf.Configuration

/**
 * Self-contained reproduction program for CRUNCH-301.
 *
 * It writes three small Avro files (Rec1, Rec2 and Rec3 records), inner-joins
 * Rec2 with Rec3 on `k2`, cogroups the joined (Rec2, Rec3) tuples against Rec1
 * on `k`, and writes one tab-separated text line per key to `<prefix>/output`.
 */
object IsolatedBug {

  // Record types used by the reproduction.
  // NOTE(review): the `var` fields and zero-argument auxiliary constructors are
  // presumably there so Avro reflection can instantiate and populate the
  // records -- confirm before simplifying them.
  case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
  case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
  case class Rec3(var k2: String, var v: Int) { def this() = this("", 0) }

  /**
   * Writes the sample input files, runs the cogroup pipeline on an
   * `MRPipeline`, and writes the result as text under `prefix + "/output"`.
   */
  def run(): Unit = {
    val prefix = "/user/davw/tmp/isolation"

    // Sample data: Rec1 and Rec2 share key `k`; Rec2 and Rec3 share key `k2`.
    val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
    val twos = Seq(
      Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7),
      Rec2(2, "c", 9.9))
    val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

    writeCollection(new Path(prefix + "/ones"), ones)
    writeCollection(new Path(prefix + "/twos"), twos)
    writeCollection(new Path(prefix + "/threes"), threes)

    val pipe = new MRPipeline(getClass)
    val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
    val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
    val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))

    // Join Rec2 with Rec3 on k2, re-key the joined tuples by Rec2.k, cogroup
    // them against Rec1 keyed by k, then render each group as one text line.
    // The expression below is intentionally left exactly as reported so the
    // reproduction keeps exercising the same pipeline.
    (oneF.by(_.k)
      cogroup
      (twoF.by(_.k2)
         innerJoin threeF.by(_.k2))
        .values()
        .by(_._1.k))
      .values()
      .map(
        {case (one, twothree) =>
          (one ++ twothree)
            .map(_.toString)
            .reduce((a,b) => a + "\t" + b)})
      .write(To.textFile(prefix + "/output"))

    pipe.done()
  }

  /**
   * Creates (or overwrites) `path` on the file system that owns it and writes
   * `records` into it as an Avro data file.
   *
   * @param path    destination file
   * @param records records to write; must be non-empty
   */
  def writeCollection(path: Path, records: Iterable[_ <: AnyRef]): Unit = {
    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
  }

  /**
   * Writes `records` to `outputStream` as an Avro data file, using a schema
   * obtained by reflection from the class of the first record.
   *
   * The stream is always closed, including when `records` is empty or a write
   * fails part-way.
   *
   * @param outputStream destination stream; closed by this method
   * @param records      records to write; must be non-empty
   * @throws NoSuchElementException if `records` is empty, since there is then
   *                                no record to derive the schema from
   */
  @SuppressWarnings(Array("rawtypes", "unchecked"))
  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]): Unit = {
    try {
      // The schema is derived from the runtime class of the first record.
      val first: AnyRef = records.headOption.getOrElse(
        throw new NoSuchElementException("cannot derive an Avro schema from an empty collection"))
      val schema = new ReflectDataFactory().getReflectData.getSchema(first.getClass)

      val writer = new ReflectDataFactory().getWriter[T](schema)
      val dataFileWriter = new DataFileWriter(writer)
      try {
        dataFileWriter.create(schema, outputStream)
        records.foreach(record => dataFileWriter.append(record))
      } finally {
        dataFileWriter.close()
      }
    } finally {
      outputStream.close()
    }
  }

  /** JVM entry point; the arguments are ignored. */
  def main(args: Array[String]): Unit = run()
}
{code}

The result that is produced is:
{code}
Rec1(1,tjena)	Rec1(1,hello)	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))
(Rec2(1,a,0.5),Rec3(a,4))
Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))
{code}

As you can see, there's a single (Rec2, Rec3) tuple repeated many times, instead of showing
all the distinct ones. This does not happen if you join against Rec2 on its own.


  was:
Suppose you have three record types, Rec1, Rec2 and Rec3.
Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by key2. If you innerJoin
Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and then cogroup it against Rec1, then instead
of surfacing n different (Rec2,Rec3) tuples applicable to the Rec1, it surfaces just one of
the (Rec2, Rec3) tuples multiple times.

This only happens when running with MRPipeline, and not with MemPipeline.

This is the simplest complete program I could come up with which will produce this unexpected
result:

{code:scala}
package testcases

import org.apache.crunch.impl.mr.MRPipeline
import org.apache.crunch.io.{From, To}
import org.apache.crunch.scrunch.PCollection
import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
import org.apache.avro.file.DataFileWriter
import org.apache.hadoop.fs.{Path, FSDataOutputStream}
import org.apache.hadoop.conf.Configuration

/**
 * Self-contained reproduction program for CRUNCH-301.
 *
 * It writes three small Avro files (Rec1, Rec2 and Rec3 records), inner-joins
 * Rec2 with Rec3 on `k2`, cogroups the joined (Rec2, Rec3) tuples against Rec1
 * on `k`, and writes one tab-separated text line per key to `<prefix>/output`.
 */
object IsolatedBug {

  // Record types used by the reproduction.
  // NOTE(review): the `var` fields and zero-argument auxiliary constructors are
  // presumably there so Avro reflection can instantiate and populate the
  // records -- confirm before simplifying them.
  case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
  case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
  case class Rec3(var k2: String, var v: Int) { def this() = this("", 0) }

  /**
   * Writes the sample input files, runs the cogroup pipeline on an
   * `MRPipeline`, and writes the result as text under `prefix + "/output"`.
   */
  def run(): Unit = {
    val prefix = "/user/davw/tmp/isolation"

    // Sample data: Rec1 and Rec2 share key `k`; Rec2 and Rec3 share key `k2`.
    val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
    val twos = Seq(
      Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7),
      Rec2(2, "c", 9.9))
    val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

    writeCollection(new Path(prefix + "/ones"), ones)
    writeCollection(new Path(prefix + "/twos"), twos)
    writeCollection(new Path(prefix + "/threes"), threes)

    val pipe = new MRPipeline(getClass)
    val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
    val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
    val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))

    // Join Rec2 with Rec3 on k2, re-key the joined tuples by Rec2.k, cogroup
    // them against Rec1 keyed by k, then render each group as one text line.
    // The expression below is intentionally left exactly as reported so the
    // reproduction keeps exercising the same pipeline.
    (oneF.by(_.k)
      cogroup
      (twoF.by(_.k2)
         innerJoin threeF.by(_.k2))
        .values()
        .by(_._1.k))
      .values()
      .map(
        {case (one, twothree) =>
          (one ++ twothree)
            .map(_.toString)
            .reduce((a,b) => a + "\t" + b)})
      .write(To.textFile(prefix + "/output"))

    pipe.done()
  }

  /**
   * Creates (or overwrites) `path` on the file system that owns it and writes
   * `records` into it as an Avro data file.
   *
   * @param path    destination file
   * @param records records to write; must be non-empty
   */
  def writeCollection(path: Path, records: Iterable[_ <: AnyRef]): Unit = {
    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
  }

  /**
   * Writes `records` to `outputStream` as an Avro data file, using a schema
   * obtained by reflection from the class of the first record.
   *
   * The stream is always closed, including when `records` is empty or a write
   * fails part-way.
   *
   * @param outputStream destination stream; closed by this method
   * @param records      records to write; must be non-empty
   * @throws NoSuchElementException if `records` is empty, since there is then
   *                                no record to derive the schema from
   */
  @SuppressWarnings(Array("rawtypes", "unchecked"))
  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]): Unit = {
    try {
      // The schema is derived from the runtime class of the first record.
      val first: AnyRef = records.headOption.getOrElse(
        throw new NoSuchElementException("cannot derive an Avro schema from an empty collection"))
      val schema = new ReflectDataFactory().getReflectData.getSchema(first.getClass)

      val writer = new ReflectDataFactory().getWriter[T](schema)
      val dataFileWriter = new DataFileWriter(writer)
      try {
        dataFileWriter.create(schema, outputStream)
        records.foreach(record => dataFileWriter.append(record))
      } finally {
        dataFileWriter.close()
      }
    } finally {
      outputStream.close()
    }
  }

  /** JVM entry point; the arguments are ignored. */
  def main(args: Array[String]): Unit = run()
}
{code}

The result that is produced is:
{code}
Rec1(1,tjena)	Rec1(1,hello)	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))
(Rec2(1,a,0.5),Rec3(a,4))
Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))
{code}

As you can see, there's a single (Rec2, Rec3) tuple repeated many times, instead of showing
all the distinct ones. This does not happen if you join against Rec2 on its own.



> Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing
RHS values
> -------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-301
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-301
>             Project: Crunch
>          Issue Type: Bug
>          Components: Scrunch
>    Affects Versions: 0.8.0
>         Environment: Hadoop 2
>            Reporter: David Whiting
>
> Suppose you have three record types, Rec1, Rec2 and Rec3.
> Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by key2. If you
innerJoin Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and then cogroup it against Rec1,
then instead of surfacing n different (Rec2,Rec3) tuples applicable to the Rec1, it surfaces
just one of the (Rec2, Rec3) tuples multiple times.
> This only happens when running with MRPipeline, and not with MemPipeline.
> This is the simplest complete program I could come up with which will produce this unexpected
result:
> {code}
> package testcases
> import org.apache.crunch.impl.mr.MRPipeline
> import org.apache.crunch.io.{From, To}
> import org.apache.crunch.scrunch.PCollection
> import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
> import org.apache.avro.file.DataFileWriter
> import org.apache.hadoop.fs.{Path, FSDataOutputStream}
> import org.apache.hadoop.conf.Configuration
> object IsolatedBug {
>   case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
>   case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "",
0.0) }
>   case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)}
>   def run() {
>     val prefix = "/user/davw/tmp/isolation"
>     val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
>     val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b",
0.7), Rec2(2, "c", 9.9))
>     val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
>     writeCollection(new Path(prefix + "/ones"), ones)
>     writeCollection(new Path(prefix + "/twos"), twos)
>     writeCollection(new Path(prefix + "/threes"), threes)
>     val pipe = new MRPipeline(getClass)
>     val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
>     val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
>     val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))
>     (oneF.by(_.k)
>       cogroup
>       (twoF.by(_.k2)
>          innerJoin threeF.by(_.k2))
>         .values()
>         .by(_._1.k))
>       .values()
>       .map(
>         {case (one, twothree) =>
>           (one ++ twothree)
>             .map(_.toString)
>             .reduce((a,b) => a + "\t" + b)})
>       .write(To.textFile(prefix + "/output"))
>     pipe.done()
>   }
>   def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
>     writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
>   }
>   @SuppressWarnings(Array("rawtypes", "unchecked"))
>   private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records:
Iterable[T]) {
>     val r: AnyRef = records.iterator.next()
>     val schema = new ReflectDataFactory().getReflectData.getSchema(r.getClass)
>     val writer = new ReflectDataFactory().getWriter[T](schema)
>     val dataFileWriter = new DataFileWriter(writer)
>     dataFileWriter.create(schema, outputStream)
>     for (record <- records) {
>       dataFileWriter.append(record)
>     }
>     dataFileWriter.close()
>     outputStream.close()
>   }
>   def main(args: Seq[String]) { run() }
> }
> {code}
> The result that is produced is:
> {code}
> Rec1(1,tjena)	Rec1(1,hello)	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))
(Rec2(1,a,0.5),Rec3(a,4))
> Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))
> {code}
> As you can see, there's a single (Rec2, Rec3) tuple repeated many times, instead of showing
all the distinct ones. This does not happen if you join against Rec2 on its own.



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Mime
View raw message