flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/6] flink git commit: [FLINK-2534] [runtime] Some improvements in CompactingHashTable
Date Mon, 17 Aug 2015 12:14:51 GMT
[FLINK-2534] [runtime] Some improvements in CompactingHashTable

This closes #1029


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63ee34c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63ee34c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63ee34c5

Branch: refs/heads/master
Commit: 63ee34c5b894e2795e74a3c2aa3d5dc9ac2d5b88
Parents: 0ecc563
Author: HuangWHWHW <404823056@qq.com>
Authored: Mon Aug 17 11:31:02 2015 +0800
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 17 11:59:39 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/hash/CompactingHashTable.java     | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63ee34c5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index d07c7e3..ff6548e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -380,8 +380,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 		int countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 		int numInSegment = 0;
 		int posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-		
-		long currentForwardPointer = BUCKET_FORWARD_POINTER_NOT_SET;
 
 		// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
 		while (true) {
@@ -396,7 +394,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 					// get the pointer to the pair
 					final int pointerOffset = bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (numInSegment * POINTER_LEN);
 					final long pointer = bucket.getLong(pointerOffset);
-					numInSegment++;
 					
 					// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 					T valueAtPosition = partition.readRecordAt(pointer);
@@ -406,9 +403,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 						return;
 					}
 				}
-				else {
-					numInSegment++;
-				}
+				numInSegment++;
 			}
 			
 			// this segment is done. check if there is another chained bucket
@@ -436,7 +431,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 			countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 			posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 			numInSegment = 0;
-			currentForwardPointer = newForwardPointer;
 		}
 	}
 	
@@ -869,9 +863,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
 		final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
 		long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
-		while(numBuckets % numPartitions != 0) {
-			numBuckets++;
-		}
+		numBuckets += numPartitions - numBuckets % numPartitions;
 		return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numBuckets;
 	}
 	


Mime
View raw message