Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6E251172D4 for ; Thu, 5 Feb 2015 21:00:46 +0000 (UTC) Received: (qmail 91879 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 91784 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 91691 invoked by uid 99); 5 Feb 2015 21:00:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 21:00:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C042E03F7; Thu, 5 Feb 2015 21:00:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 05 Feb 2015 21:00:47 -0000 Message-Id: <5f5e519667f341379bcb4754fc754d1d@git.apache.org> In-Reply-To: <95f9600993c243ebbcba5227a05620b4@git.apache.org> References: <95f9600993c243ebbcba5227a05620b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/8] tez git commit: TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan) TEZ-1999. IndexOutOfBoundsException during merge (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad6bf07e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad6bf07e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad6bf07e Branch: refs/heads/TEZ-2003 Commit: ad6bf07eba9923fca2627503652d16cfceb72d39 Parents: b726869 Author: Rajesh Balamohan Authored: Sat Jan 31 19:53:23 2015 +0530 Committer: Rajesh Balamohan Committed: Sat Jan 31 19:53:23 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/sort/impl/TezMerger.java | 29 +- .../library/common/sort/impl/TestTezMerger.java | 518 ++++++++++++++++++- 3 files changed, 525 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5c0bec0..03b0624 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1999. IndexOutOfBoundsException during merge. TEZ-2000. Source vertex exists error during DAG submission. TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key. TEZ-1995. Build failure against hadoop 2.2. http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index ed9a59d..5dd538a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -487,13 +487,32 @@ public class TezMerger { return value; } + private void populatePreviousKey() throws IOException { + key.reset(); + BufferUtils.copy(key, prevKey); + } + private void adjustPriorityQueue(Segment reader) throws IOException{ long startPos = reader.getPosition(); - if (hasNext != null && hasNext != KeyState.SAME_KEY) { - key.reset(); - // TODO: This copy can be an unwanted operation when all keys are unique. Revisit this - // when we have better stats. - BufferUtils.copy(key, prevKey); + if (hasNext == null) { + /** + * hasNext can be null during first iteration & prevKey is initialized here. + * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found + * during this process, we need to compare keys for RLE across segment boundaries. + * prevKey can't be empty at that time (e.g custom comparators) + */ + populatePreviousKey(); + } else { + //indicates a key has been read already + if (hasNext != KeyState.SAME_KEY) { + /** + * Store previous key before reading next for later key comparisons. + * If all keys in a segment are unique, it would always hit this code path and key copies + * are wasteful in such condition, as these comparisons are mainly done for RLE. + * TODO: When better stats are available, this condition can be avoided. + */ + populatePreviousKey(); + } } hasNext = reader.readRawKey(); long endPos = reader.getPosition(); http://git-wip-us.apache.org/repos/asf/tez/blob/ad6bf07e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java index ac17d8d..1e14b9b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.common.sort.impl; +import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @@ -32,23 +33,27 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestMergeManager; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import static org.junit.Assert.assertTrue; + public class TestTezMerger { private static final Log LOG = LogFactory.getLog(TestTezMerger.class); @@ -56,6 +61,11 @@ public class TestTezMerger { private static Configuration defaultConf = new Configuration(); private static FileSystem localFs = null; private static Path workDir = null; + private static RawComparator comparator = null; + private static Random rnd = new Random(); + + private static final String SAME_KEY = "SAME_KEY"; + private static final String DIFF_KEY = "DIFF_KEY"; //store the generated data for final verification private static ListMultimap verificationDataSet = LinkedListMultimap.create(); @@ -76,6 +86,7 @@ public class TestTezMerger { Path baseDir = new Path(workDir, TestMergeManager.class.getName()); String localDirs = baseDir.toString(); defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + comparator = ConfigUtils.getIntermediateInputKeyComparator(defaultConf); } @AfterClass @@ -83,8 +94,7 @@ public class TestTezMerger { localFs.delete(workDir, true); } - - @Test + @Test(timeout = 80000) public void testMerge() throws Exception { /** * test with number of files, keys per file and mergefactor @@ -95,6 +105,7 @@ public class TestTezMerger { merge(100, 0, 5); //small files + merge(12, 4, 2); merge(2, 10, 2); merge(1, 10, 1); merge(5, 10, 3); @@ -105,17 +116,487 @@ public class TestTezMerger { merge(5, 1000, 5); merge(5, 1000, 10); merge(5, 1000, 100); + + //Create random mix of files (empty files + files with keys) + List pathList = new LinkedList(); + pathList.clear(); + pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), 0)); + pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), Math.max(2, rnd.nextInt(10)))); + merge(pathList, Math.max(2, rnd.nextInt(10))); + } + + private Path createIFileWithTextData(List data) throws IOException { + Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out"); + FSDataOutputStream out = localFs.create(path); + IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class, + Text.class, null, null, null, true); + for (String key : data) { + writer.append(new Text(key), new Text(key + "_" + System.nanoTime())); + } + writer.close(); + out.close(); + return path; + } + + /** + * Verify if the records are as per the expected data set + * + * @param records + * @param expectedResult + * @throws IOException + */ + private void verify(TezRawKeyValueIterator records, String[][] expectedResult) + throws IOException { + //Iterate through merged dataset (shouldn't throw any exceptions) + int i = 0; + while (records.next()) { + DataInputBuffer key = records.getKey(); + DataInputBuffer value = records.getValue(); + + Text k = new Text(); + k.readFields(key); + Text v = new Text(); + v.readFields(value); + + assertTrue(k.toString().equals(expectedResult[i][0])); + + String correctResult = expectedResult[i][1]; + + if (records.isSameKey()) { + assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(SAME_KEY)); + LOG.info("\tSame Key : key=" + k + ", val=" + v); + } else { + assertTrue("Expected " + correctResult, correctResult.equalsIgnoreCase(DIFF_KEY)); + LOG.info("key=" + k + ", val=" + v); + } + + i++; + } + } + + @Test(timeout = 5000) + public void testWithCustomComparator_WithEmptyStrings() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + //Merge datasets with custom comparator + RawComparator rc = new CustomComparator(); + + LOG.info("Test with custom comparator with empty strings in middle"); + + //Test with 4 files, where some texts are empty strings + data.add("0"); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Second file with empty key + data.clear(); + data.add(""); + pathList.add(createIFileWithTextData(data)); + + //Third file + data.clear(); + data.add("0"); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Third file + data.clear(); + data.add("1"); + data.add("2"); + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, rc); + + String[][] expectedResult = + { + //formatting intentionally + { "", DIFF_KEY }, + { "0", DIFF_KEY }, + { "0", SAME_KEY }, + { "0", SAME_KEY }, + { "0", SAME_KEY }, + { "1", DIFF_KEY }, + { "2", DIFF_KEY } + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator_No_RLE() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + //Merge datasets with custom comparator + RawComparator rc = new CustomComparator(); + + LOG.info("Test with custom comparator with no RLE"); + + //Test with 3 files, + data.add("1"); + data.add("4"); + data.add("5"); + pathList.add(createIFileWithTextData(data)); + + //Second file with empty key + data.clear(); + data.add("2"); + data.add("6"); + data.add("7"); + pathList.add(createIFileWithTextData(data)); + + //Third file + data.clear(); + data.add("3"); + data.add("8"); + data.add("9"); + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, rc); + + String[][] expectedResult = + { + { "1", DIFF_KEY }, + { "2", DIFF_KEY }, + { "3", DIFF_KEY }, + { "4", DIFF_KEY }, + { "5", DIFF_KEY }, + { "6", DIFF_KEY }, + { "7", DIFF_KEY }, + { "8", DIFF_KEY }, + { "9", DIFF_KEY } + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator_RLE_acrossFiles() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info("Test with custom comparator with RLE spanning across segment boundaries"); + + //Test with 2 files, where the RLE keys can span across files + //First file + data.clear(); + data.add("0"); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Second file + data.clear(); + data.add("0"); + data.add("1"); + pathList.add(createIFileWithTextData(data)); + + //Merge datasets with custom comparator + RawComparator rc = new CustomComparator(); + TezRawKeyValueIterator records = merge(pathList, rc); + + //expected result + String[][] expectedResult = + { + //formatting intentionally + { "0", DIFF_KEY }, + { "0", SAME_KEY }, + { "0", SAME_KEY }, + { "1", DIFF_KEY } + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + + } + + @Test(timeout = 5000) + public void testWithCustomComparator_mixedFiles() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info("Test with custom comparator with mixed set of segments (empty, non-empty etc)"); + + //Test with 2 files, where the RLE keys can span across files + //First file + data.clear(); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Second file; empty file + data.clear(); + pathList.add(createIFileWithTextData(data)); + + //Third file with empty key + data.clear(); + data.add(""); + pathList.add(createIFileWithTextData(data)); + + //Fourth file with repeated keys + data.clear(); + data.add("0"); + data.add("0"); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Merge datasets with custom comparator + RawComparator rc = new CustomComparator(); + TezRawKeyValueIterator records = merge(pathList, rc); + + //expected result + String[][] expectedResult = + { + //formatting intentionally + { "", DIFF_KEY }, + { "0", DIFF_KEY }, + { "0", SAME_KEY }, + { "0", SAME_KEY }, + { "0", SAME_KEY } + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator_RLE() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info("Test with custom comparator 2 files one containing RLE and also other segment " + + "starting with same key"); + + //Test with 2 files, same keys in middle of file + //First file + data.clear(); + data.add("1"); + data.add("2"); + data.add("2"); + pathList.add(createIFileWithTextData(data)); + + //Second file + data.clear(); + data.add("2"); + data.add("3"); + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, new CustomComparator()); + + String[][] expectedResult = + { + //formatting intentionally + { "1", DIFF_KEY }, + { "2", DIFF_KEY }, + { "2", SAME_KEY }, + { "2", SAME_KEY }, + { "3", DIFF_KEY } + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator_RLE2() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info( + "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries"); + + //Test with 3 files, same keys in middle of file + //First file + data.clear(); + data.add("0"); + data.add("1"); + data.add("1"); + pathList.add(createIFileWithTextData(data)); + + //Second file + data.clear(); + data.add("0"); + data.add("1"); + pathList.add(createIFileWithTextData(data)); + + //Third file + data.clear(); + data.add("0"); + data.add("1"); + data.add("1"); + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, new CustomComparator()); + String[][] expectedResult = + { + //formatting intentionally + { "0", DIFF_KEY }, + { "0", SAME_KEY }, + { "0", SAME_KEY }, + { "1", DIFF_KEY }, + { "1", SAME_KEY }, + { "1", SAME_KEY }, + { "1", SAME_KEY }, + { "1", SAME_KEY } + + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info( + "Test with custom comparator 3 files with RLE (starting keys) spanning across boundaries"); + + //Test with 3 files + //First file + data.clear(); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Second file + data.clear(); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Third file + data.clear(); + data.add("1"); + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, new CustomComparator()); + String[][] expectedResult = + { + //formatting intentionally + { "0", DIFF_KEY }, + { "0", SAME_KEY }, + { "1", DIFF_KEY } + }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator_RLE3() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info("Test with custom comparator"); + + //Test with 3 files, same keys in middle of file + //First file + data.clear(); + data.add("0"); + pathList.add(createIFileWithTextData(data)); + + //Second file + data.clear(); + data.add("0"); + data.add("1"); + data.add("1"); + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, new CustomComparator()); + + String[][] expectedResult = + { + //formatting intentionally + { "0", DIFF_KEY }, + { "0", SAME_KEY }, + { "1", DIFF_KEY }, + { "1", SAME_KEY } }; + + verify(records, expectedResult); + pathList.clear(); + data.clear(); + } + + @Test(timeout = 5000) + public void testWithCustomComparator_allEmptyFiles() throws Exception { + List pathList = new LinkedList(); + List data = Lists.newLinkedList(); + + LOG.info("Test with custom comparator where all files are empty"); + + //First file + pathList.add(createIFileWithTextData(data)); + + //Second file + pathList.add(createIFileWithTextData(data)); + + //Third file + pathList.add(createIFileWithTextData(data)); + + //Fourth file + pathList.add(createIFileWithTextData(data)); + + TezRawKeyValueIterator records = merge(pathList, new CustomComparator()); + + String[][] expectedResult = new String[0][0]; + + verify(records, expectedResult); + } + + /** + * Merge the data sets + * + * @param pathList + * @param rc + * @return + * @throws IOException + */ + private TezRawKeyValueIterator merge(List pathList, RawComparator rc) throws IOException { + TezMerger merger = new TezMerger(); + TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class, + LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), + true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc), + new Reporter(), null, null, + null, new Progress()); + return records; + } + + + + //Sample comparator to test TEZ-1999 corner case + static class CustomComparator extends WritableComparator { + @Override + //Not a valid comparison, but just to check byte boundaries + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + Preconditions.checkArgument(l2 > 0 && l1 > 0, "l2=" + l2 + ",l1=" + l1); + ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1); + ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2); + return bb1.compareTo(bb2); + } + } + + private void merge(List pathList, int mergeFactor) throws Exception { + merge(pathList, mergeFactor, null); } private void merge(int fileCount, int keysPerFile, int mergeFactor) throws Exception { List pathList = createIFiles(fileCount, keysPerFile); + merge(pathList, mergeFactor, null); + } + private void merge(List pathList, int mergeFactor, RawComparator rc) throws Exception { //Merge datasets TezMerger merger = new TezMerger(); TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class, LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()), - ConfigUtils.getIntermediateInputKeyComparator(defaultConf), new Reporter(), null, null, + ((rc == null) ? comparator : rc), new Reporter(), null, null, null, new Progress()); @@ -134,9 +615,9 @@ public class TestTezMerger { if (records.isSameKey()) { LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get()); //More than one key should be present in the source data - Assert.assertTrue(verificationDataSet.get(k.get()).size() > 1); + assertTrue(verificationDataSet.get(k.get()).size() > 1); //Ensure this is same as the previous key we saw - Assert.assertTrue(pk == k.get()); + assertTrue("previousKey=" + pk + ", current=" + k.get(), pk == k.get()); } else { LOG.info("key=" + k.get() + ", val=" + v.get()); } @@ -147,30 +628,30 @@ public class TestTezMerger { } //Verify if the number of distinct entries is the same in source and the test - Assert.assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" + - verificationDataSet.keySet().size(), + assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" + + verificationDataSet.keySet().size(), dataMap.keySet().size() == verificationDataSet.keySet().size()); //Verify with source data for (Integer key : verificationDataSet.keySet()) { - Assert.assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap + assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap .get(key).intValue() + ", source:" + verificationDataSet.get(key).size(), dataMap.get(key).intValue() == verificationDataSet.get(key).size()); } //Verify if every key has the same number of repeated items in the source dataset as well for (Map.Entry entry : dataMap.entrySet()) { - Assert.assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry + assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry .getValue()); } LOG.info("******************"); + verificationDataSet.clear(); } private List createIFiles(int fileCount, int keysPerFile) throws IOException { List pathList = Lists.newLinkedList(); - verificationDataSet.clear(); Random rnd = new Random(); for (int i = 0; i < fileCount; i++) { int repeatCount = ((i % 2 == 0) && keysPerFile > 0) ? rnd.nextInt(keysPerFile) : 0; @@ -180,8 +661,10 @@ public class TestTezMerger { return pathList; } - static Path writeIFile(int keysPerFile, int repeatCount) throws IOException { + static Path writeIFile(int keysPerFile, int repeatCount) throws + IOException { TreeMultimap dataSet = createDataForIFile(keysPerFile, repeatCount); + LOG.info("DataSet size : " + dataSet.size()); Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out"); FSDataOutputStream out = localFs.create(path); //create IFile with RLE @@ -202,7 +685,7 @@ public class TestTezMerger { /** * Generate data set for ifile. Create repeated keys if needed. * - * @param keyCount approximate number of keys to be created + * @param keyCount approximate number of keys to be created * @param repeatCount number of times a key should be repeated * @return */ @@ -210,9 +693,9 @@ public class TestTezMerger { TreeMultimap dataSet = TreeMultimap.create(); Random rnd = new Random(); for (int i = 0; i < keyCount; i++) { - if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) { + if (repeatCount > 0 && (rnd.nextInt(keyCount) % 2 == 0)) { //repeat this key - for(int j = 0; j < repeatCount; j++) { + for (int j = 0; j < repeatCount; j++) { IntWritable key = new IntWritable(rnd.nextInt(keyCount)); LongWritable value = new LongWritable(System.nanoTime()); dataSet.put(key.get(), value.get()); @@ -234,7 +717,6 @@ public class TestTezMerger { return dataSet; } - private static class Reporter implements Progressable { @Override public void progress() {