From: rbalamohan@apache.org
To: commits@tez.apache.org
Subject: tez git commit: TEZ-3314. Double counting input bytes in MultiMRInput (Contributed by Harish JP via rbalamohan)
Date: Wed, 29 Jun 2016 23:49:26 +0000 (UTC)

Repository: tez
Updated Branches:
  refs/heads/master c6a7d76ea -> 540eab018


TEZ-3314. Double counting input bytes in MultiMRInput (Contributed by Harish JP via rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/540eab01
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/540eab01
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/540eab01

Branch: refs/heads/master
Commit: 540eab0183787b434cbe95154bf2d1884dfaf46d
Parents: c6a7d76
Author: Rajesh Balamohan
Authored: Wed Jun 29 16:49:14 2016 -0700
Committer: Rajesh Balamohan
Committed: Wed Jun 29 16:49:14 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                    |   1 +
 .../tez/mapreduce/input/MultiMRInput.java      |   4 -
 .../tez/mapreduce/input/TestMultiMRInput.java  | 163 ++++++++++---------
 3 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/540eab01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1617c91..deb02fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
   TEZ-3291. Optimize splits grouping when locality information is not available.
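What "double counting" means here: TEZ-3308 added the TaskCounter.INPUT_SPLIT_LENGTH_BYTES counter, and MultiMRInput incremented it while processing the InputDataInformationEvent. The tests below still expect the counter to equal the input size after this patch, which implies the same bytes are already accounted once on the read path, so the event-time increment counted every new-API split twice. A minimal toy sketch of that failure mode (hypothetical names, not Tez code):

    import java.util.concurrent.atomic.AtomicLong;

    // Toy model of the bug: the same split length is added to the counter in
    // two places, so the reported value is twice the real input size.
    public class DoubleCountSketch {
      static final AtomicLong INPUT_SPLIT_LENGTH_BYTES = new AtomicLong();

      // Event-handling path: this is the increment removed by this commit.
      static void processSplitEvent(long splitLength) {
        INPUT_SPLIT_LENGTH_BYTES.addAndGet(splitLength);
      }

      // Read path: the same bytes are accounted again when the split is read.
      static void readSplit(long splitLength) {
        INPUT_SPLIT_LENGTH_BYTES.addAndGet(splitLength);
      }

      public static void main(String[] args) {
        processSplitEvent(100);
        readSplit(100);
        System.out.println(INPUT_SPLIT_LENGTH_BYTES.get()); // 200 for a 100-byte split
      }
    }

Removing the event-time increment, as the first hunk below does, leaves a single point of accounting.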
http://git-wip-us.apache.org/repos/asf/tez/blob/540eab01/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index efbeeaa..de54b0d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -188,10 +188,6 @@ public class MultiMRInput extends MRInputBase {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
             split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
       }
-      if (splitLength != -1) {
-        getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
-            .increment(splitLength);
-      }
     } else {
       org.apache.hadoop.mapred.InputSplit split =
           MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/540eab01/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index ab4a5d9..8d77a05 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
@@ -98,13 +102,7 @@ public class TestMultiMRInput {
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
-
+    InputContext inputContext = createTezInputContext(jobConf);
     MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
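The test hunk above also starts a refactor: the repeated MRInputUserPayloadProto boilerplate moves into a createTezInputContext(Configuration) overload, defined in a later hunk of this patch. The resulting setup, reproduced from the diff with comments added:

    // Configure an old-API (mapred) sequence-file input.
    JobConf jobConf = new JobConf(defaultConf);
    jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
    FileInputFormat.setInputPaths(jobConf, workDir);

    // The helper now serializes the conf into the MRInputUserPayloadProto
    // itself, so tests no longer build the payload by hand.
    InputContext inputContext = createTezInputContext(jobConf);
    MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);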
@@ -131,21 +129,14 @@ public class TestMultiMRInput {
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder =
-        MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput input = new MultiMRInput(inputContext, 1);
     input.initialize();
 
-    List eventList = new ArrayList();
-    String file1 = "file1";
-    AtomicLong file1Length = new AtomicLong();
-    LinkedHashMap data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10, file1Length);
+    AtomicLong inputLength = new AtomicLong();
+    LinkedHashMap data = createSplits(1, workDir, jobConf, inputLength);
+
     SequenceFileInputFormat format = new SequenceFileInputFormat();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -156,34 +147,47 @@
         InputDataInformationEvent.createWithSerializedPayload(0,
             splitProto.toByteString().asReadOnlyByteBuffer());
 
-    eventList.clear();
+    List eventList = new ArrayList();
     eventList.add(event);
     input.handleEvents(eventList);
 
-    int readerCount = 0;
-    int recordCount = 0;
-    for (KeyValueReader reader : input.getKeyValueReaders()) {
-      readerCount++;
-      while (reader.next()) {
-        verify(inputContext, times(++recordCount)).notifyProgress();
-        if (data1.size() == 0) {
-          fail("Found more records than expected");
-        }
-        Object key = reader.getCurrentKey();
-        Object val = reader.getCurrentValue();
-        assertEquals(val, data1.remove(key));
-      }
-      try {
-        reader.next(); //should throw exception
-        fail();
-      } catch(IOException e) {
-        assertTrue(e.getMessage().contains("For usage, please refer to"));
-      }
-    }
-    assertEquals(1, readerCount);
-    long counterValue = input.getContext().getCounters()
-        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
-    assertEquals(file1Length.get(), counterValue);
+    assertReaders(input, data, 1, inputLength.get());
+  }
+
+  @Test
+  public void testNewFormatSplits() throws Exception {
+    Path workDir = new Path(TEST_ROOT_DIR, "testNewFormatSplits");
+    Job job = Job.getInstance(defaultConf);
+    job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, workDir);
+    Configuration conf = job.getConfiguration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+
+    // Create sequence file.
+    AtomicLong inputLength = new AtomicLong();
+    LinkedHashMap data = createSplits(1, workDir, conf, inputLength);
+
+    // Get split information.
+    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat format =
+        new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<>();
+    List splits = format.getSplits(job);
+    assertEquals(1, splits.size());
+
+    // Create the event.
+    MRSplitProto splitProto =
+        MRInputHelpers.createSplitProto(splits.get(0), new SerializationFactory(conf));
+    InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(0,
+        splitProto.toByteString().asReadOnlyByteBuffer());
+
+    // Create input context.
+    InputContext inputContext = createTezInputContext(conf);
+
+    // Create the MR input object and process the event
+    MultiMRInput input = new MultiMRInput(inputContext, 1);
+    input.initialize();
+    input.handleEvents(Collections.singletonList(event));
+
+    assertReaders(input, data, 1, inputLength.get());
   }
 
   @Test(timeout = 5000)
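The new testNewFormatSplits case is the regression guard for the fix: the increment removed from MultiMRInput sat in the new-API (mapreduce) split branch, which the pre-existing old-API (mapred) tests never reached. The guard itself is the counter check inside assertReaders (a later hunk), reproduced here with a comment:

    // After all records are read, the counter must equal the bytes written
    // exactly once; a reintroduced event-time increment would report 2x.
    long counterValue = input.getContext().getCounters()
        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
    assertEquals(inputBytes, counterValue);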
@@ -194,31 +198,13 @@
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput input = new MultiMRInput(inputContext, 2);
     input.initialize();
 
-    List eventList = new ArrayList();
-    LinkedHashMap data = new LinkedHashMap();
-
-    String file1 = "file1";
-    AtomicLong file1Length = new AtomicLong();
-    LinkedHashMap data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10, file1Length);
-
-    String file2 = "file2";
-    AtomicLong file2Length = new AtomicLong();
-    LinkedHashMap data2 = createInputData(localFs, workDir, jobConf, file2, 10,
-        20, file2Length);
-
-    data.putAll(data1);
-    data.putAll(data2);
+    AtomicLong inputLength = new AtomicLong();
+    LinkedHashMap data = createSplits(2, workDir, jobConf, inputLength);
 
     SequenceFileInputFormat format = new SequenceFileInputFormat();
@@ -235,15 +221,22 @@
         InputDataInformationEvent.createWithSerializedPayload(0,
             splitProto2.toByteString().asReadOnlyByteBuffer());
 
-    eventList.clear();
+    List eventList = new ArrayList();
     eventList.add(event1);
     eventList.add(event2);
 
     input.handleEvents(eventList);
+    assertReaders(input, data, 2, inputLength.get());
+  }
+
+  private void assertReaders(MultiMRInput input, LinkedHashMap data,
+      int expectedReaderCounts, long inputBytes) throws Exception {
     int readerCount = 0;
+    int recordCount = 0;
     for (KeyValueReader reader : input.getKeyValueReaders()) {
       readerCount++;
       while (reader.next()) {
+        verify(input.getContext(), times(++recordCount + readerCount - 1)).notifyProgress();
         if (data.size() == 0) {
           fail("Found more records than expected");
         }
@@ -261,8 +254,8 @@
     }
     long counterValue = input.getContext().getCounters()
         .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
-    assertEquals(file1Length.get() + file2Length.get(), counterValue);
-    assertEquals(2, readerCount);
+    assertEquals(inputBytes, counterValue);
+    assertEquals(expectedReaderCounts, readerCount);
   }
 
   @Test(timeout = 5000)
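assertReaders consolidates the per-test reader loops: it drains every KeyValueReader, matches each record against the expected data map, verifies progress notifications, and finally checks the reader count and the split-length counter. A call site from the multiple-splits test above, annotated:

    assertReaders(input,        // the initialized MultiMRInput under test
        data,                   // expected key/value pairs from createSplits
        2,                      // expected reader count: one per split
        inputLength.get());     // total bytes written; counter must match it once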
@@ -272,19 +265,13 @@
     jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(jobConf, workDir);
 
-    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
-    builder.setGroupingEnabled(false);
-    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
-    byte[] payload = builder.build().toByteArray();
-
-    InputContext inputContext = createTezInputContext(payload);
+    InputContext inputContext = createTezInputContext(jobConf);
 
     MultiMRInput input = new MultiMRInput(inputContext, 1);
     input.initialize();
 
-    List eventList = new ArrayList();
-    String file1 = "file1";
-    createInputData(localFs, workDir, jobConf, file1, 0, 10, new AtomicLong());
+    createSplits(1, workDir, jobConf, new AtomicLong());
+
     SequenceFileInputFormat format = new SequenceFileInputFormat();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -298,7 +285,7 @@
         InputDataInformationEvent.createWithSerializedPayload(1,
             splitProto.toByteString().asReadOnlyByteBuffer());
 
-    eventList.clear();
+    List eventList = new ArrayList();
     eventList.add(event1);
     eventList.add(event2);
     try {
@@ -310,7 +297,23 @@
     }
   }
 
-  private InputContext createTezInputContext(byte[] payload) {
+  private LinkedHashMap createSplits(int splitCount, Path workDir,
+      Configuration conf, AtomicLong totalSize) throws Exception {
+    LinkedHashMap data = new LinkedHashMap();
+    for (int i = 0; i < splitCount; ++i) {
+      int start = i * 10;
+      int end = start + 10;
+      data.putAll(createInputData(localFs, workDir, conf, "file" + i, start, end, totalSize));
+    }
+    return data;
+  }
+
+  private InputContext createTezInputContext(Configuration conf) throws Exception {
+    MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
+    builder.setGroupingEnabled(false);
+    builder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
+    byte[] payload = builder.build().toByteArray();
+
     ApplicationId applicationId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
@@ -336,7 +339,7 @@
   }
 
   public static LinkedHashMap createInputData(FileSystem fs, Path workDir,
-      JobConf job, String filename, long startKey, long numKeys, AtomicLong fileLength)
+      Configuration job, String filename, long startKey, long numKeys, AtomicLong fileLength)
       throws IOException {
     LinkedHashMap data = new LinkedHashMap();
     Path file = new Path(workDir, filename);
@@ -356,7 +359,7 @@
         writer.append(key, value);
         LOG.info(" : <" + key.get() + ", " + value + ">");
       }
-      fileLength.set(writer.getLength());
+      fileLength.addAndGet(writer.getLength());
     } finally {
       writer.close();
     }
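The final hunk, changing fileLength.set(...) to fileLength.addAndGet(...), is what makes the createSplits helper work: a single AtomicLong is threaded through every createInputData call, so per-file lengths must accumulate rather than overwrite. A runnable toy sketch of the difference (the lengths are made-up values):

    import java.util.concurrent.atomic.AtomicLong;

    public class TotalSizeSketch {
      public static void main(String[] args) {
        AtomicLong totalSize = new AtomicLong();
        totalSize.addAndGet(120); // length of file0 (made-up)
        totalSize.addAndGet(340); // length of file1 (made-up)
        System.out.println(totalSize.get()); // 460, the true total
        // With set(), the second file would overwrite the first and the
        // expected counter value would be 340 instead of 460.
      }
    }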