giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edu...@apache.org
Subject git commit: updated refs/heads/trunk to 8af7670
Date Wed, 14 Jan 2015 23:10:41 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 5a58a7754 -> 8af7670c3


GIRAPH-980 Way to disable checkpoints for particular job and on particular supersteps

Summary:
Added CheckpointSupportedChecker; using this interface, one can specify whether the job is
checkpointable or not.
By default, all jobs that don't do output during the computation are checkpointable.

Test Plan: Ran several jobs with output during the computation enabled and disabled. Also
wrote a custom CheckpointSupportedChecker to test whether checkpoints can be disabled on a
particular superstep.

Reviewers: majakabiljo, pavanka, pavanka.26, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D31113


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8af7670c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8af7670c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8af7670c

Branch: refs/heads/trunk
Commit: 8af7670c3afb4924b2603712c1532925ce8b89fa
Parents: 5a58a77
Author: Sergey Edunov <edunov@fb.com>
Authored: Wed Jan 14 14:06:09 2015 -0800
Committer: Sergey Edunov <edunov@fb.com>
Committed: Wed Jan 14 14:07:50 2015 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  4 +-
 .../org/apache/giraph/bsp/CheckpointStatus.java | 31 ------------
 .../bsp/checkpoints/CheckpointStatus.java       | 31 ++++++++++++
 .../checkpoints/CheckpointSupportedChecker.java | 51 ++++++++++++++++++++
 .../DefaultCheckpointSupportedChecker.java      | 39 +++++++++++++++
 .../DisabledCheckpointSupportedChecker.java     | 34 +++++++++++++
 .../giraph/bsp/checkpoints/package-info.java    | 21 ++++++++
 .../apache/giraph/conf/GiraphConfiguration.java | 13 +++++
 .../org/apache/giraph/conf/GiraphConstants.java | 17 ++++++-
 .../giraph/graph/FinishedSuperstepStats.java    |  2 +-
 .../org/apache/giraph/graph/GlobalStats.java    |  2 +-
 .../apache/giraph/graph/GraphTaskManager.java   |  2 +-
 .../apache/giraph/master/BspServiceMaster.java  | 37 ++++++++++++--
 .../apache/giraph/worker/BspServiceWorker.java  |  2 +-
 14 files changed, 246 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index ab2a21f..d487874 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,7 +1,9 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
-
+ 
+  GIRAPH-980 Way to disable checkpoints for particular job and on particular supersteps (edunov)
+ 
   GIRAPH-983 Remove checkpoint related error messages from console (edunov)
 
   GIRAPH-978 Giraph-Debugger Test Graphs not working (nishantgandhi99 via edunov)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
