flink-dev mailing list archives

From weijie tong <tongweijie...@gmail.com>
Subject Re: questions about Flink's HashJoin performance
Date Mon, 15 May 2017 14:26:29 GMT
The Flink version is 1.2.0

On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie178@gmail.com>
wrote:

> @Till thanks for your reply.
>
> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
> It just uses the MutableHashTable class; there is no other Flink
> configuration. The main code body is:
>
>> this.recordBuildSideAccessor = RecordSerializer.get();
>> this.recordProbeSideAccessor = RecordSerializer.get();
>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>> List<MemorySegment> memorySegments;
>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>> try {
>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>> }
>> catch (MemoryAllocationException e) {
>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>   Throwables.propagate(e);
>>   return;
>> }
>> try {
>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>   join = new MutableHashTable<Record, Record>(
>>       recordBuildSideAccessor,
>>       recordProbeSideAccessor,
>>       recordBuildSideComparator,
>>       recordProbeSideComparator,
>>       pactRecordComparator,
>>       memorySegments,
>>       ioManager
>>   );
>>   join.open(buildInput,probeInput);
>>
>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>
>>
> The BytesValue type is a self-defined one that holds a byte[]. It is modeled
> on the original StringValue and has the same serde performance.
>
>
> while (join.nextRecord()) {
>   Record currentProbeRecord = join.getCurrentProbeRecord();
>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>     rowNum++;
>   }
> }
>
>
>
>
> I have tried both the Record and Row classes as the record type, without
> any performance improvement. I also tried batching the input records. That
> means the buildInput and probeInput variables of the first code block
> iterate one Record at a time over a batch of records. The batched
> records' content stays in memory in Drill's ValueVector format. When a
> record is needed in the build or probe phase through an iterator next()
> call, it is fetched from the in-memory ValueVector batch. But there was no
> performance gain.
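
A minimal sketch of the batching approach described above, under stated assumptions: `ReusingIterator`, `Row`, and `BatchIterator` are hypothetical stand-ins written for illustration, not Flink's or Drill's classes. It shows one possible reason for the missing gain: if the hash table still pulls one record per next() call, the number of calls stays the same regardless of how the rows are batched behind the iterator.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBackedIterator {
    /** Hypothetical reuse-style iterator; returns null when the input is exhausted. */
    public interface ReusingIterator<E> {
        E next(E reuse);
    }

    /** Mutable single-field record standing in for a real row type. */
    public static final class Row {
        public byte[] key;
    }

    /** Serves rows one at a time out of in-memory batches. */
    public static final class BatchIterator implements ReusingIterator<Row> {
        private final List<byte[][]> batches;
        private int batchIdx = 0;
        private int rowIdx = 0;
        public long calls = 0; // counts every next() invocation

        public BatchIterator(List<byte[][]> batches) {
            this.batches = batches;
        }

        @Override
        public Row next(Row reuse) {
            calls++;
            // Skip past batches that are fully consumed (or empty).
            while (batchIdx < batches.size() && rowIdx >= batches.get(batchIdx).length) {
                batchIdx++;
                rowIdx = 0;
            }
            if (batchIdx >= batches.size()) {
                return null;
            }
            reuse.key = batches.get(batchIdx)[rowIdx++];
            return reuse;
        }
    }

    /** Drains the iterator the way a row-at-a-time consumer would; returns the row count. */
    public static long drain(BatchIterator it) {
        Row reuse = new Row();
        long rows = 0;
        while (it.next(reuse) != null) {
            rows++;
        }
        return rows;
    }

    /** Builds synthetic batches of two-byte keys. */
    public static List<byte[][]> makeBatches(int numBatches, int rowsPerBatch) {
        List<byte[][]> out = new ArrayList<>();
        for (int b = 0; b < numBatches; b++) {
            byte[][] batch = new byte[rowsPerBatch][];
            for (int r = 0; r < rowsPerBatch; r++) {
                batch[r] = new byte[] {(byte) b, (byte) r};
            }
            out.add(batch);
        }
        return out;
    }

    public static void main(String[] args) {
        BatchIterator small = new BatchIterator(makeBatches(4, 250));
        BatchIterator large = new BatchIterator(makeBatches(1, 1000));
        System.out.println(drain(small) + " rows, " + small.calls + " calls");
        System.out.println(drain(large) + " rows, " + large.calls + " calls");
    }
}
```

Both layouts deliver the same rows with the same number of next() calls, so any per-record cost in the consumer (serialization, hashing, copying) is untouched by this kind of batching.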
>
>
> The top hotspots from the JProfiler profile are below:
>
> Hot spot,"Self time (microseconds)","Average Time","Invocations"
> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>
>
> My log shows that the hashjoin.open() method takes too much time:
>
> construct hash table elapsed:1885ms
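
A rough cross-check of the two measurements, as a sketch only. It assumes the profile and the log line come from comparable runs and that Record.serialize is called mainly while the build side is inserted during open(); neither is confirmed in this thread, and profiler overhead can distort self times. `HotspotShare` is a hypothetical helper name.

```java
public class HotspotShare {
    /** Share of an elapsed wall-clock time taken by a hotspot's self time, in percent. */
    public static double sharePercent(long selfTimeMicros, long elapsedMillis) {
        return 100.0 * selfTimeMicros / (elapsedMillis * 1000.0);
    }

    public static void main(String[] args) {
        // 1,014,127 us self time for Record.serialize; 1885 ms logged for open().
        double share = sharePercent(1_014_127L, 1885L);
        System.out.printf("Record.serialize vs open(): %.1f%%%n", share);
    }
}
```

Under those assumptions, serialization alone would account for a little over half of the logged open() time, which points at per-record serialization cost rather than at the hash table logic itself.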
>
>
>
>
> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Weijie,
>>
>> it might be that batching the processing of multiple rows gives you
>> better performance than single-row processing.
>>
>> Maybe you could share the exact benchmark baseline results and the code
>> you use to test Flink's MutableHashTable with us. The Flink configuration
>> and how you run it would also be of interest. That way we might be able
>> to see whether we can tune Flink a bit more.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie178@gmail.com>
>> wrote:
>>
>>> I have a test case that uses Flink's MutableHashTable class to do a hash
>>> join on a local machine with 64 GB of memory and 64 cores. The test case has
>>> one build table with 140,000 rows (14w) and one probe table with 3,200,000
>>> rows (320w); the join produces 120,000 matched rows (12w).
>>>
>>> It takes 2.2 seconds to complete the join, which seems slow. I made
>>> sure there is no overflow and that the smaller table is the build side. The
>>> MutableObjectIterator is a sequence of Rows. Each Row is composed of several
>>> byte[] fields. From my log, the open() method takes 1.560 seconds and
>>> the probe iteration phase takes 680 ms. My JProfiler profile shows that
>>> the MutableObjectIterator's next() method is the hotspot.
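
The figures above can be turned into a per-row cost, sketched here under two assumptions: "14w" and "320w" are read as 140,000 and 3,200,000 rows (taking "w" as shorthand for 10,000), and the open() time is spent consuming the build side. `PerRowCost` is a hypothetical helper name.

```java
public class PerRowCost {
    /** Average cost per row in microseconds. */
    public static double microsPerRow(double elapsedMillis, long rows) {
        return elapsedMillis * 1000.0 / rows;
    }

    public static void main(String[] args) {
        double build = microsPerRow(1560.0, 140_000L);   // open(): build side
        double probe = microsPerRow(680.0, 3_200_000L);  // probe iteration
        System.out.printf("build: %.2f us/row%n", build);
        System.out.printf("probe: %.2f us/row%n", probe);
        System.out.printf("build/probe ratio: %.0f%n", build / probe);
    }
}
```

That works out to roughly 11 microseconds per build row against about 0.2 microseconds per probe row, so under these assumptions the build phase is the outlier and the probe loop is comparatively cheap per row.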
>>>
>>>
>>> I want to know how to tune this scenario. Drill's HashJoin uses a batch
>>> model: its build-side input is a RecordBatch that holds a batch of rows
>>> whose memory size is close to the L2 cache size. This strategy needs
>>> fewer method calls (that is, calls to next()) and uses the CPU more
>>> efficiently. A SQL Server paper also reports performance gains from the
>>> batch model
>>> (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>>> I guess the slowdown is due to the single-row iteration model.
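
A minimal sketch of what a batch-at-a-time interface looks like from the consumer's side. `BatchSource` and `RangeSource` are hypothetical names, the batch size of 4096 is an arbitrary choice, and the per-row work is a plain sum rather than a real join; this is not Drill's RecordBatch API.

```java
public class BatchAtATime {
    /** Hypothetical batch source: fills 'out' and returns how many keys were written (0 at end). */
    public interface BatchSource {
        int nextBatch(long[] out);
    }

    /** Produces the keys 0..total-1 in batches and counts how often it is called. */
    public static final class RangeSource implements BatchSource {
        private final long total;
        private long produced = 0;
        public long calls = 0;

        public RangeSource(long total) {
            this.total = total;
        }

        @Override
        public int nextBatch(long[] out) {
            calls++;
            int n = (int) Math.min(out.length, total - produced);
            for (int i = 0; i < n; i++) {
                out[i] = produced + i;
            }
            produced += n;
            return n;
        }
    }

    /** Pulls whole batches and processes each one in a tight inner loop. */
    public static long sumAll(BatchSource src, int batchSize) {
        long[] buf = new long[batchSize];
        long sum = 0;
        int n;
        while ((n = src.nextBatch(buf)) > 0) {
            for (int i = 0; i < n; i++) {
                sum += buf[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        RangeSource src = new RangeSource(3_200_000L);
        long sum = sumAll(src, 4096);
        System.out.println("sum=" + sum + ", interface calls=" + src.calls);
    }
}
```

With 3,200,000 rows and batches of 4096, the consumer crosses the interface 783 times instead of once per row, and the inner loop runs over a plain array. Whether that translates into a measurable gain for a join depends on how much of the time is call overhead rather than serialization or hashing.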
>>>
>>>
>>> I hope someone can correct me if I am wrong. I may also be using
>>> MutableHashTable incorrectly, and I would welcome any advice.
>>>
>>
>>
>
