Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 24F8A2004F5 for ; Fri, 1 Sep 2017 20:20:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2370C16D7F4; Fri, 1 Sep 2017 18:20:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4A7BE16D7F3 for ; Fri, 1 Sep 2017 20:20:50 +0200 (CEST) Received: (qmail 6872 invoked by uid 500); 1 Sep 2017 18:20:49 -0000 Mailing-List: contact commits-help@gobblin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gobblin.incubator.apache.org Delivered-To: mailing list commits@gobblin.incubator.apache.org Received: (qmail 6863 invoked by uid 99); 1 Sep 2017 18:20:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Sep 2017 18:20:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 384F1F3334; Fri, 1 Sep 2017 18:20:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hutran@apache.org To: commits@gobblin.apache.org Message-Id: <625cf00b41104db99196d27da4621b77@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-gobblin git commit: [GOBBLIN-233] Add concurrent map to avoid multiple job submission from GobblinHelixJobScheduler Date: Fri, 1 Sep 2017 18:20:49 +0000 (UTC) archived-at: Fri, 01 Sep 2017 18:20:51 -0000 Repository: incubator-gobblin Updated Branches: refs/heads/master 3df82bff0 -> 037950769 [GOBBLIN-233] Add concurrent map to avoid multiple job submission from GobblinHelixJobScheduler Closes #2086 from yukuai518/workflow Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/03795076 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/03795076 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/03795076 Branch: refs/heads/master Commit: 0379507690734e1a58fdb28b512c1df23b7d4509 Parents: 3df82bf Author: Kuai Yu Authored: Fri Sep 1 11:20:44 2017 -0700 Committer: Hung Tran Committed: Fri Sep 1 11:20:44 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/cluster/GobblinHelixJob.java | 5 +- .../cluster/GobblinHelixJobLauncher.java | 34 ++++++++- .../cluster/GobblinHelixJobScheduler.java | 10 +-- .../cluster/GobblinHelixJobLauncherTest.java | 76 +++++++++++++++++++- 4 files changed, 113 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java index 265be3c..b8fd2f4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java @@ -19,6 +19,7 @@ package org.apache.gobblin.cluster; import java.util.List; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; @@ -55,7 +56,7 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob @Override public void executeImpl(JobExecutionContext context) throws JobExecutionException { JobDataMap dataMap = context.getJobDetail().getJobDataMap(); - + ConcurrentHashMap runningMap = (ConcurrentHashMap)dataMap.get(GobblinHelixJobScheduler.JOB_RUNNING_MAP); final JobScheduler jobScheduler = (JobScheduler) dataMap.get(JobScheduler.JOB_SCHEDULER_KEY); // the properties may get mutated during job execution and the scheduler reuses it for the next round of scheduling, // so clone it @@ -67,7 +68,7 @@ public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob List> eventMetadata = (List>) dataMap.get(GobblinHelixJobScheduler.METADATA_TAGS); try { - final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata); + final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata, runningMap); if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD, Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) { jobScheduler.runJob(jobProps, jobListener, jobLauncher); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 1e645d9..74d7169 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -23,7 +23,10 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.gobblin.runtime.JobException; +import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,7 +61,6 @@ import org.apache.gobblin.runtime.ExecutionModel; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.Task; -import org.apache.gobblin.runtime.TaskConfigurationKeys; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.TaskStateCollectorService; import org.apache.gobblin.runtime.util.StateStores; @@ -70,6 +72,8 @@ import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.ParallelRunner; import org.apache.gobblin.util.SerializationUtils; +import javax.annotation.Nullable; + /** * An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework. @@ -119,17 +123,18 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private volatile boolean jobSubmitted = false; private volatile boolean jobComplete = false; + private final ConcurrentHashMap runningMap; private final StateStores stateStores; private final Config jobConfig; public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir, - List> metadataTags) + List> metadataTags, ConcurrentHashMap runningMap) throws Exception { super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags)); this.helixManager = helixManager; this.helixTaskDriver = new TaskDriver(this.helixManager); - + this.runningMap = runningMap; this.appWorkDir = appWorkDir; this.inputWorkUnitDir = new Path(appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + @@ -287,6 +292,29 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } + public void launchJob(@Nullable JobListener jobListener) + throws JobException { + boolean isLaunched = false; + this.runningMap.putIfAbsent(this.jobContext.getJobName(), false); + try { + if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) { + LOGGER.info ("Job {} will be executed, add into running map.", this.jobContext.getJobId()); + isLaunched = true; + super.launchJob(jobListener); + } else { + LOGGER.warn ("Job {} will not be executed because other jobs are still running.", this.jobContext.getJobId()); + } + } finally { + if (isLaunched) { + if (this.runningMap.replace(this.jobContext.getJobName(), true, false)) { + LOGGER.info ("Job {} is done, remove from running map.", this.jobContext.getJobId()); + } else { + throw new IllegalStateException("A launched job should have running state equal to true in the running map."); + } + } + } + } + /** * Add a single {@link WorkUnit} (flattened). */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 0a43380..d1ef74e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -22,7 +22,7 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.Properties; - +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; @@ -61,12 +61,14 @@ public class GobblinHelixJobScheduler extends JobScheduler { static final String HELIX_MANAGER_KEY = "helixManager"; static final String APPLICATION_WORK_DIR_KEY = "applicationWorkDir"; static final String METADATA_TAGS = "metadataTags"; + static final String JOB_RUNNING_MAP = "jobRunningMap"; private final Properties properties; private final HelixManager helixManager; private final EventBus eventBus; private final Path appWorkDir; private final List> metadataTags; + private final ConcurrentHashMap jobRunningMap; private final MutableJobCatalog jobCatalog; public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus, @@ -76,7 +78,7 @@ public class GobblinHelixJobScheduler extends JobScheduler { this.properties = properties; this.helixManager = helixManager; this.eventBus = eventBus; - + this.jobRunningMap = new ConcurrentHashMap<>(); this.appWorkDir = appWorkDir; this.metadataTags = metadataTags; this.jobCatalog = jobCatalog; @@ -94,7 +96,7 @@ public class GobblinHelixJobScheduler extends JobScheduler { additionalJobDataMap.put(HELIX_MANAGER_KEY, this.helixManager); additionalJobDataMap.put(APPLICATION_WORK_DIR_KEY, this.appWorkDir); additionalJobDataMap.put(METADATA_TAGS, this.metadataTags); - + additionalJobDataMap.put(JOB_RUNNING_MAP, this.jobRunningMap); try { scheduleJob(jobProps, jobListener, additionalJobDataMap, GobblinHelixJob.class); } catch (Exception e) { @@ -118,7 +120,7 @@ public class GobblinHelixJobScheduler extends JobScheduler { private GobblinHelixJobLauncher buildGobblinHelixJobLauncher(Properties jobProps) throws Exception { - return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags); + return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap); } @Subscribe http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/03795076/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index b9dc5cd..3483c62 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -17,17 +17,22 @@ package org.apache.gobblin.cluster; -import org.apache.gobblin.metastore.DatasetStateStore; -import org.apache.gobblin.util.ClassAliasResolver; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.URL; import java.util.List; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.Schema; import org.apache.curator.test.TestingServer; +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.runtime.JobContext; +import org.apache.gobblin.runtime.listeners.AbstractJobListener; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,6 +61,8 @@ import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.util.ConfigUtils; +import lombok.Getter; + /** * Unit tests for {@link GobblinHelixJobLauncher}. @@ -87,6 +94,10 @@ public class GobblinHelixJobLauncherTest { private GobblinHelixJobLauncher gobblinHelixJobLauncher; + private GobblinHelixJobLauncher gobblinHelixJobLauncher1; + + private GobblinHelixJobLauncher gobblinHelixJobLauncher2; + private GobblinTaskRunner gobblinTaskRunner; private DatasetStateStore datasetStateStore; @@ -150,8 +161,22 @@ public class GobblinHelixJobLauncherTest { TestHelper.createSourceJsonFile(sourceJsonFile); properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, sourceJsonFile.getAbsolutePath()); + ConcurrentHashMap runningMap = new ConcurrentHashMap<>(); + + // Normal job launcher + properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348470"); this.gobblinHelixJobLauncher = this.closer.register( - new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.>of())); + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.>of(), runningMap)); + + // Job launcher(1) to test parallel job running + properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348471"); + this.gobblinHelixJobLauncher1 = this.closer.register( + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.>of(), runningMap)); + + // Job launcher(2) to test parallel job running + properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348472"); + this.gobblinHelixJobLauncher2 = this.closer.register( + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.>of(), runningMap)); this.gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME, @@ -196,6 +221,51 @@ public class GobblinHelixJobLauncherTest { Assert.assertEquals(datasetState.getTaskStates().get(0).getWorkingState(), WorkUnitState.WorkingState.COMMITTED); } + private static class SuspendJobListener extends AbstractJobListener { + @Getter + private AtomicInteger completes = new AtomicInteger(); + private CountDownLatch stg1; + private CountDownLatch stg2; + public SuspendJobListener (CountDownLatch stg1, CountDownLatch stg2) { + this.stg1 = stg1; + this.stg2 = stg2; + } + + @Override + public void onJobStart (JobContext jobContext) throws Exception { + stg1.countDown(); + stg2.await(); + } + + @Override + public void onJobCompletion(JobContext jobContext) throws Exception { + completes.addAndGet(1); + } + } + + public void testLaunchMultipleJobs() throws JobException, IOException, InterruptedException { + CountDownLatch stg1 = new CountDownLatch(1); + CountDownLatch stg2 = new CountDownLatch(1); + CountDownLatch stg3 = new CountDownLatch(1); + SuspendJobListener testListener = new SuspendJobListener(stg1, stg2); + (new Thread(() -> { + try { + GobblinHelixJobLauncherTest.this.gobblinHelixJobLauncher1.launchJob(testListener); + stg3.countDown(); + } catch (JobException e) { + } + })).start(); + + // Wait for the first job to start + stg1.await(); + // When first job is in the middle of running, launch the second job (which should do NOOP because previous job is still running) + this.gobblinHelixJobLauncher2.launchJob(testListener); + stg2.countDown(); + // Wait for the first job to finish + stg3.await(); + Assert.assertEquals(testListener.getCompletes().get() == 1, true); + } + @AfterClass public void tearDown() throws IOException { try {