hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1470218 - in /hadoop/common/branches/branch-1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java
Date Sat, 20 Apr 2013 19:21:02 GMT
Author: acmurthy
Date: Sat Apr 20 19:21:02 2013
New Revision: 1470218

URL: http://svn.apache.org/r1470218
Log:
MAPREDUCE-5066. Added a timeout for the job.end.notification.url. Contributed by Ivan Mitic.

Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1470218&r1=1470217&r2=1470218&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Apr 20 19:21:02 2013
@@ -608,6 +608,9 @@ Release 1.2.0 - unreleased
 
     HADOOP-9473. Typo in FileUtil copy() method. (Glen Mazza via suresh)
 
+    MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
+    Mitic via acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=1470218&r1=1470217&r2=1470218&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Sat Apr 20 19:21:02 2013
@@ -39,51 +39,51 @@ public class JobEndNotifier {
   private static volatile boolean running;
   private static BlockingQueue<JobEndStatusInfo> queue =
     new DelayQueue<JobEndStatusInfo>();
+  // Set default timeout to 5 seconds
+  public static final int MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT = 5000;
 
   public static void startNotifier() {
     running = true;
     thread = new Thread(
-                        new Runnable() {
-                          public void run() {
-                            try {
-                              while (running) {
-                                sendNotification(queue.take());
-                              }
-                            }
-                            catch (InterruptedException irex) {
-                              if (running) {
-                                LOG.error("Thread has ended unexpectedly", irex);
-                              }
-                            }
-                          }
-
-                          private void sendNotification(JobEndStatusInfo notification) {
-                            try {
-                              int code = httpNotification(notification.getUri());
-                              if (code != 200) {
-                                throw new IOException("Invalid response status code: " + code);
-                              }
-                            }
-                            catch (IOException ioex) {
-                              LOG.error("Notification failure [" + notification + "]", ioex);
-                              if (notification.configureForRetry()) {
-                                try {
-                                  queue.put(notification);
-                                }
-                                catch (InterruptedException iex) {
-                                  LOG.error("Notification queuing error [" + notification + "]",
-                                            iex);
-                                }
-                              }
-                            }
-                            catch (Exception ex) {
-                              LOG.error("Notification failure [" + notification + "]", ex);
-                            }
-                          }
-
-                        }
+        new Runnable() {
+          public void run() {
+            try {
+              while (running) {
+                sendNotification(queue.take());
+              }
+            }
+            catch (InterruptedException irex) {
+              if (running) {
+                LOG.error("Thread has ended unexpectedly", irex);
+              }
+            }
+          }
 
-                        );
+          private void sendNotification(JobEndStatusInfo notification) {
+            try {
+              int code = httpNotification(notification.getUri(),
+                  notification.getTimeout());
+              if (code != 200) {
+                throw new IOException("Invalid response status code: " + code);
+              }
+            }
+            catch (IOException ioex) {
+              LOG.error("Notification failure [" + notification + "]", ioex);
+              if (notification.configureForRetry()) {
+                try {
+                  queue.put(notification);
+                }
+                catch (InterruptedException iex) {
+                  LOG.error("Notification queuing error [" + notification + "]",
+                      iex);
+                }
+              }
+            }
+            catch (Exception ex) {
+              LOG.error("Notification failure [" + notification + "]", ex);
+            }
+          }
+        });
     thread.start();
   }
 
@@ -97,9 +97,10 @@ public class JobEndNotifier {
     JobEndStatusInfo notification = null;
     String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
-      // +1 to make logic for first notification identical to a retry
-      int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
+      int retryAttempts = conf.getInt("job.end.retry.attempts", 0);
       long retryInterval = conf.getInt("job.end.retry.interval", 30000);
+      int timeout = conf.getInt("mapreduce.job.end-notification.timeout",
+          MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT);
       if (uri.contains("$jobId")) {
         uri = uri.replace("$jobId", status.getJobID().toString());
       }
@@ -109,7 +110,8 @@ public class JobEndNotifier {
             (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
         uri = uri.replace("$jobStatus", statusStr);
       }
-      notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
+      notification = new JobEndStatusInfo(
+          uri, retryAttempts, retryInterval, timeout);
     }
     return notification;
   }
@@ -126,12 +128,17 @@ public class JobEndNotifier {
     }
   }
 
