tez-commits mailing list archives

From ss...@apache.org
Subject tez git commit: TEZ-3308. Add counters to capture input split length. Contributed by Harish Jaiprakash.
Date Tue, 28 Jun 2016 01:30:04 GMT
Repository: tez
Updated Branches:
  refs/heads/master d2b9222fb -> c6a7d76ea


TEZ-3308. Add counters to capture input split length. Contributed by Harish Jaiprakash.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6a7d76e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6a7d76e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6a7d76e

Branch: refs/heads/master
Commit: c6a7d76eab86643cdb7d7537c3d5a4df4eb7387c
Parents: d2b9222
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Jun 27 18:29:39 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Jun 27 18:29:39 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/common/counters/TaskCounter.java |  7 +++-
 .../org/apache/tez/mapreduce/input/MRInput.java | 40 ++++++++++++++++----
 .../tez/mapreduce/input/MultiMRInput.java       | 27 +++++++++----
 .../apache/tez/mapreduce/input/TestMRInput.java | 11 +++++-
 .../tez/mapreduce/input/TestMultiMRInput.java   | 29 +++++++++-----
 6 files changed, 88 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
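
For context, a minimal sketch of how a client might read the new counter once a
DAG finishes. This example is not part of the commit; it assumes a DAGClient
obtained from TezClient#submitDAG and uses the standard DAGStatus counter APIs.

import java.io.IOException;
import java.util.EnumSet;

import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;

public class SplitLengthCounterExample {
  // dagClient is assumed to reference an already-submitted DAG.
  static long totalInputSplitBytes(DAGClient dagClient)
      throws IOException, TezException, InterruptedException {
    DAGStatus status = dagClient.waitForCompletionWithStatusUpdates(
        EnumSet.of(StatusGetOpts.GET_COUNTERS));
    TezCounters counters = status.getDAGCounters();
    // The counter added by this commit, aggregated across all tasks.
    return counters.findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
  }
}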


http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26ff72c..1617c91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
   TEZ-3291. Optimize splits grouping when locality information is not available.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
@@ -67,6 +68,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.

http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 7dcdf8a..2f18bc6 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -84,7 +84,12 @@ public enum TaskCounter {
    * 
    */
   INPUT_RECORDS_PROCESSED,
-  
+
+  /**
+   * Number of input split bytes for a task, currently used by MRInput and MultiMRInput.
+   */
+  INPUT_SPLIT_LENGTH_BYTES,
+
   // 
   /**
    * Represents the number of actual output records.
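
A quick illustration of the counter semantics (again, not part of the commit):
TezCounters creates an enum-named counter on first lookup, and increment()
accumulates, so an input that sees several splits simply adds each split's
length to the same counter.

import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;

public class CounterSemanticsSketch {
  public static void main(String[] args) {
    TezCounters counters = new TezCounters();
    // findCounter materializes the counter if it does not exist yet.
    TezCounter c = counters.findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES);
    c.increment(128L); // e.g. first split
    c.increment(256L); // e.g. second split
    System.out.println(c.getValue()); // prints 384
  }
}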

http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index e859058..af4b05c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -489,10 +490,16 @@ public class MRInput extends MRInputBase {
         TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext().getTaskIndex()];
         TaskSplitIndex splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
             thisTaskMetaInfo.getStartOffset());
+        long splitLength = -1;
         if (useNewApi) {
           org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
               .getNewSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
                   .findCounter(TaskCounter.SPLIT_RAW_BYTES));
+          try {
+            splitLength = newInputSplit.getLength();
+          } catch (InterruptedException e) {
+            LOG.warn("Got interrupted while reading split length: ", e);
+          }
           mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
               inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
               getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
@@ -501,10 +508,15 @@ public class MRInput extends MRInputBase {
           org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
               .getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
                   .findCounter(TaskCounter.SPLIT_RAW_BYTES));
+          splitLength = oldInputSplit.getLength();
           mrReader =
               new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(),
                   inputRecordCounter, getContext());
         }
+        if (splitLength != -1) {
+          getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
+              .increment(splitLength);
+        }
       }
     } finally {
       rrLock.unlock();
@@ -650,24 +662,36 @@ public class MRInput extends MRInputBase {
     }
     Preconditions.checkState(initEvent != null, "InitEvent must be specified");
     MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
-    Object split = null;
+    Object splitObj = null;
+    long splitLength = -1;
     if (useNewApi) {
-      split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
+      InputSplit split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
+      splitObj = split;
+      try {
+        splitLength = split.getLength();
+      } catch (InterruptedException e) {
+        LOG.warn("Thread interrupted while getting split length: ", e);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", NewSplit: "
-            + split);
+            split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
       }
 
     } else {
-      split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
+      org.apache.hadoop.mapred.InputSplit split =
+          MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
+      splitObj = split;
+      splitLength = split.getLength();
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", OldSplit: "
-            + split);
+            split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
       }
     }
-    mrReader.setSplit(split);
+    if (splitLength != -1) {
+      getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
+          .increment(splitLength);
+    }
+    mrReader.setSplit(splitObj);
     LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 2b60f29..efbeeaa 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.mapreduce.input.base.MRInputBase;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
@@ -168,30 +169,40 @@ public class MultiMRInput extends MRInputBase {
       LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get());
     }
     MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
-    Object split = null;
     MRReader reader = null;
     JobConf localJobConf = new JobConf(jobConf);
+    long splitLength = -1;
     if (useNewApi) {
-      split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, localJobConf);
-      reader = new MRReaderMapReduce(localJobConf, (org.apache.hadoop.mapreduce.InputSplit) split,
+      InputSplit split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, localJobConf);
+      try {
+        splitLength = split.getLength();
+      } catch (InterruptedException e) {
+        LOG.warn("Got interrupted while reading split length: ", e);
+      }
+      reader = new MRReaderMapReduce(localJobConf, split,
           getContext().getCounters(), inputRecordCounter, getContext().getApplicationId()
           .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
           .getApplicationId().getId(), getContext().getTaskIndex(), getContext()
           .getTaskAttemptNumber(), getContext());
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", NewSplit: " + split);
+            split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
       }
-
     } else {
-      split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
-      reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
+      org.apache.hadoop.mapred.InputSplit split =
+          MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
+      splitLength = split.getLength();
+      reader = new MRReaderMapred(localJobConf, split,
           getContext().getCounters(), inputRecordCounter, getContext());
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", OldSplit: " + split);
+            split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
       }
     }
+    if (splitLength != -1) {
+      getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
+          .increment(splitLength);
+    }
     LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
     return reader;
   }
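
On the consuming side, a processor drains MultiMRInput through one reader per
split; by the time the readers are handed out, each split's length has already
been added to INPUT_SPLIT_LENGTH_BYTES. A hedged sketch of typical consuming
code (processor wiring omitted):

import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.runtime.library.api.KeyValueReader;

public class MultiMRInputConsumerSketch {
  static void consume(MultiMRInput input) throws Exception {
    // One KeyValueReader per input split assigned to this task; split
    // lengths were counted when the readers were initialized from events.
    for (KeyValueReader reader : input.getKeyValueReaders()) {
      while (reader.next()) {
        Object key = reader.getCurrentKey();
        Object value = reader.getCurrentValue();
        // process key/value here ...
      }
    }
  }
}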

http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index b878416..9109cd9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -28,6 +28,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
@@ -138,10 +141,12 @@ public class TestMRInput {
     List<Event> events = new LinkedList<>();
     events.add(diEvent);
     mrInput.handleEvents(events);
+    TezCounter counter = mrInput.getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES);
+    assertEquals(TestInputSplit.length, counter.getValue());
     assertTrue(TestInputFormat.invoked.get());
   }
 
-
   /**
    * Test class to verify
    */
@@ -210,9 +215,11 @@ public class TestMRInput {
 
   public static class TestInputSplit implements InputSplit {
 
+    public static long length = new Random().nextLong() & Long.MAX_VALUE; // mask keeps it non-negative
+
     @Override
     public long getLength() throws IOException {
-      return 0;
+      return length;
     }
 
     @Override
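
The counter assertion added above works because the test's mocked InputContext
returns a real TezCounters instance. The actual helper in this test class is
not shown in the diff; a minimal Mockito sketch of that wiring could look like:

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.InputContext;

public class MockContextSketch {
  static InputContext newContextWithRealCounters() {
    InputContext context = mock(InputContext.class);
    // Back the mock with a real counters object so increments are observable.
    doReturn(new TezCounters()).when(context).getCounters();
    return context;
  }
}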

http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 1733bfc..ab4a5d9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
@@ -141,8 +143,9 @@ public class TestMultiMRInput {
     List<Event> eventList = new ArrayList<Event>();
 
     String file1 = "file1";
+    AtomicLong file1Length = new AtomicLong();
     LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10);
+        10, file1Length);
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -171,13 +174,16 @@ public class TestMultiMRInput {
         assertEquals(val, data1.remove(key));
       }
       try {
-        boolean hasNext = reader.next(); //should throw exception
+        reader.next(); //should throw exception
         fail();
       } catch(IOException e) {
         assertTrue(e.getMessage().contains("For usage, please refer to"));
       }
     }
     assertEquals(1, readerCount);
+    long counterValue = input.getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
+    assertEquals(file1Length.get(), counterValue);
   }
 
   @Test(timeout = 5000)
@@ -202,12 +208,14 @@ public class TestMultiMRInput {
     LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
 
     String file1 = "file1";
+    AtomicLong file1Length = new AtomicLong();
     LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10);
+        10, file1Length);
 
     String file2 = "file2";
+    AtomicLong file2Length = new AtomicLong();
     LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
-        20);
+        20, file2Length);
 
     data.putAll(data1);
     data.putAll(data2);
@@ -245,12 +253,15 @@ public class TestMultiMRInput {
       }
 
       try {
-        boolean hasNext = reader.next(); //should throw exception
+        reader.next(); //should throw exception
         fail();
       } catch(IOException e) {
         assertTrue(e.getMessage().contains("For usage, please refer to"));
       }
     }
+    long counterValue = input.getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
+    assertEquals(file1Length.get() + file2Length.get(), counterValue);
     assertEquals(2, readerCount);
   }
 
@@ -273,7 +284,7 @@ public class TestMultiMRInput {
     List<Event> eventList = new ArrayList<Event>();
 
     String file1 = "file1";
-    createInputData(localFs, workDir, jobConf, file1, 0, 10);
+    createInputData(localFs, workDir, jobConf, file1, 0, 10, new AtomicLong());
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -325,9 +336,8 @@ public class TestMultiMRInput {
   }
 
   public static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs, Path workDir,
-                                                                  JobConf job, String filename,
-                                                                  long startKey,
-                                                                  long numKeys) throws IOException {
+      JobConf job, String filename, long startKey, long numKeys, AtomicLong fileLength)
+          throws IOException {
     LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
     Path file = new Path(workDir, filename);
     LOG.info("Generating data at path: " + file);
@@ -346,6 +356,7 @@ public class TestMultiMRInput {
         writer.append(key, value);
         LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
       }
+      fileLength.set(writer.getLength());
     } finally {
       writer.close();
     }

