flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.
Date Thu, 06 Aug 2015 16:52:19 GMT
[FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

Also make sure that most tests run with enabled bloom filters, to increase test coverage.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b73b438
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b73b438
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b73b438

Branch: refs/heads/master
Commit: 0b73b4387a855627209a4dbaef930321a5090594
Parents: 61dcae3
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Aug 6 15:49:07 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Aug 6 18:13:00 2015 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  15 +-
 .../flink/configuration/ConfigConstants.java    |  26 +--
 .../flink/runtime/execution/Environment.java    |  12 +-
 .../AbstractCachedBuildSideJoinDriver.java      |  20 +-
 .../flink/runtime/operators/JoinDriver.java     |  65 +++++--
 .../flink/runtime/operators/PactDriver.java     |  10 +-
 .../runtime/operators/PactTaskContext.java      |   9 +-
 .../runtime/operators/RegularPactTask.java      |  24 ++-
 .../operators/hash/HashMatchIteratorBase.java   |  11 +-
 .../operators/hash/MutableHashTable.java        | 192 ++++---------------
 .../NonReusingBuildFirstHashMatchIterator.java  |   8 +-
 ...ngBuildFirstReOpenableHashMatchIterator.java |  26 ++-
 .../NonReusingBuildSecondHashMatchIterator.java |   8 +-
 ...gBuildSecondReOpenableHashMatchIterator.java |  23 ++-
 .../hash/ReOpenableMutableHashTable.java        |  18 +-
 .../ReusingBuildFirstHashMatchIterator.java     |   8 +-
 ...ngBuildFirstReOpenableHashMatchIterator.java |  22 ++-
 .../ReusingBuildSecondHashMatchIterator.java    |   8 +-
 ...gBuildSecondReOpenableHashMatchIterator.java |  26 ++-
 .../runtime/taskmanager/RuntimeEnvironment.java |  16 +-
 .../operators/drivers/TestTaskContext.java      |   9 +
 .../MutableHashTablePerformanceBenchmark.java   |  10 +-
 .../hash/NonReusingHashMatchIteratorITCase.java |  12 +-
 .../NonReusingReOpenableHashTableITCase.java    |  33 ++--
 .../hash/ReusingHashMatchIteratorITCase.java    |  12 +-
 .../hash/ReusingReOpenableHashTableITCase.java  |  33 ++--
 .../operators/testutils/DriverTestBase.java     |  18 +-
 .../operators/testutils/MockEnvironment.java    |  11 +-
 .../testutils/UnaryOperatorTestBase.java        |  12 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   4 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  14 +-
 31 files changed, 338 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index ba541d4..53b9ae0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -244,11 +244,6 @@ free for objects created by user-defined functions. (DEFAULT: 0.7)
 This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 - `jobclient.polling.interval`: The interval (in seconds) in which the client
 polls the JobManager for the status of its job (DEFAULT: 2).
-- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
-fan-out for spilling hash tables. Limits the number of file handles per operator,
-but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
-- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
-when this fraction of its memory budget is full (DEFAULT: 0.8).
 - `taskmanager.heartbeat-interval`: The interval in which the TaskManager sends
 heartbeats to the JobManager.
 - `jobmanager.max-heartbeat-delay-before-failure.msecs`: The maximum time that a
@@ -324,6 +319,16 @@ sample exceeds this value (possible because of misconfiguration of the parser),
 the sampling aborts. This value can be overridden for a specific input with the
 input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
 
+### Runtime Algorithms
+
+- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
+fan-out for spilling hash tables. Limits the number of file handles per operator,
+but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
+- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
+when this fraction of its memory budget is full (DEFAULT: 0.8).
+- `taskmanager.runtime.hashjoin-bloom-filters`: If true, the hash join uses bloom filters to pre-filter records against spilled partitions (DEFAULT: true).
+
+
 ## YARN
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index dad2d99..d145eb2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -172,6 +172,8 @@ public final class ConfigConstants {
 	 */
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
 
+	// --------------------------- Runtime Algorithms -------------------------------
+	
 	/**
 	 * Parameter for the maximum fan for out-of-core algorithms.
 	 * Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
@@ -184,18 +186,17 @@ public final class ConfigConstants {
 	 * sorter will start spilling to disk.
 	 */
 	public static final String DEFAULT_SORT_SPILLING_THRESHOLD_KEY = "taskmanager.runtime.sort-spilling-threshold";
+
+	/**
+	 * Parameter to switch hash join bloom filters for spilled partitions on and off.
+	 */
+	public static final String RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY = "taskmanager.runtime.hashjoin-bloom-filters";
 	
 	/**
 	 * The config parameter defining the timeout for filesystem stream opening.
 	 * A value of 0 indicates infinite waiting.
 	 */
 	public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
-	
-	/**
-	 * While spill probe record to disk during probe phase, whether enable bloom filter to filter the probe records
-	 * to minimize the spilled probe records count.
-	 */
-	public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
 
 	// ------------------------ YARN Configuration ------------------------
 
@@ -543,6 +544,13 @@ public final class ConfigConstants {
 	 * The default task manager's maximum registration duration
 	 */
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
+
+	// ------------------------ Runtime Algorithms ------------------------
+	
+	/**
+	 * Default setting for the switch for hash join bloom filters for spilled partitions.
+	 */
+	public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;
 	
 	/**
 	 * The default value for the maximum spilling fan in/out.
@@ -558,15 +566,9 @@ public final class ConfigConstants {
 	 * The default timeout for filesystem stream opening: infinite (means max long milliseconds).
 	 */
 	public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
-	
-	/**
-	 * Enable bloom filter for hash join as it promote hash join performance most of the time.
-	 */
-	public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
 
 	// ------------------------ YARN Configuration ------------------------
 
-
 	/**
 	 * Minimum amount of Heap memory to subtract from the requested TaskManager size.
 	 * We came up with these values experimentally.

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index af29560..c742ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -72,14 +73,11 @@ public interface Environment {
 	Configuration getTaskConfiguration();
 
 	/**
-	 * @return The task manager configuration
-	 */
-	Configuration getTaskManagerConfiguration();
-
-	/**
-	 * @return Hostname of the task manager
+	 * Gets the task manager info, with configuration and hostname.
+	 * 
+	 * @return The task manager info, with configuration and hostname. 
 	 */
-	String getHostname();
+	TaskManagerRuntimeInfo getTaskManagerInfo();
 
 	/**
 	 * Returns the job-wide configuration object that was attached to the JobGraph.

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index aff8d01..4096f0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
@@ -74,7 +75,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 				this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
 
 		double availableMemory = config.getRelativeMemoryDriver();
-
+		boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
+				ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
+				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
+		
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
 		objectReuseEnabled = executionConfig.isObjectReuseEnabled();
 
@@ -89,7 +93,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 
 			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -102,7 +107,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 			} else {
 				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -118,7 +124,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 
 			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -131,7 +138,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 			} else {
 				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -148,12 +156,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 
 	@Override
 	public void run() throws Exception {
-
 		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
-			
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index af3da55..5df715f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -19,25 +19,27 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * The join driver implements the logic of a join operator at runtime. It instantiates either
  * hash or sort-merge based strategies to find joining pairs of records.
@@ -115,19 +117,40 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
 		}
+		
+		boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
+				ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
+				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
 
 		// create and return joining iterator according to provided local strategy.
 		if (objectReuseEnabled) {
 			switch (ls) {
 				case MERGE:
-					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
+					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
-					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator21(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				case HYBRIDHASH_BUILD_SECOND:
-					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				default:
 					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
@@ -135,14 +158,32 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
 		} else {
 			switch (ls) {
 				case MERGE:
-					this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
-					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator21(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				case HYBRIDHASH_BUILD_SECOND:
-					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				default:
 					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index a53f5bf..288f7ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -16,16 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.Function;
 
-
 /**
- * The interface to be implemented by all pact drivers that run alone (or as the primary driver) in a nephele task.
- * The driver is the code that deals with everything that specific to a certain PACT. It implements the actual
- * <i>map</i> or <i>reduce</i> specific code.
+ * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
+ * A driver implements the actual code to perform a batch operation, like <i>map()</i>,
+ * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
  *
  * @see PactTaskContext
  * 
@@ -37,7 +35,7 @@ public interface PactDriver<S extends Function, OT> {
 	void setup(PactTaskContext<S, OT> context);
 	
 	/**
-	 * Gets the number of inputs (= Nephele Gates and Readers) that the task has.
+	 * Gets the number of inputs that the task has.
 	 * 
 	 * @return The number of inputs.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index bc23fa3..5c2ed67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -26,15 +26,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 
 /**
- * A runtime task is the task that is executed by the flink engine inside a task vertex.
- * It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
- * deals with the runtime setup and teardown and the control-flow logic. The latter appears especially
- * in the case of iterations.
+ * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
+ * the runtime components and configuration that it can use to fulfil its task.
  *
  * @param <S> The UDF type.
  * @param <OT> The produced data type.
@@ -44,6 +43,8 @@ import org.apache.flink.util.MutableObjectIterator;
 public interface PactTaskContext<S, OT> {
 	
 	TaskConfig getTaskConfig();
+	
+	TaskManagerRuntimeInfo getTaskManagerInfo();
 
 	ClassLoader getUserCodeClassLoader();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 78bf383..873d948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
@@ -660,7 +661,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initInputReaders() throws Exception {
 		final int numInputs = getNumTaskInputs();
-		final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
+		final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs];
 
 		int currentReaderOffset = 0;
 
@@ -705,7 +706,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initBroadcastInputReaders() throws Exception {
 		final int numBroadcastInputs = this.config.getNumBroadcastInputs();
-		final MutableReader<?>[] broadcastInputReaders = new MutableReader[numBroadcastInputs];
+		final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];
 
 		int currentReaderOffset = config.getNumInputs();
 
@@ -737,8 +738,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
 		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-		this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
-		this.inputIterators = new MutableObjectIterator[numInputs];
+		this.inputComparators = numComparators > 0 ? new TypeComparator<?>[numComparators] : null;
+		this.inputIterators = new MutableObjectIterator<?>[numInputs];
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		
@@ -764,7 +765,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 * Creates all the serializers and iterators for the broadcast inputs.
 	 */
 	protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
-		this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
+		this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs];
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
@@ -787,8 +788,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		final MemoryManager memMan = getMemoryManager();
 		final IOManager ioMan = getIOManager();
 
-		this.localStrategies = new CloseableInputProvider[numInputs];
-		this.inputs = new MutableObjectIterator[numInputs];
+		this.localStrategies = new CloseableInputProvider<?>[numInputs];
+		this.inputs = new MutableObjectIterator<?>[numInputs];
 		this.excludeFromReset = new boolean[numInputs];
 		this.inputIsCached = new boolean[numInputs];
 		this.inputIsAsyncMaterialized = new boolean[numInputs];
@@ -807,8 +808,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed.
 		// the second variant spills to the side and will not read unless the result is also consumed
 		// in a pipelined fashion.
-		this.resettableInputs = new SpillingResettableMutableObjectIterator[numInputs];
-		this.tempBarriers = new TempBarrier[numInputs];
+		this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs];
+		this.tempBarriers = new TempBarrier<?>[numInputs];
 
 		for (int i = 0; i < numInputs; i++) {
 			final int memoryPages;
@@ -1044,6 +1045,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 
 	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return getEnvironment().getTaskManagerInfo();
+	}
+
+	@Override
 	public MemoryManager getMemoryManager() {
 		return getEnvironment().getMemoryManager();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
index 4e0112a..08b6191 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
@@ -32,6 +32,7 @@ import java.util.List;
  * Common methods for all Hash Join Iterators.
  */
 public class HashMatchIteratorBase {
+	
 	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
 			TypeSerializer<BT> buildSideSerializer,
 			TypeComparator<BT> buildSideComparator,
@@ -41,11 +42,15 @@ public class HashMatchIteratorBase {
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction) throws MemoryAllocationException {
+			double memoryFraction,
+			boolean useBloomFilters) throws MemoryAllocationException {
 
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager,
+				useBloomFilters);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 4a57986..b0042fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -24,10 +24,9 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,22 +45,16 @@ import org.apache.flink.runtime.operators.util.BloomFilter;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * An implementation of a Hybrid Hash Join. The join starts operating in memory and gradually starts
  * spilling contents to disk, when the memory is not sufficient. It does not need to know a priori 
  * how large the input will be.
- * <p>
- * The design of this class follows on many parts the design presented in
- * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
- * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning. 
- *<p>
- *
- *
- * <hr>
  * 
- * The layout of the buckets inside a memory segment is as follows:
+ * <p>The design of this class follows on many parts the design presented in
+ * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
+ * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.</p>
  * 
+ * <p>The layout of the buckets inside a memory segment is as follows:</p>
  * <pre>
  * +----------------------------- Bucket x ----------------------------
  * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
@@ -189,8 +182,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	private static final long BUCKET_FORWARD_POINTER_NOT_SET = ~0x0L;
 	
-//	private static final byte BUCKET_STATUS_SPILLED = 1;
-	
 	/**
 	 * Constant for the bucket status, indicating that the bucket is in memory.
 	 */
@@ -274,11 +265,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	protected final int bucketsPerSegmentBits;
 	
-	/**
+	/** 
 	 * An estimate for the average record length.
 	 */
 	private final int avgRecordLen;
 	
+	/** Flag to enable/disable bloom filters for spilled partitions */
+	private final boolean useBloomFilters;
+	
 	// ------------------------------------------------------------------------
 	
 	/**
@@ -296,8 +290,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	private HashBucketIterator<BT, PT> bucketIterator;
 	
-//	private LazyHashBucketIterator<BT, PT> lazyBucketIterator;
-	
 	/**
 	 * Iterator over the elements from the probe side.
 	 */
@@ -319,6 +311,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * of hash-codes and pointers to the elements.
 	 */
 	protected MemorySegment[] buckets;
+
+	/** The bloom filter utility used to transform hash buckets of spilled partitions into a
+	 * probabilistic filter */
+	private BloomFilter bloomFilter;
 	
 	/**
 	 * The number of buckets in the current table. The bucket array is not necessarily fully
@@ -353,25 +349,35 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	protected boolean furtherPartitioning = false;
 	
 	private boolean running = true;
-
-	private BloomFilter bloomFilter;
 	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
 	// ------------------------------------------------------------------------
+
+	public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
+							TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
+							TypePairComparator<PT, BT> comparator,
+							List<MemorySegment> memorySegments, IOManager ioManager)
+	{
+		this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
+				memorySegments, ioManager, true);
+	}
 	
 	public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
 			TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager)
+			TypePairComparator<PT, BT> comparator,
+			List<MemorySegment> memorySegments,
+			IOManager ioManager,
+			boolean useBloomFilters)
 	{
 		this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
-			memorySegments, ioManager, DEFAULT_RECORD_LEN);
+			memorySegments, ioManager, DEFAULT_RECORD_LEN, useBloomFilters);
 	}
 	
 	public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
 			TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments,
-			IOManager ioManager, int avgRecordLen)
+			IOManager ioManager, int avgRecordLen, boolean useBloomFilters)
 	{
 		// some sanity checks first
 		if (memorySegments == null) {
@@ -390,6 +396,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		this.recordComparator = comparator;
 		this.availableMemory = memorySegments;
 		this.ioManager = ioManager;
+		this.useBloomFilters = useBloomFilters;
 		
 		this.avgRecordLen = avgRecordLen > 0 ? avgRecordLen : 
 				buildSideSerializer.getLength() == -1 ? DEFAULT_RECORD_LEN : buildSideSerializer.getLength();
@@ -551,16 +558,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	}
 	
 	public boolean nextRecord() throws IOException {
-		
+
 		final boolean probeProcessing = processProbeIter();
-		if(probeProcessing) {
-			return true;
-		}
-		return prepareNextPartition();
+		return probeProcessing || prepareNextPartition();
 	}
 	
-	public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException
-	{
+	public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException {
 		final TypeComparator<PT> probeAccessors = this.probeSideComparator;
 		final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
 		final int posHashCode = hash % this.numBuckets;
@@ -585,32 +588,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 	}
 	
-//	public LazyHashBucketIterator<BT, PT> getLazyMatchesFor(PT record) throws IOException
-//	{
-//		final TypeComparator<PT> probeAccessors = this.probeSideComparator;
-//		final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
-//		final int posHashCode = hash % this.numBuckets;
-//		
-//		// get the bucket for the given hash code
-//		final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
-//		final int bucketInSegmentOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
-//		final MemorySegment bucket = this.buckets[bucketArrayPos];
-//		
-//		// get the basic characteristics of the bucket
-//		final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
-//		final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
-//		
-//		// for an in-memory partition, process set the return iterators, else spill the probe records
-//		if (p.isInMemory()) {
-//			this.recordComparator.setReference(record);
-//			this.lazyBucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
-//			return this.lazyBucketIterator;
-//		}
-//		else {
-//			throw new IllegalStateException("Method is not applicable to partially spilled hash tables.");
-//		}
-//	}
-	
 	public PT getCurrentProbeRecord() {
 		return this.probeIterator.getCurrent();
 	}
@@ -739,7 +716,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 	}
 
-	final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+	private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
 		final long totalSize = ((long) bufferSize) * numBuffers;
 		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
 		final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
@@ -1092,9 +1069,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		this.buckets = table;
 		this.numBuckets = numBuckets;
 		
-		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
-			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
-		if (enableBloomFilter) {
+		if (useBloomFilters) {
 			initBloomFilter(numBuckets);
 		}
 	}
@@ -1107,8 +1082,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		this.numBuckets = 0;
 		
 		if (this.buckets != null) {
-			for (int i = 0; i < this.buckets.length; i++) {
-				this.availableMemory.add(this.buckets[i]);
+			for (MemorySegment bucket : this.buckets) {
+				this.availableMemory.add(bucket);
 			}
 			this.buckets = null;
 		}
@@ -1138,9 +1113,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		final HashPartition<BT, PT> p = partitions.get(largestPartNum);
 		
-		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
-			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
-		if (enableBloomFilter) {
+		if (useBloomFilters) {
 			buildBloomFilterForBucketsInPartition(largestPartNum, p);
 		}
 		
@@ -1196,7 +1169,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
 	}
 	
-	final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+	private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
 		int totalCount = 0;
 		boolean skip = false;
 		long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
@@ -1207,7 +1180,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				break;
 			}
 			MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
-			int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
+			int bucketInOverflowSegmentOffset = (int) forwardPointer;
 			
 			final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
 			totalCount += count;
@@ -1587,93 +1560,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 
 	} // end HashBucketIterator
-	
-
-	// ======================================================================================================
-	
-//	public static final class LazyHashBucketIterator<BT, PT> {
-//		
-//		private final TypePairComparator<PT, BT> comparator;
-//		
-//		private MemorySegment bucket;
-//		
-//		private MemorySegment[] overflowSegments;
-//		
-//		private HashPartition<BT, PT> partition;
-//		
-//		private int bucketInSegmentOffset;
-//		
-//		private int searchHashCode;
-//		
-//		private int posInSegment;
-//		
-//		private int countInSegment;
-//		
-//		private int numInSegment;
-//		
-//		private LazyHashBucketIterator(TypePairComparator<PT, BT> comparator) {
-//			this.comparator = comparator;
-//		}
-//		
-//		
-//		void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
-//				int searchHashCode, int bucketInSegmentOffset) {
-//			
-//			this.bucket = bucket;
-//			this.overflowSegments = overflowSegments;
-//			this.partition = partition;
-//			this.searchHashCode = searchHashCode;
-//			this.bucketInSegmentOffset = bucketInSegmentOffset;
-//			
-//			this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-//			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
-//			this.numInSegment = 0;
-//		}
-//
-//		public boolean next(BT target) {
-//			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
-//			while (true) {
-//				
-//				while (this.numInSegment < this.countInSegment) {
-//					
-//					final int thisCode = this.bucket.getInt(this.posInSegment);
-//					this.posInSegment += HASH_CODE_LEN;
-//						
-//					// check if the hash code matches
-//					if (thisCode == this.searchHashCode) {
-//						// get the pointer to the pair
-//						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset + 
-//													BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
-//						this.numInSegment++;
-//							
-//						// check whether it is really equal, or whether we had only a hash collision
-//						LazyDeSerializable lds = (LazyDeSerializable) target;
-//						lds.setDeSerializer(this.partition, this.partition.getWriteView(), pointer);
-//						if (this.comparator.equalToReference(target)) {
-//							return true;
-//						}
-//					}
-//					else {
-//						this.numInSegment++;
-//					}
-//				}
-//				
-//				// this segment is done. check if there is another chained bucket
-//				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
-//				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
-//					return false;
-//				}
-//				
-//				final int overflowSegNum = (int) (forwardPointer >>> 32);
-//				this.bucket = this.overflowSegments[overflowSegNum];
-//				this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
-//				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
-//				this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-//				this.numInSegment = 0;
-//			}
-//		}
-//	} 
-	
 
 	// ======================================================================================================
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
index c2d7805..5000dab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
@@ -67,16 +67,16 @@ public class NonReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchI
 			TypePairComparator<V2, V1> pairComparator,
 			MemoryManager memManager, IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer2;
 
 		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
-				pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
index ee870a6..af1626a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
@@ -48,25 +48,32 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-		throws MemoryAllocationException
-	{
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction);
+				memoryFraction, useBitmapFilters);
+		
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
 
 	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 
 	/**
@@ -76,5 +83,4 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
 	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
index 6099ac7..83952c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
@@ -66,16 +66,16 @@ public class NonReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatch
 			TypePairComparator<V1, V2> pairComparator,
 			MemoryManager memManager, IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer1;
 		
 		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
-				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
index bc7e65b..029be5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
@@ -48,11 +48,12 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-		throws MemoryAllocationException
-	{
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+		
 		reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
 	}
 
@@ -62,12 +63,17 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
 			TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 
 	/**
@@ -77,5 +83,4 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
 	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index 6819924..fd5fcde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -33,17 +33,12 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT> {
 
-	/**
-	 * Channel for the spilled partitions
-	 */
+	/** Channel for the spilled partitions */
 	private final FileIOChannel.Enumerator spilledInMemoryPartitions;
 	
-	/**
-	 * Stores the initial partitions and a list of the files that contain the spilled contents
-	 */
+	/** Stores the initial partitions and a list of the files that contain the spilled contents */
 	private List<HashPartition<BT, PT>> initialPartitions;
 	
-
 	/**
 	 * The values of these variables are stored here after the initial open()
 	 * Required to restore the initial state before each additional probe phase.
@@ -58,16 +53,17 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
 			TypeComparator<BT> buildSideComparator,
 			TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> comparator,
-			List<MemorySegment> memorySegments, IOManager ioManager) {
+			List<MemorySegment> memorySegments, IOManager ioManager,
+			boolean useBitmapFilters) {
+		
 		super(buildSideSerializer, probeSideSerializer, buildSideComparator,
-				probeSideComparator, comparator, memorySegments, ioManager);
+				probeSideComparator, comparator, memorySegments, ioManager, useBitmapFilters);
 		keepBuildSidePartitions = true;
 		spilledInMemoryPartitions = ioManager.createChannelEnumerator();
 	}
 	
 	@Override
-	public void open(MutableObjectIterator<BT> buildSide,
-			MutableObjectIterator<PT> probeSide) throws IOException {
+	public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
 		super.open(buildSide, probeSide);
 		initialPartitions = new ArrayList<HashPartition<BT, PT>>( partitionsBeingBuilt );
 		initialPartitionFanOut = (byte) partitionsBeingBuilt.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
index da76045..b4aaa95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
@@ -71,9 +71,9 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
@@ -83,7 +83,7 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
 		this.tempBuildSideRecord = serializer1.createInstance();
 
 		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
index 5501271..714a1f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
@@ -48,25 +48,32 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
+			double memoryFraction,
+			boolean useBitmapFilters)
 		throws MemoryAllocationException
 	{
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction);
+				memoryFraction, useBitmapFilters);
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
 
 	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 	
 	/**
@@ -76,5 +83,4 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
 	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
index a9435ef..b7c3e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
@@ -71,9 +71,9 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
@@ -83,7 +83,7 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
 		this.tempBuildSideRecord = serializer2.createInstance();
 
 		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
-			memManager, ioManager, ownerTask, memoryFraction);
+			memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
index 559d20a..4b4cdf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
@@ -48,24 +48,31 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-		throws MemoryAllocationException
-	{
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+		
 		reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
 	}
 
 	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 	
 	/**
@@ -75,5 +82,4 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
 	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index cd6dbd6..8cfc1c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,7 @@ public class RuntimeEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
-	private final Configuration taskManagerConfiguration;
-
-	private final String hostname;
+	private final TaskManagerRuntimeInfo taskManagerInfo;
 
 	// ------------------------------------------------------------------------
 
@@ -124,8 +122,7 @@ public class RuntimeEnvironment implements Environment {
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
-		this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
-		this.hostname = taskManagerInfo.getHostname();
+		this.taskManagerInfo = checkNotNull(taskManagerInfo);
 	}
 
 	// ------------------------------------------------------------------------
@@ -176,13 +173,8 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public Configuration getTaskManagerConfiguration(){
-		return taskManagerConfiguration;
-	}
-
-	@Override
-	public String getHostname(){
-		return hostname;
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return taskManagerInfo;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index b1466c9..30e417b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -62,6 +63,8 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 
 	private ExecutionConfig executionConfig = new ExecutionConfig();
 
+	private TaskManagerRuntimeInfo taskManageInfo;
+
 	// --------------------------------------------------------------------------------------------
 	//  Constructors
 	// --------------------------------------------------------------------------------------------
@@ -70,6 +73,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	
 	public TestTaskContext(long memoryInBytes) {
 		this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024, true);
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -156,6 +160,11 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	}
 
 	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	public <X> MutableObjectIterator<X> getInput(int index) {
 		switch (index) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
index 452e4c1..c0f8f59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -24,9 +24,6 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -40,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -192,10 +190,6 @@ public class MutableHashTablePerformanceBenchmark {
 		InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
 		InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
 		
-		Configuration conf = new Configuration();
-		conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
-		GlobalConfiguration.includeConfiguration(conf);
-		
 		// allocate the memory for the HashTable
 		List<MemorySegment> memSegments;
 		try {
@@ -212,7 +206,7 @@ public class MutableHashTablePerformanceBenchmark {
 		final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
 			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
 			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
-			memSegments, ioManager);
+			memSegments, ioManager, enableBloomFilter);
 		join.open(buildIterator, probeIterator);
 		
 		final StringPair recordReuse = new StringPair();

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
index f4d2251..0d5a26e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
@@ -155,7 +155,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -242,7 +242,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -291,7 +291,7 @@ public class NonReusingHashMatchIteratorITCase {
 				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -378,7 +378,7 @@ public class NonReusingHashMatchIteratorITCase {
 				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -425,7 +425,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -472,7 +472,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index f5105bb..306a370 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -239,7 +239,7 @@ public class NonReusingReOpenableHashTableITCase {
 				new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 		iterator.open();
 		// do first join with both inputs
@@ -277,7 +277,7 @@ public class NonReusingReOpenableHashTableITCase {
 	//
 	//
 
-	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
 		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
 		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -334,9 +334,9 @@ public class NonReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
+				memSegments, ioManager, true);
 
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
 			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
@@ -348,9 +348,8 @@ public class NonReusingReOpenableHashTableITCase {
 			Record record;
 			final Record recordReuse = new Record();
 
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
 
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -370,10 +369,10 @@ public class NonReusingReOpenableHashTableITCase {
 
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 
 				map.put(key, contained);
@@ -450,11 +449,12 @@ public class NonReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
+				memSegments, ioManager, true);
+		
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
 			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
+			if (probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
@@ -462,9 +462,8 @@ public class NonReusingReOpenableHashTableITCase {
 			Record record;
 			final Record recordReuse = new Record();
 
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
 
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -484,10 +483,10 @@ public class NonReusingReOpenableHashTableITCase {
 
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 
 				map.put(key, contained);

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
index 18cd8d0..f770ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -155,7 +155,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -242,7 +242,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -291,7 +291,7 @@ public class ReusingHashMatchIteratorITCase {
 				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -378,7 +378,7 @@ public class ReusingHashMatchIteratorITCase {
 				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -425,7 +425,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -472,7 +472,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			


Mime
View raw message