flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/13] flink git commit: [FLINK-1285] Make execution mode configurable
Date Thu, 08 Jan 2015 10:58:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
deleted file mode 100644
index 39ff077..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.TraversableOnceException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test for the key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
- * over the records with the same key.
- */
-public class KeyGroupedIteratorTest {
-	
-	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
-	
-	private KeyGroupedIterator<Record> psi;						// the grouping iterator, progressing in key steps
-	
-	@Before
-	public void setup() { // fixture: builds a 12-record source iterator and wraps it in a fresh KeyGroupedIterator
-		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
-		
-		// add elements to the source; equal keys are adjacent: 1 -> A, 2 -> B, 3 -> C,D, 4 -> E,F,G, 5 -> H,I,J,K,L
-		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
-		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
-		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
-		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
-		
-		
-		this.sourceIter = new MutableObjectIterator<Record>() { // adapts the pair list to the MutableObjectIterator interface
-			final Iterator<IntStringPair> it = source.iterator();
-			
-			@Override
-			public Record next(Record reuse) throws IOException { // reuse variant: writes the next pair into the caller's record
-				if (it.hasNext()) {
-					IntStringPair pair = it.next();
-					reuse.setField(0, pair.getInteger());
-					reuse.setField(1, pair.getString());
-					return reuse;
-				}
-				else {
-					return null; // source exhausted
-				}
-			}
-
-			@Override
-			public Record next() throws IOException { // allocating variant: returns a new two-field Record per pair
-				if (it.hasNext()) {
-					IntStringPair pair = it.next();
-					Record result = new Record(2);
-					result.setField(0, pair.getInteger());
-					result.setField(1, pair.getString());
-					return result;
-				}
-				else {
-					return null; // source exhausted
-				}
-			}
-
-		};
-		
-		final RecordSerializer serializer = RecordSerializer.get();
-		@SuppressWarnings("unchecked")
-		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class}); // presumably key position 0 with key type IntValue, matching field 0 set above
-		
-		this.psi = new KeyGroupedIterator<Record>(this.sourceIter, serializer, comparator);
-	}
-
-	@Test
-	public void testNextKeyOnly() throws Exception { // advances with nextKey() alone, never reading values; expects keys 1..5 in order
-		try {
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); // key 5 was the last group in the source
-			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues()); // the test expects null once the input is exhausted
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); // repeated calls must keep returning false
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) { // any exception is reported as a test failure
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testFullIterationThroughAllValues() throws IOException
-	{ // walks every key and consumes every value, checking the current key, the value and hasNext() at each step
-		try {
-			// Key 1, Value A (the values may be iterated only once per key: the second iterator() call is expected to be rejected)
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			
-			// Key 2, Value B
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			
-			// Key 3, Values C, D (the current key must stay 3 before, between and after the values)
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {} // expected: key 3 has no values left
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {} // expected again: a failed next() must not change the exhausted state
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 4, Values E, F, G
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 5, Values H, I, J, K, L
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {} // expected: key 5 has no values left
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {} // expected again: still exhausted
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertNull(this.psi.getValues()); // the test expects null once the input is exhausted
-		} catch (Exception e) { // any exception is reported as a test failure
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testMixedProgress() throws Exception
-	{ // leaves groups partly consumed and checks that nextKey() still lands on the first record of the next key
-		try {
-			// Progression only via nextKey() and hasNext() - Key 1, Value A
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			
-			// Progression only through nextKey() - Key 2, Value B
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			
-			// Progression first through hasNext() and next(), then through hasNext() - Key 3, Values C, D
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Progression first via next() only, then hasNext() only - Key 4, Values E, F, G
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// Key 5, Values H, I, J, K, L - only H and I are consumed before moving on
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue(hasIterator(this.psi.getValues()));
-			Assert.assertFalse(hasIterator(this.psi.getValues()));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// end - the remaining values J, K, L are skipped and no further key exists
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) { // any exception is reported as a test failure
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
-	{ // checks that hasNext() leaves the record returned by the preceding next() unchanged ("Overweite" in the name is a typo for "Overwrite")
-		try {
-			Iterator<Record> valsIter = null;
-			Record rec = null;
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext()); // NOTE: message text is inverted; this asserts there is NO further value
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
-			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext()); // NOTE: message text is inverted; this asserts there is NO further value
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); // D is still pending, yet rec must keep showing C below
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); // NOTE: message text is inverted; this asserts there is NO further value
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); // NOTE: message text is inverted; this asserts there is NO further value
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-		} catch (Exception e) { // any exception is reported as a test failure
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	private static final class IntStringPair
-	{ // immutable (IntValue, StringValue) holder used to describe the test input
-		private final IntValue integer;
-		private final StringValue string;
-
-		IntStringPair(IntValue integer, StringValue string) {
-			this.integer = integer;
-			this.string = string;
-		}
-
-		public IntValue getInteger() { // the IntValue given to the constructor
-			return integer;
-		}
-
-		public StringValue getString() { // the StringValue given to the constructor
-			return string;
-		}
-	}
-	
-	public boolean hasIterator(Iterable<?> iterable) { // true if iterable.iterator() succeeds, false if it throws TraversableOnceException
-		try {
-			iterable.iterator(); // the returned iterator is discarded; only success or failure of the call matters
-			return true;
-		}
-		catch (TraversableOnceException e) {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
new file mode 100644
index 0000000..9f651b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the non-reusing key grouped iterator ({@link NonReusingKeyGroupedIterator}), which advances in windows
+ * containing the same key and provides a sub-iterator over the records with the same key.
+ */
+public class NonReusingKeyGroupedIteratorTest {
+	
+	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
+	
+	private NonReusingKeyGroupedIterator<Record> psi;					// the grouping iterator, progressing in key steps
+	
+	@Before
+	public void setup()
+	{ // fixture: builds a 12-record source iterator and wraps it in a fresh NonReusingKeyGroupedIterator
+		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
+		
+		// add elements to the source; equal keys are adjacent: 1 -> A, 2 -> B, 3 -> C,D, 4 -> E,F,G, 5 -> H,I,J,K,L
+		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
+		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
+		
+		
+		this.sourceIter = new MutableObjectIterator<Record>() { // adapts the pair list to the MutableObjectIterator interface
+			final Iterator<IntStringPair> it = source.iterator();
+			
+			@Override
+			public Record next(Record reuse) throws IOException { // reuse variant: writes the next pair into the caller's record
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					reuse.setField(0, pair.getInteger());
+					reuse.setField(1, pair.getString());
+					return reuse;
+				}
+				else {
+					return null; // source exhausted
+				}
+			}
+
+			@Override
+			public Record next() throws IOException { // allocating variant: returns a new two-field Record per pair
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					Record result = new Record(2);
+					result.setField(0, pair.getInteger());
+					result.setField(1, pair.getString());
+					return result;
+				}
+				else {
+					return null; // source exhausted
+				}
+			}
+		};
+		
+		final RecordSerializer serializer = RecordSerializer.get();
+		@SuppressWarnings("unchecked")
+		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class}); // presumably key position 0 with key type IntValue, matching field 0 set above
+		
+		this.psi = new NonReusingKeyGroupedIterator<Record>(this.sourceIter, serializer, comparator);
+	}
+
+	@Test
+	public void testNextKeyOnly() throws Exception
+	{
+		try { // advance with nextKey() only; getValues() is not called until the keys are used up
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 2: expected next although the values of key 1 were never requested
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 3 (the source holds two values, C and D, for it)
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 4 (three values: E, F, G)
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 5 (five values: H, I, J, K, L) is the last key in the source
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// no key is left: nextKey() must report false and getValues() is expected to be null
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
+			// further nextKey() calls on the drained iterator must keep reporting false
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace(); // fail() below carries only a fixed message, so the trace is printed first
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testFullIterationThroughAllValues() throws IOException
+	{
+		try { // getValues() is fetched anew for every hasNext()/next() call below
+			// Key 1, Value A: after the single next() the group must report no further value
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 2, Value B: same single-value pattern as key 1
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 3, Values C, D: additionally probes next() on the exhausted group, which must throw
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {} // expected: the group of key 3 has no more values
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {} // expected again: a second probe must throw as well
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 4, Values E, F, G: key and current record are re-checked around every value
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 5, Values H, I, J, K, L: last group; next() past its end must throw again
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {} // expected: the group of key 5 has no more values
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {} // expected again: a second probe must throw as well
+			// no key is left; repeated nextKey() calls must stay false
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace(); // fail() below carries only a fixed message, so the trace is printed first
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testMixedProgress() throws Exception
+	{
+		try { // every group below is left with unconsumed values before nextKey() is called again
+			// Progression only via nextKey() and hasNext() - Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Progression only through nextKey() - Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			
+			// Progression first through hasNext() and next(), then through hasNext() - Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Progression first via next() only, then hasNext() only - Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Key 5, Values H, I, J, K, L - only H and I are consumed before moving on
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// end - nextKey() must pass over the unconsumed values of key 5 and report that no key is left
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace(); // fail() below carries only a fixed message, so the trace is printed first
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
+	{
+		try {
+			Iterator<Record> valsIter = null;
+			Record rec = null;
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	private static final class IntStringPair
+	{ // one (key, value) element of the list that feeds the source iterator
+		private final IntValue integer;
+		private final StringValue string;
+		// stores both references exactly as passed in; no copies are made
+		IntStringPair(IntValue integer, StringValue string) {
+			this.integer = integer;
+			this.string = string;
+		}
+		// returns the IntValue reference given to the constructor
+		public IntValue getInteger() {
+			return integer;
+		}
+		// returns the StringValue reference given to the constructor
+		public StringValue getString() {
+			return string;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
new file mode 100644
index 0000000..8a9f8ba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ReusingKeyGroupedIteratorTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TraversableOnceException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
+ * over the records with the same key.
+ */
+public class ReusingKeyGroupedIteratorTest {
+	
+	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
+	
+	private ReusingKeyGroupedIterator<Record> psi;						// the grouping iterator, progressing in key steps
+	
+	@Before
+	public void setup() {
+		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
+		// JUnit runs this before each test, so every test starts from a fresh list and a fresh iterator
+		// add elements to the source: 12 pairs, keys ascending, with 1, 1, 2, 3 and 5 values for keys 1 to 5
+		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
+		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
+		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
+		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
+		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
+		
+		// adapter exposing the list as a MutableObjectIterator<Record>; both next() variants return null once it is drained
+		this.sourceIter = new MutableObjectIterator<Record>() {
+			final Iterator<IntStringPair> it = source.iterator();
+			// reuse variant: writes fields 0 and 1 into the caller-supplied record and returns that same instance
+			@Override
+			public Record next(Record reuse) throws IOException {
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					reuse.setField(0, pair.getInteger());
+					reuse.setField(1, pair.getString());
+					return reuse;
+				}
+				else {
+					return null;
+				}
+			}
+			// non-reuse variant: creates a new Record(2) for every element
+			@Override
+			public Record next() throws IOException {
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					Record result = new Record(2);
+					result.setField(0, pair.getInteger());
+					result.setField(1, pair.getString());
+					return result;
+				}
+				else {
+					return null;
+				}
+			}
+
+		};
+		// the comparator is configured with position 0 and IntValue, i.e. presumably field 0 is the grouping key
+		final RecordSerializer serializer = RecordSerializer.get();
+		@SuppressWarnings("unchecked")
+		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
+		
+		this.psi = new ReusingKeyGroupedIterator<Record>(this.sourceIter, serializer, comparator);
+	}
+
+	@Test
+	public void testNextKeyOnly() throws Exception {
+		try { // advance with nextKey() only; getValues() is not called until the keys are used up
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 2: expected next although the values of key 1 were never requested
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 3 (the source holds two values, C and D, for it)
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 4 (three values: E, F, G)
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// key 5 (five values: H, I, J, K, L) is the last key in the source
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			// no key is left: nextKey() must report false and getValues() is expected to be null
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
+			// further nextKey() calls on the drained iterator must keep reporting false
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace(); // fail() below carries only a fixed message, so the trace is printed first
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testFullIterationThroughAllValues() throws IOException
+	{
+		try {
+			// Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			
+			// Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			
+			// Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Key 5, Values H, I, J, K, L
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			try {
+				this.psi.getValues().next();
+				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
+			}
+			catch (NoSuchElementException nseex) {}
+			
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertNull(this.psi.getValues());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testMixedProgress() throws Exception
+	{
+		try {
+			// Progression only via nextKey() and hasNext() - Key 1, Value A
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			
+			// Progression only through nextKey() - Key 2, Value B
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			
+			// Progression first through hasNext() and next(), then through hasNext() - Key 3, Values C, D
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			
+			// Progression first via next() only, then hasNext() only - Key 4, Values E, F, G
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// Key 5, Values H, I, J, K, L
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
+			Assert.assertTrue(hasIterator(this.psi.getValues()));
+			Assert.assertFalse(hasIterator(this.psi.getValues()));
+			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
+			
+			// end
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	@Test
+	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
+	{
+		try {
+			Iterator<Record> valsIter = null;
+			Record rec = null;
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
+			
+			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
+			valsIter = this.psi.getValues();
+			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
+			rec = valsIter.next();
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
+			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test encountered an unexpected exception.");
+		}
+	}
+	
+	private static final class IntStringPair
+	{
+		private final IntValue integer;
+		private final StringValue string;
+
+		IntStringPair(IntValue integer, StringValue string) {
+			this.integer = integer;
+			this.string = string;
+		}
+
+		public IntValue getInteger() {
+			return integer;
+		}
+
+		public StringValue getString() {
+			return string;
+		}
+	}
+	
+	public boolean hasIterator(Iterable<?> iterable) {
+		try {
+			iterable.iterator();
+			return true;
+		}
+		catch (TraversableOnceException e) {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 3304c7d..5e3a0a8 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -85,7 +85,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testJob() throws Exception {
+	public void testJobWithObjectReuse() throws Exception {
 		isCollectionExecution = false;
 		
 		startCluster();
@@ -102,6 +102,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			
 			// prepare the test environment
 			TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
+			env.getConfig().enableObjectReuse();
 			env.setAsContext();
 			
 			// call the test program
@@ -130,6 +131,54 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			stopCluster();
 		}
 	}
+
+	@Test
+	public void testJobWithoutObjectReuse() throws Exception {
+		isCollectionExecution = false;
+
+		startCluster();
+		try {
+			// pre-submit
+			try {
+				preSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+			}
+
+			// prepare the test environment
+			TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
+			env.getConfig().disableObjectReuse();
+			env.setAsContext();
+
+			// call the test program
+			try {
+				testProgram();
+				this.latestExecutionResult = env.latestResult;
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				Assert.fail("Error while calling the test program: " + e.getMessage());
+			}
+
+			Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
+
+			// post-submit
+			try {
+				postSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				Assert.fail("Post-submit work caused an error: " + e.getMessage());
+			}
+		} finally {
+			stopCluster();
+		}
+	}
 	
 	@Test
 	public void testJobCollectionExecution() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 60328fe..0a4673a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -100,7 +100,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 			}
 			Record old = current.next();
 			long oldId = old.getField(1, LongValue.class).getValue();
-			
+
 			long minimumComponentID = Long.MAX_VALUE;
 
 			while (candidates.hasNext()) {
@@ -110,7 +110,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 					minimumComponentID = candidateComponentID;
 				}
 			}
-			
+
 			if (minimumComponentID < oldId) {
 				newComponentId.setValue(minimumComponentID);
 				old.setField(1, newComponentId);

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index b59fe4f..15079ec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -57,6 +57,9 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	
 	@Override
 	protected void preSubmit() throws Exception {
+		verticesInput.clear();
+		edgesInput.clear();
+
 		// vertices input
 		verticesInput.add(new Tuple2<Long, Long>(1l,1l));
 		verticesInput.add(new Tuple2<Long, Long>(2l,2l));

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 065be67..1168e3c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -244,29 +244,6 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testReduceWithUDFThatReturnsTheSecondInputObject() throws Exception {
-		/*
-		 * Reduce with UDF that returns the second input object (check mutable object handling)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-				groupBy(1).reduce(new InputReturningTuple3Reduce());
-
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
-
-		expected = "1,1,Hi\n" +
-				"5,2,Hi again!\n" +
-				"15,3,Hi again!\n" +
-				"34,4,Hi again!\n" +
-				"65,5,Hi again!\n" +
-				"111,6,Hi again!\n";
-	}
-
-	@Test
 	public void testReduceATupleReturningKeySelector() throws Exception {
 		/*
 		 * Reduce with a Tuple-returning KeySelector
@@ -451,20 +428,6 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 		}
 	}
 	
-	public static class InputReturningTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<Integer, Long, String> reduce(
-				Tuple3<Integer, Long, String> in1,
-				Tuple3<Integer, Long, String> in2) throws Exception {
-
-			in2.f0 = in1.f0 + in2.f0;
-			in2.f2 = "Hi again!";
-			return in2;
-		}
-	}
-	
 	public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();


Mime
View raw message