gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-233] Add concurrent map to avoid multiple job submission from GobblinHelixJobScheduler
Date Fri, 01 Sep 2017 18:20:49 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3df82bff0 -> 037950769


[GOBBLIN-233] Add concurrent map to avoid multiple job submission from GobblinHelixJobScheduler

Closes #2086 from yukuai518/workflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/03795076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/03795076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/03795076

Branch: refs/heads/master
Commit: 0379507690734e1a58fdb28b512c1df23b7d4509
Parents: 3df82bf
Author: Kuai Yu <kuyu@linkedin.com>
Authored: Fri Sep 1 11:20:44 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Fri Sep 1 11:20:44 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/cluster/GobblinHelixJob.java |  5 +-
 .../cluster/GobblinHelixJobLauncher.java        | 34 ++++++++-
 .../cluster/GobblinHelixJobScheduler.java       | 10 +--
 .../cluster/GobblinHelixJobLauncherTest.java    | 76 +++++++++++++++++++-
 4 files changed, 113 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index 265be3c..b8fd2f4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.cluster;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 
 import lombok.extern.slf4j.Slf4j;
@@ -55,7 +56,7 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob
   @Override
   public void executeImpl(JobExecutionContext context) throws JobExecutionException {
     JobDataMap dataMap = context.getJobDetail().getJobDataMap();
-
+    ConcurrentHashMap runningMap = (ConcurrentHashMap)dataMap.get(GobblinHelixJobScheduler.JOB_RUNNING_MAP);
     final JobScheduler jobScheduler = (JobScheduler) dataMap.get(JobScheduler.JOB_SCHEDULER_KEY);
     // the properties may get mutated during job execution and the scheduler reuses it for the next round of scheduling,
     // so clone it
@@ -67,7 +68,7 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob
     List<? extends Tag<?>> eventMetadata = (List<? extends Tag<?>>) dataMap.get(GobblinHelixJobScheduler.METADATA_TAGS);
 
     try {
-      final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata);
+      final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata, runningMap);
       if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
               Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) {
         jobScheduler.runJob(jobProps, jobListener, jobLauncher);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 1e645d9..74d7169 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -23,7 +23,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,7 +61,6 @@ import org.apache.gobblin.runtime.ExecutionModel;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.Task;
-import org.apache.gobblin.runtime.TaskConfigurationKeys;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
 import org.apache.gobblin.runtime.util.StateStores;
@@ -70,6 +72,8 @@ import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.SerializationUtils;
 
+import javax.annotation.Nullable;
+
 
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework.
@@ -119,17 +123,18 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
   private volatile boolean jobSubmitted = false;
   private volatile boolean jobComplete = false;
+  private final ConcurrentHashMap<String, Boolean> runningMap;
   private final StateStores stateStores;
   private final Config jobConfig;
 
   public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
-      List<? extends Tag<?>> metadataTags)
+      List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap)
       throws Exception {
     super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
 
     this.helixManager = helixManager;
     this.helixTaskDriver = new TaskDriver(this.helixManager);
-
+    this.runningMap = runningMap;
     this.appWorkDir = appWorkDir;
     this.inputWorkUnitDir = new Path(appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
     this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME +
@@ -287,6 +292,29 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
 
   }
 
+  public void launchJob(@Nullable JobListener jobListener)
+      throws JobException {
+    boolean isLaunched = false;
+    this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);
+    try {
+      if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
+        LOGGER.info ("Job {} will be executed, add into running map.", this.jobContext.getJobId());
+        isLaunched = true;
+        super.launchJob(jobListener);
+      } else {
+        LOGGER.warn ("Job {} will not be executed because other jobs are still running.", this.jobContext.getJobId());
+      }
+    } finally {
+      if (isLaunched) {
+        if (this.runningMap.replace(this.jobContext.getJobName(), true, false)) {
+          LOGGER.info ("Job {} is done, remove from running map.", this.jobContext.getJobId());
+        } else {
+          throw new IllegalStateException("A launched job should have running state equal to true in the running map.");
+        }
+      }
+    }
+  }
+
   /**
    * Add a single {@link WorkUnit} (flattened).
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 0a43380..d1ef74e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -22,7 +22,7 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 
@@ -61,12 +61,14 @@ public class GobblinHelixJobScheduler extends JobScheduler {
   static final String HELIX_MANAGER_KEY = "helixManager";
   static final String APPLICATION_WORK_DIR_KEY = "applicationWorkDir";
   static final String METADATA_TAGS = "metadataTags";
+  static final String JOB_RUNNING_MAP = "jobRunningMap";
 
   private final Properties properties;
   private final HelixManager helixManager;
   private final EventBus eventBus;
   private final Path appWorkDir;
   private final List<? extends Tag<?>> metadataTags;
+  private final ConcurrentHashMap<String, Boolean> jobRunningMap;
   private final MutableJobCatalog jobCatalog;
 
   public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus,
@@ -76,7 +78,7 @@ public class GobblinHelixJobScheduler extends JobScheduler {
     this.properties = properties;
     this.helixManager = helixManager;
     this.eventBus = eventBus;
-
+    this.jobRunningMap = new ConcurrentHashMap<>();
     this.appWorkDir = appWorkDir;
     this.metadataTags = metadataTags;
     this.jobCatalog = jobCatalog;
@@ -94,7 +96,7 @@ public class GobblinHelixJobScheduler extends JobScheduler {
     additionalJobDataMap.put(HELIX_MANAGER_KEY, this.helixManager);
     additionalJobDataMap.put(APPLICATION_WORK_DIR_KEY, this.appWorkDir);
     additionalJobDataMap.put(METADATA_TAGS, this.metadataTags);
-
+    additionalJobDataMap.put(JOB_RUNNING_MAP, this.jobRunningMap);
     try {
       scheduleJob(jobProps, jobListener, additionalJobDataMap, GobblinHelixJob.class);
     } catch (Exception e) {
@@ -118,7 +120,7 @@ public class GobblinHelixJobScheduler extends JobScheduler {
 
   private GobblinHelixJobLauncher buildGobblinHelixJobLauncher(Properties jobProps)
       throws Exception {
-    return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags);
+    return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap);
   }
 
   @Subscribe

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index b9dc5cd..3483c62 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -17,17 +17,22 @@
 
 package org.apache.gobblin.cluster;
 
-import org.apache.gobblin.metastore.DatasetStateStore;
-import org.apache.gobblin.util.ClassAliasResolver;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.Schema;
 import org.apache.curator.test.TestingServer;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.listeners.AbstractJobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,6 +61,8 @@ import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.util.ConfigUtils;
 
+import lombok.Getter;
+
 
 /**
  * Unit tests for {@link GobblinHelixJobLauncher}.
@@ -87,6 +94,10 @@ public class GobblinHelixJobLauncherTest {
 
   private GobblinHelixJobLauncher gobblinHelixJobLauncher;
 
+  private GobblinHelixJobLauncher gobblinHelixJobLauncher1;
+
+  private GobblinHelixJobLauncher gobblinHelixJobLauncher2;
+
   private GobblinTaskRunner gobblinTaskRunner;
 
   private DatasetStateStore datasetStateStore;
@@ -150,8 +161,22 @@ public class GobblinHelixJobLauncherTest {
     TestHelper.createSourceJsonFile(sourceJsonFile);
     properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, sourceJsonFile.getAbsolutePath());
 
+    ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
+
+    // Normal job launcher
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348470");
     this.gobblinHelixJobLauncher = this.closer.register(
-        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of()));
+        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+
+    // Job launcher(1) to test parallel job running
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348471");
+    this.gobblinHelixJobLauncher1 = this.closer.register(
+        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+
+    // Job launcher(2) to test parallel job running
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348472");
+    this.gobblinHelixJobLauncher2 = this.closer.register(
+        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
 
     this.gobblinTaskRunner =
         new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME,
@@ -196,6 +221,51 @@ public class GobblinHelixJobLauncherTest {
     Assert.assertEquals(datasetState.getTaskStates().get(0).getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
   }
 
+  private static class SuspendJobListener extends AbstractJobListener {
+    @Getter
+    private AtomicInteger completes = new AtomicInteger();
+    private CountDownLatch stg1;
+    private CountDownLatch stg2;
+    public SuspendJobListener (CountDownLatch stg1, CountDownLatch stg2) {
+      this.stg1 = stg1;
+      this.stg2 = stg2;
+    }
+
+    @Override
+    public void onJobStart (JobContext jobContext) throws Exception {
+      stg1.countDown();
+      stg2.await();
+    }
+
+    @Override
+    public void onJobCompletion(JobContext jobContext) throws Exception {
+      completes.addAndGet(1);
+    }
+  }
+
+  public void testLaunchMultipleJobs() throws JobException, IOException, InterruptedException {
+    CountDownLatch stg1 = new CountDownLatch(1);
+    CountDownLatch stg2 = new CountDownLatch(1);
+    CountDownLatch stg3 = new CountDownLatch(1);
+    SuspendJobListener testListener = new SuspendJobListener(stg1, stg2);
+    (new Thread(() -> {
+      try {
+        GobblinHelixJobLauncherTest.this.gobblinHelixJobLauncher1.launchJob(testListener);
+        stg3.countDown();
+      } catch (JobException e) {
+      }
+    })).start();
+
+    // Wait for the first job to start
+    stg1.await();
+    // When first job is in the middle of running, launch the second job (which should do NOOP because previous job is still running)
+    this.gobblinHelixJobLauncher2.launchJob(testListener);
+    stg2.countDown();
+    // Wait for the first job to finish
+    stg3.await();
+    Assert.assertEquals(testListener.getCompletes().get() == 1, true);
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     try {


Mime
View raw message