hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r1131737 - in /hadoop/common/branches/branch-0.20-security: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Sun, 05 Jun 2011 05:23:00 GMT
Author: cdouglas
Date: Sun Jun  5 05:22:59 2011
New Revision: 1131737

URL: http://svn.apache.org/viewvc?rev=1131737&view=rev
Log:
MAPREDUCE-2529. Add support for regex-based shuffle metric counting
exceptions. Contributed by Thomas Graves

Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131737&r1=1131736&r2=1131737&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sun Jun  5 05:22:59 2011
@@ -56,6 +56,9 @@ Release 0.20.205.0 - unreleased
     MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
     fail faulty maps more aggressively. (Thomas Graves via cdouglas)
 
+    MAPREDUCE-2529. Add support for regex-based shuffle metric counting
+    exceptions. (Thomas Graves via cdouglas)
+
 Release 0.20.204.0 - unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java?rev=1131737&r1=1131736&r2=1131737&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java Sun Jun  5 05:22:59 2011
@@ -37,6 +37,8 @@ class ShuffleServerInstrumentation imple
       registry.newCounter("shuffle_failed_outputs", "", 0);
   final MetricMutableCounterInt successOutputs =
       registry.newCounter("shuffle_success_outputs", "", 0);
+  final MetricMutableCounterInt exceptionsCaught =
+    registry.newCounter("shuffle_exceptions_caught", "", 0);
 
   ShuffleServerInstrumentation(TaskTracker tt) {
     ttWorkerThreads = tt.workerThreads;
@@ -69,6 +71,12 @@ class ShuffleServerInstrumentation imple
     successOutputs.incr();
   }
 
+  //@Override
+  void exceptionsCaught() {
+    exceptionsCaught.incr();
+  }
+
+
   @Override
   public void getMetrics(MetricsBuilder builder, boolean all) {
     MetricsRecordBuilder rb = builder.addRecord(registry.name());

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1131737&r1=1131736&r2=1131737&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Jun  5 05:22:59 2011
@@ -1468,6 +1468,15 @@ public class TaskTracker implements MRCo
     server.setAttribute("log", LOG);
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+
+    String exceptionStackRegex =
+      conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
+    String exceptionMsgRegex =
+      conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+
+    server.setAttribute("exceptionStackRegex", exceptionStackRegex);
+    server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
@@ -3682,6 +3691,10 @@ public class TaskTracker implements MRCo
         (ShuffleServerInstrumentation) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
+      String exceptionStackRegex =
+        (String) context.getAttribute("exceptionStackRegex");
+      String exceptionMsgRegex =
+        (String) context.getAttribute("exceptionMsgRegex");
 
       verifyRequest(request, response, tracker, jobId);
 
@@ -3787,12 +3800,14 @@ public class TaskTracker implements MRCo
                  " from map: " + mapId + " given " + info.partLength + "/" + 
                  info.rawLength);
         }
+
       } catch (IOException ie) {
         Log log = (Log) context.getAttribute("log");
         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
                            ") failed :\n"+
                            StringUtils.stringifyException(ie));
         log.warn(errorMsg);
+        checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
         if (isInputException) {
           tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
         }
@@ -3816,6 +3831,38 @@ public class TaskTracker implements MRCo
       shuffleMetrics.successOutput();
     }
     
