giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject git commit: GIRAPH-464: MasterObserver#applicationFailed callback (nitay)
Date Thu, 03 Jan 2013 23:10:10 GMT
Updated Branches:
  refs/heads/trunk 57ea55610 -> 533d52159


GIRAPH-464: MasterObserver#applicationFailed callback (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/533d5215
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/533d5215
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/533d5215

Branch: refs/heads/trunk
Commit: 533d52159590f87e8e9996a853e7582e67ae8cd6
Parents: 57ea556
Author: Nitay Joffe <nitay@apache.org>
Authored: Thu Jan 3 12:29:46 2013 -0500
Committer: Nitay Joffe <nitay@apache.org>
Committed: Thu Jan 3 18:09:58 2013 -0500

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../giraph/bsp/CentralizedServiceMaster.java       |    7 +++++
 .../org/apache/giraph/graph/BspServiceMaster.java  |   22 ++++++++++++---
 .../java/org/apache/giraph/graph/GraphMapper.java  |    3 +-
 .../java/org/apache/giraph/graph/MasterThread.java |   21 +++++---------
 .../giraph/master/DefaultMasterObserver.java       |   15 +++++-----
 .../org/apache/giraph/master/MasterObserver.java   |    7 +++++
 7 files changed, 49 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 0649b77..e36ffcd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-464: MasterObserver#applicationFailed callback (nitay)
+
   GIRAPH-458: split formats module into accumulo,hbase,hcatalog (nitay)
 
   GIRAPH-463: Create VertexResolver only once (apresta)

http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index a328737..c351313 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -132,4 +132,11 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
    * Application has finished.
    */
   void postApplication();
+
+  /**
+   * Called when the job fails in order to let the Master do any cleanup.
+   *
+   * @param e Exception job failed from. May be null.
+   */
+  void failureCleanup(Exception e);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
index ee64a46..41bbcee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -293,8 +294,6 @@ public class BspServiceMaster<I extends WritableComparable,
 
   /**
    * When there is no salvaging this job, fail it.
-   *
-   * @throws IOException
    */
   private void failJob() {
     LOG.fatal("failJob: Killing job " + getJobId());
@@ -305,9 +304,9 @@ public class BspServiceMaster<I extends WritableComparable,
               (org.apache.hadoop.mapred.JobConf)
               getContext().getConfiguration());
       @SuppressWarnings("deprecation")
-      org.apache.hadoop.mapred.JobID jobId =
-          org.apache.hadoop.mapred.JobID.forName(getJobId());
+      JobID jobId = JobID.forName(getJobId());
       RunningJob job = jobClient.getJob(jobId);
+      failureCleanup(null);
       job.killJob();
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -1631,6 +1630,21 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
+  public void failureCleanup(Exception e) {
+    for (MasterObserver observer : observers) {
+      try {
+        observer.applicationFailed(e);
+        // CHECKSTYLE: stop IllegalCatchCheck
+      } catch (RuntimeException re) {
+        // CHECKSTYLE: resume IllegalCatchCheck
+        LOG.error(re.getClass().getName() + " from observer " +
+            observer.getClass().getName(), re);
+      }
+      getContext().progress();
+    }
+  }
+
+  @Override
   public void cleanup() throws IOException {
     // All master processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions

http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index eb35723..e491840 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -416,8 +416,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
         }
         serviceMaster = new BspServiceMaster<I, V, E, M>(
             serverPortList, sessionMsecTimeout, context, this);
-        masterThread = new MasterThread<I, V, E, M>(
-            (BspServiceMaster<I, V, E, M>) serviceMaster, context);
+        masterThread = new MasterThread<I, V, E, M>(serviceMaster, context);
         masterThread.start();
       }
       if ((mapFunctions == MapFunctions.WORKER_ONLY) ||

http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
index dbded04..e27de42 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -71,7 +69,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
    *        been called.
    * @param context Context from the Mapper.
    */
-  MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
+  MasterThread(CentralizedServiceMaster<I, V, E, M> bspServiceMaster,
       Context context) {
     super(MasterThread.class.getName());
     this.bspServiceMaster = bspServiceMaster;
@@ -173,19 +171,14 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
         GiraphTimers.getInstance().getTotalMs().
           increment(System.currentTimeMillis() - startMillis);
       }
-    } catch (IOException e) {
+      bspServiceMaster.postApplication();
+      // CHECKSTYLE: stop IllegalCatchCheck
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatchCheck
+      bspServiceMaster.failureCleanup(e);
       LOG.error("masterThread: Master algorithm failed with " +
-          "IOException ", e);
-      throw new IllegalStateException(e);
-    } catch (InterruptedException e) {
-      LOG.error("masterThread: Master algorithm failed with " +
-          "InterruptedException", e);
-      throw new IllegalStateException(e);
-    } catch (KeeperException e) {
-      LOG.error("masterThread: Master algorithm failed with " +
-          "KeeperException", e);
+          e.getClass().getSimpleName(), e);
       throw new IllegalStateException(e);
     }
-    bspServiceMaster.postApplication();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
index 4b4dee6..f566979 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
@@ -38,18 +38,17 @@ public class DefaultMasterObserver implements MasterObserver {
   }
 
   @Override
-  public void preApplication() {
-  }
+  public void preApplication() { }
 
   @Override
-  public void postApplication() {
-  }
+  public void postApplication() { }
 
   @Override
-  public void preSuperstep() {
-  }
+  public void applicationFailed(Exception e) { }
 
   @Override
-  public void postSuperstep() {
-  }
+  public void preSuperstep() { }
+
+  @Override
+  public void postSuperstep() { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
index b8f5a26..a72b18a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
@@ -35,6 +35,13 @@ public interface MasterObserver extends ImmutableClassesGiraphConfigurable {
   void postApplication();
 
   /**
+   * If there is an error during the application.
+   *
+   * @param e Exception that caused failure. May be null.
+   */
+  void applicationFailed(Exception e);
+
+  /**
    * Before each superstep starts.
    */
   void preSuperstep();


Mime
View raw message