giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 9b6d6f9
Date Thu, 19 May 2016 09:08:02 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 608d50697 -> 9b6d6f9f6


GIRAPH-1065: Allow extending JobProgressTrackerService

Summary: We might want to perform additional actions on events from JobProgressTrackerService.
Allow overriding it and specifying another class to use.

Test Plan: Ran a job with a custom JobProgressTrackerService and verified that actions on it are called

Differential Revision: https://reviews.facebook.net/D58383


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9b6d6f9f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9b6d6f9f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9b6d6f9f

Branch: refs/heads/trunk
Commit: 9b6d6f9f6f91c5d6a8a153635f7e5f3c52a1f4f2
Parents: 608d506
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Wed May 18 09:27:06 2016 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Thu May 19 02:01:26 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 .../org/apache/giraph/conf/GiraphConstants.java |   9 +
 .../job/DefaultJobProgressTrackerService.java   | 217 +++++++++++++++++++
 .../java/org/apache/giraph/job/GiraphJob.java   |   2 +-
 .../giraph/job/JobProgressTrackerService.java   | 189 +---------------
 src/site/xdoc/options.xml                       |   6 +
 6 files changed, 244 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 42ecd00..3ae52e2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,3 +31,6 @@ failed-profile.txt
 
 /for-each-profile-results.txt
 /giraph-hive/derby.log
+
+# File with all giraph conf options
+/src/site/xdoc/options.xml

http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 15eca3c..b5bb9ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -60,9 +60,11 @@ import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.job.DefaultGiraphJobRetryChecker;
 import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.DefaultJobProgressTrackerService;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.job.GiraphJobRetryChecker;
 import org.apache.giraph.job.HaltApplicationUtils;
+import org.apache.giraph.job.JobProgressTrackerService;
 import org.apache.giraph.mapping.MappingStore;
 import org.apache.giraph.mapping.MappingStoreOps;
 import org.apache.giraph.mapping.translate.TranslateEdge;
