flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-1498] [tests] Better error messages in external sort tests
Date Mon, 09 Feb 2015 17:54:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6ddfcb402 -> 52d9806ba


[FLINK-1498] [tests] Better error messages in external sort tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52d9806b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52d9806b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52d9806b

Branch: refs/heads/master
Commit: 52d9806baaff1689f21962febb7dc73d68572289
Parents: 30a52a0
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 9 17:44:42 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 9 17:45:48 2015 +0100

----------------------------------------------------------------------
 .../operators/sort/ExternalSortITCase.java      | 453 ++++++++++---------
 .../sort/ExternalSortLargeRecordsITCase.java    |   8 +-
 2 files changed, 252 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/52d9806b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 6e35ad0..6a1eba7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -74,6 +74,8 @@ public class ExternalSortITCase {
 	private TypeSerializerFactory<Record> pactRecordSerializer;
 	
 	private TypeComparator<Record> pactRecordComparator;
+	
+	private boolean failed;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -96,7 +98,7 @@ public class ExternalSortITCase {
 		
 		if (this.memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",

-				this.memoryManager.verifyEmpty());
+				failed || this.memoryManager.verifyEmpty());
 			this.memoryManager.shutdown();
 			this.memoryManager = null;
 		}
@@ -105,246 +107,281 @@ public class ExternalSortITCase {
 	// --------------------------------------------------------------------------------------------
 	
 	@Test
-	public void testInMemorySort() throws Exception {
-		// comparator
-		final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
-		
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-		final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
NUM_PAIRS);
-
-		// merge iterator
-		LOG.debug("Initializing sortmerger...");
-		
-		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
-			source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				(double)64/78, 2, 0.9f);
-
-		// emit data
-		LOG.debug("Reading and sorting data...");
-
-		// check order
-		MutableObjectIterator<Record> iterator = merger.getIterator();
-		
-		LOG.debug("Checking results...");
-		int pairsEmitted = 1;
-
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null) {
-			final Key k1 = rec1.getField(0, TestData.Key.class);
-			final Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsEmitted++;
+	public void testInMemorySort() {
+		try {
+			// comparator
+			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
 			
-			Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
NUM_PAIRS);
+	
+			// merge iterator
+			LOG.debug("Initializing sortmerger...");
 			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
+			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
+				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
+					(double)64/78, 2, 0.9f);
+	
+			// emit data
+			LOG.debug("Reading and sorting data...");
+	
+			// check order
+			MutableObjectIterator<Record> iterator = merger.getIterator();
+			
+			LOG.debug("Checking results...");
+			int pairsEmitted = 1;
+	
+			Record rec1 = new Record();
+			Record rec2 = new Record();
 			
-			rec2 = tmp;
+			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
+			while ((rec2 = iterator.next(rec2)) != null) {
+				final Key k1 = rec1.getField(0, TestData.Key.class);
+				final Key k2 = rec2.getField(0, TestData.Key.class);
+				pairsEmitted++;
+				
+				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+				
+				Record tmp = rec1;
+				rec1 = rec2;
+				k1.setKey(k2.getKey());
+				
+				rec2 = tmp;
+			}
+			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
+			
+			merger.close();
+		}
+		catch (Exception e) {
+			failed = true;
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		Assert.assertTrue(NUM_PAIRS == pairsEmitted);
-		
-		merger.close();
 	}
 	
 	@Test
-	public void testInMemorySortUsing10Buffers() throws Exception {
-		// comparator
-		final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
-		
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-		final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
NUM_PAIRS);
-
-		// merge iterator
-		LOG.debug("Initializing sortmerger...");
-		
-		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
-				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				(double)64/78, 10, 2, 0.9f);
-
-		// emit data
-		LOG.debug("Reading and sorting data...");
-
-		// check order
-		MutableObjectIterator<Record> iterator = merger.getIterator();
-		
-		LOG.debug("Checking results...");
-		int pairsEmitted = 1;
-
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null) {
-			final Key k1 = rec1.getField(0, TestData.Key.class);
-			final Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsEmitted++;
+	public void testInMemorySortUsing10Buffers() {
+		try {
+			// comparator
+			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			
+			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
NUM_PAIRS);
+	
+			// merge iterator
+			LOG.debug("Initializing sortmerger...");
+			
+			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
+					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
+					(double)64/78, 10, 2, 0.9f);
+	
+			// emit data
+			LOG.debug("Reading and sorting data...");
+	
+			// check order
+			MutableObjectIterator<Record> iterator = merger.getIterator();
 			
-			Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+			LOG.debug("Checking results...");
+			int pairsEmitted = 1;
+	
+			Record rec1 = new Record();
+			Record rec2 = new Record();
 			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
+			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
+			while ((rec2 = iterator.next(rec2)) != null) {
+				final Key k1 = rec1.getField(0, TestData.Key.class);
+				final Key k2 = rec2.getField(0, TestData.Key.class);
+				pairsEmitted++;
+				
+				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+				
+				Record tmp = rec1;
+				rec1 = rec2;
+				k1.setKey(k2.getKey());
+				
+				rec2 = tmp;
+			}
+			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
 			
-			rec2 = tmp;
+			merger.close();
+		}
+		catch (Exception e) {
+			failed = true;
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		Assert.assertTrue(NUM_PAIRS == pairsEmitted);
-		
-		merger.close();
 	}
 
 	@Test