+    protected void checkException(IOException ie, String exceptionMsgRegex,
+        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
+      // parse exception to see if it looks like a regular expression you
+      // configure. If both msgRegex and StackRegex set then make sure both
+      // match, otherwise only the one set has to match.
+      if (exceptionMsgRegex != null) {
+        String msg = ie.getMessage();
+        if (msg == null || !msg.matches(exceptionMsgRegex)) {
+          return;
+        }
+      }
+      if (exceptionStackRegex != null
+          && !checkStackException(ie, exceptionStackRegex)) {
+        return;
+      }
+      shuffleMetrics.exceptionsCaught();
+    }
+
+    private boolean checkStackException(IOException ie,
+        String exceptionStackRegex) {
+      StackTraceElement[] stack = ie.getStackTrace();
+
+      for (StackTraceElement elem : stack) {
+        String stacktrace = elem.toString();
+        if (stacktrace.matches(exceptionStackRegex)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+
     /**
      * verify that request has correct HASH for the url
      * and also add a field to reply header with hash of the HASH

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java?rev=1131737&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java Sun Jun  5 05:22:59 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+
+import org.junit.Test;
+
+public class TestShuffleExceptionCount {
+
+  public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+
+    public void checkException(IOException ie, String exceptionMsgRegex,
+        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
+      super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+          shuffleMetrics);
+    }
+
+  }
+
+  @Test
+  public void testCheckException() throws IOException, InterruptedException {
+    TestMapOutputServlet testServlet = new TestMapOutputServlet();
+    JobConf conf = new JobConf();
+    conf.setUser("testuser");
+    conf.setJobName("testJob");
+    conf.setSessionId("testSession");
+
+    TaskTracker tt = new TaskTracker();
+    tt.setConf(conf);
+    ShuffleServerInstrumentation shuffleMetrics =
+      ShuffleServerInstrumentation.create(tt);
+
+    // first test with only MsgRegex set but doesn't match
+    String exceptionMsgRegex = "Broken pipe";
+    String exceptionStackRegex = null;
+    IOException ie = new IOException("EOFException");
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    MetricsRecordBuilder rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 0, rb);
+
+    // test with only MsgRegex set that does match
+    ie = new IOException("Broken pipe");
+    exceptionStackRegex = null;
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 1, rb);
+
+    // test with neither set, make sure incremented
+    exceptionMsgRegex = null;
+    exceptionStackRegex = null;
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 2, rb);
+
+    // test with only StackRegex set doesn't match
+    exceptionMsgRegex = null;
+    exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
+    ie.setStackTrace(constructStackTrace());
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 2, rb);
+
+    // test with only StackRegex set does match
+    exceptionMsgRegex = null;
+    exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 3, rb);
+
+    // test with both regex set and matches
+    exceptionMsgRegex = "Broken pipe";
+    ie.setStackTrace(constructStackTraceTwo());
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 4, rb);
+
+    // test with both regex set and only msg matches
+    exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 4, rb);
+
+    // test with both regex set and only stack matches
+    exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    exceptionMsgRegex = "EOFException";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 4, rb);
+
+  }
+
+  /*
+   * Construction exception like:
+   * java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   * org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+   * org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:709) at
+   * org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192) at
+   * org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124) at
+   * org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708) at
+   * org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
+   */
+  private StackTraceElement[] constructStackTrace() {
+    StackTraceElement[] stack = new StackTraceElement[9];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+    stack[3] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup", "SelectorManager.java", 831);
+    stack[4] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "doSelect", "SelectorManager.java", 709);
+    stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectorManager", "doSelect", "SelectorManager.java", 192);
+    stack[6] = new StackTraceElement("org.mortbay.jetty.nio.SelectChannelConnector", "accept", "SelectChannelConnector.java", 124);
+    stack[7] = new StackTraceElement("org.mortbay.jetty.AbstractConnector$Acceptor", "run", "AbstractConnector.java", 708);
+    stack[8] = new StackTraceElement("org.mortbay.thread.QueuedThreadPool$PoolThread", "run", "QueuedThreadPool.java", 582);
+
+    return stack;
+  }
+
+  /*
+   * java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   * org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+   * org.mortbay.io.nio.SelectChannelEndPoint.updateKey(SelectChannelEndPoint.java:335) at
+   * org.mortbay.io.nio.SelectChannelEndPoint.blockWritable(SelectChannelEndPoint.java:278) at
+   * org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator.java:545) at
+   * org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:572) at
+   * org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012) at
+   * org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:651)at
+   * org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:580) at
+   */
+  private StackTraceElement[] constructStackTraceTwo() {
+    StackTraceElement[] stack = new StackTraceElement[11];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+    stack[3] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup", "SelectorManager.java", 831);
+    stack[4] = new StackTraceElement("org.mortbay.io.nio.SelectChannelEndPoint", "updateKey", "SelectChannelEndPoint.java", 335);
+    stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectChannelEndPoint", "blockWritable", "SelectChannelEndPoint.java", 278);
+    stack[6] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "blockForOutput", "AbstractGenerator.java", 545);
+    stack[7] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "flush", "AbstractGenerator.java", 572);
+    stack[8] = new StackTraceElement("org.mortbay.jetty.HttpConnection$Output", "flush", "HttpConnection.java", 1012);
+    stack[9] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "write", "AbstractGenerator.java", 651);
+    stack[10] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "write", "AbstractGenerator.java", 580);
+
+    return stack;
+  }
+
+}



Mime
View raw message