@@ -1170,6 +1172,13 @@ public interface GiraphConstants {
       new BooleanConfOption("giraph.trackJobProgressOnClient", false,
           "Whether to track job progress on client or not");
 
+  /** Class to use to track job progress on client */
+  ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS =
+      ClassConfOption.create("giraph.jobProgressTrackerClass",
+          DefaultJobProgressTrackerService.class,
+          JobProgressTrackerService.class,
+          "Class to use to track job progress on client");
+
   /** Number of retries for creating the HDFS files */
   IntConfOption HDFS_FILE_CREATION_RETRIES =
       new IntConfOption("giraph.hdfs.file.creation.retries", 10,

http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
new file mode 100644
index 0000000..adca42b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Default implementation of JobProgressTrackerService
+ */
+public class DefaultJobProgressTrackerService
+    implements JobProgressTrackerService {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(JobProgressTrackerService.class);
+  /** How often to print job's progress */
+  private static final int UPDATE_MILLISECONDS = 10 * 1000;
+
+  /** Configuration */
+  private GiraphConfiguration conf;
+  /** Giraph job callback */
+  private GiraphJobObserver jobObserver;
+  /** Thread which periodically writes job's progress */
+  private Thread writerThread;
+  /** Whether application is finished */
+  private volatile boolean finished = false;
+  /** Number of mappers which the job got */
+  private int mappersStarted;
+  /** Last time number of mappers started was logged */
+  private long lastTimeMappersStartedLogged;
+  /** Map of worker progresses */
+  private final Map<Integer, WorkerProgress> workerProgresses =
+      new ConcurrentHashMap<>();
+  /** Job */
+  private Job job;
+
+  @Override
+  public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
+    this.conf = conf;
+    this.jobObserver = jobObserver;
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Waiting for job to start... (this may take a minute)");
+    }
+    startWriterThread();
+  }
+
+  /**
+   * Start the thread which writes progress periodically
+   */
+  private void startWriterThread() {
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!finished) {
+          if (mappersStarted == conf.getMaxWorkers() + 1 &&
+              !workerProgresses.isEmpty()) {
+            // Combine and log
+            CombinedWorkerProgress combinedWorkerProgress =
+                new CombinedWorkerProgress(workerProgresses.values(), conf);
+            if (LOG.isInfoEnabled()) {
+              LOG.info(combinedWorkerProgress.toString());
+            }
+            // Check if application is done
+            if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+              break;
+            }
+          }
+          try {
+            Thread.sleep(UPDATE_MILLISECONDS);
+          } catch (InterruptedException e) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Progress thread interrupted");
+            }
+            break;
+          }
+        }
+      }
+    });
+    writerThread.setDaemon(true);
+    writerThread.start();
+  }
+
+  @Override
+  public void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
+   * and potentially start a thread which will kill the job after this time
+   */
+  private void jobGotAllMappers() {
+    jobObserver.jobGotAllMappers(job);
+    final long maxAllowedJobTimeMs =
+        GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
+    if (maxAllowedJobTimeMs > 0) {
+      // Start a thread which will kill the job if running for too long
+      Thread killThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Thread.sleep(maxAllowedJobTimeMs);
+            try {
+              LOG.warn("Killing job because it took longer than " +
+                  maxAllowedJobTimeMs + " milliseconds");
+              job.killJob();
+            } catch (IOException e) {
+              LOG.warn("Failed to kill job", e);
+            }
+          } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Thread checking for jobs max allowed time " +
+                  "interrupted");
+            }
+          }
+        }
+      });
+      killThread.setDaemon(true);
+      killThread.start();
+    }
+  }
+
+  @Override
+  public synchronized void mapperStarted() {
+    mappersStarted++;
+    if (LOG.isInfoEnabled()) {
+      if (mappersStarted == conf.getMaxWorkers() + 1) {
+        LOG.info("Got all " + mappersStarted + " mappers");
+        jobGotAllMappers();
+      } else {
+        if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
+            UPDATE_MILLISECONDS) {
+          lastTimeMappersStartedLogged = System.currentTimeMillis();
+          LOG.info("Got " + mappersStarted + " but needs " +
+              (conf.getMaxWorkers() + 1) + " mappers");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void logInfo(String logLine) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(logLine);
+    }
+  }
+
+  @Override
+  public void logError(String logLine) {
+    LOG.error(logLine);
+  }
+
+  @Override
+  public void logFailure(String reason) {
+    LOG.fatal(reason);
+    finished = true;
+    writerThread.interrupt();
+  }
+
+  @Override
+  public void updateProgress(WorkerProgress workerProgress) {
+    workerProgresses.put(workerProgress.getTaskId(), workerProgress);
+  }
+
+  @Override
+  public void stop(boolean succeeded) {
+    finished = true;
+    writerThread.interrupt();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
+          ", cleaning up...");
+    }
+  }
+
+  /**
+   * Create job progress server on job client if enabled in configuration.
+   *
+   * @param conf        Configuration
+   * @param jobObserver Giraph job callbacks
+   * @return JobProgressTrackerService
+   */
+  public static JobProgressTrackerService createJobProgressTrackerService(
+      GiraphConfiguration conf, GiraphJobObserver jobObserver) {
+    if (!conf.trackJobProgressOnClient()) {
+      return null;
+    }
+
+    JobProgressTrackerService jobProgressTrackerService =
+        GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
+    jobProgressTrackerService.init(conf, jobObserver);
+    return jobProgressTrackerService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 90a73c6..b11f131 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -241,7 +241,7 @@ public class GiraphJob {
       GiraphJobObserver jobObserver = conf.getJobObserver();
 
       JobProgressTrackerService jobProgressTrackerService =
-          JobProgressTrackerService.createJobProgressTrackerService(
+          DefaultJobProgressTrackerService.createJobProgressTrackerService(
               conf, jobObserver);
       ClientThriftServer clientThriftServer = null;
       if (jobProgressTrackerService != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
index c0189c0..e4b2b66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -19,204 +19,31 @@
 package org.apache.giraph.job;
 
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.worker.WorkerProgress;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Implementation of job progress tracker service on job client
  */
-public class JobProgressTrackerService implements JobProgressTracker {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(JobProgressTrackerService.class);
-  /** How often to print job's progress */
-  private static final int UPDATE_MILLISECONDS = 10 * 1000;
-
-  /** Configuration */
-  private final GiraphConfiguration conf;
-  /** Giraph job callback */
-  private final GiraphJobObserver jobObserver;
-  /** Thread which periodically writes job's progress */
-  private Thread writerThread;
-  /** Whether application is finished */
-  private volatile boolean finished = false;
-  /** Number of mappers which the job got */
-  private int mappersStarted;
-  /** Last time number of mappers started was logged */
-  private long lastTimeMappersStartedLogged;
-  /** Map of worker progresses */
-  private final Map<Integer, WorkerProgress> workerProgresses =
-      new ConcurrentHashMap<>();
-  /** Job */
-  private Job job;
-
+public interface JobProgressTrackerService extends JobProgressTracker {
   /**
-   * Constructor
+   * Initialize the service
    *
    * @param conf Configuration
    * @param jobObserver Giraph job callbacks
    */
-  public JobProgressTrackerService(GiraphConfiguration conf,
-                                   GiraphJobObserver jobObserver) {
-    this.conf = conf;
-    this.jobObserver = jobObserver;
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Waiting for job to start... (this may take a minute)");
-    }
-    startWriterThread();
-  }
+  void init(GiraphConfiguration conf, GiraphJobObserver jobObserver);
 
   /**
-   * Start the thread which writes progress periodically
+   * Set job
+   *
+   * @param job Job
    */
-  private void startWriterThread() {
-    writerThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!finished) {
-          if (mappersStarted == conf.getMaxWorkers() + 1 &&
-              !workerProgresses.isEmpty()) {
-            // Combine and log
-            CombinedWorkerProgress combinedWorkerProgress =
-                new CombinedWorkerProgress(workerProgresses.values(), conf);
-            if (LOG.isInfoEnabled()) {
-              LOG.info(combinedWorkerProgress.toString());
-            }
-            // Check if application is done
-            if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
-              break;
-            }
-          }
-          try {
-            Thread.sleep(UPDATE_MILLISECONDS);
-          } catch (InterruptedException e) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Progress thread interrupted");
-            }
-            break;
-          }
-        }
-      }
-    });
-    writerThread.setDaemon(true);
-    writerThread.start();
-  }
-
-  public void setJob(Job job) {
-    this.job = job;
-  }
-
-  /**
-   * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS
-   * and potentially start a thread which will kill the job after this time
-   */
-  private void jobGotAllMappers() {
-    jobObserver.jobGotAllMappers(job);
-    final long maxAllowedJobTimeMs =
-        GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
-    if (maxAllowedJobTimeMs > 0) {
-      // Start a thread which will kill the job if running for too long
-      Thread killThread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            Thread.sleep(maxAllowedJobTimeMs);
-            try {
-              LOG.warn("Killing job because it took longer than " +
-                  maxAllowedJobTimeMs + " milliseconds");
-              job.killJob();
-            } catch (IOException e) {
-              LOG.warn("Failed to kill job", e);
-            }
-          } catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Thread checking for jobs max allowed time " +
-                  "interrupted");
-            }
-          }
-        }
-      });
-      killThread.setDaemon(true);
-      killThread.start();
-    }
-  }
-
-  @Override
-  public synchronized void mapperStarted() {
-    mappersStarted++;
-    if (LOG.isInfoEnabled()) {
-      if (mappersStarted == conf.getMaxWorkers() + 1) {
-        LOG.info("Got all " + mappersStarted + " mappers");
-        jobGotAllMappers();
-      } else {
-        if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
-            UPDATE_MILLISECONDS) {
-          lastTimeMappersStartedLogged = System.currentTimeMillis();
-          LOG.info("Got " + mappersStarted + " but needs " +
-              (conf.getMaxWorkers() + 1) + " mappers");
-        }
-      }
-    }
-  }
-
-  @Override
-  public void logInfo(String logLine) {
-    if (LOG.isInfoEnabled()) {
-      LOG.info(logLine);
-    }
-  }
-
-  @Override
-  public void logError(String logLine) {
-    LOG.error(logLine);
-  }
-
-  @Override
-  public void logFailure(String reason) {
-    LOG.fatal(reason);
-    finished = true;
-    writerThread.interrupt();
-  }
-
-  @Override
-  public void updateProgress(WorkerProgress workerProgress) {
-    workerProgresses.put(workerProgress.getTaskId(), workerProgress);
-  }
+  void setJob(Job job);
 
   /**
    * Stop the thread which logs application progress and server
    *
    * @param succeeded Whether job succeeded or not
    */
