flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Date Wed, 19 Aug 2015 16:52:07 GMT
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/949#issuecomment-132692640
  
    Sorry for the late review @ChengXiangLi. I finished it now and the PR looks really good.

    
    There is only one thing I would like to change before merging it: moving the `sample`
and `sampleWithSize` methods from the `DataSet` class to the `DataSetUtils` class. This will
effectively keep the `sample` methods out of the core API. The reason is that the
`sampleWithSize` method breaks one of Flink's guarantees, namely the robustness of the core
API functions against `OutOfMemoryExceptions`. Let me elaborate on this for better
understanding.
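
    For concreteness, a rough sketch of what the call sites could look like once the methods
live in `DataSetUtils` rather than on `DataSet` (just an illustration; I'm assuming the
`withReplacement`/`fraction`/`numSamples` parameters stay as in your PR):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.DataSetUtils;

public class SamplingCallSites {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> input = env.generateSequence(1L, 1000000L);

        // Fraction-based sampling: processes one element at a time, bounded memory.
        DataSet<Long> byFraction = DataSetUtils.sample(input, false, 0.01);

        // Fixed-size sampling: materializes up to numSamples elements on the heap,
        // which is why it should not be part of the core DataSet API.
        DataSet<Long> bySize = DataSetUtils.sampleWithSize(input, false, 1000);

        byFraction.print();
        bySize.print();
    }
}
```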
    
    All of Flink's internal operations work on serialized data, which is stored in Flink's *managed*
memory. The managed memory allows Flink to detect when to spill elements from memory to disk
to avoid out of memory exceptions and, thus, to make the system robust. The managed memory
is a pre-allocated area of memory which is administered by the `MemoryManager`. The `MemoryManager`
allows you to allocate and deallocate `MemorySegments` in a C-style fashion. However, once
a data item enters a UDF, the item has to be deserialized, which puts it on the remaining heap.
This is not a problem if your UDF does not accumulate these elements. However, the `sampleWithSize`
method materializes up to `numSamples` elements on the heap. Depending on the number of samples
and the size of the data items, this might be enough to eat up all of the remaining heap
space and crash the JVM.
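
    To illustrate the problem (a simplified stand-in, not your actual implementation): a
fixed-size sample inside a UDF essentially keeps a reservoir of up to `numSamples` deserialized
records on the JVM heap, outside of the managed memory:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

public class ReservoirSampleOnHeap<T> implements MapPartitionFunction<T, T> {

    private final int numSamples;
    private final Random rng = new Random();

    public ReservoirSampleOnHeap(int numSamples) {
        this.numSamples = numSamples;
    }

    @Override
    public void mapPartition(Iterable<T> values, Collector<T> out) {
        // Up to numSamples deserialized records are held here; with large records
        // or a large numSamples this can exhaust the remaining heap.
        List<T> reservoir = new ArrayList<>(numSamples);
        long seen = 0;
        for (T value : values) {
            seen++;
            if (reservoir.size() < numSamples) {
                reservoir.add(value);
            } else {
                long replace = (long) (rng.nextDouble() * seen);
                if (replace < numSamples) {
                    reservoir.set((int) replace, value);
                }
            }
        }
        for (T sample : reservoir) {
            out.collect(sample);
        }
    }
}
```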
    
    I think that your current implementation will work for most use cases, but in order to
make it part of the core API, we also have to deal with the case where our sample cannot be
materialized on the remaining heap of a running Flink program. In order to achieve this, I
think it would be necessary to implement a native `topK` operator. With *native* I mean an
operator which works on Flink's managed memory and, thus, can also deal with spilling records
to disk. Having such a `topK` operator, we could reimplement the reservoir sampling algorithm
in the following way: For sampling without replacement, we first assign a random weight to
each element in a map operation. Then we call `topK` with respect to the weights and obtain
the sample. For sampling with replacement, we could simply use a flat map operation to assign
`numSamples` weights to each element. Then we again call `topK` with respect to the weights.
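
    As a sketch of this idea (the weight assignment below uses existing API; the `topK` call
at the end is the missing piece and purely hypothetical):

```java
import java.util.Random;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class SamplingWeights {

    /** Without replacement: one uniform random weight per element; the elements with
     *  the numSamples largest weights form a uniform random sample. */
    public static class AssignWeight<T> extends RichMapFunction<T, Tuple2<Double, T>> {
        private transient Random rng;

        @Override
        public void open(Configuration parameters) {
            rng = new Random();
        }

        @Override
        public Tuple2<Double, T> map(T value) {
            return new Tuple2<>(rng.nextDouble(), value);
        }
    }

    /** With replacement: numSamples independent weights per element, so the same
     *  element can end up in the sample several times. */
    public static class AssignWeights<T> extends RichFlatMapFunction<T, Tuple2<Double, T>> {
        private final int numSamples;
        private transient Random rng;

        public AssignWeights(int numSamples) {
            this.numSamples = numSamples;
        }

        @Override
        public void open(Configuration parameters) {
            rng = new Random();
        }

        @Override
        public void flatMap(T value, Collector<Tuple2<Double, T>> out) {
            for (int i = 0; i < numSamples; i++) {
                out.collect(new Tuple2<>(rng.nextDouble(), value));
            }
        }
    }

    // The sample would then be obtained via the proposed operator, e.g.
    // `weighted.topK(numSamples)` ordered by the weight in field 0; that operator
    // does not exist yet and is exactly what this comment proposes.
}
```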
    
    For the `topK` implementation, we would need something like a `PriorityQueue` which operates
on managed memory (similar to the `CompactingHashTable` which is a hash table working on managed
memory). Thus, we would have a priority queue which stores the priority values of each record
and a pointer to the record which is kept in managed memory. Whenever an element is removed
from the priority queue, we can also free the occupied managed memory. In case we run out
of managed memory, we have to spill some of the records that are still in the race for the
top k to disk. As a first step, we can skip the spilling and just throw a proper exception
(other than `OutOfMemoryException`) when we run out of memory. Afterwards, we can incrementally
add the spilling functionality.
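
    A minimal, self-contained sketch of that first step (all names here are made up for
illustration; a real version would keep the serialized records in `MemorySegments` obtained
from the `MemoryManager` instead of plain byte arrays, and would eventually spill instead of
throwing):

```java
import java.util.PriorityQueue;

public class BudgetedTopKQueue {

    /** Thrown instead of letting the JVM run into an OutOfMemoryError. */
    public static class SamplingMemoryExhaustedException extends RuntimeException {
        public SamplingMemoryExhaustedException(String message) {
            super(message);
        }
    }

    private static final class Entry {
        final double priority;
        final byte[] serializedRecord;   // stands in for a pointer into managed memory

        Entry(double priority, byte[] serializedRecord) {
            this.priority = priority;
            this.serializedRecord = serializedRecord;
        }
    }

    // Min-heap over the priorities, so the smallest of the current top k is evicted first.
    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>((a, b) -> Double.compare(a.priority, b.priority));

    private final int k;
    private final long memoryBudgetBytes;
    private long usedBytes;

    public BudgetedTopKQueue(int k, long memoryBudgetBytes) {
        this.k = k;
        this.memoryBudgetBytes = memoryBudgetBytes;
    }

    /** Offers a record; only the k records with the largest priorities are retained. */
    public void offer(double priority, byte[] serializedRecord) {
        if (queue.size() == k) {
            if (priority <= queue.peek().priority) {
                return;                                          // not in the top k
            }
            usedBytes -= queue.poll().serializedRecord.length;   // evict and "free" its memory
        }
        if (usedBytes + serializedRecord.length > memoryBudgetBytes) {
            // First step as proposed above: fail with a descriptive exception;
            // spilling records to disk would later replace this branch.
            throw new SamplingMemoryExhaustedException(
                    "top-k sample does not fit into the available memory budget");
        }
        usedBytes += serializedRecord.length;
        queue.add(new Entry(priority, serializedRecord));
    }
}
```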
    
    I know that you've already put a lot of effort into writing the sampling operator and
that this feedback might be a little bit demotivating. However, if we want to make it right
and robust, then I think this is the way to go. Additionally, we would add a proper `topK`
operator to Flink's API, which is missing big time :-) If you want to, you could also take
the lead here. Further discussion should then happen in a separate issue. I'm more than willing
to assist you in implementing this operator. What do you think?


