crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-172: Refine the synchronization mechanism in CrunchJobControl. Contributed by Chao Shi.
Date Mon, 11 Mar 2013 00:32:32 GMT
Updated Branches:
  refs/heads/master 913d76718 -> 65822ab7b


CRUNCH-172: Refine the synchronization mechanism in CrunchJobControl. Contributed by Chao Shi.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/65822ab7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/65822ab7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/65822ab7

Branch: refs/heads/master
Commit: 65822ab7bd34c5ce228d19e48dc1dd5610ae39b1
Parents: 913d767
Author: Josh Wills <jwills@apache.org>
Authored: Sun Mar 10 16:33:54 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Mar 10 17:29:34 2013 -0700

----------------------------------------------------------------------
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |  113 +++------------
 .../impl/mr/exec/CappedExponentialCounter.java     |   40 +++++
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |   48 ++++---
 .../impl/mr/exec/CappedExponentialCounterTest.java |   42 ++++++
 4 files changed, 127 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/65822ab7/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 80f701b..0342ad4 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * This class encapsulates a set of MapReduce jobs and its dependency.
@@ -38,20 +37,8 @@ import org.apache.hadoop.conf.Configuration;
  * This class provides APIs for the client app to add a job to the group and to
  * get the jobs in the group in different states. When a job is added, an ID
  * unique to the group is assigned to the job.
- * 
- * This class has a thread that submits jobs when they become ready, monitors
- * the states of the running jobs, and updates the states of jobs based on the
- * state changes of their depending jobs states. The class provides APIs for
- * suspending/resuming the thread, and for stopping the thread.
  */
