gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/2] incubator-gobblin git commit: Allow GobblinHelixJobScheduler to disable the services started by it's super class.
Date Sat, 29 Jul 2017 02:27:24 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d9d7d5f0c -> 467fe8fc8


Allow GobblinHelixJobScheduler to disable the services started by it's super class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dc3d1227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dc3d1227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dc3d1227

Branch: refs/heads/master
Commit: dc3d122769ed6c3afcb8e9af6210f594b5fc7585
Parents: 0975312
Author: Joel Baranick <joel.baranick@ensighten.com>
Authored: Mon May 1 09:58:37 2017 -0700
Committer: Joel Baranick <joel.baranick@ensighten.com>
Committed: Fri Jul 28 12:46:09 2017 -0700

----------------------------------------------------------------------
 .../cluster/GobblinHelixJobScheduler.java       |  4 ++
 .../java/gobblin/scheduler/JobScheduler.java    | 48 ++++++++++++--------
 2 files changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
index 05595bc..c598c72 100644
--- a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -103,6 +103,10 @@ public class GobblinHelixJobScheduler extends JobScheduler {
   }
 
   @Override
+  protected void startServices() throws Exception {
+  }
+
+  @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
     try {
       JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
index 5eb1a6a..2736637 100644
--- a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
@@ -17,9 +17,8 @@
 
 package gobblin.scheduler;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +49,6 @@ import org.quartz.UnableToInterruptJobException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -131,6 +129,8 @@ public class JobScheduler extends AbstractIdleService {
   // A period of time for scheduler to wait until jobs are finished
   private final boolean waitForJobCompletion;
 
+  private final Closer closer = Closer.create();
+
   public JobScheduler(Properties properties, SchedulerService scheduler)
       throws Exception {
     this.properties = properties;
@@ -155,14 +155,6 @@ public class JobScheduler extends AbstractIdleService {
         this.properties.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY,
             ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION));
 
-    if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) &&
-        !this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
{
-      String path = FileSystems.getDefault()
-          .getPath(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY))
-          .normalize().toAbsolutePath().toString();
-      this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, "file:///"
+ path);
-    }
-
     if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
{
       this.jobConfigFileDirPath = new Path(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY));
       this.listener = new PathAlterationListenerAdaptorForMonitor(jobConfigFileDirPath, this);
@@ -185,28 +177,34 @@ public class JobScheduler extends AbstractIdleService {
     }
 
     // Note: This should not be mandatory, gobblin-cluster modes have their own job configuration
managers
-    if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
{
-      startGeneralJobConfigFileMonitor();
-      scheduleGeneralConfiguredJobs();
+    if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)
+            || this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
{
+
+      if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) &&
!this.properties.containsKey(
+              ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
+        this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+                "file://" + this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY));
+      }
+      startServices();
     }
   }
 
+  protected void startServices() throws Exception {
+    startGeneralJobConfigFileMonitor();
+    scheduleGeneralConfiguredJobs();
+  }
+
   @Override
   protected void shutDown()
       throws Exception {
     LOG.info("Stopping the job scheduler");
-
-    if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY) ||
this.properties.containsKey(
-        ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)) {
-      this.pathAlterationDetector.stop(1000);
-    }
+    closer.close();
 
     List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs();
     for (JobExecutionContext jobExecutionContext : currentExecutions) {
       this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
     }
 
-
     ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
   }
 
@@ -454,6 +452,16 @@ public class JobScheduler extends AbstractIdleService {
       throws Exception {
     SchedulerUtils.addPathAlterationObserver(this.pathAlterationDetector, this.listener,
jobConfigFileDirPath);
     this.pathAlterationDetector.start();
+    this.closer.register(new Closeable() {
+      @Override
+      public void close() throws IOException {
+        try {
+          pathAlterationDetector.stop(1000);
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      }
+    });
   }
 
   /**


Mime
View raw message