flink-user mailing list archives

From Michele Bertoni <michele1.bert...@mail.polimi.it>
Subject Re: inconsistency in count and print
Date Sat, 16 May 2015 10:22:49 GMT
I forgot to mention: I was importing Guava this way

import com.google.common.hash.{HashFunction, Hashing}

and including it as a Maven dependency.

I also had the option of using the version shaded into Flink:

import org.apache.flink.shaded.com.google.common.hash.{HashFunction, Hashing}

Neither of them works properly.


On 16 May 2015, at 12:20, Michele Bertoni <michele1.bertoni@mail.polimi.it<mailto:michele1.bertoni@mail.polimi.it>> wrote:

The first time I hash my data is in the reading phase: each line gets one extra field containing the hash of its file name. I do this with a custom reader that extends DelimitedInputFormat and overrides the open, nextRecord and readRecord methods:

/* … */
private var id : Long = 0L

override def open(split : FileInputSplit) = {
    super.open(split)
    //TODO hasher problem: guava fails, java hashCode works
    //val hf : HashFunction = Hashing.sha256()
    //id = hf.newHasher.putString(split.getPath.getName.toString, Charsets.UTF_8).hash.asLong
    id = split.getPath.getName.toString.hashCode.toLong
  }

override def readRecord(reusable : FlinkRegionType, bytes : Array[Byte], offset : Int, numBytes : Int) : FlinkRegionType = {
    parser(id, new String(bytes.slice(offset, offset + numBytes), Charset.forName(charsetName)))
  }

override def nextRecord(record : FlinkRegionType) : FlinkRegionType = {
    try {
      super.nextRecord(record)
    } catch {
      case e : ParsingException => {
        logger.info("Region Data format error in the tuple: " + e.getMessage)
        nextRecord(record)
      }
    }
  }

/* … */
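[Editorial note: since the Guava hasher is the suspect here, one workaround, sketched by us and not taken from the thread, is to derive the 64-bit id with the JDK's own MessageDigest, which needs no extra dependency on the task managers; the helper name stableId is hypothetical:]

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper: a stable 64-bit id from the file name using the JDK's
// SHA-256, so neither plain nor shaded Guava is needed.
def stableId(name: String): Long = {
  // MessageDigest instances are not thread-safe; create one per call.
  val digest = MessageDigest.getInstance("SHA-256")
    .digest(name.getBytes(StandardCharsets.UTF_8))
  // Fold the first 8 bytes into a Long (same idea as Guava's HashCode.asLong).
  digest.take(8).foldLeft(0L)((acc, b) => (acc << 8) | (b & 0xffL))
}
```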



Then, every time I join two datasets or aggregate (groupBy) on several fields of the tuple, I create a new hash from the concatenation of the respective ids:

val joinResult : DataSet[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)] =
  ref
    .joinWithHuge(exp).where(0,2).equalTo(0,2) {
      (r : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
       x : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
       out : Collector[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)]) => {
        if(/* regions cross */) {

          //TODO hasher problem: guava fails, java hashCode works
          //val hashId = hf.newHasher().putString(r._4.toString + x._4.toString, Charsets.UTF_8).hash.asLong
          val hashId = (r._4.toString + x._4.toString).hashCode.toLong

          //TODO hasher problem: guava fails, java hashCode works
          //val aggregationId = hf.newHasher().putString(hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString, Charsets.UTF_8).hash.asLong
          val aggregationId = (hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString).hashCode.toLong

          out.collect((hashId, r._5, r._6, r._7, r._8, r._9, List(x._9), 1, aggregationId))
        }
      }
    }
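[Editorial note: a side observation of ours, not raised in the thread: concatenating the two ids as strings without a separator can collide regardless of which hash function is applied afterwards, e.g. the id pairs (1, 23) and (12, 3) both concatenate to "123". A minimal sketch of the problem, with a hypothetical separator-based fix:]

```scala
// Without a separator, distinct id pairs can yield the same concatenation,
// so they hash identically no matter which hash function is used.
val collided = (1L.toString + 23L.toString) == (12L.toString + 3L.toString) // both "123"

// Hypothetical fix: insert a delimiter that cannot occur inside an id.
def pairKey(a: Long, b: Long): String = a.toString + "|" + b.toString
val distinct = pairKey(1L, 23L) != pairKey(12L, 3L)
```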


This is just an example. I have two kinds of data: what I showed is the core data; I also have metadata, associated with the core data via the same hash of the original file name.
On the metadata I have similar join, group, and re-hash functionality.
Again, with the Java hashCode (see above) everything seems to work.




On 16 May 2015, at 12:00, Fabian Hueske <fhueske@gmail.com<mailto:fhueske@gmail.com>> wrote:

Invalid hash values can certainly cause non-deterministic results.

Can you provide a code snippet that shows how and where you used the Guava Hasher?

2015-05-16 11:52 GMT+02:00 Michele Bertoni <michele1.bertoni@mail.polimi.it<mailto:michele1.bertoni@mail.polimi.it>>:
Is it possible that it is due to the hasher?

Inside my code I was using the Google Guava hasher (SHA-256, taken as a Long hash).
Sometimes I got errors from it (ArrayIndexOutOfBoundsException), sometimes I just got different hashes for the same id, especially when running in a non-local execution environment.

I removed it everywhere and started using the Java hashCode; now it seems to work.
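[Editorial note: for what it's worth, java.lang.String.hashCode is fully specified by the Java API docs as s(0)*31^(n-1) + s(1)*31^(n-2) + ... + s(n-1), so it is deterministic across JVMs and machines, which would explain why switching to it stabilized the job. A quick check against the documented formula:]

```scala
// String.hashCode as specified in the java.lang.String API docs:
// s(0)*31^(n-1) + s(1)*31^(n-2) + ... + s(n-1), computed with int arithmetic.
def specHash(s: String): Int = s.foldLeft(0)((h, c) => 31 * h + c)

val sample = "myfile.gdm" // hypothetical file name
val agrees = specHash(sample) == sample.hashCode
```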


> On 16 May 2015, at 09:15, Michele Bertoni <michele1.bertoni@mail.polimi.it<mailto:michele1.bertoni@mail.polimi.it>> wrote:
>
> Hi,
> for two days I have been going mad with a problem: every time I run the code (on the same dataset) I get a different result
>
> while I was debugging I found this
>
> I have this code
>
> val aggregationResult = //something that creates the dataset and uses join, group, reduce and map
> logger.error("res count " + aggregationResult.count)
> aggregationResult.print
>
>
>
> the logger prints a dataset size of 7
> the printed output contains 6 elements
>
> this happens randomly: sometimes the result is larger than the count, and sometimes they are both correct at 10
>
>
>
> Flink version 0.9-milestone-1
>
> any idea of what can make it “not deterministic”?
> thanks for help
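[Editorial note: the mismatch is consistent with how, if we recall correctly, the 0.9-era batch API behaved: count() triggers its own job execution, separate from the execution that produces print()'s output, so a non-deterministic program can legitimately report different sizes. A toy model of ours in plain Scala, with no Flink, where a flaky filter stands in for the unstable hash:]

```scala
import scala.util.Random

// Each "action" re-runs the whole pipeline, as count and print effectively
// did in 0.9-era batch programs. A flaky step stands in for the unstable hash.
def runJob(rng: Random): Seq[Int] =
  (1 to 10).filter(_ => rng.nextInt(5) != 0)

val countRun = runJob(new Random()).size // first execution ("count")
val printRun = runJob(new Random())      // second, independent execution ("print")
// countRun and printRun.size can disagree; a deterministic step cannot:
val stableA = runJob(new Random(42))
val stableB = runJob(new Random(42))
```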