-	public void testSpillingSort() throws Exception {
-		// comparator
-		final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
-		
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-		final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
NUM_PAIRS);
-
-		// merge iterator
-		LOG.debug("Initializing sortmerger...");
-		
-		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
-				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				(double)16/78, 64, 0.7f);
-
-		// emit data
-		LOG.debug("Reading and sorting data...");
-
-		// check order
-		MutableObjectIterator<Record> iterator = merger.getIterator();
-		
-		LOG.debug("Checking results...");
-		int pairsEmitted = 1;
-
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null) {
-			final Key k1 = rec1.getField(0, TestData.Key.class);
-			final Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsEmitted++;
+	public void testSpillingSort() {
+		try {
+			// comparator
+			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
 			
-			Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
NUM_PAIRS);
+	
+			// merge iterator
+			LOG.debug("Initializing sortmerger...");
+			
+			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
+					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
+					(double)16/78, 64, 0.7f);
+	
+			// emit data
+			LOG.debug("Reading and sorting data...");
+	
+			// check order
+			MutableObjectIterator<Record> iterator = merger.getIterator();
+			
+			LOG.debug("Checking results...");
+			int pairsEmitted = 1;
+	
+			Record rec1 = new Record();
+			Record rec2 = new Record();
 			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
+			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
+			while ((rec2 = iterator.next(rec2)) != null) {
+				final Key k1 = rec1.getField(0, TestData.Key.class);
+				final Key k2 = rec2.getField(0, TestData.Key.class);
+				pairsEmitted++;
+				
+				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+				
+				Record tmp = rec1;
+				rec1 = rec2;
+				k1.setKey(k2.getKey());
+				
+				rec2 = tmp;
+			}
+			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
 			
-			rec2 = tmp;
+			merger.close();
+		}
+		catch (Exception e) {
+			failed = true;
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		Assert.assertTrue(NUM_PAIRS == pairsEmitted);
-		
-		merger.close();
 	}
 
 	@Test
-	public void testSpillingSortWithIntermediateMerge() throws Exception {
-		// amount of pairs
-		final int PAIRS = 10000000;
-
-		// comparator
-		final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
-
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-		final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
PAIRS);
-		
-		// merge iterator
-		LOG.debug("Initializing sortmerger...");
-		
-		Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
-				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-				(double)64/78, 16, 0.7f);
-		
-		// emit data
-		LOG.debug("Emitting data...");
-
-		// check order
-		MutableObjectIterator<Record> iterator = merger.getIterator();
-		
-		LOG.debug("Checking results...");
-		int pairsRead = 1;
-		int nextStep = PAIRS / 20;
-
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null) {
-			final Key k1 = rec1.getField(0, TestData.Key.class);
-			final Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsRead++;
+	public void testSpillingSortWithIntermediateMerge() {
+		try {
+			// amount of pairs
+			final int PAIRS = 10000000;
+	
+			// comparator
+			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+	
+			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH,
KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator,
PAIRS);
 			
-			Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+			// merge iterator
+			LOG.debug("Initializing sortmerger...");
 			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
-			rec2 = tmp;
+			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager,
this.ioManager, 
+					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
+					(double)64/78, 16, 0.7f);
 			
-			// log
-			if (pairsRead == nextStep) {
-				nextStep += PAIRS / 20;
-			}
+			// emit data
+			LOG.debug("Emitting data...");
+	
+			// check order
+			MutableObjectIterator<Record> iterator = merger.getIterator();
 			
+			LOG.debug("Checking results...");
+			int pairsRead = 1;
+			int nextStep = PAIRS / 20;
+	
+			Record rec1 = new Record();
+			Record rec2 = new Record();
+			
+			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
+			while ((rec2 = iterator.next(rec2)) != null) {
+				final Key k1 = rec1.getField(0, TestData.Key.class);
+				final Key k2 = rec2.getField(0, TestData.Key.class);
+				pairsRead++;
+				
+				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
+				
+				Record tmp = rec1;
+				rec1 = rec2;
+				k1.setKey(k2.getKey());
+				rec2 = tmp;
+				
+				// log
+				if (pairsRead == nextStep) {
+					nextStep += PAIRS / 20;
+				}
+				
+			}
+			Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead);
+			merger.close();
+		}
+		catch (Exception e) {
+			failed = true;
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead);
-		merger.close();
 	}
 	
 	@Test
