hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1470219 - in /hadoop/common/branches/branch-1.2: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java
Date Sat, 20 Apr 2013 19:24:05 GMT
Author: acmurthy
Date: Sat Apr 20 19:24:05 2013
New Revision: 1470219

URL: http://svn.apache.org/r1470219
Log:
Merge -c 1470218 from branch-1 to branch-1.2 to fix MAPREDUCE-5066. Added a timeout for the
job.end.notification.url. Contributed by Ivan Mitic.

Added:
    hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java
      - copied unchanged from r1470218, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobEndNotifier.java
Modified:
    hadoop/common/branches/branch-1.2/CHANGES.txt
    hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1470219&r1=1470218&r2=1470219&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Sat Apr 20 19:24:05 2013
@@ -565,6 +565,9 @@ Release 1.2.0 - 2013.04.16
 
     HADOOP-9473. Typo in FileUtil copy() method. (Glen Mazza via suresh)
 
+    MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
+    Mitic via acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=1470219&r1=1470218&r2=1470219&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
(original)
+++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
Sat Apr 20 19:24:05 2013
@@ -39,51 +39,51 @@ public class JobEndNotifier {
   private static volatile boolean running;
   private static BlockingQueue<JobEndStatusInfo> queue =
     new DelayQueue<JobEndStatusInfo>();
+  // Set default timeout to 5 seconds
+  public static final int MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT = 5000;
 
   public static void startNotifier() {
     running = true;
     thread = new Thread(
-                        new Runnable() {
-                          public void run() {
-                            try {
-                              while (running) {
-                                sendNotification(queue.take());
-                              }
-                            }
-                            catch (InterruptedException irex) {
-                              if (running) {
-                                LOG.error("Thread has ended unexpectedly", irex);
-                              }
-                            }
-                          }
-
-                          private void sendNotification(JobEndStatusInfo notification) {
-                            try {
-                              int code = httpNotification(notification.getUri());
-                              if (code != 200) {
-                                throw new IOException("Invalid response status code: " +
code);
-                              }
-                            }
-                            catch (IOException ioex) {
-                              LOG.error("Notification failure [" + notification + "]", ioex);
-                              if (notification.configureForRetry()) {
-                                try {
-                                  queue.put(notification);
-                                }
-                                catch (InterruptedException iex) {
-                                  LOG.error("Notification queuing error [" + notification
+ "]",
-                                            iex);
-                                }
-                              }
-                            }
-                            catch (Exception ex) {
-                              LOG.error("Notification failure [" + notification + "]", ex);
-                            }
-                          }
-
-                        }
+        new Runnable() {
+          public void run() {
+            try {
+              while (running) {
+                sendNotification(queue.take());
+              }
+            }
+            catch (InterruptedException irex) {
+              if (running) {
+                LOG.error("Thread has ended unexpectedly", irex);
+              }
+            }
+          }
 
-                        );
+          private void sendNotification(JobEndStatusInfo notification) {
+            try {
+              int code = httpNotification(notification.getUri(),
+                  notification.getTimeout());
+              if (code != 200) {
+                throw new IOException("Invalid response status code: " + code);
+              }
+            }
+            catch (IOException ioex) {
+              LOG.error("Notification failure [" + notification + "]", ioex);
+              if (notification.configureForRetry()) {
+                try {
+                  queue.put(notification);
+                }
+                catch (InterruptedException iex) {
+                  LOG.error("Notification queuing error [" + notification + "]",
+                      iex);
+                }
+              }
+            }
+            catch (Exception ex) {
+              LOG.error("Notification failure [" + notification + "]", ex);
+            }
+          }
+        });
     thread.start();
   }
 
@@ -97,9 +97,10 @@ public class JobEndNotifier {
     JobEndStatusInfo notification = null;
     String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
-      // +1 to make logic for first notification identical to a retry
-      int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
+      int retryAttempts = conf.getInt("job.end.retry.attempts", 0);
       long retryInterval = conf.getInt("job.end.retry.interval", 30000);
+      int timeout = conf.getInt("mapreduce.job.end-notification.timeout",
+          MAPREDUCE_JOBEND_NOTIFICATION_TIMEOUT_DEFAULT);
       if (uri.contains("$jobId")) {
         uri = uri.replace("$jobId", status.getJobID().toString());
       }
@@ -109,7 +110,8 @@ public class JobEndNotifier {
             (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
         uri = uri.replace("$jobStatus", statusStr);
       }
-      notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
+      notification = new JobEndStatusInfo(
+          uri, retryAttempts, retryInterval, timeout);
     }
     return notification;
   }
@@ -126,12 +128,17 @@ public class JobEndNotifier {
     }
   }
 
-  private static int httpNotification(String uri) throws IOException {
+  private static int httpNotification(String uri, int timeout)
+      throws IOException {
     URI url = new URI(uri, false);
-    HttpClient m_client = new HttpClient();
+
+    HttpClient httpClient = new HttpClient();
+    httpClient.getParams().setSoTimeout(timeout);
+    httpClient.getParams().setConnectionManagerTimeout(timeout);
+
     HttpMethod method = new GetMethod(url.getEscapedURI());
     method.setRequestHeader("Accept", "*/*");
-    return m_client.executeMethod(method);
+    return httpClient.executeMethod(method);
   }
 
   // for use by the LocalJobRunner, without using a thread&queue,
@@ -139,9 +146,10 @@ public class JobEndNotifier {
   public static void localRunnerNotification(JobConf conf, JobStatus status) {
     JobEndStatusInfo notification = createNotification(conf, status);
     if (notification != null) {
-      while (notification.configureForRetry()) {
+      do {
         try {
-          int code = httpNotification(notification.getUri());
+          int code = httpNotification(notification.getUri(),
+              notification.getTimeout());
           if (code != 200) {
             throw new IOException("Invalid response status code: " + code);
           }
@@ -157,13 +165,13 @@ public class JobEndNotifier {
         }
         try {
           synchronized (Thread.currentThread()) {
-            Thread.currentThread().sleep(notification.getRetryInterval());
+            Thread.sleep(notification.getRetryInterval());
           }
         }
         catch (InterruptedException iex) {
           LOG.error("Notification retry error [" + notification + "]", iex);
         }
-      }
+      } while (notification.configureForRetry());
     }
   }
 
@@ -172,11 +180,14 @@ public class JobEndNotifier {
     private int retryAttempts;
     private long retryInterval;
     private long delayTime;
+    private int timeout;
 
-    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
+    JobEndStatusInfo(String uri, int retryAttempts, long retryInterval,
+        int timeout) {
       this.uri = uri;
       this.retryAttempts = retryAttempts;
       this.retryInterval = retryInterval;
+      this.timeout = timeout;
       this.delayTime = System.currentTimeMillis();
     }
 
@@ -192,6 +203,10 @@ public class JobEndNotifier {
       return retryInterval;
     }
 
+    public int getTimeout() {
+      return timeout;
+    }
+
     public long getDelayTime() {
       return delayTime;
     }



Mime
View raw message