crunch-dev mailing list archives

From "David Whiting (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CRUNCH-301) Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
Date Wed, 20 Nov 2013 15:49:35 GMT

     [ https://issues.apache.org/jira/browse/CRUNCH-301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Whiting updated CRUNCH-301:
---------------------------------

    Attachment: IsolatedBug.scala

Attached code to reproduce the issue.

> Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
> -------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-301
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-301
>             Project: Crunch
>          Issue Type: Bug
>          Components: Scrunch
>    Affects Versions: 0.8.0
>         Environment: Hadoop 2
>            Reporter: David Whiting
>         Attachments: IsolatedBug.scala
>
>
> Suppose you have three record types, Rec1, Rec2 and Rec3.
> Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) via key2. If you innerJoin Rec2 and Rec3 to make a PCollection[(Rec2, Rec3)] and then cogroup it against Rec1, instead of surfacing the n different (Rec2, Rec3) tuples applicable to that Rec1, it surfaces just one of the (Rec2, Rec3) tuples multiple times.
> This only happens when running with MRPipeline, and not with MemPipeline.
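> To compare the two directly, one hypothetical one-line swap in the program below is to construct the pipeline with the in-memory implementation instead:
> {code}
> // Hypothetical swap for comparison: run the same flow in-memory.
> // With this pipeline the duplication described above does not occur.
> import org.apache.crunch.impl.mem.MemPipeline
> val pipe = MemPipeline.getInstance()
> {code}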
> This is the simplest complete program I could come up with which will produce this unexpected result:
> {code}
> package testcases
> import org.apache.crunch.impl.mr.MRPipeline
> import org.apache.crunch.io.{From, To}
> import org.apache.crunch.scrunch.PCollection
> import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
> import org.apache.avro.file.DataFileWriter
> import org.apache.hadoop.fs.{Path, FSDataOutputStream}
> import org.apache.hadoop.conf.Configuration
> object IsolatedBug {
>   case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
>   case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
>   case class Rec3(var k2: String, var v: Int) { def this() = this("", 0) }
>   def run() {
>     val prefix = "/user/davw/tmp/isolation"
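>     // Small test inputs: ones and twos share key k; twos and threes share key k2.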
>     val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
>     val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
>     val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
>     writeCollection(new Path(prefix + "/ones"), ones)
>     writeCollection(new Path(prefix + "/twos"), twos)
>     writeCollection(new Path(prefix + "/threes"), threes)
>     val pipe = new MRPipeline(getClass)
>     val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
>     val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
>     val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))
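>     // innerJoin twos to threes on k2, re-key the joined (Rec2, Rec3) pairs
>     // by Rec2's k, then cogroup against ones; the cogroup's right-hand side
>     // value type is a Scala tuple, which is what triggers the bug.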
>     (oneF.by(_.k)
>       cogroup
>       (twoF.by(_.k2)
>          innerJoin threeF.by(_.k2))
>         .values()
>         .by(_._1.k))
>       .values()
>       .map(
>         {case (one, twothree) =>
>           (one ++ twothree)
>             .map(_.toString)
>             .reduce((a,b) => a + "\t" + b)})
>       .write(To.textFile(prefix + "/output"))
>     pipe.done()
>   }
>   def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
>     writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
>   }
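>   // Derive an Avro schema from the first record via reflection and append
>   // every record to the stream as an Avro data file.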
>   @SuppressWarnings(Array("rawtypes", "unchecked"))
>   private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
>     val r: AnyRef = records.iterator.next()
>     val schema = new ReflectDataFactory().getReflectData.getSchema(r.getClass)
>     val writer = new ReflectDataFactory().getWriter[T](schema)
>     val dataFileWriter = new DataFileWriter(writer)
>     dataFileWriter.create(schema, outputStream)
>     for (record <- records) {
>       dataFileWriter.append(record)
>     }
>     dataFileWriter.close()
>     outputStream.close()
>   }
>   def main(args: Array[String]) { run() }
> }
> {code}
> The result that is produced is:
> {code}
> Rec1(1,tjena)	Rec1(1,hello)	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))
> Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))
> {code}
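> For reference, the output one would expect here (worked out by hand from the inputs; ordering within each group may vary) is:
> {code}
> Rec1(1,hello)	Rec1(1,tjena)	(Rec2(1,a,0.4),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,b,0.6),Rec3(b,5))	(Rec2(1,b,0.7),Rec3(b,5))
> Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))
> {code}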
> Comparing the two, the actual run repeats a single (Rec2, Rec3) tuple many times instead of surfacing all the distinct ones. This does not happen if you join against Rec2 on its own; a sketch of that variant follows.
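> For comparison, a minimal sketch of the tuple-free variant (reusing the definitions above; the output path name is made up for illustration):
> {code}
> // Cogroup ones directly against twos on k. The right-hand side value type
> // is a plain record rather than a Scala tuple, and every distinct value
> // surfaces as expected.
> (oneF.by(_.k) cogroup twoF.by(_.k))
>   .values()
>   .map(
>     {case (ones, twos) =>
>       (ones ++ twos)
>         .map(_.toString)
>         .reduce((a, b) => a + "\t" + b)})
>   .write(To.textFile(prefix + "/output_rec2_only"))
> {code}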



