flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
Date Wed, 25 May 2016 23:22:13 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301097#comment-15301097 ]

ASF GitHub Bot commented on FLINK-3477:

Github user fhueske commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.operators.hash;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.typeutils.SameTypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.io.disk.RandomAccessInputView;
    +import org.apache.flink.runtime.memory.AbstractPagedOutputView;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.MutableObjectIterator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +/**
    + * This hash table supports updating elements, and it also has processRecordWithReduce,
    + * which makes one reduce step with the given record.
    + *
    + * The memory is divided into three areas:
    + *  - Bucket area: contains the bucket heads:
    + *    an 8 byte pointer to the first link of a linked list in the record area
    + *  - Record area: this contains the actual data in linked list elements. A linked list element starts
    + *    with an 8 byte pointer to the next element, and then the record follows.
    + *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed,
    + *    because before serializing a record, there is no way to know in advance how large it will be.
    + *    Therefore, we can't serialize directly into the record area when we are doing an update, because
    + *    if it turns out to be larger than the old record, then it would override some other record
    + *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
    + *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
    + *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
    + *    the record area, so compactions are eventually needed.
    + *
    + *  Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
    + *  The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
    + *  sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
    + *  Note that insertions never override a record that has not been read by the reinsertion sweep, because
    + *  both the insertions and the readings happen sequentially in the record area, and the insertions obviously
    + *  never overtake the reading sweep.
    + *
    + *  Note: we have to abandon the old linked list element even when the updated record has a smaller size
    + *  than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
    + *  sweep.
    + *
    + *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
    + *  in which case we calculate the number of buckets upfront and won't do resizes.
    + *  If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more
    + *  elements are inserted than the number of buckets.
    + *
    + *  The number of memory segments given to the staging area is usually one, because it just needs to hold
    + *  one record.
    + *
    + * Note: For hashing, we need to use MathUtils.hash because of its avalanche property, so that
    + * changing only some high bits of the original value doesn't leave the lower bits of the hash unaffected.
    + * This is because when choosing the bucket for a record, we mask only the
    + * lower bits (see numBucketsMask). Lots of collisions would occur if, for example,
    + * the hashed values were bitsets that differ only in their higher bits.
    + */
    +public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
    +	private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
    +	/** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
    +	private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
    +	// Note: the following two constants can't be negative, because negative values are reserved for storing the
    +	// negated size of the record, when it is abandoned (not part of any linked list).
    +	/** The last link in the linked lists will have this as next pointer. */
    +	private static final long END_OF_LIST = Long.MAX_VALUE;
    +	/** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
    +	private static final long INVALID_PREV_POINTER = Long.MAX_VALUE - 1;
    +	private static final long RECORD_OFFSET_IN_LINK = 8;
    +	/** this is used by processRecordWithReduce */
    +	private final ReduceFunction<T> reducer;
    +	/** emit() sends data to outputCollector */
    +	private final Collector<T> outputCollector;
    +	private final boolean objectReuseEnabled;
    +	/**
    +	 * This initially contains all the memory we have, and then segments
    +	 * are taken from it by bucketSegments, recordArea, and stagingSegments.
    +	 */
    +	private final ArrayList<MemorySegment> freeMemorySegments;
    +	private final int numAllMemorySegments;
    +	private final int segmentSize;
    +	/**
    +	 * These will contain the bucket heads.
    +	 * The bucket heads are pointers to the linked lists containing the actual records.
    +	 */
    +	private MemorySegment[] bucketSegments;
    +	private static final int bucketSize = 8, bucketSizeBits = 3;
    +	private int numBuckets;
    +	private int numBucketsMask;
    +	private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
    +	/**
    +	 * The segments where the actual data is stored.
    +	 */
    +	private final RecordArea recordArea;
    +	/**
    +	 * Segments for the staging area.
    +	 * (It should contain at most one record at all times.)
    +	 */
    +	private final ArrayList<MemorySegment> stagingSegments;
    +	private final RandomAccessInputView stagingSegmentsInView;
    +	private final StagingOutputView stagingSegmentsOutView;
    +	private T reuse;
    +	/** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
    +	private final HashTableProber<T> prober;
    +	/** The number of elements currently held by the table. */
    +	private long numElements = 0;
    +	/** The number of bytes wasted by updates that couldn't overwrite the old record due to size change. */
    +	private long holes = 0;
    +	/**
    +	 * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
    +	 * upfront, so we don't need resizes.
    +	 */
    +	private boolean enableResize;
    +	/**
    +	 * This constructor is for the case when the user will only call those operations that are also
    +	 * present on CompactingHashTable.
    +	 */
    +	public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
    +		this(serializer, comparator, memory, null, null, false);
    +	}
    +	public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
    --- End diff --
    Can we remove the `ReduceFunction` and `Collector` from this class and move the corresponding logic into the driver?
    It would be good if this table could also be used by a `CombineFunction` (not only a `ReduceFunction`).

    So, I would remove the following methods:
    - `processRecordWithReduce()`: can be implemented by the driver using the Prober methods `getMatchFor()` and `updateMatch()`
    - `emit()`: can be implemented by the driver using `getEntryIterator()`
    - `emitAndReset()`: same as `emit()` but we need an additional `reset()` method
    I would also rename the table if it becomes less specialized, maybe to `InPlaceMutableHashTable`, or do you have a better idea, @ggevay?
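
    For example, the driver-side reduce step could then look roughly like this (just a sketch, assuming the `AbstractMutableHashTable` / `AbstractHashTableProber` classes in `org.apache.flink.runtime.operators.hash`; the exact signatures may differ from the actual code and are not taken from this PR):

        static <T> void reduceStep(
                AbstractMutableHashTable<T> table,
                AbstractHashTableProber<T, T> prober,
                ReduceFunction<T> reducer,
                T record,
                T reuse) throws Exception {
            T match = prober.getMatchFor(record, reuse);
            if (match == null) {
                // no pre-aggregated value for this key yet, so insert the record as-is
                table.insert(record);
            } else {
                // combine with the pre-aggregated value and write the result back
                prober.updateMatch(reducer.reduce(match, record));
            }
        }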

> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a single value. A ReduceFunction is applied incrementally to compute a final aggregated value. This allows holding the pre-aggregated value in a hash table and updating it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash table. The hash table should support in-place updates of elements (if the new value has the same binary size as the old value) but also appending updates with invalidation of the old value (if the binary length of the new value differs). The hash table needs to be able to evict and emit all elements if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the execution strategy.
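
For illustration, the combine strategy described above boils down to roughly the following (a sketch only, using a plain java.util.HashMap in place of the special in-memory hash table; the key selector, reducer, and collector are stand-ins for the driver's actual collaborators):

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

// Illustrative only: shows the incremental pre-aggregation idea, not the proposed hash table.
public class HashCombineSketch {

    public static <K, T> void hashCombine(
            Iterable<T> input,
            KeySelector<T, K> keySelector,
            ReduceFunction<T> reducer,
            Collector<T> out) throws Exception {

        Map<K, T> aggregates = new HashMap<K, T>();
        for (T record : input) {
            K key = keySelector.getKey(record);
            T previous = aggregates.get(key);
            // incrementally apply the ReduceFunction to the pre-aggregated value for this key
            aggregates.put(key, previous == null ? record : reducer.reduce(previous, record));
            // (the real hash table would additionally evict and emit everything when memory runs out)
        }
        for (T value : aggregates.values()) {
            out.collect(value);
        }
    }
}
{code}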

This message was sent by Atlassian JIRA
