gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-308] Change boot sequence of gobblin cluster to fix the hanging issue
Date Wed, 08 Nov 2017 00:35:11 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6198120e3 -> 43d5ed520


[GOBBLIN-308] Change boot sequence of gobblin cluster to fix the hanging issue

Closes #2162 from yukuai518/cluster-stuck


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/43d5ed52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/43d5ed52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/43d5ed52

Branch: refs/heads/master
Commit: 43d5ed5204930f4b7c68d5171adf572b09ed1413
Parents: 6198120
Author: Kuai Yu <kuyu@linkedin.com>
Authored: Tue Nov 7 16:35:03 2017 -0800
Committer: Hung Tran <hutran@linkedin.com>
Committed: Tue Nov 7 16:35:03 2017 -0800

----------------------------------------------------------------------
 .../cluster/StreamingJobConfigurationManager.java     | 14 +++++++++-----
 .../gobblin/service/StreamingKafkaSpecConsumer.java   | 10 ++++++----
 2 files changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/43d5ed52/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index 7370dc6..849dd6a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -95,11 +95,6 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
   protected void startUp() throws Exception {
     LOGGER.info("Starting the " + StreamingJobConfigurationManager.class.getSimpleName());
 
-    // if the instance consumer is a service then need to start it to consume job specs
-    if (this.specConsumer instanceof Service) {
-      ((Service) this.specConsumer).startAsync().awaitRunning();
-    }
-
     // submit command to fetch job specs
     this.fetchJobSpecExecutor.execute(new Runnable() {
       @Override
@@ -116,6 +111,15 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
         }
       }
     });
+
+    // if the instance consumer is a service then need to start it to consume job specs
+    // IMPORTANT: StreamingKafkaSpecConsumer needs to be launched after a fetching thread is created.
+    //            This is because StreamingKafkaSpecConsumer will invoke addListener(new JobSpecListener()) during startup,
+    //            which will push job specs into a blocking queue _jobSpecQueue. A fetching thread will help to consume the
+    //            blocking queue to prevent a hanging issue.
+    if (this.specConsumer instanceof Service) {
+      ((Service) this.specConsumer).startAsync().awaitRunning();
+    }
   }
 
   private void fetchJobSpecs() throws ExecutionException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/43d5ed52/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 7d7b702..23966e9 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -61,7 +61,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
   private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
   private final AvroJobSpecKafkaJobMonitor _jobMonitor;
   private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue;
-
+  private final MutableJobCatalog _jobCatalog;
   public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
     String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
     Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
@@ -74,11 +74,9 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
       throw new RuntimeException("Could not create job monitor", e);
     }
 
+    _jobCatalog = jobCatalog;
     _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
         DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
-
-    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
-    jobCatalog.addListener(new JobSpecListener());
   }
 
   public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
@@ -116,6 +114,10 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
 
   @Override
   protected void startUp() {
+    // listener will add job specs to a blocking queue to send to callers of changedSpecs()
+    // IMPORTANT: This addListener should be invoked after job catalog has been initialized. This is guaranteed because
+    //            StreamingKafkaSpecConsumer is boot after jobCatalog in GobblinClusterManager::startAppLauncherAndServices()
+    _jobCatalog.addListener(new JobSpecListener());
     _jobMonitor.startAsync().awaitRunning();
   }
 


Mime
View raw message