gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-324] Add the cluster working directory config
Date Wed, 29 Nov 2017 21:39:06 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 178acbb47 -> 5d343e318


[GOBBLIN-324] Add the cluster working directory config

Currently, the appWorkDir value is passed to the
GobblinClusterManager and GobblinTaskRunner
constructors, where it determines where the state
files are stored.
The default launch scripts call the main methods,
which pass in a hardcoded null value, so the code
falls back to a default location such as
file:/Users/username/standalone_cluster/1

It is useful to be able to set this value through a
configuration property (gobblin.cluster.workDir).

When the config is not specified, the behavior is
the same as before.

Also add some examples in the sample config file.

Testing:

Add new unit tests.
Manually run the cluster with and without this
config.

Closes #2174 from HappyRay/add-work-dir-config


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5d343e31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5d343e31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5d343e31

Branch: refs/heads/master
Commit: 5d343e3188c99eb950ca4af0728769220e9caf5c
Parents: 178acbb
Author: Ray Yang <ruiguo@gmail.com>
Authored: Wed Nov 29 13:38:47 2017 -0800
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Wed Nov 29 13:38:47 2017 -0800

----------------------------------------------------------------------
 conf/standalone/application.conf                | 16 ++++-
 .../GobblinClusterConfigurationKeys.java        |  4 +-
 .../gobblin/cluster/GobblinClusterManager.java  |  2 +-
 .../gobblin/cluster/GobblinClusterUtils.java    | 15 ++++-
 .../gobblin/cluster/GobblinTaskRunner.java      |  2 +-
 .../cluster/GobblinClusterUtilsTest.java        | 62 ++++++++++++++++++++
 6 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/conf/standalone/application.conf
----------------------------------------------------------------------
diff --git a/conf/standalone/application.conf b/conf/standalone/application.conf
index fa601dd..e9b8323 100644
--- a/conf/standalone/application.conf
+++ b/conf/standalone/application.conf
@@ -16,10 +16,22 @@
 #
 
 # Sample configuration properties for the Gobblin Standalone cluster
+gobblin.cluster.workDir=${gobblin.cluster.work.dir}/GobblinStandaloneCluster
+
+# The default job configuration manager is the JobConfigurationManager.
+# To accept jobs from Kafka instead, uncomment the StreamingJobConfigurationManager setting below together with the Kafka-related properties it requires.
+#gobblin.cluster.job.configuration.manager=org.apache.gobblin.cluster.StreamingJobConfigurationManager
+#spec.kafka.topics=<Kafka topic(s) to read job specs from>
+#kafka.brokers="hostname:12913/kafka-queuing"
+#jobSpecMonitor.kafka.zookeeper.connect="hostname:12913/kafka-queuing"
 
 # Cluster configuration properties
-gobblin.cluster.helix.cluster.name=GobblinStandaloneCluster
-gobblin.cluster.job.conf.path=<path where Gobblin job configuration file are located>
+gobblin.cluster.helix.cluster.name=GobblinStandaloneClusterCli
+
+# Directory containing the job configuration files; used by the JobConfigurationManager
+gobblin.cluster.job.conf.path=${gobblin.cluster.work.dir}/jobs
+
+gobblin.cluster.jobconf.fullyQualifiedPath=${gobblin.cluster.work.dir}/jobs
 
 # File system URIs
 writer.fs.uri=${fs.uri}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index ea75dc3..ab6f8b4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -35,7 +35,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
   public static final String STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
-
+  public static final String CLUSTRER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
 
   // Helix configuration properties.
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name";
@@ -80,4 +80,4 @@ public class GobblinClusterConfigurationKeys {
   public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds";
   public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 8ced294..7948a8a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -163,7 +163,7 @@ public class GobblinClusterManager implements ApplicationLauncher {
 
     this.fs = buildFileSystem(config);
     this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
-        : GobblinClusterUtils.getAppWorkDirPath(this.fs, clusterName, applicationId);
+        : GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, clusterName, applicationId);
 
     initializeAppLauncherAndServices();
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index a8a335a..3082720 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,14 +17,15 @@
 
 package org.apache.gobblin.cluster;
 
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTRER_WORK_DIR;
+
+import com.typesafe.config.Config;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-
+import org.apache.gobblin.annotation.Alpha;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.gobblin.annotation.Alpha;
-
 @Alpha
 public class GobblinClusterUtils {
 
@@ -51,6 +52,26 @@ public class GobblinClusterUtils {
     return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
   }
 
+  /**
+   * Get the application working directory {@link Path}, honoring an explicit configuration.
+   *
+   * @param config the cluster configuration; when it defines {@code gobblin.cluster.workDir}
+   *               ({@link GobblinClusterConfigurationKeys#CLUSTRER_WORK_DIR}), that value is used as is
+   * @param fs the {@link FileSystem} whose home directory anchors the default location
+   * @param applicationName the application name
+   * @param applicationId the application ID
+   * @return the configured work directory if present, otherwise the default computed by
+   *         {@link #getAppWorkDirPath(FileSystem, String, String)}
+   */
+  public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
+      String applicationName, String applicationId) {
+    if (config.hasPath(CLUSTRER_WORK_DIR)) {
+      return new Path(config.getString(CLUSTRER_WORK_DIR));
+    }
+    // Reuse the existing default instead of duplicating it, so the two code paths cannot drift.
+    return getAppWorkDirPath(fs, applicationName, applicationId);
+  }
+
   /**
    * Get the application working directory {@link String}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index f86874b..1de9bb1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -160,7 +160,7 @@ public class GobblinTaskRunner {
     TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties, this.helixManager);
 
     Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() :
-        GobblinClusterUtils.getAppWorkDirPath(this.fs, applicationName, applicationId);
+        GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, applicationName,
applicationId);
 
     List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
         new JMXReportingService(ImmutableMap.of("task.executor" ,taskExecutor.getTaskExecutorQueueMetricSet())));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
new file mode 100644
index 0000000..4d83658
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link GobblinClusterUtils#getAppWorkDirPathFromConfig}.
+ *
+ * <p>TestNG runs every test method on a single shared instance of this class, so each test
+ * creates its own {@link FileSystem} mock rather than sharing stubs and recorded calls via a field.
+ */
+public class GobblinClusterUtilsTest {
+
+  @Test
+  public void work_dir_should_get_value_from_config_when_specified() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("gobblin.cluster.workDir", "/foo/bar");
+    Config config = ConfigFactory.parseMap(configMap);
+
+    Path workDirPath = GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+
+    // TestNG's assertEquals takes (actual, expected).
+    assertEquals(workDirPath, new Path("/foo/bar"));
+    // An explicitly configured work dir must not fall back to the file system's home directory.
+    verify(fs, never()).getHomeDirectory();
+  }
+
+  @Test
+  public void work_dir_should_get_default_calculated_value_when_not_specified() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    when(fs.getHomeDirectory()).thenReturn(new Path("/home/"));
+    Config config = ConfigFactory.empty();
+
+    Path workDirPath = GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+
+    // Expected default layout: <home>/<applicationName>/<applicationId>.
+    assertEquals(workDirPath, new Path("/home/appName/appid"));
+  }
+}


Mime
View raw message