flink-commits mailing list archives

From se...@apache.org
Subject [1/3] git commit: Add an object-based variant of the solution set
Date Fri, 03 Oct 2014 12:40:36 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master b689f3fca -> 02314adca


Add an object-based variant of the solution set


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/da3e507d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/da3e507d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/da3e507d

Branch: refs/heads/master
Commit: da3e507d28a8fe533277cac413aeac060c6447f4
Parents: b689f3f
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Oct 1 22:17:05 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 13:42:05 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |   2 +
 flink-core/pom.xml                              |   6 ++
 .../operators/base/DeltaIterationBase.java      |  27 ++++-
 .../operators/base/GroupReduceOperatorBase.java |   2 -
 .../api/common/operators/util/JoinHashMap.java  |  97 +++++++++++++++++
 .../common/typeutils/TypePairComparator.java    |   5 +-
 .../api/java/functions/SemanticPropUtil.java    |   6 +-
 .../api/java/operators/DeltaIteration.java      |  28 +++++
 .../api/java/operators/OperatorTranslation.java |   2 +
 .../iterative/concurrent/SolutionSetBroker.java |   7 +-
 ...SolutionSetObjectsUpdateOutputCollector.java |  68 ++++++++++++
 .../task/AbstractIterativePactTask.java         |  26 +++--
 .../iterative/task/IterationHeadPactTask.java   |  61 ++++++++---
 .../CoGroupWithSolutionSetFirstDriver.java      |  92 +++++++++++-----
 .../CoGroupWithSolutionSetSecondDriver.java     | 101 ++++++++++++-----
 .../JoinWithSolutionSetFirstDriver.java         |  57 +++++++---
 .../JoinWithSolutionSetSecondDriver.java        |  61 ++++++++---
 .../runtime/operators/util/TaskConfig.java      |  10 ++
 .../ConnectedComponentsWithObjectMapITCase.java | 108 +++++++++++++++++++
 .../graph/WorksetConnectedComponents.java       |   1 -
 pom.xml                                         |   7 ++
 21 files changed, 661 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index da7e0a2..dbce56b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -961,6 +961,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			toReturn = headVertex;
 		}
 		
+		headConfig.setSolutionSetUnmanaged(iteration.getIterationNode().getIterationContract().isSolutionSetUnManaged());
+		
 		// create the iteration descriptor and register the iteration with it
 		IterationDescriptor descr = this.iterations.get(iteration);
 		if (descr == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ec60080..7d53ff8 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -41,6 +41,12 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 		
+		<dependency>
+			<groupId>commons-collections</groupId>
+			<artifactId>commons-collections</artifactId>
+			<!-- managed version -->
+		</dependency>
+		
 		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
 		<dependency>
 			<groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 9a3c97b..7ac17eb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -52,7 +52,7 @@ import org.apache.flink.util.Visitor;
  * workset is considered the second input.
  */
 public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, AbstractRichFunction> implements IterationOperator {
-
+	
 	private final Operator<ST> solutionSetPlaceholder;
 
 	private final Operator<WT> worksetPlaceholder;
@@ -72,6 +72,8 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	private int maxNumberOfIterations = -1;
 
 	private final AggregatorRegistry aggregators = new AggregatorRegistry();
+	
+	private boolean solutionSetUnManaged;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -243,6 +245,29 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 		throw new UnsupportedOperationException("The DeltaIteration meta operator cannot have broadcast inputs.");
 	}
 	
+	/**
+	 * Sets whether to keep the solution set in managed memory (safe against heap exhaustion) or unmanaged memory
+	 * (objects on heap).
+	 * 
+	 * @param solutionSetUnManaged True to keep the solution set in unmanaged memory, false to keep it in managed memory.
+	 * 
+	 * @see #isSolutionSetUnManaged()
+	 */
+	public void setSolutionSetUnManaged(boolean solutionSetUnManaged) {
+		this.solutionSetUnManaged = solutionSetUnManaged;
+	}
+	
+	/**
+	 * Gets whether the solution set is in managed or unmanaged memory.
+	 * 
+	 * @return True, if the solution set is in unmanaged memory (object heap), false if in managed memory.
+	 * 
+	 * @see #setSolutionSetUnManaged(boolean)
+	 */
+	public boolean isSolutionSetUnManaged() {
+		return solutionSetUnManaged;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	// Place-holder Operators
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 147a510..d039f18 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 
@@ -29,7 +28,6 @@ import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
-
 /**
  * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-core/src/main/java/org/apache/flink/api/common/operators/util/JoinHashMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/JoinHashMap.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/JoinHashMap.java
new file mode 100644
index 0000000..ab7596e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/JoinHashMap.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.util;
+
+import org.apache.commons.collections.map.AbstractHashedMap;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class JoinHashMap<BT> extends AbstractHashedMap {
+
+	private final TypeSerializer<BT> buildSerializer;
+	
+	private final TypeComparator<BT> buildComparator;
+	
+	
+	public JoinHashMap(TypeSerializer<BT> buildSerializer, TypeComparator<BT> buildComparator) {
+		super(64);
+		this.buildSerializer = buildSerializer;
+		this.buildComparator = buildComparator;
+	}
+	
+	
+	public TypeSerializer<BT> getBuildSerializer() {
+		return buildSerializer;
+	}
+	
+	public TypeComparator<BT> getBuildComparator() {
+		return buildComparator;
+	}
+	
+	public <PT> Prober<PT> createProber(TypeComparator<PT> probeComparator, TypePairComparator<PT, BT> pairComparator) {
+		return new Prober<PT>(probeComparator, pairComparator);
+	}
+
+	@SuppressWarnings("unchecked")
+	public void insertOrReplace(BT record) {
+		int hashCode = hash(buildComparator.hash(record));
+		int index = hashIndex(hashCode, data.length);
+		buildComparator.setReference(record);
+		HashEntry entry = data[index];
+		while (entry != null) {
+			if (entryHashCode(entry) == hashCode && buildComparator.equalToReference((BT) entry.getValue())) {
+				entry.setValue(record);
+				return;
+			}
+			entry = entryNext(entry);
+		}
+		
+		addMapping(index, hashCode, null, record);
+	}
+	
+
+	
+	public class Prober<PT> {
+		
+		public Prober(TypeComparator<PT> probeComparator, TypePairComparator<PT, BT> pairComparator) {
+			this.probeComparator = probeComparator;
+			this.pairComparator = pairComparator;
+		}
+
+		private final TypeComparator<PT> probeComparator;
+		
+		private final TypePairComparator<PT, BT> pairComparator;
+		
+		@SuppressWarnings("unchecked")
+		public BT lookupMatch(PT record) {
+			int hashCode = hash(probeComparator.hash(record));
+			int index = hashIndex(hashCode, data.length);
+			pairComparator.setReference(record);
+			HashEntry entry = data[index];
+			while (entry != null) {
+				if (entryHashCode(entry) == hashCode && pairComparator.equalToReference((BT) entry.getValue())) {
+					return (BT) entry.getValue();
+				}
+				entry = entryNext(entry);
+			}
+			return null;
+		}
+	}
+}
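
For orientation, here is a minimal, self-contained sketch of how this JoinHashMap can be exercised outside the runtime. The LongSerializer/LongComparator pairing and the hand-rolled TypePairComparator stand in for the serializers and comparator factories that the iteration tasks normally supply; the class name is made up for illustration.

import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;

public class JoinHashMapSketch {

	public static void main(String[] args) {
		// Build side: Long records, hashed and compared with Flink's basic comparator.
		JoinHashMap<Long> map = new JoinHashMap<Long>(LongSerializer.INSTANCE, new LongComparator(true));

		// insertOrReplace keeps one value per key: the second call replaces the first.
		map.insertOrReplace(42L);
		map.insertOrReplace(42L);

		// The probe side is also Long here; the pair comparator bridges probe and build types.
		TypePairComparator<Long, Long> pairComp = new TypePairComparator<Long, Long>() {
			private long reference;

			@Override
			public void setReference(Long reference) {
				this.reference = reference;
			}

			@Override
			public boolean equalToReference(Long candidate) {
				return candidate.longValue() == this.reference;
			}

			@Override
			public int compareToReference(Long candidate) {
				return Long.compare(candidate.longValue(), this.reference);
			}
		};

		JoinHashMap<Long>.Prober<Long> prober = map.createProber(new LongComparator(true), pairComp);

		System.out.println(prober.lookupMatch(42L)); // prints 42
		System.out.println(prober.lookupMatch(7L));  // prints null (no match)
	}
}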

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypePairComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypePairComparator.java
index d55da72..a11da73 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypePairComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypePairComparator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.typeutils;
 
 /**
@@ -32,8 +31,8 @@ package org.apache.flink.api.common.typeutils;
  * @param <T1> The class of the first data type.
  * @param <T2> The class of the second data type. 
  */
-public abstract class TypePairComparator<T1, T2>
-{
+public abstract class TypePairComparator<T1, T2> {
+	
 	/**
 	 * Sets the reference for comparisons.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 6c6d6b8..7c80bf8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -325,9 +325,13 @@ public class SemanticPropUtil {
 
 		while (matcher.find()) {
 			int field = Integer.valueOf(matcher.group());
-			if (!isValidField(outType, field) || !isValidField(inType, field)) {
+			if (!isValidField(outType, field)) {
 				throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the output tuple.");
 			}
+			if (!isValidField(inType, field)) {
+				throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the input tuple.");
+			}
+			
 			fs = fs.addField(field);
 		}
 		return fs;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index dcc05b7..aec4950 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -56,6 +56,9 @@ public class DeltaIteration<ST, WT> {
 	
 	private int parallelism = -1;
 	
+	private boolean solutionSetUnManaged;
+	
+	
 	public DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
 		initialSolutionSet = solutionSet;
 		initialWorkset = workset;
@@ -211,6 +214,31 @@ public class DeltaIteration<ST, WT> {
 	}
 	
 	/**
+	 * Sets whether to keep the solution set in managed memory (safe against heap exhaustion) or unmanaged memory
+	 * (objects on heap).
+	 * 
+	 * @param solutionSetUnManaged True to keep the solution set in unmanaged memory, false to keep it in managed memory.
+	 * 
+	 * @see #isSolutionSetUnManaged()
+	 */
+	public void setSolutionSetUnManaged(boolean solutionSetUnManaged) {
+		this.solutionSetUnManaged = solutionSetUnManaged;
+	}
+	
+	/**
+	 * Gets whether the solution set is in managed or unmanaged memory.
+	 * 
+	 * @return True, if the solution set is in unmanaged memory (object heap), false if in managed memory.
+	 * 
+	 * @see #setSolutionSetUnManaged(boolean)
+	 */
+	public boolean isSolutionSetUnManaged() {
+		return solutionSetUnManaged;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
 	 * A {@link DataSet} that acts as a placeholder for the solution set during the iteration.
 	 * 
 	 * @param <ST> The type of the elements in the solution set.
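
To show the user-facing side of this change, a hedged sketch of a delta iteration that opts into the object-based solution set via the setSolutionSetUnManaged method added above. The toy data set and the identity delta are placeholders for illustration, not a working connected-components job and not part of this patch.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnmanagedSolutionSetSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Placeholder data set of (vertexId, componentId) pairs.
		DataSet<Tuple2<Long, Long>> vertices = env.fromElements(
				new Tuple2<Long, Long>(1L, 1L),
				new Tuple2<Long, Long>(2L, 2L));

		// Solution set keyed on field 0, at most 10 supersteps.
		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
				vertices.iterateDelta(vertices, 10, 0);

		// The new switch: keep the solution set as objects on the heap
		// rather than in Flink's managed memory.
		iteration.setSolutionSetUnManaged(true);

		// A real job would join the workset against the solution set here;
		// an identity map keeps this sketch compilable.
		DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
				.map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
					@Override
					public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
						return value;
					}
				});

		iteration.closeWith(delta, delta).print();
		env.execute();
	}
}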

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index eb17134..262a2cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -226,6 +226,8 @@ public class OperatorTranslation {
 		// register all aggregators
 		iterationOperator.getAggregators().addAll(iterationHead.getAggregators());
 		
+		iterationOperator.setSolutionSetUnManaged(iterationHead.isSolutionSetUnManaged());
+		
 		return iterationOperator;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
index 9ac6516..3d9d0ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-
 /**
  * Used to hand over the hash-join from the iteration head to the solution-set match.
  */
-public class SolutionSetBroker extends Broker<CompactingHashTable<?>> {
+public class SolutionSetBroker extends Broker<Object> {
 
 	/**
 	 * Singleton instance
@@ -34,7 +31,7 @@ public class SolutionSetBroker extends Broker<CompactingHashTable<?>> {
 	/**
 	 * Retrieve the singleton instance.
 	 */
-	public static Broker<CompactingHashTable<?>> instance() {
+	public static Broker<Object> instance() {
 		return INSTANCE;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
new file mode 100644
index 0000000..502dfc9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.io;
+
+import org.apache.flink.api.common.operators.util.JoinHashMap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+/**
+ * A {@link Collector} to update the solution set of a workset iteration.
+ * <p/>
+ * The records are written to a hash table to allow in-memory point updates.
+ * <p/>
+ * Records will only be collected if there is a match after probing the hash table. If the build side iterator is
+ * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to save the re-probing.
+ * 
+ * @see SolutionSetFastUpdateOutputCollector
+ */
+public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T> {
+
+	private final Collector<T> delegate;
+
+	private final JoinHashMap<T> hashMap;
+	
+	private final TypeSerializer<T> serializer;
+	
+	public SolutionSetObjectsUpdateOutputCollector(JoinHashMap<T> hashMap) {
+		this(hashMap, null);
+	}
+
+	public SolutionSetObjectsUpdateOutputCollector(JoinHashMap<T> hashMap, Collector<T> delegate) {
+		this.delegate = delegate;
+		this.hashMap = hashMap;
+		this.serializer = hashMap.getBuildSerializer();
+	}
+
+	@Override
+	public void collect(T record) {
+		T copy = serializer.copy(record);
+		hashMap.insertOrReplace(copy);
+		if (delegate != null) {
+			delegate.collect(record);
+		}
+	}
+
+	@Override
+	public void close() {
+		if (delegate != null) {
+			delegate.close();
+		}
+	}
+}
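
A small sketch of the collector's copy-then-insert contract, assuming the same Long-based JoinHashMap setup as in the earlier sketch; all names here are illustrative.

import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;

public class CollectorSketch {

	public static void main(String[] args) {
		JoinHashMap<Long> solutionSet = new JoinHashMap<Long>(LongSerializer.INSTANCE, new LongComparator(true));

		// No delegate collector: updates go only into the object map.
		SolutionSetObjectsUpdateOutputCollector<Long> collector =
				new SolutionSetObjectsUpdateOutputCollector<Long>(solutionSet);

		// collect() copies each record via the build-side serializer before
		// inserting, so callers may keep reusing mutable record objects.
		collector.collect(42L);
		collector.collect(42L); // replaces the previous entry for the same key
		collector.close();

		System.out.println(solutionSet.size()); // prints 1
	}
}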

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 980d33e..6039654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
 import org.slf4j.Logger;
@@ -25,6 +24,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.iterative.concurrent.Broker;
 import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
+import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
 import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.runtime.operators.PactDriver;
@@ -311,19 +312,22 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 	 * {@link SolutionSetUpdateOutputCollector}
 	 */
 	protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
-		Broker<CompactingHashTable<?>> solutionSetBroker = SolutionSetBroker.instance();
-
-		/*if (config.getIsSolutionSetUpdateWithoutReprobe()) {
-			@SuppressWarnings("unchecked")
-			MutableHashTable<OT, ?> solutionSet = (MutableHashTable<OT, ?>) solutionSetBroker.get(brokerKey());
-
-			return new SolutionSetFastUpdateOutputCollector<OT>(solutionSet, delegate);
-		} else {*/
+		Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
+		
+		Object ss = solutionSetBroker.get(brokerKey());
+		if (ss instanceof CompactingHashTable) {
 			@SuppressWarnings("unchecked")
-			CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) solutionSetBroker.get(brokerKey());
+			CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
 			TypeSerializer<OT> serializer = getOutputSerializer();
 			return new SolutionSetUpdateOutputCollector<OT>(solutionSet, serializer, delegate);
-		//}
+		}
+		else if (ss instanceof JoinHashMap) {
+			@SuppressWarnings("unchecked")
+			JoinHashMap<OT> map = (JoinHashMap<OT>) ss;
+			return new SolutionSetObjectsUpdateOutputCollector<OT>(map, delegate);
+		} else {
+			throw new RuntimeException("Unrecognized solution set handle: " + ss);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index ea7f3fc..cca4559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -24,7 +24,9 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -193,10 +195,30 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		}
 	}
 	
+	private <BT> JoinHashMap<BT> initJoinHashMap() {
+		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
+		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
+	
+		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
+		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
+		
+		JoinHashMap<BT> map = new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
+		return map;
+	}
+	
 	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		solutionSet.open();
 		solutionSet.buildTable(solutionSetInput);
 	}
+	
+	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
+		TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
+		
+		X next;
+		while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
+			solutionSet.insertOrReplace(next);
+		}
+	}
 
 	private SuperstepBarrier initSuperstepBarrier() {
 		SuperstepBarrier barrier = new SuperstepBarrier(userCodeClassLoader);
@@ -212,9 +234,11 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 		final String brokerKey = brokerKey();
 		final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
+		
+		final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
 
-		//MutableHashTable<X, ?> solutionSet = null; // if workset iteration
 		CompactingHashTable<X> solutionSet = null; // if workset iteration
+		JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
 		
 		boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
 		boolean isWorksetIteration = config.getIsWorksetIteration();
@@ -238,21 +262,24 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 				solutionTypeSerializer = solutionTypeSerializerFactory;
 
 				// setup the index for the solution set
-				//solutionSet = initHashTable();
-				solutionSet = initCompactingHashTable();
-
-				// read the initial solution set
 				@SuppressWarnings("unchecked")
 				MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-				readInitialSolutionSet(solutionSet, solutionSetInput);
-
-				SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
+				
+				// read the initial solution set
+				if (objectSolutionSet) {
+					solutionSetObjectMap = initJoinHashMap();
+					readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
+					SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
+				} else {
+					solutionSet = initCompactingHashTable();
+					readInitialSolutionSet(solutionSet, solutionSetInput);
+					SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
+				}
 
 				if (waitForSolutionSetUpdate) {
 					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
 					SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
 				}
-
 			} else {
 				// bulk iteration case
 				initialSolutionSetInput = -1;
@@ -337,7 +364,11 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			}
 
 			if (isWorksetIteration) {
-				streamSolutionSetToFinalOutput(solutionSet);
+				if (objectSolutionSet) {
+					streamSolutionSetToFinalOutput(solutionSetObjectMap);
+				} else {
+					streamSolutionSetToFinalOutput(solutionSet);
+				}
 			} else {
 				streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
 			}
@@ -380,6 +411,14 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			output.collect(record);
 		}
 	}
+	
+	@SuppressWarnings("unchecked")
+	private void streamSolutionSetToFinalOutput(JoinHashMap<X> solutionSet) throws IOException {
+		final Collector<X> output = this.finalOutputCollector;
+		for (Object e : solutionSet.values()) {
+			output.collect((X) e);
+		}
+	}
 
 	private void feedBackSuperstepResult(DataInputView superstepResult) {
 		this.inputs[this.feedbackDataInput] =
@@ -400,8 +439,6 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		if (log.isInfoEnabled()) {
 			log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
 		}
-
 		this.toSync.broadcastEvent(event);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index bc1a4bf..5e0ca6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -21,13 +21,16 @@ package org.apache.flink.runtime.operators;
 import java.util.Collections;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
 import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
@@ -38,6 +41,8 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 	
 	private CompactingHashTable<IT1> hashTable;
 	
+	private JoinHashMap<IT1> objectMap;
+	
 	private TypeSerializer<IT2> probeSideSerializer;
 	
 	private TypeComparator<IT2> probeSideComparator;
@@ -87,28 +92,48 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 	
 	// --------------------------------------------------------------------------------------------
 
-	@SuppressWarnings("unchecked")
 	@Override
+	@SuppressWarnings("unchecked")
 	public void initialize() {
+		
+		final TypeSerializer<IT1> solutionSetSerializer;
+		final TypeComparator<IT1> solutionSetComparator;
+		
 		// grab a handle to the hash table from the iteration broker
 		if (taskContext instanceof AbstractIterativePactTask) {
 			AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
 			String identifier = iterativeTaskContext.brokerKey();
-			this.hashTable = (CompactingHashTable<IT1>) SolutionSetBroker.instance().get(identifier);
+			
+			Object table = SolutionSetBroker.instance().get(identifier);
+			if (table instanceof CompactingHashTable) {
+				this.hashTable = (CompactingHashTable<IT1>) table;
+				solutionSetSerializer = this.hashTable.getBuildSideSerializer();
+				solutionSetComparator = this.hashTable.getBuildSideComparator().duplicate();
+			}
+			else if (table instanceof JoinHashMap) {
+				this.objectMap = (JoinHashMap<IT1>) table;
+				solutionSetSerializer = this.objectMap.getBuildSerializer();
+				solutionSetComparator = this.objectMap.getBuildComparator().duplicate();
+			}
+			else {
+				throw new RuntimeException("Unrecognized solution set index: " + table);
+			}
 		} else {
 			throw new RuntimeException("The task context of this driver is no iterative task context.");
 		}
 		
-		TypeSerializer<IT1> buildSideSerializer = hashTable.getBuildSideSerializer();
-		TypeComparator<IT1> buildSideComparator = hashTable.getBuildSideComparator().duplicate();
+		TaskConfig config = taskContext.getTaskConfig();
+		ClassLoader classLoader = taskContext.getUserCodeClassLoader();
+		
+		TypeComparatorFactory<IT2> probeSideComparatorFactory = config.getDriverComparator(0, classLoader);
 		
-		probeSideSerializer = taskContext.<IT2>getInputSerializer(0).getSerializer();
-		probeSideComparator = taskContext.getDriverComparator(0);
+		this.probeSideSerializer = taskContext.<IT2>getInputSerializer(0).getSerializer();
+		this.probeSideComparator = probeSideComparatorFactory.createComparator();
 		
-		solutionSideRecord = buildSideSerializer.createInstance();
+		solutionSideRecord = solutionSetSerializer.createInstance();
 		
-		TypePairComparatorFactory<IT1, IT2> pairCompFactory = taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader());
-		pairComparator = pairCompFactory.createComparator21(buildSideComparator, probeSideComparator);
+		TypePairComparatorFactory<IT1, IT2> factory = taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader());
+		pairComparator = factory.createComparator21(solutionSetComparator, this.probeSideComparator);
 	}
 
 	@Override
@@ -124,26 +149,45 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 		final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
 		
-		IT1 buildSideRecord = solutionSideRecord;
-			
-		final CompactingHashTable<IT1> join = hashTable;
-		
 		final KeyGroupedIterator<IT2> probeSideInput = new KeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator);
 		final SingleElementIterator<IT1> siIter = new SingleElementIterator<IT1>();
 		final Iterable<IT1> emptySolutionSide = Collections.emptySet();
 		
-		final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this.probeSideComparator, this.pairComparator);
-		
-		while (this.running && probeSideInput.nextKey()) {
-			IT2 current = probeSideInput.getCurrent();
-
-			buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-			if (buildSideRecord != null) {
-				siIter.set(buildSideRecord);
-				coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
+		if (this.hashTable != null) {
+			final CompactingHashTable<IT1> join = hashTable;
+			final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this.probeSideComparator, this.pairComparator);
+			
+			IT1 buildSideRecord = solutionSideRecord;
+			
+			while (this.running && probeSideInput.nextKey()) {
+				IT2 current = probeSideInput.getCurrent();
+	
+				buildSideRecord = prober.getMatchFor(current, buildSideRecord);
+				if (buildSideRecord != null) {
+					siIter.set(buildSideRecord);
+					coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
+				}
+				else {
+					coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+				}
 			}
-			else {
-				coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+		}
+		else {
+			final JoinHashMap<IT1> join = this.objectMap;
+			final JoinHashMap<IT1>.Prober<IT2> prober = join.createProber(this.probeSideComparator, this.pairComparator);
+			final TypeSerializer<IT1> serializer = join.getBuildSerializer();
+			
+			while (this.running && probeSideInput.nextKey()) {
+				IT2 current = probeSideInput.getCurrent();
+	
+				IT1 buildSideRecord = prober.lookupMatch(current);
+				if (buildSideRecord != null) {
+					siIter.set(serializer.copy(buildSideRecord));
+					coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
+				}
+				else {
+					coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 3359bb2..fb88505 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -19,13 +19,16 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
 import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.util.EmptyIterator;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
@@ -37,6 +40,8 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 	
 	private CompactingHashTable<IT2> hashTable;
 	
+	private JoinHashMap<IT2> objectMap;
+	
 	private TypeSerializer<IT1> probeSideSerializer;
 	
 	private TypeComparator<IT1> probeSideComparator;
@@ -86,28 +91,49 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 	
 	// --------------------------------------------------------------------------------------------
 
-	@SuppressWarnings("unchecked")
 	@Override
-	public void initialize() {
+	@SuppressWarnings("unchecked")
+	public void initialize() throws Exception {
+		
+		final TypeSerializer<IT2> solutionSetSerializer;
+		final TypeComparator<IT2> solutionSetComparator;
+		
 		// grab a handle to the hash table from the iteration broker
 		if (taskContext instanceof AbstractIterativePactTask) {
 			AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
 			String identifier = iterativeTaskContext.brokerKey();
-			this.hashTable = (CompactingHashTable<IT2>) SolutionSetBroker.instance().get(identifier);
-		} else {
-			throw new RuntimeException("The task context of this driver is no iterative task context.");
+			Object table = SolutionSetBroker.instance().get(identifier);
+			
+			if (table instanceof CompactingHashTable) {
+				this.hashTable = (CompactingHashTable<IT2>) table;
+				solutionSetSerializer = this.hashTable.getBuildSideSerializer();
+				solutionSetComparator = this.hashTable.getBuildSideComparator().duplicate();
+			}
+			else if (table instanceof JoinHashMap) {
+				this.objectMap = (JoinHashMap<IT2>) table;
+				solutionSetSerializer = this.objectMap.getBuildSerializer();
+				solutionSetComparator = this.objectMap.getBuildComparator().duplicate();
+			}
+			else {
+				throw new RuntimeException("Unrecognized solution set index: " + table);
+			}
 		}
+		else {
+			throw new Exception("The task context of this driver is no iterative task context.");
+		}
+		
+		TaskConfig config = taskContext.getTaskConfig();
+		ClassLoader classLoader = taskContext.getUserCodeClassLoader();
 		
-		TypeSerializer<IT2> buildSideSerializer = hashTable.getBuildSideSerializer();
-		TypeComparator<IT2> buildSideComparator = hashTable.getBuildSideComparator().duplicate();
+		TypeComparatorFactory<IT1> probeSideComparatorFactory = config.getDriverComparator(0, classLoader); 
 		
-		probeSideSerializer = taskContext.<IT1>getInputSerializer(0).getSerializer();
-		probeSideComparator = taskContext.getDriverComparator(0);
+		this.probeSideSerializer = taskContext.<IT1>getInputSerializer(0).getSerializer();
+		this.probeSideComparator = probeSideComparatorFactory.createComparator();
 		
-		solutionSideRecord = buildSideSerializer.createInstance();
+		solutionSideRecord = solutionSetSerializer.createInstance();
 		
-		TypePairComparatorFactory<IT1, IT2> pairCompFactory = taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader());
-		pairComparator = pairCompFactory.createComparator12(probeSideComparator, buildSideComparator);
+		TypePairComparatorFactory<IT1, IT2> factory = taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader());
+		pairComparator = factory.createComparator12(this.probeSideComparator, solutionSetComparator);
 	}
 
 	@Override
@@ -122,27 +148,46 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 
 		final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
-		
-		IT2 buildSideRecord = solutionSideRecord;
-			
-		final CompactingHashTable<IT2> join = hashTable;
-		
+
 		final KeyGroupedIterator<IT1> probeSideInput = new KeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator);
 		final SingleElementIterator<IT2> siIter = new SingleElementIterator<IT2>();
 		final Iterable<IT2> emptySolutionSide = EmptyIterator.<IT2>get();
 		
-		final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator);
-		
-		while (this.running && probeSideInput.nextKey()) {
-			IT1 current = probeSideInput.getCurrent();
-
-			buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-			if (buildSideRecord != null) {
-				siIter.set(buildSideRecord);
-				coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+		if (this.hashTable != null) {
+			final CompactingHashTable<IT2> join = hashTable;
+			final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator);
+			
+			IT2 buildSideRecord = solutionSideRecord;
+			
+			while (this.running && probeSideInput.nextKey()) {
+				IT1 current = probeSideInput.getCurrent();
+	
+				buildSideRecord = prober.getMatchFor(current, buildSideRecord);
+				if (buildSideRecord != null) {
+					siIter.set(buildSideRecord);
+					coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+				}
+				else {
+					coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+				}
 			}
-			else {
-				coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+		}
+		else {
+			final JoinHashMap<IT2> join = this.objectMap;
+			final JoinHashMap<IT2>.Prober<IT1> prober = join.createProber(this.probeSideComparator, this.pairComparator);
+			final TypeSerializer<IT2> serializer = join.getBuildSerializer();
+			
+			while (this.running && probeSideInput.nextKey()) {
+				IT1 current = probeSideInput.getCurrent();
+	
+				IT2 buildSideRecord = prober.lookupMatch(current);
+				if (buildSideRecord != null) {
+					siIter.set(serializer.copy(buildSideRecord));
+					coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
+				}
+				else {
+					coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index 8740ab7..4f9f1ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -38,6 +38,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	
 	private CompactingHashTable<IT1> hashTable;
 	
+	private JoinHashMap<IT1> objectMap;
+	
 	private TypeComparator<IT2> probeSideComparator;
 	
 	private TypePairComparator<IT2, IT1> pairComparator;
@@ -90,11 +92,28 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	@SuppressWarnings("unchecked")
 	public void initialize() {
 		
+		final TypeSerializer<IT1> solutionSetSerializer;
+		final TypeComparator<IT1> solutionSetComparator;
+		
 		// grab a handle to the hash table from the iteration broker
 		if (taskContext instanceof AbstractIterativePactTask) {
 			AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
 			String identifier = iterativeTaskContext.brokerKey();
-			this.hashTable = (CompactingHashTable<IT1>) SolutionSetBroker.instance().get(identifier);
+			
+			Object table = SolutionSetBroker.instance().get(identifier);
+			if (table instanceof CompactingHashTable) {
+				this.hashTable = (CompactingHashTable<IT1>) table;
+				solutionSetSerializer = this.hashTable.getBuildSideSerializer();
+				solutionSetComparator = this.hashTable.getBuildSideComparator().duplicate();
+			}
+			else if (table instanceof JoinHashMap) {
+				this.objectMap = (JoinHashMap<IT1>) table;
+				solutionSetSerializer = this.objectMap.getBuildSerializer();
+				solutionSetComparator = this.objectMap.getBuildComparator().duplicate();
+			}
+			else {
+				throw new RuntimeException("Unrecognized solution set index: " + table);
+			}
 		} else {
 			throw new RuntimeException("The task context of this driver is no iterative task context.");
 		}
@@ -102,11 +121,9 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 		TaskConfig config = taskContext.getTaskConfig();
 		ClassLoader classLoader = taskContext.getUserCodeClassLoader();
 		
-		TypeSerializer<IT1> solutionSetSerializer = this.hashTable.getBuildSideSerializer();
 		TypeSerializer<IT2> probeSideSerializer = taskContext.<IT2>getInputSerializer(0).getSerializer();
 		
 		TypeComparatorFactory<IT2> probeSideComparatorFactory = config.getDriverComparator(0, classLoader);
-		TypeComparator<IT1> solutionSetComparator = this.hashTable.getBuildSideComparator().duplicate();
 		this.probeSideComparator = probeSideComparatorFactory.createComparator();
 		
 		solutionSideRecord = solutionSetSerializer.createInstance();
@@ -125,20 +142,36 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 
 	@Override
 	public void run() throws Exception {
-
 		final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
+		final MutableObjectIterator<IT2> probeSideInput = taskContext.<IT2>getInput(0);
+		
 		
-		IT1 buildSideRecord = this.solutionSideRecord;
 		IT2 probeSideRecord = this.probeSideRecord;
+		
+		if (hashTable != null) {
+			final CompactingHashTable<IT1> join = hashTable;
+			final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(probeSideComparator, pairComparator);
 			
-		final CompactingHashTable<IT1> join = hashTable;
-		final MutableObjectIterator<IT2> probeSideInput = taskContext.<IT2>getInput(0);
+			IT1 buildSideRecord = this.solutionSideRecord;
 			
-		final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(probeSideComparator, pairComparator);
-		while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-			buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-			joinFunction.join(buildSideRecord, probeSideRecord, collector);
+			while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
+				buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+				joinFunction.join(buildSideRecord, probeSideRecord, collector);
+			}
+		}
+		else if (objectMap != null) {
+			final JoinHashMap<IT1> map = this.objectMap;
+			final JoinHashMap<IT1>.Prober<IT2> prober = map.createProber(probeSideComparator, pairComparator);
+			final TypeSerializer<IT1> buildSerializer = map.getBuildSerializer();
+			
+			while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
+				IT1 match = prober.lookupMatch(probeSideRecord);
+				joinFunction.join(buildSerializer.copy(match), probeSideRecord, collector);
+			}
+		}
+		else {
+			throw new RuntimeException("Neither a CompactingHashTable nor a JoinHashMap was registered for the solution set.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index f762602..7866cab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -38,6 +38,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	
 	private CompactingHashTable<IT2> hashTable;
 	
+	private JoinHashMap<IT2> objectMap;
+	
 	private TypeComparator<IT1> probeSideComparator;
 	
 	private TypePairComparator<IT1, IT2> pairComparator;
@@ -90,23 +92,40 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	@SuppressWarnings("unchecked")
 	public void initialize() throws Exception {
 		
+		final TypeSerializer<IT2> solutionSetSerializer;
+		final TypeComparator<IT2> solutionSetComparator;
+		
 		// grab a handle to the hash table from the iteration broker
 		if (taskContext instanceof AbstractIterativePactTask) {
 			AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
 			String identifier = iterativeTaskContext.brokerKey();
-			this.hashTable = (CompactingHashTable<IT2>) SolutionSetBroker.instance().get(identifier);
-		} else {
+			Object table = SolutionSetBroker.instance().get(identifier);
+			
+			if (table instanceof CompactingHashTable) {
+				this.hashTable = (CompactingHashTable<IT2>) table;
+				solutionSetSerializer = this.hashTable.getBuildSideSerializer();
+				solutionSetComparator = this.hashTable.getBuildSideComparator().duplicate();
+			}
+			else if (table instanceof JoinHashMap) {
+				this.objectMap = (JoinHashMap<IT2>) table;
+				solutionSetSerializer = this.objectMap.getBuildSerializer();
+				solutionSetComparator = this.objectMap.getBuildComparator().duplicate();
+			}
+			else {
+				throw new RuntimeException("Unrecognized solution set index: " + table);
+			}
+		}
+		else {
 			throw new Exception("The task context of this driver is no iterative task context.");
 		}
 		
 		TaskConfig config = taskContext.getTaskConfig();
 		ClassLoader classLoader = taskContext.getUserCodeClassLoader();
 		
-		TypeSerializer<IT2> solutionSetSerializer = this.hashTable.getBuildSideSerializer();
 		TypeSerializer<IT1> probeSideSerializer = taskContext.<IT1>getInputSerializer(0).getSerializer();
 		
-		TypeComparatorFactory<IT1> probeSideComparatorFactory = config.getDriverComparator(0, classLoader);
-		TypeComparator<IT2> solutionSetComparator = this.hashTable.getBuildSideComparator().duplicate();
+		TypeComparatorFactory<IT1> probeSideComparatorFactory = config.getDriverComparator(0, classLoader); 
+		
 		this.probeSideComparator = probeSideComparatorFactory.createComparator();
 		
 		solutionSideRecord = solutionSetSerializer.createInstance();
@@ -128,17 +147,33 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 
 		final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
+		final MutableObjectIterator<IT1> probeSideInput = taskContext.getInput(0);
 		
-		IT2 buildSideRecord = this.solutionSideRecord;
 		IT1 probeSideRecord = this.probeSideRecord;
+		
+		if (hashTable != null) {
+			final CompactingHashTable<IT2> join = hashTable;
+			final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(probeSideComparator, pairComparator);
 			
-		final CompactingHashTable<IT2> join = hashTable;
-		final MutableObjectIterator<IT1> probeSideInput = taskContext.getInput(0);
+			IT2 buildSideRecord = this.solutionSideRecord;
+		
+			while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
+				buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+				joinFunction.join(probeSideRecord, buildSideRecord, collector);
+			}
+		}
+		else if (objectMap != null) {
+			final JoinHashMap<IT2> map = this.objectMap;
+			final JoinHashMap<IT2>.Prober<IT1> prober = map.createProber(probeSideComparator, pairComparator);
+			final TypeSerializer<IT2> buildSerializer = map.getBuildSerializer();
 			
-		final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(probeSideComparator, pairComparator);
-		while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-			buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-			joinFunction.join(probeSideRecord, buildSideRecord, collector);
+			while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
+				IT2 match = prober.lookupMatch(probeSideRecord);
+				joinFunction.join(probeSideRecord, match == null ? null : buildSerializer.copy(match), collector);
+			}
+		}
+		else {
+			throw new RuntimeException("The solution set index is neither a CompactingHashTable nor a JoinHashMap.");
 		}
 	}
 

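For context on the two probe paths above: the CompactingHashTable path reuses a mutable
build-side record across lookups, while the object-map path copies each match with the
build-side serializer before handing it to the join function, so user code cannot mutate
the record that the JoinHashMap still references. The following is a minimal,
self-contained sketch of that copy-on-lookup idea; SimpleObjectMap, KeyExtractor, and
Copier are hypothetical stand-ins for Flink's JoinHashMap, TypeComparator, and
TypeSerializer, not actual Flink APIs.

    import java.util.HashMap;
    import java.util.Map;

    public class SimpleObjectMap<K, V> {

        /** Hypothetical stand-in for the key logic a TypeComparator provides. */
        public interface KeyExtractor<K, V> { K keyOf(V value); }

        /** Hypothetical stand-in for TypeSerializer.copy(). */
        public interface Copier<V> { V copy(V value); }

        private final Map<K, V> index = new HashMap<K, V>();
        private final KeyExtractor<K, V> keys;
        private final Copier<V> copier;

        public SimpleObjectMap(KeyExtractor<K, V> keys, Copier<V> copier) {
            this.keys = keys;
            this.copier = copier;
        }

        /** Insert or replace a build-side record, as a solution set update would. */
        public void insertOrReplace(V record) {
            index.put(keys.keyOf(record), record);
        }

        /** Look up a match and hand out a copy, so the caller cannot corrupt
         *  the record the index still holds. */
        public V lookupMatchCopy(K probeKey) {
            V match = index.get(probeKey);
            return match == null ? null : copier.copy(match);
        }
    }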
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 5f4afe2..003b872 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -214,6 +214,8 @@ public class TaskConfig {
 	private static final String ITERATION_SOLUTION_SET_UPDATE_WAIT = "iterative.ss-wait";
 
 	private static final String ITERATION_WORKSET_UPDATE = "iterative.ws-update";
+	
+	private static final String SOLUTION_SET_OBJECTS = "iterative.ss.obj";
 
 	// ---------------------------------- Miscellaneous -------------------------------------------
 	
@@ -1113,6 +1115,14 @@ public class TaskConfig {
 		return factory;
 	}
 	
+	public void setSolutionSetUnmanaged(boolean unmanaged) {
+		config.setBoolean(SOLUTION_SET_OBJECTS, unmanaged);
+	}
+	
+	public boolean isSolutionSetUnmanaged() {
+		return config.getBoolean(SOLUTION_SET_OBJECTS, false);
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//                          Utility class for nested Configurations
 	// --------------------------------------------------------------------------------------------

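The new flag above is a plain boolean round-tripped through the task configuration,
defaulting to false (the managed CompactingHashTable) when the key is absent. A minimal
sketch of that round-trip, using a hypothetical ConfigStore in place of Flink's
Configuration:

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigStore {

        private final Map<String, Boolean> values = new HashMap<String, Boolean>();

        public void setBoolean(String key, boolean value) {
            values.put(key, value);
        }

        public boolean getBoolean(String key, boolean defaultValue) {
            Boolean v = values.get(key);
            return v == null ? defaultValue : v.booleanValue();
        }

        public static void main(String[] args) {
            ConfigStore config = new ConfigStore();
            // absent key -> false: the iteration keeps the managed hash table
            System.out.println(config.getBoolean("iterative.ss.obj", false)); // prints false
            // opting in switches the iteration head to the object-based JoinHashMap
            config.setBoolean("iterative.ss.obj", true);
            System.out.println(config.getBoolean("iterative.ss.obj", false)); // prints true
        }
    }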
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
new file mode 100644
index 0000000..9116db8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import java.io.BufferedReader;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.ConnectedComponents.ComponentIdFilter;
+import org.apache.flink.examples.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
+import org.apache.flink.examples.java.graph.ConnectedComponents.UndirectEdge;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase {
+	
+	private static final long SEED = 0xBADC0FFEEBEEFL;
+	
+	private static final int NUM_VERTICES = 1000;
+	
+	private static final int NUM_EDGES = 10000;
+
+	
+	protected String verticesPath;
+	protected String edgesPath;
+	protected String resultPath;
+
+	
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
+		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+
+	
+	@Override
+	protected void testProgram() throws Exception {
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+		// read vertex and edge data
+		DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);
+		
+		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class)
+												.flatMap(new UndirectEdge());
+				
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+						
+		// open a delta iteration
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
+		iteration.setSolutionSetUnManaged(true);
+				
+		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new ComponentIdFilter());
+
+		// close the delta iteration (delta and new workset are identical)
+		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
+				
+		result.writeAsCsv(resultPath, "\n", " ");
+		
+		// execute program
+		env.execute("Connected Components Example");
+	}
+	
+	public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
+		
+		@Override
+		public Tuple2<T, T> map(Tuple1<T> vertex) {
+			return new Tuple2<T, T>(vertex.f0, vertex.f0);
+		}
+	}
+}

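The opt-in shown by the test is the single setSolutionSetUnManaged(true) call on the
delta iteration. For reference, a stripped-down sketch of that opt-in in user code; the
identity step here is a placeholder, and a real program would derive the delta from joins
against the solution set as the test does:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class UnmanagedSolutionSetSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                    new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L));

            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    initial.iterateDelta(initial, 10, 0);

            // the one-line opt-in: keep the solution set as heap objects in a
            // JoinHashMap instead of serialized records in managed memory
            iteration.setSolutionSetUnManaged(true);

            // placeholder step: a real job would join the workset with
            // iteration.getSolutionSet() to compute the delta
            DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset();

            iteration.closeWith(delta, delta).print();
            env.execute("Unmanaged solution set sketch");
        }
    }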
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
index f94700d..2f6f740 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/WorksetConnectedComponents.java
@@ -189,7 +189,6 @@ public class WorksetConnectedComponents implements Program, ProgramDescription {
 			.field(LongValue.class, 0)
 			.field(LongValue.class, 1);
 
-		// return the PACT plan
 		Plan plan = new Plan(result, "Workset Connected Components");
 		plan.setDefaultParallelism(numSubTasks);
 		return plan;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/da3e507d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3077778..45443d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -192,6 +192,13 @@ under the License.
 				<version>1.2</version>
 			</dependency>
 			
+			<!-- commons-collections is used both by us and by Hadoop, so we need to pin a common version -->
+			<dependency>
+				<groupId>commons-collections</groupId>
+				<artifactId>commons-collections</artifactId>
+				<version>3.2.1</version>
+			</dependency>
+			
 			<!-- Managed dependency required for HBase in flink-hbase  -->
 			<dependency>
 				<groupId>org.javassist</groupId>

