giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to f2b28e1
Date Fri, 01 Nov 2013 21:59:08 GMT
Updated Branches:
  refs/heads/trunk 909d4c3cc -> f2b28e14b


GIRAPH-790: Add a way to automatically retry a job (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f2b28e14
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f2b28e14
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f2b28e14

Branch: refs/heads/trunk
Commit: f2b28e14b6a8b70c25e86e9823e7ed3f64aaf459
Parents: 909d4c3
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Fri Nov 1 14:58:10 2013 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Fri Nov 1 14:59:08 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/giraph/conf/GiraphConfiguration.java | 20 ++++++++
 .../org/apache/giraph/conf/GiraphConstants.java |  9 ++++
 .../ImmutableClassesGiraphConfiguration.java    | 10 ++++
 .../job/DefaultGiraphJobRetryChecker.java       | 33 ++++++++++++++
 .../java/org/apache/giraph/job/GiraphJob.java   | 48 ++++++++++++--------
 .../giraph/job/GiraphJobRetryChecker.java       | 36 +++++++++++++++
 7 files changed, 139 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4465ac1..7df5b3e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-790: Add a way to automatically retry a job (majakabiljo)
+
   GIRAPH-789: Upgrade hive-io to 0.20 - less metastore accesses (majakabiljo)
 
   GIRAPH-787: Use HiveIO 1.9 (gmalewicz via aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 4dee396..f176bfe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -34,6 +34,7 @@ import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.job.GiraphJobRetryChecker;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
@@ -299,6 +300,25 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get job retry checker class
+   *
+   * @return GiraphJobRetryChecker class set.
+   */
+  public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() {
+    return JOB_RETRY_CHECKER_CLASS.get(this);
+  }
+
+  /**
+   * Set job retry checker class
+   *
+   * @param klass GiraphJobRetryChecker class to set.
+   */
+  public void setJobRetryCheckerClass(
+      Class<? extends GiraphJobRetryChecker> klass) {
+    JOB_RETRY_CHECKER_CLASS.set(this, klass);
+  }
+
+  /**
    * Check whether to enable jmap dumping thread.
    *
    * @return true if jmap dumper is enabled.

http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index eb8eb21..6f32e46 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -47,8 +47,10 @@ import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
 import org.apache.giraph.io.filters.DefaultVertexInputFilter;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
+import org.apache.giraph.job.DefaultGiraphJobRetryChecker;
 import org.apache.giraph.job.DefaultJobObserver;
 import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.job.GiraphJobRetryChecker;
 import org.apache.giraph.job.HaltApplicationUtils;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
@@ -193,6 +195,13 @@ public interface GiraphConstants {
           DefaultJobObserver.class, GiraphJobObserver.class,
           "Observer class to watch over job status - optional");
 
+  /** Class which decides whether a failed job should be retried - optional */
+  ClassConfOption<GiraphJobRetryChecker> JOB_RETRY_CHECKER_CLASS =
+      ClassConfOption.create("giraph.jobRetryCheckerClass",
+          DefaultGiraphJobRetryChecker.class, GiraphJobRetryChecker.class,
+          "Class which decides whether a failed job should be retried - " +
+              "optional");
+
   // At least one of the input format classes is required.
   /** VertexInputFormat class */
   ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =

http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 6bb6c00..b33938a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -51,6 +51,7 @@ import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
 import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
 import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.job.GiraphJobRetryChecker;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.master.SuperstepClasses;
@@ -698,6 +699,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create job retry checker
+   *
+   * @return GiraphJobRetryChecker set in configuration.
+   */
+  public GiraphJobRetryChecker getJobRetryChecker() {
+    return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this);
+  }
+
+  /**
    * Get the user's subclassed edge value class.
    *
    * @return User's vertex edge value class

http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
new file mode 100644
index 0000000..0cab86c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Default implementation of {@link GiraphJobRetryChecker},
+ * which never retries the job.
+ */
+public class DefaultGiraphJobRetryChecker implements GiraphJobRetryChecker {
+  @Override
+  public boolean shouldRetry(Job submittedJob, int tryCount) {
+    // By default, don't retry failed jobs
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index fca14ac..40670bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -231,27 +231,37 @@ public class GiraphJob {
     ImmutableClassesGiraphConfiguration conf =
         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
     checkLocalJobRunnerConfiguration(conf);
-    Job submittedJob = new Job(conf, jobName);
-    if (submittedJob.getJar() == null) {
-      submittedJob.setJarByClass(getClass());
-    }
-    submittedJob.setNumReduceTasks(0);
-    submittedJob.setMapperClass(GraphMapper.class);
-    submittedJob.setInputFormatClass(BspInputFormat.class);
-    submittedJob.setOutputFormatClass(BspOutputFormat.class);
 
-    GiraphJobObserver jobObserver = conf.getJobObserver();
-    jobObserver.launchingJob(submittedJob);
-    submittedJob.submit();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
-    }
-    HaltApplicationUtils.printHaltInfo(submittedJob, conf);
-    jobObserver.jobRunning(submittedJob);
+    int tryCount = 0;
+    GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
+    while (true) {
+      tryCount++;
+      Job submittedJob = new Job(conf, jobName);
+      if (submittedJob.getJar() == null) {
+        submittedJob.setJarByClass(getClass());
+      }
+      submittedJob.setNumReduceTasks(0);
+      submittedJob.setMapperClass(GraphMapper.class);
+      submittedJob.setInputFormatClass(BspInputFormat.class);
+      submittedJob.setOutputFormatClass(BspOutputFormat.class);
 
-    boolean passed = submittedJob.waitForCompletion(verbose);
-    jobObserver.jobFinished(submittedJob, passed);
+      GiraphJobObserver jobObserver = conf.getJobObserver();
+      jobObserver.launchingJob(submittedJob);
+      submittedJob.submit();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
+      }
+      HaltApplicationUtils.printHaltInfo(submittedJob, conf);
+      jobObserver.jobRunning(submittedJob);
 
-    return passed;
+      boolean passed = submittedJob.waitForCompletion(verbose);
+      jobObserver.jobFinished(submittedJob, passed);
+      if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
+        return passed;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("run: Retrying job, " + tryCount + " try");
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
new file mode 100644
index 0000000..53a800e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Class to decide whether a GiraphJob should be restarted after failure.
+ */
+public interface GiraphJobRetryChecker {
+  /**
+   * Check if the job should be retried
+   *
+   * @param submittedJob Job that ran and failed
+   * @param tryCount Number of attempts so far, including the failed one
+   *
+   * @return True iff job should be retried
+   */
+  boolean shouldRetry(Job submittedJob, int tryCount);
+}


Mime
View raw message