-	public void testSpillingSortWithIntermediateMergeIntPair() throws Exception {
-		// amount of pairs
-		final int PAIRS = 50000000;
-
-		// comparator
-		final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS);
-		
-		final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
-		final TypeComparator<IntPair> comparator = new IntPairComparator();
-		
-		// merge iterator
-		LOG.debug("Initializing sortmerger...");
-		
-		Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager,
this.ioManager, 
-				generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f);
-
-		// emit data
-		LOG.debug("Emitting data...");
-		
-		// check order
-		MutableObjectIterator<IntPair> iterator = merger.getIterator();
-		
-		LOG.debug("Checking results...");
-		int pairsRead = 1;
-		int nextStep = PAIRS / 20;
-
-		IntPair rec1 = new IntPair();
-		IntPair rec2 = new IntPair();
-		
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		
-		while ((rec2 = iterator.next(rec2)) != null) {
-			final int k1 = rec1.getKey();
-			final int k2 = rec2.getKey();
-			pairsRead++;
+	public void testSpillingSortWithIntermediateMergeIntPair() {
+		try {
+			// amount of pairs
+			final int PAIRS = 50000000;
+	
+			// comparator
+			final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS);
+			
+			final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
+			final TypeComparator<IntPair> comparator = new IntPairComparator();
 			
-			Assert.assertTrue(k1 - k2 <= 0); 
+			// merge iterator
+			LOG.debug("Initializing sortmerger...");
 			
-			IntPair tmp = rec1;
-			rec1 = rec2;
-			rec2 = tmp;
+			Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager,
this.ioManager, 
+					generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f);
+	
+			// emit data
+			LOG.debug("Emitting data...");
 			
-			// log
-			if (pairsRead == nextStep) {
-				nextStep += PAIRS / 20;
+			// check order
+			MutableObjectIterator<IntPair> iterator = merger.getIterator();
+			
+			LOG.debug("Checking results...");
+			int pairsRead = 1;
+			int nextStep = PAIRS / 20;
+	
+			IntPair rec1 = new IntPair();
+			IntPair rec2 = new IntPair();
+			
+			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
+			
+			while ((rec2 = iterator.next(rec2)) != null) {
+				final int k1 = rec1.getKey();
+				final int k2 = rec2.getKey();
+				pairsRead++;
+				
+				Assert.assertTrue(k1 - k2 <= 0); 
+				
+				IntPair tmp = rec1;
+				rec1 = rec2;
+				rec2 = tmp;
+				
+				// log
+				if (pairsRead == nextStep) {
+					nextStep += PAIRS / 20;
+				}
 			}
+			Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead);
+			merger.close();
+		}
+		catch (Exception e) {
+			failed = true;
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead);
-		merger.close();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/52d9806b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 7403ab0..af7b008 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -58,6 +58,8 @@ public class ExternalSortLargeRecordsITCase {
 	private IOManager ioManager;
 
 	private MemoryManager memoryManager;
+	
+	private boolean errored;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -76,7 +78,7 @@ public class ExternalSortLargeRecordsITCase {
 		
 		if (this.memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",

-				this.memoryManager.verifyEmpty());
+				errored || this.memoryManager.verifyEmpty());
 			this.memoryManager.shutdown();
 			this.memoryManager = null;
 		}
@@ -147,6 +149,7 @@ public class ExternalSortLargeRecordsITCase {
 			sorter.close();
 		}
 		catch (Exception e) {
+			errored = true;
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -216,6 +219,7 @@ public class ExternalSortLargeRecordsITCase {
 			sorter.close();
 		}
 		catch (Exception e) {
+			errored = true;
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -300,6 +304,7 @@ public class ExternalSortLargeRecordsITCase {
 			sorter.close();
 		}
 		catch (Exception e) {
+			errored = true;
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -370,6 +375,7 @@ public class ExternalSortLargeRecordsITCase {
 			sorter.close();
 		}
 		catch (Exception e) {
+			errored = true;
 			e.printStackTrace();
 			fail(e.getMessage());
 		}


Mime
View raw message