deleted file mode 100644
index 74db490..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.bsp;
-
-/**
- * Enum represents possible checkpoint state.
- */
-public enum CheckpointStatus {
-  /** Do nothing, no checkpoint required */
-  NONE,
-  /** Regular checkpoint */
-  CHECKPOINT,
-  /** Do checkpoint and then halt further computation */
-  CHECKPOINT_AND_HALT
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointStatus.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointStatus.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointStatus.java
new file mode 100644
index 0000000..8de4c77
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp.checkpoints;
+
+/**
+ * Enum represents possible checkpoint state.
+ */
+public enum CheckpointStatus {
+  /** Do nothing, no checkpoint required */
+  NONE,
+  /** Regular checkpoint */
+  CHECKPOINT,
+  /** Do checkpoint and then halt further computation */
+  CHECKPOINT_AND_HALT
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointSupportedChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointSupportedChecker.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointSupportedChecker.java
new file mode 100644
index 0000000..fe1c845
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/CheckpointSupportedChecker.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.bsp.checkpoints;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.master.MasterCompute;
+
+/**
+ * To prevent accidental checkpointing of a non-checkpointable app,
+ * you may provide an implementation of this interface. Most apps are
+ * checkpointable by default; however, some apps are not checkpointable,
+ * e.g. apps that use static variables to pass data around between supersteps
+ * or start new threads or use external resources. This interface allows
+ * you to specify if and when your app is checkpointable.
+ */
+public interface CheckpointSupportedChecker {
+
+  /**
+   * Does the job support checkpoints?
+   * It is true by default; return false if your job uses some
+   * non-checkpointable features:
+   * - static variables for storing data between supersteps.
+   * - starts new threads or uses Timers
+   * - writes output before job is complete, etc
+   * This method is called on master and has access to
+   * job configuration and master compute.
+   *
+   * @param conf giraph configuration
+   * @param masterCompute instance of master compute
+   * @return true if checkpointing on current superstep is supported
+   * by this application.
+   */
+  boolean isCheckpointSupported(GiraphConfiguration conf,
+                                       MasterCompute masterCompute);
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DefaultCheckpointSupportedChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DefaultCheckpointSupportedChecker.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DefaultCheckpointSupportedChecker.java
new file mode 100644
index 0000000..ef7b10c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DefaultCheckpointSupportedChecker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.bsp.checkpoints;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.master.MasterCompute;
+
+/**
+ * Default checkpoint supported checker.
+ * Most tasks will support checkpointing by default unless they do output
+ * during the computation. Some tasks however can use non-checkpointable
+ * features, e.g.: static variables to pass data around between supersteps
+ * or new threads started from MasterCompute or external resources. In that
+ * case to prevent accidental checkpointing you should provide checker
+ * implementation.
+ */
+public class DefaultCheckpointSupportedChecker
+    implements CheckpointSupportedChecker {
+  @Override
+  public boolean isCheckpointSupported(GiraphConfiguration conf,
+                                       MasterCompute masterCompute) {
+    return !conf.doOutputDuringComputation();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DisabledCheckpointSupportedChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DisabledCheckpointSupportedChecker.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DisabledCheckpointSupportedChecker.java
new file mode 100644
index 0000000..7a13187
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/DisabledCheckpointSupportedChecker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.bsp.checkpoints;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.master.MasterCompute;
+
+/**
+ * Disable all checkpoints.
+ */
+public class DisabledCheckpointSupportedChecker
+    implements CheckpointSupportedChecker {
+
+  @Override
+  public boolean isCheckpointSupported(GiraphConfiguration conf,
+                                       MasterCompute masterCompute) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/package-info.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/package-info.java
new file mode 100644
index 0000000..3e56161
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/checkpoints/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of checkpoint-related classes: checkpoint status and support checkers.
+ */
+package org.apache.giraph.bsp.checkpoints;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 7f1a764..8c64b5d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
@@ -1003,6 +1004,18 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set runtime checkpoint support checker.
+   * The instance of this class will have to decide whether
+   * checkpointing is allowed on current superstep.
+   *
+   * @param clazz checkpoint supported checker class
+   */
+  public void setCheckpointSupportedChecker(
+      Class<? extends CheckpointSupportedChecker> clazz) {
+    GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
+  }
+
+  /**
    * Set the max task attempts
    *
    * @param maxTaskAttempts Max task attempts to use

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index bbf3bd2..f1cf520 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -19,6 +19,8 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
+import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
+import org.apache.giraph.bsp.checkpoints.DefaultCheckpointSupportedChecker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
@@ -1159,7 +1161,7 @@ public interface GiraphConstants {
   /**
    * Compression algorithm to be used for checkpointing.
    * Defined by extension for hadoop compatibility reasons.
-  */
+   */
   StrConfOption CHECKPOINT_COMPRESSION_CODEC =
       new StrConfOption("giraph.checkpoint.compression.codec",
           ".deflate",
@@ -1167,6 +1169,19 @@ public interface GiraphConstants {
               "storing checkpoint. Available options include but " +
               "not restricted to: .deflate, .gz, .bz2, .lzo");
 
+  /**
+   * Defines if and when checkpointing is supported by this job.
+   * By default checkpointing is always supported unless output during the
+   * computation is enabled.
+   */
+  ClassConfOption<CheckpointSupportedChecker> CHECKPOINT_SUPPORTED_CHECKER =
+      ClassConfOption.create("giraph.checkpoint.supported.checker",
+          DefaultCheckpointSupportedChecker.class,
+          CheckpointSupportedChecker.class,
+          "This is the way to specify if checkpointing is " +
+              "supported by the job");
+
+
   /** Number of threads to use in async message store, 0 means
    * we should not use async message processing */
   IntConfOption ASYNC_MESSAGE_STORE_THREADS_COUNT =

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index f7895a9..cfb9799 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -17,7 +17,7 @@
  */
 package org.apache.giraph.graph;
 
-import org.apache.giraph.bsp.CheckpointStatus;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 
 /**
  * Immutable graph stats after the completion of a superstep

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index e11f02c..499c862 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -22,7 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.giraph.bsp.CheckpointStatus;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.hadoop.io.Writable;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index d479d74..ff9b028 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.bsp.CheckpointStatus;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ClassConfOption;
 import org.apache.giraph.conf.GiraphConstants;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 49ceb9d..5281ae8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -50,12 +50,14 @@ import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.CheckpointStatus;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterServer;
 import org.apache.giraph.comm.netty.NettyMasterClient;
 import org.apache.giraph.comm.netty.NettyMasterServer;
+import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.counters.GiraphStats;
@@ -89,6 +91,7 @@ import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.zk.BspEvent;
@@ -193,6 +196,8 @@ public class BspServiceMaster<I extends WritableComparable,
   private final int checkpointFrequency;
   /** Current checkpoint status */
   private CheckpointStatus checkpointStatus;
+  /** Checks if checkpointing is supported */
+  private CheckpointSupportedChecker checkpointSupportedChecker;
 
   /**
    * Constructor for setting up the master.
@@ -231,6 +236,9 @@ public class BspServiceMaster<I extends WritableComparable,
 
     this.checkpointFrequency = conf.getCheckpointFrequency();
     this.checkpointStatus = CheckpointStatus.NONE;
+    this.checkpointSupportedChecker =
+        ReflectionUtils.newInstance(
+            GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.get(conf));
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
     GiraphStats.init(context);
@@ -1759,7 +1767,12 @@ public class BspServiceMaster<I extends WritableComparable,
     try {
       if (getZkExt().
           exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
-        return CheckpointStatus.CHECKPOINT_AND_HALT;
+        if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
+          return CheckpointStatus.CHECKPOINT_AND_HALT;
+        } else {
+          LOG.warn("Attempted to manually checkpoint the job that " +
+              "does not support checkpoints. Ignoring");
+        }
       }
     } catch (KeeperException e) {
       throw new IllegalStateException(
@@ -1779,12 +1792,30 @@ public class BspServiceMaster<I extends WritableComparable,
       return CheckpointStatus.NONE;
     }
     if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
-      return CheckpointStatus.CHECKPOINT;
+      if (isCheckpointingSupported(getConfiguration(), masterCompute)) {
+        return CheckpointStatus.CHECKPOINT;
+      }
     }
     return CheckpointStatus.NONE;
   }
 
   /**
+   * Returns false if job doesn't support checkpoints.
+   * Job may not support checkpointing if it does output during
+   * computation, uses static variables to keep data between supersteps,
+   * starts new threads etc.
+   * @param conf Immutable configuration of the job
+   * @param masterCompute instance of master compute
+   * @return true if it is safe to checkpoint the job
+   */
+  private boolean isCheckpointingSupported(
+      GiraphConfiguration conf, MasterCompute masterCompute) {
+    return checkpointSupportedChecker.isCheckpointSupported(
+        conf, masterCompute);
+  }
+
+
+  /**
    * This doMasterCompute is only called
    * after masterCompute is initialized
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8af7670c/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 381e51a..f614a33 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -42,7 +42,7 @@ import net.iharder.Base64;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.bsp.CheckpointStatus;
+import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;


Mime
View raw message