-  private static int httpNotification(String uri) throws IOException {
+  private static int httpNotification(String uri, int timeout)
+      throws IOException {
     URI url = new URI(uri, false);
-    HttpClient m_client = new HttpClient();
+
+    HttpClient httpClient = new HttpClient();
+    httpClient.getParams().setSoTimeout(timeout);
+    httpClient.getParams().setConnectionManagerTimeout(timeout);
+
     HttpMethod method = new GetMethod(url.getEscapedURI());
     method.setRequestHeader("Accept", "*/*");
-    return m_client.executeMethod(method);
+    return httpClient.executeMethod(method);
   }
 
   // for use by the LocalJobRunner, without using a thread&queue,
@@ -139,9 +146,10 @@ public class JobEndNotifier {
   public static void localRunnerNotification(JobConf conf, JobStatus status) {
     JobEndStatusInfo notification = createNotification(conf, status);
     if (notification != null) {
-      while (notification.configureForRetry()) {
+      do {
         try {
-          int code = httpNotification(notification.getUri());
+          int code = httpNotification(notification.getUri(),
+              notification.getTimeout());
           if (code != 200) {
             throw new IOException("Invalid response status code: " + code);
           }
@@ -157,13 +165,13 @@ public class JobEndNotifier {
         }
         try {
           synchronized (Thread.currentThread()) {
-            Thread.currentThread().sleep(notification.getRetryInterval());
+            Thread.sleep(notification.getRetryInterval());
           }
         }
         catch (InterruptedException iex) {
           LOG.error("Notification retry error [" + notification + "]", iex);
         }
-      }
+      } while (notification.configureForRetry());
     }
   }
 
@@ -172,11 +180,14 @@ public class JobEndNotifier {
     private int retryAttempts;
     private long retryInterval;
     private long delayTime;
+    private int timeout;
 
-    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
+    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval,
+        int timeout) {
       this.uri = uri;
       this.retryAttempts = retryAttempts;
       this.retryInterval = retryInterval;
+      this.timeout = timeout;
       this.delayTime = System.currentTimeMillis();
     }
 
@@ -192,6 +203,10 @@ public class JobEndNotifier {
       return retryInterval;
     }
 
+    public int getTimeout() {
+      return timeout;
+    }
+
     public long getDelayTime() {
       return delayTime;
     }

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java?rev=1470218&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java Sat Apr 20 19:21:02 2013
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+
+public class TestJobEndNotifier extends TestCase {
+  HttpServer server;
+  URL baseUrl;
+
+  @SuppressWarnings("serial")
+  public static class JobEndServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+    public static URI requestUri;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      InputStreamReader in = new InputStreamReader(request.getInputStream());
+      PrintStream out = new PrintStream(response.getOutputStream());
+
+      calledTimes++;
+      try {
+        requestUri = new URI(null, null,
+            request.getRequestURI(), request.getQueryString(), null);
+      } catch (URISyntaxException e) {
+      }
+
+      in.close();
+      out.close();
+    }
+  }
+
+  // Servlet that delays requests for a long time
+  @SuppressWarnings("serial")
+  public static class DelayServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      boolean timedOut = false;
+      calledTimes++;
+      try {
+        // Sleep for a long time
+        Thread.sleep(1000000);
+      } catch (InterruptedException e) {
+        timedOut = true;
+      }
+      assertTrue("DelayServlet should be interrupted", timedOut);
+    }
+  }
+
+  // Servlet that fails all requests into it
+  @SuppressWarnings("serial")
+  public static class FailServlet extends HttpServlet {
+    public static volatile int calledTimes = 0;
+
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      calledTimes++;
+      throw new IOException("I am failing!");
+    }
+  }
+
+  public void setUp() throws Exception {
+    new File(System.getProperty("build.webapps", "build/webapps") + "/test"
+        ).mkdirs();
+    server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.addServlet("delay", "/delay", DelayServlet.class);
+    server.addServlet("jobend", "/jobend", JobEndServlet.class);
+    server.addServlet("fail", "/fail", FailServlet.class);
+    server.start();
+    int port = server.getPort();
+    baseUrl = new URL("http://localhost:" + port + "/");
+
+    JobEndServlet.calledTimes = 0;
+    JobEndServlet.requestUri = null;
+    DelayServlet.calledTimes = 0;
+    FailServlet.calledTimes = 0;
+  }
+
+  public void tearDown() throws Exception {
+    server.stop();
+  }
+
+  /**
+   * Validate that $jobId and $jobStatus fields are properly substituted
+   * in the output URI
+   */
+  public void testUriSubstitution() throws InterruptedException {
+    try {
+      JobEndNotifier.startNotifier();
+
+      JobStatus jobStatus = createTestJobStatus(
+          "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+      JobConf jobConf = createTestJobConf(
+          new Configuration(), 0,
+          baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      int maxLoop = 100;
+      while (JobEndServlet.calledTimes != 1 && maxLoop-- > 0) {
+        Thread.sleep(100);
+      }
+
+      // Validate params
+      assertEquals(1, JobEndServlet.calledTimes);
+      assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
+          JobEndServlet.requestUri.getQuery());
+    } finally {
+      JobEndNotifier.stopNotifier();
+    }
+  }
+
+  /**
+   * Validate job.end.retry.attempts logic.
+   */
+  public void testRetryCount() throws InterruptedException {
+    try {
+      JobEndNotifier.startNotifier();
+
+      int retryAttempts = 3;
+      JobStatus jobStatus = createTestJobStatus(
+          "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+      JobConf jobConf = createTestJobConf(
+          new Configuration(), retryAttempts, baseUrl + "fail");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      int maxLoop = 100;
+      while (FailServlet.calledTimes != (retryAttempts + 1) && maxLoop-- > 0) {
+        Thread.sleep(100);
+      }
+
+      // Validate params
+      assertEquals(retryAttempts + 1, FailServlet.calledTimes);
+    } finally {
+      JobEndNotifier.stopNotifier();
+    }
+  }
+
+  /**
+   * Validate that the notification times out after reaching
+   * mapreduce.job.end-notification.timeout.
+   */
+  public void testNotificationTimeout() throws InterruptedException {
+    try {
+      Configuration conf = new Configuration();
+      // Reduce the timeout to 1 second
+      conf.setInt("mapreduce.job.end-notification.timeout", 1000);
+      JobEndNotifier.startNotifier();
+
+      // Submit one notification that will delay infinitely
+      JobStatus jobStatus = createTestJobStatus(
+          "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+      JobConf jobConf = createTestJobConf(
+          conf, 0, baseUrl + "delay");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      // Submit another notification that will return promptly
+      jobConf.setJobEndNotificationURI(baseUrl + "jobend");
+      JobEndNotifier.registerNotification(jobConf, jobStatus);
+
+      // Make sure the notification passed thru
+      int maxLoop = 100;
+      while (JobEndServlet.calledTimes != 1 && maxLoop-- > 0) {
+        Thread.sleep(100);
+      }
+      assertEquals("JobEnd notification should have been received by now",
+          1, JobEndServlet.calledTimes);
+      assertEquals(1, DelayServlet.calledTimes);
+      assertEquals("/jobend", JobEndServlet.requestUri.getPath());
+    } finally {
+      JobEndNotifier.stopNotifier();
+    }
+  }
+
+  /**
+   * Basic validation for localRunnerNotification.
+   */
+  public void testLocalJobRunnerUriSubstitution() throws InterruptedException {
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        new Configuration(), 0,
+        baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+
+    // No need to wait for the notification to go thru since calls are
+    // synchronous
+
+    // Validate params
+    assertEquals(1, JobEndServlet.calledTimes);
+    assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
+        JobEndServlet.requestUri.getQuery());
+  }
+
+  /**
+   * Validate job.end.retry.attempts for the localJobRunner.
+   */
+  public void testLocalJobRunnerRetryCount() throws InterruptedException {
+    int retryAttempts = 3;
+    JobStatus jobStatus = createTestJobStatus(
+        "job_20130313155005308_0001", JobStatus.SUCCEEDED);
+    JobConf jobConf = createTestJobConf(
+        new Configuration(), retryAttempts, baseUrl + "fail");
+    JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
+
+    // Validate params
+    assertEquals(retryAttempts + 1, FailServlet.calledTimes);
+  }
+
+  private static JobStatus createTestJobStatus(String jobId, int state) {
+    return new JobStatus(
+        JobID.forName(jobId), 0.5f, 0.0f,
+        state);
+  }
+
+  private static JobConf createTestJobConf(
+      Configuration conf, int retryAttempts, String notificationUri) {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setInt("job.end.retry.attempts", retryAttempts);
+    jobConf.set("job.end.retry.interval", "0");
+    jobConf.setJobEndNotificationURI(notificationUri);
+    return jobConf;
+  }
+}



Mime
View raw message