-public class CrunchJobControl implements Runnable {
-
-  // The thread can be in one of the following state
-  public static enum ThreadState {
-    RUNNING, SUSPENDED, STOPPED, STOPPING, READY
-  };
-
-  private ThreadState runnerState; // the thread state
+public class CrunchJobControl {
 
   private Map<String, CrunchControlledJob> waitingJobs;
   private Map<String, CrunchControlledJob> readyJobs;
@@ -63,7 +50,6 @@ public class CrunchJobControl implements Runnable {
 
   private long nextJobID;
   private String groupName;
-  private int jobPollInterval;
 
   /**
    * Construct a job control for a group of jobs.
@@ -79,8 +65,6 @@ public class CrunchJobControl implements Runnable {
     this.failedJobs = new Hashtable<String, CrunchControlledJob>();
     this.nextJobID = -1;
     this.groupName = groupName;
-    this.runnerState = ThreadState.READY;
-    this.jobPollInterval = isLocalMode() ? 500 : 5000;
   }
 
   private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) {
@@ -183,39 +167,6 @@ public class CrunchJobControl implements Runnable {
     }
   }
 
-  /**
-   * @return the thread state
-   */
-  public ThreadState getThreadState() {
-    return this.runnerState;
-  }
-
-  /**
-   * set the thread state to STOPPING so that the thread will stop when it wakes
-   * up.
-   */
-  public void stop() {
-    this.runnerState = ThreadState.STOPPING;
-  }
-
-  /**
-   * suspend the running thread
-   */
-  public void suspend() {
-    if (this.runnerState == ThreadState.RUNNING) {
-      this.runnerState = ThreadState.SUSPENDED;
-    }
-  }
-
-  /**
-   * resume the suspended thread
-   */
-  public void resume() {
-    if (this.runnerState == ThreadState.SUSPENDED) {
-      this.runnerState = ThreadState.RUNNING;
-    }
-  }
-
   synchronized private void checkRunningJobs() throws IOException,
       InterruptedException {
 
@@ -253,56 +204,30 @@ public class CrunchJobControl implements Runnable {
     }
   }
 
-  synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
-        && this.runningJobs.size() == 0;
-  }
-
-  /**
-   * The main loop for the thread. The loop does the following: Check the states
-   * of the running jobs Update the states of waiting jobs Submit the jobs in
-   * ready state
-   */
-  public void run() {
-    this.runnerState = ThreadState.RUNNING;
-    while (true) {
-      while (this.runnerState == ThreadState.SUSPENDED) {
+  synchronized public void killAllRunningJobs() {
+    for (CrunchControlledJob job : runningJobs.values()) {
+      if (!job.isCompleted()) {
         try {
-          Thread.sleep(5000);
+          job.killJob();
         } catch (Exception e) {
-
+          log.error("Exception killing job: " + job.getJobName(), e);
         }
       }
-      try {
-        checkRunningJobs();
-        checkWaitingJobs();
-        startReadyJobs();
-      } catch (Exception e) {
-        log.error("Error in run loop", e);
-        this.runnerState = ThreadState.STOPPED;
-      }
-      if (this.runnerState != ThreadState.RUNNING
-          && this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(jobPollInterval);
-      } catch (Exception e) {
-
-      }
-      if (this.runnerState != ThreadState.RUNNING
-          && this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
     }
-    this.runnerState = ThreadState.STOPPED;
   }
-  
-  private boolean isLocalMode() {
-    Configuration conf = new Configuration();
-    // Try to handle MapReduce version 0.20 or 0.22
-    String jobTrackerAddress = conf.get("mapreduce.jobtracker.address", conf.get("mapred.job.tracker", "local"));
-    return "local".equals(jobTrackerAddress);
+
+  synchronized public boolean allFinished() {
+    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
+        && this.runningJobs.size() == 0;
   }
 
+  /**
+   * Checks the states of the running jobs Update the states of waiting jobs, and submits the jobs in
+   * ready state (i.e. whose dependencies are all finished in success).
+   */
+  public void pollJobStatusAndStartNewOnes() throws IOException, InterruptedException {
+    checkRunningJobs();
+    checkWaitingJobs();
+    startReadyJobs();
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/65822ab7/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
new file mode 100644
index 0000000..d90f2e8
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+/**
+ * Generate a series of capped numbers exponentially.
+ *
+ * It is used for creating retry intervals. It is NOT thread-safe.
+ */
+public class CappedExponentialCounter {
+
+  private long current;
+  private final long limit;
+
+  public CappedExponentialCounter(long start, long limit) {
+    this.current = start;
+    this.limit = limit;
+  }
+
+  public long get() {
+    long result = current;
+    current = Math.min(current * 2, limit);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/65822ab7/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 901f91a..a784b66 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.mr.exec;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,12 +35,18 @@ import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Lists;
 
 /**
+ * Provides APIs for job control at runtime to clients.
  *
+ * This class has a thread that submits jobs when they become ready, monitors
+ * the states of the running jobs, and updates the states of jobs based on the
+ * state changes of their depending jobs states.
  *
+ * It is thread-safe.
  */
 public class MRExecutor implements PipelineExecution {
 
@@ -50,6 +57,7 @@ public class MRExecutor implements PipelineExecution {
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
   private final CountDownLatch doneSignal = new CountDownLatch(1);
   private final CountDownLatch killSignal = new CountDownLatch(1);
+  private final CappedExponentialCounter pollInterval;
   private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
   private PipelineResult result;
   private Thread monitorThread;
@@ -67,6 +75,9 @@ public class MRExecutor implements PipelineExecution {
         monitorLoop();
       }
     });
+    this.pollInterval = isLocalMode()
+      ? new CappedExponentialCounter(50, 1000)
+      : new CappedExponentialCounter(500, 10000);
   }
 
   public void addJob(CrunchJob job) {
@@ -85,13 +96,11 @@ public class MRExecutor implements PipelineExecution {
   /** Monitors running status. It is called in {@code MonitorThread}. */
   private void monitorLoop() {
     try {
-      Thread controlThread = new Thread(control);
-      controlThread.start();
       while (killSignal.getCount() > 0 && !control.allFinished()) {
-        killSignal.await(1, TimeUnit.SECONDS);
+        control.pollJobStatusAndStartNewOnes();
+        killSignal.await(pollInterval.get(), TimeUnit.MILLISECONDS);
       }
-      control.stop();
-      killAllRunningJobs();
+      control.killAllRunningJobs();
 
       List<CrunchControlledJob> failures = control.getFailedJobList();
       if (!failures.isEmpty()) {
@@ -102,11 +111,7 @@ public class MRExecutor implements PipelineExecution {
       }
       List<PipelineResult.StageResult> stages = Lists.newArrayList();
       for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-        try {
-          stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
-        } catch (Exception e) {
-          LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
-        }
+        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
       }
 
       for (PCollectionImpl<?> c : outputTargets.keySet()) {
@@ -145,23 +150,14 @@ public class MRExecutor implements PipelineExecution {
       }
     } catch (InterruptedException e) {
       throw new AssertionError(e); // Nobody should interrupt us.
+    } catch (IOException e) {
+      LOG.error("Pipeline failed due to exception", e);
+      status.set(Status.FAILED);
     } finally {
       doneSignal.countDown();
     }
   }
 
-  private void killAllRunningJobs() {
-    for (CrunchControlledJob job : control.getRunningJobList()) {
-      if (!job.isCompleted()) {
-        try {
-          job.killJob();
-        } catch (Exception e) {
-          LOG.error("Exception killing job: " + job.getJobName(), e);
-        }
-      }
-    }
-  }
-
   @Override
   public String getPlanDotFile() {
     return planDotFile;
@@ -191,4 +187,12 @@ public class MRExecutor implements PipelineExecution {
   public void kill() throws InterruptedException {
     killSignal.countDown();
   }
+
+  private static boolean isLocalMode() {
+    Configuration conf = new Configuration();
+    // Try to handle MapReduce version 0.20 or 0.22
+    String jobTrackerAddress = conf.get("mapreduce.jobtracker.address",
+        conf.get("mapred.job.tracker", "local"));
+    return "local".equals(jobTrackerAddress);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/65822ab7/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
new file mode 100644
index 0000000..958df12
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounterTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CappedExponentialCounterTest {
+
+  @Test
+  public void testGet() {
+    CappedExponentialCounter c = new CappedExponentialCounter(1L, Long.MAX_VALUE);
+    assertEquals(1L, c.get());
+    assertEquals(2L, c.get());
+    assertEquals(4L, c.get());
+    assertEquals(8L, c.get());
+  }
+
+  @Test
+  public void testCap() {
+    CappedExponentialCounter c = new CappedExponentialCounter(1L, 2);
+    assertEquals(1L, c.get());
+    assertEquals(2L, c.get());
+    assertEquals(2L, c.get());
+  }
+}


Mime
View raw message