-  public void stop(boolean succeeded) {
-    finished = true;
-    writerThread.interrupt();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
-          ", cleaning up...");
-    }
-  }
-
-  /**
-   * Create job progress server on job client if enabled in configuration.
-   *
-   * @param conf Configuration
-   * @param jobObserver Giraph job callbacks
-   * @return JobProgressTrackerService
-   */
-  public static JobProgressTrackerService createJobProgressTrackerService(
-      GiraphConfiguration conf, GiraphJobObserver jobObserver) {
-    if (!conf.trackJobProgressOnClient()) {
-      return null;
-    }
-
-    return new JobProgressTrackerService(conf, jobObserver);
-  }
+  void stop(boolean succeeded);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/src/site/xdoc/options.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/options.xml b/src/site/xdoc/options.xml
index 2735575..55638ae 100644
--- a/src/site/xdoc/options.xml
+++ b/src/site/xdoc/options.xml
@@ -376,6 +376,12 @@ under the License.
          <td>Observer class to watch over job status - optional</td>
        </tr>
        <tr>
+         <td>giraph.jobProgressTrackerClass</td>
+         <td>class</td>
+         <td>DefaultJobProgressTrackerService</td>
+         <td>Class to use to track job progress on client</td>
+       </tr>
+       <tr>
          <td>giraph.jobRetryCheckerClass</td>
          <td>class</td>
          <td>DefaultGiraphJobRetryChecker</td>


Mime
View raw message