flink-user mailing list archives

From: Stephan Ewen <se...@apache.org>
Subject: Re: inconsistency in count and print
Date: Mon, 18 May 2015 17:48:59 GMT
Hi Michele!

I cannot tell you what the problem is at first glance, but here are some
pointers that may help you find the problem:

Input split creation determinism:
  - The number of input splits is not really deterministic. It depends on
the parallelism of the source (which tells the system the minimum number of
splits it should create).
  - The splits themselves may also not be strictly deterministic; they can
be influenced by the order in which files in directories get enumerated.

==> The records that a specific split contains may vary between two runs. I
am not sure whether that is an issue in your implementation, since what you
hash is the file name, which should not change.


Input split assignment determinism:
  - The input splits are assigned dynamically at runtime to the data
sources. Through that dynamic assignment, Flink balances between slower and
faster sources and tries to get the best locality.

==> Not every split is processed by the same subtask of the data source in
two successive runs. See the sketch below.
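
To make both points concrete, here is a minimal sketch (not your code; the
file name is made up) of an id that is a pure function of the file name
only. Such an id is reproducible no matter how the splits are cut or which
subtask happens to open them. It assumes Guava's Hashing API:

import com.google.common.base.Charsets
import com.google.common.hash.Hashing

object SplitIdSketch {
  // The id depends only on the file name, not on split boundaries,
  // split count, or the subtask that opens the split.
  def idForFileName(fileName: String): Long =
    Hashing.sha256()
      .newHasher()
      .putString(fileName, Charsets.UTF_8)
      .hash()
      .asLong()

  def main(args: Array[String]): Unit = {
    // Same input, same id, run after run ("regions.bed" is a made-up name).
    println(idForFileName("regions.bed"))
    println(idForFileName("regions.bed"))
  }
}

Anything that instead depends on split contents, split order, or the subtask
index would be subject to the two effects above.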


Maybe these pointers help you rethink some of your assumptions. I would
suspect you make some assumption that is not guaranteed by the system.
It would be good if you shared a bit more of your code, such as the flow of
the program (which functions follow which).
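
One assumption worth checking right away: if count() and print() each
trigger their own execution of the program (which would match the symptom
you describe), any nondeterminism upstream lets the two executions disagree.
Here is a minimal sketch of inspecting a single execution instead, assuming
collect() is available in your version; the data set is a hypothetical
stand-in for your pipeline:

import org.apache.flink.api.scala._

object SingleExecutionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical stand-in for the real aggregation pipeline.
    val aggregationResult: DataSet[(Long, String)] =
      env.fromElements((1L, "a"), (2L, "b"))

    // collect() runs the plan once and ships the result to the client,
    // so the count and the printed elements come from the same execution.
    val materialized = aggregationResult.collect()
    println("res count " + materialized.size)
    materialized.foreach(println)
  }
}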

If you do not want to share the code publicly on this mailing list, you can
send me a private mail at sewen@apache.org.


Greetings,
Stephan



On Sat, May 16, 2015 at 12:22 PM, Michele Bertoni <
michele1.bertoni@mail.polimi.it> wrote:

>  I forgot I was importing Guava in this way:
>
>  import com.google.common.hash.{HashFunction, Hashing}
> including it in Maven.
>
>  but I also had the opportunity to use
>  import org.apache.flink.shaded.com.google.common.hash.{HashFunction,
> Hashing}
>
>  Neither of them works properly.
>
>
>  On 16 May 2015, at 12:20, Michele Bertoni <
> michele1.bertoni@mail.polimi.it> wrote:
>
>  The first time I hash my data is in the reading phase: each line gets one
> extra field, the hash of its file name. I do this with a custom reader that
> extends DelimitedInputFormat and overrides the open, nextRecord and
> readRecord methods:
>
>  /* … */
> private var id : Long = 0L
>
>  override def open(split : FileInputSplit) = {
>     super.open (split)
>     //TODO hasher problem: guava fails, java hashcode works
>     //val hf : HashFunction = Hashing.sha256()
>     //id = hf.newHasher.putString(split.getPath.getName.toString,
> Charsets.UTF_8).hash.asLong
>     id = (split.getPath.getName.toString).hashCode.toLong
>   }
>
>  override def readRecord(reusable : (FlinkRegionType), bytes :
> Array[Byte], offset : Int, numBytes : Int) : (FlinkRegionType) = {
>     (parser(id, new String(bytes.slice(offset,offset+numBytes),
> Charset.forName(charsetName))))
>   }
>
>  override def nextRecord(record : FlinkRegionType) : FlinkRegionType = {
>     try{
>       super.nextRecord(record)
>     } catch {
>        case e : ParsingException => {
>          logger.info("Region Data format error in the tuple: " +
> e.getMessage)
>          nextRecord(record)
>        }
>     }
>   }
>
>  /* … */
>
>
>
>  Then every time I join two datasets, or want to aggregate (groupBy) by
> many different fields of the tuple, I create a new hash of the concatenation
> of the respective ids:
>
>  val joinResult : DataSet[(Long, String, Long, Long, Char, Array[GValue],
> List[Array[GValue]], Int, Long)] =
>       ref
>         .joinWithHuge(exp).where(0,2).equalTo(0,2){
>           (r : (String, Int, Int, Long, String, Long, Long, Char,
> Array[GValue]), x : (String, Int, Int, Long, String, Long, Long, Char,
> Array[GValue]), out : Collector[(Long, String, Long, Long, Char,
> Array[GValue], List[Array[GValue]], Int, Long)]) => {
>             if(/* regions cross */) {
>
>                //TODO hasher problem: guava fails, java hashcode works
>               //val hashId = hf.newHasher().putString(r._4.toString +
> x._4.toString, Charsets.UTF_8).hash.asLong
>               val hashId = (r._4.toString + x._4.toString).hashCode.toLong
>
>                //TODO hasher problem: guava fails, java hashcode works
>               //val aggregationId =
> hf.newHasher().putString(hashId.toString + r._5.toString + r._6.toString +
> r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_
> + _).toString, Charsets.UTF_8).hash.asLong
>               val aggregationId = (hashId.toString + r._5.toString +
> r._6.toString + r._7.toString + r._8.toString + r._9.map((g) =>
> g.toString).sorted.reduce(_ + _).toString).hashCode.toLong
>               out.collect(hashId, r._5, r._6, r._7, r._8, r._9,
> List(x._9), 1, aggregationId)
>             }
>         }
>     }
>
>
>  This is just an example. I have two kinds of data: the one I showed is the
> core data, and then I have the metadata associated to the core via the same
> hash of the original file name.
> Also on the metadata I have similar functionality for joining, grouping and
> re-hashing.
> Again, with the Java hashCode (see above) everything seems to work.
>
>
>
>
>  On 16 May 2015, at 12:00, Fabian Hueske <fhueske@gmail.com>
> wrote:
>
>  Invalid hash values can certainly cause non-deterministic results.
>
>  Can you provide a code snippet that shows how and where you used the
> Guava Hasher?
>
> 2015-05-16 11:52 GMT+02:00 Michele Bertoni <
> michele1.bertoni@mail.polimi.it>:
>
>> Is it possible that it is due to the hasher?
>>
>> Inside my code I was using the Google Guava hasher (SHA-256 as a Long hash).
>> Sometimes I got errors from it (ArrayIndexOutOfBoundsException), sometimes I
>> just got a different hash for the same id, especially when running on a
>> non-local execution environment.
>>
>> I removed it everywhere and started using the Java hashCode; now it
>> seems to work.
>>
>>
>> > On 16 May 2015, at 09:15, Michele Bertoni <
>> michele1.bertoni@mail.polimi.it> wrote:
>> >
>> > Hi,
>> > for two days I have been going mad with a problem: every time I run the
>> code (on the same dataset) I get a different result
>> >
>> > While I was debugging I found this.
>> >
>> > I have this code:
>> >
>> > val aggregationResult  = //something that creates the dataset and uses
>> join, group, reduce and map
>> > logger.error("res count " + aggregationResult.count)
>> > aggregationResult.print
>> >
>> >
>> >
>> > the logger prints a dataset size of 7
>> > the output result is made of 6 elements
>> >
>> > This happens randomly: sometimes the result is larger than the count, and
>> sometimes they are both correct at 10.
>> >
>> >
>> >
>> > Flink version 0.9-milestone-1
>> >
>> > Any idea what can make it “not deterministic”?
>> > Thanks for the help.
>>
>>
>
>
>
